Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Multi MeshCQ and MeshEvents API Bringup #17582

Merged
merged 1 commit into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions tests/tt_metal/distributed/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ set(UNIT_TESTS_DISTRIBUTED_SRC
${CMAKE_CURRENT_SOURCE_DIR}/test_mesh_workload.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_mesh_sub_device.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_mesh_allocator.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_mesh_events.cpp
${CMAKE_CURRENT_SOURCE_DIR}/utils.cpp
)

# Define the function to create test executables for each architecture
Expand Down
253 changes: 253 additions & 0 deletions tests/tt_metal/distributed/test_mesh_events.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
// SPDX-FileCopyrightText: © 2025 Tenstorrent Inc.
//
// SPDX-License-Identifier: Apache-2.0

#include <tt-metalium/distributed.hpp>
#include <tt-metalium/host_api.hpp>
#include <tt-metalium/tt_metal.hpp>
#include <tt-metalium/bfloat16.hpp>

#include "tests/tt_metal/tt_metal/dispatch/dispatch_test_utils.hpp"
#include "tests/tt_metal/tt_metal/common/multi_device_fixture.hpp"
#include "tests/tt_metal/distributed/utils.hpp"

namespace tt::tt_metal::distributed::test {
namespace {

using MeshEventsTest = T3000MultiCQMultiDeviceFixture;

// Verifies cross-CQ ordering on a replicated L1 buffer: data written on
// MeshCQ 0 must be visible to shard reads issued on MeshCQ 1 after a
// device-to-device event record/wait pair.
TEST_F(MeshEventsTest, ReplicatedAsyncIO) {
    constexpr uint32_t NUM_TILES = 1000;
    constexpr uint32_t num_iterations = 20;
    const uint32_t single_tile_size = ::tt::tt_metal::detail::TileSize(DataFormat::UInt32);

    DeviceLocalBufferConfig per_device_buffer_config{
        .page_size = single_tile_size,
        .buffer_type = BufferType::L1,
        .buffer_layout = TensorMemoryLayout::INTERLEAVED,
        .bottom_up = false};
    // Replicated: every device in the mesh holds a full copy of the buffer.
    ReplicatedBufferConfig global_buffer_config = {
        .size = NUM_TILES * single_tile_size,
    };

    std::shared_ptr<MeshBuffer> buf =
        MeshBuffer::create(global_buffer_config, per_device_buffer_config, mesh_device_.get());

    for (std::size_t i = 0; i < num_iterations; i++) {
        // Fresh ramp payload each iteration, offset by i so stale data from a
        // previous iteration cannot satisfy the comparison.
        std::vector<uint32_t> src_vec(NUM_TILES * single_tile_size / sizeof(uint32_t), 0);
        std::iota(src_vec.begin(), src_vec.end(), i);

        std::vector<std::vector<uint32_t>> readback_vecs = {};
        std::shared_ptr<MeshEvent> event = std::make_shared<MeshEvent>();
        // Writes on CQ 0
        EnqueueWriteMeshBuffer(mesh_device_->mesh_command_queue(0), buf, src_vec);
        // Device to Device Synchronization: CQ 1 waits until CQ 0's write completes.
        EnqueueRecordEvent(mesh_device_->mesh_command_queue(0), event);
        EnqueueWaitForEvent(mesh_device_->mesh_command_queue(1), event);

        // Reads on CQ 1: read every device's shard and expect the replicated payload.
        for (std::size_t logical_x = 0; logical_x < buf->device()->num_cols(); logical_x++) {
            for (std::size_t logical_y = 0; logical_y < buf->device()->num_rows(); logical_y++) {
                readback_vecs.push_back({});
                ReadShard(
                    mesh_device_->mesh_command_queue(1), readback_vecs.back(), buf, Coordinate(logical_y, logical_x));
            }
        }

        for (auto& vec : readback_vecs) {
            EXPECT_EQ(vec, src_vec);
        }
    }
}

// Writes a sharded DRAM MeshBuffer on MeshCQ 0 and reads the assembled global
// buffer back on MeshCQ 1, alternating host<->device (record-to-host +
// EventSynchronize) and device<->device (record + wait) synchronization.
TEST_F(MeshEventsTest, ShardedAsyncIO) {
    constexpr uint32_t num_iterations = 20;
    const uint32_t single_tile_size = ::tt::tt_metal::detail::TileSize(DataFormat::UInt32);

    DeviceLocalBufferConfig per_device_buffer_config{
        .page_size = single_tile_size,
        .buffer_type = BufferType::DRAM,
        .buffer_layout = TensorMemoryLayout::INTERLEAVED,
        .bottom_up = true};

    // 2048x2048 global buffer split into 512x1024 shards across the mesh.
    Shape2D global_buffer_shape = {2048, 2048};
    Shape2D shard_shape = {512, 1024};

    const uint32_t global_buffer_size = global_buffer_shape.height() * global_buffer_shape.width() * sizeof(uint32_t);

    ShardedBufferConfig sharded_config{
        .global_size = global_buffer_size,
        .global_buffer_shape = global_buffer_shape,
        .shard_shape = shard_shape,
        .shard_orientation = ShardOrientation::ROW_MAJOR,
    };

    auto mesh_buffer = MeshBuffer::create(sharded_config, per_device_buffer_config, mesh_device_.get());
    for (std::size_t i = 0; i < num_iterations; i++) {
        // Ramp payload offset by the iteration index.
        std::vector<uint32_t> src_vec(global_buffer_shape.height() * global_buffer_shape.width(), 0);
        std::iota(src_vec.begin(), src_vec.end(), i);
        std::shared_ptr<MeshEvent> event = std::make_shared<MeshEvent>();
        // Writes on CQ 0
        EnqueueWriteMeshBuffer(mesh_device_->mesh_command_queue(0), mesh_buffer, src_vec);
        if (i % 2) {
            // Test Host <-> Device synchronization
            EnqueueRecordEventToHost(mesh_device_->mesh_command_queue(0), event);
            EventSynchronize(event);
        } else {
            // Test Device <-> Device synchronization
            EnqueueRecordEvent(mesh_device_->mesh_command_queue(0), event);
            EnqueueWaitForEvent(mesh_device_->mesh_command_queue(1), event);
        }
        // Reads on CQ 1: reassemble the full sharded buffer and compare.
        std::vector<uint32_t> dst_vec = {};
        EnqueueReadMeshBuffer(mesh_device_->mesh_command_queue(1), dst_vec, mesh_buffer);

        EXPECT_EQ(dst_vec, src_vec);
    }
}

// Full async pipeline: buffer IO on MeshCQ 1 overlapped with an eltwise-binary
// MeshWorkload on MeshCQ 0, with ordering enforced purely through MeshEvents.
// Each iteration alternates between host<->device and device<->device
// synchronization for both the write->compute and compute->read boundaries.
TEST_F(MeshEventsTest, AsyncWorkloadAndIO) {
    uint32_t num_iters = 5;
    // Per-worker-core input/output buffers; populated by create_eltwise_bin_programs.
    std::vector<std::shared_ptr<MeshBuffer>> src0_bufs = {};
    std::vector<std::shared_ptr<MeshBuffer>> src1_bufs = {};
    std::vector<std::shared_ptr<MeshBuffer>> output_bufs = {};

    CoreCoord worker_grid_size = mesh_device_->compute_with_storage_grid_size();

    auto programs = tt::tt_metal::distributed::test::utils::create_eltwise_bin_programs(
        mesh_device_, src0_bufs, src1_bufs, output_bufs);
    auto mesh_workload = CreateMeshWorkload();
    // Two sub-grids of the mesh; each runs a different program in the workload.
    LogicalDeviceRange devices_0 = LogicalDeviceRange({0, 0}, {3, 0});
    LogicalDeviceRange devices_1 = LogicalDeviceRange({0, 1}, {3, 1});

    AddProgramToMeshWorkload(mesh_workload, *programs[0], devices_0);
    AddProgramToMeshWorkload(mesh_workload, *programs[1], devices_1);

    for (int iter = 0; iter < num_iters; iter++) {
        // Constant bfloat16 inputs: iter + 2 and iter + 3 (vary per iteration so
        // stale results from a previous iteration cannot pass the checks).
        std::vector<uint32_t> src0_vec = create_constant_vector_of_bfloat16(src0_bufs[0]->size(), iter + 2);
        std::vector<uint32_t> src1_vec = create_constant_vector_of_bfloat16(src1_bufs[0]->size(), iter + 3);

        std::shared_ptr<MeshEvent> write_event = std::make_shared<MeshEvent>();
        std::shared_ptr<MeshEvent> op_event = std::make_shared<MeshEvent>();

        // Issue writes on MeshCQ 1
        for (std::size_t col_idx = 0; col_idx < worker_grid_size.x; col_idx++) {
            for (std::size_t row_idx = 0; row_idx < worker_grid_size.y; row_idx++) {
                EnqueueWriteMeshBuffer(
                    mesh_device_->mesh_command_queue(1), src0_bufs[col_idx * worker_grid_size.y + row_idx], src0_vec);
                EnqueueWriteMeshBuffer(
                    mesh_device_->mesh_command_queue(1), src1_bufs[col_idx * worker_grid_size.y + row_idx], src1_vec);
            }
        }
        if (iter % 2) {
            // Test Host <-> Device Synchronization: block host until writes land.
            EnqueueRecordEventToHost(mesh_device_->mesh_command_queue(1), write_event);
            EventSynchronize(write_event);
        } else {
            // Test Device <-> Device Synchronization: CQ 0 waits on CQ 1's writes.
            EnqueueRecordEvent(mesh_device_->mesh_command_queue(1), write_event);
            EnqueueWaitForEvent(mesh_device_->mesh_command_queue(0), write_event);
        }
        // Issue workloads on MeshCQ 0 (non-blocking enqueue).
        EnqueueMeshWorkload(mesh_device_->mesh_command_queue(0), mesh_workload, false);
        if (iter % 2) {
            // Test Device <-> Device Synchronization: CQ 1's reads wait on the workload.
            EnqueueRecordEvent(mesh_device_->mesh_command_queue(0), op_event);
            EnqueueWaitForEvent(mesh_device_->mesh_command_queue(1), op_event);
        } else {
            // Test Host <-> Device Synchronization
            EnqueueRecordEventToHost(mesh_device_->mesh_command_queue(0), op_event);
            EventSynchronize(op_event);
        }

        // Issue reads on MeshCQ 1
        for (std::size_t logical_y = 0; logical_y < mesh_device_->num_rows(); logical_y++) {
            for (std::size_t logical_x = 0; logical_x < mesh_device_->num_cols(); logical_x++) {
                for (std::size_t col_idx = 0; col_idx < worker_grid_size.x; col_idx++) {
                    for (std::size_t row_idx = 0; row_idx < worker_grid_size.y; row_idx++) {
                        std::vector<bfloat16> dst_vec = {};
                        ReadShard(
                            mesh_device_->mesh_command_queue(1),
                            dst_vec,
                            output_bufs[col_idx * worker_grid_size.y + row_idx],
                            Coordinate(logical_y, logical_x));
                        if (logical_y == 0) {
                            // Row 0 devices: expected 2*iter + 5 = (iter+2) + (iter+3)
                            // — presumably programs[0] is the add program; verify
                            // against create_eltwise_bin_programs.
                            for (int i = 0; i < dst_vec.size(); i++) {
                                EXPECT_EQ(dst_vec[i].to_float(), (2 * iter + 5));
                            }
                        } else {
                            // Other rows: expected (iter+2) * (iter+3), i.e. the
                            // multiply program.
                            for (int i = 0; i < dst_vec.size(); i++) {
                                EXPECT_EQ(dst_vec[i].to_float(), (iter + 2) * (iter + 3));
                            }
                        }
                    }
                }
            }
        }
    }
}

// Exercises events scoped to custom device sub-grids: each half of the mesh is
// written independently via enqueue_write_shard_to_sub_grid and synchronized
// with its own event (device<->device for the first half, host<->device for
// the second), then every shard in each sub-grid is read back and verified.
TEST_F(MeshEventsTest, CustomDeviceRanges) {
    constexpr uint32_t NUM_TILES = 1000;
    constexpr uint32_t num_iterations = 20;
    const uint32_t single_tile_size = ::tt::tt_metal::detail::TileSize(DataFormat::UInt32);

    DeviceLocalBufferConfig per_device_buffer_config{
        .page_size = single_tile_size,
        .buffer_type = BufferType::L1,
        .buffer_layout = TensorMemoryLayout::INTERLEAVED,
        .bottom_up = false};
    ReplicatedBufferConfig global_buffer_config = {
        .size = NUM_TILES * single_tile_size,
    };

    std::shared_ptr<MeshBuffer> buf =
        MeshBuffer::create(global_buffer_config, per_device_buffer_config, mesh_device_.get());

    for (std::size_t i = 0; i < num_iterations; i++) {
        // Ramp payload offset by i (no need to pre-fill: iota overwrites every element).
        std::vector<uint32_t> src_vec(NUM_TILES * single_tile_size / sizeof(uint32_t));
        std::iota(src_vec.begin(), src_vec.end(), i);
        LogicalDeviceRange devices_0 = LogicalDeviceRange({0, 0}, {3, 0});
        LogicalDeviceRange devices_1 = LogicalDeviceRange({0, 1}, {3, 1});

        std::vector<std::vector<uint32_t>> readback_vecs = {};
        std::shared_ptr<MeshEvent> event_0 = std::make_shared<MeshEvent>();
        std::shared_ptr<MeshEvent> event_1 = std::make_shared<MeshEvent>();

        // Write to the first sub-grid on CQ 1; CQ 0 waits on the scoped event.
        mesh_device_->mesh_command_queue(1).enqueue_write_shard_to_sub_grid(*buf, src_vec.data(), devices_0, false);
        EnqueueRecordEvent(mesh_device_->mesh_command_queue(1), event_0, {}, devices_0);
        EnqueueWaitForEvent(mesh_device_->mesh_command_queue(0), event_0);

        // end_coord is inclusive, so iterate with <=. With the previous `<`
        // bound, the y loop over a single-row range (start.y == end.y) never
        // executed, leaving readback_vecs empty and the check below vacuous.
        for (std::size_t logical_x = devices_0.start_coord.x; logical_x <= devices_0.end_coord.x; logical_x++) {
            for (std::size_t logical_y = devices_0.start_coord.y; logical_y <= devices_0.end_coord.y; logical_y++) {
                readback_vecs.push_back({});
                ReadShard(
                    mesh_device_->mesh_command_queue(0), readback_vecs.back(), buf, Coordinate(logical_y, logical_x));
            }
        }

        // Write to the second sub-grid, this time synchronizing through the host.
        mesh_device_->mesh_command_queue(1).enqueue_write_shard_to_sub_grid(*buf, src_vec.data(), devices_1, false);
        EnqueueRecordEventToHost(mesh_device_->mesh_command_queue(1), event_1, {}, devices_1);
        EventSynchronize(event_1);

        for (std::size_t logical_x = devices_1.start_coord.x; logical_x <= devices_1.end_coord.x; logical_x++) {
            for (std::size_t logical_y = devices_1.start_coord.y; logical_y <= devices_1.end_coord.y; logical_y++) {
                readback_vecs.push_back({});
                ReadShard(
                    mesh_device_->mesh_command_queue(0), readback_vecs.back(), buf, Coordinate(logical_y, logical_x));
            }
        }
        for (auto& vec : readback_vecs) {
            EXPECT_EQ(vec, src_vec);
        }
    }
    // Drain both CQs before fixture teardown.
    Finish(mesh_device_->mesh_command_queue(0));
    Finish(mesh_device_->mesh_command_queue(1));
}

} // namespace
} // namespace tt::tt_metal::distributed::test
32 changes: 4 additions & 28 deletions tests/tt_metal/distributed/test_mesh_sub_device.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,34 +116,10 @@ TEST_F(MeshSubDeviceTest, DataCopyOnSubDevices) {

std::vector<uint32_t> src_vec(input_buf->size() / sizeof(uint32_t));
std::iota(src_vec.begin(), src_vec.end(), i);
EnqueueWriteMeshBuffer(mesh_device_->mesh_command_queue(), input_buf, src_vec, false);
// Read Back global semaphore value across all cores to verify that it has been reset to 0
// before updating it through host
auto shard_parameters =
ShardSpecBuffer(all_cores, {1, 1}, ShardOrientation::ROW_MAJOR, {1, 1}, {all_cores.size(), 1});
DeviceLocalBufferConfig global_sem_buf_local_config{
.page_size = sizeof(uint32_t),
.buffer_type = BufferType::L1,
.buffer_layout = TensorMemoryLayout::HEIGHT_SHARDED,
.shard_parameters = shard_parameters,
.bottom_up = false};
ReplicatedBufferConfig global_sem_buf_global_config{
.size = all_cores.size() * sizeof(uint32_t),
};

auto global_sem_buf = MeshBuffer::create(
global_sem_buf_global_config, global_sem_buf_local_config, mesh_device_.get(), global_sem.address());

for (std::size_t logical_x = 0; logical_x < input_buf->device()->num_cols(); logical_x++) {
for (std::size_t logical_y = 0; logical_y < input_buf->device()->num_rows(); logical_y++) {
std::vector<uint32_t> dst_vec;
ReadShard(
mesh_device_->mesh_command_queue(), dst_vec, global_sem_buf, Coordinate(logical_y, logical_x));
for (const auto& val : dst_vec) {
EXPECT_EQ(val, 0);
}
}
}
// Block after this write on host, since the global semaphore update starting the
// program goes through an independent path (UMD) and can go out of order wrt the
// buffer data
EnqueueWriteMeshBuffer(mesh_device_->mesh_command_queue(), input_buf, src_vec, true);

for (auto device : mesh_device_->get_devices()) {
tt::llrt::write_hex_vec_to_core(
Expand Down
Loading
Loading