Commit 62b6059: formatter
jkppr committed Jan 8, 2025
1 parent 702255f commit 62b6059
Showing 1 changed file with 18 additions and 9 deletions.
27 changes: 18 additions & 9 deletions timesketch/lib/tasks.py
@@ -939,9 +939,11 @@ def run_csv_jsonl(
         unique_keys = set(current_index_mapping_properties)
 
         try:
-            current_limit = int(opensearch.client.indices.get_settings(index=index_name)[
-                index_name
-            ]["settings"]["index"]["mapping"]["total_fields"]["limit"])
+            current_limit = int(
+                opensearch.client.indices.get_settings(index=index_name)[index_name][
+                    "settings"
+                ]["index"]["mapping"]["total_fields"]["limit"]
+            )
         except KeyError:
             current_limit = 1000

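The reformatted lookup above digs the index's current total-fields limit out of the nested get_settings response, falling back to 1000 (the OpenSearch default) when the setting has never been set explicitly and the key is missing. A minimal standalone sketch of that lookup, assuming an opensearch-py client; the connection details are placeholders:

    from opensearchpy import OpenSearch

    client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])

    def get_total_fields_limit(client: OpenSearch, index_name: str) -> int:
        """Return index.mapping.total_fields.limit, or 1000 if unset."""
        try:
            # Response shape: {index_name: {"settings": {"index": {...}}}}
            settings = client.indices.get_settings(index=index_name)
            return int(
                settings[index_name]["settings"]["index"]["mapping"][
                    "total_fields"
                ]["limit"]
            )
        except KeyError:
            # The key only appears once the limit has been set explicitly.
            return 1000
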
@@ -953,7 +955,7 @@ def run_csv_jsonl(
             unique_keys.update(event.keys())
         # Calculating the new limit. Each unique key is counted twice due to
         # the "keyword" type plus a percentage buffer (default 20%).
-        new_limit = int((len(unique_keys)*2) * (1 + limit_buffer_percentage))
+        new_limit = int((len(unique_keys) * 2) * (1 + limit_buffer_percentage))
         # To prevent mapping explosions we still check against an upper
         # mapping limit set in timesketch.conf (default: 2000).
         if new_limit > upper_mapping_limit:
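
Besides the spacing fix, the formula is worth unpacking: every unique key is counted twice (presumably once for the field and once for its keyword sub-field), then padded by the configurable buffer. A worked example with hypothetical counts and the default 20% buffer:

    # Suppose the ingested events yielded 400 unique keys.
    unique_key_count = 400
    limit_buffer_percentage = 0.2  # default 20% buffer

    new_limit = int((unique_key_count * 2) * (1 + limit_buffer_percentage))
    # (400 * 2) * 1.2 = 960, under the default upper limit of 2000
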
@@ -965,12 +967,15 @@ def run_csv_jsonl(
                 "data or adjust OPENSEARCH_MAPPING_UPPER_LIMIT."
             )
             logger.error(error_msg)
-            _set_datasource_status(timeline_id, file_path, "fail", error_message=str(error_msg))
+            _set_datasource_status(
+                timeline_id, file_path, "fail", error_message=str(error_msg)
+            )
             return None
 
         if new_limit > current_limit:
             opensearch.client.indices.put_settings(
-                index=index_name, body={"index.mapping.total_fields.limit": new_limit}
+                index=index_name,
+                body={"index.mapping.total_fields.limit": new_limit},
             )
             logger.info(
                 "OpenSearch index [%s] mapping limit increased to: %d",
@@ -986,7 +991,7 @@ def run_csv_jsonl(
         results = opensearch.flush_queued_events()
 
         error_container = results.get("error_container", {})
-        error_count = len(error_container.get(index_name, {}).get('errors', []))
+        error_count = len(error_container.get(index_name, {}).get("errors", []))
         error_msg = get_import_errors(
             error_container=error_container,
             index_name=index_name,
@@ -1004,7 +1009,9 @@ def run_csv_jsonl(
     except Exception as e:  # pylint: disable=broad-except
         # Mark the searchindex and timelines as failed and exit the task
         error_msg = traceback.format_exc()
-        _set_datasource_status(timeline_id, file_path, "fail", error_message=str(error_msg))
+        _set_datasource_status(
+            timeline_id, file_path, "fail", error_message=str(error_msg)
+        )
         logger.error("Error: {0!s}\n{1:s}".format(e, error_msg))
         return None
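
The failure path follows a common pattern for background tasks: catch everything, capture the full traceback, persist it on the datasource so the UI can surface it, log it, and return None instead of re-raising. A stripped-down sketch of that pattern, with a hypothetical mark_datasource_failed standing in for Timesketch's _set_datasource_status:

    import logging
    import traceback

    logger = logging.getLogger(__name__)

    def mark_datasource_failed(timeline_id, file_path, error_message):
        # Hypothetical stand-in for _set_datasource_status(..., "fail", ...).
        logger.info("timeline %s (%s) failed: %s", timeline_id, file_path, error_message)

    def ingest(timeline_id, file_path):
        try:
            raise ValueError("simulated import failure")
        except Exception as e:  # pylint: disable=broad-except
            error_msg = traceback.format_exc()
            mark_datasource_failed(timeline_id, file_path, error_message=str(error_msg))
            logger.error("Error: {0!s}\n{1:s}".format(e, error_msg))
            return None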

@@ -1026,7 +1033,9 @@ def run_csv_jsonl(
     )
 
     # Set status to ready when done
-    _set_datasource_status(timeline_id, file_path, "ready", error_message=str(error_msg))
+    _set_datasource_status(
+        timeline_id, file_path, "ready", error_message=str(error_msg)
+    )
 
     return index_name
