From 4b3abaac1b108b936170c4e696731efd65aa89b1 Mon Sep 17 00:00:00 2001
From: sg
Date: Wed, 11 Jun 2025 22:24:55 +0000
Subject: [PATCH 1/9] Cuda to accelerator, +CommDebugMode

---
 .../sequence_parallel_example.py | 24 ++++++++++++++-----
 .../tensor_parallel_example.py   | 24 ++++++++++++-------
 2 files changed, 34 insertions(+), 14 deletions(-)

diff --git a/distributed/tensor_parallelism/sequence_parallel_example.py b/distributed/tensor_parallelism/sequence_parallel_example.py
index 3324d28d4a..d2034e54ad 100644
--- a/distributed/tensor_parallelism/sequence_parallel_example.py
+++ b/distributed/tensor_parallelism/sequence_parallel_example.py
@@ -1,3 +1,4 @@
+# torchrun --nnodes 1 --nproc-per-node 4
 import os
 import sys
 import torch
@@ -13,6 +14,7 @@
 
 from log_utils import rank_log, get_logger, verify_min_gpu_count
 
+from torch.distributed.tensor.debug import CommDebugMode
 
 # ---- GPU check ------------
 _min_gpu_count = 2
@@ -63,9 +65,10 @@ def forward(self, x):
 """
 logger = get_logger()
 
+device_type = torch.accelerator.current_accelerator().type
 # create a device mesh based on the given world_size.
 device_mesh = init_device_mesh(
-    device_type="cuda", mesh_shape=(int(os.environ["WORLD_SIZE"]),)
+    device_type=device_type, mesh_shape=(int(os.environ["WORLD_SIZE"]),)
 )
 
 _rank = device_mesh.get_rank()
@@ -75,7 +78,7 @@ def forward(self, x):
 rank_log(_rank, logger, f"Device Mesh created: {device_mesh=}")
 
 # create model and move it to GPU. Init_device_mesh has already assigned gpu ids...
-model = ToyModel().to("cuda")
+model = ToyModel().to(device_type)
 
 # Custom parallelization plan for the model
 sp_model = parallelize_module(
@@ -87,6 +90,8 @@ def forward(self, x):
     },
 )
 
+if torch.distributed.get_rank() == 0:
+    print (f"model {sp_model}")
 
 # Create a optimizer for the parallelized module.
 lr = 0.25
@@ -98,12 +103,19 @@ def forward(self, x):
 num_iters = 10
 rank_log(_rank, logger, "Sequence Parallel training starting...")
 
+
 for i in range(num_iters):
     # For SP, input can be different across all ranks.
-    inp = torch.rand(20, 10, device="cuda")
-    output = sp_model(inp)
-    output.sum().backward()
-    optimizer.step()
+    #inp = torch.rand(20, 10, device=device_type)
+    inp = torch.rand(1, 10, device=device_type)
+    comm_mode = CommDebugMode()
+    with comm_mode:
+        output = sp_model(inp)
+        output.sum().backward()
+        optimizer.step()
     rank_log(_rank, logger, f"Sequence Parallel iter {i} completed")
+    if i == 0:
+        print (f" rank{torch.distributed.get_rank()} {i} get_comm_counts {comm_mode.get_comm_counts()} get_sharding_info() {comm_mode.get_sharding_info()} generate_comm_debug_tracing_table {comm_mode.generate_comm_debug_tracing_table(noise_level=1)} ")
+
 
 rank_log(_rank, logger, "Sequence Parallel training completed!")
diff --git a/distributed/tensor_parallelism/tensor_parallel_example.py b/distributed/tensor_parallelism/tensor_parallel_example.py
index 0b9c884507..f3e9fcd808 100755
--- a/distributed/tensor_parallelism/tensor_parallel_example.py
+++ b/distributed/tensor_parallelism/tensor_parallel_example.py
@@ -10,6 +10,7 @@
 )
 
 from log_utils import rank_log, get_logger, verify_min_gpu_count
+from torch.distributed.tensor.debug import CommDebugMode
 
 # ---- GPU check ------------
 _min_gpu_count = 2
@@ -76,8 +77,8 @@ def forward(self, x):
 
 # create a device mesh based on the given world_size.
 _world_size = int(os.environ["WORLD_SIZE"])
-
-device_mesh = init_device_mesh(device_type="cuda", mesh_shape=(_world_size,))
+device_type = torch.accelerator.current_accelerator().type
+device_mesh = init_device_mesh(device_type=device_type, mesh_shape=(_world_size,))
 
 _rank = device_mesh.get_rank()
 
@@ -88,8 +89,8 @@ def forward(self, x):
 
 rank_log(_rank, logger, f"Device Mesh created: {device_mesh=}")
 
-# create model and move it to GPU - init"cuda"_mesh has already mapped GPU ids.
-tp_model = ToyModel().to("cuda")
+# create model and move it to GPU - initdevice_type_mesh has already mapped GPU ids.
+tp_model = ToyModel().to(device_type)
 
 # Custom parallelization plan for the model
@@ -102,6 +103,9 @@ def forward(self, x):
     },
 )
 
+if torch.distributed.get_rank() == 0:
+    print (f"model {tp_model}")
+
 # Create an optimizer for the parallelized module.
 lr = 0.25
 optimizer = torch.optim.AdamW(tp_model.parameters(), lr=lr, foreach=True)
@@ -116,10 +120,14 @@ def forward(self, x):
     # For TP, input needs to be same across all TP ranks.
     # Setting the random seed is to mimic the behavior of dataloader.
     torch.manual_seed(i)
-    inp = torch.rand(20, 10, device="cuda")
-    output = tp_model(inp)
-    output.sum().backward()
-    optimizer.step()
+    inp = torch.rand(4, 10, device=device_type)
+    comm_mode = CommDebugMode()
+    with comm_mode:
+        output = tp_model(inp)
+        output.sum().backward()
+        optimizer.step()
     rank_log(_rank, logger, f"Tensor Parallel iter {i} completed")
+    if i == 1:
+        print (f" rank{torch.distributed.get_rank()} {i} get_comm_counts {comm_mode.get_comm_counts()} get_sharding_info() {comm_mode.get_sharding_info()} generate_comm_debug_tracing_table {comm_mode.generate_comm_debug_tracing_table(noise_level=1)} ")
 
 rank_log(_rank, logger, "Tensor Parallel training completed!")

From cf381e09359e6293f4f6f679e50bb03478675e1f Mon Sep 17 00:00:00 2001
From: subrata goswami
Date: Wed, 11 Jun 2025 23:31:19 +0000
Subject: [PATCH 2/9] cuda to accelerator

---
 distributed/tensor_parallelism/log_utils.py               | 6 +++---
 distributed/tensor_parallelism/tensor_parallel_example.py | 1 +
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/distributed/tensor_parallelism/log_utils.py b/distributed/tensor_parallelism/log_utils.py
index f16d46526d..d103df892b 100644
--- a/distributed/tensor_parallelism/log_utils.py
+++ b/distributed/tensor_parallelism/log_utils.py
@@ -17,6 +17,6 @@ def rank_log(_rank, logger, msg):
 
 def verify_min_gpu_count(min_gpus: int = 2) -> bool:
     """ verification that we have at least 2 gpus to run dist examples """
-    has_cuda = torch.cuda.is_available()
-    gpu_count = torch.cuda.device_count()
-    return has_cuda and gpu_count >= min_gpus
+    has_gpu = torch.accelerator.is_available()
+    gpu_count = torch.accelerator.device_count()
+    return has_gpu and gpu_count >= min_gpus
diff --git a/distributed/tensor_parallelism/tensor_parallel_example.py b/distributed/tensor_parallelism/tensor_parallel_example.py
index f3e9fcd808..fe5e948c3d 100755
--- a/distributed/tensor_parallelism/tensor_parallel_example.py
+++ b/distributed/tensor_parallelism/tensor_parallel_example.py
@@ -1,3 +1,4 @@
+# torchrun --nnodes 1 --nproc-per-node 4
 import os
 import sys
 import torch

From d16c819b9681a561bbed867ebb7c013447a3e04d Mon Sep 17 00:00:00 2001
From: sg
Date: Wed, 25 Jun 2025 00:55:51 +0000
Subject: [PATCH 3/9] Moving CommDebugMode code to a separate PR.
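
For reference, the debugging pattern being moved out wraps a single training step in a
CommDebugMode context and then queries the recorded collectives. Below is a minimal,
self-contained sketch of that usage; it is illustrative only -- the Sequential toy model,
the mesh size, and the script name are assumptions for the sketch, not code from this
repository, and it presumes a recent PyTorch build that provides torch.accelerator and
torch.distributed.tensor.debug.

```
# comm_debug_sketch.py -- illustrative sketch only, not part of this repository.
# Run with, e.g.: torchrun --nnodes 1 --nproc-per-node 2 comm_debug_sketch.py
import os

import torch
import torch.distributed as dist
from torch.distributed.device_mesh import init_device_mesh
from torch.distributed.tensor.debug import CommDebugMode
from torch.distributed.tensor.parallel import (
    ColwiseParallel,
    RowwiseParallel,
    parallelize_module,
)

# Pick whatever accelerator is present (cuda, xpu, ...), falling back to CPU.
acc = torch.accelerator.current_accelerator()
device_type = acc.type if acc is not None else "cpu"
mesh = init_device_mesh(device_type, (int(os.environ["WORLD_SIZE"]),))

# A toy MLP, tensor-parallelized colwise/rowwise like the examples' ToyModel.
model = torch.nn.Sequential(
    torch.nn.Linear(10, 32), torch.nn.ReLU(), torch.nn.Linear(32, 5)
).to(device_type)
tp_model = parallelize_module(
    model, mesh, {"0": ColwiseParallel(), "2": RowwiseParallel()}
)

# One forward/backward step inside CommDebugMode records every collective issued.
inp = torch.rand(4, 10, device=device_type)
comm_mode = CommDebugMode()
with comm_mode:
    out = tp_model(inp)
    out.sum().backward()

if dist.get_rank() == 0:
    print(comm_mode.get_comm_counts())
    print(comm_mode.generate_comm_debug_tracing_table(noise_level=1))

dist.destroy_process_group()
```

The same get_comm_counts() / get_sharding_info() / generate_comm_debug_tracing_table()
calls appear verbatim in the code removed by this patch.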

---
 .../sequence_parallel_example.py | 15 +++------------
 .../tensor_parallel_example.py   | 16 ++++------------
 2 files changed, 7 insertions(+), 24 deletions(-)

diff --git a/distributed/tensor_parallelism/sequence_parallel_example.py b/distributed/tensor_parallelism/sequence_parallel_example.py
index d2034e54ad..0be33dc0d6 100644
--- a/distributed/tensor_parallelism/sequence_parallel_example.py
+++ b/distributed/tensor_parallelism/sequence_parallel_example.py
@@ -14,7 +14,6 @@
 
 from log_utils import rank_log, get_logger, verify_min_gpu_count
 
-from torch.distributed.tensor.debug import CommDebugMode
 
 # ---- GPU check ------------
 _min_gpu_count = 2
@@ -90,8 +89,6 @@ def forward(self, x):
     },
 )
 
-if torch.distributed.get_rank() == 0:
-    print (f"model {sp_model}")
 
 # Create a optimizer for the parallelized module.
 lr = 0.25
@@ -103,19 +100,13 @@ def forward(self, x):
 num_iters = 10
 rank_log(_rank, logger, "Sequence Parallel training starting...")
 
-
 for i in range(num_iters):
     # For SP, input can be different across all ranks.
     #inp = torch.rand(20, 10, device=device_type)
     inp = torch.rand(1, 10, device=device_type)
-    comm_mode = CommDebugMode()
-    with comm_mode:
-        output = sp_model(inp)
-        output.sum().backward()
-        optimizer.step()
+    output = sp_model(inp)
+    output.sum().backward()
+    optimizer.step()
     rank_log(_rank, logger, f"Sequence Parallel iter {i} completed")
-    if i == 0:
-        print (f" rank{torch.distributed.get_rank()} {i} get_comm_counts {comm_mode.get_comm_counts()} get_sharding_info() {comm_mode.get_sharding_info()} generate_comm_debug_tracing_table {comm_mode.generate_comm_debug_tracing_table(noise_level=1)} ")
-
 
 rank_log(_rank, logger, "Sequence Parallel training completed!")
diff --git a/distributed/tensor_parallelism/tensor_parallel_example.py b/distributed/tensor_parallelism/tensor_parallel_example.py
index fe5e948c3d..627f4611eb 100755
--- a/distributed/tensor_parallelism/tensor_parallel_example.py
+++ b/distributed/tensor_parallelism/tensor_parallel_example.py
@@ -11,7 +11,6 @@
 )
 
 from log_utils import rank_log, get_logger, verify_min_gpu_count
-from torch.distributed.tensor.debug import CommDebugMode
 
 # ---- GPU check ------------
 _min_gpu_count = 2
@@ -104,9 +103,6 @@ def forward(self, x):
     },
 )
 
-if torch.distributed.get_rank() == 0:
-    print (f"model {tp_model}")
-
 # Create an optimizer for the parallelized module.
 lr = 0.25
 optimizer = torch.optim.AdamW(tp_model.parameters(), lr=lr, foreach=True)
@@ -121,14 +117,10 @@ def forward(self, x):
     # For TP, input needs to be same across all TP ranks.
     # Setting the random seed is to mimic the behavior of dataloader.
     torch.manual_seed(i)
-    inp = torch.rand(4, 10, device=device_type)
-    comm_mode = CommDebugMode()
-    with comm_mode:
-        output = tp_model(inp)
-        output.sum().backward()
-        optimizer.step()
+    inp = torch.rand(20, 10, device=device_type)
+    output = tp_model(inp)
+    output.sum().backward()
+    optimizer.step()
     rank_log(_rank, logger, f"Tensor Parallel iter {i} completed")
-    if i == 1:
-        print (f" rank{torch.distributed.get_rank()} {i} get_comm_counts {comm_mode.get_comm_counts()} get_sharding_info() {comm_mode.get_sharding_info()} generate_comm_debug_tracing_table {comm_mode.generate_comm_debug_tracing_table(noise_level=1)} ")
 
 rank_log(_rank, logger, "Tensor Parallel training completed!")

From 79f465732e7028767c7da646a3708a4a8aed75a6 Mon Sep 17 00:00:00 2001
From: sg
Date: Fri, 27 Jun 2025 01:58:20 +0000
Subject: [PATCH 4/9] Updating CI torch version.

---
 distributed/tensor_parallelism/requirements.txt | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/distributed/tensor_parallelism/requirements.txt b/distributed/tensor_parallelism/requirements.txt
index 80fad36bf2..08392d244a 100644
--- a/distributed/tensor_parallelism/requirements.txt
+++ b/distributed/tensor_parallelism/requirements.txt
@@ -3,4 +3,6 @@
 --pre
 --extra-index-url https://download.pytorch.org/whl/nightly/cu118
 --extra-index-url https://download.pytorch.org/whl/nightly/cu121
-torch >= 2.3.0.dev0; sys_platform == "linux"
+--extra-index-url https://download.pytorch.org/whl/nightly/cu126
+--extra-index-url https://download.pytorch.org/whl/nightly/cu128
+torch >= 2.7.1; sys_platform == "linux"

From 66b83dddfdc183634e9c3640f4fe17639205c2a9 Mon Sep 17 00:00:00 2001
From: sg
Date: Mon, 30 Jun 2025 19:08:29 +0000
Subject: [PATCH 5/9] Bumping up Python version to 3.9.

---
 runtime.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/runtime.txt b/runtime.txt
index cc1923a40b..bd28b9c5c2 100644
--- a/runtime.txt
+++ b/runtime.txt
@@ -1 +1 @@
-3.8
+3.9

From 976f270456e7d77c9afd54b7a702ea589266941c Mon Sep 17 00:00:00 2001
From: sg
Date: Tue, 1 Jul 2025 03:16:29 +0000
Subject: [PATCH 6/9] Bumping up Python from 3.8 to 3.9 in workflow yaml.

---
 .github/workflows/main_distributed.yaml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/main_distributed.yaml b/.github/workflows/main_distributed.yaml
index b70da3617a..3d03cca6a0 100644
--- a/.github/workflows/main_distributed.yaml
+++ b/.github/workflows/main_distributed.yaml
@@ -17,10 +17,10 @@ jobs:
 
     steps:
     - uses: actions/checkout@v2
-    - name: Set up Python 3.8
+    - name: Set up Python 3.9
      uses: actions/setup-python@v2
      with:
-        python-version: 3.8
+        python-version: 3.9
    - name: Install PyTorch
      uses: astral-sh/setup-uv@v6
    - name: Run Tests

From eb8aa68e1e296f68e6745c1f11d4208165340acc Mon Sep 17 00:00:00 2001
From: "Goswami, Subrata"
Date: Tue, 1 Jul 2025 11:17:21 -0700
Subject: [PATCH 7/9] Removing --pre entries from requirements.txt, ...

---
 distributed/tensor_parallelism/requirements.txt             | 5 -----
 distributed/tensor_parallelism/sequence_parallel_example.py | 3 +--
 distributed/tensor_parallelism/tensor_parallel_example.py   | 1 +
 3 files changed, 2 insertions(+), 7 deletions(-)

diff --git a/distributed/tensor_parallelism/requirements.txt b/distributed/tensor_parallelism/requirements.txt
index 08392d244a..4f47924993 100644
--- a/distributed/tensor_parallelism/requirements.txt
+++ b/distributed/tensor_parallelism/requirements.txt
@@ -1,8 +1,3 @@
 # Python dependencies required for running the example
 
---pre
---extra-index-url https://download.pytorch.org/whl/nightly/cu118
---extra-index-url https://download.pytorch.org/whl/nightly/cu121
---extra-index-url https://download.pytorch.org/whl/nightly/cu126
---extra-index-url https://download.pytorch.org/whl/nightly/cu128
 torch >= 2.7.1; sys_platform == "linux"
diff --git a/distributed/tensor_parallelism/sequence_parallel_example.py b/distributed/tensor_parallelism/sequence_parallel_example.py
index 0be33dc0d6..b9416558fa 100644
--- a/distributed/tensor_parallelism/sequence_parallel_example.py
+++ b/distributed/tensor_parallelism/sequence_parallel_example.py
@@ -102,8 +102,7 @@ def forward(self, x):
 
 for i in range(num_iters):
     # For SP, input can be different across all ranks.
-    #inp = torch.rand(20, 10, device=device_type)
-    inp = torch.rand(1, 10, device=device_type)
+    inp = torch.rand(20, 10, device=device_type)
     output = sp_model(inp)
     output.sum().backward()
     optimizer.step()
diff --git a/distributed/tensor_parallelism/tensor_parallel_example.py b/distributed/tensor_parallelism/tensor_parallel_example.py
index 627f4611eb..f5050ef734 100755
--- a/distributed/tensor_parallelism/tensor_parallel_example.py
+++ b/distributed/tensor_parallelism/tensor_parallel_example.py
@@ -1,3 +1,4 @@
+# The following is an example command to run this code
 # torchrun --nnodes 1 --nproc-per-node 4
 import os
 import sys

From 8aa4203ead0953cf3d489b1f7acdc41fa6f38e18 Mon Sep 17 00:00:00 2001
From: "Goswami, Subrata"
Date: Tue, 1 Jul 2025 12:10:19 -0700
Subject: [PATCH 8/9] Updating comments in multiple files.

---
 distributed/tensor_parallelism/README.md                    | 2 +-
 distributed/tensor_parallelism/sequence_parallel_example.py | 3 ++-
 distributed/tensor_parallelism/tensor_parallel_example.py   | 2 +-
 3 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/distributed/tensor_parallelism/README.md b/distributed/tensor_parallelism/README.md
index b49d1672e8..ec61071e65 100644
--- a/distributed/tensor_parallelism/README.md
+++ b/distributed/tensor_parallelism/README.md
@@ -12,5 +12,5 @@ https://pytorch.org/docs/stable/distributed.tensor.parallel.html
 
 ```
 pip install -r requirements.txt
-python example.py
+torchrun --nnodes 1 --nproc-per-node 4 tensor_parallel_example.py
 ```
diff --git a/distributed/tensor_parallelism/sequence_parallel_example.py b/distributed/tensor_parallelism/sequence_parallel_example.py
index b9416558fa..b145fbc95e 100644
--- a/distributed/tensor_parallelism/sequence_parallel_example.py
+++ b/distributed/tensor_parallelism/sequence_parallel_example.py
@@ -1,4 +1,5 @@
-# torchrun --nnodes 1 --nproc-per-node 4
+# The following is an example command to run this code
+# torchrun --nnodes 1 --nproc-per-node 4 sequence_parallel_example.py
 import os
 import sys
 import torch
diff --git a/distributed/tensor_parallelism/tensor_parallel_example.py b/distributed/tensor_parallelism/tensor_parallel_example.py
index f5050ef734..b96f982f0c 100755
--- a/distributed/tensor_parallelism/tensor_parallel_example.py
+++ b/distributed/tensor_parallelism/tensor_parallel_example.py
@@ -1,5 +1,5 @@
 # The following is an example command to run this code
-# torchrun --nnodes 1 --nproc-per-node 4
+# torchrun --nnodes 1 --nproc-per-node 4 tensor_parallel_example.py
 import os
 import sys
 import torch

From d98e000eb39b9c8d91797f434e81d1b011d52312 Mon Sep 17 00:00:00 2001
From: "Goswami, Subrata"
Date: Mon, 7 Jul 2025 16:49:53 -0700
Subject: [PATCH 9/9] Updating Python to 3.10, fsdp_tp_example.py to accelerator

---
 .github/workflows/main_distributed.yaml           | 4 ++--
 distributed/tensor_parallelism/fsdp_tp_example.py | 9 +++++----
 2 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/.github/workflows/main_distributed.yaml b/.github/workflows/main_distributed.yaml
index 3d03cca6a0..78afef687b 100644
--- a/.github/workflows/main_distributed.yaml
+++ b/.github/workflows/main_distributed.yaml
@@ -17,10 +17,10 @@ jobs:
 
     steps:
    - uses: actions/checkout@v2
-    - name: Set up Python 3.9
+    - name: Set up Python 3.10
      uses: actions/setup-python@v2
      with:
-        python-version: 3.9
+        python-version: 3.10
    - name: Install PyTorch
      uses: astral-sh/setup-uv@v6
    - name: Run Tests

diff --git a/distributed/tensor_parallelism/fsdp_tp_example.py b/distributed/tensor_parallelism/fsdp_tp_example.py
index dbab48c1b8..87935f10f0 100644
--- a/distributed/tensor_parallelism/fsdp_tp_example.py
+++ b/distributed/tensor_parallelism/fsdp_tp_example.py
@@ -77,10 +77,11 @@
 # create a sharding plan based on the given world_size.
 dp_size = _world_size // tp_size
+device_type = torch.accelerator.current_accelerator().type
 # Create a device mesh with 2 dimensions.
 # First dim is the data parallel dimension
 # Second dim is the tensor parallel dimension.
-device_mesh = init_device_mesh("cuda", (dp_size, tp_size), mesh_dim_names=("dp", "tp"))
+device_mesh = init_device_mesh(device_type, (dp_size, tp_size), mesh_dim_names=("dp", "tp"))
 
 rank_log(_rank, logger, f"Device Mesh created: {device_mesh=}")
 
 tp_mesh = device_mesh["tp"]
@@ -92,10 +93,10 @@
 # to mimic the behavior of the dataloader.
 dp_rank = dp_mesh.get_local_rank()
 
-# create model and move it to GPU - init"cuda"_mesh has already mapped GPU ids.
+# create model and move it to GPU - initdevice_type_mesh has already mapped GPU ids.
 simple_llama2_config = ModelArgs(dim=256, n_layers=2, n_heads=16, vocab_size=32000)
 
-model = Transformer.from_model_args(simple_llama2_config).to("cuda")
+model = Transformer.from_model_args(simple_llama2_config).to(device_type)
 
 # init model weights
 model.init_weights()
@@ -170,7 +171,7 @@
 
 for i in range(num_iterations):
     # seeding with dp_rank to ensure identical inputs for TP groups
    torch.manual_seed(i + dp_rank)
-    inp = torch.randint(32000, (8, 256), device="cuda")
+    inp = torch.randint(32000, (8, 256), device=device_type)
     output = sharded_model(inp)
     output.sum().backward()