Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cli): allow unrecognized encoding #849

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go/cli/mcap/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ To install from the latest commit, use

go install github.com/foxglove/mcap/go/cli/mcap@latest

Ensure that the go installation directory is in your path (e.g. `$HOME/go/bin` by default on Ubuntu).

### Examples:

#### Bag to mcap conversion
Expand Down
106 changes: 64 additions & 42 deletions go/cli/mcap/cmd/cat.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/foxglove/mcap/go/cli/mcap/utils/ros"
"github.com/foxglove/mcap/go/mcap"
"github.com/foxglove/mcap/go/mcap/readopts"
"github.com/fxamacker/cbor/v2"
"github.com/spf13/cobra"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -109,57 +110,78 @@ func printMessages(
}
continue
}
switch schema.Encoding {
case "ros1msg":
transcoder, ok := transcoders[channel.SchemaID]
if !ok {
packageName := strings.Split(schema.Name, "/")[0]
transcoder, err = ros.NewJSONTranscoder(packageName, schema.Data)
if schema == nil || schema.Encoding == "" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jon-chuang is the idea with this patch to split out the change for supporting nil schemas, from support for CBOR? It seems like it still depends on the CBOR change, so I don't understand how it's different from the other patch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, this PR is building on the previous patch. The main change is handling unrecognized encoding by writing the raw data bytes insted of throwing an error. I'll convert this to a draft, it should be clearer once that PR is merged.

switch channel.MessageEncoding {
case "cbor":
var v map[string]interface{}
if err = cbor.Unmarshal(message.Data, &v); err != nil {
return fmt.Errorf("failed to decode cbor: %w", err)
}
encoded, err := json.Marshal(v)
if err != nil {
return fmt.Errorf("failed to build transcoder for %s: %w", channel.Topic, err)
return fmt.Errorf("failed to encode object into json: %w", err)
}
if _, err = msg.Write(encoded); err != nil {
return fmt.Errorf("failed to write message bytes: %w", err)
}
// Default encompasses json, which is encoded as UTF8 bytes
default:
if _, err = msg.Write(message.Data); err != nil {
return fmt.Errorf("failed to write message bytes: %w", err)
}
transcoders[channel.SchemaID] = transcoder
}
msgReader.Reset(message.Data)
err = transcoder.Transcode(msg, msgReader)
if err != nil {
return fmt.Errorf("failed to transcode %s record on %s: %w", schema.Name, channel.Topic, err)
}
case "protobuf":
messageDescriptor, ok := descriptors[channel.SchemaID]
if !ok {
fileDescriptorSet := &descriptorpb.FileDescriptorSet{}
if err := proto.Unmarshal(schema.Data, fileDescriptorSet); err != nil {
return fmt.Errorf("failed to build file descriptor set: %w", err)
} else {
switch schema.Encoding {
case "ros1msg":
transcoder, ok := transcoders[channel.SchemaID]
if !ok {
packageName := strings.Split(schema.Name, "/")[0]
transcoder, err = ros.NewJSONTranscoder(packageName, schema.Data)
if err != nil {
return fmt.Errorf("failed to build transcoder for %s: %w", channel.Topic, err)
}
transcoders[channel.SchemaID] = transcoder
}
files, err := protodesc.FileOptions{}.NewFiles(fileDescriptorSet)
msgReader.Reset(message.Data)
err = transcoder.Transcode(msg, msgReader)
if err != nil {
return fmt.Errorf("failed to create file descriptor: %w", err)
return fmt.Errorf("failed to transcode %s record on %s: %w", schema.Name, channel.Topic, err)
}
case "protobuf":
messageDescriptor, ok := descriptors[channel.SchemaID]
if !ok {
fileDescriptorSet := &descriptorpb.FileDescriptorSet{}
if err := proto.Unmarshal(schema.Data, fileDescriptorSet); err != nil {
return fmt.Errorf("failed to build file descriptor set: %w", err)
}
files, err := protodesc.FileOptions{}.NewFiles(fileDescriptorSet)
if err != nil {
return fmt.Errorf("failed to create file descriptor: %w", err)
}
descriptor, err := files.FindDescriptorByName(protoreflect.FullName(schema.Name))
if err != nil {
return fmt.Errorf("failed to find descriptor: %w", err)
}
messageDescriptor = descriptor.(protoreflect.MessageDescriptor)
descriptors[channel.SchemaID] = messageDescriptor
}
protoMsg := dynamicpb.NewMessage(messageDescriptor.(protoreflect.MessageDescriptor))
if err := proto.Unmarshal(message.Data, protoMsg); err != nil {
return fmt.Errorf("failed to parse message: %w", err)
}
descriptor, err := files.FindDescriptorByName(protoreflect.FullName(schema.Name))
bytes, err := protojson.Marshal(protoMsg)
if err != nil {
return fmt.Errorf("failed to find descriptor: %w", err)
return fmt.Errorf("failed to marshal message: %w", err)
}
if _, err = msg.Write(bytes); err != nil {
return fmt.Errorf("failed to write message bytes: %w", err)
}
// Default encompasses json, which is encoded as UTF8 bytes
default:
if _, err = msg.Write(message.Data); err != nil {
return fmt.Errorf("failed to write message bytes: %w", err)
}
messageDescriptor = descriptor.(protoreflect.MessageDescriptor)
descriptors[channel.SchemaID] = messageDescriptor
}
protoMsg := dynamicpb.NewMessage(messageDescriptor.(protoreflect.MessageDescriptor))
if err := proto.Unmarshal(message.Data, protoMsg); err != nil {
return fmt.Errorf("failed to parse message: %w", err)
}
bytes, err := protojson.Marshal(protoMsg)
if err != nil {
return fmt.Errorf("failed to marshal message: %w", err)
}
if _, err = msg.Write(bytes); err != nil {
return fmt.Errorf("failed to write message bytes: %w", err)
}
case "jsonschema":
if _, err = msg.Write(message.Data); err != nil {
return fmt.Errorf("failed to write message bytes: %w", err)
}
default:
return fmt.Errorf("JSON output only supported for ros1msg, protobuf, and JSON schemas")
}
target.Topic = channel.Topic
target.Sequence = message.Sequence
Expand Down
2 changes: 2 additions & 0 deletions go/cli/mcap/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
cloud.google.com/go/iam v0.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/fxamacker/cbor/v2 v2.4.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.8 // indirect
Expand All @@ -46,6 +47,7 @@ require (
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.4.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.opencensus.io v0.23.0 // indirect
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect
golang.org/x/oauth2 v0.0.0-20220630143837-2104d58473e0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go/cli/mcap/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWp
github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU=
github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI=
github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=
github.com/fxamacker/cbor/v2 v2.4.0 h1:ri0ArlOR+5XunOP8CRUowT0pSJOwhW098ZCUyskZD88=
github.com/fxamacker/cbor/v2 v2.4.0/go.mod h1:TA1xS00nchWmaBnEIxPSE5oHLuJBAVvqrtAnWBwBCVo=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
Expand Down Expand Up @@ -457,6 +459,8 @@ github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69
github.com/subosito/gotenv v1.4.0 h1:yAzM1+SmVcz5R4tXGsNMu1jUl2aOJXoiWUCEwwnGrvs=
github.com/subosito/gotenv v1.4.0/go.mod h1:mZd6rFysKEcUhUHXJk0C/08wAgyDBFuwEYL7vWWGaGo=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
81 changes: 79 additions & 2 deletions python/mcap/tests/test_writer.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import contextlib
import json
from io import BytesIO
from tempfile import TemporaryFile
from tempfile import NamedTemporaryFile, TemporaryFile
from typing import List
import zlib
import cbor2
import subprocess

import lz4.frame
import pytest
Expand Down Expand Up @@ -42,7 +44,7 @@ def generate_sample_data(compression: CompressionType):
writer.add_message(
channel_id=channel_id,
log_time=0,
data=json.dumps({"sample": "test"}).encode("utf-8"),
data=json.dumps({"sample": "test"}).encode(encoding="utf-8"),
publish_time=0,
)

Expand Down Expand Up @@ -115,3 +117,78 @@ def test_out_of_order_messages():
chunk_index = next(r for r in records if isinstance(r, ChunkIndex))
assert chunk_index.message_start_time == 0
assert chunk_index.message_end_time == 100

@pytest.mark.parametrize(
"null_schema,encoding", [(True, "cbor"), (False, "cbor"), (True, "json"), (True, "unknown")]
)
def test_generate_sample_schemaless_data(null_schema: bool, encoding: str):
file = NamedTemporaryFile("w+b")
writer = Writer(file, compression=CompressionType.ZSTD)
writer.start(library="test")
schema_id = writer.register_schema(
name="sample",
encoding="",
data="".encode()
)

channel_id = writer.register_channel(
schema_id=0 if null_schema else schema_id,
topic="sample_topic",
message_encoding=encoding,
)
if encoding == "cbor":
writer.add_message(
channel_id=channel_id,
log_time=0,
data=cbor2.dumps({"sample": "test"}),
publish_time=0,
)
elif encoding == "json":
writer.add_message(
channel_id=channel_id,
log_time=0,
data=json.dumps({"sample": "test"}).encode("utf-8"),
publish_time=0,
)
else:
# Unrecognized encoding gets encoded as bytes
writer.add_message(
channel_id=channel_id,
log_time=0,
data=b'{"sample": "test"}',
publish_time=0,
)

writer.finish()
file.seek(0)
result = subprocess.run(['mcap', 'cat', '--json', file.name], stdout=subprocess.PIPE)
assert result.stdout == b'{"topic":"sample_topic","sequence":0,"log_time":0.000000000,"publish_time":0.000000000,"data":{"sample":"test"}}\n'


def test_generate_unrecognized_encoding():
file = NamedTemporaryFile("w+b")
writer = Writer(file, compression=CompressionType.ZSTD)
writer.start(library="test")
schema_id = writer.register_schema(
name="sample",
encoding="unrecognized",
data="".encode()
)

channel_id = writer.register_channel(
schema_id=schema_id,
topic="sample_topic",
message_encoding="unknown",
)
# Unrecognized encoding will print raw bytes
writer.add_message(
channel_id=channel_id,
log_time=0,
data=b'{"sample": "test"}',
publish_time=0,
)

writer.finish()
file.seek(0)
result = subprocess.run(['mcap', 'cat', '--json', file.name], stdout=subprocess.PIPE)
assert result.stdout == b'{"topic":"sample_topic","sequence":0,"log_time":0.000000000,"publish_time":0.000000000,"data":{"sample":"test"}}\n'