#[cfg(feature = "compat")]
pub mod compat;
pub mod conn;
mod service;
use std::cell::RefCell;
use std::fmt;
use std::io;
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::rc::{Rc, Weak};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use futures::task::{self, Task};
use futures::future::{self};
use futures::{Future, Stream, Poll, Async};
use net2;
#[cfg(feature = "compat")]
use http;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio::reactor::{Core, Handle, Interval, Timeout};
use tokio::net::TcpListener;
pub use tokio_service::{NewService, Service};
use proto;
#[cfg(feature = "compat")]
use proto::Body;
use self::addr_stream::AddrStream;
use self::hyper_service::HyperService;
pub use proto::response::Response;
pub use proto::request::Request;
feat_server_proto! {
mod server_proto;
pub use self::server_proto::{
__ProtoRequest,
__ProtoResponse,
__ProtoTransport,
__ProtoBindTransport,
};
}
pub use self::conn::Connection;
pub use self::service::{const_service, service_fn};
/// HTTP protocol options applied to the connections this module serves.
///
/// `B` is the body chunk type (defaulting to `::Chunk`). It is tracked only
/// through `PhantomData`, so no value of type `B` is stored in the struct.
pub struct Http<B = ::Chunk> {
// Optional buffer-size limit; `serve_connection` forwards it to
// `set_max_buf_size` when it is `Some`.
max_buf_size: Option<usize>,
// When false, `serve_connection` disables keep-alive on the connection.
// When true, the listener-based entry points also enable a 90 second TCP
// keepalive on accepted sockets.
keep_alive: bool,
// Forwarded to the connection through `set_flush_pipeline`.
pipeline: bool,
// Forwarded to `AddrIncoming`, where it selects how accept errors are handled.
sleep_on_errors: bool,
// Marker tying the chunk type `B` to this value without owning one.
_marker: PhantomData<fn() -> B>,
}
/// A listener bound to an address together with the reactor that drives it.
///
/// Built by `Http::bind` and consumed by `run`, `run_until` or `run_threads`.
pub struct Server<S, B>
where B: Stream<Error=::Error>,
B::Item: AsRef<[u8]>,
{
// Protocol options used for every accepted connection.
protocol: Http<B::Item>,
// Factory asked for a fresh service each time a connection is accepted.
new_service: S,
// Event loop owned by the server; `run_until` runs futures on it.
reactor: Core,
// Listening socket that connections are accepted from.
listener: TcpListener,
// How long `run_until` waits for open connections once shutdown was requested.
shutdown_timeout: Duration,
}
/// A stream of `Connection`s, as returned by `Http::serve_incoming`.
///
/// Polling it takes the next I/O object from `incoming`, asks `new_service`
/// for a service and combines both using `protocol`.
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct Serve<I, S> {
// Source of accepted I/O objects.
incoming: I,
// Factory producing one service per accepted I/O object.
new_service: S,
// Protocol options used to build each `Connection`.
protocol: Http,
}
/// A stream of accepted TCP connections (`AddrStream`) from a bound listener.
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct AddrIncoming {
// Local address of `listener`, captured once in `new`.
addr: SocketAddr,
// TCP keepalive duration applied to each accepted socket, if any.
keep_alive_timeout: Option<Duration>,
// Listening socket polled for new connections.
listener: TcpListener,
// Reactor handle used to create the back-off `Timeout` after accept errors.
handle: Handle,
// When true, non-connection accept errors cause a short sleep instead of
// being returned from the stream.
sleep_on_errors: bool,
// Back-off timer still pending from an earlier accept error, if any.
timeout: Option<Timeout>,
}
impl<B: AsRef<[u8]> + 'static> Http<B> {
pub fn new() -> Http<B> {
Http {
keep_alive: true,
max_buf_size: None,
pipeline: false,
sleep_on_errors: false,
_marker: PhantomData,
}
}
pub fn keep_alive(&mut self, val: bool) -> &mut Self {
self.keep_alive = val;
self
}
pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
self.max_buf_size = Some(max);
self
}
pub fn pipeline(&mut self, enabled: bool) -> &mut Self {
self.pipeline = enabled;
self
}
pub fn sleep_on_errors(&mut self, enabled: bool) -> &mut Self {
self.sleep_on_errors = enabled;
self
}
pub fn bind<S, Bd>(&self, addr: &SocketAddr, new_service: S) -> ::Result<Server<S, Bd>>
where S: NewService<Request = Request, Response = Response<Bd>, Error = ::Error> + 'static,
Bd: Stream<Item=B, Error=::Error>,
{
let core = try!(Core::new());
let handle = core.handle();
let listener = try!(thread_listener(addr, &handle));
Ok(Server {
new_service: new_service,
reactor: core,
listener: listener,
protocol: self.clone(),
shutdown_timeout: Duration::new(1, 0),
})
}
#[cfg(feature = "compat")]
pub fn bind_compat<S, Bd>(&self, addr: &SocketAddr, new_service: S) -> ::Result<Server<compat::NewCompatService<S>, Bd>>
where S: NewService<Request = http::Request<Body>, Response = http::Response<Bd>, Error = ::Error> +
Send + Sync + 'static,
Bd: Stream<Item=B, Error=::Error>,
{
self.bind(addr, self::compat::new_service(new_service))
}
pub fn serve_addr_handle<S, Bd>(&self, addr: &SocketAddr, handle: &Handle, new_service: S) -> ::Result<Serve<AddrIncoming, S>>
where S: NewService<Request = Request, Response = Response<Bd>, Error = ::Error>,
Bd: Stream<Item=B, Error=::Error>,
{
let listener = TcpListener::bind(addr, &handle)?;
let mut incoming = AddrIncoming::new(listener, handle.clone(), self.sleep_on_errors)?;
if self.keep_alive {
incoming.set_keepalive(Some(Duration::from_secs(90)));
}
Ok(self.serve_incoming(incoming, new_service))
}
pub fn serve_incoming<I, S, Bd>(&self, incoming: I, new_service: S) -> Serve<I, S>
where I: Stream<Error=::std::io::Error>,
I::Item: AsyncRead + AsyncWrite,
S: NewService<Request = Request, Response = Response<Bd>, Error = ::Error>,
Bd: Stream<Item=B, Error=::Error>,
{
Serve {
incoming: incoming,
new_service: new_service,
protocol: Http {
keep_alive: self.keep_alive,
max_buf_size: self.max_buf_size,
pipeline: self.pipeline,
sleep_on_errors: self.sleep_on_errors,
_marker: PhantomData,
},
}
}
pub fn serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S>
where S: Service<Request = Request, Response = Response<Bd>, Error = ::Error>,
Bd: Stream<Error=::Error>,
Bd::Item: AsRef<[u8]>,
I: AsyncRead + AsyncWrite,
{
let mut conn = proto::Conn::new(io);
if !self.keep_alive {
conn.disable_keep_alive();
}
conn.set_flush_pipeline(self.pipeline);
if let Some(max) = self.max_buf_size {
conn.set_max_buf_size(max);
}
Connection {
conn: proto::dispatch::Dispatcher::new(proto::dispatch::Server::new(service), conn),
}
}
}
impl<B> Clone for Http<B> {
fn clone(&self) -> Http<B> {
Http {
..*self
}
}
}
impl<B> fmt::Debug for Http<B> {
    /// Formats every configuration option. The `PhantomData` marker is
    /// left out because it carries no runtime information.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Http")
            .field("keep_alive", &self.keep_alive)
            .field("pipeline", &self.pipeline)
            // These two options used to be missing from the output, which
            // hid part of the configuration when debugging.
            .field("max_buf_size", &self.max_buf_size)
            .field("sleep_on_errors", &self.sleep_on_errors)
            .finish()
    }
}
impl<S, B> Server<S, B>
where S: NewService<Request = Request, Response = Response<B>, Error = ::Error> + 'static,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
{
pub fn local_addr(&self) -> ::Result<SocketAddr> {
Ok(try!(self.listener.local_addr()))
}
pub fn handle(&self) -> Handle {
self.reactor.handle()
}
pub fn shutdown_timeout(&mut self, timeout: Duration) -> &mut Self {
self.shutdown_timeout = timeout;
self
}
#[doc(hidden)]
#[deprecated(since="0.11.11", note="no_proto is always enabled")]
pub fn no_proto(&mut self) -> &mut Self {
self
}
pub fn run(self) -> ::Result<()> {
self.run_until(future::empty())
}
pub fn run_until<F>(self, shutdown_signal: F) -> ::Result<()>
where F: Future<Item = (), Error = ()>,
{
let Server { protocol, new_service, mut reactor, listener, shutdown_timeout } = self;
let handle = reactor.handle();
let mut incoming = AddrIncoming::new(listener, handle.clone(), protocol.sleep_on_errors)?;
if protocol.keep_alive {
incoming.set_keepalive(Some(Duration::from_secs(90)));
}
date_render_interval(&handle);
let info = Rc::new(RefCell::new(Info {
active: 0,
blocker: None,
}));
let srv = incoming.for_each(|socket| {
let addr = socket.remote_addr;
debug!("accepted new connection ({})", addr);
let addr_service = SocketAddrService::new(addr, new_service.new_service()?);
let s = NotifyService {
inner: addr_service,
info: Rc::downgrade(&info),
};
info.borrow_mut().active += 1;
let fut = protocol.serve_connection(socket, s)
.map(|_| ())
.map_err(move |err| error!("server connection error: ({}) {}", addr, err));
handle.spawn(fut);
Ok(())
});
let shutdown_signal = shutdown_signal.then(|_| Ok(()));
match reactor.run(shutdown_signal.select(srv)) {
Ok(((), _incoming)) => {}
Err((e, _other)) => return Err(e.into()),
}
let timeout = try!(Timeout::new(shutdown_timeout, &handle));
let wait = WaitUntilZero { info: info.clone() };
match reactor.run(wait.select(timeout)) {
Ok(_) => Ok(()),
Err((e, _)) => Err(e.into())
}
}
}
impl<S, B> Server<S, B>
    where S: NewService<Request = Request, Response = Response<B>, Error = ::Error> + Send + Sync + 'static,
          B: Stream<Error=::Error> + 'static,
          B::Item: AsRef<[u8]>,
{
    /// Runs the server on `threads` threads: `threads - 1` spawned workers,
    /// each with its own reactor and listener on the same address, plus the
    /// calling thread, which reuses this server's reactor and listener.
    ///
    /// # Panics
    ///
    /// Panics if `threads` is zero, or if creating a reactor, listener or
    /// thread fails, or if any server loop returns an error.
    #[cfg(unix)]
    pub fn run_threads(self, threads: usize) {
        assert!(threads > 0, "threads must be more than 0");

        let Server { protocol, new_service, reactor, listener, shutdown_timeout } = self;

        // The service factory is shared between all threads.
        let new_service = Arc::new(new_service);
        let addr = listener.local_addr().unwrap();

        let mut workers = Vec::with_capacity(threads - 1);
        for i in 1..threads {
            let protocol = protocol.clone();
            let new_service = new_service.clone();
            let worker = thread::Builder::new()
                .name(format!("hyper-server-thread-{}", i))
                .spawn(move || {
                    let reactor = Core::new().unwrap();
                    let listener = thread_listener(&addr, &reactor.handle()).unwrap();
                    let worker_server = Server {
                        protocol,
                        new_service,
                        reactor,
                        listener,
                        shutdown_timeout,
                    };
                    worker_server.run().unwrap();
                })
                .unwrap();
            workers.push(worker);
        }

        // The calling thread serves with the original reactor and listener.
        let main_server = Server {
            protocol,
            new_service,
            reactor,
            listener,
            shutdown_timeout,
        };
        main_server.run().unwrap();

        for worker in workers {
            worker.join().unwrap();
        }
    }
}
/// Spawns a task on `handle` that refreshes the cached rendered date once
/// per second by calling `proto::date::update_interval`.
///
/// If the interval cannot be created the function only logs and returns;
/// no task is spawned in that case. When the spawned task ends (interval
/// error or end of stream) `proto::date::interval_off` is called through
/// the `IntervalDrop` guard.
fn date_render_interval(handle: &Handle) {
    let mut date_interval = match Interval::new(Duration::from_secs(1), handle) {
        Ok(i) => i,
        Err(e) => {
            trace!("error spawning date rendering interval: {}", e);
            return;
        }
    };

    let on_drop = IntervalDrop;

    let fut =
        future::poll_fn(move || {
            // Keep polling until the interval itself reports `NotReady`.
            // Returning `NotReady` directly after a `Ready` tick would leave
            // this task without a registered wakeup for the next tick.
            loop {
                match try_ready!(date_interval.poll().map_err(|_| ())) {
                    Some(_) => proto::date::update_interval(),
                    // The interval stream ended; finish so the guard below
                    // is dropped and the interval is marked as off.
                    None => return Ok(Async::Ready(())),
                }
            }
        })
        .then(move |_: Result<(), ()>| {
            // Dropping the guard calls `proto::date::interval_off`.
            drop(on_drop);
            Ok(())
        });
    handle.spawn(fut);

    /// Guard whose drop calls `proto::date::interval_off`.
    struct IntervalDrop;

    impl Drop for IntervalDrop {
        fn drop(&mut self) {
            proto::date::interval_off();
        }
    }
}
/// Builds a listening TCP socket for `addr` and registers it with the
/// reactor behind `handle`.
///
/// The socket is created for the address family of `addr`, gets
/// `SO_REUSEPORT` where supported (failure is only logged) and
/// `SO_REUSEADDR`, and listens with a backlog of 1024.
fn thread_listener(addr: &SocketAddr, handle: &Handle) -> io::Result<TcpListener> {
    let builder = if addr.is_ipv4() {
        net2::TcpBuilder::new_v4()?
    } else {
        net2::TcpBuilder::new_v6()?
    };

    reuse_port(&builder);
    builder.reuse_address(true)?;
    builder.bind(addr)?;

    let std_listener = builder.listen(1024)?;
    TcpListener::from_listener(std_listener, addr, handle)
}
/// Tries to enable `SO_REUSEPORT` on `tcp`; a failure is logged at debug
/// level and otherwise ignored.
#[cfg(unix)]
fn reuse_port(tcp: &net2::TcpBuilder) {
    use net2::unix::*;

    match tcp.reuse_port(true) {
        Ok(_) => {}
        Err(e) => {
            debug!("error setting SO_REUSEPORT: {}", e);
        }
    }
}
/// Non-unix counterpart of `reuse_port`: does nothing with the builder.
#[cfg(not(unix))]
fn reuse_port(_tcp: &net2::TcpBuilder) {
}
impl<S: fmt::Debug, B: Stream<Error=::Error>> fmt::Debug for Server<S, B>
where B::Item: AsRef<[u8]>
{
    /// Formats the server; the reactor is shown as the placeholder `"..."`
    /// rather than being formatted itself.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("Server");
        out.field("reactor", &"...");
        out.field("listener", &self.listener);
        out.field("new_service", &self.new_service);
        out.field("protocol", &self.protocol);
        out.finish()
    }
}
impl<I, S> Serve<I, S> {
#[inline]
pub fn incoming_ref(&self) -> &I {
&self.incoming
}
}
impl<I, S, B> Stream for Serve<I, S>
where
    I: Stream<Error=io::Error>,
    I::Item: AsyncRead + AsyncWrite,
    S: NewService<Request=Request, Response=Response<B>, Error=::Error>,
    B: Stream<Error=::Error>,
    B::Item: AsRef<[u8]>,
{
    type Item = Connection<I::Item, S::Instance>;
    type Error = ::Error;

    /// Yields one `Connection` per I/O object produced by `incoming`, and
    /// ends when `incoming` ends. Errors from `incoming` or from creating
    /// the service are returned to the caller.
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        let io = match try_ready!(self.incoming.poll()) {
            Some(io) => io,
            None => return Ok(Async::Ready(None)),
        };

        let service = self.new_service.new_service()?;
        let connection = self.protocol.serve_connection(io, service);
        Ok(Async::Ready(Some(connection)))
    }
}
impl AddrIncoming {
    /// Wraps `listener`, remembering its local address. TCP keepalive and
    /// the back-off timer both start out unset.
    ///
    /// Fails if the listener's local address cannot be read.
    fn new(listener: TcpListener, handle: Handle, sleep_on_errors: bool) -> io::Result<AddrIncoming> {
        let addr = listener.local_addr()?;
        Ok(AddrIncoming {
            addr,
            keep_alive_timeout: None,
            listener,
            handle,
            sleep_on_errors,
            timeout: None,
        })
    }

    /// Returns the local address captured when this stream was created.
    pub fn local_addr(&self) -> SocketAddr {
        self.addr
    }

    /// Sets the TCP keepalive duration applied to accepted sockets;
    /// `None` leaves sockets untouched.
    fn set_keepalive(&mut self, dur: Option<Duration>) {
        self.keep_alive_timeout = dur;
    }
}
impl Stream for AddrIncoming {
type Item = AddrStream;
type Error = ::std::io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if let Some(ref mut to) = self.timeout {
match to.poll().expect("timeout never fails") {
Async::Ready(_) => {}
Async::NotReady => return Ok(Async::NotReady),
}
}
self.timeout = None;
loop {
match self.listener.accept() {
Ok((socket, addr)) => {
if let Some(dur) = self.keep_alive_timeout {
if let Err(e) = socket.set_keepalive(Some(dur)) {
trace!("error trying to set TCP keepalive: {}", e);
}
}
return Ok(Async::Ready(Some(AddrStream::new(socket, addr))));
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady),
Err(ref e) if self.sleep_on_errors => {
if connection_error(e) {
continue;
}
let delay = ::std::time::Duration::from_millis(10);
debug!("accept error: {}; sleeping {:?}",
e, delay);
let mut timeout = Timeout::new(delay, &self.handle)
.expect("can always set a timeout");
let result = timeout.poll()
.expect("timeout never fails");
match result {
Async::Ready(()) => continue,
Async::NotReady => {
self.timeout = Some(timeout);
return Ok(Async::NotReady);
}
}
},
Err(e) => return Err(e),
}
}
}
}
/// Returns true when `e` is a refused, aborted or reset connection error.
fn connection_error(e: &io::Error) -> bool {
    match e.kind() {
        io::ErrorKind::ConnectionRefused
        | io::ErrorKind::ConnectionAborted
        | io::ErrorKind::ConnectionReset => true,
        _ => false,
    }
}
mod addr_stream {
    use std::io::{self, Read, Write};
    use std::net::SocketAddr;
    use bytes::{Buf, BufMut};
    use futures::Poll;
    use tokio::net::TcpStream;
    use tokio_io::{AsyncRead, AsyncWrite};

    /// A TCP stream paired with the remote address it was accepted from.
    /// All I/O is delegated to the wrapped `TcpStream`.
    #[derive(Debug)]
    pub struct AddrStream {
        inner: TcpStream,
        pub(super) remote_addr: SocketAddr,
    }

    impl AddrStream {
        /// Pairs `tcp` with the peer address `addr`.
        pub(super) fn new(tcp: TcpStream, addr: SocketAddr) -> AddrStream {
            AddrStream { inner: tcp, remote_addr: addr }
        }
    }

    impl Read for AddrStream {
        #[inline]
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            Read::read(&mut self.inner, buf)
        }
    }

    impl Write for AddrStream {
        #[inline]
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            Write::write(&mut self.inner, buf)
        }

        #[inline]
        fn flush(&mut self) -> io::Result<()> {
            Write::flush(&mut self.inner)
        }
    }

    impl AsyncRead for AddrStream {
        #[inline]
        unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
            AsyncRead::prepare_uninitialized_buffer(&self.inner, buf)
        }

        #[inline]
        fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
            AsyncRead::read_buf(&mut self.inner, buf)
        }
    }

    impl AsyncWrite for AddrStream {
        #[inline]
        fn shutdown(&mut self) -> Poll<(), io::Error> {
            // Trait-qualified so the `AsyncWrite` method is the one called.
            AsyncWrite::shutdown(&mut self.inner)
        }

        #[inline]
        fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
            AsyncWrite::write_buf(&mut self.inner, buf)
        }
    }
}
/// Service wrapper that remembers the remote address of the connection it
/// serves, so the address can be attached to each request.
struct SocketAddrService<S> {
    addr: SocketAddr,
    inner: S,
}

impl<S> SocketAddrService<S> {
    /// Wraps `service` together with the peer address `addr`.
    fn new(addr: SocketAddr, service: S) -> SocketAddrService<S> {
        SocketAddrService { addr, inner: service }
    }
}
impl<S> Service for SocketAddrService<S>
where
    S: Service<Request=Request>,
{
    type Request = S::Request;
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;

    /// Records the stored remote address on the request via
    /// `proto::request::addr`, then forwards it to the wrapped service.
    fn call(&self, request: Self::Request) -> Self::Future {
        let mut request = request;
        proto::request::addr(&mut request, self.addr);
        self.inner.call(request)
    }
}
/// Service wrapper that reports its own drop to the shared `Info` counter,
/// which lets the server tell when all connection services are gone.
struct NotifyService<S> {
// The wrapped service that actually handles requests.
inner: S,
// Weak link to the shared counter; if it can no longer be upgraded, the
// drop handler does nothing.
info: Weak<RefCell<Info>>,
}
/// Future that resolves once the shared `Info::active` counter is zero.
struct WaitUntilZero {
// Shared connection bookkeeping, also referenced weakly by each `NotifyService`.
info: Rc<RefCell<Info>>,
}
/// Bookkeeping shared between the accept loop, every `NotifyService` and
/// `WaitUntilZero`.
struct Info {
// Number of `NotifyService` values currently alive.
active: usize,
// Task parked by `WaitUntilZero`, notified when `active` drops to zero.
blocker: Option<Task>,
}
impl<S: Service> Service for NotifyService<S> {
    type Request = S::Request;
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;

    /// Forwards the request unchanged to the wrapped service.
    fn call(&self, req: Self::Request) -> Self::Future {
        let inner = &self.inner;
        inner.call(req)
    }
}
impl<S> Drop for NotifyService<S> {
    /// Decrements the shared active count and, when it reaches zero, wakes
    /// the task parked in `blocker` (if any). Does nothing when the shared
    /// state has already been dropped.
    fn drop(&mut self) {
        if let Some(shared) = self.info.upgrade() {
            let mut state = shared.borrow_mut();
            state.active -= 1;
            if state.active == 0 {
                if let Some(waiter) = state.blocker.take() {
                    waiter.notify();
                }
            }
        }
    }
}
impl Future for WaitUntilZero {
    type Item = ();
    type Error = io::Error;

    /// Ready once no services are active. Otherwise stores the current
    /// task in `blocker` so the last dropped service can wake it.
    fn poll(&mut self) -> Poll<(), io::Error> {
        let mut state = self.info.borrow_mut();
        if state.active != 0 {
            state.blocker = Some(task::current());
            return Ok(Async::NotReady);
        }
        Ok(Async::Ready(()))
    }
}
// Helper trait that names the response body type of a service.
//
// `HyperService` is implemented below for every `Service` whose request is
// `Request`, whose response is `Response<B>` and whose error is `::Error`,
// with `B` a stream of `AsRef<[u8]>` chunks failing with `::Error`.
// NOTE(review): the `Sealed`/`Sealed2` supertrait and associated type look
// intended to keep code outside this module from implementing the trait;
// confirm against how the module is re-exported.
mod hyper_service {
use super::{Request, Response, Service, Stream};
/// A `Service` with the request, response and error types this module serves.
pub trait HyperService: Service + Sealed {
// The body type `B` taken from the service's `Response<B>`.
#[doc(hidden)]
type ResponseBody;
// Always `Opaque` in the blanket impl below.
#[doc(hidden)]
type Sealed: Sealed2;
}
// Marker supertrait of `HyperService`; implemented only by the blanket impl below.
pub trait Sealed {}
// Marker bound for `HyperService::Sealed`; implemented only for `Opaque`.
pub trait Sealed2 {}
// Type with a private field, used as the `HyperService::Sealed` associated type.
#[allow(missing_debug_implementations)]
pub struct Opaque {
_inner: (),
}
impl Sealed2 for Opaque {}
// Blanket `Sealed` impl for every service with the expected associated types.
impl<S, B> Sealed for S
where
S: Service<
Request=Request,
Response=Response<B>,
Error=::Error,
>,
B: Stream<Error=::Error>,
B::Item: AsRef<[u8]>,
{}
// Blanket `HyperService` impl; exposes the body type as `ResponseBody`.
impl<S, B> HyperService for S
where
S: Service<
Request=Request,
Response=Response<B>,
Error=::Error,
>,
S: Sealed,
B: Stream<Error=::Error>,
B::Item: AsRef<[u8]>,
{
type ResponseBody = B;
type Sealed = Opaque;
}
}