Commit a826d20

Merge pull request #42 from zendesk/ben/pk_6

primary key strings!
Ben Osheroff committed Apr 9, 2015
2 parents 0e60471 + 8bea745 commit a826d20
Showing 7 changed files with 120 additions and 12 deletions.
12 changes: 12 additions & 0 deletions docs/docs/kafka.md
@@ -11,6 +11,18 @@ Maxwell sets the following Kafka options by default, but you can override them i

Maxwell writes to a kafka topic named "maxwell" by default. This can be changed with the `kafka_topic` option.

### Kafka key

Maxwell generates keys for its Kafka messages based upon a mysql row's primary key:

```
db:test_db/tbl:test_tbl/id:93/id_2:17910090
```

This key is designed to cooperate with Kafka's log compaction, which retains the last-known
value for each key, allowing Maxwell's Kafka stream to keep the most recent image of every row
and act as a source of truth.
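
As an illustration of how a downstream consumer might lean on this (a sketch, not part of this commit: the group id is hypothetical and the modern `kafka-clients` consumer API is assumed), replaying a compacted `maxwell` topic into a map leaves the last-known image of each row:

```
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class LastKnownRowState {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("group.id", "maxwell-materializer");    // hypothetical group id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("auto.offset.reset", "earliest");

        // Last-known row image, keyed by Maxwell's primary-key string.
        Map<String, String> rows = new HashMap<>();

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("maxwell"));
            while (true) {
                for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofSeconds(1))) {
                    // After compaction, a full replay still delivers at least the
                    // most recent value for every key.
                    rows.put(rec.key(), rec.value());
                }
            }
        }
    }
}
```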

### Topic and partitioning

Maxwell enforces ordering on events within a logical mysql database (but not within a mysql server). We enforce
this ordering by choosing a kafka partition based on an event's database name (`dbName.hashCode() % numPartitions`).
11 changes: 11 additions & 0 deletions docs/site/kafka/index.html
@@ -66,6 +66,9 @@
<li class="toctree-l3"><a href="#kafka-options">Kafka options</a></li>


<li class="toctree-l3"><a href="#kafka-key">Kafka key</a></li>


<li class="toctree-l3"><a href="#topic-and-partitioning">Topic and partitioning</a></li>


@@ -92,6 +95,14 @@ <h3 id="kafka-options">Kafka options</h3>
<li>kafka.compression.type = gzip</li>
</ul>
<p>Maxwell writes to a kafka topic named "maxwell" by default. This can be changed with the <code>kafka_topic</code> option.</p>
<h3 id="kafka-key">Kafka key</h3>
<p>Maxwell generates keys for its Kafka messages based upon a mysql row's primary key:</p>
<pre><code>db:test_db/tbl:test_tbl/id:93/id_2:17910090
</code></pre>

<p>This key is designed to cooperate with Kafka's log compaction, which retains the last-known
value for each key, allowing Maxwell's Kafka stream to keep the most recent image of every row
and act as a source of truth.</p>
<h3 id="topic-and-partitioning">Topic and partitioning</h3>
<p>Maxwell enforces ordering on events within a logical mysql database (but not within a mysql server). We enforce
this ordering by choosing a kafka partition based on an event's database name (<code>dbName.hashCode() % numPartitions</code>).
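
For illustration, the partition choice boils down to the following (a sketch that mirrors the `kafkaPartition` method later in this diff; `Math.abs` is applied because `String.hashCode` can be negative):

```
// All events for a given database land on one partition,
// which preserves per-database ordering.
static int kafkaPartition(String dbName, int numPartitions) {
    return Math.abs(dbName.hashCode() % numPartitions);
}

// e.g. kafkaPartition("shard_1", 4) is stable across calls,
// so every shard_1 event takes the same partition.
```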
31 changes: 31 additions & 0 deletions src/main/java/com/zendesk/maxwell/MaxwellAbstractRowsEvent.java
@@ -8,6 +8,7 @@
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.json.JSONObject;

import com.google.code.or.binlog.BinlogEventV4Header;
@@ -242,4 +243,34 @@ public List<String> toJSONStrings() {
}
return list;
}

public List<String> getPKStrings() {
ArrayList<String> list = new ArrayList<>();

for ( Row r : filteredRows()) {
list.add(new MaxwellPKJSONObject(table.getDatabase().getName(),
table.getName(),
getPKMapForRow(r)).toString());
}

return list;
}

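// Map each primary-key column name to a JSON-ready value for this row.
// A table with no primary key falls back to a random UUID, so every message
// still carries a unique key (such rows are never collapsed by compaction).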
private Map<String, Object> getPKMapForRow(Row r) {
HashMap<String, Object> map = new HashMap<>();

if ( table.getPKList().isEmpty() ) {
map.put("_uuid", java.util.UUID.randomUUID().toString());
}

for ( String pk : table.getPKList() ) {
int idx = table.findColumnIndex(pk);

Column column = r.getColumns().get(idx);
ColumnDef def = table.getColumnList().get(idx);

map.put(pk, def.asJSON(column.getValue()));
}
return map;
}
}
44 changes: 44 additions & 0 deletions src/main/java/com/zendesk/maxwell/MaxwellPKJSONObject.java
@@ -0,0 +1,44 @@
package com.zendesk.maxwell;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

import org.json.JSONObject;

public class MaxwellPKJSONObject extends JSONObject {
private final ArrayList<String> pkKeys;

public MaxwellPKJSONObject(String db, String table, Map<String, Object> pks) {
super();

this.put("database", db);
this.put("table", table);
this.pkKeys = new ArrayList<String>();

ArrayList<String> sortedPKs = new ArrayList<String>(pks.keySet());
Collections.sort(sortedPKs);

for ( String pk : sortedPKs ) {
String prefixed = "pk." + pk;
this.pkKeys.add(prefixed);
this.put(prefixed, pks.get(pk));
}
}

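// org.json serializes in keySet() iteration order, so this override pins the
// output order: database, table, then the sorted "pk." fields.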
@Override
public Set<String> keySet() {
LinkedHashSet<String> set = new LinkedHashSet<>();

set.add("database");
set.add("table");
for ( String key : this.pkKeys ) {
set.add(key);
}

return set;
}

}
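
A quick sketch of the resulting message key (hypothetical snippet; the values echo the documentation example above):

```
import java.util.HashMap;
import java.util.Map;

Map<String, Object> pks = new HashMap<>();
pks.put("id_2", 17910090);
pks.put("id", 93);

// PK names are sorted, so the output order is deterministic regardless
// of map insertion order:
// {"database":"test_db","table":"test_tbl","pk.id":93,"pk.id_2":17910090}
System.out.println(new MaxwellPKJSONObject("test_db", "test_tbl", pks));
```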
22 changes: 11 additions & 11 deletions src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java
@@ -21,11 +21,13 @@ class KafkaCallback implements Callback {
private final MaxwellConfig config;
private final MaxwellAbstractRowsEvent event;
private final String json;
private final String key;
private final boolean lastRowInEvent;

public KafkaCallback(MaxwellAbstractRowsEvent e, MaxwellConfig c, String json, boolean lastRowInEvent) {
public KafkaCallback(MaxwellAbstractRowsEvent e, MaxwellConfig c, String key, String json, boolean lastRowInEvent) {
this.config = c;
this.event = e;
this.key = key;
this.json = json;
this.lastRowInEvent = lastRowInEvent;
}
@@ -37,7 +39,7 @@ public void onCompletion(RecordMetadata md, Exception e) {
} else {
try {
if ( LOGGER.isDebugEnabled()) {
LOGGER.debug("-> topic:" + md.topic() + ", partition:" +md.partition() + ", offset:" + md.offset());
LOGGER.debug("-> key:" + key + ", partition:" +md.partition() + ", offset:" + md.offset());
LOGGER.debug(" " + this.json);
LOGGER.debug(" " + event.getNextBinlogPosition());
LOGGER.debug("");
@@ -69,12 +71,6 @@ public MaxwellKafkaProducer(MaxwellConfig config, Properties kafkaProperties, St
this.numPartitions = kafka.partitionsFor(topic).size(); //returns 1 for new topics
}

public String kafkaKey(MaxwellAbstractRowsEvent e) {
String db = e.getDatabase().getName();
String table = e.getTable().getName();
return db + "/" + table;
}

public int kafkaPartition(MaxwellAbstractRowsEvent e){
String db = e.getDatabase().getName();
return Math.abs(db.hashCode() % numPartitions);
@@ -83,12 +79,16 @@ public int kafkaPartition(MaxwellAbstractRowsEvent e){
@Override
public void push(MaxwellAbstractRowsEvent e) throws Exception {
Iterator<String> i = e.toJSONStrings().iterator();
while ( i.hasNext() ) {
Iterator<String> j = e.getPKStrings().iterator();

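// Each row contributes one JSON payload and one PK key; walk the two lists in lockstep.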
while ( i.hasNext() && j.hasNext() ) {
String json = i.next();
String key = j.next();

ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>(topic, kafkaPartition(e), kafkaKey(e).getBytes(), json.getBytes());
new ProducerRecord<>(topic, kafkaPartition(e), key.getBytes(), json.getBytes());

kafka.send(record, new KafkaCallback(e, this.config, json, !i.hasNext()));
kafka.send(record, new KafkaCallback(e, this.config, key, json, !i.hasNext()));
}

}
10 changes: 10 additions & 0 deletions src/test/java/com/zendesk/maxwell/MaxwellIntegrationTest.java
@@ -25,6 +25,16 @@ public void testGetEvent() throws Exception {
assertThat(list.get(0).toSQL(), is("REPLACE INTO `minimal` (`id`, `account_id`, `text_field`) VALUES (1,1,'hello')"));
}

@Test
public void testPrimaryKeyStrings() throws Exception {
List<MaxwellAbstractRowsEvent> list;
String input[] = {"insert into minimal set account_id =1, text_field='hello'"};
String expectedJSON = "{\"database\":\"shard_1\",\"table\":\"minimal\",\"pk.id\":1,\"pk.text_field\":\"hello\"}";
list = getRowsForSQL(null, input);
assertThat(list.size(), is(1));
assertThat(StringUtils.join(list.get(0).getPKStrings(), ""), is(expectedJSON));
}

@Test
public void testRowFilter() throws Exception {
List<MaxwellAbstractRowsEvent> list;
2 changes: 1 addition & 1 deletion src/test/resources/sql/schema/minimal.sql
@@ -2,6 +2,6 @@ CREATE TABLE `minimal` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`account_id` int(11) NOT NULL,
`text_field` varchar(96),
PRIMARY KEY (id)
PRIMARY KEY (id, text_field)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
