From c5dfa48f288260365d256c6c5a92bd01dc95bee9 Mon Sep 17 00:00:00 2001 From: Jordan Reidy Date: Tue, 10 Dec 2024 10:55:17 -0500 Subject: [PATCH] Gives Publish more threads to use while avoiding collision. --- .../_marketplacesvm/tasks/push/command.py | 30 ++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/src/pubtools/_marketplacesvm/tasks/push/command.py b/src/pubtools/_marketplacesvm/tasks/push/command.py index a09ea3d..d5d96b0 100644 --- a/src/pubtools/_marketplacesvm/tasks/push/command.py +++ b/src/pubtools/_marketplacesvm/tasks/push/command.py @@ -24,7 +24,7 @@ class MarketplacesVMPush(MarketplacesVMTask, CloudService, CollectorService, Sta """Push and publish content to various cloud marketplaces.""" _REQUEST_THREADS = int(os.environ.get("MARKETPLACESVM_PUSH_REQUEST_THREADS", "5")) - _PROCESS_THREADS = int(os.environ.get("MARKETPLACESVM_PUSH_PROCESS_THREADS", "5")) + _PROCESS_THREADS = int(os.environ.get("MARKETPLACESVM_PUSH_PROCESS_THREADS", "10")) _SKIPPED = False @property @@ -303,18 +303,28 @@ def push_function(mapped_item, marketplace, starmap_query) -> Dict[str, Any]: res_output = [] - # Sequentially publish the uploaded items for each marketplace. - # It's recommended to do this operation sequentially since parallel publishing in the - # same marketplace may cause errors due to the change set already being applied. + # Go through destinations and mappings to ensure that we are pushing + # to only a single marketplace + dest at a time. + publish_map = {} for mapped_item, starmap_query in upload_result: - to_await = [] - executor = Executors.thread_pool( - name="pubtools-marketplacesvm-push-regions", - max_workers=min(max(len(mapped_item.marketplaces), 1), self._PROCESS_THREADS), - ) for marketplace in mapped_item.marketplaces: + pi = mapped_item.get_push_item_for_marketplace(marketplace) + target_name = f"{marketplace}_{pi.dest[0].destination}" + if not publish_map.get(target_name): + publish_map[target_name] = [] + publish_map[target_name].append({"mapped_item": mapped_item, + "marketplace": marketplace, + "starmap_query": starmap_query}) + + to_await = [] + executor = Executors.thread_pool( + name="pubtools-marketplacesvm-push-regions", + max_workers=min(max(len(publish_map.keys()), 1), self._PROCESS_THREADS), + ) + for kwargs_list in publish_map.values(): + for kwargs in kwargs_list: to_await.append( - executor.submit(push_function, mapped_item, marketplace, starmap_query) + executor.submit(push_function, **kwargs) ) for f_out in to_await: