Merge branch 'master' of github.com:danilop/yas3fs
danilop committed Mar 9, 2014
2 parents 2b8dead + 1a5eeac commit 281a01b
Showing 2 changed files with 26 additions and 19 deletions.
yas3fs/__init__.py (43 changes: 25 additions & 18 deletions)
@@ -277,7 +277,7 @@ def delete(self, prop=None):
                     remove_empty_dirs_for_file(etag_filename)
                 self.content = None # If not
                 self.update_size(True)
-                for p in self.props:
+                for p in self.props.keys():
                     self.delete(p)
             elif prop in self.props:
                 if prop == 'range':
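
All of the `.keys()` changes in this commit guard the same pattern: the loop body removes entries from the dictionary it is iterating over. A minimal sketch of the difference, separate from the yas3fs code (under Python 2, where yas3fs runs, `.keys()` returns a list snapshot; on Python 3 the equivalent would be `list(d)`):

props = {'data': 1, 'etag': 2, 'range': 3}

# Iterating the dict directly while deleting from it raises
# "RuntimeError: dictionary changed size during iteration".
# Iterating a snapshot of the keys is safe:
for p in list(props):   # Python 2: props.keys() already returns a list
    del props[p]

print(props)  # {}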
@@ -712,41 +712,41 @@ def check_threads(self, first=False):
         for i in range(self.download_num):
             if thread_is_not_alive(self.download_threads[i]):
                 logger.debug("%s download thread #%i" % (display, i))
-                self.download_threads[i] = threading.Thread(target=self.download)
+                self.download_threads[i] = TracebackLoggingThread(target=self.download)
                 self.download_threads[i].daemon = True
                 self.download_threads[i].start()

         for i in range(self.prefetch_num):
             if thread_is_not_alive(self.prefetch_threads[i]):
                 logger.debug("%s prefetch thread #%i" % (display, i))
-                self.prefetch_threads[i] = threading.Thread(target=self.download, args=(True,))
+                self.prefetch_threads[i] = TracebackLoggingThread(target=self.download, args=(True,))
                 self.prefetch_threads[i].daemon = True
                 self.prefetch_threads[i].start()

         if self.sns_topic_arn:
             if thread_is_not_alive(self.publish_thread):
                 logger.debug("%s publish thread" % display)
-                self.publish_thread = threading.Thread(target=self.publish_messages)
+                self.publish_thread = TracebackLoggingThread(target=self.publish_messages)
                 self.publish_thread.daemon = True
                 self.publish_thread.start()

         if self.sqs_queue_name:
             if thread_is_not_alive(self.queue_listen_thread):
                 logger.debug("%s queue listen thread" % display)
-                self.queue_listen_thread = threading.Thread(target=self.listen_for_messages_over_sqs)
+                self.queue_listen_thread = TracebackLoggingThread(target=self.listen_for_messages_over_sqs)
                 self.queue_listen_thread.daemon = True
                 self.queue_listen_thread.start()

         if self.sns_http_port:
             if thread_is_not_alive(self.http_listen_thread):
                 logger.debug("%s HTTP listen thread" % display)
-                self.http_listen_thread = threading.Thread(target=self.listen_for_messages_over_http)
+                self.http_listen_thread = TracebackLoggingThread(target=self.listen_for_messages_over_http)
                 self.http_listen_thread.daemon = True
                 self.http_listen_thread.start()

         if thread_is_not_alive(self.check_cache_thread):
             logger.debug("%s check cache thread" % display)
-            self.check_cache_thread = threading.Thread(target=self.check_cache_size)
+            self.check_cache_thread = TracebackLoggingThread(target=self.check_cache_size)
             self.check_cache_thread.daemon = True
             self.check_cache_thread.start()
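
The hunk above is a watchdog: each call to check_threads replaces any worker thread that has died. A standalone sketch of the respawn pattern, with illustrative names (work, worker) that are not part of yas3fs:

import threading
import time

def thread_is_not_alive(t):
    # Same test the diff uses: never created, or already finished.
    return t is None or not t.is_alive()

def work():
    time.sleep(0.1)  # stand-in for a download/prefetch loop

worker = None
for _ in range(3):  # stand-in for the periodic check_threads() calls
    if thread_is_not_alive(worker):
        worker = threading.Thread(target=work)
        worker.daemon = True  # don't keep the process alive on exit
        worker.start()
    time.sleep(0.2)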

@@ -768,7 +768,7 @@ def init(self, path):

         self.check_threads(first=True)

-        self.check_status_thread = threading.Thread(target=self.check_status)
+        self.check_status_thread = TracebackLoggingThread(target=self.check_status)
         self.check_status_thread.daemon = True
         self.check_status_thread.start()

@@ -898,7 +898,7 @@ def process_message(self, messages):
                 if c[2] != None:
                     self.invalidate_cache(c[2], c[3])
                 else: # Invalidate all the cached data
-                    for path in self.cache.entries:
+                    for path in self.cache.entries.keys():
                         self.invalidate_cache(path)
             elif c[1] == 'md':
                 if c[2]:
@@ -1042,7 +1042,7 @@ def check_cache_size(self):
                     self.cache.lru.append(path)
                 else:
                     # Check for unused locks to be removed
-                    for path in self.cache.unused_locks:
+                    for path in self.cache.unused_locks.keys():
                         logger.debug("check_cache_size purge unused lock: '%s'" % (path))
                         try:
                             with self.cache.lock and self.cache.new_locks[path]:
@@ -1056,7 +1056,7 @@ def check_cache_size(self):
                         except KeyError:
                             pass
                     # Look for unused locks to be removed at next iteration (if still "new")
-                    for path in self.cache.new_locks:
+                    for path in self.cache.new_locks.keys():
                         logger.debug("check_cache_size purge unused lock: '%s' added to list" % (path))
                         self.cache.unused_locks[path] = True # Just a flag

@@ -1973,7 +1973,7 @@ def multipart_upload(self, key_path, data, full_size, headers, metadata):
         mpu = self.s3_bucket.initiate_multipart_upload(key_path, headers=headers, metadata=metadata)
         num_threads = min(part_num, self.multipart_num)
         for i in range(num_threads):
-            t = threading.Thread(target=self.part_upload, args=(mpu, part_queue))
+            t = TracebackLoggingThread(target=self.part_upload, args=(mpu, part_queue))
             t.daemon = True
             t.start()
             logger.debug("multipart_upload thread '%i' started" % i)
@@ -2129,6 +2129,15 @@ def statfs(self, path):
                 }
         return {}

+class TracebackLoggingThread(threading.Thread):
+    def run(self):
+        try:
+            super(TracebackLoggingThread, self).run()
+        except (KeyboardInterrupt, SystemExit):
+            raise
+        except Exception:
+            logger.exception("Uncaught Exception in Thread")
+
 ### Utility functions

 def errorAndExit(error, exitCode=1):
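
The class added above exists because an exception escaping Thread.run() is dumped to stderr by the threading module and never reaches the logging setup. A self-contained sketch of the same pattern (the worker function and demo logger are illustrative, not from the commit):

import logging
import threading

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("demo")

class TracebackLoggingThread(threading.Thread):
    def run(self):
        try:
            super(TracebackLoggingThread, self).run()
        except (KeyboardInterrupt, SystemExit):
            raise
        except Exception:
            logger.exception("Uncaught Exception in Thread")

def worker():
    raise ValueError("boom")  # a plain Thread would print this to stderr, outside the logger

t = TracebackLoggingThread(target=worker)
t.daemon = True
t.start()
t.join()  # the full traceback shows up in the log instead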
@@ -2191,6 +2200,9 @@ def has_elements(iter, num=1):
 def thread_is_not_alive(t):
     return t == None or not t.is_alive()

+def custom_sys_excepthook(type, value, traceback):
+    logger.exception("Uncaught Exception")
+
 ### Main

 def main():
@@ -2315,12 +2327,7 @@ def main():
     else:
         logger.setLevel(logging.INFO)

-    def custom_sys_excepthook(type, value, traceback):
-        logger.error("Uncaught Exception")
-        logger.error("Type: %s" % type)
-        logger.error("Value: %s" % value)
-        logger.error("Traceback: %s" % traceback)
-    sys.excepthook = custom_sys_excepthook
+    sys.excepthook = custom_sys_excepthook # This is not working for new threads that start afterwards

     logger.debug("options = %s" % options)
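The comment added on the sys.excepthook line is the motivation for TracebackLoggingThread: sys.excepthook only fires for uncaught exceptions on the main thread, because Thread.run() is invoked inside threading's own try/except (Python gained a separate threading.excepthook only in 3.8). A quick standalone demonstration:

import sys
import threading

def custom_sys_excepthook(type, value, traceback):
    print("hook saw: %s" % value)

sys.excepthook = custom_sys_excepthook

t = threading.Thread(target=lambda: 1 / 0)
t.start()
t.join()  # traceback goes to stderr; the hook is never called

raise RuntimeError("main thread")  # this one does reach the hook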
yas3fs/_version.py (2 changes: 1 addition & 1 deletion)
@@ -1 +1 @@
-__version__ = '2.1.9'
+__version__ = '2.1.10'
