Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SHM: optimize metadata #1714

Merged
merged 25 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
4cb0b50
Optimize SHM internals by combining watchdog and header data together…
yellowhatter Jan 14, 2025
b90d101
Update segment.rs, subscription.rs, and 2 more files...
yellowhatter Jan 14, 2025
fb21130
fix typo
yellowhatter Jan 14, 2025
b728bf8
fix test feature code
yellowhatter Jan 14, 2025
3b1c5af
Update tests
yellowhatter Jan 15, 2025
bf065d9
Merge commit '2d9472562d5e3b926f2a42cdbed3f0969cee3464'
yellowhatter Jan 15, 2025
b20e9f1
Code format fix (cargo -> rustfmt)
yellowhatter Jan 15, 2025
95265b0
- remove unnecssary trait impls
yellowhatter Jan 15, 2025
4a46568
Put ProtocolID and ChunkDescriptor into SHM metadata: less wire overh…
yellowhatter Jan 15, 2025
082b87e
code format fix
yellowhatter Jan 15, 2025
19ce9b3
review fixes
yellowhatter Jan 15, 2025
d6d6bcf
Merge commit '26d67de043efe327befd0bbfb29c76a5157f1baf'
yellowhatter Jan 15, 2025
f2df6e8
Merge commit '41739485fa5e0b78c46dd03ae06d3abcc999acd8'
yellowhatter Jan 17, 2025
25f7b05
no need to explicitly store ChunkDescriptor in BusyChunk as now it is…
yellowhatter Jan 17, 2025
44f0851
SHM version support with backward-safety
yellowhatter Jan 27, 2025
b4d2f74
Update version.rs
yellowhatter Jan 27, 2025
de483bb
- fix is_valid() check implementation for SHM buffer
yellowhatter Jan 28, 2025
efd944b
Update commons/zenoh-shm/tests/metadata.rs
yellowhatter Jan 28, 2025
354a9a4
Update commons/zenoh-shm/tests/metadata.rs
yellowhatter Jan 28, 2025
f149ab4
Update io/zenoh-transport/src/unicast/establishment/ext/shm.rs
yellowhatter Jan 28, 2025
35d533c
more review fixes
yellowhatter Jan 28, 2025
72d4b91
fix clippy
yellowhatter Jan 28, 2025
7881776
Merge commit 'eb3a7a47eaa8c771e88a114713550b993410674a'
yellowhatter Jan 28, 2025
978a961
Merge commit '8340ef9be9bbab21bdfc0bdff8f3186bed919b3f'
yellowhatter Jan 29, 2025
af31b63
fix uninitialized ref problem by getting rid of problematic lockfree …
yellowhatter Jan 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 10 additions & 57 deletions commons/zenoh-codec/src/core/shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,18 @@ use zenoh_buffers::{
writer::{DidntWrite, Writer},
};
use zenoh_shm::{
api::provider::chunk::ChunkDescriptor, header::descriptor::HeaderDescriptor,
watchdog::descriptor::Descriptor, ShmBufInfo,
api::provider::chunk::ChunkDescriptor, metadata::descriptor::MetadataDescriptor, ShmBufInfo,
};

use crate::{RCodec, WCodec, Zenoh080};

impl<W> WCodec<&Descriptor, &mut W> for Zenoh080
impl<W> WCodec<&MetadataDescriptor, &mut W> for Zenoh080
where
W: Writer,
{
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &Descriptor) -> Self::Output {
self.write(&mut *writer, x.id)?;
self.write(&mut *writer, x.index_and_bitpos)?;
Ok(())
}
}

impl<W> WCodec<&HeaderDescriptor, &mut W> for Zenoh080
where
W: Writer,
{
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &HeaderDescriptor) -> Self::Output {
fn write(self, writer: &mut W, x: &MetadataDescriptor) -> Self::Output {
self.write(&mut *writer, x.id)?;
self.write(&mut *writer, x.index)?;
Ok(())
Expand Down Expand Up @@ -84,52 +70,29 @@ where

fn write(self, writer: &mut W, x: &ShmBufInfo) -> Self::Output {
let ShmBufInfo {
data_descriptor,
shm_protocol,
data_len,
watchdog_descriptor,
header_descriptor,
metadata,
generation,
} = x;

self.write(&mut *writer, data_descriptor)?;
self.write(&mut *writer, shm_protocol)?;
self.write(&mut *writer, *data_len)?;
self.write(&mut *writer, watchdog_descriptor)?;
self.write(&mut *writer, header_descriptor)?;
self.write(&mut *writer, metadata)?;
self.write(&mut *writer, generation)?;
Ok(())
}
}

impl<R> RCodec<Descriptor, &mut R> for Zenoh080
impl<R> RCodec<MetadataDescriptor, &mut R> for Zenoh080
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<Descriptor, Self::Error> {
let id = self.read(&mut *reader)?;
let index_and_bitpos = self.read(&mut *reader)?;

Ok(Descriptor {
id,
index_and_bitpos,
})
}
}

impl<R> RCodec<HeaderDescriptor, &mut R> for Zenoh080
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<HeaderDescriptor, Self::Error> {
fn read(self, reader: &mut R) -> Result<MetadataDescriptor, Self::Error> {
let id = self.read(&mut *reader)?;
let index = self.read(&mut *reader)?;

Ok(HeaderDescriptor { id, index })
Ok(MetadataDescriptor { id, index })
}
}

Expand Down Expand Up @@ -172,21 +135,11 @@ where
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<ShmBufInfo, Self::Error> {
let data_descriptor = self.read(&mut *reader)?;
let shm_protocol = self.read(&mut *reader)?;
let data_len = self.read(&mut *reader)?;
let watchdog_descriptor = self.read(&mut *reader)?;
let header_descriptor = self.read(&mut *reader)?;
let metadata = self.read(&mut *reader)?;
let generation = self.read(&mut *reader)?;

let shm_info = ShmBufInfo::new(
data_descriptor,
shm_protocol,
data_len,
watchdog_descriptor,
header_descriptor,
generation,
);
let shm_info = ShmBufInfo::new(data_len, metadata, generation);
Ok(shm_info)
}
}
13 changes: 2 additions & 11 deletions commons/zenoh-codec/tests/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,22 +361,13 @@ fn codec_encoding() {
#[cfg(feature = "shared-memory")]
#[test]
fn codec_shm_info() {
use zenoh_shm::{
api::provider::chunk::ChunkDescriptor, header::descriptor::HeaderDescriptor,
watchdog::descriptor::Descriptor, ShmBufInfo,
};
use zenoh_shm::{metadata::descriptor::MetadataDescriptor, ShmBufInfo};

run!(ShmBufInfo, {
let mut rng = rand::thread_rng();
ShmBufInfo::new(
ChunkDescriptor::new(rng.gen(), rng.gen(), rng.gen()),
rng.gen(),
rng.gen(),
Descriptor {
id: rng.gen(),
index_and_bitpos: rng.gen(),
},
HeaderDescriptor {
MetadataDescriptor {
id: rng.gen(),
index: rng.gen(),
},
Expand Down
1 change: 1 addition & 0 deletions commons/zenoh-shm/src/api/provider/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::api::common::types::{ChunkID, SegmentID};
/// Uniquely identifies the particular chunk within particular segment
#[zenoh_macros::unstable_doc]
#[derive(Clone, Debug, PartialEq, Eq)]
#[stabby::stabby]
pub struct ChunkDescriptor {
pub segment: SegmentID,
pub chunk: ChunkID,
Expand Down
133 changes: 52 additions & 81 deletions commons/zenoh-shm/src/api/provider/shm_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,12 @@ use super::{
};
use crate::{
api::{buffer::zshmmut::ZShmMut, common::types::ProtocolID},
header::{
allocated_descriptor::AllocatedHeaderDescriptor, descriptor::HeaderDescriptor,
storage::GLOBAL_HEADER_STORAGE,
metadata::{
allocated_descriptor::AllocatedMetadataDescriptor, descriptor::MetadataDescriptor,
storage::GLOBAL_METADATA_STORAGE,
},
watchdog::{
allocated_watchdog::AllocatedWatchdog,
confirmator::{ConfirmedDescriptor, GLOBAL_CONFIRMATOR},
descriptor::Descriptor,
storage::GLOBAL_STORAGE,
validator::GLOBAL_VALIDATOR,
},
ShmBufInfo, ShmBufInner,
Expand All @@ -53,20 +50,14 @@ use crate::{
#[derive(Debug)]
struct BusyChunk {
descriptor: ChunkDescriptor,
header: AllocatedHeaderDescriptor,
_watchdog: AllocatedWatchdog,
metadata: AllocatedMetadataDescriptor,
}

impl BusyChunk {
fn new(
descriptor: ChunkDescriptor,
header: AllocatedHeaderDescriptor,
watchdog: AllocatedWatchdog,
) -> Self {
fn new(descriptor: ChunkDescriptor, metadata: AllocatedMetadataDescriptor) -> Self {
Self {
descriptor,
header,
_watchdog: watchdog,
metadata,
}
}
}
Expand Down Expand Up @@ -822,16 +813,10 @@ where
let len = len.try_into()?;

// allocate resources for SHM buffer
let (allocated_header, allocated_watchdog, confirmed_watchdog) = Self::alloc_resources()?;
let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;

// wrap everything to ShmBufInner
let wrapped = self.wrap(
chunk,
len,
allocated_header,
allocated_watchdog,
confirmed_watchdog,
);
let wrapped = self.wrap(chunk, len, allocated_metadata, confirmed_metadata);
Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
}

Expand All @@ -840,7 +825,7 @@ where
#[zenoh_macros::unstable_doc]
pub fn garbage_collect(&self) -> usize {
fn is_free_chunk(chunk: &BusyChunk) -> bool {
let header = chunk.header.descriptor.header();
let header = chunk.metadata.header();
if header.refcount.load(Ordering::SeqCst) != 0 {
return header.watchdog_invalidated.load(Ordering::SeqCst);
}
Expand Down Expand Up @@ -891,7 +876,7 @@ where
Policy: AllocPolicy,
{
// allocate resources for SHM buffer
let (allocated_header, allocated_watchdog, confirmed_watchdog) = Self::alloc_resources()?;
let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;

// allocate data chunk
// Perform actions depending on the Policy
Expand All @@ -902,82 +887,74 @@ where
let chunk = Policy::alloc(layout, self)?;

// wrap allocated chunk to ShmBufInner
let wrapped = self.wrap(
chunk,
size,
allocated_header,
allocated_watchdog,
confirmed_watchdog,
);
let wrapped = self.wrap(chunk, size, allocated_metadata, confirmed_metadata);
Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
}

fn alloc_resources() -> ZResult<(
AllocatedHeaderDescriptor,
AllocatedWatchdog,
ConfirmedDescriptor,
)> {
// allocate shared header
let allocated_header = GLOBAL_HEADER_STORAGE.read().allocate_header()?;

// allocate watchdog
let allocated_watchdog = GLOBAL_STORAGE.read().allocate_watchdog()?;
fn alloc_resources() -> ZResult<(AllocatedMetadataDescriptor, ConfirmedDescriptor)> {
// allocate metadata
let allocated_metadata = GLOBAL_METADATA_STORAGE.read().allocate()?;

// add watchdog to confirmator
let confirmed_watchdog = GLOBAL_CONFIRMATOR
.read()
.add_owned(&allocated_watchdog.descriptor)?;
let confirmed_metadata = GLOBAL_CONFIRMATOR.read().add(allocated_metadata.clone());

Ok((allocated_header, allocated_watchdog, confirmed_watchdog))
Ok((allocated_metadata, confirmed_metadata))
}

fn wrap(
&self,
chunk: AllocatedChunk,
len: NonZeroUsize,
allocated_header: AllocatedHeaderDescriptor,
allocated_watchdog: AllocatedWatchdog,
confirmed_watchdog: ConfirmedDescriptor,
allocated_metadata: AllocatedMetadataDescriptor,
confirmed_metadata: ConfirmedDescriptor,
) -> ShmBufInner {
let header = allocated_header.descriptor.clone();
let descriptor = Descriptor::from(&allocated_watchdog.descriptor);
// write additional metadata
// chunk descriptor
allocated_metadata
.header()
.segment
.store(chunk.descriptor.segment, Ordering::Relaxed);
allocated_metadata
.header()
.chunk
.store(chunk.descriptor.chunk, Ordering::Relaxed);
allocated_metadata
.header()
.len
.store(chunk.descriptor.len.into(), Ordering::Relaxed);
// protocol
allocated_metadata
.header()
.protocol
.store(self.id.id(), Ordering::Relaxed);

// add watchdog to validator
let c_header = header.clone();
GLOBAL_VALIDATOR.read().add(
allocated_watchdog.descriptor.clone(),
Box::new(move || {
c_header
.header()
.watchdog_invalidated
.store(true, Ordering::SeqCst);
}),
);
GLOBAL_VALIDATOR
.read()
.add(confirmed_metadata.owned.clone());

// Create buffer's info
let info = ShmBufInfo::new(
chunk.descriptor.clone(),
self.id.id(),
len,
descriptor,
HeaderDescriptor::from(&header),
header.header().generation.load(Ordering::SeqCst),
MetadataDescriptor::from(&confirmed_metadata.owned),
allocated_metadata
.header()
.generation
.load(Ordering::SeqCst),
);

// Create buffer
let shmb = ShmBufInner {
header,
metadata: Arc::new(confirmed_metadata),
buf: chunk.data,
info,
watchdog: Arc::new(confirmed_watchdog),
};

// Create and store busy chunk
self.busy_list.lock().unwrap().push_back(BusyChunk::new(
chunk.descriptor,
allocated_header,
allocated_watchdog,
));
self.busy_list
.lock()
.unwrap()
.push_back(BusyChunk::new(chunk.descriptor, allocated_metadata));

shmb
}
Expand All @@ -998,7 +975,7 @@ where
Policy: AsyncAllocPolicy,
{
// allocate resources for SHM buffer
let (allocated_header, allocated_watchdog, confirmed_watchdog) = Self::alloc_resources()?;
let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;

// allocate data chunk
// Perform actions depending on the Policy
Expand All @@ -1009,13 +986,7 @@ where
let chunk = Policy::alloc_async(backend_layout, self).await?;

// wrap allocated chunk to ShmBufInner
let wrapped = self.wrap(
chunk,
size,
allocated_header,
allocated_watchdog,
confirmed_watchdog,
);
let wrapped = self.wrap(chunk, size, allocated_metadata, confirmed_metadata);
Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
}
}
10 changes: 9 additions & 1 deletion commons/zenoh-shm/src/header/chunk_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//

use std::sync::atomic::{AtomicBool, AtomicU32};
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize};

// Chunk header
#[stabby::stabby]
Expand All @@ -25,4 +25,12 @@ pub struct ChunkHeaderType {
pub refcount: AtomicU32,
pub watchdog_invalidated: AtomicBool,
pub generation: AtomicU32,

/// Protocol identifier for particular SHM implementation
pub protocol: AtomicU32,

/// The data chunk descriptor
pub segment: AtomicU32,
pub chunk: AtomicU32,
pub len: AtomicUsize,
}
Loading
Loading