Skip to content
This repository has been archived by the owner on Aug 10, 2021. It is now read-only.

Commit

Permalink
Now honors deletes, and fixed bug in backfill
Browse files Browse the repository at this point in the history
  • Loading branch information
Josh Kuhn committed Oct 3, 2014
1 parent d8af4e9 commit 5a24a56
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 18 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-river-rethinkdb</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.0</version>
<packaging>jar</packaging>
<name>Elasticsearch RethinkDB River plugin</name>
<description>A River for pulling indexing RethinkDB databases</description>
<description>A River for indexing RethinkDB databases and staying synced</description>
<url>https://github.com/rethinkdb/elasticsearch-river-rethinkdb/</url>
<inceptionYear>2014</inceptionYear>
<licenses>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@


public class ChangeRecord {
private final RethinkDB r = RethinkDB.r;
public final RqlQuery query;
public final String table;
public final String db;
public final boolean backfill;
Expand All @@ -21,7 +19,6 @@ public ChangeRecord(String db, String table, Map<String, Object> options){
this.backfill = (boolean) options.getOrDefault("backfill", false);
this.targetIndex = (String) options.getOrDefault("index", db);
this.targetType = (String) options.getOrDefault("type", table);
query = r.table(table).changes().field("new_val");
}

@Override
Expand Down
46 changes: 33 additions & 13 deletions src/main/java/org/elasticsearch/river/rethinkdb/FeedWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,23 +65,18 @@ public void run() {
primaryKey = getPrimaryKey();
while (!river.closed) {
try {
cursor = changeRecord.query.runForCursor(connection);
cursor = r.table(changeRecord.table).changes().runForCursor(connection);
if (backfillRequired) {
backfill();
}
int counter = 0;
while (cursor.hasNext()) {
Map<String, Object> change = cursor.next();
updateES(change);
counter++;
if(counter % 10 == 1){
if(counter % 10 == 0){
logger.info("Synced {} documents", counter);
}
client.prepareIndex(
changeRecord.targetIndex,
changeRecord.targetType,
(String) change.get(primaryKey))
.setSource(change)
.execute();
}
} catch (RethinkDBException e) {
logger.error("Worker has a problem: " + e.getMessage());
Expand All @@ -106,6 +101,27 @@ public void run() {
}
}

/**
 * Applies a single changefeed entry to Elasticsearch.
 *
 * <p>If the entry carries a {@code new_val}, that document is indexed under its primary-key
 * value. Otherwise, if it carries an {@code old_val}, a delete is issued for that document's
 * primary-key value. Requests are fired with {@code execute()} and the returned result is
 * not inspected here.
 *
 * @param change one entry read from the changes cursor; must not be {@code null}
 * @return {@code true} if a delete request was issued, {@code false} otherwise
 */
private boolean updateES(Map<String, Object> change) {
    @SuppressWarnings("unchecked") // values come from an untyped map; casts are checked lazily on use
    Map<String, Object> newVal = (Map<String, Object>) change.get("new_val");
    @SuppressWarnings("unchecked")
    Map<String, Object> oldVal = (Map<String, Object>) change.get("old_val");

    // The surviving image of the document decides both the id and the operation.
    Map<String, Object> doc = (newVal != null) ? newVal : oldVal;
    if (doc == null) {
        // Entry has neither image (presumably a non-document notification on the feed --
        // confirm against the driver); there is nothing to sync, and dereferencing it
        // would throw a NullPointerException.
        return false;
    }

    // Primary-key values are not guaranteed to be strings, so convert instead of casting.
    // A missing key stays null, exactly as the plain cast would have produced.
    Object key = doc.get(primaryKey);
    String id = (key == null) ? null : key.toString();

    if (newVal != null) {
        client.prepareIndex(
                changeRecord.targetIndex,
                changeRecord.targetType,
                id)
            .setSource(newVal)
            .execute();
        return false;
    }

    client.prepareDelete(
            changeRecord.targetIndex,
            changeRecord.targetType,
            id)
        .execute();
    return true;
}

private void backfill() throws IOException {
RethinkDBConnection backfillConnection = r.connect(river.hostname, river.port, river.authKey);
backfillConnection.use(changeRecord.db);
Expand All @@ -114,12 +130,16 @@ private void backfill() throws IOException {
// totalSize is purely for the purposes of printing progress, and may be inaccurate since documents can be
// inserted while we're backfilling
int totalSize = r.table(changeRecord.table).count().run(backfillConnection).intValue();
int tenthile = (totalSize + 9) / 10; // ceiling integer division by 10
BulkRequestBuilder bulkRequest = client.prepareBulk();
int i = 1;
for (Map<String, Object> doc : r.table(changeRecord.table).run(backfillConnection)) {
if (i % tenthile == 0) {
logger.info("backfill {}% complete ({} documents)", (i / tenthile) * 10, i);
int i = 0;
int oldTenthile = 0, newTenthile;
Cursor cursor = r.table(changeRecord.table).runForCursor(backfillConnection);
while (cursor.hasNext()){
Map<String, Object> doc = (Map<String, Object>) cursor.next();
newTenthile = (i * 100) / totalSize / 10;
if (newTenthile != oldTenthile) {
logger.info("backfill {}0% complete ({} documents)", newTenthile, i);
oldTenthile = newTenthile;
}
if (i % 100 == 0) {
bulkRequest.execute();
Expand Down

0 comments on commit 5a24a56

Please sign in to comment.