use std::io;
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::sync::Arc;
use std::thread;
use BindServer;
use futures::stream::Stream;
use futures::future::{Then, Future};
use net2;
use tokio_core::net::{TcpStream, TcpListener};
use tokio_core::reactor::{Core, Handle};
use tokio_service::{NewService, Service};
#[derive(Debug)]
pub struct TcpServer<Kind, P> {
_kind: PhantomData<Kind>,
proto: Arc<P>,
threads: usize,
addr: SocketAddr,
}
impl<Kind, P> TcpServer<Kind, P> where
P: BindServer<Kind, TcpStream> + Send + Sync + 'static
{
pub fn new(protocol: P, addr: SocketAddr) -> TcpServer<Kind, P> {
TcpServer {
_kind: PhantomData,
proto: Arc::new(protocol),
threads: 1,
addr: addr,
}
}
pub fn addr(&mut self, addr: SocketAddr) {
self.addr = addr;
}
pub fn threads(&mut self, threads: usize) {
assert!(threads > 0);
if cfg!(unix) {
self.threads = threads;
}
}
pub fn serve<S>(&self, new_service: S) where
S: NewService + Send + Sync + 'static,
S::Instance: 'static,
P::ServiceError: 'static,
P::ServiceResponse: 'static,
P::ServiceRequest: 'static,
S::Request: From<P::ServiceRequest>,
S::Response: Into<P::ServiceResponse>,
S::Error: Into<P::ServiceError>,
{
let new_service = Arc::new(new_service);
self.with_handle(move |_| new_service.clone())
}
pub fn with_handle<F, S>(&self, new_service: F) where
F: Fn(&Handle) -> S + Send + Sync + 'static,
S: NewService + Send + Sync + 'static,
S::Instance: 'static,
P::ServiceError: 'static,
P::ServiceResponse: 'static,
P::ServiceRequest: 'static,
S::Request: From<P::ServiceRequest>,
S::Response: Into<P::ServiceResponse>,
S::Error: Into<P::ServiceError>,
{
let proto = self.proto.clone();
let new_service = Arc::new(new_service);
let addr = self.addr;
let workers = self.threads;
let threads = (0..self.threads - 1).map(|i| {
let proto = proto.clone();
let new_service = new_service.clone();
thread::Builder::new().name(format!("worker{}", i)).spawn(move || {
serve(proto, addr, workers, &*new_service)
}).unwrap()
}).collect::<Vec<_>>();
serve(proto, addr, workers, &*new_service);
for thread in threads {
thread.join().unwrap();
}
}
}
fn serve<P, Kind, F, S>(binder: Arc<P>, addr: SocketAddr, workers: usize, new_service: &F)
where P: BindServer<Kind, TcpStream>,
F: Fn(&Handle) -> S,
S: NewService + Send + Sync,
S::Instance: 'static,
P::ServiceError: 'static,
P::ServiceResponse: 'static,
P::ServiceRequest: 'static,
S::Request: From<P::ServiceRequest>,
S::Response: Into<P::ServiceResponse>,
S::Error: Into<P::ServiceError>,
{
struct WrapService<S, Request, Response, Error> {
inner: S,
_marker: PhantomData<fn() -> (Request, Response, Error)>,
}
impl<S, Request, Response, Error> Service for WrapService<S, Request, Response, Error>
where S: Service,
S::Request: From<Request>,
S::Response: Into<Response>,
S::Error: Into<Error>,
{
type Request = Request;
type Response = Response;
type Error = Error;
type Future = Then<S::Future,
Result<Response, Error>,
fn(Result<S::Response, S::Error>) -> Result<Response, Error>>;
fn call(&self, req: Request) -> Self::Future {
fn change_types<A, B, C, D>(r: Result<A, B>) -> Result<C, D>
where A: Into<C>,
B: Into<D>,
{
match r {
Ok(e) => Ok(e.into()),
Err(e) => Err(e.into()),
}
}
self.inner.call(S::Request::from(req)).then(change_types)
}
}
let mut core = Core::new().unwrap();
let handle = core.handle();
let new_service = new_service(&handle);
let listener = listener(&addr, workers, &handle).unwrap();
let server = listener.incoming().for_each(move |(socket, _)| {
let service = try!(new_service.new_service());
binder.bind_server(&handle, socket, WrapService {
inner: service,
_marker: PhantomData,
});
Ok(())
});
core.run(server).unwrap();
}
fn listener(addr: &SocketAddr,
workers: usize,
handle: &Handle) -> io::Result<TcpListener> {
let listener = match *addr {
SocketAddr::V4(_) => try!(net2::TcpBuilder::new_v4()),
SocketAddr::V6(_) => try!(net2::TcpBuilder::new_v6()),
};
try!(configure_tcp(workers, &listener));
try!(listener.reuse_address(true));
try!(listener.bind(addr));
listener.listen(1024).and_then(|l| {
TcpListener::from_listener(l, addr, handle)
})
}
#[cfg(unix)]
fn configure_tcp(workers: usize, tcp: &net2::TcpBuilder) -> io::Result<()> {
use net2::unix::*;
if workers > 1 {
try!(tcp.reuse_port(true));
}
Ok(())
}
#[cfg(windows)]
fn configure_tcp(_workers: usize, _tcp: &net2::TcpBuilder) -> io::Result<()> {
Ok(())
}