Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

serializarion for zenoh types #16

Open
wants to merge 36 commits into
base: serialization_ext2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
539ba77
dummy drop for cleanup handlers
yellowhatter Sep 17, 2024
ff53488
migrate to ctrlc crate
yellowhatter Sep 18, 2024
1d58a04
Merge commit 'cad84a9113875ed995215d26e6e00806c27ed002'
yellowhatter Sep 18, 2024
50b6e6c
add force_cleanup_before_exit routine
yellowhatter Sep 18, 2024
c5819cb
Merge commit 'a5722ddced961472fa71db121c9e13503d5a44a6'
yellowhatter Sep 18, 2024
a717dd8
use ctrlc2 with other Cleanup finalization seq
yellowhatter Sep 18, 2024
360ec7f
Merge commit '1b046d4781017bf588027c38dc3d3285dac024cd'
yellowhatter Sep 18, 2024
9b48776
detach cleanup thread before joining
yellowhatter Sep 18, 2024
d3872cb
change cleanup task handle lifetime
yellowhatter Sep 18, 2024
4e7b84f
Merge commit 'ddcc8f1d8c114ca256df3939c303bcbdc1f80197'
yellowhatter Sep 20, 2024
0246d62
- completely different SHM clenup mechanism
yellowhatter Sep 20, 2024
2945569
Merge commit '965e9050ac2d1cae83ad0919c91f2ac9cd56f648'
yellowhatter Sep 26, 2024
a0c8b31
Support Linux only for a while
yellowhatter Sep 26, 2024
99fac5f
unix and not macos
yellowhatter Sep 26, 2024
5962138
do not use Linux Ephemeral ports in tests
yellowhatter Sep 26, 2024
4f835a0
remove more ephemeral ports
yellowhatter Sep 26, 2024
5db24c8
refactor(storage-manager): keep configuration in StorageService
J-Loudet Sep 24, 2024
532cf6e
refactor(storage-manager): separate creation/start of StorageService
J-Loudet Sep 24, 2024
f1b956d
refactor(storage-manager): use hash in Replication key expressions
J-Loudet Sep 25, 2024
049fdd8
refactor(storage-manager): remove unnecessary wait/retry policy
J-Loudet Sep 25, 2024
e9ef030
refactor(storage-manager): initial alignment on empty Storage
J-Loudet Sep 25, 2024
a6c6994
Merge pull request #1475 from ZettaScaleLabs/fix_ephemeral_ports_in_t…
yellowhatter Sep 26, 2024
0d0b32a
Merge commit 'a6c69946f7bd40b541ef78c9b7f417d0baa978a3'
yellowhatter Sep 26, 2024
f60a7f0
fix: starting rest plugin like any other plugin (#1478)
gabrik Sep 27, 2024
6cd1f16
chore: update release.yml for required labels
diogomatsubara Sep 27, 2024
21d2d4d
Merge pull request #1479 from ZettaScaleLabs/update-release-yaml
yellowhatter Sep 27, 2024
cdb869b
Merge pull request #1445 from ZettaScaleLabs/fix_memory_leak
yellowhatter Sep 27, 2024
941f699
Fix peers start conditions bug (#1477)
OlivierHecart Sep 27, 2024
af3ac7b
Remove closing from hat trait (#1469)
OlivierHecart Sep 27, 2024
3964d5a
Send Declare and OAM messages with Control priority (#1476)
OlivierHecart Sep 27, 2024
e79c800
Avoid waiting for dropping deadline if wait time is null (#1480)
OlivierHecart Sep 27, 2024
3bf271b
serializarion for zenoh::time::Timestamp
milyin Sep 29, 2024
795843f
intermediate types serialization added
milyin Sep 29, 2024
ba934f8
encoding serialization added
milyin Sep 29, 2024
cf37dd6
cargo fmt
milyin Sep 29, 2024
ba4cc5d
Merge branch 'serialization_ext2' into serialization_ext2_zenohtypes
milyin Sep 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion .github/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,22 @@

changelog:
categories:
- title: Breaking changes 💥
labels:
- breaking-change
- title: New features 🎉
labels:
- enhancement
- new feature
- title: Bug fixes 🐞
labels:
- bug
- title: Documentation 📝
labels:
- documentation
- title: Dependencies 👷
labels:
- dependencies
- title: Other changes
labels:
- "*"
- "*"
4 changes: 2 additions & 2 deletions commons/zenoh-protocol/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,13 +251,13 @@ pub mod ext {

pub const DEFAULT: Self = Self::new(Priority::DEFAULT, CongestionControl::DEFAULT, false);

pub const DECLARE: Self = Self::new(Priority::DEFAULT, CongestionControl::Block, false);
pub const DECLARE: Self = Self::new(Priority::Control, CongestionControl::Block, false);
pub const PUSH: Self = Self::new(Priority::DEFAULT, CongestionControl::Drop, false);
pub const REQUEST: Self = Self::new(Priority::DEFAULT, CongestionControl::Block, false);
pub const RESPONSE: Self = Self::new(Priority::DEFAULT, CongestionControl::Block, false);
pub const RESPONSE_FINAL: Self =
Self::new(Priority::DEFAULT, CongestionControl::Block, false);
pub const OAM: Self = Self::new(Priority::DEFAULT, CongestionControl::Block, false);
pub const OAM: Self = Self::new(Priority::Control, CongestionControl::Block, false);

pub const fn new(
priority: Priority,
Expand Down
31 changes: 31 additions & 0 deletions commons/zenoh-shm/src/api/cleanup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use crate::posix_shm::cleanup::cleanup_orphaned_segments;

/// Linux: Trigger cleanup for orphaned SHM segments
/// If process that created named SHM segment crashes or exits by a signal, the segment persists in the system
/// disregarding if it is used by other Zenoh processes or not. This is the detail of POSIX specification for
/// shared memory that is hard to bypass. To deal with this we developed a cleanup routine that enumerates all
/// segments and tries to find processes that are using it. If no such process found, segment will be removed.
/// There is no ideal signal to trigger this cleanup, so by default, zenoh triggers it in the following moments:
/// - first POSIX SHM segment creation
/// - process exit via exit() call or return from maint function
/// It is OK to additionally trigger this function at any time, but be aware that this can be costly.
///
/// For non-linux platforms this function currently does nothing
#[zenoh_macros::unstable_doc]
pub fn cleanup_orphaned_shm_segments() {
cleanup_orphaned_segments();
}
1 change: 1 addition & 0 deletions commons/zenoh-shm/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//

pub mod buffer;
pub mod cleanup;
pub mod client;
pub mod client_storage;
pub mod common;
Expand Down
16 changes: 13 additions & 3 deletions commons/zenoh-shm/src/cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

use static_init::dynamic;

use crate::posix_shm::cleanup::cleanup_orphaned_segments;

/// A global cleanup, that is guaranteed to be dropped at normal program exit and that will
/// execute all registered cleanup routines at this moment
#[dynamic(lazy, drop)]
Expand All @@ -26,6 +28,8 @@ pub(crate) struct Cleanup {

impl Cleanup {
fn new() -> Self {
// on first cleanup subsystem touch we perform zenoh segment cleanup
cleanup_orphaned_segments();
Self {
cleanups: Default::default(),
}
Expand All @@ -34,14 +38,20 @@ impl Cleanup {
pub(crate) fn register_cleanup(&self, cleanup_fn: Box<dyn FnOnce() + Send>) {
self.cleanups.push(Some(cleanup_fn));
}
}

impl Drop for Cleanup {
fn drop(&mut self) {
fn cleanup(&self) {
while let Some(cleanup) = self.cleanups.pop() {
if let Some(f) = cleanup {
f();
}
}
}
}

impl Drop for Cleanup {
fn drop(&mut self) {
// on finalization stage we perform zenoh segment cleanup
cleanup_orphaned_segments();
self.cleanup();
}
}
134 changes: 134 additions & 0 deletions commons/zenoh-shm/src/posix_shm/cleanup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

pub(crate) use platform::cleanup_orphaned_segments;

#[cfg(not(all(unix, not(target_os = "macos"))))]
mod platform {
pub(crate) fn cleanup_orphaned_segments() {}
}

#[cfg(all(unix, not(target_os = "macos")))]
mod platform {
use std::{borrow::Borrow, collections::HashSet, fs, path::PathBuf};

use zenoh_result::ZResult;

#[derive(PartialEq, Eq, Hash)]
struct ProcFdDir(PathBuf);

impl ProcFdDir {
fn enumerate_fds(&self) -> ZResult<HashSet<FdFile>> {
let fds = self.0.read_dir()?;
let fd_map: HashSet<FdFile> = fds
.filter_map(Result::ok)
.map(|f| std::convert::Into::<FdFile>::into(f.path()))
.collect();
Ok(fd_map)
}
}

impl From<PathBuf> for ProcFdDir {
fn from(value: PathBuf) -> Self {
Self(value)
}
}

#[derive(PartialEq, Eq, Hash)]
struct FdFile(PathBuf);

impl From<PathBuf> for FdFile {
fn from(value: PathBuf) -> Self {
Self(value)
}
}

#[derive(PartialEq, Eq, Hash)]
struct ShmFile(PathBuf);

impl ShmFile {
fn cleanup_file(self) {
let _ = std::fs::remove_file(self.0);
}
}

impl Borrow<PathBuf> for ShmFile {
fn borrow(&self) -> &PathBuf {
&self.0
}
}

impl From<PathBuf> for ShmFile {
fn from(value: PathBuf) -> Self {
Self(value)
}
}

pub(crate) fn cleanup_orphaned_segments() {
if let Err(e) = cleanup_orphaned_segments_inner() {
tracing::error!("Error performing orphaned SHM segments cleanup: {e}")
}
}

fn enumerate_shm_files() -> ZResult<HashSet<ShmFile>> {
let shm_files = fs::read_dir("/dev/shm")?;
Ok(shm_files
.filter_map(Result::ok)
.filter_map(|f| {
if let Some(ext) = f.path().extension() {
if ext == "zenoh" {
return Some(std::convert::Into::<ShmFile>::into(f.path()));
}
}
None
})
.collect())
}

fn enumerate_proc_dirs() -> ZResult<HashSet<ProcFdDir>> {
let proc_dirs = fs::read_dir("/proc")?;
Ok(proc_dirs
.filter_map(Result::ok)
.map(|f| std::convert::Into::<ProcFdDir>::into(f.path().join("fd")))
.collect())
}

fn enumerate_proc_fds() -> ZResult<HashSet<FdFile>> {
let mut fds = HashSet::default();
let dirs = enumerate_proc_dirs()?;
for dir in dirs {
if let Ok(dir_fds) = dir.enumerate_fds() {
fds.extend(dir_fds);
}
}
Ok(fds)
}

fn cleanup_orphaned_segments_inner() -> ZResult<()> {
let fd_map = enumerate_proc_fds()?;
let mut shm_map = enumerate_shm_files()?;

for fd_file in fd_map {
if let Ok(resolved_link) = fd_file.0.read_link() {
shm_map.remove(&resolved_link);
}
}

for shm_file_to_cleanup in shm_map {
shm_file_to_cleanup.cleanup_file();
}

Ok(())
}
}
1 change: 1 addition & 0 deletions commons/zenoh-shm/src/posix_shm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@

pub mod array;
tested_crate_module!(segment);
pub(crate) mod cleanup;
2 changes: 1 addition & 1 deletion commons/zenoh-shm/src/posix_shm/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ where
fn os_id(id: ID, id_prefix: &str) -> String {
let os_id_str = format!("{id_prefix}_{id}");
let crc_os_id_str = ECMA.checksum(os_id_str.as_bytes());
format!("{:x}", crc_os_id_str)
format!("{:x}.zenoh", crc_os_id_str)
}

pub fn as_ptr(&self) -> *mut u8 {
Expand Down
30 changes: 27 additions & 3 deletions examples/examples/z_bytes.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::collections::HashMap;

//
// Copyright (c) 2024 ZettaScale Technology
//
Expand All @@ -13,7 +11,16 @@ use std::collections::HashMap;
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use zenoh::bytes::ZBytes;
use std::{
collections::HashMap,
str::FromStr,
time::{SystemTime, UNIX_EPOCH},
};

use zenoh::{
bytes::{Encoding, ZBytes},
time::{Timestamp, TimestampId},
};

fn main() {
// Raw bytes
Expand Down Expand Up @@ -100,6 +107,23 @@ fn main() {
let output: (f64, String) = z_deserialize(&payload).unwrap();
assert_eq!(input, output);

// Zenoh types
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().into();
let input = Timestamp::new(now, TimestampId::rand());
let payload = z_serialize(&input);
let output: Timestamp = z_deserialize(&payload).unwrap();
assert_eq!(input, output);

let input = Encoding::TEXT_JSON;
let payload = z_serialize(&input);
let output: Encoding = z_deserialize(&payload).unwrap();
assert_eq!(input, output);

let input = Encoding::from_str("text/plain;foobar").unwrap();
let payload = z_serialize(&input);
let output: Encoding = z_deserialize(&payload).unwrap();
assert_eq!(input, output);

// Look at Serialize/Deserialize documentation for the exhaustive
// list of provided implementations
}
Expand Down
10 changes: 7 additions & 3 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl StageIn {
&mut self,
msg: &mut NetworkMessage,
priority: Priority,
deadline_before_drop: Option<Instant>,
deadline_before_drop: Option<Option<Instant>>,
) -> bool {
// Lock the current serialization batch.
let mut c_guard = self.mutex.current();
Expand All @@ -163,7 +163,7 @@ impl StageIn {
Some(deadline) if !$fragment => {
// We are in the congestion scenario and message is droppable
// Wait for an available batch until deadline
if !self.s_ref.wait_deadline(deadline) {
if !deadline.map_or(false, |deadline| self.s_ref.wait_deadline(deadline)) {
// Still no available batch.
// Restore the sequence number and drop the message
$restore_sn;
Expand Down Expand Up @@ -628,7 +628,11 @@ impl TransmissionPipelineProducer {
};
// If message is droppable, compute a deadline after which the sample could be dropped
let deadline_before_drop = if msg.is_droppable() {
Some(Instant::now() + self.wait_before_drop)
if self.wait_before_drop.is_zero() {
Some(None)
} else {
Some(Some(Instant::now() + self.wait_before_drop))
}
} else {
None
};
Expand Down
4 changes: 0 additions & 4 deletions io/zenoh-transport/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ impl TransportEventHandler for DummyTransportEventHandler {
/*************************************/
pub trait TransportMulticastEventHandler: Send + Sync {
fn new_peer(&self, peer: TransportPeer) -> ZResult<Arc<dyn TransportPeerEventHandler>>;
fn closing(&self);
fn closed(&self);
fn as_any(&self) -> &dyn Any;
}
Expand All @@ -95,7 +94,6 @@ impl TransportMulticastEventHandler for DummyTransportMulticastEventHandler {
fn new_peer(&self, _peer: TransportPeer) -> ZResult<Arc<dyn TransportPeerEventHandler>> {
Ok(Arc::new(DummyTransportPeerEventHandler))
}
fn closing(&self) {}
fn closed(&self) {}
fn as_any(&self) -> &dyn Any {
self
Expand All @@ -121,7 +119,6 @@ pub trait TransportPeerEventHandler: Send + Sync {
fn handle_message(&self, msg: NetworkMessage) -> ZResult<()>;
fn new_link(&self, src: Link);
fn del_link(&self, link: Link);
fn closing(&self);
fn closed(&self);
fn as_any(&self) -> &dyn Any;
}
Expand All @@ -137,7 +134,6 @@ impl TransportPeerEventHandler for DummyTransportPeerEventHandler {

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down
Loading
Loading