diff --git a/yas3fs/__init__.py b/yas3fs/__init__.py
index cc551cc..c1c1204 100755
--- a/yas3fs/__init__.py
+++ b/yas3fs/__init__.py
@@ -277,7 +277,7 @@ def delete(self, prop=None):
                         remove_empty_dirs_for_file(etag_filename)
                 self.content = None # If not
                 self.update_size(True)
-                for p in self.props:
+                for p in self.props.keys():
                     self.delete(p)
             elif prop in self.props:
                 if prop == 'range':
@@ -712,41 +712,41 @@ def check_threads(self, first=False):
         for i in range(self.download_num):
             if thread_is_not_alive(self.download_threads[i]):
                 logger.debug("%s download thread #%i" % (display, i))
-                self.download_threads[i] = threading.Thread(target=self.download)
+                self.download_threads[i] = TracebackLoggingThread(target=self.download)
                 self.download_threads[i].deamon = True
                 self.download_threads[i].start()

         for i in range(self.prefetch_num):
             if thread_is_not_alive(self.prefetch_threads[i]):
                 logger.debug("%s prefetch thread #%i" % (display, i))
-                self.prefetch_threads[i] = threading.Thread(target=self.download, args=(True,))
+                self.prefetch_threads[i] = TracebackLoggingThread(target=self.download, args=(True,))
                 self.prefetch_threads[i].deamon = True
                 self.prefetch_threads[i].start()

         if self.sns_topic_arn:
             if thread_is_not_alive(self.publish_thread):
                 logger.debug("%s publish thread" % display)
-                self.publish_thread = threading.Thread(target=self.publish_messages)
+                self.publish_thread = TracebackLoggingThread(target=self.publish_messages)
                 self.publish_thread.daemon = True
                 self.publish_thread.start()

         if self.sqs_queue_name:
             if thread_is_not_alive(self.queue_listen_thread):
                 logger.debug("%s queue listen thread" % display)
-                self.queue_listen_thread = threading.Thread(target=self.listen_for_messages_over_sqs)
+                self.queue_listen_thread = TracebackLoggingThread(target=self.listen_for_messages_over_sqs)
                 self.queue_listen_thread.daemon = True
                 self.queue_listen_thread.start()

         if self.sns_http_port:
             if thread_is_not_alive(self.http_listen_thread):
                 logger.debug("%s HTTP listen thread" % display)
-                self.http_listen_thread = threading.Thread(target=self.listen_for_messages_over_http)
+                self.http_listen_thread = TracebackLoggingThread(target=self.listen_for_messages_over_http)
                 self.http_listen_thread.daemon = True
                 self.http_listen_thread.start()

         if thread_is_not_alive(self.check_cache_thread):
             logger.debug("%s check cache thread" % display)
-            self.check_cache_thread = threading.Thread(target=self.check_cache_size)
+            self.check_cache_thread = TracebackLoggingThread(target=self.check_cache_size)
             self.check_cache_thread.daemon = True
             self.check_cache_thread.start()

@@ -768,7 +768,7 @@ def init(self, path):

         self.check_threads(first=True)

-        self.check_status_thread = threading.Thread(target=self.check_status)
+        self.check_status_thread = TracebackLoggingThread(target=self.check_status)
         self.check_status_thread.daemon = True
         self.check_status_thread.start()

@@ -898,7 +898,7 @@ def process_message(self, messages):
             if c[2] != None:
                 self.invalidate_cache(c[2], c[3])
             else: # Invalidate all the cached data
-                for path in self.cache.entries:
+                for path in self.cache.entries.keys():
                     self.invalidate_cache(path)
         elif c[1] == 'md':
             if c[2]:
@@ -1042,7 +1042,7 @@ def check_cache_size(self):
                     self.cache.lru.append(path)
             else:
                 # Check for unused locks to be removed
-                for path in self.cache.unused_locks:
+                for path in self.cache.unused_locks.keys():
                     logger.debug("check_cache_size purge unused lock: '%s'" % (path))
                     try:
                         with self.cache.lock and self.cache.new_locks[path]:
@@ -1056,7 +1056,7 @@ def check_cache_size(self):
                     except KeyError:
                         pass

                 # Look for unused locks to be removed at next iteration (if still "new")
-                for path in self.cache.new_locks:
+                for path in self.cache.new_locks.keys():
                     logger.debug("check_cache_size purge unused lock: '%s' added to list" % (path))
                     self.cache.unused_locks[path] = True # Just a flag
@@ -1973,7 +1973,7 @@ def multipart_upload(self, key_path, data, full_size, headers, metadata):
         mpu = self.s3_bucket.initiate_multipart_upload(key_path, headers=headers, metadata=metadata)
         num_threads = min(part_num, self.multipart_num)
         for i in range(num_threads):
-            t = threading.Thread(target=self.part_upload, args=(mpu, part_queue))
+            t = TracebackLoggingThread(target=self.part_upload, args=(mpu, part_queue))
             t.demon = True
             t.start()
             logger.debug("multipart_upload thread '%i' started" % i)
@@ -2129,6 +2129,15 @@ def statfs(self, path):
             }
         return {}

+class TracebackLoggingThread(threading.Thread):
+    def run(self):
+        try:
+            super(TracebackLoggingThread, self).run()
+        except (KeyboardInterrupt, SystemExit):
+            raise
+        except Exception:
+            logger.exception("Uncaught Exception in Thread")
+
 ### Utility functions

 def errorAndExit(error, exitCode=1):
@@ -2191,6 +2200,9 @@ def has_elements(iter, num=1):
 def thread_is_not_alive(t):
     return t == None or not t.is_alive()

+def custom_sys_excepthook(type, value, traceback):
+    logger.exception("Uncaught Exception")
+
 ### Main

 def main():
@@ -2315,12 +2327,7 @@ def main():
     else:
         logger.setLevel(logging.INFO)

-    def custom_sys_excepthook(type, value, traceback):
-        logger.error("Uncaught Exception")
-        logger.error("Type: %s" % type)
-        logger.error("Value: %s" % value)
-        logger.error("Traceback: %s" % traceback)
-    sys.excepthook = custom_sys_excepthook
+    sys.excepthook = custom_sys_excepthook # This is not working for new threads that start afterwards

     logger.debug("options = %s" % options)

diff --git a/yas3fs/_version.py b/yas3fs/_version.py
index d3a156b..3087de1 100644
--- a/yas3fs/_version.py
+++ b/yas3fs/_version.py
@@ -1 +1 @@
-__version__ = '2.1.9'
+__version__ = '2.1.10'
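Note on the change: sys.excepthook only fires for exceptions that reach the top of the main thread, so the old inline custom_sys_excepthook never saw failures in the download/prefetch/publish/listen workers (hence the "not working for new threads" comment kept in main()). Overriding Thread.run(), as the new TracebackLoggingThread does, catches those failures where they happen. A minimal, self-contained sketch of the pattern, runnable outside yas3fs (the logging.basicConfig setup and the failing lambda target are illustrative assumptions, not part of the project):

import logging
import threading

logger = logging.getLogger("yas3fs")

class TracebackLoggingThread(threading.Thread):
    def run(self):
        try:
            # Run the thread's target as usual.
            super(TracebackLoggingThread, self).run()
        except (KeyboardInterrupt, SystemExit):
            raise  # let interpreter shutdown propagate
        except Exception:
            # logger.exception records the full traceback, which
            # sys.excepthook would never see from a worker thread.
            logger.exception("Uncaught Exception in Thread")

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)  # illustrative setup
    t = TracebackLoggingThread(target=lambda: 1 / 0)  # hypothetical failing target
    t.daemon = True
    t.start()
    t.join()  # the ZeroDivisionError is logged with its traceback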