use streaming::{Message, Body};
use futures::sync::mpsc;
use futures::{Future, Poll, Async, Stream, Sink, AsyncSink, StartSend};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::{fmt, io};
use super::frame_buf::{FrameBuf, FrameDeque};
use super::{Frame, RequestId, Transport};
use buffer_one::BufferOne;
/// Capacity handed to `FrameBuf::with_capacity` when a `Multiplex` is built.
const MAX_BUFFERED_FRAMES: usize = 128;
/// Future that drives a multiplexed transport on behalf of a `Dispatch`
/// implementation, tracking every in-flight exchange by its `RequestId`.
pub struct Multiplex<T> where T: Dispatch {
// Cleared once the transport's stream yields `None`; frames are only read
// from the transport while this is set.
run: bool,
// Whether the current pass of the main loop achieved anything; the loop in
// `poll` repeats while this is set.
made_progress: bool,
// Set when a message could not be handed over because the dispatcher
// reported itself as not ready.
blocked_on_dispatch: bool,
// Write-side progress tracking for the current pass (see `WriteState`).
blocked_on_flush: WriteState,
// The dispatcher, wrapped so the transport can be used as a `Sink` behind a
// `BufferOne`.
dispatch: BufferOne<DispatchSink<T>>,
// State of every in-flight exchange, keyed by request id.
exchanges: HashMap<RequestId, Exchange<T>>,
// Result of the most recent `poll_complete` on the transport sink.
is_flushed: bool,
// Ids of requests read from the transport that are buffered until the
// dispatcher is ready to accept them.
dispatch_deque: VecDeque<RequestId>,
// Shared storage from which each exchange obtains its chunk deque.
frame_buf: FrameBuf<Option<Result<T::BodyOut, T::Error>>>,
// Reusable list of ids collected during iteration and removed afterwards.
scratch: Vec<RequestId>,
}
/// Debug rendering of the multiplexer's bookkeeping state.
///
/// The frame buffer is shown as a fixed placeholder string rather than its
/// contents.
impl<T> fmt::Debug for Multiplex<T>
    where T: Dispatch + fmt::Debug,
          T::In: fmt::Debug,
          T::Out: fmt::Debug,
          T::BodyIn: fmt::Debug,
          T::BodyOut: fmt::Debug,
          T::Error: fmt::Debug,
          T::Stream: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Multiplex")
            .field("run", &self.run)
            .field("made_progress", &self.made_progress)
            .field("blocked_on_dispatch", &self.blocked_on_dispatch)
            // Previously omitted even though it is part of the loop state.
            .field("blocked_on_flush", &self.blocked_on_flush)
            .field("dispatch", &self.dispatch)
            // Label fixed: it used to be misspelled as "exhanges".
            .field("exchanges", &self.exchanges)
            .field("is_flushed", &self.is_flushed)
            .field("dispatch_deque", &self.dispatch_deque)
            .field("frame_buf", &"FrameBuf { ... }")
            .field("scratch", &self.scratch)
            .finish()
    }
}
/// Wrapper around a dispatcher; its `Sink` implementation in this file
/// forwards every call to the dispatcher's transport.
#[derive(Debug)]
struct DispatchSink<T> {
// The wrapped dispatcher.
inner: T,
}
/// Channel sender carrying body chunks (`Ok`) or a body error (`Err`).
type BodySender<B, E> = mpsc::Sender<Result<B, E>>;
/// Per-request bookkeeping for one in-flight exchange.
struct Exchange<T: Dispatch> {
// Direction of the exchange and, for requests read from the transport, the
// message while it is still waiting to be dispatched.
request: Request<T>,
// True once the response has been seen, or from the start when the exchange
// was created as `solo`.
responded: bool,
// Sender for body chunks read from the transport; `None` when no body is
// being streamed (any more).
out_body: Option<BodySender<T::BodyOut, T::Error>>,
// Chunks queued while the sender cannot accept them; a `None` entry marks
// end-of-stream.
out_deque: FrameDeque<Option<Result<T::BodyOut, T::Error>>>,
// False after the sender refused a chunk; chunks are queued while false.
out_is_ready: bool,
// Stream of body chunks that still have to be written to the transport.
in_body: Option<T::Stream>,
}
/// Which side started an exchange.
#[derive(Debug)]
enum Request<T: Dispatch> {
// Started by a message obtained from the dispatcher and written to the
// transport.
In,
// Started by a message read from the transport. Holds the message while it
// is buffered waiting for the dispatcher; `None` once it has been handed
// over.
Out(Option<Message<T::Out, Body<T::BodyOut, T::Error>>>),
}
/// Write-side progress within one pass of the main loop.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum WriteState {
// No frame has been written since the flags were last reset.
NoWrite,
// At least one frame has been written.
Wrote,
// A frame was written and the transport was afterwards reported as not
// ready for writing.
Blocked,
}
/// A message (or an error standing in for one) tagged with the id of the
/// exchange it belongs to.
#[derive(Debug)]
pub struct MultiplexMessage<T, B, E> {
/// Id of the exchange this message belongs to.
pub id: RequestId,
/// The message itself, or the error that replaces it.
pub message: Result<Message<T, B>, E>,
/// When true the exchange is created as already responded to, i.e. the
/// multiplexer does not wait for a response.
pub solo: bool,
}
/// Glue between the multiplexer and the component that produces and consumes
/// messages. The multiplexer reads `Out` frames from the transport and hands
/// them to `dispatch`, and writes messages obtained from `poll` to the
/// transport as `In` frames.
pub trait Dispatch {
/// NOTE(review): not referenced in this file; presumably the underlying I/O
/// type — confirm against implementors.
type Io;
/// Message head written to the transport.
type In;
/// Body chunk written to the transport.
type BodyIn;
/// Message head read from the transport.
type Out;
/// Body chunk read from the transport.
type BodyOut;
/// Error type carried by frames and body streams.
type Error: From<io::Error>;
/// Stream of body chunks accompanying a message produced by `poll`.
type Stream: Stream<Item = Self::BodyIn, Error = Self::Error>;
/// Transport yielding `Out` frames and accepting `In` frames.
type Transport: Transport<Self::BodyOut,
Item = Frame<Self::Out, Self::BodyOut, Self::Error>,
SinkItem = Frame<Self::In, Self::BodyIn, Self::Error>>;
/// Mutable access to the transport.
fn transport(&mut self) -> &mut Self::Transport;
/// Polls for the next message to write to the transport.
fn poll(&mut self) -> Poll<Option<MultiplexMessage<Self::In, Self::Stream, Self::Error>>, io::Error>;
/// Whether `dispatch` can currently accept a new request; the multiplexer
/// buffers requests read from the transport while this is not ready.
fn poll_ready(&self) -> Async<()>;
/// Hands a message (or error) read from the transport to the dispatcher.
fn dispatch(&mut self, message: MultiplexMessage<Self::Out, Body<Self::BodyOut, Self::Error>, Self::Error>) -> io::Result<()>;
/// Called when an error frame arrives for a dispatched request that has not
/// been responded to yet.
fn cancel(&mut self, request_id: RequestId) -> io::Result<()>;
}
impl<T> Multiplex<T> where T: Dispatch {
/// Creates a multiplexer driving `dispatch`.
///
/// The multiplexer starts in the running state, flushed, with no in-flight
/// exchanges and a frame buffer sized by `MAX_BUFFERED_FRAMES`.
pub fn new(dispatch: T) -> Multiplex<T> {
    let sink = BufferOne::new(DispatchSink { inner: dispatch });

    Multiplex {
        run: true,
        made_progress: false,
        blocked_on_dispatch: false,
        blocked_on_flush: WriteState::NoWrite,
        dispatch: sink,
        exchanges: HashMap::new(),
        is_flushed: true,
        dispatch_deque: VecDeque::new(),
        frame_buf: FrameBuf::with_capacity(MAX_BUFFERED_FRAMES),
        scratch: Vec::new(),
    }
}
/// True once reading has stopped, all writes are flushed and no exchange is
/// in flight.
fn is_done(&self) -> bool {
    if self.run {
        return false;
    }

    self.is_flushed && self.exchanges.is_empty()
}
/// Hands buffered requests to the dispatcher for as long as it is ready.
///
/// Returns early when the queue is drained. If the dispatcher stops being
/// ready first, `blocked_on_dispatch` is set.
fn flush_dispatch_deque(&mut self) -> io::Result<()> {
    loop {
        if !self.dispatch.get_mut().inner.poll_ready().is_ready() {
            break;
        }

        let id = match self.dispatch_deque.pop_front() {
            Some(id) => id,
            None => return Ok(()),
        };

        // The exchange may have been dropped while its id was still queued.
        let (buffered, solo) = match self.exchanges.get_mut(&id) {
            Some(exchange) => (exchange.take_buffered_out_request(), exchange.responded),
            None => continue,
        };

        if let Some(message) = buffered {
            try!(self.dispatch.get_mut().inner.dispatch(MultiplexMessage {
                id: id,
                message: Ok(message),
                solo: solo,
            }));
        }
    }

    self.blocked_on_dispatch = true;
    Ok(())
}
/// Gives every exchange a chance to push queued body chunks to its sender,
/// then drops the exchanges that became complete.
fn flush_out_bodies(&mut self) -> io::Result<()> {
    trace!("flush out bodies");

    self.scratch.clear();

    for (&id, exchange) in &mut self.exchanges {
        trace!(" --> request={}", id);
        try!(exchange.flush_out_body());

        if !exchange.is_complete() {
            continue;
        }

        // Removal is deferred: the map cannot be modified while iterating.
        self.scratch.push(id);
    }

    for finished in self.scratch.iter() {
        trace!("drop exchange; id={}", finished);
        self.exchanges.remove(finished);
    }

    Ok(())
}
/// Reads and processes frames from the transport until it has none ready or
/// reading has been stopped.
fn read_out_frames(&mut self) -> io::Result<()> {
    while self.run {
        match try!(self.dispatch.get_mut().inner.transport().poll()) {
            Async::Ready(frame) => try!(self.process_out_frame(frame)),
            Async::NotReady => return Ok(()),
        }
    }

    Ok(())
}
fn process_out_frame(&mut self,
frame: Option<Frame<T::Out, T::BodyOut, T::Error>>)
-> io::Result<()> {
trace!("Multiplex::process_out_frame");
match frame {
Some(Frame::Message { id, message, body, solo }) => {
if body {
let (tx, rx) = Body::pair();
let message = Message::WithBody(message, rx);
try!(self.process_out_message(id, message, Some(tx), solo));
} else {
let message = Message::WithoutBody(message);
try!(self.process_out_message(id, message, None, solo));
}
}
Some(Frame::Body { id, chunk }) => {
trace!(" --> read out body chunk");
self.process_out_body_chunk(id, Ok(chunk));
}
Some(Frame::Error { id, error }) => {
try!(self.process_out_err(id, error));
}
None => {
trace!("read None");
self.run = false;
}
}
Ok(())
}
/// Handles a message frame read from the transport.
///
/// `body` is the sending half of the body channel when the frame announced
/// a body. If an exchange with this id already exists, the message is
/// treated as the response to it; otherwise it starts a new exchange and is
/// either dispatched immediately or buffered until the dispatcher is ready.
fn process_out_message(&mut self,
id: RequestId,
message: Message<T::Out, Body<T::BodyOut, T::Error>>,
body: Option<mpsc::Sender<Result<T::BodyOut, T::Error>>>,
solo: bool)
-> io::Result<()>
{
trace!(" --> process message; body={:?}", body.is_some());
match self.exchanges.entry(id) {
Entry::Occupied(mut e) => {
// Response to an existing exchange: it must not have been responded to
// already and must have been started from the dispatcher side.
assert!(!e.get().responded, "invalid exchange state");
assert!(e.get().is_inbound());
// Dispatched without consulting `poll_ready`.
try!(self.dispatch.get_mut().inner.dispatch(MultiplexMessage {
id: id,
message: Ok(message),
solo: solo,
}));
e.get_mut().responded = true;
e.get_mut().out_body = body;
// Nothing left to track when there is no body in either direction.
if e.get().is_complete() {
e.remove();
}
}
Entry::Vacant(e) => {
if self.dispatch.get_mut().inner.poll_ready().is_ready() {
trace!(" --> dispatch ready -- dispatching");
// A ready dispatcher is expected to have no queued requests left.
assert!(self.dispatch_deque.is_empty());
let mut exchange = Exchange::new(
Request::Out(None),
self.frame_buf.deque());
exchange.out_body = body;
exchange.set_expect_response(solo);
// Only track the exchange when something is still outstanding.
if !exchange.is_complete() {
e.insert(exchange);
}
try!(self.dispatch.get_mut().inner.dispatch(MultiplexMessage {
id: id,
message: Ok(message),
solo: solo,
}));
} else {
trace!(" --> dispatch not ready");
self.blocked_on_dispatch = true;
// Keep the message inside the exchange until the dispatcher is ready.
let mut exchange = Exchange::new(
Request::Out(Some(message)),
self.frame_buf.deque());
exchange.out_body = body;
exchange.set_expect_response(solo);
// A buffered request is never complete (see `Exchange::is_complete`).
assert!(!exchange.is_complete());
e.insert(exchange);
// Remember the id so `flush_dispatch_deque` dispatches it later.
self.dispatch_deque.push_back(id);
}
}
}
Ok(())
}
/// Handles an error frame read from the transport for exchange `id`.
///
/// * A request still buffered inside the multiplexer is simply dropped.
/// * For a dispatched request read from the transport, the error is sent
///   through the body channel and, if no response was produced yet, the
///   dispatcher is told to cancel.
/// * For an exchange started by the dispatcher, the error is dispatched as
///   the response when none was seen yet, otherwise it is sent through the
///   body channel.
///
/// Errors for unknown ids are ignored.
fn process_out_err(&mut self, id: RequestId, err: T::Error) -> io::Result<()> {
    trace!(" --> process error frame");

    let mut remove = false;

    if let Some(exchange) = self.exchanges.get_mut(&id) {
        if !exchange.is_dispatched() {
            // The request never left the multiplexer, so dropping its state
            // is enough. A buffered request may legitimately own a body
            // sender (`process_out_message` stores it before buffering), so
            // `out_body` is not asserted to be empty here; it is dropped
            // together with the exchange.
            remove = true;
            assert!(exchange.in_body.is_none());
        } else if exchange.is_outbound() {
            exchange.send_out_chunk(Err(err));

            // No response was produced yet: signal that interest is gone.
            if !exchange.responded {
                try!(self.dispatch.get_mut().inner.cancel(id));
            }

            remove = exchange.is_complete();
        } else {
            if !exchange.responded {
                // The error takes the place of the response.
                try!(self.dispatch.get_mut().inner.dispatch(MultiplexMessage::error(id, err)));
                exchange.responded = true;
            } else {
                exchange.send_out_chunk(Err(err));
            }

            remove = exchange.is_complete();
        }
    } else {
        trace!(" --> no in-flight exchange; dropping error");
    }

    if remove {
        self.exchanges.remove(&id);
    }

    Ok(())
}
/// Forwards a body chunk read from the transport to its exchange and drops
/// the exchange if that completed it. Chunks for unknown ids are ignored.
fn process_out_body_chunk(&mut self, id: RequestId, chunk: Result<Option<T::BodyOut>, T::Error>) {
    trace!("process out body chunk; id={:?}", id);

    let finished = match self.exchanges.get_mut(&id) {
        Some(exchange) => {
            exchange.send_out_chunk(chunk);
            exchange.is_complete()
        }
        None => {
            trace!(" --> exchange previously aborted; id={:?}", id);
            return;
        }
    };

    if finished {
        trace!("dropping out body handle; id={:?}", id);
        self.exchanges.remove(&id);
    }
}
/// Writes pending messages, then pending body chunks, to the transport.
fn write_in_frames(&mut self) -> io::Result<()> {
    try!(self.write_in_messages());
    self.write_in_body()
}
/// Pulls messages from the dispatcher and writes them to the transport for
/// as long as the transport sink can accept a frame.
///
/// The loop also stops when the dispatcher has nothing to offer
/// (`NotReady`) or has finished (`None`). On the way out the write state is
/// moved to `Blocked` if a frame was written during this pass.
fn write_in_messages(&mut self) -> io::Result<()> {
    trace!("write in messages");

    while self.dispatch.poll_ready().is_ready() {
        trace!(" --> polling for in frame");

        match try!(self.dispatch.get_mut().inner.poll()) {
            Async::Ready(Some(message)) => {
                self.dispatch_made_progress();

                match message.message {
                    Ok(m) => {
                        trace!(" --> got message");
                        try!(self.write_in_message(message.id, m, message.solo));
                    }
                    Err(error) => {
                        trace!(" --> got error");
                        try!(self.write_in_error(message.id, error));
                    }
                }
            }
            Async::Ready(None) => {
                // The dispatcher has no more messages. This arm used to log
                // "got error" as well, which was misleading.
                trace!(" --> got None");
                break;
            }
            Async::NotReady => break,
        }
    }

    trace!(" --> transport not ready");
    self.blocked_on_flush.transport_not_write_ready();

    Ok(())
}
fn write_in_message(&mut self,
id: RequestId,
message: Message<T::In, T::Stream>,
solo: bool)
-> io::Result<()>
{
let (message, body) = match message {
Message::WithBody(message, rx) => (message, Some(rx)),
Message::WithoutBody(message) => (message, None),
};
let frame = Frame::Message {
id: id,
message: message,
body: body.is_some(),
solo: solo,
};
try!(assert_send(&mut self.dispatch, frame));
self.blocked_on_flush.wrote_frame();
match self.exchanges.entry(id) {
Entry::Occupied(mut e) => {
assert!(!e.get().responded, "invalid exchange state");
assert!(e.get().is_outbound());
assert!(!solo);
e.get_mut().responded = true;
e.get_mut().in_body = body;
if e.get().is_complete() {
e.remove();
}
}
Entry::Vacant(e) => {
let mut exchange = Exchange::new(
Request::In,
self.frame_buf.deque());
exchange.in_body = body;
exchange.set_expect_response(solo);
if !exchange.is_complete() {
e.insert(exchange);
}
}
}
Ok(())
}
fn write_in_error(&mut self,
id: RequestId,
error: T::Error)
-> io::Result<()>
{
if let Entry::Occupied(mut e) = self.exchanges.entry(id) {
assert!(!e.get().responded, "exchange already responded");
e.get_mut().responded = true;
e.get_mut().out_body = None;
e.get_mut().in_body = None;
e.get_mut().out_deque.clear();
assert!(e.get().is_complete());
let frame = Frame::Error { id: id, error: error };
try!(assert_send(&mut self.dispatch, frame));
self.blocked_on_flush.wrote_frame();
e.remove();
} else {
trace!("exchange does not exist; id={:?}", id);
}
Ok(())
}
/// Writes pending body chunks of every exchange to the transport.
///
/// For each exchange, chunks are pulled from its `in_body` stream and
/// written as `Body` frames until the stream has nothing ready, ends, or
/// fails. The whole pass stops as soon as the transport sink is not
/// flushed. Exchanges that became complete are removed afterwards.
fn write_in_body(&mut self) -> io::Result<()> {
trace!("write in body chunks");
self.scratch.clear();
'outer:
for (&id, exchange) in &mut self.exchanges {
trace!(" --> checking request {:?}", id);
loop {
// Stop everything once the sink cannot complete its pending writes.
if !try!(self.dispatch.poll_complete()).is_ready() {
trace!(" --> blocked on transport");
self.blocked_on_flush.transport_not_write_ready();
break 'outer;
}
match exchange.try_poll_in_body() {
Ok(Async::Ready(Some(chunk))) => {
trace!(" --> got chunk");
let frame = Frame::Body { id: id, chunk: Some(chunk) };
try!(assert_send(&mut self.dispatch, frame));
self.blocked_on_flush.wrote_frame();
}
Ok(Async::Ready(None)) => {
trace!(" --> end of stream");
// A `None` chunk tells the peer the body is finished.
let frame = Frame::Body { id: id, chunk: None };
try!(assert_send(&mut self.dispatch, frame));
self.blocked_on_flush.wrote_frame();
exchange.in_body = None;
break;
}
Err(error) => {
trace!(" --> got error");
let frame = Frame::Error { id: id, error: error };
try!(assert_send(&mut self.dispatch, frame));
self.blocked_on_flush.wrote_frame();
// The error ends the exchange: drop both bodies and queued chunks.
exchange.responded = true;
exchange.in_body = None;
exchange.out_body = None;
exchange.out_deque.clear();
debug_assert!(exchange.is_complete());
break;
}
Ok(Async::NotReady) => {
trace!(" --> no pending chunks");
// Nothing changed for this exchange; move on to the next one.
continue 'outer;
}
}
}
// Removal is deferred until the iteration over the map is over.
if exchange.is_complete() {
self.scratch.push(id);
}
}
for id in &self.scratch {
trace!("dropping in body handle; id={:?}", id);
self.exchanges.remove(id);
}
Ok(())
}
/// Flushes the transport sink and records whether it is fully flushed.
///
/// Becoming flushed after having been blocked on a write counts as
/// progress.
fn flush(&mut self) -> io::Result<()> {
    let flushed = try!(self.dispatch.poll_complete()).is_ready();
    self.is_flushed = flushed;

    if flushed && self.blocked_on_flush == WriteState::Blocked {
        self.made_progress = true;
    }

    Ok(())
}
/// Clears the per-pass progress and blocking flags.
fn reset_flags(&mut self) {
    self.blocked_on_flush = WriteState::NoWrite;
    self.blocked_on_dispatch = false;
    self.made_progress = false;
}
/// Records progress if this pass was previously blocked on the dispatcher.
fn dispatch_made_progress(&mut self) {
    self.made_progress = self.made_progress || self.blocked_on_dispatch;
}
}
/// Drives the multiplexer: each poll ticks the transport, flushes queued
/// body chunks and then repeats the dispatch / read / write / flush cycle
/// until a pass makes no progress.
impl<T: Dispatch> Future for Multiplex<T> {
    type Item = ();
    type Error = io::Error;

    fn poll(&mut self) -> Poll<(), io::Error> {
        trace!("Multiplex::tick ~~~~~~~~~~~~~~~~~~~~~~~~~~~");

        self.dispatch.get_mut().inner.transport().tick();

        try!(self.flush_out_bodies());

        // At least one pass always runs; further passes only while the
        // previous one reported progress.
        loop {
            trace!("~~ multiplex primary loop tick ~~");

            self.reset_flags();

            try!(self.flush_dispatch_deque());
            try!(self.read_out_frames());
            try!(self.write_in_frames());
            try!(self.flush());

            if !self.made_progress {
                break;
            }
        }

        if !self.is_done() {
            trace!("tick done; waiting for wake-up");
            return Ok(Async::NotReady);
        }

        trace!("multiplex done; terminating");
        Ok(Async::Ready(()))
    }
}
/// Logs a warning when the multiplexer is dropped with exchanges in flight.
impl<T: Dispatch> Drop for Multiplex<T> {
    fn drop(&mut self) {
        if self.exchanges.is_empty() {
            return;
        }

        warn!("multiplexer dropping with in-flight exchanges");
    }
}
impl<T: Dispatch> Exchange<T> {
/// Creates the state for a fresh exchange: not responded to, no bodies in
/// either direction, and the body sender considered ready.
fn new(request: Request<T>, deque: FrameDeque<Option<Result<T::BodyOut, T::Error>>>) -> Exchange<T> {
    let exchange = Exchange {
        request: request,
        responded: false,
        out_body: None,
        out_deque: deque,
        out_is_ready: true,
        in_body: None,
    };

    exchange
}
/// True when the exchange was started by the dispatcher (`Request::In`).
fn is_inbound(&self) -> bool {
    if let Request::In = self.request {
        true
    } else {
        false
    }
}
fn is_outbound(&self) -> bool {
!self.is_inbound()
}
/// False only while a request read from the transport is still buffered
/// inside the exchange.
fn is_dispatched(&self) -> bool {
    match self.request {
        Request::In | Request::Out(None) => true,
        Request::Out(Some(_)) => false,
    }
}
/// True when the response has been seen, neither body is outstanding and no
/// request is buffered.
fn is_complete(&self) -> bool {
    if !self.responded {
        return false;
    }
    if self.out_body.is_some() {
        return false;
    }
    if self.in_body.is_some() {
        return false;
    }

    self.request.is_none()
}
/// Marks the exchange as already responded to when `solo` is true.
///
/// A solo exchange must not have a body in the response direction; this is
/// asserted.
fn set_expect_response(&mut self, solo: bool) {
    self.responded = solo;

    if !solo {
        return;
    }

    if self.is_outbound() {
        assert!(self.in_body.is_none());
    } else {
        assert!(self.out_body.is_none());
    }
}
/// Removes and returns the buffered request, if there is one.
fn take_buffered_out_request(&mut self) -> Option<Message<T::Out, Body<T::BodyOut, T::Error>>> {
    if let Request::Out(ref mut pending) = self.request {
        pending.take()
    } else {
        None
    }
}
/// Delivers one body chunk read from the transport to the body sender.
///
/// `Ok(Some(v))` is a data chunk, `Ok(None)` marks end-of-stream and
/// `Err(e)` is an error delivered through the body stream. When there is no
/// sender the chunk is discarded. While the sender is not ready, chunks are
/// queued in `out_deque` for `flush_out_body`.
///
/// The sender is dropped (closing the body) on end-of-stream, when the
/// receiving side is gone, and once an error chunk has been accepted: an
/// error is terminal, matching how `flush_out_body` treats queued errors.
fn send_out_chunk(&mut self, chunk: Result<Option<T::BodyOut>, T::Error>) {
    // Swap `Result` and `Option`: from here on `None` means end-of-stream.
    let chunk = match chunk {
        Ok(Some(v)) => Some(Ok(v)),
        Ok(None) => None,
        Err(e) => Some(Err(e)),
    };

    // Scoped so the borrow of `out_body` ends before it is cleared below.
    {
        let sender = match self.out_body {
            Some(ref mut v) => v,
            None => return,
        };

        if self.out_is_ready {
            trace!(" --> send chunk; end-of-stream={:?}", chunk.is_none());

            if let Some(chunk) = chunk {
                let is_err = chunk.is_err();

                match sender.start_send(chunk) {
                    Ok(AsyncSink::Ready) => {
                        if !is_err {
                            trace!(" --> ready for more");
                            return;
                        }
                        // An error ends the body: fall through and close
                        // the sender so the exchange can complete.
                    }
                    Ok(AsyncSink::NotReady(chunk)) => {
                        // Keep the chunk until the sender can take it.
                        self.out_deque.push(Some(chunk));
                        self.out_is_ready = false;
                        return;
                    }
                    Err(_) => {
                        // The receiving side is gone; close the sender.
                    }
                }
            }

            // A ready sender implies nothing is queued.
            assert!(self.out_deque.is_empty());
        } else {
            trace!(" --> queueing chunk");
            self.out_deque.push(chunk);
            return;
        }
    }

    self.out_is_ready = false;
    self.out_body = None;
}
/// Polls the body stream that is to be written to the transport; reports
/// `NotReady` when the exchange has no such stream.
fn try_poll_in_body(&mut self) -> Poll<Option<T::BodyIn>, T::Error> {
    if let Some(ref mut stream) = self.in_body {
        return stream.poll();
    }

    trace!(" !!! no in body??");
    Ok(Async::NotReady)
}
/// Pushes queued body chunks to the body sender.
///
/// Returns with the sender kept open when the queue is drained or the
/// sender refuses a chunk. The sender is dropped (closing the body) when an
/// end-of-stream marker is reached, an error chunk has been sent, or the
/// receiving side is gone.
fn flush_out_body(&mut self) -> io::Result<()> {
{
let sender = match self.out_body {
Some(ref mut sender) => sender,
None => {
// Without a sender nothing may be queued.
assert!(self.out_deque.is_empty(), "pending out frames but no sender");
return Ok(());
}
};
// Assume readiness; reset below if the sender refuses a chunk.
self.out_is_ready = true;
loop {
let msg = match self.out_deque.pop() {
Some(Some(msg)) => msg,
// End-of-stream marker: close the body.
Some(None) => break,
None => {
// Queue drained; keep the sender for future chunks.
return Ok(());
}
};
// An error chunk is the last thing sent on the body.
let done = msg.is_err();
match sender.start_send(msg) {
Ok(AsyncSink::Ready) => {}
Ok(AsyncSink::NotReady(msg)) => {
trace!(" --> not ready");
// Put the chunk back at the front so ordering is preserved.
self.out_deque.push_front(Some(msg));
self.out_is_ready = false;
return Ok(());
}
Err(_) => {
// Sending failed; stop and close the body.
break;
}
}
if done {
break
}
}
}
// The body is finished: discard leftovers and drop the sender.
self.out_deque.clear();
self.out_is_ready = false;
self.out_body = None;
Ok(())
}
}
/// Debug rendering of an exchange; the sender and the chunk deque are shown
/// as fixed placeholder strings.
impl<T> fmt::Debug for Exchange<T>
    where T: Dispatch + fmt::Debug,
          T::In: fmt::Debug,
          T::Out: fmt::Debug,
          T::BodyIn: fmt::Debug,
          T::BodyOut: fmt::Debug,
          T::Error: fmt::Debug,
          T::Stream: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("Exchange");

        out.field("request", &self.request);
        out.field("responded", &self.responded);
        out.field("out_body", &"Sender { ... }");
        out.field("out_deque", &"FrameDeque { ... }");
        out.field("out_is_ready", &self.out_is_ready);
        out.field("in_body", &self.in_body);

        out.finish()
    }
}
impl<T: Dispatch> Request<T> {
    /// True unless a request read from the transport is still buffered.
    fn is_none(&self) -> bool {
        match *self {
            Request::Out(Some(_)) => false,
            Request::In | Request::Out(None) => true,
        }
    }
}
impl WriteState {
    /// `Wrote` becomes `Blocked`; every other state is left untouched.
    fn transport_not_write_ready(&mut self) {
        if let WriteState::Wrote = *self {
            *self = WriteState::Blocked;
        }
    }

    /// `NoWrite` becomes `Wrote`; every other state is left untouched.
    fn wrote_frame(&mut self) {
        if let WriteState::NoWrite = *self {
            *self = WriteState::Wrote;
        }
    }
}
/// Sends `item` into a sink that is expected to be ready.
///
/// Send errors are returned to the caller.
///
/// # Panics
///
/// Panics if the sink does not accept the item.
fn assert_send<T>(s: &mut T, item: T::SinkItem) -> Result<(), T::SinkError>
    where T: Sink
{
    if let AsyncSink::NotReady(_) = try!(s.start_send(item)) {
        panic!("sink reported itself as ready after `poll_ready` but was \
                then unable to accept a message");
    }

    Ok(())
}
impl<T, B, E> MultiplexMessage<T, B, E> {
    /// Wraps a message for exchange `id`; `solo` is false.
    pub fn new(id: RequestId, message: Message<T, B>) -> MultiplexMessage<T, B, E> {
        Self::with_outcome(id, Ok(message))
    }

    /// Wraps an error for exchange `id`; `solo` is false.
    pub fn error(id: RequestId, error: E) -> MultiplexMessage<T, B, E> {
        Self::with_outcome(id, Err(error))
    }

    /// Shared constructor for both the message and the error case.
    fn with_outcome(id: RequestId, outcome: Result<Message<T, B>, E>) -> MultiplexMessage<T, B, E> {
        MultiplexMessage {
            id: id,
            message: outcome,
            solo: false,
        }
    }
}
/// Exposes the dispatcher's transport as a `Sink`; every method forwards to
/// the transport.
impl<T: Dispatch> Sink for DispatchSink<T> {
    type SinkItem = <T::Transport as Sink>::SinkItem;
    type SinkError = io::Error;

    fn start_send(&mut self, item: Self::SinkItem)
                  -> StartSend<Self::SinkItem, io::Error>
    {
        let transport = self.inner.transport();
        transport.start_send(item)
    }

    fn poll_complete(&mut self) -> Poll<(), io::Error> {
        let transport = self.inner.transport();
        transport.poll_complete()
    }

    fn close(&mut self) -> Poll<(), io::Error> {
        let transport = self.inner.transport();
        transport.close()
    }
}