From 2529f6f0a175b5423aa2c3ab408a0be507cd44fa Mon Sep 17 00:00:00 2001 From: Jordan Reidy Date: Tue, 10 Dec 2024 10:55:17 -0500 Subject: [PATCH] Gives Publish more threads to use while avoiding collision. --- .../_marketplacesvm/tasks/push/command.py | 34 +++++++++++++------ 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/src/pubtools/_marketplacesvm/tasks/push/command.py b/src/pubtools/_marketplacesvm/tasks/push/command.py index a09ea3d..8ac6c58 100644 --- a/src/pubtools/_marketplacesvm/tasks/push/command.py +++ b/src/pubtools/_marketplacesvm/tasks/push/command.py @@ -24,7 +24,7 @@ class MarketplacesVMPush(MarketplacesVMTask, CloudService, CollectorService, Sta """Push and publish content to various cloud marketplaces.""" _REQUEST_THREADS = int(os.environ.get("MARKETPLACESVM_PUSH_REQUEST_THREADS", "5")) - _PROCESS_THREADS = int(os.environ.get("MARKETPLACESVM_PUSH_PROCESS_THREADS", "5")) + _PROCESS_THREADS = int(os.environ.get("MARKETPLACESVM_PUSH_PROCESS_THREADS", "10")) _SKIPPED = False @property @@ -303,20 +303,32 @@ def push_function(mapped_item, marketplace, starmap_query) -> Dict[str, Any]: res_output = [] - # Sequentially publish the uploaded items for each marketplace. - # It's recommended to do this operation sequentially since parallel publishing in the - # same marketplace may cause errors due to the change set already being applied. + # Go through destinations and mappings to ensure that we are pushing + # to only a single marketplace + dest at a time. + publish_map: Dict[str, List[Dict[str, Any]]] = {} for mapped_item, starmap_query in upload_result: - to_await = [] - executor = Executors.thread_pool( - name="pubtools-marketplacesvm-push-regions", - max_workers=min(max(len(mapped_item.marketplaces), 1), self._PROCESS_THREADS), - ) for marketplace in mapped_item.marketplaces: - to_await.append( - executor.submit(push_function, mapped_item, marketplace, starmap_query) + pi = mapped_item.get_push_item_for_marketplace(marketplace) + target_name = f"{marketplace}_{pi.dest[0].destination}" + if not publish_map.get(target_name): + publish_map[target_name] = [] + publish_map[target_name].append( + { + "mapped_item": mapped_item, + "marketplace": marketplace, + "starmap_query": starmap_query, + } ) + to_await = [] + executor = Executors.thread_pool( + name="pubtools-marketplacesvm-push-regions", + max_workers=min(max(len(publish_map.keys()), 1), self._PROCESS_THREADS), + ) + for kwargs_list in publish_map.values(): + for kwargs in kwargs_list: + to_await.append(executor.submit(push_function, **kwargs)) + for f_out in to_await: res_output.append(f_out.result())