Skip to content

Commit

Permalink
Merge pull request #11 from marshall007/custom-action
Browse files Browse the repository at this point in the history
Support overriding the "action" for an operation
  • Loading branch information
voldern authored Aug 18, 2016
2 parents d7187f7 + 97aecc2 commit a2185d8
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 13 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ The records written to the stream has to have the following format:
index: 'name-of-index',
type: 'recordType',
id: 'recordId',
parent: 'parentRecordType', //optional
parent: 'parentRecordType', // optional
action: 'update', // optional (default: 'index')
body: {
name: 'Foo Bar'
}
Expand Down
28 changes: 17 additions & 11 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,27 @@ function transformRecords(records) {
return records.reduce(function(bulkOperations, record) {
assert(record.index, 'index is required');
assert(record.type, 'type is required');
assert(record.body, 'body is required');

var index = {
record.action = record.action || 'index';

var operation = {};

operation[record.action] = {
_index: record.index,
_type: record.type,
_id: record.id
};

if (record.parent) {
index._parent = record.parent;
operation[record.action]._parent = record.parent;
}

bulkOperations.push({
index: index
});
bulkOperations.push(operation);

bulkOperations.push(record.body);
if (record.action !== 'delete') {
assert(record.body, 'body is required');
bulkOperations.push(record.body);
}

return bulkOperations;
}, []);
Expand Down Expand Up @@ -83,11 +87,13 @@ ElasticsearchBulkIndexWritable.prototype.bulkWrite = function bulkWrite(records,
}

if (data.errors === true) {
var errors = data.items.map(function(item) {
var errors = _.chain(data.items)
.map(function(item) {
return _.map(item, 'error')[0];
});

errors = _.uniq(_.filter(errors, _.isString));
})
.filter(_.isString)
.uniq()
.value();

if (this.logger) {
errors.forEach(this.logger.error.bind(this.logger));
Expand Down
64 changes: 63 additions & 1 deletion test/elasticsearch-bulk-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@ chai.use(sinonChai);
var expect = chai.expect;

var recordFixture = require('./fixture/record.json');
var recordParentFixture = require('./fixture/parentrecord.json');
var recordDeleteFixture = require('./fixture/record-delete.json');
var recordParentFixture = require('./fixture/record-parent.json');
var recordUpdateFixture = require('./fixture/record-update.json');
var successResponseFixture = require('./fixture/success-response.json');
var successDeleteResponseFixture = require('./fixture/success-delete-response.json');
var successParentResponseFixture = require('./fixture/success-parent-response.json');
var successUpdateResponseFixture = require('./fixture/success-update-response.json');
var errorResponseFixture = require('./fixture/error-response.json');

describe('ElastisearchBulkIndexWritable', function() {
Expand Down Expand Up @@ -242,4 +246,62 @@ describe('ElastisearchBulkIndexWritable', function() {
expect(this.client.bulk).to.have.been.calledWith(expectedArgument);
});
});

describe('custom action', function() {
beforeEach(function() {
this.client = {
bulk: this.sinon.stub()
};

this.stream = new ElasticsearchBulkIndexWritable(this.client, {
highWaterMark: 1,
flushTimeout: 10
});

this.clock = sinon.useFakeTimers();
});

it('should allow you to override the action', function() {
this.client.bulk.yields(null, successUpdateResponseFixture);
this.stream.write(recordUpdateFixture);

var expectedArgument = {
body: [
{
update: {
_index: 'indexName',
_type: 'recordType',
_id: 'recordId'
}
},
{
foo: 'bar'
}
]
};

expect(this.client.bulk).to.have.callCount(1);
expect(this.client.bulk).to.have.been.calledWith(expectedArgument);
});

it('should not include body if action is delete', function() {
this.client.bulk.yields(null, successDeleteResponseFixture);
this.stream.write(recordDeleteFixture);

var expectedArgument = {
body: [
{
delete: {
_index: 'indexName',
_type: 'recordType',
_id: 'recordId'
}
}
]
};

expect(this.client.bulk).to.have.callCount(1);
expect(this.client.bulk).to.have.been.calledWith(expectedArgument);
});
});
});
6 changes: 6 additions & 0 deletions test/fixture/record-delete.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"index": "indexName",
"id": "recordId",
"type": "recordType",
"action": "delete"
}
File renamed without changes.
9 changes: 9 additions & 0 deletions test/fixture/record-update.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"index": "indexName",
"id": "recordId",
"type": "recordType",
"action": "update",
"body": {
"foo": "bar"
}
}
15 changes: 15 additions & 0 deletions test/fixture/success-delete-response.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"took": 30,
"errors": false,
"items": [
{
"delete": {
"_index": "indexName",
"_id": "recordId",
"_type": "recordType",
"_version": 5,
"status": 200
}
}
]
}
15 changes: 15 additions & 0 deletions test/fixture/success-update-response.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"took": 30,
"errors": false,
"items": [
{
"update": {
"_index": "indexName",
"_id": "recordId",
"_type": "recordType",
"_version": 5,
"status": 200
}
}
]
}

0 comments on commit a2185d8

Please sign in to comment.