use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, Instant};
use futures::{Future, Async, Poll, Stream};
use futures::sync::oneshot;
use tokio::reactor::{Handle, Interval};
pub struct Pool<T> {
inner: Arc<Mutex<PoolInner<T>>>,
}
pub trait Closed {
fn is_open(&self) -> bool;
}
struct PoolInner<T> {
enabled: bool,
idle: HashMap<Arc<String>, Vec<Idle<T>>>,
parked: HashMap<Arc<String>, VecDeque<oneshot::Sender<T>>>,
timeout: Option<Duration>,
expired_timer_spawned: bool,
}
impl<T> Pool<T> {
pub fn new(enabled: bool, timeout: Option<Duration>) -> Pool<T> {
Pool {
inner: Arc::new(Mutex::new(PoolInner {
enabled: enabled,
idle: HashMap::new(),
parked: HashMap::new(),
timeout: timeout,
expired_timer_spawned: false,
})),
}
}
}
impl<T: Closed> Pool<T> {
pub fn checkout(&self, key: &str) -> Checkout<T> {
Checkout {
key: Arc::new(key.to_owned()),
pool: self.clone(),
parked: None,
}
}
fn take(&self, key: &Arc<String>) -> Option<Pooled<T>> {
let entry = {
let mut inner = self.inner.lock().unwrap();
let expiration = Expiration::new(inner.timeout);
let mut should_remove = false;
let entry = inner.idle.get_mut(key).and_then(|list| {
trace!("take; url = {:?}, expiration = {:?}", key, expiration.0);
while let Some(entry) = list.pop() {
if !expiration.expires(entry.idle_at) {
if entry.value.is_open() {
should_remove = list.is_empty();
return Some(entry);
}
}
trace!("removing unacceptable pooled {:?}", key);
}
should_remove = true;
None
});
if should_remove {
inner.idle.remove(key);
}
entry
};
entry.map(|e| self.reuse(key, e.value))
}
pub fn pooled(&self, key: Arc<String>, value: T) -> Pooled<T> {
Pooled {
is_reused: false,
key: key,
pool: Arc::downgrade(&self.inner),
value: Some(value)
}
}
fn reuse(&self, key: &Arc<String>, value: T) -> Pooled<T> {
debug!("reuse idle connection for {:?}", key);
Pooled {
is_reused: true,
key: key.clone(),
pool: Arc::downgrade(&self.inner),
value: Some(value),
}
}
fn park(&mut self, key: Arc<String>, tx: oneshot::Sender<T>) {
trace!("park; waiting for idle connection: {:?}", key);
self.inner.lock().unwrap()
.parked.entry(key)
.or_insert(VecDeque::new())
.push_back(tx);
}
}
impl<T: Closed> PoolInner<T> {
fn put(&mut self, key: Arc<String>, value: T) {
if !self.enabled {
return;
}
trace!("Pool::put {:?}", key);
let mut remove_parked = false;
let mut value = Some(value);
if let Some(parked) = self.parked.get_mut(&key) {
while let Some(tx) = parked.pop_front() {
if !tx.is_canceled() {
match tx.send(value.take().unwrap()) {
Ok(()) => break,
Err(e) => {
value = Some(e);
}
}
}
trace!("Pool::put removing canceled parked {:?}", key);
}
remove_parked = parked.is_empty();
}
if remove_parked {
self.parked.remove(&key);
}
match value {
Some(value) => {
debug!("pooling idle connection for {:?}", key);
self.idle.entry(key)
.or_insert(Vec::new())
.push(Idle {
value: value,
idle_at: Instant::now(),
});
}
None => trace!("Pool::put found parked {:?}", key),
}
}
}
impl<T> PoolInner<T> {
fn clean_parked(&mut self, key: &Arc<String>) {
let mut remove_parked = false;
if let Some(parked) = self.parked.get_mut(key) {
parked.retain(|tx| {
!tx.is_canceled()
});
remove_parked = parked.is_empty();
}
if remove_parked {
self.parked.remove(key);
}
}
}
impl<T: Closed> PoolInner<T> {
fn clear_expired(&mut self) {
let dur = if let Some(dur) = self.timeout {
dur
} else {
return
};
let now = Instant::now();
self.idle.retain(|_key, values| {
values.retain(|entry| {
if !entry.value.is_open() {
return false;
}
now - entry.idle_at < dur
});
!values.is_empty()
});
}
}
impl<T: Closed + 'static> Pool<T> {
pub(super) fn spawn_expired_interval(&self, handle: &Handle) {
let dur = {
let mut inner = self.inner.lock().unwrap();
if !inner.enabled {
return;
}
if inner.expired_timer_spawned {
return;
}
inner.expired_timer_spawned = true;
if let Some(dur) = inner.timeout {
dur
} else {
return
}
};
let interval = Interval::new(dur, handle)
.expect("reactor is gone");
handle.spawn(IdleInterval {
interval: interval,
pool: Arc::downgrade(&self.inner),
});
}
}
impl<T> Clone for Pool<T> {
fn clone(&self) -> Pool<T> {
Pool {
inner: self.inner.clone(),
}
}
}
pub struct Pooled<T: Closed> {
value: Option<T>,
is_reused: bool,
key: Arc<String>,
pool: Weak<Mutex<PoolInner<T>>>,
}
impl<T: Closed> Pooled<T> {
pub fn is_reused(&self) -> bool {
self.is_reused
}
fn as_ref(&self) -> &T {
self.value.as_ref().expect("not dropped")
}
fn as_mut(&mut self) -> &mut T {
self.value.as_mut().expect("not dropped")
}
}
impl<T: Closed> Deref for Pooled<T> {
type Target = T;
fn deref(&self) -> &T {
self.as_ref()
}
}
impl<T: Closed> DerefMut for Pooled<T> {
fn deref_mut(&mut self) -> &mut T {
self.as_mut()
}
}
impl<T: Closed> Drop for Pooled<T> {
fn drop(&mut self) {
if let Some(value) = self.value.take() {
if !value.is_open() {
return;
}
if let Some(inner) = self.pool.upgrade() {
if let Ok(mut inner) = inner.lock() {
inner.put(self.key.clone(), value);
}
} else {
trace!("pool dropped, dropping pooled ({:?})", self.key);
}
}
}
}
impl<T: Closed> fmt::Debug for Pooled<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Pooled")
.field("key", &self.key)
.finish()
}
}
struct Idle<T> {
idle_at: Instant,
value: T,
}
pub struct Checkout<T> {
key: Arc<String>,
pool: Pool<T>,
parked: Option<oneshot::Receiver<T>>,
}
struct NotParked;
impl<T: Closed> Checkout<T> {
fn poll_parked(&mut self) -> Poll<Pooled<T>, NotParked> {
let mut drop_parked = false;
if let Some(ref mut rx) = self.parked {
match rx.poll() {
Ok(Async::Ready(value)) => {
if value.is_open() {
return Ok(Async::Ready(self.pool.reuse(&self.key, value)));
}
drop_parked = true;
},
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(_canceled) => drop_parked = true,
}
}
if drop_parked {
self.parked.take();
}
Err(NotParked)
}
fn park(&mut self) {
if self.parked.is_none() {
let (tx, mut rx) = oneshot::channel();
let _ = rx.poll();
self.pool.park(self.key.clone(), tx);
self.parked = Some(rx);
}
}
}
impl<T: Closed> Future for Checkout<T> {
type Item = Pooled<T>;
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.poll_parked() {
Ok(async) => return Ok(async),
Err(_not_parked) => (),
}
let entry = self.pool.take(&self.key);
if let Some(pooled) = entry {
Ok(Async::Ready(pooled))
} else {
self.park();
Ok(Async::NotReady)
}
}
}
impl<T> Drop for Checkout<T> {
fn drop(&mut self) {
self.parked.take();
if let Ok(mut inner) = self.pool.inner.lock() {
inner.clean_parked(&self.key);
}
}
}
struct Expiration(Option<Duration>);
impl Expiration {
fn new(dur: Option<Duration>) -> Expiration {
Expiration(dur)
}
fn expires(&self, instant: Instant) -> bool {
match self.0 {
Some(timeout) => instant.elapsed() > timeout,
None => false,
}
}
}
struct IdleInterval<T> {
interval: Interval,
pool: Weak<Mutex<PoolInner<T>>>,
}
impl<T: Closed + 'static> Future for IdleInterval<T> {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
try_ready!(self.interval.poll().map_err(|_| unreachable!("interval cannot error")));
if let Some(inner) = self.pool.upgrade() {
if let Ok(mut inner) = inner.lock() {
inner.clear_expired();
continue;
}
}
return Ok(Async::Ready(()));
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use futures::{Async, Future};
use futures::future;
use super::{Closed, Pool};
impl Closed for i32 {
fn is_open(&self) -> bool {
true
}
}
#[test]
fn test_pool_checkout_smoke() {
let pool = Pool::new(true, Some(Duration::from_secs(5)));
let key = Arc::new("foo".to_string());
let pooled = pool.pooled(key.clone(), 41);
drop(pooled);
match pool.checkout(&key).poll().unwrap() {
Async::Ready(pooled) => assert_eq!(*pooled, 41),
_ => panic!("not ready"),
}
}
#[test]
fn test_pool_checkout_returns_none_if_expired() {
future::lazy(|| {
let pool = Pool::new(true, Some(Duration::from_millis(100)));
let key = Arc::new("foo".to_string());
let pooled = pool.pooled(key.clone(), 41);
drop(pooled);
::std::thread::sleep(pool.inner.lock().unwrap().timeout.unwrap());
assert!(pool.checkout(&key).poll().unwrap().is_not_ready());
::futures::future::ok::<(), ()>(())
}).wait().unwrap();
}
#[test]
fn test_pool_checkout_removes_expired() {
future::lazy(|| {
let pool = Pool::new(true, Some(Duration::from_millis(100)));
let key = Arc::new("foo".to_string());
pool.pooled(key.clone(), 41);
pool.pooled(key.clone(), 5);
pool.pooled(key.clone(), 99);
assert_eq!(pool.inner.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(3));
::std::thread::sleep(pool.inner.lock().unwrap().timeout.unwrap());
pool.checkout(&key).poll().unwrap();
assert!(pool.inner.lock().unwrap().idle.get(&key).is_none());
Ok::<(), ()>(())
}).wait().unwrap();
}
#[test]
fn test_pool_timer_removes_expired() {
let mut core = ::tokio::reactor::Core::new().unwrap();
let pool = Pool::new(true, Some(Duration::from_millis(100)));
pool.spawn_expired_interval(&core.handle());
let key = Arc::new("foo".to_string());
pool.pooled(key.clone(), 41);
pool.pooled(key.clone(), 5);
pool.pooled(key.clone(), 99);
assert_eq!(pool.inner.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(3));
let timeout = ::tokio::reactor::Timeout::new(
Duration::from_millis(400),
&core.handle()
).unwrap();
core.run(timeout).unwrap();
assert!(pool.inner.lock().unwrap().idle.get(&key).is_none());
}
#[test]
fn test_pool_checkout_task_unparked() {
let pool = Pool::new(true, Some(Duration::from_secs(10)));
let key = Arc::new("foo".to_string());
let pooled = pool.pooled(key.clone(), 41);
let checkout = pool.checkout(&key).join(future::lazy(move || {
drop(pooled);
Ok(())
})).map(|(entry, _)| entry);
assert_eq!(*checkout.wait().unwrap(), 41);
}
#[test]
fn test_pool_checkout_drop_cleans_up_parked() {
future::lazy(|| {
let pool = Pool::<i32>::new(true, Some(Duration::from_secs(10)));
let key = Arc::new("localhost:12345".to_string());
let mut checkout1 = pool.checkout(&key);
let mut checkout2 = pool.checkout(&key);
checkout1.poll().unwrap();
assert_eq!(pool.inner.lock().unwrap().parked.get(&key).unwrap().len(), 1);
checkout2.poll().unwrap();
assert_eq!(pool.inner.lock().unwrap().parked.get(&key).unwrap().len(), 2);
drop(checkout1);
assert_eq!(pool.inner.lock().unwrap().parked.get(&key).unwrap().len(), 1);
drop(checkout2);
assert!(pool.inner.lock().unwrap().parked.get(&key).is_none());
::futures::future::ok::<(), ()>(())
}).wait().unwrap();
}
}