zombienet_provider/native/
node.rs

1use std::{
2    collections::HashMap,
3    env,
4    path::{Path, PathBuf},
5    process::Stdio,
6    sync::{Arc, Weak},
7    time::Duration,
8};
9
10use anyhow::anyhow;
11use async_trait::async_trait;
12use configuration::types::AssetLocation;
13use flate2::read::GzDecoder;
14use futures::future::try_join_all;
15use nix::{
16    sys::signal::{kill, Signal},
17    unistd::Pid,
18};
19use serde::{ser::Error, Deserialize, Serialize, Serializer};
20use sha2::Digest;
21use support::{constants::THIS_IS_A_BUG, fs::FileSystem};
22use tar::Archive;
23use tokio::{
24    fs,
25    io::{AsyncRead, AsyncReadExt, BufReader},
26    process::{Child, ChildStderr, ChildStdout, Command},
27    sync::{
28        mpsc::{self, Sender},
29        RwLock,
30    },
31    task::JoinHandle,
32    time::sleep,
33    try_join,
34};
35use tracing::trace;
36
37use super::namespace::NativeNamespace;
38use crate::{
39    constants::{NODE_CONFIG_DIR, NODE_DATA_DIR, NODE_RELAY_DATA_DIR, NODE_SCRIPTS_DIR},
40    native,
41    types::{ExecutionResult, RunCommandOptions, RunScriptOptions, TransferedFile},
42    ProviderError, ProviderNamespace, ProviderNode,
43};
44
45pub(super) struct NativeNodeOptions<'a, FS>
46where
47    FS: FileSystem + Send + Sync + Clone + 'static,
48{
49    pub(super) namespace: &'a Weak<NativeNamespace<FS>>,
50    pub(super) namespace_base_dir: &'a PathBuf,
51    pub(super) name: &'a str,
52    pub(super) program: &'a str,
53    pub(super) args: &'a [String],
54    pub(super) env: &'a [(String, String)],
55    pub(super) startup_files: &'a [TransferedFile],
56    pub(super) created_paths: &'a [PathBuf],
57    pub(super) db_snapshot: Option<&'a AssetLocation>,
58    pub(super) filesystem: &'a FS,
59    pub(super) node_log_path: Option<&'a PathBuf>,
60}
61
62impl<'a, FS> NativeNodeOptions<'a, FS>
63where
64    FS: FileSystem + Send + Sync + Clone + 'static,
65{
66    pub(super) fn from_deserializable(
67        deserializable: &'a DeserializableNativeNodeOptions,
68        namespace: &'a Weak<NativeNamespace<FS>>,
69        namespace_base_dir: &'a PathBuf,
70        filesystem: &'a FS,
71    ) -> NativeNodeOptions<'a, FS> {
72        NativeNodeOptions {
73            namespace,
74            namespace_base_dir,
75            name: &deserializable.name,
76            program: &deserializable.program,
77            args: &deserializable.args,
78            env: &deserializable.env,
79            startup_files: &[],
80            created_paths: &[],
81            db_snapshot: None,
82            filesystem,
83            node_log_path: deserializable.node_log_path.as_ref(),
84        }
85    }
86}
87
/// Owned, deserializable subset of the node options.
///
/// `NativeNodeOptions::from_deserializable` borrows these fields to rebuild
/// the options of a node.
#[derive(Deserialize)]
pub(super) struct DeserializableNativeNodeOptions {
    /// Node name.
    pub name: String,
    /// Program the node runs.
    pub program: String,
    /// Arguments passed to the program.
    pub args: Vec<String>,
    /// Environment variables as `(key, value)` pairs.
    pub env: Vec<(String, String)>,
    /// Optional explicit log file path.
    pub node_log_path: Option<PathBuf>,
}
96
/// Handle to the operating-system process backing a node.
enum ProcessHandle {
    /// Process spawned by this node: the child handle together with its pid.
    Spawned(Child, Pid),
    /// Process that was not spawned here and is only known by its pid
    /// (set by `NativeNode::attach_to_live`).
    Attached(Pid),
}
101
102#[derive(Serialize)]
103pub(super) struct NativeNode<FS>
104where
105    FS: FileSystem + Send + Sync + Clone,
106{
107    #[serde(skip)]
108    namespace: Weak<NativeNamespace<FS>>,
109    name: String,
110    program: String,
111    args: Vec<String>,
112    env: Vec<(String, String)>,
113    base_dir: PathBuf,
114    config_dir: PathBuf,
115    data_dir: PathBuf,
116    relay_data_dir: PathBuf,
117    scripts_dir: PathBuf,
118    log_path: PathBuf,
119    #[serde(serialize_with = "serialize_process_handle")]
120    // using RwLock from std to serialize properly, generally using sync locks is ok in async code as long as they
121    // are not held across await points
122    process_handle: std::sync::RwLock<Option<ProcessHandle>>,
123    #[serde(skip)]
124    stdout_reading_task: RwLock<Option<JoinHandle<()>>>,
125    #[serde(skip)]
126    stderr_reading_task: RwLock<Option<JoinHandle<()>>>,
127    #[serde(skip)]
128    log_writing_task: RwLock<Option<JoinHandle<()>>>,
129    #[serde(skip)]
130    filesystem: FS,
131    provider_tag: String,
132}
133
134impl<FS> NativeNode<FS>
135where
136    FS: FileSystem + Send + Sync + Clone + 'static,
137{
138    pub(super) async fn new(
139        options: NativeNodeOptions<'_, FS>,
140    ) -> Result<Arc<Self>, ProviderError> {
141        let filesystem = options.filesystem.clone();
142
143        let base_dir =
144            PathBuf::from_iter([options.namespace_base_dir, &PathBuf::from(options.name)]);
145        trace!("creating base_dir {:?}", base_dir);
146        options.filesystem.create_dir_all(&base_dir).await?;
147        trace!("created base_dir {:?}", base_dir);
148
149        let base_dir_raw = base_dir.to_string_lossy();
150        let config_dir = PathBuf::from(format!("{base_dir_raw}{NODE_CONFIG_DIR}"));
151        let data_dir = PathBuf::from(format!("{base_dir_raw}{NODE_DATA_DIR}"));
152        let relay_data_dir = PathBuf::from(format!("{base_dir_raw}{NODE_RELAY_DATA_DIR}"));
153        let scripts_dir = PathBuf::from(format!("{base_dir_raw}{NODE_SCRIPTS_DIR}"));
154        let log_path = options
155            .node_log_path
156            .cloned()
157            .unwrap_or_else(|| base_dir.join(format!("{}.log", options.name)));
158
159        trace!("creating dirs {:?}", config_dir);
160        try_join!(
161            filesystem.create_dir_all(&config_dir),
162            filesystem.create_dir_all(&data_dir),
163            filesystem.create_dir_all(&relay_data_dir),
164            filesystem.create_dir_all(&scripts_dir),
165        )?;
166        trace!("created!");
167
168        let node = Arc::new(NativeNode {
169            namespace: options.namespace.clone(),
170            name: options.name.to_string(),
171            program: options.program.to_string(),
172            args: options.args.to_vec(),
173            env: options.env.to_vec(),
174            base_dir,
175            config_dir,
176            data_dir,
177            relay_data_dir,
178            scripts_dir,
179            log_path,
180            process_handle: std::sync::RwLock::new(None),
181            stdout_reading_task: RwLock::new(None),
182            stderr_reading_task: RwLock::new(None),
183            log_writing_task: RwLock::new(None),
184            filesystem: filesystem.clone(),
185            provider_tag: native::provider::PROVIDER_NAME.to_string(),
186        });
187
188        node.initialize_startup_paths(options.created_paths).await?;
189        node.initialize_startup_files(options.startup_files).await?;
190
191        if let Some(db_snap) = options.db_snapshot {
192            node.initialize_db_snapshot(db_snap).await?;
193        }
194
195        let (stdout, stderr) = node.initialize_process().await?;
196
197        node.initialize_log_writing(stdout, stderr).await;
198
199        Ok(node)
200    }
201
202    pub(super) async fn attach_to_live(
203        options: NativeNodeOptions<'_, FS>,
204        pid: i32,
205    ) -> Result<Arc<Self>, ProviderError> {
206        let filesystem = options.filesystem.clone();
207
208        let base_dir =
209            PathBuf::from_iter([options.namespace_base_dir, &PathBuf::from(options.name)]);
210        trace!("creating base_dir {:?}", base_dir);
211        options.filesystem.create_dir_all(&base_dir).await?;
212        trace!("created base_dir {:?}", base_dir);
213
214        let base_dir_raw = base_dir.to_string_lossy();
215        let config_dir = PathBuf::from(format!("{base_dir_raw}{NODE_CONFIG_DIR}"));
216        let data_dir = PathBuf::from(format!("{base_dir_raw}{NODE_DATA_DIR}"));
217        let relay_data_dir = PathBuf::from(format!("{base_dir_raw}{NODE_RELAY_DATA_DIR}"));
218        let scripts_dir = PathBuf::from(format!("{base_dir_raw}{NODE_SCRIPTS_DIR}"));
219        let log_path = options
220            .node_log_path
221            .cloned()
222            .unwrap_or_else(|| base_dir.join(format!("{}.log", options.name)));
223
224        let pid = Pid::from_raw(pid);
225
226        let node = Arc::new(NativeNode {
227            namespace: options.namespace.clone(),
228            name: options.name.to_string(),
229            program: options.program.to_string(),
230            args: options.args.to_vec(),
231            env: options.env.to_vec(),
232            base_dir,
233            config_dir,
234            data_dir,
235            relay_data_dir,
236            scripts_dir,
237            log_path,
238            process_handle: std::sync::RwLock::new(Some(ProcessHandle::Attached(pid))),
239            stdout_reading_task: RwLock::new(None),
240            stderr_reading_task: RwLock::new(None),
241            log_writing_task: RwLock::new(None),
242            filesystem: filesystem.clone(),
243            provider_tag: native::provider::PROVIDER_NAME.to_string(),
244        });
245
246        Ok(node)
247    }
248
249    async fn initialize_startup_paths(&self, paths: &[PathBuf]) -> Result<(), ProviderError> {
250        trace!("creating paths {:?}", paths);
251        let base_dir_raw = self.base_dir.to_string_lossy();
252        try_join_all(paths.iter().map(|file| {
253            let full_path = format!("{base_dir_raw}{}", file.to_string_lossy());
254            self.filesystem.create_dir_all(full_path)
255        }))
256        .await?;
257        trace!("paths created!");
258
259        Ok(())
260    }
261
262    async fn initialize_startup_files(
263        &self,
264        startup_files: &[TransferedFile],
265    ) -> Result<(), ProviderError> {
266        trace!("creating files {:?}", startup_files);
267        try_join_all(
268            startup_files
269                .iter()
270                .map(|file| self.send_file(&file.local_path, &file.remote_path, &file.mode)),
271        )
272        .await?;
273        trace!("files created!");
274
275        Ok(())
276    }
277
278    async fn initialize_db_snapshot(
279        &self,
280        db_snapshot: &AssetLocation,
281    ) -> Result<(), ProviderError> {
282        trace!("snap: {db_snapshot}");
283
284        // check if we need to get the db or is already in the ns
285        let ns_base_dir = self.namespace_base_dir();
286        let hashed_location = match db_snapshot {
287            AssetLocation::Url(location) => hex::encode(sha2::Sha256::digest(location.to_string())),
288            AssetLocation::FilePath(filepath) => {
289                hex::encode(sha2::Sha256::digest(filepath.to_string_lossy().to_string()))
290            },
291        };
292
293        let full_path = format!("{ns_base_dir}/{hashed_location}.tgz");
294        trace!("db_snap fullpath in ns: {full_path}");
295        if !self.filesystem.exists(&full_path).await {
296            // needs to download/copy
297            self.get_db_snapshot(db_snapshot, &full_path).await?;
298        }
299
300        let contents = self.filesystem.read(&full_path).await.unwrap();
301        let gz = GzDecoder::new(&contents[..]);
302        let mut archive = Archive::new(gz);
303        archive
304            .unpack(self.base_dir.to_string_lossy().as_ref())
305            .unwrap();
306
307        if std::env::var("ZOMBIE_RM_TGZ_AFTER_EXTRACT").is_ok() {
308            let res = fs::remove_file(&full_path).await;
309            trace!("removing {}, result {:?}", full_path, res);
310        }
311
312        Ok(())
313    }
314
315    async fn get_db_snapshot(
316        &self,
317        location: &AssetLocation,
318        full_path: &str,
319    ) -> Result<(), ProviderError> {
320        trace!("getting db_snapshot from: {:?} to: {full_path}", location);
321        match location {
322            AssetLocation::Url(location) => {
323                let res = reqwest::get(location.as_ref())
324                    .await
325                    .map_err(|err| ProviderError::DownloadFile(location.to_string(), err.into()))?;
326
327                let contents: &[u8] = &res.bytes().await.unwrap();
328                trace!("writing: {full_path}");
329                self.filesystem.write(full_path, contents).await?;
330            },
331            AssetLocation::FilePath(filepath) => {
332                self.filesystem.copy(filepath, full_path).await?;
333            },
334        };
335
336        Ok(())
337    }
338
339    async fn initialize_process(&self) -> Result<(ChildStdout, ChildStderr), ProviderError> {
340        let filtered_env: HashMap<String, String> = env::vars()
341            .filter(|(k, _)| k == "TZ" || k == "LANG" || k == "PATH")
342            .collect();
343
344        let mut process = Command::new(&self.program)
345            .args(&self.args)
346            .env_clear()
347            .envs(&filtered_env) // minimal environment
348            .envs(self.env.to_vec())
349            .stdin(Stdio::null())
350            .stdout(Stdio::piped())
351            .stderr(Stdio::piped())
352            .kill_on_drop(true)
353            .current_dir(&self.base_dir)
354            .spawn()
355            .map_err(|err| ProviderError::NodeSpawningFailed(self.name.to_string(), err.into()))?;
356        let stdout = process
357            .stdout
358            .take()
359            .expect(&format!("infaillible, stdout is piped {THIS_IS_A_BUG}"));
360        let stderr = process
361            .stderr
362            .take()
363            .expect(&format!("infaillible, stderr is piped {THIS_IS_A_BUG}"));
364
365        let pid = Pid::from_raw(
366            process
367                .id()
368                .ok_or_else(|| ProviderError::ProcessIdRetrievalFailed(self.name.to_string()))?
369                as i32,
370        );
371        self.process_handle
372            .write()
373            .map_err(|_e| ProviderError::FailedToAcquireLock(self.name.clone()))?
374            .replace(ProcessHandle::Spawned(process, pid));
375
376        Ok((stdout, stderr))
377    }
378
379    async fn initialize_log_writing(&self, stdout: ChildStdout, stderr: ChildStderr) {
380        let (stdout_tx, mut rx) = mpsc::channel(10);
381        let stderr_tx = stdout_tx.clone();
382
383        self.stdout_reading_task
384            .write()
385            .await
386            .replace(self.create_stream_polling_task(stdout, stdout_tx));
387        self.stderr_reading_task
388            .write()
389            .await
390            .replace(self.create_stream_polling_task(stderr, stderr_tx));
391
392        let filesystem = self.filesystem.clone();
393        let log_path = self.log_path.clone();
394
395        self.log_writing_task
396            .write()
397            .await
398            .replace(tokio::spawn(async move {
399                loop {
400                    while let Some(Ok(data)) = rx.recv().await {
401                        // TODO: find a better way instead of ignoring error ?
402                        let _ = filesystem.append(&log_path, data).await;
403                    }
404                    sleep(Duration::from_millis(250)).await;
405                }
406            }));
407    }
408
409    fn create_stream_polling_task(
410        &self,
411        stream: impl AsyncRead + Unpin + Send + 'static,
412        tx: Sender<Result<Vec<u8>, std::io::Error>>,
413    ) -> JoinHandle<()> {
414        tokio::spawn(async move {
415            let mut reader = BufReader::new(stream);
416            let mut buffer = vec![0u8; 1024];
417
418            loop {
419                match reader.read(&mut buffer).await {
420                    Ok(0) => {
421                        let _ = tx.send(Ok(Vec::new())).await;
422                        break;
423                    },
424                    Ok(n) => {
425                        let _ = tx.send(Ok(buffer[..n].to_vec())).await;
426                    },
427                    Err(e) => {
428                        let _ = tx.send(Err(e)).await;
429                        break;
430                    },
431                }
432            }
433        })
434    }
435
436    fn process_id(&self) -> Result<Pid, ProviderError> {
437        let pid = self
438            .process_handle
439            .read()
440            .map_err(|_e| ProviderError::FailedToAcquireLock(self.name.clone()))?
441            .as_ref()
442            .map(|handle| match handle {
443                ProcessHandle::Spawned(_, pid) => *pid,
444                ProcessHandle::Attached(pid) => *pid,
445            })
446            .ok_or_else(|| ProviderError::ProcessIdRetrievalFailed(self.name.to_string()))?;
447
448        Ok(pid)
449    }
450
451    pub(crate) async fn abort(&self) -> anyhow::Result<()> {
452        if let Some(task) = self.log_writing_task.write().await.take() {
453            task.abort();
454        }
455        if let Some(task) = self.stdout_reading_task.write().await.take() {
456            task.abort();
457        }
458        if let Some(task) = self.stderr_reading_task.write().await.take() {
459            task.abort();
460        }
461
462        let process_handle = {
463            let mut guard = self
464                .process_handle
465                .write()
466                .map_err(|_e| ProviderError::FailedToAcquireLock(self.name.clone()))?;
467            guard
468                .take()
469                .ok_or_else(|| anyhow!("no process was attached for the node"))?
470        };
471
472        match process_handle {
473            ProcessHandle::Spawned(mut child, _pid) => {
474                child.kill().await?;
475            },
476            ProcessHandle::Attached(pid) => {
477                kill(pid, Signal::SIGKILL)
478                    .map_err(|err| anyhow!("Failed to kill attached process {pid}: {err}"))?;
479            },
480        }
481
482        Ok(())
483    }
484
485    fn namespace_base_dir(&self) -> String {
486        self.namespace
487            .upgrade()
488            .map(|namespace| namespace.base_dir().to_string_lossy().to_string())
489            .unwrap_or_else(|| panic!("namespace shouldn't be dropped, {THIS_IS_A_BUG}"))
490    }
491}
492
493#[async_trait]
494impl<FS> ProviderNode for NativeNode<FS>
495where
496    FS: FileSystem + Send + Sync + Clone + 'static,
497{
498    fn name(&self) -> &str {
499        &self.name
500    }
501
502    fn args(&self) -> Vec<&str> {
503        self.args.iter().map(|arg| arg.as_str()).collect()
504    }
505
506    fn base_dir(&self) -> &PathBuf {
507        &self.base_dir
508    }
509
510    fn config_dir(&self) -> &PathBuf {
511        &self.config_dir
512    }
513
514    fn data_dir(&self) -> &PathBuf {
515        &self.data_dir
516    }
517
518    fn relay_data_dir(&self) -> &PathBuf {
519        &self.relay_data_dir
520    }
521
522    fn scripts_dir(&self) -> &PathBuf {
523        &self.scripts_dir
524    }
525
526    fn log_path(&self) -> &PathBuf {
527        &self.log_path
528    }
529
530    fn log_cmd(&self) -> String {
531        format!("tail -f {}", self.log_path().to_string_lossy())
532    }
533
534    fn path_in_node(&self, file: &Path) -> PathBuf {
535        let full_path = format!(
536            "{}/{}",
537            self.base_dir.to_string_lossy(),
538            file.to_string_lossy()
539        );
540        PathBuf::from(full_path)
541    }
542
543    async fn logs(&self) -> Result<String, ProviderError> {
544        Ok(self.filesystem.read_to_string(&self.log_path).await?)
545    }
546
547    async fn dump_logs(&self, local_dest: PathBuf) -> Result<(), ProviderError> {
548        Ok(self.filesystem.copy(&self.log_path, local_dest).await?)
549    }
550
551    async fn run_command(
552        &self,
553        options: RunCommandOptions,
554    ) -> Result<ExecutionResult, ProviderError> {
555        let result = Command::new(options.program.clone())
556            .args(options.args.clone())
557            .envs(options.env)
558            .current_dir(&self.base_dir)
559            .output()
560            .await
561            .map_err(|err| {
562                ProviderError::RunCommandError(
563                    format!("{} {}", &options.program, &options.args.join(" ")),
564                    "locally".to_string(),
565                    err.into(),
566                )
567            })?;
568
569        if result.status.success() {
570            Ok(Ok(String::from_utf8_lossy(&result.stdout).to_string()))
571        } else {
572            Ok(Err((
573                result.status,
574                String::from_utf8_lossy(&result.stderr).to_string(),
575            )))
576        }
577    }
578
579    async fn run_script(
580        &self,
581        options: RunScriptOptions,
582    ) -> Result<ExecutionResult, ProviderError> {
583        let local_script_path = PathBuf::from(&options.local_script_path);
584
585        if !self.filesystem.exists(&local_script_path).await {
586            return Err(ProviderError::ScriptNotFound(local_script_path));
587        }
588
589        // extract file name and build remote file path
590        let script_file_name = local_script_path
591            .file_name()
592            .map(|file_name| file_name.to_string_lossy().to_string())
593            .ok_or(ProviderError::InvalidScriptPath(anyhow!(
594                "Can't retrieve filename from script with path: {:?}",
595                options.local_script_path
596            )))?;
597        let remote_script_path = format!(
598            "{}/{}",
599            self.scripts_dir.to_string_lossy(),
600            script_file_name
601        );
602
603        // copy and set script's execute permission
604        self.filesystem
605            .copy(local_script_path, &remote_script_path)
606            .await?;
607        self.filesystem.set_mode(&remote_script_path, 0o744).await?;
608
609        // execute script
610        self.run_command(RunCommandOptions {
611            program: remote_script_path,
612            args: options.args,
613            env: options.env,
614        })
615        .await
616    }
617
618    async fn send_file(
619        &self,
620        local_file_path: &Path,
621        remote_file_path: &Path,
622        mode: &str,
623    ) -> Result<(), ProviderError> {
624        let namespaced_remote_file_path = PathBuf::from(format!(
625            "{}{}",
626            &self.base_dir.to_string_lossy(),
627            remote_file_path.to_string_lossy()
628        ));
629
630        self.filesystem
631            .copy(local_file_path, &namespaced_remote_file_path)
632            .await?;
633
634        self.run_command(
635            RunCommandOptions::new("chmod")
636                .args(vec![mode, &namespaced_remote_file_path.to_string_lossy()]),
637        )
638        .await?
639        .map_err(|(_, err)| {
640            ProviderError::SendFile(
641                self.name.clone(),
642                local_file_path.to_string_lossy().to_string(),
643                anyhow!("{err}"),
644            )
645        })?;
646
647        Ok(())
648    }
649
650    async fn receive_file(
651        &self,
652        remote_file_path: &Path,
653        local_file_path: &Path,
654    ) -> Result<(), ProviderError> {
655        let namespaced_remote_file_path = PathBuf::from(format!(
656            "{}{}",
657            &self.base_dir.to_string_lossy(),
658            remote_file_path.to_string_lossy()
659        ));
660
661        self.filesystem
662            .copy(namespaced_remote_file_path, local_file_path)
663            .await?;
664
665        Ok(())
666    }
667
668    async fn pause(&self) -> Result<(), ProviderError> {
669        let process_id = self.process_id()?;
670
671        kill(process_id, Signal::SIGSTOP)
672            .map_err(|err| ProviderError::PauseNodeFailed(self.name.clone(), err.into()))?;
673
674        Ok(())
675    }
676
677    async fn resume(&self) -> Result<(), ProviderError> {
678        let process_id = self.process_id()?;
679
680        nix::sys::signal::kill(process_id, Signal::SIGCONT)
681            .map_err(|err| ProviderError::ResumeNodeFailed(self.name.clone(), err.into()))?;
682
683        Ok(())
684    }
685
686    async fn restart(&self, after: Option<Duration>) -> Result<(), ProviderError> {
687        if let Some(duration) = after {
688            sleep(duration).await;
689        }
690
691        self.abort()
692            .await
693            .map_err(|err| ProviderError::RestartNodeFailed(self.name.clone(), err))?;
694
695        let (stdout, stderr) = self
696            .initialize_process()
697            .await
698            .map_err(|err| ProviderError::RestartNodeFailed(self.name.clone(), err.into()))?;
699
700        self.initialize_log_writing(stdout, stderr).await;
701
702        Ok(())
703    }
704
705    async fn destroy(&self) -> Result<(), ProviderError> {
706        self.abort()
707            .await
708            .map_err(|err| ProviderError::DestroyNodeFailed(self.name.clone(), err))?;
709
710        if let Some(namespace) = self.namespace.upgrade() {
711            namespace.nodes.write().await.remove(&self.name);
712        }
713
714        Ok(())
715    }
716}
717
718fn serialize_process_handle<S>(
719    process_handle: &std::sync::RwLock<Option<ProcessHandle>>,
720    serializer: S,
721) -> Result<S::Ok, S::Error>
722where
723    S: Serializer,
724{
725    let pid = process_handle
726        .read()
727        .map_err(|_e| S::Error::custom("failed to acquire read lock"))?
728        .as_ref()
729        .map(|handle| match handle {
730            ProcessHandle::Spawned(_, pid) => pid.as_raw(),
731            ProcessHandle::Attached(pid) => pid.as_raw(),
732        });
733    pid.serialize(serializer)
734}