Skip to main content

hyper_utils/http_body_util/
stream.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use bytes::Buf;
7use futures_util::{ready, stream::Stream};
8use hyper::body::{Body, Frame};
9use pin_project_lite::pin_project;
10
11pin_project! {
12    /// A body created from a [`Stream`].
13    #[derive(Clone, Copy, Debug)]
14    pub struct StreamBody<S> {
15        #[pin]
16        stream: S,
17    }
18}
19
20impl<S> StreamBody<S> {
21    /// Create a new `StreamBody`.
22    pub fn new(stream: S) -> Self {
23        Self {
24            stream,
25        }
26    }
27}
28
29impl<S, D, E> Body for StreamBody<S>
30where
31    S: Stream<Item = Result<Frame<D>, E>>,
32    D: Buf,
33{
34    type Data = D;
35    type Error = E;
36
37    fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
38        match self.project().stream.poll_next(cx) {
39            Poll::Ready(Some(result)) => Poll::Ready(Some(result)),
40            Poll::Ready(None) => Poll::Ready(None),
41            Poll::Pending => Poll::Pending,
42        }
43    }
44}
45
46impl<S: Stream> Stream for StreamBody<S> {
47    type Item = S::Item;
48
49    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
50        self.project().stream.poll_next(cx)
51    }
52
53    fn size_hint(&self) -> (usize, Option<usize>) {
54        self.stream.size_hint()
55    }
56}
57
58pin_project! {
59    /// A stream created from a [`Body`].
60    #[derive(Clone, Copy, Debug)]
61    pub struct BodyStream<B> {
62        #[pin]
63        body: B,
64    }
65}
66
67impl<B> BodyStream<B> {
68    /// Create a new `BodyStream`.
69    pub fn new(body: B) -> Self {
70        Self {
71            body,
72        }
73    }
74}
75
76impl<B> Body for BodyStream<B>
77where
78    B: Body,
79{
80    type Data = B::Data;
81    type Error = B::Error;
82
83    fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
84        self.project().body.poll_frame(cx)
85    }
86}
87
88impl<B> Stream for BodyStream<B>
89where
90    B: Body,
91{
92    type Item = Result<Frame<B::Data>, B::Error>;
93
94    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
95        match self.project().body.poll_frame(cx) {
96            Poll::Ready(Some(frame)) => Poll::Ready(Some(frame)),
97            Poll::Ready(None) => Poll::Ready(None),
98            Poll::Pending => Poll::Pending,
99        }
100    }
101}
102
103pin_project! {
104    /// A data stream created from a [`Body`].
105    #[derive(Clone, Copy, Debug)]
106    pub struct BodyDataStream<B> {
107        #[pin]
108        body: B,
109    }
110}
111
112impl<B> BodyDataStream<B> {
113    /// Create a new `BodyDataStream`
114    pub fn new(body: B) -> Self {
115        Self {
116            body,
117        }
118    }
119}
120
121impl<B> Stream for BodyDataStream<B>
122where
123    B: Body,
124{
125    type Item = Result<B::Data, B::Error>;
126
127    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
128        loop {
129            return match ready!(self.as_mut().project().body.poll_frame(cx)) {
130                Some(Ok(frame)) => match frame.into_data() {
131                    Ok(bytes) => Poll::Ready(Some(Ok(bytes))),
132                    Err(_) => continue,
133                },
134                Some(Err(err)) => Poll::Ready(Some(Err(err))),
135                None => Poll::Ready(None),
136            };
137        }
138    }
139}
140
141#[cfg(test)]
142mod tests {
143    use std::convert::Infallible;
144
145    use bytes::Bytes;
146    use futures_util::StreamExt;
147    use hyper::body::Frame;
148
149    use crate::http_body_util::{BodyExt, BodyStream, StreamBody};
150
151    #[tokio::test]
152    async fn body_from_stream() {
153        let chunks: Vec<Result<_, Infallible>> = vec![
154            Ok(Frame::data(Bytes::from(vec![1]))),
155            Ok(Frame::data(Bytes::from(vec![2]))),
156            Ok(Frame::data(Bytes::from(vec![3]))),
157        ];
158        let stream = futures_util::stream::iter(chunks);
159        let mut body = StreamBody::new(stream);
160
161        assert_eq!(body.frame().await.unwrap().unwrap().into_data().unwrap().as_ref(), [1]);
162        assert_eq!(body.frame().await.unwrap().unwrap().into_data().unwrap().as_ref(), [2]);
163        assert_eq!(body.frame().await.unwrap().unwrap().into_data().unwrap().as_ref(), [3]);
164
165        assert!(body.frame().await.is_none());
166    }
167
168    #[tokio::test]
169    async fn stream_from_body() {
170        let chunks: Vec<Result<_, Infallible>> = vec![
171            Ok(Frame::data(Bytes::from(vec![1]))),
172            Ok(Frame::data(Bytes::from(vec![2]))),
173            Ok(Frame::data(Bytes::from(vec![3]))),
174        ];
175        let stream = futures_util::stream::iter(chunks);
176        let body = StreamBody::new(stream);
177
178        let mut stream = BodyStream::new(body);
179
180        assert_eq!(stream.next().await.unwrap().unwrap().into_data().unwrap().as_ref(), [1]);
181        assert_eq!(stream.next().await.unwrap().unwrap().into_data().unwrap().as_ref(), [2]);
182        assert_eq!(stream.next().await.unwrap().unwrap().into_data().unwrap().as_ref(), [3]);
183
184        assert!(stream.next().await.is_none());
185    }
186}