Merge latest changes from upstream.

Merge branch 'master' into google-drive-support

mduggan committed Jul 28, 2015
2 parents 38395b8 + 206881d commit 7be379e
Showing 64 changed files with 1,432 additions and 624 deletions.
11 changes: 11 additions & 0 deletions .travis.yml
@@ -0,0 +1,11 @@
language: python
python:
- "3.3"
services:
- mongodb
- redis-server
- rabbitmq
install:
- "pip install -r requirements.txt"
- "cp tapiriik/local_settings.py.example tapiriik/local_settings.py"
script: "python runtests.py"
1 change: 1 addition & 0 deletions README.md
@@ -1,6 +1,7 @@
tapiriik keeps your fitness in sync
========

+[![Build Status](https://travis-ci.org/cpfair/tapiriik.svg?branch=master)](https://travis-ci.org/cpfair/tapiriik)

## Looking to run tapiriik locally?

2 changes: 1 addition & 1 deletion Vagrantfile
@@ -10,7 +10,7 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
# please see the online documentation at vagrantup.com.

# Every Vagrant virtual environment requires a box to build off of.
config.vm.box = "chef/ubuntu-13.10"
config.vm.box = "chef/ubuntu-14.04"

# Disable automatic box update checking. If you disable this, then
# boxes will only be checked for updates when the user runs
16 changes: 9 additions & 7 deletions bootstrap.sh
@@ -8,14 +8,16 @@ sudo apt-get install -y python3-pip libxml2-dev libxslt-dev zlib1g-dev mongodb g
pip3 install --upgrade pip

# Install app requirements
-pip install -r /vagrant/requirements.txt
+pip install --upgrade -r /vagrant/requirements.txt

# Fix the default python instance
-sudo rm `which python`
-sudo ln -s /usr/bin/python3.3 /usr/bin/python
+update-alternatives --install /usr/bin/python python /usr/bin/python2.7 1
+update-alternatives --install /usr/bin/python python /usr/bin/python3.4 2

-# Put in a default local_settings.py
-cp /vagrant/tapiriik/local_settings.py.example /vagrant/tapiriik/local_settings.py
+# Put in a default local_settings.py (if one doesn't exist)
+if [ ! -f /vagrant/tapiriik/local_settings.py ]; then
+    cp /vagrant/tapiriik/local_settings.py.example /vagrant/tapiriik/local_settings.py
+    # Generate credential storage keys
+    python /vagrant/credentialstore_keygen.py >> /vagrant/tapiriik/local_settings.py
+fi

-# Generate credential storage keys
-python /vagrant/credentialstore_keygen.py >> /vagrant/tapiriik/local_settings.py
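
Editor's note: the new guard matters because the keygen output is appended; running bootstrap.sh twice against an existing local_settings.py would previously stack a second, conflicting set of credential keys. A hypothetical sketch of the append pattern (the real key names and output format live in credentialstore_keygen.py, which isn't shown in this diff):

import os

# Hypothetical output format: a Python assignment that lands at the end of
# local_settings.py via the `>>` redirection in bootstrap.sh. Appending it
# twice would define the key twice, with the later value winning and
# orphaning anything encrypted under the first.
print("CREDENTIAL_STORAGE_KEY = %r" % os.urandom(32))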
3 changes: 2 additions & 1 deletion requirements.txt
@@ -1,5 +1,5 @@
requests==2.2.1
-pymongo
+pymongo==3.0.1
pytz
lxml
git+https://github.com/cpfair/dropbox-sdk-python.git
@@ -12,3 +12,4 @@ requests_oauthlib==0.4.0
redis
django-ipware
google-api-python-client-py3
+smashrun-client>=0.2.0
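
Editor's note: pinning pymongo==3.0.1 is what drives most of the Python changes below. PyMongo 2.x returned aggregation results as a dict with a "result" key, while PyMongo 3.x returns a CommandCursor that has to be materialized, e.g. with list(). A minimal before/after sketch:

from pymongo import MongoClient

db = MongoClient().tapiriik  # database name assumed for illustration
pipeline = [{"$group": {"_id": None, "total": {"$sum": "$Distance"}}}]

# PyMongo 2.x: aggregate() returned {"result": [...], "ok": 1.0}
#   rows = db.sync_stats.aggregate(pipeline)["result"]
# PyMongo 3.x: aggregate() returns a CommandCursor
rows = list(db.sync_stats.aggregate(pipeline))
total = rows[0]["total"] if rows else 0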
32 changes: 32 additions & 0 deletions rollback_worker.py
@@ -0,0 +1,32 @@
from tapiriik.database import db, close_connections
from tapiriik.settings import RABBITMQ_BROKER_URL, MONGO_HOST, MONGO_FULL_WRITE_CONCERN
from tapiriik import settings
from datetime import datetime

from celery import Celery
from celery.signals import worker_shutdown
from datetime import datetime

class _celeryConfig:
    CELERY_ROUTES = {
        "rollback_worker.rollback_task": {"queue": "tapiriik-rollback"}
    }
    CELERYD_CONCURRENCY = 1
    CELERYD_PREFETCH_MULTIPLIER = 1

celery_app = Celery('rollback_worker', broker=RABBITMQ_BROKER_URL)
celery_app.config_from_object(_celeryConfig())

@worker_shutdown.connect
def celery_shutdown():
    close_connections()

@celery_app.task()
def rollback_task(task_id):
    from tapiriik.services.rollback import RollbackTask
    print("Starting rollback task %s" % task_id)
    task = RollbackTask.Get(task_id)
    task.Execute()

def schedule_rollback_task(task_id):
    rollback_task.apply_async(args=[task_id])
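
Editor's note, a usage sketch: application code enqueues via schedule_rollback_task(), and CELERY_ROUTES steers the message to the tapiriik-rollback queue, so a worker has to consume that queue explicitly. The task ID below is hypothetical.

# Start a dedicated worker (standard Celery CLI):
#   celery -A rollback_worker worker -Q tapiriik-rollback
from rollback_worker import schedule_rollback_task

schedule_rollback_task("5616a1b2c3d4e5f6a7b8c9d0")  # hypothetical RollbackTask _id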
96 changes: 63 additions & 33 deletions stats_cron.py
@@ -1,37 +1,47 @@
from tapiriik.database import db, close_connections
from tapiriik.settings import RABBITMQ_USER_QUEUE_STATS_URL
from datetime import datetime, timedelta
import requests

# total distance synced
-distanceSyncedAggr = db.sync_stats.aggregate([{"$group": {"_id": None, "total": {"$sum": "$Distance"}}}])["result"]
+distanceSyncedAggr = list(db.sync_stats.aggregate([{"$group": {"_id": None, "total": {"$sum": "$Distance"}}}]))
if distanceSyncedAggr:
    distanceSynced = distanceSyncedAggr[0]["total"]
else:
    distanceSynced = 0

# last 24hr, for rate calculation
-lastDayDistanceSyncedAggr = db.sync_stats.aggregate([{"$match": {"Timestamp": {"$gt": datetime.utcnow() - timedelta(hours=24)}}}, {"$group": {"_id": None, "total": {"$sum": "$Distance"}}}])["result"]
+lastDayDistanceSyncedAggr = list(db.sync_stats.aggregate([{"$match": {"Timestamp": {"$gt": datetime.utcnow() - timedelta(hours=24)}}}, {"$group": {"_id": None, "total": {"$sum": "$Distance"}}}]))
if lastDayDistanceSyncedAggr:
    lastDayDistanceSynced = lastDayDistanceSyncedAggr[0]["total"]
else:
    lastDayDistanceSynced = 0

# similarly, last 1hr
-lastHourDistanceSyncedAggr = db.sync_stats.aggregate([{"$match": {"Timestamp": {"$gt": datetime.utcnow() - timedelta(hours=1)}}}, {"$group": {"_id": None, "total": {"$sum": "$Distance"}}}])["result"]
+lastHourDistanceSyncedAggr = list(db.sync_stats.aggregate([{"$match": {"Timestamp": {"$gt": datetime.utcnow() - timedelta(hours=1)}}}, {"$group": {"_id": None, "total": {"$sum": "$Distance"}}}]))
if lastHourDistanceSyncedAggr:
    lastHourDistanceSynced = lastHourDistanceSyncedAggr[0]["total"]
else:
    lastHourDistanceSynced = 0
-# sync wait time, to save making 1 query/sec-user-browser
-queueHead = list(db.users.find({"QueuedAt": {"$lte": datetime.utcnow()}, "SynchronizationWorker": None, "SynchronizationHostRestriction": {"$exists": False}}, {"QueuedAt": 1}).sort("QueuedAt").limit(10))
-queueHeadTime = timedelta(0)
-if len(queueHead):
-    for queuedUser in queueHead:
-        queueHeadTime += datetime.utcnow() - queuedUser["QueuedAt"]
-    queueHeadTime /= len(queueHead)

+# How long users are taking to get pushed into rabbitMQ
+# Once called "queueHead" as, a very long time ago, this _was_ user queuing
+enqueueHead = list(db.users.find({"QueuedAt": {"$lte": datetime.utcnow()}, "SynchronizationWorker": None, "SynchronizationHostRestriction": {"$exists": False}}, {"QueuedAt": 1}).sort("QueuedAt").limit(10))
+enqueueTime = timedelta(0)
+if len(enqueueHead):
+    for pendingEnqueueUser in enqueueHead:
+        enqueueTime += datetime.utcnow() - pendingEnqueueUser["QueuedAt"]
+    enqueueTime /= len(enqueueHead)

+# Query rabbitMQ to get main queue throughput and length
+rmq_user_queue_stats = requests.get(RABBITMQ_USER_QUEUE_STATS_URL).json()
+rmq_user_queue_length = rmq_user_queue_stats["messages_ready_details"]["avg"]
+rmq_user_queue_rate = rmq_user_queue_stats["message_stats"]["ack_details"]["avg_rate"]
+rmq_user_queue_wait_time = rmq_user_queue_length / rmq_user_queue_rate

# sync time utilization
db.sync_worker_stats.remove({"Timestamp": {"$lt": datetime.utcnow() - timedelta(hours=1)}}) # clean up old records
-timeUsedAgg = db.sync_worker_stats.aggregate([{"$group": {"_id": None, "total": {"$sum": "$TimeTaken"}}}])["result"]
+timeUsedAgg = list(db.sync_worker_stats.aggregate([{"$group": {"_id": None, "total": {"$sum": "$TimeTaken"}}}]))
totalSyncOps = db.sync_worker_stats.count()
if timeUsedAgg:
    timeUsed = timeUsedAgg[0]["total"]
@@ -41,41 +51,41 @@
    avgSyncTime = 0

# error/pending/locked stats
-lockedSyncRecords = db.users.aggregate([
+lockedSyncRecords = list(db.users.aggregate([
    {"$match": {"SynchronizationWorker": {"$ne": None}}},
    {"$group": {"_id": None, "count": {"$sum": 1}}}
-])
-if len(lockedSyncRecords["result"]) > 0:
-    lockedSyncRecords = lockedSyncRecords["result"][0]["count"]
+]))
+if len(lockedSyncRecords) > 0:
+    lockedSyncRecords = lockedSyncRecords[0]["count"]
else:
    lockedSyncRecords = 0

-pendingSynchronizations = db.users.aggregate([
+pendingSynchronizations = list(db.users.aggregate([
    {"$match": {"NextSynchronization": {"$lt": datetime.utcnow()}}},
    {"$group": {"_id": None, "count": {"$sum": 1}}}
-])
-if len(pendingSynchronizations["result"]) > 0:
-    pendingSynchronizations = pendingSynchronizations["result"][0]["count"]
+]))
+if len(pendingSynchronizations) > 0:
+    pendingSynchronizations = pendingSynchronizations[0]["count"]
else:
    pendingSynchronizations = 0

-usersWithErrors = db.users.aggregate([
+usersWithErrors = list(db.users.aggregate([
    {"$match": {"NonblockingSyncErrorCount": {"$gt": 0}}},
    {"$group": {"_id": None, "count": {"$sum": 1}}}
-])
-if len(usersWithErrors["result"]) > 0:
-    usersWithErrors = usersWithErrors["result"][0]["count"]
+]))
+if len(usersWithErrors) > 0:
+    usersWithErrors = usersWithErrors[0]["count"]
else:
    usersWithErrors = 0


-totalErrors = db.users.aggregate([
+totalErrors = list(db.users.aggregate([
    {"$group": {"_id": None,
                "total": {"$sum": "$NonblockingSyncErrorCount"}}}
-])
+]))

-if len(totalErrors["result"]) > 0:
-    totalErrors = totalErrors["result"][0]["total"]
+if len(totalErrors) > 0:
+    totalErrors = totalErrors[0]["total"]
else:
    totalErrors = 0

@@ -86,41 +96,61 @@
"ErrorUsers": usersWithErrors,
"TotalErrors": totalErrors,
"SyncTimeUsed": timeUsed,
"SyncQueueHeadTime": queueHeadTime.total_seconds()
"SyncEnqueueTime": enqueueTime.total_seconds(),
"SyncQueueHeadTime": rmq_user_queue_wait_time
})

db.stats.update({}, {"$set": {"TotalDistanceSynced": distanceSynced, "LastDayDistanceSynced": lastDayDistanceSynced, "LastHourDistanceSynced": lastHourDistanceSynced, "TotalSyncTimeUsed": timeUsed, "AverageSyncDuration": avgSyncTime, "LastHourSynchronizationCount": totalSyncOps, "QueueHeadTime": queueHeadTime.total_seconds(), "Updated": datetime.utcnow()}}, upsert=True)
db.stats.update({}, {"$set": {
"TotalDistanceSynced": distanceSynced,
"LastDayDistanceSynced": lastDayDistanceSynced,
"LastHourDistanceSynced": lastHourDistanceSynced,
"TotalSyncTimeUsed": timeUsed,
"AverageSyncDuration": avgSyncTime,
"LastHourSynchronizationCount": totalSyncOps,
"EnqueueTime": enqueueTime.total_seconds(),
"QueueHeadTime": rmq_user_queue_wait_time,
"Updated": datetime.utcnow() }}, upsert=True)


def aggregateCommonErrors():
    from bson.code import Code
    # The exception message always appears right before "LOCALS:"
    map_operation = Code(
        "function(){"
-        "var errorMatch = new RegExp(/\\n([^\\n]+)\\n\\nLOCALS:/);"
+        "if (!this.SyncErrors) return;"
+        "var errorMatch = new RegExp(/\\n([^\\n]+)\\n\\nLOCALS:/);"
        "var id = this._id;"
        "var svc = this.Service;"
+        "var now = new Date();"
        "this.SyncErrors.forEach(function(error){"
        "var message = error.Message.match(errorMatch)[1];"
        "var key = {service: svc, stem: message.substring(0, 60)};"
-        "emit(key, {count:1, connections:[id], exemplar:message});"
+        "var recency_score = error.Timestamp ? (now - error.Timestamp)/1000 : 0;"
+        "emit(key, {count:1, ts_count: error.Timestamp ? 1 : 0, recency: recency_score, connections:[id], exemplar:message});"
        "});"
        "}"
    )
    reduce_operation = Code(
        "function(key, item){"
-        "var reduced = {count:0, connections:[]};"
+        "var reduced = {count:0, ts_count:0, connections:[], recency: 0};"
        "var connection_collections = [];"
        "item.forEach(function(error){"
        "reduced.count+=error.count;"
+        "reduced.ts_count+=error.ts_count;"
+        "reduced.recency+=error.recency;"
        "reduced.exemplar = error.exemplar;"
        "connection_collections.push(error.connections);"
        "});"
        "reduced.connections = reduced.connections.concat.apply(reduced.connections, connection_collections);"
        "return reduced;"
        "}")
-    db.connections.map_reduce(map_operation, reduce_operation, "common_sync_errors") #, finalize=finalize_operation
+    finalize_operation = Code(
+        "function(key, res){"
+        "res.recency_avg = res.recency / res.ts_count;"
+        "return res;"
+        "}"
+    )
+    db.connections.map_reduce(map_operation, reduce_operation, "common_sync_errors", finalize=finalize_operation)
    # We don't need to do anything with the result right now, just leave it there to appear in the dashboard

aggregateCommonErrors()
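
Editor's note: the RabbitMQ wait-time figure above is Little's law, W = L / lambda; with an average of L messages ready and an average ack rate of lambda messages per second, a newly enqueued user waits roughly L / lambda seconds. A sketch, assuming RABBITMQ_USER_QUEUE_STATS_URL points at the management API with sampling parameters (the "_details" averages only appear when the query string asks for them; the URL and queue name below are hypothetical):

import requests

# Hypothetical URL; the real one lives in tapiriik.settings
RABBITMQ_USER_QUEUE_STATS_URL = (
    "http://guest:guest@localhost:15672/api/queues/%2F/tapiriik-users"
    "?lengths_age=3600&lengths_incr=60&msg_rates_age=3600&msg_rates_incr=60"
)

stats = requests.get(RABBITMQ_USER_QUEUE_STATS_URL).json()
queue_length = stats["messages_ready_details"]["avg"]          # L
ack_rate = stats["message_stats"]["ack_details"]["avg_rate"]   # lambda
wait_seconds = queue_length / ack_rate if ack_rate else float("inf")

Note that the script itself divides unconditionally; the guard against an idle queue (ack rate of zero) is the sketch's addition.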
32 changes: 32 additions & 0 deletions sync_remote_triggers.py
@@ -0,0 +1,32 @@
from tapiriik.database import db, close_connections
from tapiriik.settings import RABBITMQ_BROKER_URL, MONGO_FULL_WRITE_CONCERN
from datetime import datetime
from celery import Celery
from celery.signals import worker_shutdown

class _celeryConfig:
    CELERY_ROUTES = {
        "sync_remote_triggers.trigger_remote": {"queue": "tapiriik-remote-trigger"}
    }
    CELERYD_CONCURRENCY = 1  # Otherwise the GC rate limiting breaks since file locking is per-process.
    CELERYD_PREFETCH_MULTIPLIER = 1  # The message queue could use some exercise.

celery_app = Celery('sync_remote_triggers', broker=RABBITMQ_BROKER_URL)
celery_app.config_from_object(_celeryConfig())

@worker_shutdown.connect
def celery_shutdown():
    close_connections()

@celery_app.task()
def trigger_remote(service_id, affected_connection_external_ids):
    from tapiriik.auth import User
    from tapiriik.services import Service
    svc = Service.FromID(service_id)
    db.connections.update({"Service": svc.ID, "ExternalID": {"$in": affected_connection_external_ids}}, {"$set":{"TriggerPartialSync": True, "TriggerPartialSyncTimestamp": datetime.utcnow()}}, multi=True, w=MONGO_FULL_WRITE_CONCERN)
    affected_connection_ids = db.connections.find({"Service": svc.ID, "ExternalID": {"$in": affected_connection_external_ids}}, {"_id": 1})
    affected_connection_ids = [x["_id"] for x in affected_connection_ids]
    trigger_users_query = User.PaidUserMongoQuery()
    trigger_users_query.update({"ConnectedServices.ID": {"$in": affected_connection_ids}})
    trigger_users_query.update({"Config.suppress_auto_sync": {"$ne": True}})
    db.users.update(trigger_users_query, {"$set": {"NextSynchronization": datetime.utcnow()}}, multi=True)  # It would be nicer to use the Sync.Schedule... method, but I want to cleanly do this in bulk
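
Editor's note, a usage sketch: a remote-service webhook handler would enqueue the trigger like this. Routing to the tapiriik-remote-trigger queue is handled by CELERY_ROUTES above; the service ID and external IDs below are hypothetical.

from sync_remote_triggers import trigger_remote

# e.g. after receiving a partial-sync ping listing affected remote accounts
trigger_remote.apply_async(args=["garminconnect", ["12345678", "87654321"]])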
7 changes: 4 additions & 3 deletions sync_scheduler.py
@@ -15,19 +15,20 @@
while True:
    generation = str(uuid.uuid4())
    queueing_at = datetime.utcnow()
-    users = list(db.users.find(
+    users = list(db.users.with_options(read_preference=ReadPreference.PRIMARY).find(
        {
            "NextSynchronization": {"$lte": datetime.utcnow()},
            "QueuedAt": {"$exists": False}
        },
        {
            "_id": True,
            "SynchronizationHostRestriction": True
-        },
-        read_preference=ReadPreference.PRIMARY
+        }
    ))
    scheduled_ids = [x["_id"] for x in users]
    print("Found %d users at %s" % (len(scheduled_ids), datetime.utcnow()))
    db.users.update({"_id": {"$in": scheduled_ids}}, {"$set": {"QueuedAt": queueing_at, "QueuedGeneration": generation}, "$unset": {"NextSynchronization": True}}, multi=True, w=MONGO_FULL_WRITE_CONCERN)
+    print("Marked %d users as queued at %s" % (len(scheduled_ids), datetime.utcnow()))
    for user in users:
        producer.publish({"user_id": str(user["_id"]), "generation": generation}, routing_key=user["SynchronizationHostRestriction"] if "SynchronizationHostRestriction" in user and user["SynchronizationHostRestriction"] else "")
    print("Scheduled %d users at %s" % (len(scheduled_ids), datetime.utcnow()))