Skip to content

Commit

Permalink
[zenohex_nif] Support binary pub/sub
Browse files Browse the repository at this point in the history
  • Loading branch information
pojiro committed Jan 22, 2024
1 parent 814bb18 commit f848eb9
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 4 deletions.
2 changes: 1 addition & 1 deletion lib/zenohex/nif.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ defmodule Zenohex.Nif do
:erlang.nif_error(:nif_not_loaded)
end

for type <- ["string", "integer", "float"] do
for type <- ["string", "integer", "float", "binary"] do
def unquote(:"publisher_put_#{type}")(_publisher, _value) do
:erlang.nif_error(:nif_not_loaded)
end
Expand Down
19 changes: 17 additions & 2 deletions native/zenohex_nif/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::borrow::Cow;
use std::io::Write;
use std::sync::Arc;
use std::time::Duration;

use flume::Receiver;
use rustler::types::atom;
use rustler::{thread, Encoder};
use rustler::{thread, Binary, Encoder, OwnedBinary};
use rustler::{Atom, Env, ResourceArc, Term};
use zenoh::prelude::sync::*;
use zenoh::{publication::Publisher, sample::Sample, subscriber::Subscriber, Session};
Expand Down Expand Up @@ -67,6 +69,11 @@ fn publisher_put_float(resource: ResourceArc<ExPublisherRef>, value: f64) -> Ato
publisher_put_impl(resource, value)
}

#[rustler::nif]
fn publisher_put_binary(resource: ResourceArc<ExPublisherRef>, value: Binary) -> Atom {
publisher_put_impl(resource, Value::from(value.as_slice()))
}

fn publisher_put_impl<T: Into<zenoh::value::Value>>(
resource: ResourceArc<ExPublisherRef>,
value: T,
Expand Down Expand Up @@ -106,7 +113,14 @@ fn subscriber_recv_timeout(
fn to_term<'a>(sample: &Sample, env: Env<'a>) -> Term<'a> {
match sample.value.encoding.prefix() {
KnownEncoding::Empty => todo!(),
KnownEncoding::AppOctetStream => todo!(),
KnownEncoding::AppOctetStream => match Cow::try_from(&sample.value) {
Ok(value) => {
let mut binary = OwnedBinary::new(value.len()).unwrap();
binary.as_mut_slice().write_all(&value).unwrap();
binary.release(env).encode(env)
}
Err(_err) => atom::error().encode(env),
},
KnownEncoding::AppCustom => todo!(),
KnownEncoding::TextPlain => match String::try_from(&sample.value) {
Ok(value) => value.encode(env),
Expand Down Expand Up @@ -155,6 +169,7 @@ rustler::init!(
publisher_put_string,
publisher_put_integer,
publisher_put_float,
publisher_put_binary,
declare_subscriber,
subscriber_recv_timeout
],
Expand Down
30 changes: 29 additions & 1 deletion test/zenohex/nif_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ defmodule Zenohex.NifTest do
assert is_reference(Nif.declare_publisher(session, "key/expression"))
end

for {type, value} <- [{"string", "value"}, {"integer", 0}, {"float", 0.0}] do
for {type, value} <- [
{"string", "value"},
{"integer", 0},
{"float", 0.0},
{"binary", :erlang.term_to_binary("binary")}
] do
test "publisher_put_#{type}/2", %{session: session} do
type = unquote(type)
value = unquote(value)
Expand Down Expand Up @@ -50,7 +55,30 @@ defmodule Zenohex.NifTest do
Nif.publisher_put_float(publisher, 0.0)
assert Nif.subscriber_recv_timeout(subscriber, 1000) == 0.0

Nif.publisher_put_binary(publisher, :erlang.term_to_binary("binary"))
assert Nif.subscriber_recv_timeout(subscriber, 1000) == :erlang.term_to_binary("binary")

assert Nif.subscriber_recv_timeout(subscriber, 1000) == :timeout
end
end

describe "binary pub/sub" do
setup context do
key_expr = "key/expression"
publisher = Nif.declare_publisher(context.session, key_expr)
subscriber = Nif.declare_subscriber(context.session, key_expr)
%{publisher: publisher, subscriber: subscriber}
end

for {test_name, binary} <- [
{"empty binary", ""},
{"erlang term binary", :erlang.term_to_binary(%URI{})}
] do
test "#{test_name}", %{publisher: publisher, subscriber: subscriber} do
binary = unquote(binary)
Nif.publisher_put_binary(publisher, binary)
assert Nif.subscriber_recv_timeout(subscriber, 1000) == binary
end
end
end
end

0 comments on commit f848eb9

Please sign in to comment.