Skip to content

Commit

Permalink
Remove avro kafka schema registry leftovers
Browse files Browse the repository at this point in the history
Signed-off-by: Heinz N. Gies <[email protected]>
  • Loading branch information
Licenser committed Oct 12, 2023
1 parent 993ec29 commit ed24a4e
Showing 1 changed file with 4 additions and 50 deletions.
54 changes: 4 additions & 50 deletions tremor-codec/src/codec/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,35 +58,6 @@ const AVRO_BUFFER_CAP: usize = 512;
#[derive(Clone, Debug, Default)]
struct AvroRegistry {
by_name: HashMap<Name, Schema>,
by_id: HashMap<u32, Schema>,
registry_url: Option<String>,
}

impl AvroRegistry {
fn get_schema_by_id(&self, id: u32) -> Option<&Schema> {
self.by_id.get(&id)
}

async fn maybe_fetch_id(&mut self, id: u32) -> Result<()> {
if self.by_id.contains_key(&id) {
return Ok(());
}
if let Some(url) = self.registry_url.as_ref() {
let schema = String::from_utf8(
reqwest::get(&format!("{url}/schemas/ids/{id}"))
.await?
.bytes()
.await?
.to_vec(),
)?;
let schema = Schema::parse_str(&schema)?;
if let Some(name) = schema.name().cloned() {
self.by_name.insert(name, schema.clone());
}
self.by_id.insert(id, schema);
}
Ok(())
}
}

#[derive(Clone, Debug)]
Expand All @@ -108,10 +79,7 @@ impl Avro {
Some(c) => return Err(format!("Unknown compression codec: {c}").into()),
};

let mut registry = AvroRegistry {
registry_url: config.get_str("registry").map(ToString::to_string),
..AvroRegistry::default()
};
let mut registry = AvroRegistry::default();

match config.get("schema") {
Some(schema) => {
Expand Down Expand Up @@ -356,30 +324,16 @@ impl Codec for Avro {
_ingest_ns: u64,
meta: Value<'input>,
) -> Result<Option<(Value<'input>, Value<'input>)>> {
let schema = if let Some(schema_id) = meta.get_u32("schema_id") {
self.registry.maybe_fetch_id(schema_id).await?;
self.registry
.get_schema_by_id(schema_id)
.ok_or_else(|| format!("No schema found for id {schema_id} in registry"))?
} else {
&self.schema
};
let schema = &self.schema;

let reader = Reader::with_schema(schema, &*data)?;

let mut vals = reader.map(|v| avro_to_value(v?));
vals.next().map(|v| v.map(|v| (v, meta))).transpose()
}

async fn encode(&mut self, data: &Value, meta: &Value) -> Result<Vec<u8>> {
let schema = if let Some(schema_id) = meta.get_u32("schema_id") {
self.registry.maybe_fetch_id(schema_id).await?;
self.registry
.get_schema_by_id(schema_id)
.ok_or_else(|| format!("No schema found for id {schema_id} in registry"))?
} else {
&self.schema
};
async fn encode(&mut self, data: &Value, _meta: &Value) -> Result<Vec<u8>> {
let schema = &self.schema;
let mut writer = Writer::with_codec(
schema,
Vec::with_capacity(AVRO_BUFFER_CAP),
Expand Down

0 comments on commit ed24a4e

Please sign in to comment.