Skip to content

Commit

Permalink
First attempt to add writer guid to serdata interface
Browse files Browse the repository at this point in the history
Signed-off-by: TheFixer <[email protected]>
  • Loading branch information
TheFixer committed May 1, 2023
1 parent 9b98012 commit 01409cd
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 37 deletions.
6 changes: 4 additions & 2 deletions src/core/ddsc/src/dds_serdata_builtintopic.c
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,8 @@ const struct ddsi_serdata_ops ddsi_serdata_ops_builtintopic = {
.untyped_to_sample = serdata_builtin_untyped_to_sample,
.print = serdata_builtin_type_print,
.get_keyhash = 0,
.get_sequencenumber = 0
.get_sequencenumber = 0,
.get_writer_guid = 0
};

#ifdef DDS_HAS_TOPIC_DISCOVERY
Expand Down Expand Up @@ -501,7 +502,8 @@ const struct ddsi_serdata_ops ddsi_serdata_ops_builtintopic_topic = {
.untyped_to_sample = serdata_builtin_untyped_to_sample,
.print = serdata_builtin_type_print,
.get_keyhash = 0,
.get_sequencenumber = 0
.get_sequencenumber = 0,
.get_writer_guid = 0
};

#endif /* DDS_HAS_TOPIC_DISCOVERY */
18 changes: 14 additions & 4 deletions src/core/ddsc/src/dds_serdata_default.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,12 @@ static uint64_t serdata_default_get_sequencenumber(const struct ddsi_serdata *dc
return d->c.sequence_number;
}

static ddsi_guid_t *serdata_default_get_writer_guid(const struct ddsi_serdata *dcmn)
{
struct dds_serdata_default *d = (struct dds_serdata_default *) dcmn;
return &d->c.writer_guid;
}

static bool serdata_default_eqkey(const struct ddsi_serdata *acmn, const struct ddsi_serdata *bcmn)
{
const struct dds_serdata_default *a = (const struct dds_serdata_default *)acmn;
Expand Down Expand Up @@ -834,7 +840,8 @@ const struct ddsi_serdata_ops dds_serdata_ops_cdr = {
.untyped_to_sample = serdata_default_untyped_to_sample_cdr,
.print = serdata_default_print_cdr,
.get_keyhash = serdata_default_get_keyhash,
.get_sequencenumber = serdata_default_get_sequencenumber
.get_sequencenumber = serdata_default_get_sequencenumber,
.get_writer_guid = serdata_default_get_writer_guid
#ifdef DDS_HAS_SHM
, .get_sample_size = ddsi_serdata_iox_size
, .from_iox_buffer = serdata_default_from_iox
Expand All @@ -857,7 +864,8 @@ const struct ddsi_serdata_ops dds_serdata_ops_xcdr2 = {
.untyped_to_sample = serdata_default_untyped_to_sample_cdr,
.print = serdata_default_print_cdr,
.get_keyhash = serdata_default_get_keyhash,
.get_sequencenumber = serdata_default_get_sequencenumber
.get_sequencenumber = serdata_default_get_sequencenumber,
.get_writer_guid = serdata_default_get_writer_guid
#ifdef DDS_HAS_SHM
, .get_sample_size = ddsi_serdata_iox_size
, .from_iox_buffer = serdata_default_from_iox
Expand All @@ -880,7 +888,8 @@ const struct ddsi_serdata_ops dds_serdata_ops_cdr_nokey = {
.untyped_to_sample = serdata_default_untyped_to_sample_cdr_nokey,
.print = serdata_default_print_cdr,
.get_keyhash = serdata_default_get_keyhash,
.get_sequencenumber = serdata_default_get_sequencenumber
.get_sequencenumber = serdata_default_get_sequencenumber,
.get_writer_guid = serdata_default_get_writer_guid
#ifdef DDS_HAS_SHM
, .get_sample_size = ddsi_serdata_iox_size
, .from_iox_buffer = serdata_default_from_iox_nokey
Expand All @@ -903,7 +912,8 @@ const struct ddsi_serdata_ops dds_serdata_ops_xcdr2_nokey = {
.untyped_to_sample = serdata_default_untyped_to_sample_cdr_nokey,
.print = serdata_default_print_cdr,
.get_keyhash = serdata_default_get_keyhash,
.get_sequencenumber = serdata_default_get_sequencenumber
.get_sequencenumber = serdata_default_get_sequencenumber,
.get_writer_guid = serdata_default_get_writer_guid
#ifdef DDS_HAS_SHM
, .get_sample_size = ddsi_serdata_iox_size
, .from_iox_buffer = serdata_default_from_iox_nokey
Expand Down
3 changes: 2 additions & 1 deletion src/core/ddsc/tests/cdr.c
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,8 @@ static const struct ddsi_serdata_ops sd_ops = {
.untyped_to_sample = sd_untyped_to_sample,
.print = sd_print,
.get_keyhash = sd_get_keyhash,
.get_sequencenumber = 0
.get_sequencenumber = 0,
.get_writer_guid = 0
};

/*----------------------------------------------------------------
Expand Down
10 changes: 10 additions & 0 deletions src/core/ddsi/include/dds/ddsi/ddsi_serdata.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ struct ddsi_serdata {
ddsrt_wctime_t timestamp;
uint32_t statusinfo;
ddsi_seqno_t sequence_number;
ddsi_guid_t writer_guid;

/* FIXME: can I get rid of this one? */
ddsrt_mtime_t twrite; /* write time, not source timestamp, set post-throttling */
Expand Down Expand Up @@ -152,6 +153,9 @@ typedef void (*ddsi_serdata_get_keyhash_t) (const struct ddsi_serdata *d, struct
/* Sequence number of the sample as advertised by the publisher */
typedef uint64_t (*ddsi_serdata_get_sequencenumber_t) (const struct ddsi_serdata *d);

/* Get the reference to the guid of the writer that published the serdata */
typedef ddsi_guid_t * (*ddsi_serdata_get_writer_guid_t) (const struct ddsi_serdata *d);

#ifdef DDS_HAS_SHM
typedef uint32_t(*ddsi_serdata_iox_size_t) (const struct ddsi_serdata* d);

Expand Down Expand Up @@ -180,6 +184,7 @@ struct ddsi_serdata_ops {
ddsi_serdata_print_t print;
ddsi_serdata_get_keyhash_t get_keyhash;
ddsi_serdata_get_sequencenumber_t get_sequencenumber;
ddsi_serdata_get_writer_guid_t get_writer_guid;
#ifdef DDS_HAS_SHM
ddsi_serdata_iox_size_t get_sample_size;
ddsi_serdata_from_iox_t from_iox_buffer;
Expand Down Expand Up @@ -250,6 +255,11 @@ DDS_INLINE_EXPORT inline uint64_t ddsi_serdata_sequencenumber(const struct ddsi_
return d->ops->get_sequencenumber (d);
}

/** @component typesupport_if */
DDS_INLINE_EXPORT inline ddsi_guid_t *ddsi_serdata_writer_guid(const struct ddsi_serdata *d) {
return d->ops->get_writer_guid (d);
}

DDS_INLINE_EXPORT inline struct ddsi_serdata *ddsi_serdata_from_ser (const struct ddsi_sertype *type, enum ddsi_serdata_kind kind, const struct ddsi_rdata *fragchain, size_t size) {
return type->serdata_ops->from_ser (type, kind, fragchain, size);
}
Expand Down
40 changes: 13 additions & 27 deletions src/core/ddsi/src/ddsi_receive.c
Original file line number Diff line number Diff line change
Expand Up @@ -1970,15 +1970,9 @@ static int handle_Gap (struct ddsi_receiver_state *rst, ddsrt_etime_t tnow, stru
return 1;
}

static struct ddsi_serdata *get_serdata (struct ddsi_sertype const * const type, const struct ddsi_rdata *fragchain, uint32_t sz, int justkey, unsigned statusinfo, ddsrt_wctime_t tstamp, ddsi_seqno_t seq_no)
static struct ddsi_serdata *get_serdata (struct ddsi_sertype const * const type, const struct ddsi_rdata *fragchain, uint32_t sz, int justkey)
{
struct ddsi_serdata *sd = ddsi_serdata_from_ser (type, justkey ? SDK_KEY : SDK_DATA, fragchain, sz);
if (sd)
{
sd->statusinfo = statusinfo;
sd->timestamp = tstamp;
sd->sequence_number = seq_no;
}
return sd;
}

Expand All @@ -2004,25 +1998,24 @@ static struct ddsi_serdata *remote_make_sample (struct ddsi_tkmap_instance **tk,
const ddsi_plist_t * __restrict qos = si->qos;
const char *failmsg = NULL;
struct ddsi_serdata *sample = NULL;
const struct ddsi_proxy_writer *pwr = sampleinfo->pwr;
ddsi_seqno_t seq = sampleinfo->seq;
ddsi_guid_t guid;

if (pwr) guid = pwr->e.guid; else memset (&guid, 0, sizeof (guid));
if (si->statusinfo == 0)
{
/* normal write */
if (!(data_smhdr_flags & DDSI_DATA_FLAG_DATAFLAG) || sampleinfo->size == 0)
{
const struct ddsi_proxy_writer *pwr = sampleinfo->pwr;
ddsi_guid_t guid;
/* pwr can't currently be null, but that might change some day, and this being
an error path, it doesn't hurt to survive that */
if (pwr) guid = pwr->e.guid; else memset (&guid, 0, sizeof (guid));
DDS_CTRACE (&gv->logconfig,
"data(application, vendor %u.%u): "PGUIDFMT" #%"PRIu64": write without proper payload (data_smhdr_flags 0x%x size %"PRIu32")\n",
sampleinfo->rst->vendor.id[0], sampleinfo->rst->vendor.id[1],
PGUID (guid), sampleinfo->seq,
si->data_smhdr_flags, sampleinfo->size);
return NULL;
}
sample = get_serdata (type, fragchain, sampleinfo->size, 0, statusinfo, tstamp, sampleinfo->seq);
sample = get_serdata (type, fragchain, sampleinfo->size, 0);
}
else if (sampleinfo->size)
{
Expand All @@ -2031,12 +2024,12 @@ static struct ddsi_serdata *remote_make_sample (struct ddsi_tkmap_instance **tk,
as one would expect to receive */
if (data_smhdr_flags & DDSI_DATA_FLAG_KEYFLAG)
{
sample = get_serdata (type, fragchain, sampleinfo->size, 1, statusinfo, tstamp, sampleinfo->seq);
sample = get_serdata (type, fragchain, sampleinfo->size, 1);
}
else
{
assert (data_smhdr_flags & DDSI_DATA_FLAG_DATAFLAG);
sample = get_serdata (type, fragchain, sampleinfo->size, 0, statusinfo, tstamp, sampleinfo->seq);
sample = get_serdata (type, fragchain, sampleinfo->size, 0);
}
}
else if (data_smhdr_flags & DDSI_DATA_FLAG_INLINE_QOS)
Expand All @@ -2055,12 +2048,6 @@ static struct ddsi_serdata *remote_make_sample (struct ddsi_tkmap_instance **tk,
}
else if ((sample = ddsi_serdata_from_keyhash (type, &qos->keyhash)) == NULL)
failmsg = "keyhash is MD5 and can't be converted to key value";
else
{
sample->statusinfo = statusinfo;
sample->timestamp = tstamp;
sample->sequence_number = sampleinfo->seq;
}
}
else
{
Expand All @@ -2069,9 +2056,6 @@ static struct ddsi_serdata *remote_make_sample (struct ddsi_tkmap_instance **tk,
if (sample == NULL)
{
/* No message => error out */
const struct ddsi_proxy_writer *pwr = sampleinfo->pwr;
ddsi_guid_t guid;
if (pwr) guid = pwr->e.guid; else memset (&guid, 0, sizeof (guid));
DDS_CWARNING (&gv->logconfig,
"data(application, vendor %u.%u): "PGUIDFMT" #%"PRIu64": deserialization %s/%s failed (%s)\n",
sampleinfo->rst->vendor.id[0], sampleinfo->rst->vendor.id[1],
Expand All @@ -2081,21 +2065,23 @@ static struct ddsi_serdata *remote_make_sample (struct ddsi_tkmap_instance **tk,
}
else
{
sample->statusinfo = statusinfo;
sample->timestamp = tstamp;
sample->sequence_number = seq;
memcpy(&sample->writer_guid, &guid, sizeof(sample->writer_guid));

if ((*tk = ddsi_tkmap_lookup_instance_ref (gv->m_tkmap, sample)) == NULL)
{
ddsi_serdata_unref (sample);
sample = NULL;
}
else if (gv->logconfig.c.mask & DDS_LC_TRACE)
{
const struct ddsi_proxy_writer *pwr = sampleinfo->pwr;
ddsi_guid_t guid;
char tmp[1024];
size_t res = 0;
tmp[0] = 0;
if (gv->logconfig.c.mask & DDS_LC_CONTENT)
res = ddsi_serdata_print (sample, tmp, sizeof (tmp));
if (pwr) guid = pwr->e.guid; else memset (&guid, 0, sizeof (guid));
GVTRACE ("data(application, vendor %u.%u): "PGUIDFMT" #%"PRIu64": ST%"PRIx32" %s/%s:%s%s",
sampleinfo->rst->vendor.id[0], sampleinfo->rst->vendor.id[1],
PGUID (guid), sampleinfo->seq, statusinfo,
Expand Down
3 changes: 2 additions & 1 deletion src/core/ddsi/src/ddsi_serdata_cdr.c
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,8 @@ const struct ddsi_serdata_ops ddsi_serdata_ops_cdr = {
.untyped_to_sample = serdata_cdr_untyped_to_sample_cdr,
.print = serdata_cdr_print_cdr,
.get_keyhash = 0,
.get_sequencenumber = 0
.get_sequencenumber = 0,
.get_writer_guid = 0
#ifdef DDS_HAS_SHM
, .get_sample_size = 0
, .from_iox_buffer = 0
Expand Down
3 changes: 2 additions & 1 deletion src/core/ddsi/src/ddsi_serdata_plist.c
Original file line number Diff line number Diff line change
Expand Up @@ -326,5 +326,6 @@ const struct ddsi_serdata_ops ddsi_serdata_ops_plist = {
.untyped_to_sample = serdata_plist_untyped_to_sample,
.print = serdata_plist_print_plist,
.get_keyhash = serdata_plist_get_keyhash,
.get_sequencenumber = 0
.get_sequencenumber = 0,
.get_writer_guid = 0
};
3 changes: 2 additions & 1 deletion src/core/ddsi/src/ddsi_serdata_pserop.c
Original file line number Diff line number Diff line change
Expand Up @@ -305,5 +305,6 @@ const struct ddsi_serdata_ops ddsi_serdata_ops_pserop = {
.untyped_to_sample = serdata_pserop_untyped_to_sample,
.print = serdata_pserop_print_pserop,
.get_keyhash = serdata_pserop_get_keyhash,
.get_sequencenumber = 0
.get_sequencenumber = 0,
.get_writer_guid = 0
};

0 comments on commit 01409cd

Please sign in to comment.