Skip to content

Commit

Permalink
Merge pull request #2 from voldern/do-not-write-empty
Browse files Browse the repository at this point in the history
Do not write records when the queue is empty
  • Loading branch information
Espen Volden committed Nov 16, 2015
2 parents eabad1e + afdea6d commit 1660992
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 13 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
All notable changes to this project will be documented in this file.
This project adheres to [Semantic Versioning](http://semver.org/).

## [0.1.1] - 2015-11-16
### Fixed
- Do not write if there are no records in the queue when the stream gets closed

## [0.1.0] - 2015-11-16
### Changed
- Add property `records` to error events which contains the records
Expand Down
43 changes: 32 additions & 11 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,21 +60,15 @@ function ElasticsearchBulkIndexWritable(client, options) {
}

/**
* Write items in queue to Elasticsearch
* Bulk write records to Elasticsearch
*
* @private
* @param {array} records
* @param {Function} callback
* @return {undefined}
*/
ElasticsearchBulkIndexWritable.prototype._flush = function _flush(callback) {
try {
var records = transformRecords(this.queue);
} catch (error) {
return callback(error);
}

ElasticsearchBulkIndexWritable.prototype.bulkWrite = function bulkWrite(records, callback) {
if (this.logger) {
this.logger.debug('Writing %d records to Elasticsearch', this.queue.length);
this.logger.debug('Writing %d records to Elasticsearch', records.length);
}

this.client.bulk({ body: records }, function bulkCallback(err, data) {
Expand Down Expand Up @@ -102,7 +96,34 @@ ElasticsearchBulkIndexWritable.prototype._flush = function _flush(callback) {
}

if (this.logger) {
this.logger.info('Wrote %d records to Elasticsearch', this.queue.length);
this.logger.info('Wrote %d records to Elasticsearch', records.length);
}

callback();
}.bind(this));
};

/**
* Flush method needed by the underlying stream implementation
*
* @private
* @param {Function} callback
* @return {undefined}
*/
ElasticsearchBulkIndexWritable.prototype._flush = function _flush(callback) {
if (this.queue.length === 0) {
return callback();
}

try {
var records = transformRecords(this.queue);
} catch (error) {
return callback(error);
}

this.bulkWrite(records, function(err) {
if (err) {
return callback(err);
}

this.writtenRecords += this.queue.length;
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "elasticsearch-bulk-index-stream",
"version": "0.1.0",
"version": "0.1.1",
"description": "A writable stream for bulk indexing records in Elasticsearch",
"main": "index.js",
"scripts": {
Expand Down
20 changes: 19 additions & 1 deletion test/elasticsearch-bulk-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ describe('ElastisearchBulkIndexWritable', function() {
bulk: this.sinon.stub()
};

this.stream = new ElasticsearchBulkIndexWritable(this.client);
this.stream = new ElasticsearchBulkIndexWritable(this.client, {
highWaterMark: 6
});
});

it('should write records to elasticsearch', function(done) {
Expand All @@ -106,6 +108,22 @@ describe('ElastisearchBulkIndexWritable', function() {
}.bind(this));
});

it('should do nothing if there is nothing in the queue when the stream is closed', function(done) {
this.client.bulk.yields(null, successResponseFixture);

this.stream.on('finish', function() {
expect(this.client.bulk).to.have.been.calledOnce;

done();
}.bind(this));

for (var i = 0; i < 6; i++) {
this.stream.write(recordFixture);
}

this.stream.end();
});

it('should trigger error on elasticsearch error', function(done) {
this.client.bulk.yields(new Error('Fail'));

Expand Down

0 comments on commit 1660992

Please sign in to comment.