// aws_smithy_runtime/client/http/body/minimum_throughput/http_body_1_x.rs

/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0
 */

use super::{BoxError, Error, MinimumThroughputDownloadBody};
use crate::client::http::body::minimum_throughput::throughput::DownloadReport;
use crate::client::http::body::minimum_throughput::ThroughputReadingBody;
use aws_smithy_async::rt::sleep::AsyncSleep;
use http_body_1x::Frame;
use std::future::Future;
use std::pin::{pin, Pin};
use std::task::{Context, Poll};

15impl<B> http_body_1x::Body for MinimumThroughputDownloadBody<B>
16where
17    B: http_body_1x::Body<Data = bytes::Bytes, Error = BoxError>,
18{
19    type Data = bytes::Bytes;
20    type Error = BoxError;
21
22    fn poll_frame(
23        mut self: Pin<&mut Self>,
24        cx: &mut Context<'_>,
25    ) -> Poll<Option<Result<http_body_1x::Frame<Self::Data>, Self::Error>>> {
26        #[allow(unused_imports)]
27        use crate::client::http::body::minimum_throughput::throughput::ThroughputReport;
28        // this code is called quite frequently in production—one every millisecond or so when downloading
29        // a stream. However, SystemTime::now is on the order of nanoseconds
30        let now = self.time_source.now();
31        // Attempt to read the data from the inner body, then update the
32        // throughput logs.
33        let mut this = self.as_mut().project();
34        let poll_res = match this.inner.poll_frame(cx) {
35            Poll::Ready(Some(Ok(frame))) => {
36                if frame.is_data() {
37                    let bytes = frame.into_data().expect("Is data frame");
38                    tracing::trace!("received data: {}", bytes.len());
39                    this.throughput_logs
40                        .push_bytes_transferred(now, bytes.len() as u64);
41                    Poll::Ready(Some(Ok(Frame::data(bytes))))
42                } else {
43                    tracing::trace!("received trailer");
44                    Poll::Ready(Some(Ok(frame)))
45                }
46            }
47            Poll::Pending => {
48                tracing::trace!("received poll pending");
49                this.throughput_logs.push_pending(now);
50                Poll::Pending
51            }
52            // If we've read all the data or an error occurred, then return that result.
53            res => return res,
54        };
55
56        // Check the sleep future to see if it needs refreshing.
57        let mut sleep_fut = this
58            .sleep_fut
59            .take()
60            .unwrap_or_else(|| this.async_sleep.sleep(*this.resolution));
61        if let Poll::Ready(()) = pin!(&mut sleep_fut).poll(cx) {
62            tracing::trace!("sleep future triggered—triggering a wakeup");
63            // Whenever the sleep future expires, we replace it.
64            sleep_fut = this.async_sleep.sleep(*this.resolution);
65
66            // We also schedule a wake up for current task to ensure that
67            // it gets polled at least one more time.
68            cx.waker().wake_by_ref();
69        };
70        this.sleep_fut.replace(sleep_fut);
71
72        // Calculate the current throughput and emit an error if it's too low and
73        // the grace period has elapsed.
74        let report = this.throughput_logs.report(now);
75        let (violated, current_throughput) =
76            report.minimum_throughput_violated(this.options.minimum_throughput());
77        if violated {
78            if this.grace_period_fut.is_none() {
79                tracing::debug!("entering minimum throughput grace period");
80            }
81            let mut grace_period_fut = this
82                .grace_period_fut
83                .take()
84                .unwrap_or_else(|| this.async_sleep.sleep(this.options.grace_period()));
85            if let Poll::Ready(()) = pin!(&mut grace_period_fut).poll(cx) {
86                // The grace period has ended!
87                return Poll::Ready(Some(Err(Box::new(Error::ThroughputBelowMinimum {
88                    expected: self.options.minimum_throughput(),
89                    actual: current_throughput,
90                }))));
91            };
92            this.grace_period_fut.replace(grace_period_fut);
93        } else {
94            // Ensure we don't have an active grace period future if we're not
95            // currently below the minimum throughput.
96            if this.grace_period_fut.is_some() {
97                tracing::debug!("throughput recovered; exiting grace period");
98            }
99            let _ = this.grace_period_fut.take();
100        }
101
102        poll_res
103    }
104
105    fn is_end_stream(&self) -> bool {
106        self.inner.is_end_stream()
107    }
108
109    fn size_hint(&self) -> http_body_1x::SizeHint {
110        self.inner.size_hint()
111    }
112}
113
114impl<B> http_body_1x::Body for ThroughputReadingBody<B>
115where
116    B: http_body_1x::Body<Data = bytes::Bytes, Error = BoxError>,
117{
118    type Data = bytes::Bytes;
119    type Error = BoxError;
120
121    fn poll_frame(
122        mut self: Pin<&mut Self>,
123        cx: &mut Context<'_>,
124    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
125        // this code is called quite frequently in production—one every millisecond or so when downloading
126        // a stream. However, SystemTime::now is on the order of nanoseconds
127        let now = self.time_source.now();
128        // Attempt to read the data from the inner body, then update the
129        // throughput logs.
130        let this = self.as_mut().project();
131        match this.inner.poll_frame(cx) {
132            Poll::Ready(Some(Ok(frame))) => {
133                if frame.is_data() {
134                    let bytes = frame.into_data().expect("Is data frame");
135                    tracing::trace!("received data: {}", bytes.len());
136                    this.throughput
137                        .push_bytes_transferred(now, bytes.len() as u64);
138
139                    // hyper will optimistically stop polling when end of stream is reported
140                    // (e.g. when content-length amount of data has been consumed) which means
141                    // we may never get to `Poll:Ready(None)`. Check for same condition and
142                    // attempt to stop checking throughput violations _now_ as we may never
143                    // get polled again. The caveat here is that it depends on `Body` implementations
144                    // implementing `is_end_stream()` correctly. Users can also disable SSP as an
145                    // alternative for such fringe use cases.
146                    if self.is_end_stream() {
147                        tracing::trace!("stream reported end of stream before Poll::Ready(None) reached; marking stream complete");
148                        self.throughput.mark_complete();
149                    }
150                    Poll::Ready(Some(Ok(Frame::data(bytes))))
151                } else {
152                    Poll::Ready(Some(Ok(frame)))
153                }
154            }
155            Poll::Pending => {
156                tracing::trace!("received poll pending");
157                this.throughput.push_pending(now);
158                Poll::Pending
159            }
160            // If we've read all the data or an error occurred, then return that result.
161            res => {
162                if this.throughput.mark_complete() {
163                    tracing::trace!("stream completed: {:?}", res);
164                }
165                res
166            }
167        }
168    }
169
170    fn is_end_stream(&self) -> bool {
171        self.inner.is_end_stream()
172    }
173
174    fn size_hint(&self) -> http_body_1x::SizeHint {
175        self.inner.size_hint()
176    }
177}