diff --git a/packages/event_store/src/eventHandler.js b/packages/event_store/src/eventHandler.js index 112378cf..afd00ffb 100644 --- a/packages/event_store/src/eventHandler.js +++ b/packages/event_store/src/eventHandler.js @@ -1,7 +1,5 @@ -const stream = require('stream'); -const { promisify } = require('util'); const got = require('got'); -const fs = require('fs'); +const readline = require('readline'); const Storage = require('./storage'); const models = require('../src/models/index'); const env = process.env.NODE_ENV || 'development'; @@ -18,8 +16,12 @@ class EventHandler { */ async createInputStream(url) { try { - const readStream = got.stream(url); // Still to implement retry on failed connection + const readStream = readline.createInterface({ + input: got.stream(url), + crlfDelay: Infinity + }); + return readStream; } catch (err) { if (err instanceof got.stream.RequestError) { @@ -33,7 +35,7 @@ class EventHandler { /** * Returns writeable stream pointed to the storage component */ - async createOutputStream() { + async createStorage() { // Sync database schema. if (env == "production") { @@ -45,53 +47,32 @@ class EventHandler { } // Initialise storage - let storage = new Storage(models); - - // Extend empty writeable object - let outputStream = new stream.Writable(); - - outputStream._write = async (chunk, encoding, done) => { - // Removes 'data:' prefix from the event to convert it to JSON - chunk.toString().split("\n") - .filter(str => str.startsWith('data')) - .forEach(async str => { - let event; - try { - event = JSON.parse(str.substr(5)); - if (event.DeployProcessed) { - await storage.onDeployProcessed(event.DeployProcessed); - } else if (event.BlockAdded) { - await storage.onBlockAdded(event.BlockAdded); - } - } catch (err) { - console.log(`Error while processing an event.\nEvent: ${str}\nError: ${err}`); - } - }); - done(); - } - - return outputStream; + return new Storage(models); } /** * Attempts to create a streaming pipeline given an input and output stream. * - * @param {stream.Readable} inputStream - * @param {stream.Writable} outputStream + * @param {readline.Interface} inputStream + * @param {Storage} storage */ - async createPipeline(inputStream, outputStream) { - - // initialise pipeline - const pipeline = promisify(stream.pipeline); + async createPipeline(inputStream, storage) { + inputStream.on('line', async (eventString) => { + if (!eventString.startsWith('data')) { + return; + } - try { - await pipeline( - inputStream, - outputStream - ); - } catch (err) { - console.error(err); - } + try { + const event = JSON.parse(eventString.substr(5)); + if (event.DeployProcessed) { + await storage.onDeployProcessed(event.DeployProcessed); + } else if (event.BlockAdded) { + await storage.onBlockAdded(event.BlockAdded); + } + } catch (err) { + console.log(`Error while processing an event.\nEvent: ${eventString}\nError: ${err}`); + } + }); } @@ -144,9 +125,9 @@ if (env !== 'test') { let eventHandler = new EventHandler(); let nodeUrl = await eventHandler.formURL(); let eventStream = await eventHandler.createInputStream(nodeUrl); - let storageStream = await eventHandler.createOutputStream(); + let storage = await eventHandler.createStorage(); - eventHandler.createPipeline(eventStream, storageStream); + eventHandler.createPipeline(eventStream, storage); } runEventHandler();