Skip to content

Commit

Permalink
Use a full JS tail implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
teid committed Mar 22, 2020
1 parent 0fcd627 commit fc35ea6
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 56 deletions.
150 changes: 106 additions & 44 deletions lib/tail.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,64 +3,126 @@
'use strict';

const events = require('events');
const childProcess = require('child_process');
const tailStream = require('fs-tail-stream');
const fs = require('fs');
const TailLib = require('tail').Tail;
const readLastLines = require('read-last-lines');
const util = require('util');
const CBuffer = require('CBuffer');
const byline = require('byline');
const commandExistsSync = require('command-exists').sync;

const NB_OF_LINE_TO_PREFETCH = 50;
const TAIL_RETRY_DELAY = 2000;
const NEW_LINE_REGEX = /[\r]{0,1}\n/;
const IS_WIN = process.platform === 'win32';

function getLinePrefix(filePath, filePaths) {
const oneFileIsTailed = filePaths.length <= 1;
if (oneFileIsTailed) {
return '';
}
const paddingLength = Math.max(...filePaths.map((fileName) => fileName.length));
return `${filePath.padStart(paddingLength, ' ')} - `;
}

const fileLogger = (fileName) => ({
info: (...data) => {
console.info(fileName, ':', ...data);
},
error: (...data) => {
console.error(fileName, ':', ...data);
}
});

async function readLastLinesIfPossible(path, onLineCb) {
try {
await fs.promises.access(path, fs.constants.R_OK);
} catch (ex) {
// The file can not be read
return;
}

try {
const lines = await readLastLines.read(path, NB_OF_LINE_TO_PREFETCH)
const linesWithoutLastEmptyLine = lines.replace(/[\r]{0,1}\n$/gm, '');
if (linesWithoutLastEmptyLine === '') {
return;
}
linesWithoutLastEmptyLine.split(NEW_LINE_REGEX).forEach(onLineCb);
} catch (ex) {
fileLogger(path).error('Failed to prefetch the file content:', ex);
}
}

async function tailFile(path, onLineCb) {
const logger = fileLogger(path);
let tail;

function rescheduleTail() {
// Close the current tail
if (tail) {
try {
tail.unwatch();
} catch (ex) {
// Failed to shutdown the previous tail, ignore this since we try to stop it
}
}
setTimeout(() => tailFile(path, onLineCb), TAIL_RETRY_DELAY);
}

// Test that the file exists
try {
await fs.promises.access(path, fs.constants.R_OK);
} catch (ex) {
logger.error(`tail failure - ${ex}`);
rescheduleTail();
return;
}

try {
logger.info('starting to watch the file');
tail = new TailLib(path, {
useWatchFile: IS_WIN // Use watchfile on windows as a workaround to this issue: https://docs.microsoft.com/en-us/archive/blogs/asiasupp/file-date-modified-property-are-not-updating-while-modifying-a-file-without-closing-it
});
} catch (ex) {
logger.error(`tail failure - ${ex}`);
rescheduleTail();
return;
}

tail.on('line', onLineCb);
tail.on('error', (err) => {
logger.error('tail failure -', err);
rescheduleTail();
});
}

function Tail(path, opts) {
events.EventEmitter.call(this);
const pathArray = Array.isArray(path) ? path : [path]; // Normalize the parameter

const pushLineToBuffer = (prefix) =>
(line) => {
const str = `${prefix}${line.toString()}`;
this._buffer.push(str);
this.emit('line', str);
};

const options = opts || {
buffer: 0
};
this._buffer = new CBuffer(options.buffer);

let stream;

if (path[0] === '-') {
stream = process.stdin;
} else {
/* Check if this os provides the `tail` command. */
const hasTailCommand = commandExistsSync('tail');
if (hasTailCommand) {
let followOpt = '-F';
if (process.platform === 'openbsd') {
followOpt = '-f';
}

const cp = childProcess.spawn(
'tail',
['-n', options.buffer, followOpt].concat(path)
);
cp.stderr.on('data', (data) => {
// If there is any important error then display it in the console. Tail will keep running.
// File can be truncated over network.
if (data.toString().indexOf('file truncated') === -1) {
console.error(data.toString());
}
});
stream = cp.stdout;

process.on('exit', () => {
cp.kill();
});
// Start to tail on every parameter
pathArray.forEach((pathItem) => {
if (pathItem === '-') {
console.info('starting to watch stdin');
const linePrefix = getLinePrefix('stdin', pathArray);
byline(process.stdin, { keepEmptyLines: true }).on('data', pushLineToBuffer(linePrefix));
} else {
/* This is used if the os does not support the `tail`command. */
stream = tailStream.createReadStream(path.join(), {
encoding: 'utf8',
start: options.buffer,
tail: true
});
const linePrefix = pushLineToBuffer(getLinePrefix(pathItem, pathArray));
readLastLinesIfPossible(pathItem, pushLineToBuffer(linePrefix));
tailFile(pathItem, pushLineToBuffer(linePrefix));
}
}

byline(stream, { keepEmptyLines: true }).on('data', (line) => {
const str = line.toString();
this._buffer.push(str);
this.emit('line', str);
});
}
util.inherits(Tail, events.EventEmitter);
Expand Down
61 changes: 52 additions & 9 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
"cookie-parser": "~1.4.3",
"daemon-fix41": "~1.1.2",
"express-session": "~1.15.6",
"fs-tail-stream": "^1.1.0",
"is-docker": "~1.1.0",
"read-last-lines": "^1.7.2",
"serve-static": "~1.13.2",
"socket.io": "^2.2.0",
"tail": "^2.0.3",
"universal-analytics": "~0.4.20",
"untildify": "~3.0.2",
"uuid": "~3.3.2"
Expand Down
38 changes: 36 additions & 2 deletions test/tail.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const temp = require('temp');
const tail = require('../lib/tail');

const TEMP_FILE_PROFIX = '';
const SPAWN_DELAY = 10;
const SPAWN_DELAY = 20;

function writeLines(fd, count) {
for (let i = 0; i < count; i += 1) {
Expand All @@ -15,7 +15,10 @@ function writeLines(fd, count) {
`
);
}
fs.closeSync(fd);
}

function endsWith(str, suffix) {
return str.indexOf(suffix, str.length - suffix.length) !== -1;
}

describe('tail', () => {
Expand All @@ -25,13 +28,42 @@ describe('tail', () => {
temp.open(TEMP_FILE_PROFIX, (err, info) => {
tail(info.path).on('line', (line) => {
line.should.equal('line0');
fs.closeSync(info.fd);
done();
});

setTimeout(writeLines, SPAWN_DELAY, info.fd, 1);
});
});

it('calls event line if new line appear in files', (done) => {
const buffer = [];

temp.open(TEMP_FILE_PROFIX, (err, info) => {
temp.open(TEMP_FILE_PROFIX, (err2, info2) => {
tail([info.path, info2.path]).on('line', (line) => {
buffer.push(line);
});

setTimeout(() => {
writeLines(info.fd, 2);
writeLines(info2.fd, 2);

setTimeout(() => {
buffer.length.should.equal(4);
endsWith(buffer[0], `${info.path} - line0`).should.be.true;
endsWith(buffer[1], `${info.path} - line1`).should.be.true;
endsWith(buffer[2], `${info2.path} - line0`).should.be.true;
endsWith(buffer[3], `${info2.path} - line1`).should.be.true;
fs.closeSync(info.fd);
fs.closeSync(info2.fd);
done();
}, SPAWN_DELAY);
}, SPAWN_DELAY);
});
});
});

it('buffers lines on start', (done) => {
temp.open(TEMP_FILE_PROFIX, (err, info) => {
writeLines(info.fd, 20);
Expand All @@ -41,6 +73,7 @@ describe('tail', () => {
});
setTimeout(() => {
tailer.getBuffer().should.be.eql(['line18', 'line19']);
fs.closeSync(info.fd);
done();
}, SPAWN_DELAY);
});
Expand All @@ -53,6 +86,7 @@ describe('tail', () => {
const tailer = tail(info.path);
setTimeout(() => {
tailer.getBuffer().should.be.empty;
fs.closeSync(info.fd);
done();
}, SPAWN_DELAY);
});
Expand Down

0 comments on commit fc35ea6

Please sign in to comment.