use futures::{channel::oneshot, future::Either, FutureExt, StreamExt};
use libp2p::PeerId;
use log::{debug, error, trace, warn};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_arithmetic::traits::SaturatedConversion;
use std::{
	collections::{HashMap, HashSet},
	time::{Duration, Instant},
};
use wasm_timer::Delay;
use crate::peer_store::PeerStoreProvider;
pub const LOG_TARGET: &str = "peerset";
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SetId(usize);
impl SetId {
	pub const fn from(id: usize) -> Self {
		Self(id)
	}
}
impl From<usize> for SetId {
	fn from(id: usize) -> Self {
		Self(id)
	}
}
impl From<SetId> for usize {
	fn from(id: SetId) -> Self {
		id.0
	}
}
#[derive(Debug)]
pub struct ProtoSetConfig {
	pub in_peers: u32,
	pub out_peers: u32,
	pub reserved_nodes: HashSet<PeerId>,
	pub reserved_only: bool,
}
#[derive(Debug, PartialEq)]
pub enum Message {
	Connect {
		set_id: SetId,
		peer_id: PeerId,
	},
	Drop {
		set_id: SetId,
		peer_id: PeerId,
	},
	Accept(IncomingIndex),
	Reject(IncomingIndex),
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct IncomingIndex(pub u64);
impl From<u64> for IncomingIndex {
	fn from(val: u64) -> Self {
		Self(val)
	}
}
#[derive(Debug)]
enum Action {
	AddReservedPeer(PeerId),
	RemoveReservedPeer(PeerId),
	SetReservedPeers(HashSet<PeerId>),
	SetReservedOnly(bool),
	DisconnectPeer(PeerId),
	GetReservedPeers(oneshot::Sender<Vec<PeerId>>),
}
#[derive(Debug)]
enum Event {
	IncomingConnection(PeerId, IncomingIndex),
	Dropped(PeerId),
}
#[derive(Debug, Clone)]
pub struct ProtocolHandle {
	actions_tx: TracingUnboundedSender<Action>,
	events_tx: TracingUnboundedSender<Event>,
}
impl ProtocolHandle {
	pub fn add_reserved_peer(&self, peer_id: PeerId) {
		let _ = self.actions_tx.unbounded_send(Action::AddReservedPeer(peer_id));
	}
	pub fn remove_reserved_peer(&self, peer_id: PeerId) {
		let _ = self.actions_tx.unbounded_send(Action::RemoveReservedPeer(peer_id));
	}
	pub fn set_reserved_peers(&self, peer_ids: HashSet<PeerId>) {
		let _ = self.actions_tx.unbounded_send(Action::SetReservedPeers(peer_ids));
	}
	pub fn set_reserved_only(&self, reserved: bool) {
		let _ = self.actions_tx.unbounded_send(Action::SetReservedOnly(reserved));
	}
	pub fn disconnect_peer(&self, peer_id: PeerId) {
		let _ = self.actions_tx.unbounded_send(Action::DisconnectPeer(peer_id));
	}
	pub fn reserved_peers(&self, pending_response: oneshot::Sender<Vec<PeerId>>) {
		let _ = self.actions_tx.unbounded_send(Action::GetReservedPeers(pending_response));
	}
	pub fn incoming_connection(&self, peer_id: PeerId, incoming_index: IncomingIndex) {
		let _ = self
			.events_tx
			.unbounded_send(Event::IncomingConnection(peer_id, incoming_index));
	}
	pub fn dropped(&self, peer_id: PeerId) {
		let _ = self.events_tx.unbounded_send(Event::Dropped(peer_id));
	}
}
#[derive(Clone, Copy, Debug)]
enum Direction {
	Inbound,
	Outbound,
}
#[derive(Clone, Debug)]
enum PeerState {
	Connected(Direction),
	NotConnected,
}
impl PeerState {
	fn is_connected(&self) -> bool {
		matches!(self, PeerState::Connected(_))
	}
}
impl Default for PeerState {
	fn default() -> PeerState {
		PeerState::NotConnected
	}
}
#[derive(Debug)]
pub struct ProtocolController {
	set_id: SetId,
	actions_rx: TracingUnboundedReceiver<Action>,
	events_rx: TracingUnboundedReceiver<Event>,
	num_in: u32,
	num_out: u32,
	max_in: u32,
	max_out: u32,
	nodes: HashMap<PeerId, Direction>,
	reserved_nodes: HashMap<PeerId, PeerState>,
	reserved_only: bool,
	next_periodic_alloc_slots: Instant,
	to_notifications: TracingUnboundedSender<Message>,
	peer_store: Box<dyn PeerStoreProvider>,
}
impl ProtocolController {
	pub fn new(
		set_id: SetId,
		config: ProtoSetConfig,
		to_notifications: TracingUnboundedSender<Message>,
		peer_store: Box<dyn PeerStoreProvider>,
	) -> (ProtocolHandle, ProtocolController) {
		let (actions_tx, actions_rx) = tracing_unbounded("mpsc_api_protocol", 10_000);
		let (events_tx, events_rx) = tracing_unbounded("mpsc_notifications_protocol", 10_000);
		let handle = ProtocolHandle { actions_tx, events_tx };
		peer_store.register_protocol(handle.clone());
		let reserved_nodes =
			config.reserved_nodes.iter().map(|p| (*p, PeerState::NotConnected)).collect();
		let controller = ProtocolController {
			set_id,
			actions_rx,
			events_rx,
			num_in: 0,
			num_out: 0,
			max_in: config.in_peers,
			max_out: config.out_peers,
			nodes: HashMap::new(),
			reserved_nodes,
			reserved_only: config.reserved_only,
			next_periodic_alloc_slots: Instant::now(),
			to_notifications,
			peer_store,
		};
		(handle, controller)
	}
	pub async fn run(mut self) {
		while self.next_action().await {}
	}
	pub async fn next_action(&mut self) -> bool {
		let either = loop {
			let mut next_alloc_slots = Delay::new_at(self.next_periodic_alloc_slots).fuse();
			futures::select_biased! {
				event = self.events_rx.next() => match event {
					Some(event) => break Either::Left(event),
					None => return false,
				},
				action = self.actions_rx.next() => match action {
					Some(action) => break Either::Right(action),
					None => return false,
				},
				_ = next_alloc_slots => {
					self.alloc_slots();
					self.next_periodic_alloc_slots = Instant::now() + Duration::new(1, 0);
				},
			}
		};
		match either {
			Either::Left(event) => self.process_event(event),
			Either::Right(action) => self.process_action(action),
		}
		true
	}
	fn process_event(&mut self, event: Event) {
		match event {
			Event::IncomingConnection(peer_id, index) =>
				self.on_incoming_connection(peer_id, index),
			Event::Dropped(peer_id) => self.on_peer_dropped(peer_id),
		}
	}
	fn process_action(&mut self, action: Action) {
		match action {
			Action::AddReservedPeer(peer_id) => self.on_add_reserved_peer(peer_id),
			Action::RemoveReservedPeer(peer_id) => self.on_remove_reserved_peer(peer_id),
			Action::SetReservedPeers(peer_ids) => self.on_set_reserved_peers(peer_ids),
			Action::SetReservedOnly(reserved_only) => self.on_set_reserved_only(reserved_only),
			Action::DisconnectPeer(peer_id) => self.on_disconnect_peer(peer_id),
			Action::GetReservedPeers(pending_response) =>
				self.on_get_reserved_peers(pending_response),
		}
	}
	fn accept_connection(&mut self, peer_id: PeerId, incoming_index: IncomingIndex) {
		trace!(
			target: LOG_TARGET,
			"Accepting {peer_id} ({incoming_index:?}) on {:?} ({}/{} num_in/max_in).",
			self.set_id,
			self.num_in,
			self.max_in,
		);
		let _ = self.to_notifications.unbounded_send(Message::Accept(incoming_index));
	}
	fn reject_connection(&mut self, peer_id: PeerId, incoming_index: IncomingIndex) {
		trace!(
			target: LOG_TARGET,
			"Rejecting {peer_id} ({incoming_index:?}) on {:?} ({}/{} num_in/max_in).",
			self.set_id,
			self.num_in,
			self.max_in,
		);
		let _ = self.to_notifications.unbounded_send(Message::Reject(incoming_index));
	}
	fn start_connection(&mut self, peer_id: PeerId) {
		trace!(
			target: LOG_TARGET,
			"Connecting to {peer_id} on {:?} ({}/{} num_out/max_out).",
			self.set_id,
			self.num_out,
			self.max_out,
		);
		let _ = self
			.to_notifications
			.unbounded_send(Message::Connect { set_id: self.set_id, peer_id });
	}
	fn drop_connection(&mut self, peer_id: PeerId) {
		trace!(
			target: LOG_TARGET,
			"Dropping {peer_id} on {:?} ({}/{} num_in/max_in, {}/{} num_out/max_out).",
			self.set_id,
			self.num_in,
			self.max_in,
			self.num_out,
			self.max_out,
		);
		let _ = self
			.to_notifications
			.unbounded_send(Message::Drop { set_id: self.set_id, peer_id });
	}
	fn report_disconnect(&mut self, peer_id: PeerId) {
		self.peer_store.report_disconnect(peer_id);
	}
	fn is_banned(&self, peer_id: &PeerId) -> bool {
		self.peer_store.is_banned(peer_id)
	}
	fn on_add_reserved_peer(&mut self, peer_id: PeerId) {
		if self.reserved_nodes.contains_key(&peer_id) {
			warn!(
				target: LOG_TARGET,
				"Trying to add an already reserved node {peer_id} as reserved on {:?}.",
				self.set_id,
			);
			return
		}
		let state = match self.nodes.remove(&peer_id) {
			Some(direction) => {
				trace!(
					target: LOG_TARGET,
					"Marking previously connected node {} ({:?}) as reserved on {:?}.",
					peer_id,
					direction,
					self.set_id
				);
				PeerState::Connected(direction)
			},
			None => {
				trace!(target: LOG_TARGET, "Adding reserved node {peer_id} on {:?}.", self.set_id,);
				PeerState::NotConnected
			},
		};
		self.reserved_nodes.insert(peer_id, state.clone());
		match state {
			PeerState::Connected(Direction::Inbound) => self.num_in -= 1,
			PeerState::Connected(Direction::Outbound) => self.num_out -= 1,
			PeerState::NotConnected => self.alloc_slots(),
		}
	}
	fn on_remove_reserved_peer(&mut self, peer_id: PeerId) {
		let state = match self.reserved_nodes.remove(&peer_id) {
			Some(state) => state,
			None => {
				warn!(
					target: LOG_TARGET,
					"Trying to remove unknown reserved node {peer_id} from {:?}.", self.set_id,
				);
				return
			},
		};
		if let PeerState::Connected(direction) = state {
			if self.reserved_only {
				trace!(
					target: LOG_TARGET,
					"Disconnecting previously reserved node {peer_id} ({direction:?}) on {:?}.",
					self.set_id,
				);
				self.drop_connection(peer_id);
			} else {
				trace!(
					target: LOG_TARGET,
					"Making a connected reserved node {peer_id} ({:?}) on {:?} a regular one.",
					direction,
					self.set_id,
				);
				match direction {
					Direction::Inbound => self.num_in += 1,
					Direction::Outbound => self.num_out += 1,
				}
				let prev = self.nodes.insert(peer_id, direction);
				assert!(prev.is_none(), "Corrupted state: reserved node was also non-reserved.");
			}
		} else {
			trace!(
				target: LOG_TARGET,
				"Removed disconnected reserved node {peer_id} from {:?}.",
				self.set_id,
			);
		}
	}
	fn on_set_reserved_peers(&mut self, peer_ids: HashSet<PeerId>) {
		let current = self.reserved_nodes.keys().cloned().collect();
		let to_insert = peer_ids.difference(¤t).cloned().collect::<Vec<_>>();
		let to_remove = current.difference(&peer_ids).cloned().collect::<Vec<_>>();
		for node in to_insert {
			self.on_add_reserved_peer(node);
		}
		for node in to_remove {
			self.on_remove_reserved_peer(node);
		}
	}
	fn on_set_reserved_only(&mut self, reserved_only: bool) {
		trace!(target: LOG_TARGET, "Set reserved only to `{reserved_only}` on {:?}", self.set_id);
		self.reserved_only = reserved_only;
		if !reserved_only {
			return self.alloc_slots()
		}
		self.nodes
			.iter()
			.map(|(k, v)| (*k, *v))
			.collect::<Vec<(_, _)>>()
			.iter()
			.for_each(|(peer_id, direction)| {
				match direction {
					Direction::Inbound => self.num_in -= 1,
					Direction::Outbound => self.num_out -= 1,
				}
				self.drop_connection(*peer_id)
			});
		self.nodes.clear();
	}
	fn on_get_reserved_peers(&self, pending_response: oneshot::Sender<Vec<PeerId>>) {
		let _ = pending_response.send(self.reserved_nodes.keys().cloned().collect());
	}
	fn on_disconnect_peer(&mut self, peer_id: PeerId) {
		if self.reserved_nodes.contains_key(&peer_id) {
			debug!(
				target: LOG_TARGET,
				"Ignoring request to disconnect reserved peer {peer_id} from {:?}.", self.set_id,
			);
			return
		}
		match self.nodes.remove(&peer_id) {
			Some(direction) => {
				trace!(
					target: LOG_TARGET,
					"Disconnecting peer {peer_id} ({direction:?}) from {:?}.",
					self.set_id
				);
				match direction {
					Direction::Inbound => self.num_in -= 1,
					Direction::Outbound => self.num_out -= 1,
				}
				self.drop_connection(peer_id);
			},
			None => {
				debug!(
					target: LOG_TARGET,
					"Trying to disconnect unknown peer {peer_id} from {:?}.", self.set_id,
				);
			},
		}
	}
	fn on_incoming_connection(&mut self, peer_id: PeerId, incoming_index: IncomingIndex) {
		trace!(
			target: LOG_TARGET,
			"Incoming connection from peer {peer_id} ({incoming_index:?}) on {:?}.",
			self.set_id,
		);
		if self.reserved_only && !self.reserved_nodes.contains_key(&peer_id) {
			self.reject_connection(peer_id, incoming_index);
			return
		}
		if let Some(state) = self.reserved_nodes.get_mut(&peer_id) {
			match state {
				PeerState::Connected(ref mut direction) => {
					*direction = Direction::Inbound;
					self.accept_connection(peer_id, incoming_index);
				},
				PeerState::NotConnected =>
					if self.peer_store.is_banned(&peer_id) {
						self.reject_connection(peer_id, incoming_index);
					} else {
						*state = PeerState::Connected(Direction::Inbound);
						self.accept_connection(peer_id, incoming_index);
					},
			}
			return
		}
		if let Some(direction) = self.nodes.remove(&peer_id) {
			trace!(
				target: LOG_TARGET,
				"Handling incoming connection from peer {} we think we already connected as {:?} on {:?}.",
				peer_id,
				direction,
				self.set_id
			);
			match direction {
				Direction::Inbound => self.num_in -= 1,
				Direction::Outbound => self.num_out -= 1,
			}
		}
		if self.num_in >= self.max_in {
			self.reject_connection(peer_id, incoming_index);
			return
		}
		if self.is_banned(&peer_id) {
			self.reject_connection(peer_id, incoming_index);
			return
		}
		self.num_in += 1;
		self.nodes.insert(peer_id, Direction::Inbound);
		self.accept_connection(peer_id, incoming_index);
	}
	fn on_peer_dropped(&mut self, peer_id: PeerId) {
		self.on_peer_dropped_inner(peer_id).unwrap_or_else(|peer_id| {
			trace!(
				target: LOG_TARGET,
				"Received `Action::Dropped` for not connected peer {peer_id} on {:?}.",
				self.set_id,
			)
		});
	}
	fn on_peer_dropped_inner(&mut self, peer_id: PeerId) -> Result<(), PeerId> {
		if self.drop_reserved_peer(&peer_id)? || self.drop_regular_peer(&peer_id) {
			self.report_disconnect(peer_id);
			Ok(())
		} else {
			Err(peer_id)
		}
	}
	fn drop_reserved_peer(&mut self, peer_id: &PeerId) -> Result<bool, PeerId> {
		let Some(state) = self.reserved_nodes.get_mut(peer_id) else { return Ok(false) };
		if let PeerState::Connected(direction) = state {
			trace!(
				target: LOG_TARGET,
				"Reserved peer {peer_id} ({direction:?}) dropped from {:?}.",
				self.set_id,
			);
			*state = PeerState::NotConnected;
			Ok(true)
		} else {
			Err(*peer_id)
		}
	}
	fn drop_regular_peer(&mut self, peer_id: &PeerId) -> bool {
		let Some(direction) = self.nodes.remove(peer_id) else { return false };
		trace!(
			target: LOG_TARGET,
			"Peer {peer_id} ({direction:?}) dropped from {:?}.",
			self.set_id,
		);
		match direction {
			Direction::Inbound => self.num_in -= 1,
			Direction::Outbound => self.num_out -= 1,
		}
		true
	}
	fn alloc_slots(&mut self) {
		self.reserved_nodes
			.iter_mut()
			.filter_map(|(peer_id, state)| {
				(!state.is_connected() && !self.peer_store.is_banned(peer_id)).then(|| {
					*state = PeerState::Connected(Direction::Outbound);
					peer_id
				})
			})
			.cloned()
			.collect::<Vec<_>>()
			.into_iter()
			.for_each(|peer_id| {
				self.start_connection(peer_id);
			});
		if self.reserved_only || self.num_out >= self.max_out {
			return
		}
		let available_slots = (self.max_out - self.num_out).saturated_into();
		let ignored = self
			.reserved_nodes
			.keys()
			.collect::<HashSet<&PeerId>>()
			.union(&self.nodes.keys().collect::<HashSet<&PeerId>>())
			.cloned()
			.collect();
		let candidates = self
			.peer_store
			.outgoing_candidates(available_slots, ignored)
			.into_iter()
			.filter_map(|peer_id| {
				(!self.reserved_nodes.contains_key(&peer_id) && !self.nodes.contains_key(&peer_id))
					.then_some(peer_id)
					.or_else(|| {
						error!(
							target: LOG_TARGET,
							"`PeerStore` returned a node we asked to ignore: {peer_id}.",
						);
						debug_assert!(false, "`PeerStore` returned a node we asked to ignore.");
						None
					})
			})
			.collect::<Vec<_>>();
		if candidates.len() > available_slots {
			error!(
				target: LOG_TARGET,
				"`PeerStore` returned more nodes than there are slots available.",
			);
			debug_assert!(false, "`PeerStore` returned more nodes than there are slots available.");
		}
		candidates.into_iter().take(available_slots).for_each(|peer_id| {
			self.num_out += 1;
			self.nodes.insert(peer_id, Direction::Outbound);
			self.start_connection(peer_id);
		})
	}
}
#[cfg(test)]
mod tests {
	use super::*;
	use crate::{peer_store::PeerStoreProvider, ReputationChange};
	use libp2p::PeerId;
	use sc_utils::mpsc::{tracing_unbounded, TryRecvError};
	use std::collections::HashSet;
	mockall::mock! {
		#[derive(Debug)]
		pub PeerStoreHandle {}
		impl PeerStoreProvider for PeerStoreHandle {
			fn is_banned(&self, peer_id: &PeerId) -> bool;
			fn register_protocol(&self, protocol_handle: ProtocolHandle);
			fn report_disconnect(&mut self, peer_id: PeerId);
			fn report_peer(&mut self, peer_id: PeerId, change: ReputationChange);
			fn peer_reputation(&self, peer_id: &PeerId) -> i32;
			fn outgoing_candidates<'a>(&self, count: usize, ignored: HashSet<&'a PeerId>) -> Vec<PeerId>;
		}
	}
	#[test]
	fn reserved_nodes_are_connected_dropped_and_accepted() {
		let reserved1 = PeerId::random();
		let reserved2 = PeerId::random();
		let config = ProtoSetConfig {
			in_peers: 0,
			out_peers: 0,
			reserved_nodes: std::iter::once(reserved1).collect(),
			reserved_only: true,
		};
		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
		let mut peer_store = MockPeerStoreHandle::new();
		peer_store.expect_register_protocol().once().return_const(());
		peer_store.expect_is_banned().times(4).return_const(false);
		peer_store.expect_report_disconnect().times(2).return_const(());
		let (_handle, mut controller) =
			ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
		controller.on_add_reserved_peer(reserved2);
		controller.alloc_slots();
		let mut messages = Vec::new();
		while let Some(message) = rx.try_recv().ok() {
			messages.push(message);
		}
		assert_eq!(messages.len(), 2);
		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
		assert_eq!(controller.num_out, 0);
		assert_eq!(controller.num_in, 0);
		controller.on_peer_dropped(reserved1);
		controller.on_peer_dropped(reserved2);
		let incoming1 = IncomingIndex(1);
		controller.on_incoming_connection(reserved1, incoming1);
		assert_eq!(rx.try_recv().unwrap(), Message::Accept(incoming1));
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		let incoming2 = IncomingIndex(2);
		controller.on_incoming_connection(reserved2, incoming2);
		assert_eq!(rx.try_recv().unwrap(), Message::Accept(incoming2));
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		assert_eq!(controller.num_out, 0);
		assert_eq!(controller.num_in, 0);
	}
	#[test]
	fn banned_reserved_nodes_are_not_connected_and_not_accepted() {
		let reserved1 = PeerId::random();
		let reserved2 = PeerId::random();
		let config = ProtoSetConfig {
			in_peers: 0,
			out_peers: 0,
			reserved_nodes: std::iter::once(reserved1).collect(),
			reserved_only: true,
		};
		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
		let mut peer_store = MockPeerStoreHandle::new();
		peer_store.expect_register_protocol().once().return_const(());
		peer_store.expect_is_banned().times(6).return_const(true);
		let (_handle, mut controller) =
			ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
		controller.on_add_reserved_peer(reserved2);
		controller.alloc_slots();
		assert_eq!(controller.num_out, 0);
		assert_eq!(controller.num_in, 0);
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		let incoming1 = IncomingIndex(1);
		controller.on_incoming_connection(reserved1, incoming1);
		assert_eq!(rx.try_recv().unwrap(), Message::Reject(incoming1));
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		let incoming2 = IncomingIndex(2);
		controller.on_incoming_connection(reserved2, incoming2);
		assert_eq!(rx.try_recv().unwrap(), Message::Reject(incoming2));
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		assert_eq!(controller.num_out, 0);
		assert_eq!(controller.num_in, 0);
	}
	#[test]
	fn we_try_to_reconnect_to_dropped_reserved_nodes() {
		let reserved1 = PeerId::random();
		let reserved2 = PeerId::random();
		let config = ProtoSetConfig {
			in_peers: 0,
			out_peers: 0,
			reserved_nodes: std::iter::once(reserved1).collect(),
			reserved_only: true,
		};
		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
		let mut peer_store = MockPeerStoreHandle::new();
		peer_store.expect_register_protocol().once().return_const(());
		peer_store.expect_is_banned().times(4).return_const(false);
		peer_store.expect_report_disconnect().times(2).return_const(());
		let (_handle, mut controller) =
			ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
		controller.on_add_reserved_peer(reserved2);
		controller.alloc_slots();
		let mut messages = Vec::new();
		while let Some(message) = rx.try_recv().ok() {
			messages.push(message);
		}
		assert_eq!(messages.len(), 2);
		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
		controller.on_peer_dropped(reserved1);
		controller.on_peer_dropped(reserved2);
		controller.alloc_slots();
		let mut messages = Vec::new();
		while let Some(message) = rx.try_recv().ok() {
			messages.push(message);
		}
		assert_eq!(messages.len(), 2);
		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
		assert_eq!(controller.num_out, 0);
		assert_eq!(controller.num_in, 0);
	}
	#[test]
	fn nodes_supplied_by_peer_store_are_connected() {
		let peer1 = PeerId::random();
		let peer2 = PeerId::random();
		let candidates = vec![peer1, peer2];
		let config = ProtoSetConfig {
			in_peers: 0,
			out_peers: 2,
			reserved_nodes: HashSet::new(),
			reserved_only: false,
		};
		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
		let mut peer_store = MockPeerStoreHandle::new();
		peer_store.expect_register_protocol().once().return_const(());
		peer_store.expect_outgoing_candidates().once().return_const(candidates);
		let (_handle, mut controller) =
			ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
		controller.alloc_slots();
		let mut messages = Vec::new();
		while let Some(message) = rx.try_recv().ok() {
			messages.push(message);
		}
		assert_eq!(messages.len(), 2);
		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
		assert_eq!(controller.num_out, 2);
		assert_eq!(controller.num_in, 0);
		controller.alloc_slots();
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		assert_eq!(controller.num_out, 2);
		assert_eq!(controller.num_in, 0);
	}
	#[test]
	fn both_reserved_nodes_and_nodes_supplied_by_peer_store_are_connected() {
		let reserved1 = PeerId::random();
		let reserved2 = PeerId::random();
		let regular1 = PeerId::random();
		let regular2 = PeerId::random();
		let outgoing_candidates = vec![regular1, regular2];
		let reserved_nodes = [reserved1, reserved2].iter().cloned().collect();
		let config =
			ProtoSetConfig { in_peers: 10, out_peers: 10, reserved_nodes, reserved_only: false };
		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
		let mut peer_store = MockPeerStoreHandle::new();
		peer_store.expect_register_protocol().once().return_const(());
		peer_store.expect_is_banned().times(2).return_const(false);
		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
		let (_handle, mut controller) =
			ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
		controller.alloc_slots();
		let mut messages = Vec::new();
		while let Some(message) = rx.try_recv().ok() {
			messages.push(message);
		}
		assert_eq!(messages.len(), 4);
		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: regular1 }));
		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: regular2 }));
		assert_eq!(controller.num_out, 2);
		assert_eq!(controller.num_in, 0);
	}
	#[test]
	fn if_slots_are_freed_we_try_to_allocate_them_again() {
		let peer1 = PeerId::random();
		let peer2 = PeerId::random();
		let peer3 = PeerId::random();
		let candidates1 = vec![peer1, peer2];
		let candidates2 = vec![peer3];
		let config = ProtoSetConfig {
			in_peers: 0,
			out_peers: 2,
			reserved_nodes: HashSet::new(),
			reserved_only: false,
		};
		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
		let mut peer_store = MockPeerStoreHandle::new();
		peer_store.expect_register_protocol().once().return_const(());
		peer_store.expect_outgoing_candidates().once().return_const(candidates1);
		peer_store.expect_outgoing_candidates().once().return_const(candidates2);
		peer_store.expect_report_disconnect().times(2).return_const(());
		let (_handle, mut controller) =
			ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
		controller.alloc_slots();
		let mut messages = Vec::new();
		while let Some(message) = rx.try_recv().ok() {
			messages.push(message);
		}
		assert_eq!(messages.len(), 2);
		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
		assert_eq!(controller.num_out, 2);
		assert_eq!(controller.num_in, 0);
		controller.alloc_slots();
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		assert_eq!(controller.num_out, 2);
		assert_eq!(controller.num_in, 0);
		controller.on_peer_dropped(peer1);
		controller.on_peer_dropped(peer2);
		assert_eq!(controller.num_out, 0);
		assert_eq!(controller.num_in, 0);
		controller.alloc_slots();
		let mut messages = Vec::new();
		while let Some(message) = rx.try_recv().ok() {
			messages.push(message);
		}
		assert_eq!(messages.len(), 1);
		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer3 }));
		assert_eq!(controller.num_out, 1);
		assert_eq!(controller.num_in, 0);
	}
	#[test]
	fn in_reserved_only_mode_no_peers_are_requested_from_peer_store_and_connected() {
		let config = ProtoSetConfig {
			in_peers: 0,
			out_peers: 2,
			reserved_nodes: HashSet::new(),
			reserved_only: true,
		};
		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
		let mut peer_store = MockPeerStoreHandle::new();
		peer_store.expect_register_protocol().once().return_const(());
		let (_handle, mut controller) =
			ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
		controller.alloc_slots();
		assert_eq!(controller.num_out, 0);
		assert_eq!(controller.num_in, 0);
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
	}
	#[test]
	fn in_reserved_only_mode_no_regular_peers_are_accepted() {
		let config = ProtoSetConfig {
			in_peers: 2,
			out_peers: 0,
			reserved_nodes: HashSet::new(),
			reserved_only: true,
		};
		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
		let mut peer_store = MockPeerStoreHandle::new();
		peer_store.expect_register_protocol().once().return_const(());
		let (_handle, mut controller) =
			ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
		let peer = PeerId::random();
		let incoming_index = IncomingIndex(1);
		controller.on_incoming_connection(peer, incoming_index);
		let mut messages = Vec::new();
		while let Some(message) = rx.try_recv().ok() {
			messages.push(message);
		}
		assert_eq!(messages.len(), 1);
		assert!(messages.contains(&Message::Reject(incoming_index)));
		assert_eq!(controller.num_out, 0);
		assert_eq!(controller.num_in, 0);
	}
	#[test]
	fn disabling_reserved_only_mode_allows_to_connect_to_peers() {
		let peer1 = PeerId::random();
		let peer2 = PeerId::random();
		let candidates = vec![peer1, peer2];
		let config = ProtoSetConfig {
			in_peers: 0,
			out_peers: 10,
			reserved_nodes: HashSet::new(),
			reserved_only: true,
		};
		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
		let mut peer_store = MockPeerStoreHandle::new();
		peer_store.expect_register_protocol().once().return_const(());
		peer_store.expect_outgoing_candidates().once().return_const(candidates);
		let (_handle, mut controller) =
			ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
		controller.alloc_slots();
		assert_eq!(controller.num_out, 0);
		assert_eq!(controller.num_in, 0);
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		controller.on_set_reserved_only(false);
		let mut messages = Vec::new();
		while let Some(message) = rx.try_recv().ok() {
			messages.push(message);
		}
		assert_eq!(messages.len(), 2);
		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
		assert_eq!(controller.num_out, 2);
		assert_eq!(controller.num_in, 0);
	}
	#[test]
	fn enabling_reserved_only_mode_disconnects_regular_peers() {
		let reserved1 = PeerId::random();
		let reserved2 = PeerId::random();
		let regular1 = PeerId::random();
		let regular2 = PeerId::random();
		let outgoing_candidates = vec![regular1];
		let config = ProtoSetConfig {
			in_peers: 10,
			out_peers: 10,
			reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
			reserved_only: false,
		};
		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
		let mut peer_store = MockPeerStoreHandle::new();
		peer_store.expect_register_protocol().once().return_const(());
		peer_store.expect_is_banned().times(3).return_const(false);
		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
		let (_handle, mut controller) =
			ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
		assert_eq!(controller.num_out, 0);
		assert_eq!(controller.num_in, 0);
		controller.alloc_slots();
		let mut messages = Vec::new();
		while let Some(message) = rx.try_recv().ok() {
			messages.push(message);
		}
		assert_eq!(messages.len(), 3);
		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: regular1 }));
		assert_eq!(controller.num_out, 1);
		assert_eq!(controller.num_in, 0);
		let incoming_index = IncomingIndex(1);
		controller.on_incoming_connection(regular2, incoming_index);
		assert_eq!(rx.try_recv().unwrap(), Message::Accept(incoming_index));
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		assert_eq!(controller.num_out, 1);
		assert_eq!(controller.num_in, 1);
		controller.on_set_reserved_only(true);
		let mut messages = Vec::new();
		while let Some(message) = rx.try_recv().ok() {
			messages.push(message);
		}
		assert_eq!(messages.len(), 2);
		assert!(messages.contains(&Message::Drop { set_id: SetId::from(0), peer_id: regular1 }));
		assert!(messages.contains(&Message::Drop { set_id: SetId::from(0), peer_id: regular2 }));
		assert_eq!(controller.nodes.len(), 0);
		assert_eq!(controller.num_out, 0);
		assert_eq!(controller.num_in, 0);
	}
	#[test]
	fn removed_disconnected_reserved_node_is_forgotten() {
		let reserved1 = PeerId::random();
		let reserved2 = PeerId::random();
		let config = ProtoSetConfig {
			in_peers: 10,
			out_peers: 10,
			reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
			reserved_only: false,
		};
		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
		let mut peer_store = MockPeerStoreHandle::new();
		peer_store.expect_register_protocol().once().return_const(());
		let (_handle, mut controller) =
			ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
		assert_eq!(controller.reserved_nodes.len(), 2);
		assert_eq!(controller.nodes.len(), 0);
		assert_eq!(controller.num_out, 0);
		assert_eq!(controller.num_in, 0);
		controller.on_remove_reserved_peer(reserved1);
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		assert_eq!(controller.reserved_nodes.len(), 1);
		assert!(!controller.reserved_nodes.contains_key(&reserved1));
		assert_eq!(controller.nodes.len(), 0);
		assert_eq!(controller.num_out, 0);
		assert_eq!(controller.num_in, 0);
	}
	#[test]
	fn removed_connected_reserved_node_is_disconnected_in_reserved_only_mode() {
		let reserved1 = PeerId::random();
		let reserved2 = PeerId::random();
		let config = ProtoSetConfig {
			in_peers: 10,
			out_peers: 10,
			reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
			reserved_only: true,
		};
		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
		let mut peer_store = MockPeerStoreHandle::new();
		peer_store.expect_register_protocol().once().return_const(());
		peer_store.expect_is_banned().times(2).return_const(false);
		let (_handle, mut controller) =
			ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
		controller.alloc_slots();
		let mut messages = Vec::new();
		while let Some(message) = rx.try_recv().ok() {
			messages.push(message);
		}
		assert_eq!(messages.len(), 2);
		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
		assert_eq!(controller.reserved_nodes.len(), 2);
		assert!(controller.reserved_nodes.contains_key(&reserved1));
		assert!(controller.reserved_nodes.contains_key(&reserved2));
		assert!(controller.nodes.is_empty());
		controller.on_remove_reserved_peer(reserved1);
		assert_eq!(
			rx.try_recv().unwrap(),
			Message::Drop { set_id: SetId::from(0), peer_id: reserved1 }
		);
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		assert_eq!(controller.reserved_nodes.len(), 1);
		assert!(controller.reserved_nodes.contains_key(&reserved2));
		assert!(controller.nodes.is_empty());
	}
	#[test]
	fn removed_connected_reserved_nodes_become_regular_in_non_reserved_mode() {
		let peer1 = PeerId::random();
		let peer2 = PeerId::random();
		let config = ProtoSetConfig {
			in_peers: 10,
			out_peers: 10,
			reserved_nodes: [peer1, peer2].iter().cloned().collect(),
			reserved_only: false,
		};
		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
		let mut peer_store = MockPeerStoreHandle::new();
		peer_store.expect_register_protocol().once().return_const(());
		peer_store.expect_is_banned().times(2).return_const(false);
		peer_store.expect_outgoing_candidates().once().return_const(Vec::new());
		let (_handle, mut controller) =
			ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
		controller.on_incoming_connection(peer1, IncomingIndex(1));
		controller.alloc_slots();
		let mut messages = Vec::new();
		while let Some(message) = rx.try_recv().ok() {
			messages.push(message);
		}
		assert_eq!(messages.len(), 2);
		assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
		assert_eq!(controller.num_out, 0);
		assert_eq!(controller.num_in, 0);
		controller.on_remove_reserved_peer(peer1);
		controller.on_remove_reserved_peer(peer2);
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		assert_eq!(controller.nodes.len(), 2);
		assert!(matches!(controller.nodes.get(&peer1), Some(Direction::Inbound)));
		assert!(matches!(controller.nodes.get(&peer2), Some(Direction::Outbound)));
		assert_eq!(controller.num_out, 1);
		assert_eq!(controller.num_in, 1);
	}
	#[test]
	fn regular_nodes_stop_occupying_slots_when_become_reserved() {
		let peer1 = PeerId::random();
		let peer2 = PeerId::random();
		let outgoing_candidates = vec![peer1];
		let config = ProtoSetConfig {
			in_peers: 10,
			out_peers: 10,
			reserved_nodes: HashSet::new(),
			reserved_only: false,
		};
		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
		let mut peer_store = MockPeerStoreHandle::new();
		peer_store.expect_register_protocol().once().return_const(());
		peer_store.expect_is_banned().once().return_const(false);
		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
		let (_handle, mut controller) =
			ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
		controller.alloc_slots();
		controller.on_incoming_connection(peer2, IncomingIndex(1));
		let mut messages = Vec::new();
		while let Some(message) = rx.try_recv().ok() {
			messages.push(message);
		}
		assert_eq!(messages.len(), 2);
		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
		assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
		assert_eq!(controller.num_in, 1);
		assert_eq!(controller.num_out, 1);
		controller.on_add_reserved_peer(peer1);
		controller.on_add_reserved_peer(peer2);
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		assert_eq!(controller.num_in, 0);
		assert_eq!(controller.num_out, 0);
	}
	#[test]
	fn disconnecting_regular_peers_work() {
		let peer1 = PeerId::random();
		let peer2 = PeerId::random();
		let outgoing_candidates = vec![peer1];
		let config = ProtoSetConfig {
			in_peers: 10,
			out_peers: 10,
			reserved_nodes: HashSet::new(),
			reserved_only: false,
		};
		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
		let mut peer_store = MockPeerStoreHandle::new();
		peer_store.expect_register_protocol().once().return_const(());
		peer_store.expect_is_banned().once().return_const(false);
		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
		let (_handle, mut controller) =
			ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
		controller.alloc_slots();
		controller.on_incoming_connection(peer2, IncomingIndex(1));
		let mut messages = Vec::new();
		while let Some(message) = rx.try_recv().ok() {
			messages.push(message);
		}
		assert_eq!(messages.len(), 2);
		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
		assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
		assert_eq!(controller.nodes.len(), 2);
		assert!(matches!(controller.nodes.get(&peer1), Some(Direction::Outbound)));
		assert!(matches!(controller.nodes.get(&peer2), Some(Direction::Inbound)));
		assert_eq!(controller.num_in, 1);
		assert_eq!(controller.num_out, 1);
		controller.on_disconnect_peer(peer1);
		assert_eq!(
			rx.try_recv().unwrap(),
			Message::Drop { set_id: SetId::from(0), peer_id: peer1 }
		);
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		assert_eq!(controller.nodes.len(), 1);
		assert!(!controller.nodes.contains_key(&peer1));
		assert_eq!(controller.num_in, 1);
		assert_eq!(controller.num_out, 0);
		controller.on_disconnect_peer(peer2);
		assert_eq!(
			rx.try_recv().unwrap(),
			Message::Drop { set_id: SetId::from(0), peer_id: peer2 }
		);
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		assert_eq!(controller.nodes.len(), 0);
		assert_eq!(controller.num_in, 0);
		assert_eq!(controller.num_out, 0);
	}
	#[test]
	fn disconnecting_reserved_peers_is_a_noop() {
		let reserved1 = PeerId::random();
		let reserved2 = PeerId::random();
		let config = ProtoSetConfig {
			in_peers: 10,
			out_peers: 10,
			reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
			reserved_only: false,
		};
		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
		let mut peer_store = MockPeerStoreHandle::new();
		peer_store.expect_register_protocol().once().return_const(());
		peer_store.expect_is_banned().times(2).return_const(false);
		peer_store.expect_outgoing_candidates().once().return_const(Vec::new());
		let (_handle, mut controller) =
			ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
		controller.on_incoming_connection(reserved1, IncomingIndex(1));
		controller.alloc_slots();
		let mut messages = Vec::new();
		while let Some(message) = rx.try_recv().ok() {
			messages.push(message);
		}
		assert_eq!(messages.len(), 2);
		assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
		assert!(matches!(
			controller.reserved_nodes.get(&reserved1),
			Some(PeerState::Connected(Direction::Inbound))
		));
		assert!(matches!(
			controller.reserved_nodes.get(&reserved2),
			Some(PeerState::Connected(Direction::Outbound))
		));
		controller.on_disconnect_peer(reserved1);
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		assert!(matches!(
			controller.reserved_nodes.get(&reserved1),
			Some(PeerState::Connected(Direction::Inbound))
		));
		controller.on_disconnect_peer(reserved2);
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		assert!(matches!(
			controller.reserved_nodes.get(&reserved2),
			Some(PeerState::Connected(Direction::Outbound))
		));
	}
	#[test]
	fn dropping_regular_peers_work() {
		let peer1 = PeerId::random();
		let peer2 = PeerId::random();
		let outgoing_candidates = vec![peer1];
		let config = ProtoSetConfig {
			in_peers: 10,
			out_peers: 10,
			reserved_nodes: HashSet::new(),
			reserved_only: false,
		};
		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
		let mut peer_store = MockPeerStoreHandle::new();
		peer_store.expect_register_protocol().once().return_const(());
		peer_store.expect_is_banned().once().return_const(false);
		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
		peer_store.expect_report_disconnect().times(2).return_const(());
		let (_handle, mut controller) =
			ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
		controller.alloc_slots();
		controller.on_incoming_connection(peer2, IncomingIndex(1));
		let mut messages = Vec::new();
		while let Some(message) = rx.try_recv().ok() {
			messages.push(message);
		}
		assert_eq!(messages.len(), 2);
		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
		assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
		assert_eq!(controller.nodes.len(), 2);
		assert!(matches!(controller.nodes.get(&peer1), Some(Direction::Outbound)));
		assert!(matches!(controller.nodes.get(&peer2), Some(Direction::Inbound)));
		assert_eq!(controller.num_in, 1);
		assert_eq!(controller.num_out, 1);
		controller.on_peer_dropped(peer1);
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		assert_eq!(controller.nodes.len(), 1);
		assert!(!controller.nodes.contains_key(&peer1));
		assert_eq!(controller.num_in, 1);
		assert_eq!(controller.num_out, 0);
		controller.on_peer_dropped(peer2);
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		assert_eq!(controller.nodes.len(), 0);
		assert_eq!(controller.num_in, 0);
		assert_eq!(controller.num_out, 0);
	}
	#[test]
	fn incoming_request_for_connected_reserved_node_switches_it_to_inbound() {
		let reserved1 = PeerId::random();
		let reserved2 = PeerId::random();
		let config = ProtoSetConfig {
			in_peers: 10,
			out_peers: 10,
			reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
			reserved_only: false,
		};
		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
		let mut peer_store = MockPeerStoreHandle::new();
		peer_store.expect_register_protocol().once().return_const(());
		peer_store.expect_is_banned().times(2).return_const(false);
		peer_store.expect_outgoing_candidates().once().return_const(Vec::new());
		let (_handle, mut controller) =
			ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
		controller.on_incoming_connection(reserved1, IncomingIndex(1));
		controller.alloc_slots();
		let mut messages = Vec::new();
		while let Some(message) = rx.try_recv().ok() {
			messages.push(message);
		}
		assert_eq!(messages.len(), 2);
		assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
		assert!(matches!(
			controller.reserved_nodes.get(&reserved1),
			Some(PeerState::Connected(Direction::Inbound))
		));
		assert!(matches!(
			controller.reserved_nodes.get(&reserved2),
			Some(PeerState::Connected(Direction::Outbound))
		));
		controller.on_incoming_connection(reserved1, IncomingIndex(2));
		assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(2)));
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		assert!(matches!(
			controller.reserved_nodes.get(&reserved1),
			Some(PeerState::Connected(Direction::Inbound))
		));
		controller.on_incoming_connection(reserved2, IncomingIndex(3));
		assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(3)));
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		assert!(matches!(
			controller.reserved_nodes.get(&reserved2),
			Some(PeerState::Connected(Direction::Inbound))
		));
	}
	#[test]
	fn incoming_request_for_connected_regular_node_switches_it_to_inbound() {
		let regular1 = PeerId::random();
		let regular2 = PeerId::random();
		let outgoing_candidates = vec![regular1];
		let config = ProtoSetConfig {
			in_peers: 10,
			out_peers: 10,
			reserved_nodes: HashSet::new(),
			reserved_only: false,
		};
		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
		let mut peer_store = MockPeerStoreHandle::new();
		peer_store.expect_register_protocol().once().return_const(());
		peer_store.expect_is_banned().times(3).return_const(false);
		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
		let (_handle, mut controller) =
			ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
		assert_eq!(controller.num_out, 0);
		assert_eq!(controller.num_in, 0);
		controller.alloc_slots();
		assert_eq!(
			rx.try_recv().ok().unwrap(),
			Message::Connect { set_id: SetId::from(0), peer_id: regular1 }
		);
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		assert!(matches!(controller.nodes.get(®ular1).unwrap(), Direction::Outbound,));
		controller.on_incoming_connection(regular2, IncomingIndex(0));
		assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(0)));
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		assert!(matches!(controller.nodes.get(®ular2).unwrap(), Direction::Inbound,));
		controller.on_incoming_connection(regular1, IncomingIndex(1));
		assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(1)));
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		assert!(matches!(controller.nodes.get(®ular1).unwrap(), Direction::Inbound,));
		controller.on_incoming_connection(regular2, IncomingIndex(2));
		assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(2)));
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		assert!(matches!(controller.nodes.get(®ular2).unwrap(), Direction::Inbound,));
	}
	#[test]
	fn incoming_request_for_connected_node_is_rejected_if_its_banned() {
		let regular1 = PeerId::random();
		let regular2 = PeerId::random();
		let outgoing_candidates = vec![regular1];
		let config = ProtoSetConfig {
			in_peers: 10,
			out_peers: 10,
			reserved_nodes: HashSet::new(),
			reserved_only: false,
		};
		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
		let mut peer_store = MockPeerStoreHandle::new();
		peer_store.expect_register_protocol().once().return_const(());
		peer_store.expect_is_banned().once().return_const(false);
		peer_store.expect_is_banned().times(2).return_const(true);
		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
		let (_handle, mut controller) =
			ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
		assert_eq!(controller.num_out, 0);
		assert_eq!(controller.num_in, 0);
		controller.alloc_slots();
		assert_eq!(
			rx.try_recv().ok().unwrap(),
			Message::Connect { set_id: SetId::from(0), peer_id: regular1 }
		);
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		assert!(matches!(controller.nodes.get(®ular1).unwrap(), Direction::Outbound,));
		controller.on_incoming_connection(regular2, IncomingIndex(0));
		assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(0)));
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		assert!(matches!(controller.nodes.get(®ular2).unwrap(), Direction::Inbound,));
		controller.on_incoming_connection(regular1, IncomingIndex(1));
		assert_eq!(rx.try_recv().ok().unwrap(), Message::Reject(IncomingIndex(1)));
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		assert!(!controller.nodes.contains_key(®ular1));
		controller.on_incoming_connection(regular2, IncomingIndex(2));
		assert_eq!(rx.try_recv().ok().unwrap(), Message::Reject(IncomingIndex(2)));
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		assert!(!controller.nodes.contains_key(®ular2));
	}
	#[test]
	fn incoming_request_for_connected_node_is_rejected_if_no_slots_available() {
		let regular1 = PeerId::random();
		let regular2 = PeerId::random();
		let outgoing_candidates = vec![regular1];
		let config = ProtoSetConfig {
			in_peers: 1,
			out_peers: 1,
			reserved_nodes: HashSet::new(),
			reserved_only: false,
		};
		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
		let mut peer_store = MockPeerStoreHandle::new();
		peer_store.expect_register_protocol().once().return_const(());
		peer_store.expect_is_banned().once().return_const(false);
		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
		let (_handle, mut controller) =
			ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
		assert_eq!(controller.num_out, 0);
		assert_eq!(controller.num_in, 0);
		controller.alloc_slots();
		assert_eq!(
			rx.try_recv().ok().unwrap(),
			Message::Connect { set_id: SetId::from(0), peer_id: regular1 }
		);
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		assert!(matches!(controller.nodes.get(®ular1).unwrap(), Direction::Outbound,));
		controller.on_incoming_connection(regular2, IncomingIndex(0));
		assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(0)));
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		assert!(matches!(controller.nodes.get(®ular2).unwrap(), Direction::Inbound,));
		controller.max_in = 0;
		controller.on_incoming_connection(regular1, IncomingIndex(1));
		assert_eq!(rx.try_recv().ok().unwrap(), Message::Reject(IncomingIndex(1)));
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		assert!(!controller.nodes.contains_key(®ular1));
		controller.on_incoming_connection(regular2, IncomingIndex(2));
		assert_eq!(rx.try_recv().ok().unwrap(), Message::Reject(IncomingIndex(2)));
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		assert!(!controller.nodes.contains_key(®ular2));
	}
	#[test]
	fn incoming_peers_that_exceed_slots_are_rejected() {
		let peer1 = PeerId::random();
		let peer2 = PeerId::random();
		let config = ProtoSetConfig {
			in_peers: 1,
			out_peers: 10,
			reserved_nodes: HashSet::new(),
			reserved_only: false,
		};
		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
		let mut peer_store = MockPeerStoreHandle::new();
		peer_store.expect_register_protocol().once().return_const(());
		peer_store.expect_is_banned().once().return_const(false);
		let (_handle, mut controller) =
			ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
		controller.on_incoming_connection(peer1, IncomingIndex(1));
		assert_eq!(rx.try_recv().unwrap(), Message::Accept(IncomingIndex(1)));
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
		controller.on_incoming_connection(peer2, IncomingIndex(2));
		assert_eq!(rx.try_recv().unwrap(), Message::Reject(IncomingIndex(2)));
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
	}
	#[test]
	fn banned_regular_incoming_node_is_rejected() {
		let peer1 = PeerId::random();
		let config = ProtoSetConfig {
			in_peers: 10,
			out_peers: 10,
			reserved_nodes: HashSet::new(),
			reserved_only: false,
		};
		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
		let mut peer_store = MockPeerStoreHandle::new();
		peer_store.expect_register_protocol().once().return_const(());
		peer_store.expect_is_banned().once().return_const(true);
		let (_handle, mut controller) =
			ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
		controller.on_incoming_connection(peer1, IncomingIndex(1));
		assert_eq!(rx.try_recv().unwrap(), Message::Reject(IncomingIndex(1)));
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
	}
	#[test]
	fn banned_reserved_incoming_node_is_rejected() {
		let reserved1 = PeerId::random();
		let config = ProtoSetConfig {
			in_peers: 10,
			out_peers: 10,
			reserved_nodes: std::iter::once(reserved1).collect(),
			reserved_only: false,
		};
		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
		let mut peer_store = MockPeerStoreHandle::new();
		peer_store.expect_register_protocol().once().return_const(());
		peer_store.expect_is_banned().once().return_const(true);
		let (_handle, mut controller) =
			ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
		assert!(controller.reserved_nodes.contains_key(&reserved1));
		controller.on_incoming_connection(reserved1, IncomingIndex(1));
		assert_eq!(rx.try_recv().unwrap(), Message::Reject(IncomingIndex(1)));
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
	}
	#[test]
	fn we_dont_connect_to_banned_reserved_node() {
		let reserved1 = PeerId::random();
		let config = ProtoSetConfig {
			in_peers: 10,
			out_peers: 10,
			reserved_nodes: std::iter::once(reserved1).collect(),
			reserved_only: false,
		};
		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
		let mut peer_store = MockPeerStoreHandle::new();
		peer_store.expect_register_protocol().once().return_const(());
		peer_store.expect_is_banned().once().return_const(true);
		peer_store.expect_outgoing_candidates().once().return_const(Vec::new());
		let (_handle, mut controller) =
			ProtocolController::new(SetId::from(0), config, tx, Box::new(peer_store));
		assert!(matches!(controller.reserved_nodes.get(&reserved1), Some(PeerState::NotConnected)));
		controller.alloc_slots();
		assert!(matches!(controller.reserved_nodes.get(&reserved1), Some(PeerState::NotConnected)));
		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
	}
}