Thread monitoring with restart added.
danilop committed Mar 6, 2014
1 parent 6cf5a01 commit aa28dec
Showing 2 changed files with 81 additions and 52 deletions.
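
What the commit implements is a watchdog pattern: worker threads live in slots that start out as None, and a single check routine (re)starts any slot whose thread is missing or dead, so first startup and later recovery share one code path. Here is a minimal, self-contained sketch of that pattern; Watchdog and worker are illustrative names, not the yas3fs API.

import threading
import time

def thread_is_not_alive(t):
    # A slot that was never started (None) counts as dead.
    return t is None or not t.is_alive()

def worker():
    # Stand-in for a long-running job such as a download loop.
    time.sleep(0.1)

class Watchdog(object):
    def __init__(self, num_workers=2):
        # Thread slots start empty; the first check_threads() fills them.
        self.workers = dict((i, None) for i in range(num_workers))

    def check_threads(self, first=False):
        display = 'Starting' if first else 'Restarting'
        for i in list(self.workers):
            if thread_is_not_alive(self.workers[i]):
                print("%s worker #%i" % (display, i))
                t = threading.Thread(target=worker)
                t.daemon = True  # never block interpreter exit
                t.start()
                self.workers[i] = t

    def run(self, interval=0.5, rounds=3):
        self.check_threads(first=True)
        for _ in range(rounds):
            time.sleep(interval)
            self.check_threads()  # revive anything that died

if __name__ == '__main__':
    Watchdog().run()

Keeping every slot initialized (to None) is the design choice that lets thread_is_not_alive treat "never started" and "died since" the same way.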
yas3fs/__init__.py: 133 changes (81 additions, 52 deletions)
@@ -701,50 +701,86 @@ def __init__(self, options):

         signal.signal(signal.SIGINT, self.handler)
 
+    def check_threads(self, first=False):
+        logger.debug("check_threads '%s'" % first)
+
+        if first:
+            display = 'Starting'
+        else:
+            display = 'Restarting'
+
+        for i in range(self.download_num):
+            if thread_is_not_alive(self.download_threads[i]):
+                logger.debug("%s download thread #%i" % (display, i))
+                self.download_threads[i] = threading.Thread(target=self.download)
+                self.download_threads[i].daemon = True
+                self.download_threads[i].start()
+
+        for i in range(self.prefetch_num):
+            if thread_is_not_alive(self.prefetch_threads[i]):
+                logger.debug("%s prefetch thread #%i" % (display, i))
+                self.prefetch_threads[i] = threading.Thread(target=self.download, args=(True,))
+                self.prefetch_threads[i].daemon = True
+                self.prefetch_threads[i].start()
+
+        if self.sns_topic_arn:
+            if thread_is_not_alive(self.publish_thread):
+                logger.debug("%s publish thread" % display)
+                self.publish_thread = threading.Thread(target=self.publish_messages)
+                self.publish_thread.daemon = True
+                self.publish_thread.start()
+
+        if self.sqs_queue_name:
+            if thread_is_not_alive(self.queue_listen_thread):
+                logger.debug("%s queue listen thread" % display)
+                self.queue_listen_thread = threading.Thread(target=self.listen_for_messages_over_sqs)
+                self.queue_listen_thread.daemon = True
+                self.queue_listen_thread.start()
+
+        if self.sns_http_port:
+            if thread_is_not_alive(self.http_listen_thread):
+                logger.debug("%s HTTP listen thread" % display)
+                self.http_listen_thread = threading.Thread(target=self.listen_for_messages_over_http)
+                self.http_listen_thread.daemon = True
+                self.http_listen_thread.start()
+
+        if thread_is_not_alive(self.check_cache_thread):
+            logger.debug("%s check cache thread" % display)
+            self.check_cache_thread = threading.Thread(target=self.check_cache_size)
+            self.check_cache_thread.daemon = True
+            self.check_cache_thread.start()
+
     def init(self, path):
         logger.debug("init '%s'" % (path))
-        self.publish_thread = threading.Thread(target=self.publish_messages)
-        self.publish_thread.daemon = True
-        self.publish_thread.start()
 
         self.download_threads = {}
         for i in range(self.download_num):
-            self.download_threads[i] = threading.Thread(target=self.download)
-            self.download_threads[i].deamon = True
-            self.download_threads[i].start()
-
+            self.download_threads[i] = None
         self.prefetch_threads = {}
         for i in range(self.prefetch_num):
-            self.prefetch_threads[i] = threading.Thread(target=self.download, args=(True,))
-            self.prefetch_threads[i].deamon = True
-            self.prefetch_threads[i].start()
+            self.prefetch_threads[i] = None
+
+        self.publish_thread = None
+        self.queue_listen_thread = None
+        self.http_listen_thread = None
+
+        self.check_cache_thread = None
+
+        self.check_threads(first=True)
+
+        self.check_status_thread = threading.Thread(target=self.check_status)
+        self.check_status_thread.daemon = True
+        self.check_status_thread.start()
 
         if self.sqs_queue_name:
-            self.queue_listen_thread = threading.Thread(target=self.listen_for_messages_over_sqs)
-            self.queue_listen_thread.daemon = True
-            self.queue_listen_thread.start()
             logger.debug("Subscribing '%s' to '%s'" % (self.sqs_queue_name, self.sns_topic_arn))
             response = self.sns.subscribe_sqs_queue(self.sns_topic_arn, self.queue)
             self.sqs_subscription = response['SubscribeResponse']['SubscribeResult']['SubscriptionArn']
             logger.debug('SNS SQS subscription = %s' % self.sqs_subscription)
-        else:
-            self.queue_listen_thread = None
 
         if self.sns_http_port:
-            self.http_listen_thread = threading.Thread(target=self.listen_for_messages_over_http)
-            self.http_listen_thread.daemon = True
-            self.http_listen_thread.start()
-            self.sns.subscribe(self.sns_topic_arn, 'http', self.http_listen_url)
-        else:
-            self.http_listen_thread = None
-
-        self.check_status = threading.Thread(target=self.check_status)
-        self.check_status.daemon = True
-        self.check_status.start()
-
-        self.check_cache_thread = threading.Thread(target=self.check_cache_size)
-        self.check_cache_thread.daemon = True
-        self.check_cache_thread.start()
+            self.sns.subscribe(self.sns_topic_arn, 'http', self.http_listen_url)
 
     def handler(signum, frame):
         self.destroy('/')
@@ -956,19 +992,7 @@ def check_status(self):
logger.debug("gc count0/threshold0, count1/threshold1, count2/threshold2: %i/%i, %i/%i, %i/%i"
% (count0,threshold0,count1,threshold1,count2,threshold2))

if self.running:
for i in self.download_threads:
if not self.download_threads[i].is_alive():
logger.debug("Download thread restarted!")
self.download_threads[i] = threading.Thread(target=self.download)
self.download_threads[i].deamon = True
self.download_threads[i].start()
for i in self.prefetch_threads:
if not self.prefetch_threads[i].is_alive():
logger.debug("Prefetch thread restarted!")
self.prefetch_threads[i] = threading.Thread(target=self.download, args=(True,))
self.prefetch_threads[i].deamon = True
self.prefetch_threads[i].start()
self.check_threads()

time.sleep(self.check_status_interval)

@@ -978,6 +1002,7 @@ def check_cache_size(self):

         while self.cache_entries:
 
+            logger.debug("check_cache_size get_memory_usage")
             num_entries, mem_size, disk_size = self.cache.get_memory_usage()
 
             purge = False
@@ -995,43 +1020,44 @@
                 # Need to purge something
                 path = self.cache.lru.popleft() # Take a path on top of the LRU (least used)
                 with self.cache.get_lock(path):
-                    if self.cache.has(path):
-                        logger.debug("purge: '%s' '%s' ?" % (store, path))
+                    if self.cache.has(path): # Path may be deleted before I acquire the lock
+                        logger.debug("check_cache_size purge: '%s' '%s' ?" % (store, path))
                         data = self.cache.get(path, 'data')
                         full_delete = False
                         if (not data) or (data and (store == '' or data.store == store) and (not data.has('open')) and (not data.has('change'))):
                             if store == '':
-                                logger.debug("purge: '%s' '%s' OK full" % (store, path))
+                                logger.debug("check_cache_size purge: '%s' '%s' OK full" % (store, path))
                                 self.cache.delete(path) # Remove completely from cache
                                 full_delete = True
                             elif data:
-                                logger.debug("purge: '%s' '%s' OK data" % (store, path))
+                                logger.debug("check_cache_size purge: '%s' '%s' OK data" % (store, path))
                                 self.cache.delete(path, 'data') # Just remove data
                             else:
-                                logger.debug("purge: '%s' '%s' KO no data" % (store, path))
+                                logger.debug("check_cache_size purge: '%s' '%s' KO no data" % (store, path))
                         else:
-                            logger.debug("purge: '%s' '%s' KO data? %s open? %s change? %s"
+                            logger.debug("check_cache_size purge: '%s' '%s' KO data? %s open? %s change? %s"
                                          % (store, path, data != None, data and data.has('open'), data and data.has('change')))
                         if not full_delete:
-                            self.cache.lru.append(path) # The entry is still there, let's append it again at the end of the RLU list
+                            # The entry is still there, let's append it again at the end of the LRU list
+                            self.cache.lru.append(path)
             else:
                 # Check for unused locks to be removed
                 for path in self.cache.unused_locks:
-                    logger.debug("purge unused lock: '%s'" % (path))
+                    logger.debug("check_cache_size purge unused lock: '%s'" % (path))
                     try:
                         with self.cache.lock and self.cache.new_locks[path]:
                             del self.cache.new_locks[path]
-                            logger.debug("purge unused lock: '%s' deleted" % (path))
+                            logger.debug("check_cache_size purge unused lock: '%s' deleted" % (path))
                     except KeyError:
                         pass
                     try:
                         del self.cache.unused_locks[path]
-                        logger.debug("purge unused lock: '%s' removed from list" % (path))
+                        logger.debug("check_cache_size purge unused lock: '%s' removed from list" % (path))
                     except KeyError:
                         pass
                 # Look for unused locks to be removed at next iteration (if still "new")
                 for path in self.cache.new_locks:
-                    logger.debug("purge unused lock: '%s' added to list" % (path))
+                    logger.debug("check_cache_size purge unused lock: '%s' added to list" % (path))
                     self.cache.unused_locks[path] = True # Just a flag
 
         # Sleep for some time
@@ -2162,6 +2188,9 @@ def has_elements(iter, num=1):
logger.debug("has_element '%s' KO" % (iter))
return False

def thread_is_not_alive(t):
return t == None or not t.is_alive()

### Main

def main():
Binary file added yas3fs/_version.pyc
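
For context on the check_cache_size purge loop in the diff above: it pops the least-recently-used path off the front of the LRU queue, re-checks the entry under its lock (the path may have been deleted in the meantime), and re-appends the entry to the back of the queue when it cannot be purged yet because it is open or has pending changes. A toy sketch of that pop-and-re-append discipline, assuming a simplified cache rather than the yas3fs one:

from collections import deque

class ToyCache(object):
    def __init__(self):
        self.lru = deque()   # left end = least recently used
        self.entries = {}    # path -> dict of entry properties

    def purge_one(self):
        # Pop the least-recently-used path off the front of the queue.
        path = self.lru.popleft()
        data = self.entries.get(path)
        if data is None:
            return None  # entry vanished before we got to it
        if data.get('open') or data.get('change'):
            # Still busy: push it back onto the *end* of the queue.
            self.lru.append(path)
            return None
        del self.entries[path]
        return path

cache = ToyCache()
cache.entries = {'/a': {}, '/b': {'open': 1}}
cache.lru.extend(['/a', '/b'])
print(cache.purge_one())  # '/a' is idle: purged
print(cache.purge_one())  # '/b' is open: re-queued, not purged

Re-appending busy entries instead of dropping them keeps the queue and the entry set consistent: a path is either purged or goes to the back of the line for a later pass.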
