aws_smithy_runtime/client/http/body/minimum_throughput/
http_body_1_x.rs1use super::{BoxError, Error, MinimumThroughputDownloadBody};
7use crate::client::http::body::minimum_throughput::throughput::DownloadReport;
8use crate::client::http::body::minimum_throughput::ThroughputReadingBody;
9use aws_smithy_async::rt::sleep::AsyncSleep;
10use http_body_1x::Frame;
11use std::future::Future;
12use std::pin::{pin, Pin};
13use std::task::{Context, Poll};
14
15impl<B> http_body_1x::Body for MinimumThroughputDownloadBody<B>
16where
17 B: http_body_1x::Body<Data = bytes::Bytes, Error = BoxError>,
18{
19 type Data = bytes::Bytes;
20 type Error = BoxError;
21
22 fn poll_frame(
23 mut self: Pin<&mut Self>,
24 cx: &mut Context<'_>,
25 ) -> Poll<Option<Result<http_body_1x::Frame<Self::Data>, Self::Error>>> {
26 #[allow(unused_imports)]
27 use crate::client::http::body::minimum_throughput::throughput::ThroughputReport;
28 let now = self.time_source.now();
31 let mut this = self.as_mut().project();
34 let poll_res = match this.inner.poll_frame(cx) {
35 Poll::Ready(Some(Ok(frame))) => {
36 if frame.is_data() {
37 let bytes = frame.into_data().expect("Is data frame");
38 tracing::trace!("received data: {}", bytes.len());
39 this.throughput_logs
40 .push_bytes_transferred(now, bytes.len() as u64);
41 Poll::Ready(Some(Ok(Frame::data(bytes))))
42 } else {
43 tracing::trace!("received trailer");
44 Poll::Ready(Some(Ok(frame)))
45 }
46 }
47 Poll::Pending => {
48 tracing::trace!("received poll pending");
49 this.throughput_logs.push_pending(now);
50 Poll::Pending
51 }
52 res => return res,
54 };
55
56 let mut sleep_fut = this
58 .sleep_fut
59 .take()
60 .unwrap_or_else(|| this.async_sleep.sleep(*this.resolution));
61 if let Poll::Ready(()) = pin!(&mut sleep_fut).poll(cx) {
62 tracing::trace!("sleep future triggered—triggering a wakeup");
63 sleep_fut = this.async_sleep.sleep(*this.resolution);
65
66 cx.waker().wake_by_ref();
69 };
70 this.sleep_fut.replace(sleep_fut);
71
72 let report = this.throughput_logs.report(now);
75 let (violated, current_throughput) =
76 report.minimum_throughput_violated(this.options.minimum_throughput());
77 if violated {
78 if this.grace_period_fut.is_none() {
79 tracing::debug!("entering minimum throughput grace period");
80 }
81 let mut grace_period_fut = this
82 .grace_period_fut
83 .take()
84 .unwrap_or_else(|| this.async_sleep.sleep(this.options.grace_period()));
85 if let Poll::Ready(()) = pin!(&mut grace_period_fut).poll(cx) {
86 return Poll::Ready(Some(Err(Box::new(Error::ThroughputBelowMinimum {
88 expected: self.options.minimum_throughput(),
89 actual: current_throughput,
90 }))));
91 };
92 this.grace_period_fut.replace(grace_period_fut);
93 } else {
94 if this.grace_period_fut.is_some() {
97 tracing::debug!("throughput recovered; exiting grace period");
98 }
99 let _ = this.grace_period_fut.take();
100 }
101
102 poll_res
103 }
104
105 fn is_end_stream(&self) -> bool {
106 self.inner.is_end_stream()
107 }
108
109 fn size_hint(&self) -> http_body_1x::SizeHint {
110 self.inner.size_hint()
111 }
112}
113
114impl<B> http_body_1x::Body for ThroughputReadingBody<B>
115where
116 B: http_body_1x::Body<Data = bytes::Bytes, Error = BoxError>,
117{
118 type Data = bytes::Bytes;
119 type Error = BoxError;
120
121 fn poll_frame(
122 mut self: Pin<&mut Self>,
123 cx: &mut Context<'_>,
124 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
125 let now = self.time_source.now();
128 let this = self.as_mut().project();
131 match this.inner.poll_frame(cx) {
132 Poll::Ready(Some(Ok(frame))) => {
133 if frame.is_data() {
134 let bytes = frame.into_data().expect("Is data frame");
135 tracing::trace!("received data: {}", bytes.len());
136 this.throughput
137 .push_bytes_transferred(now, bytes.len() as u64);
138
139 if self.is_end_stream() {
147 tracing::trace!("stream reported end of stream before Poll::Ready(None) reached; marking stream complete");
148 self.throughput.mark_complete();
149 }
150 Poll::Ready(Some(Ok(Frame::data(bytes))))
151 } else {
152 Poll::Ready(Some(Ok(frame)))
153 }
154 }
155 Poll::Pending => {
156 tracing::trace!("received poll pending");
157 this.throughput.push_pending(now);
158 Poll::Pending
159 }
160 res => {
162 if this.throughput.mark_complete() {
163 tracing::trace!("stream completed: {:?}", res);
164 }
165 res
166 }
167 }
168 }
169
170 fn is_end_stream(&self) -> bool {
171 self.inner.is_end_stream()
172 }
173
174 fn size_hint(&self) -> http_body_1x::SizeHint {
175 self.inner.size_hint()
176 }
177}