hyper_utils/http_body_util/
stream.rs1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use bytes::Buf;
7use futures_util::{ready, stream::Stream};
8use hyper::body::{Body, Frame};
9use pin_project_lite::pin_project;
10
11pin_project! {
12 #[derive(Clone, Copy, Debug)]
14 pub struct StreamBody<S> {
15 #[pin]
16 stream: S,
17 }
18}
19
20impl<S> StreamBody<S> {
21 pub fn new(stream: S) -> Self {
23 Self {
24 stream,
25 }
26 }
27}
28
29impl<S, D, E> Body for StreamBody<S>
30where
31 S: Stream<Item = Result<Frame<D>, E>>,
32 D: Buf,
33{
34 type Data = D;
35 type Error = E;
36
37 fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
38 match self.project().stream.poll_next(cx) {
39 Poll::Ready(Some(result)) => Poll::Ready(Some(result)),
40 Poll::Ready(None) => Poll::Ready(None),
41 Poll::Pending => Poll::Pending,
42 }
43 }
44}
45
46impl<S: Stream> Stream for StreamBody<S> {
47 type Item = S::Item;
48
49 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
50 self.project().stream.poll_next(cx)
51 }
52
53 fn size_hint(&self) -> (usize, Option<usize>) {
54 self.stream.size_hint()
55 }
56}
57
58pin_project! {
59 #[derive(Clone, Copy, Debug)]
61 pub struct BodyStream<B> {
62 #[pin]
63 body: B,
64 }
65}
66
67impl<B> BodyStream<B> {
68 pub fn new(body: B) -> Self {
70 Self {
71 body,
72 }
73 }
74}
75
76impl<B> Body for BodyStream<B>
77where
78 B: Body,
79{
80 type Data = B::Data;
81 type Error = B::Error;
82
83 fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
84 self.project().body.poll_frame(cx)
85 }
86}
87
88impl<B> Stream for BodyStream<B>
89where
90 B: Body,
91{
92 type Item = Result<Frame<B::Data>, B::Error>;
93
94 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
95 match self.project().body.poll_frame(cx) {
96 Poll::Ready(Some(frame)) => Poll::Ready(Some(frame)),
97 Poll::Ready(None) => Poll::Ready(None),
98 Poll::Pending => Poll::Pending,
99 }
100 }
101}
102
103pin_project! {
104 #[derive(Clone, Copy, Debug)]
106 pub struct BodyDataStream<B> {
107 #[pin]
108 body: B,
109 }
110}
111
112impl<B> BodyDataStream<B> {
113 pub fn new(body: B) -> Self {
115 Self {
116 body,
117 }
118 }
119}
120
121impl<B> Stream for BodyDataStream<B>
122where
123 B: Body,
124{
125 type Item = Result<B::Data, B::Error>;
126
127 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
128 loop {
129 return match ready!(self.as_mut().project().body.poll_frame(cx)) {
130 Some(Ok(frame)) => match frame.into_data() {
131 Ok(bytes) => Poll::Ready(Some(Ok(bytes))),
132 Err(_) => continue,
133 },
134 Some(Err(err)) => Poll::Ready(Some(Err(err))),
135 None => Poll::Ready(None),
136 };
137 }
138 }
139}
140
#[cfg(test)]
mod tests {
    use std::convert::Infallible;

    use bytes::Bytes;
    use futures_util::StreamExt;
    use hyper::body::Frame;

    use crate::http_body_util::{BodyExt, BodyStream, StreamBody};

    /// Payload byte carried by each fixture frame, in emission order.
    const PAYLOADS: [u8; 3] = [1, 2, 3];

    /// Builds one single-byte data frame per entry of `PAYLOADS`.
    fn fixture_frames() -> Vec<Result<Frame<Bytes>, Infallible>> {
        PAYLOADS
            .iter()
            .map(|&byte| Ok(Frame::data(Bytes::from(vec![byte]))))
            .collect()
    }

    /// A `StreamBody` yields every frame of its stream in order, then ends.
    #[tokio::test]
    async fn body_from_stream() {
        let mut body = StreamBody::new(futures_util::stream::iter(fixture_frames()));

        for &expected in PAYLOADS.iter() {
            let frame = body.frame().await.unwrap().unwrap();
            assert_eq!(frame.into_data().unwrap().as_ref(), [expected]);
        }

        assert!(body.frame().await.is_none());
    }

    /// A `BodyStream` yields every frame of its body in order, then ends.
    #[tokio::test]
    async fn stream_from_body() {
        let body = StreamBody::new(futures_util::stream::iter(fixture_frames()));
        let mut stream = BodyStream::new(body);

        for &expected in PAYLOADS.iter() {
            let frame = stream.next().await.unwrap().unwrap();
            assert_eq!(frame.into_data().unwrap().as_ref(), [expected]);
        }

        assert!(stream.next().await.is_none());
    }
}