diff --git a/tests/ttnn/unit_tests/operations/ccl/perf/async_perf_csv.py b/tests/ttnn/unit_tests/operations/ccl/perf/async_perf_csv.py index d01c7bd9adb..9a4bff171e0 100644 --- a/tests/ttnn/unit_tests/operations/ccl/perf/async_perf_csv.py +++ b/tests/ttnn/unit_tests/operations/ccl/perf/async_perf_csv.py @@ -18,7 +18,7 @@ def perf_report(file_path): def remove_keys_from_attributes(attributes): attributes = attributes.replace(";", ",").replace("'", '"') - keys_to_remove = ["receiver_device_id", "ring_index", "sender_device_id"] + keys_to_remove = ["receiver_device_id", "ring_index", "sender_device_id", "forward_device", "backward_device"] try: attributes_dict = eval(attributes) @@ -63,7 +63,9 @@ def safe_parse_attributes(attributes): ) df["num_links"] = df["ATTRIBUTES"].apply( - lambda x: safe_parse_attributes(x).get("num_links", "") if isinstance(safe_parse_attributes(x), dict) else "" + lambda x: safe_parse_attributes(x).get("num_links", safe_parse_attributes(x).get("num_links_preferred", "")) + if isinstance(safe_parse_attributes(x), dict) + else "" ) df["output_mem_config"] = df["ATTRIBUTES"].apply( @@ -164,12 +166,10 @@ def safe_parse_attributes(attributes): op_code = averages_df.iloc[0]["OP CODE"] today = time.strftime("%Y_%m_%d") - if op_code == "AllGather": - ccl_perf_file_path = f"CCL_all_gather_Perf_{today}.csv" - elif op_code == "AllGatherAsyn": + if op_code == "AllGatherAsync": ccl_perf_file_path = f"CCL_all_gather_async_Perf_{today}.csv" - elif op_code == "ReduceScatter": - ccl_perf_file_path = f"CCL_reduce_scatter_Perf_{today}.csv" + elif op_code == "ReduceScatterAsync": + ccl_perf_file_path = f"CCL_reduce_scatter_async_Perf_{today}.csv" else: ccl_perf_file_path = f"CCL_Perf_{today}.csv" diff --git a/tests/ttnn/unit_tests/operations/ccl/perf/run_async_reduce_scatter_profile.sh b/tests/ttnn/unit_tests/operations/ccl/perf/run_async_reduce_scatter_profile.sh new file mode 100755 index 00000000000..c2476dcedf1 --- /dev/null +++ 
b/tests/ttnn/unit_tests/operations/ccl/perf/run_async_reduce_scatter_profile.sh @@ -0,0 +1,107 @@ +#!/bin/sh +MODULE_DIR="tests/ttnn/unit_tests/operations/ccl/perf" + +# Defaults +DEBUG=false +TARGET="n300" + +# Function to display help +show_help() { + echo "Usage: ./tests/ttnn/unit_tests/operations/ccl/perf/run_async_reduce_scatter_profile.sh [OPTIONS]" + echo + echo "Options:" + echo " -d, --debug Enable debug mode to show real-time output." + echo " -t, --target Specify the target configuration (t3000 or n300 or tg). Default is n300." + echo " -h, --help Display this help message." + echo + echo "Example:" + echo " ./tests/ttnn/unit_tests/operations/ccl/perf/run_async_reduce_scatter_profile.sh --debug --target n300" + echo " ./tests/ttnn/unit_tests/operations/ccl/perf/run_async_reduce_scatter_profile.sh -h" +} + +# Parse command-line arguments +while [ $# -gt 0 ]; do + case "$1" in + --debug|-d) + DEBUG=true + shift + ;; + --help|-h) + show_help + exit 0 + ;; + --target|-t) + # Ensure there is an argument following the target flag + if [ -z "$2" ]; then + echo "Error: No target specified after $1." + show_help + exit 1 + fi + + TARGET="$2" # Set the target configuration + shift 2 + + # Validate the target value + if [ "$TARGET" != "t3000" ] && [ "$TARGET" != "tg" ] && [ "$TARGET" != "n300" ]; then + echo "Error: Invalid target configuration: $TARGET. Must be 't3000' or 'n300' or 'tg'." + exit 1 + fi + ;; + *) + echo "Unknown option: $1" + show_help + exit 1 + ;; + esac +done + +# Function to run the profiling command and extract the CSV path +run_profile_and_extract_csv() { + command="./tt_metal/tools/profiler/profile_this.py -n reduce_scatter_async_$TARGET -c 'pytest tests/ttnn/unit_tests/operations/ccl/perf/test_ccl_async_perf.py::test_reduce_scatter_async_$TARGET'" + + if [ "$DEBUG" = true ]; then + echo "Running profiling command for target $TARGET in debug mode..." + full_output=$(eval $command 2>&1 | tee /dev/tty) + else + echo "Running profiling command for target $TARGET..." 
+ full_output=$(eval $command 2>&1) + fi + + # Extract the CSV path + csv_path=$(echo "$full_output" | grep -oE 'OPs csv generated at: (.+\.csv)' | sed -E 's/OPs csv generated at: //') + + if [ -n "$csv_path" ]; then + echo "CSV path found: $csv_path" + echo "Generating performance report..." + + tmp_file="/tmp/perf_report_output.log" + PYTHONPATH="$MODULE_DIR" python3 -c " +import sys +import pandas as pd +from async_perf_csv import perf_report +from tabulate import tabulate + +try: + # Generate the report and convert it to a DataFrame + average_df = perf_report('$csv_path') + # Print the DataFrame in a pretty table format + print('Min - Avg - Max by Common Runs:') + print(tabulate(average_df, headers='keys', tablefmt='pretty')) +except Exception as e: + print(f'Error in performance report generation: {e}', file=sys.stderr) + sys.exit(1) +" 2>&1 | tee "$tmp_file" + + if grep -q "Error in performance report generation" "$tmp_file"; then + echo "Error: Performance report generation failed." + exit 1 + fi + + else + echo "CSV path not found in the command output." 
+ exit 1 + fi +} + +# Run the function +run_profile_and_extract_csv diff --git a/tests/ttnn/unit_tests/operations/ccl/perf/test_ccl_async_perf.py b/tests/ttnn/unit_tests/operations/ccl/perf/test_ccl_async_perf.py index 9b3361fc5e4..16a5a3ac24e 100644 --- a/tests/ttnn/unit_tests/operations/ccl/perf/test_ccl_async_perf.py +++ b/tests/ttnn/unit_tests/operations/ccl/perf/test_ccl_async_perf.py @@ -8,6 +8,9 @@ from tests.ttnn.unit_tests.operations.ccl.test_new_all_gather import ( run_all_gather_impl, ) +from tests.ttnn.unit_tests.operations.ccl.test_reduce_scatter_async import ( + run_reduce_scatter_test, +) from tests.ttnn.unit_tests.operations.ccl.test_all_gather_TG_post_commit import ( run_line_all_gather_on_TG_with_mesh_tensor_along_rows, ) @@ -147,3 +150,73 @@ def test_all_gather_async_tg( teardown_persistent_fabric=True, trace_mode=True, ) + + +@skip_for_grayskull("Requires eth connected devices to run") +@pytest.mark.parametrize( + "num_devices, num_links", + [ + (4, 1), + ], +) +@pytest.mark.parametrize( + "per_chip_output_shape, dim, layout", + [ + ([1, 1, 32, 32], 3, ttnn.TILE_LAYOUT), + # ([1, 1, 32, 32 * 2], 3, ttnn.TILE_LAYOUT), + # ([1, 1, 64, 32], 3, ttnn.TILE_LAYOUT), + # ([1, 1, 64, 64], 3, ttnn.TILE_LAYOUT), + # ([1, 1, 128, 128], 0, ttnn.TILE_LAYOUT), + ], +) +@pytest.mark.parametrize( + "input_dtype", + [ + ttnn.bfloat16, + # ttnn.bfloat8_b, + ], +) +@pytest.mark.parametrize( + "mem_config", + [ + ttnn.MemoryConfig(buffer_type=ttnn.BufferType.DRAM), + # ttnn.MemoryConfig(buffer_type=ttnn.BufferType.L1), + ], +) +@pytest.mark.parametrize("math_op", [ttnn.ReduceType.Sum]) +@pytest.mark.parametrize("enable_async", [False]) +@pytest.mark.parametrize("trace_mode", [True]) +@pytest.mark.parametrize("device_params", [{"trace_region_size": 1824800}], indirect=True) +def test_reduce_scatter_async_t3000( + t3k_mesh_device, + num_devices, + per_chip_output_shape, + dim, + num_links, + math_op, + input_dtype, + layout, + mem_config, + use_program_cache, + 
function_level_defaults, + enable_async, + trace_mode, + num_iters=20, +): + run_reduce_scatter_test( + t3k_mesh_device, + num_devices, + per_chip_output_shape, + dim, + num_links, + math_op, + input_dtype, + layout, + mem_config, + use_program_cache, + function_level_defaults, + num_iters=num_iters, + enable_async=enable_async, + topology=ttnn.Topology.Linear, + trace_mode=trace_mode, + ) diff --git a/tests/ttnn/unit_tests/operations/ccl/test_reduce_scatter_async.py b/tests/ttnn/unit_tests/operations/ccl/test_reduce_scatter_async.py index 074efe87904..52a9934d469 100644 --- a/tests/ttnn/unit_tests/operations/ccl/test_reduce_scatter_async.py +++ b/tests/ttnn/unit_tests/operations/ccl/test_reduce_scatter_async.py @@ -42,19 +42,22 @@ def run_with_trace( output_mem_config, num_iters=40, topology=ttnn.Topology.Ring, - subdevice_id=None, + from_remote_semaphore_handles=None, + to_remote_semaphore_handles=None, + worker_sub_device_id=None, ): # Compile Run logger.info("Compiling model") - output_tensor_mesh = ttnn.reduce_scatter_async( + output_tensor_mesh = ttnn.experimental.reduce_scatter_async( input_tensor_mesh, dim=dim, + from_remote_multi_device_global_semaphore=from_remote_semaphore_handles, + to_remote_multi_device_global_semaphore=to_remote_semaphore_handles, math_op=math_op, num_links=num_links, memory_config=output_mem_config, topology=topology, - subdevice_id=subdevice_id, - create_semaphore_handles=True, + subdevice_id=worker_sub_device_id, ) for device_id in t3k_mesh_device.get_device_ids(): ttnn.synchronize_device(t3k_mesh_device.get_device(device_id)) @@ -63,15 +66,16 @@ def run_with_trace( logger.info("Capturing trace") trace_id = ttnn.begin_trace_capture(t3k_mesh_device, cq_id=0) for i in range(num_iters): - output_tensor_mesh = ttnn.reduce_scatter_async( + output_tensor_mesh = ttnn.experimental.reduce_scatter_async( input_tensor_mesh, dim=dim, + from_remote_multi_device_global_semaphore=from_remote_semaphore_handles, + 
to_remote_multi_device_global_semaphore=to_remote_semaphore_handles, math_op=math_op, num_links=num_links, memory_config=output_mem_config, topology=topology, - subdevice_id=subdevice_id, - create_semaphore_handles=False, + subdevice_id=worker_sub_device_id, ) ttnn.end_trace_capture(t3k_mesh_device, trace_id, cq_id=0) for device_id in t3k_mesh_device.get_device_ids(): @@ -223,7 +227,9 @@ def run_reduce_scatter_test( output_mem_config, num_iters=num_iters, topology=topology, - subdevice_id=worker_sub_device_id, + from_remote_semaphore_handles=from_remote_semaphore_handles, + to_remote_semaphore_handles=to_remote_semaphore_handles, + worker_sub_device_id=worker_sub_device_id, ) else: logger.info(f"Running {num_iters} iterations of reduce scatter")