Skip to content

Commit

Permalink
Merge pull request #51 from osmlab/tests
Browse files Browse the repository at this point in the history
tests / async refactor
  • Loading branch information
aaronlidman committed Nov 27, 2014
2 parents 51a79f1 + 6fed4f0 commit 2a11f07
Show file tree
Hide file tree
Showing 14 changed files with 1,720 additions and 179 deletions.
5 changes: 2 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@ install:
sh install.sh

load: northeast_highway_intersects_building.csv tigerdelta-latest.zip osmi-latest.zip keepright-latest.zip
node index.js --fixed > fixed.json
node fixed.js
rm -rf ldb
mkdir ldb
sh load.sh fixed.json
rm fixed.json
sh load.sh

stats:
sh getStats.sh
Expand Down
63 changes: 41 additions & 22 deletions fixed.js
Original file line number Diff line number Diff line change
@@ -1,31 +1,50 @@
var fs = require('fs'),
levelup = require('levelup'),
leveldown = require('leveldown'),
key = require('./lib/key.js');
queue = require('queue-async'),
key = require('./lib/key.js'),
rimraf = require('rimraf');

var dbs = fs.readdirSync('./ldb/').filter(function(item) {
return item.indexOf('.ldb') > -1;
});
function fixed(callback) {
var dbs = fs.readdirSync('./ldb/').filter(function(item) {
return item.indexOf('.ldb') > -1;
});

if (!fs.existsSync('./fixed')) {
fs.mkdirSync('./fixed');
}
if (!fs.existsSync('./fixed')) {
fs.mkdirSync('./fixed');
}

dbs.forEach(function(ldb, idx) {
levelup('./ldb/' + ldb, {
createIfMissing: false,
max_open_files: 500
}, function(err, db) {
if (err) return console.log(err);
var file = fs.createWriteStream('./fixed/' + ldb.split('.ldb')[0]);
file.once('open', function() {
db.createReadStream({lt: '0001'})
.on('data', function(data) {
file.write(key.decompose(data.key).hash + '\n');
})
.on('end', function(data) {
file.end();
var q = queue();

dbs.forEach(function(ldb, idx) {
q.defer(function(qcallback) {
levelup('./ldb/' + ldb, {
createIfMissing: false,
max_open_files: 500
}, function(err, db) {
if (err) return console.log(err);
var filePath = './fixed/' + ldb.split('.ldb')[0];
var file = fs.createWriteStream(filePath);
file.once('open', function() {
db.createReadStream({lt: '0001'})
.on('data', function(data) {
file.write(key.decompose(data.key).hash + '\n');
})
.on('end', function(data) {
file.end();
db.close(function(err) {
qcallback(err);
});
});
});
});
});
});
});

q.awaitAll(function(err, results) {
if (callback) callback(err);
});
}

module.exports = fixed;
if (require.main === module) { fixed(); }
157 changes: 100 additions & 57 deletions import-csv.js
Original file line number Diff line number Diff line change
@@ -1,78 +1,121 @@
var fs = require('fs'),
path = require('path'),
readline = require('readline'),
csv = require('csv-parser'),
levelup = require('levelup'),
key = require('./lib/key.js');
key = require('./lib/key.js'),
queue = require('queue-async'),
rimraf = require('rimraf');

if (process.stdin.isTTY) {
var verbose = false;
if (require.main === module) {
verbose = true;
if (process.argv[2] === undefined) {
return console.log('file argument required \n`node import-csv.js [source csv]`');
}
loadTask(process.argv[2]);
} else {
process.stdin.on('readable', function() {
var buf = process.stdin.read();
if (buf === null) return;
buf.toString().split('\n').forEach(function(file) {
if (file.length) loadTask(file, process.argv[2]);
});
});
loadTask(process.argv[2], function() {});
}

function loadTask(fileLoc) {
var task = path.basename(fileLoc).split('.')[0],
db = levelup('./ldb/' + task + '.ldb'),
fixed_list = [],
count = 0;
module.exports = loadTask;

var tracking = levelup('./ldb/' + task + '-tracking.ldb');
tracking.close();
function loadTask(fileLoc, callback) {
var task = path.basename(fileLoc).split('.')[0],
topq = queue();

if (fs.existsSync('./fixed') && fs.readdirSync('./fixed').indexOf(task) > -1) {
var rl = readline.createInterface({
input: fs.createReadStream('./fixed/' + task),
output: new require('stream')
// ensure that we create & close the tracking database before loading anything
topq.defer(function(tqcallback) {
levelup('./ldb/' + task + '-tracking.ldb', {}, function(err, trackingdb) {
if (err) return tqcallback(err);
trackingdb.close(function(err) {
tqcallback(err);
});
});
});

rl.on('line', function(line) {
fixed_list.push(line);
});
topq.awaitAll(function(err, results) {
// perform some tests to see if there's a 'fixed' file for this task
var fixedq = queue();
fixedq
.defer(function(fqcallback) {
fs.readdir('./fixed', function(err, files) {
if (err) return fqcallback(err);
fqcallback(err, (files.indexOf(task) > -1));
});
})
.defer(function(fqcallback) {
fs.stat('./fixed/' + task, function(err, info) {
if (err) return fqcallback(err);
fqcallback(err, (info.size > 0));
});
});

rl.on('end', function() {
doImport(fileLoc);
// once the tests come back, check if they're all positive. if so, load fixes & tasks.
fixedq.awaitAll(function(err, results) {
if((!err) && results.every(function(x) { return x; })) {
fs.readFile('./fixed/' + task, function(err, data) {
var fixed_list = data.toString().split("\n");
_doImport(fileLoc, task, fixed_list, callback);
});
} else {
_doImport(fileLoc, task, [], callback);
}
});
} else {
doImport(fileLoc);
}
});
}

function doImport(fileLoc) {
console.log('importing ' + task);
fs.createReadStream(fileLoc)
.pipe(csv())
.on('data', function(data) {
var object_hash = key.hashObject(data);
if (fixed_list.indexOf(object_hash) === -1) {
// item is not fixed
var object_id = key.compose(1, object_hash);
db.put(object_id, JSON.stringify(data), function (err) {
if (err) console.log('-- error --', err);
});
count++;
}
})
.on('end', function() {
setTimeout(function() {
// insert a dummy object in unfixed keyspace if nothing has been inserted
// prevents blocking on levelup readstream creation against an empty db
if (count === 0) {
function _doImport(fileLoc, task, fixed_list, callback) {
if (verbose) console.log('importing task from ' + fileLoc);
var count = 0;

levelup('./ldb/' + task + '.ldb', function(err, db) {
if (err) throw callback(err);
var q = queue();

q.defer(function(qcallback) {
fs.createReadStream(fileLoc)
.pipe(csv())
.on('data', function(data) {
var object_hash = key.hashObject(data);
if (fixed_list.indexOf(object_hash) === -1) {
// item is not fixed
var object_id = key.compose(1, object_hash);
count++;
q.defer(function(qsubcallback) {
db.put(object_id, JSON.stringify(data), function(err) {
qsubcallback(err);
});
});
}
})
.on('end', function(err) {
if(count === 0) {
var keyval = key.compose(1, 'random');
db.put(keyval, JSON.stringify({ignore: true}));
q.defer(function(qsubcallback) {
db.put(keyval, JSON.stringify({ignore: true}), function(err) {
qsubcallback(err);
});
});
}
db.close();

console.log('done with ' + task + '. ' + count + ' items imported');
}, 5000);
qcallback(err);
});
});
}

q.awaitAll(function(err, results) {
if (verbose) { console.log('done with ' + task + '. ' + count + ' items imported'); }
db.close(function(err) {
if (callback) callback(err);
});
});
});
}

/*
function deleteTask(task, callback){
var q = queue();
if (verbose) { console.log('deleting task ' + task); }
q.defer(rimraf, './ldb/' + task + '.ldb')
.defer(rimraf, './ldb/' + task + '-tracking.ldb')
.awaitAll(function(err, results) {
callback(err, results);
});
}
*/
2 changes: 1 addition & 1 deletion install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ if [ "$unamestr" = 'Darwin' ]; then
brew install leveldb wget node
elif [ "$unamestr" = 'Linux' ]; then
apt-get -y update
apt-get install -y zip git vim htop bzip2 curl libleveldb-dev nodejs npm
apt-get install -y zip git vim htop bzip2 curl libleveldb-dev nodejs npm parallel
sudo ln -s /usr/bin/nodejs /usr/bin/node
apt-get install -y s3cmd
ulimit -n 1000000
Expand Down
1 change: 1 addition & 0 deletions ldb/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*
63 changes: 63 additions & 0 deletions lib/test-common.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
var levelup = require('levelup'),
key = require('./key.js'),
queue = require('queue-async');

function countTasksBySkipval(callback) {
levelup('./ldb/test.ldb', function(err, db) {
if (err) return callback(err);
var counts = {};
db.createReadStream()
.on('data', function(data) {
var skipval = key.decompose(data.key).skipval;
if (!counts[skipval]) counts[skipval] = 0;
counts[skipval]++;
})
.on('end', function() {
db.close(function(err) {
callback(err, counts);
});
});
});
}

function markTasksAsDone(tasks, callback) {
levelup('./ldb/test.ldb', function(err, db) {
if (err) throw callback(err);
var q = queue();
tasks.forEach(function(task) {
q.defer(function(task, deferCallback) {
db.get(task, function(err, val) {
var k = key.decompose(task);
db.put(key.compose(0, k.hash), val, function() {
db.del(task, deferCallback);
});
});
}, task);
});
q.awaitAll(function() {
db.close(function(err) {
callback(err);
});

});
});
}

function blankslate(clearFixed, callback){
var q = queue();
if (clearFixed) {
q.defer(rimraf, './fixed/test');
q.defer(rimraf, './fixed/test-tracking');
}
q.defer(rimraf, './ldb/test.ldb');
q.defer(rimraf, './ldb/test-tracking.ldb');
q.awaitAll(function(err, results) {
callback(err);
});
}

module.exports = {
countTasksBySkipval: countTasksBySkipval,
markTasksAsDone: markTasksAsDone,
blankslate: blankslate
};
8 changes: 4 additions & 4 deletions load.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ unzip -o keepright-latest.zip
unzip -o osmi-latest.zip
unzip -o tigerdelta-latest.zip

ls keepright-tasks/*.csv | node import-csv.js $1
ls osmi-tasks/*.csv | node import-csv.js $1
ls northeast_highway_intersects_building.csv | node import-csv.js $1
ls tigerdelta-tasks/*.csv | node import-csv.js $1
find osmi-tasks -type f | grep csv | parallel "node import-csv.js {}"
find keepright-tasks -type f | grep csv | parallel "node import-csv.js {}"
find tigerdelta-tasks -type f | grep csv | parallel "node import-csv.js {}"
node import-csv.js northeast_highway_intersects_building.csv

rm -rf keepright-tasks
rm -rf tigerdelta-tasks
Expand Down
6 changes: 5 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
"levelup": "^0.18.6",
"mousetrap": "0.0.1",
"osm-auth": "^0.2.6",
"queue-async": "^1.0.7",
"rimraf": "^2.2.8",
"routes-router": "^3.1.0",
"serve": "^1.4.0",
"store": "~1.3.16",
"stylus": "~0.47.3",
"tape": "^3.0.3",
"watchify": "~1.0.1",
"wellknown": "^0.3.0"
},
Expand All @@ -31,7 +34,8 @@
"serve-stop": "./node_modules/.bin/forever stop index.js",
"log": "./node_modules/.bin/forever logs index.js",
"watch": "watchify js/index.js -o js/bundle.js",
"watch-css": "stylus -w css/site.styl"
"watch-css": "stylus -w css/site.styl",
"test": "tape test/*.js"
},
"repository": {
"type": "git",
Expand Down
Loading

0 comments on commit 2a11f07

Please sign in to comment.