Adding CCL Async test cases to TG nightly and bug fix #16700

Merged: 3 commits (Jan 15, 2025)

1 change: 1 addition & 0 deletions tests/nightly/tg/ccl/test_all_gather_async_nightly.py
1 change: 1 addition & 0 deletions tests/nightly/tg/ccl/test_reduce_scatter_async_nightly.py
3 changes: 1 addition & 2 deletions tests/scripts/tg/run_tg_nightly_tests.sh
@@ -7,8 +7,7 @@ run_tg_llama3_70b_tests() {

echo "LOG_METAL: Running run_tg_llama3_70b_tests"

-pytest tests/nightly/tg/ccl/test_all_gather_nightly.py ; fail+=$?
-pytest tests/nightly/tg/ccl/test_reduce_scatter_nightly.py ; fail+=$?
+pytest -n auto tests/nightly/tg/ccl --timeout=180 ; fail+=$?

# Falcon40B prefill 60 layer end to end with 10 loops; we need 8x8 grid size
pytest tests/nightly/tg/models/demos/tg/llama3_70b ; fail+=$?
@@ -254,10 +254,9 @@ def run_line_all_gather_on_TG_with_mesh_tensor_along_rows(
topology=ttnn.Topology.Linear,
)

-if enable_persistent_fabric:
-    logger.info(f"Waiting for op")
-    ttnn.synchronize_devices(mesh_device, sub_device_ids=sub_device_stall_group)
-    logger.info(f"Done iteration")
+if enable_persistent_fabric:
+    ttnn.synchronize_devices(mesh_device, sub_device_ids=sub_device_stall_group)
+    ttnn.synchronize_devices(mesh_device, sub_device_ids=sub_device_stall_group)

if enable_persistent_fabric and teardown_persistent_fabric:
logger.info("Tearing down persistent fabric interface")
@@ -0,0 +1,330 @@
# SPDX-FileCopyrightText: © 2025 Tenstorrent AI ULC

# SPDX-License-Identifier: Apache-2.0

import torch
import pytest
from loguru import logger
import ttnn
from tests.tt_eager.python_api_testing.sweep_tests.comparison_funcs import comp_equal, comp_pcc
from models.utility_functions import skip_for_grayskull
from tests.ttnn.unit_tests.operations.ccl.test_ccl_common import (
    create_and_load_sub_device_manager_with_fabric_interface,
    teardown_fabric_interface,
    create_global_semaphore_with_same_address,
)

from tests.ttnn.unit_tests.operations.ccl.test_all_gather_TG_post_commit import (
    run_line_all_gather_on_TG_with_mesh_tensor_along_rows,
)

from tests.ttnn.unit_tests.operations.ccl.test_new_all_gather import (
    run_all_gather_impl,
)


# Enumerate the post-commit cases explicitly
@skip_for_grayskull("Requires eth connected devices to run")
@pytest.mark.parametrize(
    "num_devices, num_links",
    [(4, 1)],
    # [(4, 3)], Multi-links fails https://github.com/tenstorrent/tt-metal/issues/16699
)
@pytest.mark.parametrize(
    "input_dtype",
    [
        ttnn.bfloat16,
        ttnn.bfloat8_b,
    ],
)
@pytest.mark.parametrize("shard_grid_orientation", [ttnn.ShardOrientation.ROW_MAJOR])
@pytest.mark.parametrize(
    "tensor_mem_layout,per_chip_output_shape, dim, input_shard_shape,shard_grid,layout",
    (
        # LLama
        (
            ttnn.TensorMemoryLayout.WIDTH_SHARDED,
            (1, 1, 32, 1024 * 4),
            3,
            (32, 32),
            ttnn.CoreRangeSet({ttnn.CoreRange(ttnn.CoreCoord(0, 0), ttnn.CoreCoord(7, 3))}),
            ttnn.TILE_LAYOUT,
        ),
        (
            ttnn.TensorMemoryLayout.WIDTH_SHARDED,
            (4, 1, 32, 1280),
            0,
            (32, 32),
            ttnn.CoreRangeSet({ttnn.CoreRange(ttnn.CoreCoord(0, 0), ttnn.CoreCoord(7, 4))}),
            ttnn.TILE_LAYOUT,
        ),
    ),
)
@pytest.mark.parametrize("replication_factor", [8])
@pytest.mark.parametrize("enable_async", [True])
@pytest.mark.parametrize("mesh_device", [pytest.param((8, 4), id="8x4_grid")], indirect=True)
def test_line_all_gather_sharded_on_TG_rows_post_commit(
    mesh_device,
    num_devices,
    per_chip_output_shape,
    input_shard_shape,
    shard_grid,
    shard_grid_orientation,
    tensor_mem_layout,
    dim,
    num_links,
    input_dtype,
    layout,
    use_program_cache,
    function_level_defaults,
    enable_async,
    replication_factor,
    num_iters=1,
):
    if len(mesh_device.get_devices()) != 32:
        pytest.skip("Not TG!")
    if input_dtype == ttnn.bfloat16 and per_chip_output_shape == (1, 1, 32, 1024 * 4):
        pytest.skip("Skipped due to hang Issue #16699")
    input_shard_spec = ttnn.ShardSpec(
        shard_grid,
        input_shard_shape,
        shard_grid_orientation,
    )
    run_line_all_gather_on_TG_with_mesh_tensor_along_rows(
        mesh_device,
        num_devices,
        per_chip_output_shape,
        tensor_mem_layout,
        dim,
        num_links,
        input_dtype,
        layout,
        ttnn.BufferType.L1,
        use_program_cache,
        function_level_defaults,
        enable_async=enable_async,
        input_shard_spec=input_shard_spec,
        num_iters=num_iters,
        num_all_gather_instances=replication_factor,
        cluster_axis=1,
        use_all_gather_async=True,
        enable_persistent_fabric=True,
        create_persistent_fabric=True,
        teardown_persistent_fabric=True,
    )


# Enumerate the post-commit cases explicitly
@skip_for_grayskull("Requires eth connected devices to run")
@pytest.mark.parametrize(
    "num_devices, num_links",
    [
        (8, 1),
    ],
    # [(8, 4), (8, 3), (8, 2)], Multi-links fails https://github.com/tenstorrent/tt-metal/issues/16699
)
@pytest.mark.parametrize(
    "input_dtype",
    [
        ttnn.bfloat16,
        ttnn.bfloat8_b,
    ],
)
@pytest.mark.parametrize("shard_grid_orientation", [ttnn.ShardOrientation.ROW_MAJOR])
@pytest.mark.parametrize(
    "tensor_mem_layout, input_shape, dim, input_shard_shape,shard_grid,layout",
    (
        (
            ttnn.TensorMemoryLayout.WIDTH_SHARDED,
            (8, 1, 32, 2048),
            0,
            (32, 64),
            ttnn.CoreRangeSet({ttnn.CoreRange(ttnn.CoreCoord(0, 0), ttnn.CoreCoord(7, 3))}),
            ttnn.TILE_LAYOUT,
        ),
        (
            ttnn.TensorMemoryLayout.WIDTH_SHARDED,
            (1, 8, 32, 2048),
            1,
            (32, 64),
            ttnn.CoreRangeSet({ttnn.CoreRange(ttnn.CoreCoord(0, 0), ttnn.CoreCoord(7, 3))}),
            ttnn.TILE_LAYOUT,
        ),
        (
            ttnn.TensorMemoryLayout.WIDTH_SHARDED,
            (1, 1, 256, 2048),
            2,
            (32, 64),
            ttnn.CoreRangeSet({ttnn.CoreRange(ttnn.CoreCoord(0, 0), ttnn.CoreCoord(7, 3))}),
            ttnn.TILE_LAYOUT,
        ),
        (
            ttnn.TensorMemoryLayout.WIDTH_SHARDED,
            (1, 1, 32, 16384),
            3,
            (32, 64),
            ttnn.CoreRangeSet({ttnn.CoreRange(ttnn.CoreCoord(0, 0), ttnn.CoreCoord(7, 3))}),
            ttnn.TILE_LAYOUT,
        ),
        (
            ttnn.TensorMemoryLayout.HEIGHT_SHARDED,
            (8, 1, 2048, 32),
            0,
            (64, 32),
            ttnn.CoreRangeSet({ttnn.CoreRange(ttnn.CoreCoord(0, 0), ttnn.CoreCoord(7, 3))}),
            ttnn.TILE_LAYOUT,
        ),
        (
            ttnn.TensorMemoryLayout.HEIGHT_SHARDED,
            (1, 8, 2048, 32),
            1,
            (64, 32),
            ttnn.CoreRangeSet({ttnn.CoreRange(ttnn.CoreCoord(0, 0), ttnn.CoreCoord(7, 3))}),
            ttnn.TILE_LAYOUT,
        ),
        (
            ttnn.TensorMemoryLayout.HEIGHT_SHARDED,
            (1, 1, 16384, 32),
            2,
            (64, 32),
            ttnn.CoreRangeSet({ttnn.CoreRange(ttnn.CoreCoord(0, 0), ttnn.CoreCoord(7, 3))}),
            ttnn.TILE_LAYOUT,
        ),
        (
            ttnn.TensorMemoryLayout.HEIGHT_SHARDED,
            (1, 1, 2048, 256),
            3,
            (64, 32),
            ttnn.CoreRangeSet({ttnn.CoreRange(ttnn.CoreCoord(0, 0), ttnn.CoreCoord(7, 3))}),
            ttnn.TILE_LAYOUT,
        ),
    ),
)
@pytest.mark.parametrize("replication_factor", [4])
@pytest.mark.parametrize("enable_async", [True])
@pytest.mark.parametrize("mesh_device", [pytest.param((8, 4), id="8x4_grid")], indirect=True)
def test_line_all_gather_sharded_on_TG_cols_post_commit(
    mesh_device,
    num_devices,
    input_shape,
    input_shard_shape,
    shard_grid,
    shard_grid_orientation,
    tensor_mem_layout,
    dim,
    num_links,
    input_dtype,
    layout,
    use_program_cache,
    function_level_defaults,
    enable_async,
    replication_factor,
    num_iters=1,
):
    if len(mesh_device.get_devices()) != 32:
        pytest.skip("Not TG!")
    if input_dtype == ttnn.bfloat16 and input_shape == (1, 1, 256, 2048):
        pytest.skip("Skipped due to hang Issue #16699")
    input_shard_spec = ttnn.ShardSpec(
        shard_grid,
        input_shard_shape,
        shard_grid_orientation,
    )

    run_line_all_gather_on_TG_with_mesh_tensor_along_rows(
        mesh_device,
        num_devices,
        input_shape,
        tensor_mem_layout,
        dim,
        num_links,
        input_dtype,
        layout,
        ttnn.BufferType.L1,
        use_program_cache,
        function_level_defaults,
        enable_async=enable_async,
        num_iters=num_iters,
        input_shard_spec=input_shard_spec,
        num_all_gather_instances=replication_factor,
        cluster_axis=0,
        use_all_gather_async=True,
        enable_persistent_fabric=True,
        create_persistent_fabric=True,
        teardown_persistent_fabric=True,
    )


# Enumerate the post-commit cases explicitly
@skip_for_grayskull("Requires eth connected devices to run")
@pytest.mark.parametrize(
    "num_devices, num_links, per_chip_output_shape, dim, layout",
    [
        (8, 1, [1, 8, 32, 1280], 1, ttnn.TILE_LAYOUT),
        (8, 1, [8, 1, 32, 1280], 0, ttnn.TILE_LAYOUT),
        (8, 1, [1, 8, 32, 2048], 1, ttnn.TILE_LAYOUT),
        (8, 1, [1, 8, 32, 2304], 1, ttnn.TILE_LAYOUT),
        (8, 1, [1, 8, 32, 4096], 1, ttnn.TILE_LAYOUT),
        # multi-links fails: https://github.com/tenstorrent/tt-metal/issues/16699
        # (8, 4, [1, 8, 32, 1280], 1, ttnn.TILE_LAYOUT),
        # (8, 4, [8, 1, 32, 1280], 0, ttnn.TILE_LAYOUT),
        # (8, 4, [1, 8, 32, 2048], 1, ttnn.TILE_LAYOUT),
        # (8, 4, [1, 8, 32, 2304], 1, ttnn.TILE_LAYOUT),
        # (8, 4, [1, 8, 32, 4096], 1, ttnn.TILE_LAYOUT),
    ],
)
@pytest.mark.parametrize(
    "input_dtype",
    [
        ttnn.bfloat16,
        ttnn.bfloat8_b,
    ],
)
@pytest.mark.parametrize(
    "buffer_type",
    [
        ttnn.BufferType.DRAM,
        ttnn.BufferType.L1,
    ],
)
@pytest.mark.parametrize("replication_factor", [4])
@pytest.mark.parametrize("enable_async", [True])
@pytest.mark.parametrize("mesh_device", [pytest.param((8, 4), id="8x4_grid")], indirect=True)
def test_line_all_gather_on_TG_cols_nightly(
    mesh_device,
    num_devices,
    per_chip_output_shape,
    dim,
    num_links,
    input_dtype,
    layout,
    buffer_type,
    use_program_cache,
    function_level_defaults,
    enable_async,
    replication_factor,
    num_iters=1,
):
    if len(mesh_device.get_devices()) != 32:
        pytest.skip("Not TG!")
    run_line_all_gather_on_TG_with_mesh_tensor_along_rows(
        mesh_device,
        num_devices,
        per_chip_output_shape,
        ttnn.TensorMemoryLayout.INTERLEAVED,
        dim,
        num_links,
        input_dtype,
        layout,
        buffer_type,
        use_program_cache,
        function_level_defaults,
        enable_async=enable_async,
        num_iters=num_iters,
        num_all_gather_instances=replication_factor,
        cluster_axis=0,
        use_all_gather_async=True,
        enable_persistent_fabric=True,
        create_persistent_fabric=True,
        teardown_persistent_fabric=True,
    )
@@ -279,6 +279,8 @@ def run_line_reduce_scatter_on_TG_with_mesh_tensor_along_rows(
memory_config=output_mem_config,
topology=ttnn.Topology.Linear,
)
+if enable_persistent_fabric:
+    ttnn.synchronize_devices(mesh_device, sub_device_ids=sub_device_stall_group)
Comment on lines +282 to +283

Contributor:
Was this added for debugging, or what is its purpose? Should it be removed?

Contributor Author (@caixunshiren), Jan 14, 2025:
This is needed in the multi-iteration case to prevent a race condition when persistent fabric is enabled: one device can run ahead and increment the semaphore on a slower device for the next CCL call before the slower device has reset its semaphore from the previous iteration. At the moment there is no good way to prevent this other than introducing a sync, or using a new global semaphore every time (which is essentially a sync).
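
For illustration only, a minimal sketch of the per-iteration guard being described, with a hypothetical run_ccl_op standing in for the async CCL call; only ttnn.synchronize_devices, mesh_device, and sub_device_stall_group come from this diff:

# Sketch, not code from this PR: run_ccl_op is a placeholder for the async CCL op under test.
for _ in range(num_iters):
    out_tensor = run_ccl_op(input_tensor_mesh)
    if enable_persistent_fabric:
        # Stall until every device in the sub-device stall group finishes this iteration,
        # so a fast device cannot increment the semaphore for the next CCL call before a
        # slow device has reset its semaphore from the current one.
        ttnn.synchronize_devices(mesh_device, sub_device_ids=sub_device_stall_group)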

Collaborator:
Is this something that should be documented for consumers of the API? Seems like an important detail.

Contributor:
This is a separate issue from #16634?

ttnn.synchronize_devices(mesh_device, sub_device_ids=sub_device_stall_group)

if enable_persistent_fabric and teardown_persistent_fabric: