Skip to content

Commit

Permalink
Gives Publish more threads to use while avoiding collision.
Browse files Browse the repository at this point in the history
  • Loading branch information
jajreidy committed Dec 10, 2024
1 parent 36f441f commit c5dfa48
Showing 1 changed file with 20 additions and 10 deletions.
30 changes: 20 additions & 10 deletions src/pubtools/_marketplacesvm/tasks/push/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class MarketplacesVMPush(MarketplacesVMTask, CloudService, CollectorService, Sta
"""Push and publish content to various cloud marketplaces."""

_REQUEST_THREADS = int(os.environ.get("MARKETPLACESVM_PUSH_REQUEST_THREADS", "5"))
_PROCESS_THREADS = int(os.environ.get("MARKETPLACESVM_PUSH_PROCESS_THREADS", "5"))
_PROCESS_THREADS = int(os.environ.get("MARKETPLACESVM_PUSH_PROCESS_THREADS", "10"))
_SKIPPED = False

@property
Expand Down Expand Up @@ -303,18 +303,28 @@ def push_function(mapped_item, marketplace, starmap_query) -> Dict[str, Any]:

res_output = []

# Sequentially publish the uploaded items for each marketplace.
# It's recommended to do this operation sequentially since parallel publishing in the
# same marketplace may cause errors due to the change set already being applied.
# Go through destinations and mappings to ensure that we are pushing
# to only a single marketplace + dest at a time.
publish_map = {}
for mapped_item, starmap_query in upload_result:
to_await = []
executor = Executors.thread_pool(
name="pubtools-marketplacesvm-push-regions",
max_workers=min(max(len(mapped_item.marketplaces), 1), self._PROCESS_THREADS),
)
for marketplace in mapped_item.marketplaces:
pi = mapped_item.get_push_item_for_marketplace(marketplace)
target_name = f"{marketplace}_{pi.dest[0].destination}"
if not publish_map.get(target_name):
publish_map[target_name] = []
publish_map[target_name].append({"mapped_item": mapped_item,
"marketplace": marketplace,
"starmap_query": starmap_query})

to_await = []
executor = Executors.thread_pool(
name="pubtools-marketplacesvm-push-regions",
max_workers=min(max(len(publish_map.keys()), 1), self._PROCESS_THREADS),
)
for kwargs_list in publish_map.values():
for kwargs in kwargs_list:
to_await.append(
executor.submit(push_function, mapped_item, marketplace, starmap_query)
executor.submit(push_function, **kwargs)
)

for f_out in to_await:
Expand Down

0 comments on commit c5dfa48

Please sign in to comment.