Struct aws_smithy_http::byte_stream::ByteStream
pub struct ByteStream { /* private fields */ }
Stream of binary data
ByteStream wraps a stream of binary data for ease of use.
Getting data out of a ByteStream
ByteStream provides three primary mechanisms for accessing the data:
- With .collect(): .collect() reads the complete ByteStream into memory and stores it in AggregatedBytes, a non-contiguous ByteBuffer.

  use aws_smithy_http::byte_stream::{ByteStream, AggregatedBytes};
  use aws_smithy_http::body::SdkBody;
  use bytes::Buf;

  async fn example() {
      let stream = ByteStream::new(SdkBody::from("hello! This is some data"));
      // Load data from the stream into memory:
      let data = stream.collect().await.expect("error reading data");
      // collect returns a `bytes::Buf`:
      println!("first chunk: {:?}", data.chunk());
  }
- Via impl Stream: Note: An import of StreamExt is required to use .try_next(). For use cases where holding the entire ByteStream in memory is unnecessary, use the Stream implementation:

  use aws_smithy_http::byte_stream::{ByteStream, error::Error};
  use tokio_stream::StreamExt;

  async fn example() -> Result<(), Error> {
      let mut stream = ByteStream::from(vec![1, 2, 3, 4, 5, 99]);
      // `crc32::Digest` stands in for any incremental checksum; it is not defined in this snippet
      let mut digest = crc32::Digest::new();
      while let Some(bytes) = stream.try_next().await? {
          digest.write(&bytes);
      }
      println!("digest: {}", digest.finish());
      Ok(())
  }
- Via .into_async_read(): Note: The rt-tokio feature must be active to use .into_async_read(). It's possible to convert a ByteStream into a struct that implements tokio::io::AsyncRead. Then, you can use pre-existing tools like tokio::io::BufReader:

  use aws_smithy_http::byte_stream::ByteStream;
  use aws_smithy_http::body::SdkBody;
  use tokio::io::{AsyncBufReadExt, BufReader};

  #[cfg(feature = "rt-tokio")]
  async fn example() -> std::io::Result<()> {
      let stream = ByteStream::new(SdkBody::from("hello!\nThis is some data"));
      // Wrap the stream in a BufReader
      let buf_reader = BufReader::new(stream.into_async_read());
      let mut lines = buf_reader.lines();
      assert_eq!(lines.next_line().await?, Some("hello!".to_owned()));
      assert_eq!(lines.next_line().await?, Some("This is some data".to_owned()));
      assert_eq!(lines.next_line().await?, None);
      Ok(())
  }
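The trade-off between collecting and streaming can be sketched without the SDK at all. The block below is a std-only model, not the library's implementation: the Chunk alias and the collect_all, streaming_sum, and sample functions are hypothetical names, and a plain iterator of Result values stands in for the asynchronous stream.

```rust
/// Stand-in for one fallible chunk of a byte stream (hypothetical alias).
type Chunk = Result<Vec<u8>, String>;

/// Analogue of `.collect()`: buffers every chunk before returning.
fn collect_all(chunks: impl Iterator<Item = Chunk>) -> Result<Vec<u8>, String> {
    let mut all = Vec::new();
    for chunk in chunks {
        // The first failing chunk aborts the whole read.
        all.extend_from_slice(&chunk?);
    }
    Ok(all)
}

/// Analogue of looping over `try_next()`: only a running value is kept,
/// so memory use does not grow with the size of the stream.
fn streaming_sum(chunks: impl Iterator<Item = Chunk>) -> Result<u32, String> {
    let mut sum = 0u32;
    for chunk in chunks {
        for byte in chunk? {
            sum = sum.wrapping_add(u32::from(byte));
        }
    }
    Ok(sum)
}

/// Sample input: two successful chunks.
fn sample() -> Vec<Chunk> {
    vec![Ok(vec![1, 2, 3]), Ok(vec![4, 5, 99])]
}
```

Both functions stop at the first error, but only collect_all needs memory proportional to the total amount of data.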
Getting data into a ByteStream
ByteStreams can be created in one of three ways:
- From in-memory binary data: ByteStreams created from in-memory data are always retryable. Data will be converted into Bytes, enabling a cheap clone during retries.

  use bytes::Bytes;
  use aws_smithy_http::byte_stream::ByteStream;

  let stream = ByteStream::from(vec![1, 2, 3]);
  let stream = ByteStream::from(Bytes::from_static(b"hello!"));
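- From a file: ByteStreams created from a path can be retried. A new file descriptor will be opened if a retry occurs.

  use aws_smithy_http::byte_stream::{error::Error, ByteStream};

  #[cfg(feature = "rt-tokio")]
  async fn example() -> Result<ByteStream, Error> {
      // from_path is async and fallible: await it, then handle the Result
      ByteStream::from_path("big_file.csv").await
  }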
- From an SdkBody directly: For more advanced or custom use cases, a ByteStream can be created directly from an SdkBody. When created from an SdkBody, care must be taken to ensure retryability. An SdkBody is retryable when constructed from in-memory data or when using SdkBody::retryable.

  use aws_smithy_http::byte_stream::ByteStream;
  use aws_smithy_http::body::SdkBody;
  use bytes::Bytes;

  let (mut tx, channel_body) = hyper::Body::channel();
  // this will not be retryable because the SDK has no way to replay this stream
  let stream = ByteStream::new(SdkBody::from(channel_body));
  // send_data returns a future; in real code each call must be awaited for the data to be sent
  tx.send_data(Bytes::from_static(b"hello world!"));
  tx.send_data(Bytes::from_static(b"hello again!"));
  // NOTE! You must ensure that `tx` is dropped to ensure that EOF is sent
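Why in-memory data can be retried while a channel cannot is easiest to see in a small model. The sketch below uses only the standard library; ModelBody and its try_clone and read_all methods are hypothetical stand-ins chosen for illustration and do not describe how SdkBody is implemented.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a request body; not the real `SdkBody`.
enum ModelBody {
    /// Shared, immutable bytes: cloning only bumps a reference count.
    InMemory(Arc<[u8]>),
    /// One-shot source: once consumed, the data cannot be replayed.
    OneShot(Box<dyn Iterator<Item = Vec<u8>>>),
}

impl ModelBody {
    /// Returns a fresh copy for a retry attempt, or `None` if replay is impossible.
    fn try_clone(&self) -> Option<ModelBody> {
        match self {
            ModelBody::InMemory(bytes) => Some(ModelBody::InMemory(Arc::clone(bytes))),
            ModelBody::OneShot(_) => None,
        }
    }

    /// Drains the body into a single buffer.
    fn read_all(self) -> Vec<u8> {
        match self {
            ModelBody::InMemory(bytes) => bytes.to_vec(),
            ModelBody::OneShot(chunks) => chunks.flatten().collect(),
        }
    }
}
```

In this model a retry is possible exactly when try_clone returns Some, which is the property the in-memory and file-based constructors above are described as providing.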
Implementations§
impl ByteStream
pub fn new(body: SdkBody) -> Self
pub fn from_static(bytes: &'static [u8]) -> Self
pub fn into_inner(self) -> SdkBody
Consumes the ByteStream, returning the wrapped SdkBody
pub async fn collect(self) -> Result<AggregatedBytes, Error>
Read all the data from this ByteStream into memory.
If an error in the underlying stream is encountered, an Error (byte_stream::error::Error) is returned.
Data is read into an AggregatedBytes that stores data non-contiguously as it was received over the network. If a contiguous slice is required, use into_bytes().
use bytes::Bytes;
use aws_smithy_http::body;
use aws_smithy_http::body::SdkBody;
use aws_smithy_http::byte_stream::{ByteStream, error::Error};

async fn get_data() {
    let stream = ByteStream::new(SdkBody::from("hello!"));
    let data: Result<Bytes, Error> = stream.collect().await.map(|data| data.into_bytes());
}
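The difference between non-contiguous storage and a contiguous slice can be modelled with the standard library alone. The Segmented type below is a hypothetical illustration, not the real AggregatedBytes: it keeps each received chunk as its own segment and only copies when a single buffer is requested.

```rust
use std::collections::VecDeque;

/// Hypothetical model of a segmented buffer; not the real `AggregatedBytes`.
struct Segmented {
    segments: VecDeque<Vec<u8>>,
}

impl Segmented {
    fn new() -> Self {
        Segmented { segments: VecDeque::new() }
    }

    /// Appends a chunk without copying or moving earlier data.
    fn push(&mut self, chunk: Vec<u8>) {
        if !chunk.is_empty() {
            self.segments.push_back(chunk);
        }
    }

    /// First stored segment only; it may be shorter than the total data.
    fn chunk(&self) -> &[u8] {
        match self.segments.front() {
            Some(first) => first.as_slice(),
            None => &[],
        }
    }

    /// Total number of bytes across all segments.
    fn remaining(&self) -> usize {
        self.segments.iter().map(Vec::len).sum()
    }

    /// Copies every segment into one contiguous buffer.
    fn into_contiguous(self) -> Vec<u8> {
        let mut out = Vec::with_capacity(self.remaining());
        for segment in self.segments {
            out.extend_from_slice(&segment);
        }
        out
    }
}
```

This is why a first-chunk accessor can return less than the whole payload, and why asking for one contiguous buffer costs a copy in this model.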
pub fn read_from() -> FsBuilder
Returns a FsBuilder, allowing you to build a ByteStream with full control over how the file is read (e.g. specifying the length of the file or the size of the buffer used to read the file).
use aws_smithy_http::byte_stream::{ByteStream, Length};

async fn bytestream_from_file() -> ByteStream {
    let bytestream = ByteStream::read_from()
        .path("docs/some-large-file.csv")
        // Specify the size of the buffer used to read the file (in bytes, default is 4096)
        .buffer_size(32_784)
        // Specify the length of the file used (skips an additional call to retrieve the size)
        .length(Length::Exact(123_456))
        .build()
        .await
        .expect("valid path");
    bytestream
}
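As a rough illustration of how a read buffer size and a known length interact, the std-only sketch below reads a file in fixed-size steps and stops after a given number of bytes. It is a blocking model under stated assumptions, not what FsBuilder does internally: read_in_chunks is a hypothetical helper, and its length parameter acts as an upper bound rather than an exact-length check.

```rust
use std::fs::File;
use std::io::{self, Read};
use std::path::Path;

/// Reads at most `length` bytes from `path`, at most `buffer_size` bytes per read.
fn read_in_chunks(path: &Path, buffer_size: usize, length: u64) -> io::Result<Vec<Vec<u8>>> {
    // `take` caps the total number of bytes the reader will yield.
    let mut reader = File::open(path)?.take(length);
    let mut buffer = vec![0u8; buffer_size];
    let mut chunks = Vec::new();
    loop {
        let n = reader.read(&mut buffer)?;
        if n == 0 {
            break; // end of file or length limit reached
        }
        chunks.push(buffer[..n].to_vec());
    }
    Ok(chunks)
}
```

A larger buffer means fewer reads per file at the cost of more memory per stream; knowing the length up front means the reader never has to ask the filesystem for it.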
pub async fn from_path(path: impl AsRef<Path>) -> Result<Self, Error>
Create a ByteStream that streams data from the filesystem
This function creates a retryable ByteStream for a given path. The returned ByteStream will provide a size hint when used as an HTTP body. If the request fails, the read will begin again by reloading the file handle.
Warning
The contents of the file MUST NOT change during retries. The length and checksum of the file will be cached. If the contents of the file change, the operation will almost certainly fail.
Furthermore, a partial write MAY seek in the file and resume from the previous location.
Note: If you want more control, such as specifying the size of the buffer used to read the file or the length of the file, use a FsBuilder as returned from ByteStream::read_from.
Examples
use aws_smithy_http::byte_stream::ByteStream;
use std::path::Path;

async fn make_bytestream() -> ByteStream {
    ByteStream::from_path("docs/rows.csv").await.expect("file should be readable")
}
pub async fn from_file(file: File) -> Result<Self, Error>
Deprecated since 0.40.0: Prefer the more extensible ByteStream::read_from() API
Create a ByteStream from a file
NOTE: This will NOT result in a retryable ByteStream. For a ByteStream that can be retried in the case of upstream failures, use ByteStream::from_path.
pub fn into_async_read(self) -> impl AsyncRead
Convert this ByteStream into a struct that implements AsyncRead.
Example
use tokio::io::{BufReader, AsyncBufReadExt};
use aws_smithy_http::byte_stream::ByteStream;

async fn example(my_bytestream: ByteStream) -> std::io::Result<()> {
    let mut lines = BufReader::new(my_bytestream.into_async_read()).lines();
    while let Some(line) = lines.next_line().await? {
        // Do something line by line
    }
    Ok(())
}
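The idea of exposing a sequence of chunks as a reader, so that line-oriented tools work even when a line spans two chunks, can be sketched with blocking std types. This is only a model: ChunkReader and read_lines are hypothetical names, and std::io::Read stands in for the asynchronous AsyncRead that into_async_read() returns.

```rust
use std::collections::VecDeque;
use std::io::{self, BufRead, BufReader, Read};

/// Hypothetical adapter that exposes a queue of chunks as a blocking reader.
struct ChunkReader {
    chunks: VecDeque<Vec<u8>>,
    offset: usize, // read position inside the front chunk
}

impl ChunkReader {
    fn new(chunks: Vec<Vec<u8>>) -> Self {
        ChunkReader { chunks: chunks.into(), offset: 0 }
    }
}

impl Read for ChunkReader {
    fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
        // Discard chunks that have been fully consumed (or were empty).
        while let Some(front) = self.chunks.front() {
            if self.offset < front.len() {
                break;
            }
            self.chunks.pop_front();
            self.offset = 0;
        }
        let front = match self.chunks.front() {
            Some(front) => front,
            None => return Ok(0), // no data left: signal end of stream
        };
        // Copy as much of the current chunk as fits into the caller's buffer.
        let n = out.len().min(front.len() - self.offset);
        out[..n].copy_from_slice(&front[self.offset..self.offset + n]);
        self.offset += n;
        Ok(n)
    }
}

/// Reads all lines from the chunks, regardless of where chunk boundaries fall.
fn read_lines(chunks: Vec<Vec<u8>>) -> io::Result<Vec<String>> {
    BufReader::new(ChunkReader::new(chunks)).lines().collect()
}
```

Because the buffered reader sits on top of the adapter, callers never see chunk boundaries, which is the same convenience the BufReader example above relies on.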
pub fn map( self, f: impl Fn(SdkBody) -> SdkBody + Send + Sync + 'static ) -> ByteStream
Trait Implementations§
impl Debug for ByteStream
impl Default for ByteStream
impl From<Body> for ByteStream
impl From<Bytes> for ByteStream
Construct a retryable ByteStream from bytes::Bytes
impl From<SdkBody> for ByteStream
impl From<Vec<u8, Global>> for ByteStream
Construct a retryable ByteStream from a Vec<u8>. This will convert the Vec<u8> into bytes::Bytes to enable efficient retries.
impl Stream for ByteStream
impl<'__pin> Unpin for ByteStream where __Origin<'__pin>: Unpin
Auto Trait Implementations§
impl !RefUnwindSafe for ByteStream
impl Send for ByteStream
impl Sync for ByteStream
impl !UnwindSafe for ByteStream
Blanket Implementations§
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> StreamExt for T where T: Stream + ?Sized
fn next(&mut self) -> Next<'_, Self> where Self: Unpin
fn into_future(self) -> StreamFuture<Self> where Self: Sized + Unpin
fn map<T, F>(self, f: F) -> Map<Self, F> where F: FnMut(Self::Item) -> T, Self: Sized
fn enumerate(self) -> Enumerate<Self> where Self: Sized
fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized
fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = Option<T>>, Self: Sized
fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future, Self: Sized
fn collect<C>(self) -> Collect<Self, C> where C: Default + Extend<Self::Item>, Self: Sized
fn unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB> where FromA: Default + Extend<A>, FromB: Default + Extend<B>, Self: Sized + Stream<Item = (A, B)>
fn concat(self) -> Concat<Self> where Self: Sized, Self::Item: Extend<<Self::Item as IntoIterator>::Item> + IntoIterator + Default
fn count(self) -> Count<Self> where Self: Sized
fn fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F> where F: FnMut(T, Self::Item) -> Fut, Fut: Future<Output = T>, Self: Sized
fn any<Fut, F>(self, f: F) -> Any<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized
Resolves to true if any element in the stream satisfies the predicate.
fn all<Fut, F>(self, f: F) -> All<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized
Resolves to true if all elements in the stream satisfy the predicate.
fn flatten(self) -> Flatten<Self> where Self::Item: Stream, Self: Sized
fn flatten_unordered(self, limit: impl Into<Option<usize>>) -> FlattenUnorderedWithFlowController<Self, ()> where Self::Item: Stream + Unpin, Self: Sized
fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F> where F: FnMut(Self::Item) -> U, U: Stream, Self: Sized
fn flat_map_unordered<U, F>(self, limit: impl Into<Option<usize>>, f: F) -> FlatMapUnordered<Self, U, F> where U: Stream + Unpin, F: FnMut(Self::Item) -> U, Self: Sized
Maps a stream like StreamExt::map, but flattens nested Streams and polls them concurrently, yielding items in any order as they are made available.
fn scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F> where F: FnMut(&mut S, Self::Item) -> Fut, Fut: Future<Output = Option<B>>, Self: Sized
Combinator similar to StreamExt::fold that holds internal state and produces a new stream.
fn skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized
Skips elements of this stream while the provided predicate resolves to true.
fn take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized
Takes elements from this stream while the provided predicate resolves to true.
fn take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut> where Fut: Future, Self: Sized
fn for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = ()>, Self: Sized
fn for_each_concurrent<Fut, F>(self, limit: impl Into<Option<usize>>, f: F) -> ForEachConcurrent<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = ()>, Self: Sized
fn take(self, n: usize) -> Take<Self> where Self: Sized
Creates a new stream of at most n items of the underlying stream.
fn skip(self, n: usize) -> Skip<Self> where Self: Sized
Creates a new stream which skips n items of the underlying stream.
fn catch_unwind(self) -> CatchUnwind<Self> where Self: Sized + UnwindSafe
fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a, Global>> where Self: Sized + Send + 'a
fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a, Global>> where Self: Sized + 'a
fn buffered(self, n: usize) -> Buffered<Self> where Self::Item: Future, Self: Sized
fn buffer_unordered(self, n: usize) -> BufferUnordered<Self> where Self::Item: Future, Self: Sized
fn zip<St>(self, other: St) -> Zip<Self, St> where St: Stream, Self: Sized
fn chain<St>(self, other: St) -> Chain<Self, St> where St: Stream<Item = Self::Item>, Self: Sized
fn peekable(self) -> Peekable<Self> where Self: Sized
Creates a new stream which exposes a peek method.
fn chunks(self, capacity: usize) -> Chunks<Self> where Self: Sized
fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self> where Self: Sized
fn inspect<F>(self, f: F) -> Inspect<Self, F> where F: FnMut(&Self::Item), Self: Sized
fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> where Self: Unpin
A convenience method for calling Stream::poll_next on Unpin stream types.
fn select_next_some(&mut self) -> SelectNextSome<'_, Self> where Self: Unpin + FusedStream
impl<S> TryStreamExt for S where S: TryStream + ?Sized
fn err_into<E>(self) -> ErrInto<Self, E> where Self: Sized, Self::Error: Into<E>
fn map_ok<T, F>(self, f: F) -> MapOk<Self, F> where Self: Sized, F: FnMut(Self::Ok) -> T
fn map_err<E, F>(self, f: F) -> MapErr<Self, F> where Self: Sized, F: FnMut(Self::Error) -> E
fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F> where F: FnMut(Self::Ok) -> Fut, Fut: TryFuture<Error = Self::Error>, Self: Sized
Chains a computation on each successful item, passing it to the provided closure f.
fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F> where F: FnMut(Self::Error) -> Fut, Fut: TryFuture<Ok = Self::Ok>, Self: Sized
Chains a computation on each error, passing it to the provided closure f.
fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F> where F: FnMut(&Self::Ok), Self: Sized
fn inspect_err<F>(self, f: F) -> InspectErr<Self, F> where F: FnMut(&Self::Error), Self: Sized
fn into_stream(self) -> IntoStream<Self> where Self: Sized
fn try_next(&mut self) -> TryNext<'_, Self> where Self: Unpin
fn try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F> where F: FnMut(Self::Ok) -> Fut, Fut: TryFuture<Ok = (), Error = Self::Error>, Self: Sized
fn try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F> where F: FnMut(&Self::Ok) -> Fut, Fut: TryFuture<Ok = bool, Error = Self::Error>, Self: Sized
Skips elements of this stream while the provided predicate resolves to true.
fn try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F> where F: FnMut(&Self::Ok) -> Fut, Fut: TryFuture<Ok = bool, Error = Self::Error>, Self: Sized
Takes elements from this stream while the provided predicate resolves to true.