use std::fmt;
use bytes::Bytes;
use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
use futures::sync::{mpsc, oneshot};
#[cfg(feature = "tokio-proto")]
use tokio_proto;
use std::borrow::Cow;
use common::Never;
use super::Chunk;
#[cfg(feature = "tokio-proto")]
pub type TokioBody = tokio_proto::streaming::Body<Chunk, ::Error>;
pub type BodySender = mpsc::Sender<Result<Chunk, ::Error>>;
#[must_use = "streams do nothing unless polled"]
pub struct Body {
kind: Kind,
delayed_eof: Option<DelayEof>,
}
#[derive(Debug)]
enum Kind {
#[cfg(feature = "tokio-proto")]
Tokio(TokioBody),
Chan {
close_tx: oneshot::Sender<bool>,
rx: mpsc::Receiver<Result<Chunk, ::Error>>,
},
Once(Option<Chunk>),
Empty,
}
type DelayEofUntil = oneshot::Receiver<Never>;
enum DelayEof {
NotEof(DelayEofUntil),
Eof(DelayEofUntil),
}
#[derive(Debug)]
pub struct ChunkSender {
close_rx: oneshot::Receiver<bool>,
close_rx_check: bool,
tx: BodySender,
}
impl Body {
#[inline]
pub fn empty() -> Body {
Body::new(Kind::Empty)
}
#[inline]
pub fn pair() -> (mpsc::Sender<Result<Chunk, ::Error>>, Body) {
let (tx, rx) = channel();
(tx.tx, rx)
}
#[inline]
pub fn is_empty(&self) -> bool {
match self.kind {
Kind::Empty => true,
_ => false,
}
}
fn new(kind: Kind) -> Body {
Body {
kind: kind,
delayed_eof: None,
}
}
pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) {
self.delayed_eof = Some(DelayEof::NotEof(fut));
}
fn poll_eof(&mut self) -> Poll<Option<Chunk>, ::Error> {
match self.delayed_eof.take() {
Some(DelayEof::NotEof(mut delay)) => {
match self.poll_inner() {
ok @ Ok(Async::Ready(Some(..))) |
ok @ Ok(Async::NotReady) => {
self.delayed_eof = Some(DelayEof::NotEof(delay));
ok
},
Ok(Async::Ready(None)) => match delay.poll() {
Ok(Async::Ready(never)) => match never {},
Ok(Async::NotReady) => {
self.delayed_eof = Some(DelayEof::Eof(delay));
Ok(Async::NotReady)
},
Err(_done) => {
Ok(Async::Ready(None))
},
},
Err(e) => Err(e),
}
},
Some(DelayEof::Eof(mut delay)) => {
match delay.poll() {
Ok(Async::Ready(never)) => match never {},
Ok(Async::NotReady) => {
self.delayed_eof = Some(DelayEof::Eof(delay));
Ok(Async::NotReady)
},
Err(_done) => {
Ok(Async::Ready(None))
},
}
},
None => self.poll_inner(),
}
}
fn poll_inner(&mut self) -> Poll<Option<Chunk>, ::Error> {
match self.kind {
#[cfg(feature = "tokio-proto")]
Kind::Tokio(ref mut rx) => rx.poll(),
Kind::Chan { ref mut rx, .. } => match rx.poll().expect("mpsc cannot error") {
Async::Ready(Some(Ok(chunk))) => Ok(Async::Ready(Some(chunk))),
Async::Ready(Some(Err(err))) => Err(err),
Async::Ready(None) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
},
Kind::Once(ref mut val) => Ok(Async::Ready(val.take())),
Kind::Empty => Ok(Async::Ready(None)),
}
}
}
impl Default for Body {
#[inline]
fn default() -> Body {
Body::empty()
}
}
impl Stream for Body {
type Item = Chunk;
type Error = ::Error;
#[inline]
fn poll(&mut self) -> Poll<Option<Chunk>, ::Error> {
self.poll_eof()
}
}
impl fmt::Debug for Body {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_tuple("Body")
.field(&self.kind)
.finish()
}
}
pub fn channel() -> (ChunkSender, Body) {
let (tx, rx) = mpsc::channel(0);
let (close_tx, close_rx) = oneshot::channel();
let tx = ChunkSender {
close_rx: close_rx,
close_rx_check: true,
tx: tx,
};
let rx = Body::new(Kind::Chan {
close_tx: close_tx,
rx: rx,
});
(tx, rx)
}
impl ChunkSender {
pub fn poll_ready(&mut self) -> Poll<(), ()> {
if self.close_rx_check {
match self.close_rx.poll() {
Ok(Async::Ready(true)) | Err(_) => return Err(()),
Ok(Async::Ready(false)) => {
self.close_rx_check = false;
}
Ok(Async::NotReady) => (),
}
}
self.tx.poll_ready().map_err(|_| ())
}
pub fn start_send(&mut self, msg: Result<Chunk, ::Error>) -> StartSend<(), ()> {
match self.tx.start_send(msg) {
Ok(AsyncSink::Ready) => Ok(AsyncSink::Ready),
Ok(AsyncSink::NotReady(_)) => Ok(AsyncSink::NotReady(())),
Err(_) => Err(()),
}
}
}
feat_server_proto! {
impl From<Body> for tokio_proto::streaming::Body<Chunk, ::Error> {
fn from(b: Body) -> tokio_proto::streaming::Body<Chunk, ::Error> {
match b.kind {
Kind::Tokio(b) => b,
Kind::Chan { close_tx, rx } => {
let _ = close_tx.send(false);
rx.into()
},
Kind::Once(Some(chunk)) => TokioBody::from(chunk),
Kind::Once(None) |
Kind::Empty => TokioBody::empty(),
}
}
}
impl From<tokio_proto::streaming::Body<Chunk, ::Error>> for Body {
fn from(tokio_body: tokio_proto::streaming::Body<Chunk, ::Error>) -> Body {
Body::new(Kind::Tokio(tokio_body))
}
}
}
impl From<mpsc::Receiver<Result<Chunk, ::Error>>> for Body {
#[inline]
fn from(src: mpsc::Receiver<Result<Chunk, ::Error>>) -> Body {
let (tx, _) = oneshot::channel();
Body::new(Kind::Chan {
close_tx: tx,
rx: src,
})
}
}
impl From<Chunk> for Body {
#[inline]
fn from (chunk: Chunk) -> Body {
Body::new(Kind::Once(Some(chunk)))
}
}
impl From<Bytes> for Body {
#[inline]
fn from (bytes: Bytes) -> Body {
Body::from(Chunk::from(bytes))
}
}
impl From<Vec<u8>> for Body {
#[inline]
fn from (vec: Vec<u8>) -> Body {
Body::from(Chunk::from(vec))
}
}
impl From<&'static [u8]> for Body {
#[inline]
fn from (slice: &'static [u8]) -> Body {
Body::from(Chunk::from(slice))
}
}
impl From<Cow<'static, [u8]>> for Body {
#[inline]
fn from (cow: Cow<'static, [u8]>) -> Body {
match cow {
Cow::Borrowed(b) => Body::from(b),
Cow::Owned(o) => Body::from(o)
}
}
}
impl From<String> for Body {
#[inline]
fn from (s: String) -> Body {
Body::from(Chunk::from(s.into_bytes()))
}
}
impl From<&'static str> for Body {
#[inline]
fn from(slice: &'static str) -> Body {
Body::from(Chunk::from(slice.as_bytes()))
}
}
impl From<Cow<'static, str>> for Body {
#[inline]
fn from(cow: Cow<'static, str>) -> Body {
match cow {
Cow::Borrowed(b) => Body::from(b),
Cow::Owned(o) => Body::from(o)
}
}
}
impl From<Option<Body>> for Body {
#[inline]
fn from (body: Option<Body>) -> Body {
body.unwrap_or_default()
}
}
fn _assert_send_sync() {
fn _assert_send<T: Send>() {}
fn _assert_sync<T: Sync>() {}
_assert_send::<Body>();
_assert_send::<Chunk>();
_assert_sync::<Chunk>();
}
#[test]
fn test_body_stream_concat() {
use futures::{Sink, Stream, Future};
let (tx, body) = Body::pair();
::std::thread::spawn(move || {
let tx = tx.send(Ok("hello ".into())).wait().unwrap();
tx.send(Ok("world".into())).wait().unwrap();
});
let total = body.concat2().wait().unwrap();
assert_eq!(total.as_ref(), b"hello world");
}