
Add Splitwise implementation to vLLM #2809

Open
wants to merge 6 commits into base: main
Conversation

@aashaka aashaka commented Feb 8, 2024

This PR follows up on #2472 to implement the prompt and token stage parallelism introduced in Splitwise.
When the --sep-prompt-token flag is enabled, the first half of the workers is assigned to process prompts and the second half performs token sampling. The KV-cache state is communicated over the network layer by layer, as soon as it is ready on the prompt side. We use the MSCCL++ communication library to perform fast asynchronous KV-cache transfers over the IB fabric.
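For illustration only (this is not code from the PR), the half/half assignment described above amounts to something like the following sketch, where world_size is the total number of workers:

```python
from typing import List, Tuple

def split_workers(world_size: int) -> Tuple[List[int], List[int]]:
    """Toy sketch of the --sep-prompt-token split: the first half of the
    worker ranks process prompts, the second half perform token sampling."""
    assert world_size % 2 == 0, "expects an even number of workers"
    half = world_size // 2
    return list(range(half)), list(range(half, world_size))

# Example: split_workers(8) -> ([0, 1, 2, 3], [4, 5, 6, 7])
```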

This PR makes the following changes:

Installation dependencies:

We use the MSCCL++ collective communication library for KV-cache transfers.
Please follow the instructions at MSCCL++ Quickstart or the steps below to install it from source:

$ git clone https://github.com/microsoft/mscclpp;
$ mkdir mscclpp/build; cd mscclpp/build; cmake -DCMAKE_BUILD_TYPE=Release ..; make -j;
$ conda install -c conda-forge mpi4py
$ cd ../python; pip install -r requirements_c12.txt;
$ cd ..; pip install -e .

Make sure that $MSCCLPP_HOME is set to the installation directory, or run sudo make install.

Tests:

This PR has been tested in the following scenarios.

  1. Validating communication of KV cache:

    Command used: python tests/distributed/test_kvcache_comm.py
    Result: Runs without assertion errors.

  2. Without MSCCL++ environment, no stage parallelism:

    Command used: python examples/llm_engine_example_single.py --tensor-parallel-size 8 --model bigscience/bloom
    Result: Runs like normal.

  3. With stage parallelism:

    Command used: python examples/llm_engine_example_single.py --tensor-parallel-size 8 --model bigscience/bloom --sep-prompt-token
    Result: Same output as before.

llm_engine_example_single.py is llm_engine_example.py with n=1 and deterministic SamplingParameters (see the illustrative sketch below).
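The exact sampling parameters are not shown in this description, so the following is only a minimal sketch of what "n=1 and deterministic SamplingParameters" could look like with the vLLM API; the values are illustrative:

```python
from vllm import SamplingParams

# Illustrative values only: greedy, single-sequence sampling so runs with and
# without --sep-prompt-token can be compared output-for-output.
sampling_params = SamplingParams(
    n=1,              # n > 1 is a known missing feature with stage parallelism
    temperature=0.0,  # temperature 0 makes decoding greedy/deterministic
)
```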

Known issues:

  1. Missing support for n>1 in SamplingParameters.
  2. Since sampling now happens on a different (token) device than before, the sampled output for the RANDOM SamplingType is sometimes different. This is likely due to nondeterminism introduced by the exponential_ operation when computing the multinomial over probs.
  3. The number of profiled GPU cache blocks differs with and without stage parallelism.

Initialize MSCCL++ communication group if sep_prompt_token is set in
ParallelConfig.
Also add documentation for MSCCL++ installation.
- Add worker_type to differentiate prompt, token, and mixed workers
- Set a driver for the prompt machines and another for the token machines
- Allow broadcasts to take a group
- Setup KV Cache communication using MSCCL++
- Add test for KV Cache communication
- Obtain `blocks_to_nw` when creating batches in the scheduler. Coalesce blocks where possible for fast network transfers (a toy coalescing sketch is shown after this list).
- Use a Sequence to Semaphore Mapper to allow for fine-grained waiting for kv-cache transfer per sequence
- Separately run prompt and token workers using the `_run_stage_workers` helper
- Populate KVCacheCommunicator for all PagedAttention modules, which allows implementation of layer-wise sends from within `attention.py`
- Populate destination rank for Sampler, which will be used as root in `gather` operations.
- Fix `tensor_model_parallel_gather` - use global rank instead of group local rank.
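As a rough illustration of the block-coalescing idea mentioned above (a sketch, not the PR's actual `blocks_to_nw` code), contiguous KV-cache block ids can be merged into ranges so that each range becomes a single network transfer:

```python
from typing import List, Tuple

def coalesce_blocks(block_ids: List[int]) -> List[Tuple[int, int]]:
    """Illustrative helper: merge block ids into contiguous
    (start_block, num_blocks) ranges so each range can be sent as one
    network transfer instead of one transfer per block."""
    ranges: List[Tuple[int, int]] = []
    for block_id in sorted(block_ids):
        if ranges and block_id == ranges[-1][0] + ranges[-1][1]:
            start, length = ranges[-1]
            ranges[-1] = (start, length + 1)
        else:
            ranges.append((block_id, 1))
    return ranges

# coalesce_blocks([3, 4, 5, 9, 10, 17]) -> [(3, 3), (9, 2), (17, 1)]
```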
@casper-hansen
Contributor

Splitting prefilling and decoding across different GPUs is an excellent idea. For example, prefilling on H100 and decoding on A100: since the H100 has 3.43x more compute but only 1.6x more memory bandwidth, it would be more cost-efficient to use the H100 only for prefilling.

In the paper, you demonstrate a 2.35x increase in throughput at the same cost, or 1.4x higher throughput at 20% cost savings. Are you able to reproduce these numbers in this PR?

@eshachoukse

Splitting prefilling and decoding across different GPUs is an excellent idea. ...
Are you able to reproduce these numbers in this PR?

@casper-hansen In the paper, we use the Splitwise simulator to simulate a 40-node cluster, both for the homogeneous (Splitwise-AA/HH) and the heterogeneous (Splitwise-HHcap/HA) solutions. That simulation allows us to run a production trace through these systems at various requests per second (rps) under SLOs, which is what allows us to calculate the maximum throughput under a given cost/power budget, leading to the scaled results.

The code that we just pushed only allows us to build the prototype of that solution, since it does not include the optimized cluster-level scheduler. The prototype has been developed and tested on 1 prompt machine and 1 token machine. Therefore, the main point of this PR is to show the optimized KV cache transfer time, rather than the at-scale results. Hope this answers your question.

@zhuohan123
Member

@GindaChen Hey Junda, can you help take a look at the PR and leave some comments? Thanks!

@GindaChen
Contributor

@aashaka This is a very promising PR to integrate Splitwise into vLLM! I will try to finish up the review today. From what I understand, this PR only introduces the Splitwise mode, stage parallelism, and KV cache transfer into vLLM.

Contributor

@GindaChen GindaChen left a comment

Awesome changes so far! Please see my comments for requested changes and feedback on future design changes.

I may still have another round of review on comm_utils.py after running the tests. So far I am stuck at the installation of the MSCCL++ library for some odd reason. I can post the error/environment once I figure out a path to testing.

# Populate Sampler with dst_rank as driver worker's rank.
self.model_runner.model.sampler.set_dst_rank(self.model_runner.driver_rank)

def init_mscclpp_comm(self, mscclpp_init_method: Optional[str] = None) -> None:
Contributor

Recommend separating the initialization of communication into its own class or function (rather than having it inside the worker). In my opinion, the vLLM codebase may benefit from a better abstraction for communication, say:

class CommManager:
  ...
  def init_comm(...): ...
  def destroy_comm(...): ...
  def get_group(self, group_type): ... # say tensor / stage / pipeline parallel group
  ...

class Worker:
  def __init__(self, ...):
    self.comm_manager = CommManager(...)

Contributor

I'm open to having this as a future refactor, but want to see if you think separating this logic now is feasible.

`MSCCL++ <https://github.com/microsoft/mscclpp>`_ is a GPU-driven communication stack for scalable AI applications.
It is used to implement KV cache communication in Splitwise.

To install MSCCL++, please follow the instructions at `MSCCL++ Quickstart <https://github.com/microsoft/mscclpp/blob/main/docs/quickstart.md>`_ or follow the steps below to install it from source:
Contributor

Just a heads up that the system may need to apt install libnuma-dev (libnuma1) prior to make (I hit this error during installation).

Author

Thanks for the heads up. I will add this to the installation instructions both here and in the MSCCL++ repo. Also, feel free to reach out to me if you have any other problems with MSCCL++ setup.

@@ -369,14 +369,22 @@ def __init__(
worker_use_ray: bool,
max_parallel_loading_workers: Optional[int] = None,
disable_custom_all_reduce: bool = False,
sep_prompt_token: bool = False,
Contributor

The modification to the config looks good to me. I guess there may be a future extension to pass in the number of prompt/token workers, and I think the abstraction looks good so far.

vllm/utils.py Outdated

class SeqToSlotMapper:
""" SeqToSlotMapper maps sequence ids to a limited set of slot ids.
A slot is freed every time a sequence finishes. It is used to manage
Contributor

It would be helpful to explain what a "slot" is here.
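For readers following along, here is a rough sketch (not the PR's implementation) of the idea: a "slot" is one entry in a fixed pool of per-sequence resources (e.g., semaphores), assigned while a sequence is running and returned to the pool when the sequence finishes.

```python
from typing import Dict

class SeqToSlotMapperSketch:
    """Illustrative sketch only: map sequence ids to a limited pool of slot ids,
    freeing a slot whenever its sequence finishes."""

    def __init__(self, num_slots: int) -> None:
        self.free_slots = list(range(num_slots))
        self.seq_to_slot: Dict[int, int] = {}

    def set_seq(self, seq_id: int) -> int:
        slot = self.free_slots.pop()  # raises IndexError if no slot is free
        self.seq_to_slot[seq_id] = slot
        return slot

    def free_seq(self, seq_id: int) -> None:
        self.free_slots.append(self.seq_to_slot.pop(seq_id))

    def get_slot(self, seq_id: int) -> int:
        return self.seq_to_slot[seq_id]
```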

Comment on lines +835 to +844
all_outputs = self._run_stage_workers(
"execute_model",
prompt_stage=seq_group_metadata_list[0].is_prompt,
driver_kwargs={
"seq_group_metadata_list": seq_group_metadata_list,
"blocks_to_swap_in": scheduler_outputs.blocks_to_swap_in,
"blocks_to_swap_out": scheduler_outputs.blocks_to_swap_out,
"blocks_to_copy": scheduler_outputs.blocks_to_copy,
"blocks_to_nw": scheduler_outputs.blocks_to_nw,
})
Contributor

Just want to confirm - this means prompt workers cannot work concurrently with decode workers because each time we only schedule one set of workers to run.

I understand this PR is a prototype to verify KV cache transfer between prompt / decode workers, so the performance of running sets of workers concurrently isn't the focus. I do want to point out that making workers run concurrently (and the communication between schedulers) turns out to be one of the major design challenges, and one that potentially breaks the current vLLM architecture.

I would be more than happy to hear if you have a great solution! I'm also open to talking in detail about our design and the vLLM team's concerns about the architectural change.

Contributor

With that said, I am personally okay leaving this code as is in this PR so we can demonstrate the KV cache transfer.

Contributor

Recommend adding a TODO to say something like "this doesn't achieve the best performance because we only schedule one set of workers at a time" so people don't get confused.

Author

@GindaChen thanks for all the comments. I have updated the PR accordingly. Please let me know if you have any further comments.

Contributor

@aashaka Thanks for the heads up! I will take a look at the changes soon!

Contributor

@GindaChen

this means prompt workers cannot work concurrently with decode workers

I do not quite understand this part. Since decode workers are provisioned separately, why can't they run concurrently with prompt workers here?

because each time we only schedule one set of workers to run.

Does "one set of workers" mean a <prompt, worker> pair?

Are you saying that it currently doesn't support <List, List> to accelerate the prefill and decode phases (which requires model parallelism)?

@valvarl

valvarl commented Feb 22, 2024

Hello, I have encountered a few problems when repeating the results.

First, some difficulty arose when installing the MSCCL++ library. It requires at least CUDA 11 (I don't know exactly which version); 10.2 will not work for sure. You also need cmake >= 3.25.0. The main difficulty was installing libibverbs-dev correctly. Version 17.1 of libibverbs-dev will not work because of errors during the build, so you need at least 28; I was able to build with libibverbs-dev >= 36.0-1. However, I had to add list(APPEND CMAKE_FIND_LIBRARY_SUFFIXES .so.1) before find_package(IBVerbs REQUIRED) in CMakeLists.txt.

Second, I don't have an "eth0" interface on my machine. There are others in ifconfig, but changing to any other interface in vllm/engine/llm_engine.py:253 results in an initialization error I don't understand:

/workspace/vllm-oss# REQUESTS_CA_BUNDLE="" CURL_CA_BUNDLE="" python tests/distributed/test_kvcache_comm.py 
2024-02-22 09:20:50,793	INFO worker.py:1724 -- Started a local Ray instance.
INFO 02-22 09:20:52 llm_engine.py:73] Initializing an LLM engine with config: model='facebook/opt-125m', tokenizer='facebook/opt-125m', tokenizer_mode=auto, revision=None, tokenizer_revision=None, trust_remote_code=False, dtype=torch.float16, max_seq_len=2048, download_dir=None, load_format=auto, tensor_parallel_size=2, disable_custom_all_reduce=False, quantization=None, enforce_eager=False, kv_cache_dtype=auto, device_config=cuda, seed=0)
Traceback (most recent call last):
 File "/workspace/vllm-oss/tests/distributed/test_kvcache_comm.py", line 36, in <module>
   engine = initialize_engine(args)
 File "/workspace/vllm-oss/tests/distributed/test_kvcache_comm.py", line 14, in initialize_engine
   return LLMEngine.from_engine_args(engine_args)
 File "/workspace/vllm-oss/vllm/engine/llm_engine.py", line 393, in from_engine_args
   engine = cls(*engine_configs,
 File "/workspace/vllm-oss/vllm/engine/llm_engine.py", line 112, in __init__
   self._init_workers_ray(placement_group)
 File "/workspace/vllm-oss/vllm/engine/llm_engine.py", line 300, in _init_workers_ray
   self._run_workers("init_model")
 File "/workspace/vllm-oss/vllm/engine/llm_engine.py", line 1038, in _run_workers
   driver_worker_output = getattr(self.driver_worker,
 File "/workspace/vllm-oss/vllm/worker/worker.py", line 108, in init_model
   self.init_kvcache_comm(self.mscclpp_init_method)
 File "/workspace/vllm-oss/vllm/worker/worker.py", line 126, in init_kvcache_comm
   self.kvcache_comm_manager = KVCacheCommManager(
 File "/workspace/vllm-oss/vllm/worker/comm_utils.py", line 168, in __init__
   self.mscclpp_conns = self.mscclpp_group.make_connection(
 File "/workspace/mscclpp/python/mscclpp/comm.py", line 102, in make_connection
   connections[rank] = self.communicator.connect_on_setup(rank, 0, endpoint)
IndexError: IB transport out of range: 0 >= 0
nanobind: leaked 1 instances!
nanobind: leaked 1 keep_alive records!
nanobind: leaked 2 types!
- leaked type "mscclpp._mscclpp.Bootstrap"
- leaked type "mscclpp._mscclpp.TcpBootstrap"
nanobind: leaked 11 functions!
- leaked function "initialize"
- leaked function "__init__"
- leaked function "get_rank"
- leaked function "get_n_ranks"
- leaked function "get_unique_id"
- leaked function "create"
- leaked function "barrier"
- leaked function "create_unique_id"
- leaked function "send"
- leaked function "all_gather"
- leaked function "recv"
- ... skipped remainder
nanobind: this is likely caused by a reference counting issue in the binding code.

Could you give me a hint? Maybe some profiling information can be gathered to see what the problem might be.

@aashaka
Author

aashaka commented Feb 23, 2024

Hello, I have encountered a few problems when repeating the results.
...
Could you give me a hint, maybe some profiling information can be gathered to see what the problem might be.

@valvarl, the communication setup will require InfiniBand support. It looks like ibv_get_device_list is returning 0, meaning that it is unable to find any IB devices.

@leiwen83
Contributor

Hello, I have encountered a few problems when repeating the results.
...
Could you give me a hint, maybe some profiling information can be gathered to see what the problem might be.

@valvarl, the communication setup will require InfiniBand support. Looks like ibv_get_device_list is returning 0 meaning that it is unable to find any IB devices.

Hi,

Does this mean that InfiniBand support is a must-have to enable the Splitwise feature?

@valvarl

valvarl commented Feb 26, 2024

Hello, I have encountered a few problems when repeating the results.
...
Could you give me a hint, maybe some profiling information can be gathered to see what the problem might be.

@valvarl, the communication setup will require InfiniBand support. Looks like ibv_get_device_list is returning 0 meaning that it is unable to find any IB devices.

Unfortunately, I don't have physical access to the server. However, I can see some information about available connections on it.

nvidia-smi topo -m

	GPU0	GPU1	GPU2	GPU3	GPU4	GPU5	GPU6	GPU7	NIC0	NIC1	NIC2	NIC3	NIC4	NIC5	NIC6	NIC7	CPU Affinity	NUMA Affinity
GPU0	 X 	NV8	NV8	NV8	NV8	NV8	NV8	NV8	PXB	PXB	NODE	NODE	SYS	SYS	SYS	SYS	0-31,64-95	0
GPU1	NV8	 X 	NV8	NV8	NV8	NV8	NV8	NV8	PXB	PXB	NODE	NODE	SYS	SYS	SYS	SYS	0-31,64-95	0
GPU2	NV8	NV8	 X 	NV8	NV8	NV8	NV8	NV8	NODE	NODE	PXB	PXB	SYS	SYS	SYS	SYS	0-31,64-95	0
GPU3	NV8	NV8	NV8	 X 	NV8	NV8	NV8	NV8	NODE	NODE	PXB	PXB	SYS	SYS	SYS	SYS	0-31,64-95	0
GPU4	NV8	NV8	NV8	NV8	 X 	NV8	NV8	NV8	SYS	SYS	SYS	SYS	PXB	PXB	NODE	NODE	32-63,96-127	1
GPU5	NV8	NV8	NV8	NV8	NV8	 X 	NV8	NV8	SYS	SYS	SYS	SYS	PXB	PXB	NODE	NODE	32-63,96-127	1
GPU6	NV8	NV8	NV8	NV8	NV8	NV8	 X 	NV8	SYS	SYS	SYS	SYS	NODE	NODE	PXB	PXB	32-63,96-127	1
GPU7	NV8	NV8	NV8	NV8	NV8	NV8	NV8	 X 	SYS	SYS	SYS	SYS	NODE	NODE	PXB	PXB	32-63,96-127	1
NIC0	PXB	PXB	NODE	NODE	SYS	SYS	SYS	SYS	 X 	PIX	NODE	NODE	SYS	SYS	SYS	SYS		
NIC1	PXB	PXB	NODE	NODE	SYS	SYS	SYS	SYS	PIX	 X 	NODE	NODE	SYS	SYS	SYS	SYS		
NIC2	NODE	NODE	PXB	PXB	SYS	SYS	SYS	SYS	NODE	NODE	 X 	PIX	SYS	SYS	SYS	SYS		
NIC3	NODE	NODE	PXB	PXB	SYS	SYS	SYS	SYS	NODE	NODE	PIX	 X 	SYS	SYS	SYS	SYS		
NIC4	SYS	SYS	SYS	SYS	PXB	PXB	NODE	NODE	SYS	SYS	SYS	SYS	 X 	PIX	NODE	NODE		
NIC5	SYS	SYS	SYS	SYS	PXB	PXB	NODE	NODE	SYS	SYS	SYS	SYS	PIX	 X 	NODE	NODE		
NIC6	SYS	SYS	SYS	SYS	NODE	NODE	PXB	PXB	SYS	SYS	SYS	SYS	NODE	NODE	 X 	PIX		
NIC7	SYS	SYS	SYS	SYS	NODE	NODE	PXB	PXB	SYS	SYS	SYS	SYS	NODE	NODE	PIX	 X 		

Legend:

  X    = Self
  SYS  = Connection traversing PCIe as well as the SMP interconnect between NUMA nodes (e.g., QPI/UPI)
  NODE = Connection traversing PCIe as well as the interconnect between PCIe Host Bridges within a NUMA node
  PHB  = Connection traversing PCIe as well as a PCIe Host Bridge (typically the CPU)
  PXB  = Connection traversing multiple PCIe bridges (without traversing the PCIe Host Bridge)
  PIX  = Connection traversing at most a single PCIe bridge
  NV#  = Connection traversing a bonded set of # NVLinks

NIC Legend:

  NIC0: mlx5_0
  NIC1: mlx5_1
  NIC2: mlx5_2
  NIC3: mlx5_3
  NIC4: mlx5_4
  NIC5: mlx5_5
  NIC6: mlx5_6
  NIC7: mlx5_7
ibstat -v
CA 'mlx5_0'
	CA type: MT4125
	Number of ports: 1
	Firmware version: 22.31.2006
	Hardware version: 0
	Node GUID: ...
	System image GUID: ...
	Port 1:
		State: Active
		Physical state: LinkUp
		Rate: 100
		Base lid: 0
		LMC: 0
		SM lid: 0
		Capability mask: 0x00010000
		Port GUID: ...
		Link layer: Ethernet
<...>
ibv_devinfo
No IB devices found

I am not familiar with the RDMA library and I don't understand how to use ibv_get_device_list properly. Maybe you could add a test to determine whether InfiniBand is configured correctly? Perhaps there is some command that allows the connection to be tested automatically?
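For reference, a rough standalone diagnostic (not part of this PR, and only one of several possible checks) that lists what the RDMA stack exposes via Linux sysfs. Given the answer above, the IB transport needs at least one active port whose link layer is InfiniBand; the ibstat output above shows "Link layer: Ethernet" (RoCE), not InfiniBand.

```python
#!/usr/bin/env python3
"""Sketch: list RDMA devices/ports via sysfs and report each port's link layer
and state, to check whether an active InfiniBand port is present."""
import glob
import os

found_active_ib = False
for port_dir in sorted(glob.glob("/sys/class/infiniband/*/ports/*")):
    device = port_dir.split("/")[4]    # e.g. "mlx5_0"
    port = os.path.basename(port_dir)  # e.g. "1"
    try:
        with open(os.path.join(port_dir, "link_layer")) as f:
            link_layer = f.read().strip()   # "InfiniBand" or "Ethernet"
        with open(os.path.join(port_dir, "state")) as f:
            state = f.read().strip()        # e.g. "4: ACTIVE"
    except OSError:
        continue
    print(f"{device} port {port}: link_layer={link_layer}, state={state}")
    if link_layer == "InfiniBand" and state.endswith("ACTIVE"):
        found_active_ib = True

print("active InfiniBand port found" if found_active_ib
      else "no active InfiniBand port found")
```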

@aashaka
Author

aashaka commented Feb 29, 2024

Hi,

Does this mean that InfiniBand support is a must-have to enable the Splitwise feature?

@leiwen83 Currently, yes, that is the case. While it has been a low-priority item for us, we do plan to support Ethernet in the future.

@nanomer

nanomer commented Mar 1, 2024

Hi @aashaka, what are the system requirements to run this Splitwise implementation:

  1. It seems InfiniBand is a requirement (no Ethernet support at the moment).
  2. Are there any GPU limitations? A100s/H100s are probably supported, but would this implementation also work on A10/A40s or V100s?
  3. Anything else I might be missing?

@oguzhannfsgl

Hi. I was trying to run this PR on my machine. I tested the code by running tests/distributed/test_kvcache_comm.py, but I am getting the error below:

(RayWorkerVllm pid=2349994) .../lib/python3.10/site-packages/mscclpp/include/mscclpp/fifo_device.hpp:88: void mscclpp::FifoDeviceHandle::sync(unsigned long, signed long): block: [0,0,0], thread: [0,0,0] Assertion `(curFifoHead >= atomicLoad(this->tailReplica, memoryOrderRelaxed))(atomicLoad(&(this->triggers[curFifoHead % size].fst), memoryOrderRelaxed) != 0)` failed.
.../lib/python3.10/site-packages/mscclpp/include/mscclpp/fifo_device.hpp:88: void mscclpp::FifoDeviceHandle::sync(unsigned long, signed long): block: [0,0,0], thread: [0,0,0] Assertion `(curFifoHead >= atomicLoad(this->tailReplica, memoryOrderRelaxed))(atomicLoad(&(this->triggers[curFifoHead % size].fst), memoryOrderRelaxed) != 0)` failed.
Traceback (most recent call last):
  File "/workspace/vLLM/vllm-oss-splitwise-pr/tests/distributed/test_kvcache_comm.py", line 42, in <module>
    run_all_workers(engine, "send_recv_kvcache_all")
  File "/workspace/vLLM/vllm-oss-splitwise-pr/tests/distributed/test_kvcache_comm.py", line 26, in run_all_workers
    _ = getattr(engine.driver_worker, method)(*args)
  File "/workspace/vLLM/vllm-oss-splitwise-pr/vllm/worker/worker.py", line 351, in send_recv_kvcache_all
    self.kvcache_comm_manager.signal_and_flush(0)
  File "/workspace/vLLM/vllm-oss-splitwise-pr/vllm/worker/comm_utils.py", line 224, in signal_and_flush
    self.kvcache_comm.signal_and_flush(sem_id)
  File "/workspace/vLLM/vllm-oss-splitwise-pr/vllm/worker/comm_utils.py", line 131, in signal_and_flush
    self.signal_kernel(params)
  File "/workspace/vLLM/vllm-oss-splitwise-pr/vllm/worker/comm_utils.py", line 63, in __call__
    return self._kernel.launch_kernel(params,
  File ".../lib/python3.10/site-packages/mscclpp/utils.py", line 54, in launch_kernel
    cp.cuda.driver.launchKernel(
  File "cupy_backends/cuda/api/driver.pyx", line 260, in cupy_backends.cuda.api.driver.launchKernel
  File "cupy_backends/cuda/api/driver.pyx", line 273, in cupy_backends.cuda.api.driver.launchKernel
  File "cupy_backends/cuda/api/driver.pyx", line 63, in cupy_backends.cuda.api.driver.check_status
cupy_backends.cuda.api.driver.CUDADriverError: CUDA_ERROR_ASSERT: device-side assert triggered
(RayWorkerVllm pid=2349994) INFO 03-27 18:29:33 custom_all_reduce.py:202] Registering 2275 cuda graph addresses [repeated 2x across cluster]
(RayWorkerVllm pid=2349994) INFO 03-27 18:29:33 model_runner.py:724] Graph capturing finished in 6 secs. [repeated 2x across cluster]
[2024-03-27 18:29:51,959 E 2342952 2350727] logging.cc:97: Unhandled exception: N7mscclpp7IbErrorE. what(): a work item failed: status 12 (Ib failure: Cannot allocate memory)
[2024-03-27 18:29:51,963 E 2342952 2350727] logging.cc:104: Stack trace:
 .../lib/python3.10/site-packages/ray/_raylet.so(+0xfebc9a) [0x7f30da099c9a] ray::operator<<()
.../lib/python3.10/site-packages/ray/_raylet.so(+0xfee3d8) [0x7f30da09c3d8] ray::TerminateHandler()
.../bin/../lib/libstdc++.so.6(+0xb643c) [0x7f30d8f6343c] __cxxabiv1::__terminate()
.../bin/../lib/libstdc++.so.6(+0xb648e) [0x7f30d8f6348e] __cxxabiv1::__unexpected()
.../bin/../lib/libstdc++.so.6(__cxa_rethrow+0) [0x7f30d8f63680] __cxa_rethrow
.../lib/python3.10/site-packages/mscclpp/_mscclpp.cpython-310-x86_64-linux-gnu.so(+0x24516) [0x7f1e742c9516]
.../lib/python3.10/site-packages/mscclpp/_mscclpp.cpython-310-x86_64-linux-gnu.so(_ZN7mscclpp12ProxyService13handleTriggerENS_12ProxyTriggerE+0x28d) [0x7f1e7430a7ed] mscclpp::ProxyService::handleTrigger()
.../lib/python3.10/site-packages/mscclpp/_mscclpp.cpython-310-x86_64-linux-gnu.so(+0x6ca92) [0x7f1e74311a92]
.../bin/../lib/libstdc++.so.6(+0xd3e95) [0x7f30d8f80e95] execute_native_thread_routine
/lib/x86_64-linux-gnu/libpthread.so.0(+0x8609) [0x7f30db2fa609] start_thread
/lib/x86_64-linux-gnu/libc.so.6(clone+0x43) [0x7f30db0c5133] __clone

Aborted (core dumped)

It seems like the kvcache_comm_manager.put method works without any problem (inside attention.py), but kvcache_comm_manager.signal_and_flush hits this error inside worker.py.

I couldn't figure out the source of the problem. Does this error message mean anything to you?


self.world_size = pipeline_parallel_size * tensor_parallel_size
if sep_prompt_token:
# Half of the workers are prompt workers and the other half are token workers
Contributor

Should it be a fixed split? If the workers use the exact same GPUs, it seems the prompt stage may need more workers?

@@ -116,8 +116,13 @@ def __init__(
# Profile the memory usage and initialize the cache.
self._init_cache()

if self.parallel_config.sep_prompt_token:
# Setup the MSCCL++ communication required for KV cache transfer
self._setup_kvcache_comm()
Contributor

@Jeffwan Jeffwan Apr 17, 2024

Just curious: why do we need MSCCL++ for communication? Are there other options, like torch.distributed with the NCCL backend? Was there any analysis of the communication collective library used for the KV cache transfer?

@@ -229,6 +250,7 @@ def _init_workers_ray(self, placement_group: "PlacementGroup",

distributed_init_method = get_distributed_init_method(
driver_ip, get_open_port())
mscclpp_init_method = f"eth0:{driver_ip}:{get_open_port()}" if self.parallel_config.sep_prompt_token else None
Contributor

minor: can we leave a TODO here?

  1. I feel this is limited if we want to use a high-speed network interface. It would be great to extract an env var like NCCL_SOCKET_IFNAME (see the sketch below).
  2. I do not know whether mscclpp is compatible with other high-speed interfaces?
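A minimal sketch of what point 1 could look like; the env var name VLLM_MSCCLPP_SOCKET_IFNAME and this helper are hypothetical, not part of the PR:

```python
import os
from typing import Optional

def make_mscclpp_init_method(driver_ip: str, port: int,
                             sep_prompt_token: bool) -> Optional[str]:
    """Hypothetical helper: pick the socket interface from an environment
    variable (similar in spirit to NCCL_SOCKET_IFNAME) instead of hard-coding
    "eth0" in the engine."""
    if not sep_prompt_token:
        return None
    ifname = os.environ.get("VLLM_MSCCLPP_SOCKET_IFNAME", "eth0")
    return f"{ifname}:{driver_ip}:{port}"

# Example: make_mscclpp_init_method("10.0.0.1", 29500, True) -> "eth0:10.0.0.1:29500"
```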

Comment on lines +835 to +844
all_outputs = self._run_stage_workers(
"execute_model",
prompt_stage=seq_group_metadata_list[0].is_prompt,
driver_kwargs={
"seq_group_metadata_list": seq_group_metadata_list,
"blocks_to_swap_in": scheduler_outputs.blocks_to_swap_in,
"blocks_to_swap_out": scheduler_outputs.blocks_to_swap_out,
"blocks_to_copy": scheduler_outputs.blocks_to_copy,
"blocks_to_nw": scheduler_outputs.blocks_to_nw,
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@GindaChen

this means prompt workers cannot work concurrently with decode workers

I do not quite understand this part. Since decode works are provisioned separately, why can them run concurrently with prompt workers here?

because each time we only schedule one set of workers to run.

one set of workers means <prompt, worker> pair?

What you are trying to say is currently it doesn't support <List, List> to accelerate the prefill and decoding phase? (which requires model parallelism)

@Jeffwan
Contributor

Jeffwan commented Apr 17, 2024

@eshachoukse

The code that we just pushed only allows us to build the prototype of that solution, since it does not include the optimized cluster-level scheduler.

What are the features provided by your cluster-level scheduler in this case? something like prompt and decode machine collaboration?

@Jeffwan
Contributor

Jeffwan commented Apr 17, 2024

@aashaka

Currently, yes that is the case. While it has been low-priority item for us, we do have a plan to support Ethernet in the future.

I am trying to get more details here. It seems eth0 is chosen here rather than an IB device. Why is an IB device required in this case? Or why not an explicit ib0?

mscclpp_init_method = f"eth0:{driver_ip}:{get_open_port()}" if self.parallel_config.sep_prompt_token else None

len(HEAD_TYPES)) + layer_id * len(HEAD_TYPES) + head_type
torch.cuda.synchronize()

def send_recv_kvcache_all(self):
Contributor

Is this method only used in testing? I cannot find any other references. If so, can you add some comments?

@irasin
Contributor

irasin commented Jun 2, 2024

Hi all, I was wondering when this will be merged into the main branch?

@irasin irasin mentioned this pull request Jun 3, 2024
@vie-serendipity

Any update?

@linstreamer

@eshachoukse

The code that we just pushed only allows us to build the prototype of that solution, since it does not include the optimized cluster-level scheduler.

What are the features provided by your cluster-level scheduler in this case? something like prompt and decode machine collaboration?

Same question: will the cluster-level scheduler be released in the vllm repo or another repo? @aashaka

@CSEEduanyu

@aashaka @Jeffwan Hello, what is the current status? Can this branch be used in production? For example, with 2 A100 cards at a prefill/decode split of 1/1, how much higher is the throughput than the original version?

@cassiewilliam

Hi. I was trying to run this PR on my machine. I tested the code by running tests/distributed/test_kvcache_comm.py, but I am getting the error below:
...
I couldn't figure out the source of the problem. Does this error message mean anything to you?

I have the same issue.

@cassiewilliam

Hi. I was trying to run this PR on my machine. I tested the code by running tests/distributed/test_kvcache_comm.py, but I am getting the error below:
...
I couldn't figure out the source of the problem. Does this error message mean anything to you?

Did you fix this problem? Maybe you can help me, please.

@chenhongyu2048

Hi. I was trying to run this PR on my machine. I tested the code by running tests/distributed/test_kvcache_comm.py, but I am getting the error below:
...

Did you fix this problem? Maybe you can help me, please.

It seems to be a problem with mscclpp; there are some similar issues in the mscclpp GitHub repo.
