polkadot_node_network_protocol/request_response/incoming/
mod.rs1use std::marker::PhantomData;
18
19use futures::{channel::oneshot, StreamExt};
20
21use codec::{Decode, Encode};
22
23use sc_network::{config as netconfig, NetworkBackend};
24use sc_network_types::PeerId;
25use sp_runtime::traits::Block;
26
27use super::{IsRequest, ReqProtocolNames};
28use crate::UnifiedReputationChange;
29
30mod error;
31pub use error::{Error, FatalError, JfyiError, Result};
32
33#[derive(Debug)]
38pub struct IncomingRequest<Req> {
39 pub peer: PeerId,
41 pub payload: Req,
43 pub pending_response: OutgoingResponseSender<Req>,
45}
46
47impl<Req> IncomingRequest<Req>
48where
49 Req: IsRequest + Decode + Encode,
50 Req::Response: Encode,
51{
52 pub fn get_config_receiver<B: Block, N: NetworkBackend<B, <B as Block>::Hash>>(
58 req_protocol_names: &ReqProtocolNames,
59 ) -> (IncomingRequestReceiver<Req>, N::RequestResponseProtocolConfig) {
60 let (raw, cfg) = Req::PROTOCOL.get_config::<B, N>(req_protocol_names);
61 (IncomingRequestReceiver { raw, phantom: PhantomData {} }, cfg)
62 }
63
64 pub fn new(
66 peer: PeerId,
67 payload: Req,
68 pending_response: oneshot::Sender<netconfig::OutgoingResponse>,
69 ) -> Self {
70 Self {
71 peer,
72 payload,
73 pending_response: OutgoingResponseSender { pending_response, phantom: PhantomData {} },
74 }
75 }
76
77 fn try_from_raw(
86 raw: sc_network::config::IncomingRequest,
87 reputation_changes: Vec<UnifiedReputationChange>,
88 ) -> std::result::Result<Self, JfyiError> {
89 let sc_network::config::IncomingRequest { payload, peer, pending_response } = raw;
90 let payload = match Req::decode(&mut payload.as_ref()) {
91 Ok(payload) => payload,
92 Err(err) => {
93 let reputation_changes = reputation_changes.into_iter().map(|r| r.into()).collect();
94 let response = sc_network::config::OutgoingResponse {
95 result: Err(()),
96 reputation_changes,
97 sent_feedback: None,
98 };
99
100 if let Err(_) = pending_response.send(response) {
101 return Err(JfyiError::DecodingErrorNoReputationChange(peer, err))
102 }
103 return Err(JfyiError::DecodingError(peer, err))
104 },
105 };
106 Ok(Self::new(peer, payload, pending_response))
107 }
108
109 pub fn into_raw(self) -> sc_network::config::IncomingRequest {
113 sc_network::config::IncomingRequest {
114 peer: self.peer,
115 payload: self.payload.encode(),
116 pending_response: self.pending_response.pending_response,
117 }
118 }
119
120 pub fn send_response(self, resp: Req::Response) -> std::result::Result<(), Req::Response> {
124 self.pending_response.send_response(resp)
125 }
126
127 pub fn send_outgoing_response(
131 self,
132 resp: OutgoingResponse<<Req as IsRequest>::Response>,
133 ) -> std::result::Result<(), ()> {
134 self.pending_response.send_outgoing_response(resp)
135 }
136}
137
138#[derive(Debug)]
140pub struct OutgoingResponseSender<Req> {
141 pending_response: oneshot::Sender<netconfig::OutgoingResponse>,
142 phantom: PhantomData<Req>,
143}
144
145impl<Req> OutgoingResponseSender<Req>
146where
147 Req: IsRequest + Decode,
148 Req::Response: Encode,
149{
150 pub fn send_response(self, resp: Req::Response) -> std::result::Result<(), Req::Response> {
157 self.pending_response
158 .send(netconfig::OutgoingResponse {
159 result: Ok(resp.encode()),
160 reputation_changes: Vec::new(),
161 sent_feedback: None,
162 })
163 .map_err(|_| resp)
164 }
165
166 pub fn send_outgoing_response(
172 self,
173 resp: OutgoingResponse<<Req as IsRequest>::Response>,
174 ) -> std::result::Result<(), ()> {
175 let OutgoingResponse { result, reputation_changes, sent_feedback } = resp;
176
177 let response = netconfig::OutgoingResponse {
178 result: result.map(|v| v.encode()),
179 reputation_changes: reputation_changes.into_iter().map(|c| c.into()).collect(),
180 sent_feedback,
181 };
182
183 self.pending_response.send(response).map_err(|_| ())
184 }
185}
186
187pub struct OutgoingResponse<Response> {
191 pub result: std::result::Result<Response, ()>,
195
196 pub reputation_changes: Vec<UnifiedReputationChange>,
199
200 pub sent_feedback: Option<oneshot::Sender<()>>,
203}
204
205pub struct IncomingRequestReceiver<Req> {
209 raw: async_channel::Receiver<netconfig::IncomingRequest>,
210 phantom: PhantomData<Req>,
211}
212
213impl<Req> IncomingRequestReceiver<Req>
214where
215 Req: IsRequest + Decode + Encode,
216 Req::Response: Encode,
217{
218 pub async fn recv<F>(&mut self, reputation_changes: F) -> Result<IncomingRequest<Req>>
223 where
224 F: FnOnce() -> Vec<UnifiedReputationChange>,
225 {
226 let req = match self.raw.next().await {
227 None => return Err(FatalError::RequestChannelExhausted.into()),
228 Some(raw) => IncomingRequest::<Req>::try_from_raw(raw, reputation_changes())?,
229 };
230 Ok(req)
231 }
232}