ai-ml/gke-ray/rayserve/llm/model-composition/serve.py

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# NOTE: this file was inspired by:
# https://github.com/ray-project/ray/blob/master/doc/source/serve/doc_code/vllm_example.py

import json
import os
from typing import AsyncGenerator
import random

from fastapi import BackgroundTasks
from starlette.requests import Request
from starlette.responses import Response, StreamingResponse
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.async_llm_engine import AsyncLLMEngine
from vllm.sampling_params import SamplingParams
from vllm.utils import random_uuid

from ray import serve
from ray.serve.handle import DeploymentHandle


@serve.deployment(name="VLLMDeployment")
class VLLMDeployment:
    def __init__(self, **kwargs):
        """
        Construct a VLLM deployment.

        Refer to https://github.com/vllm-project/vllm/blob/main/vllm/engine/arg_utils.py
        for the full list of arguments.

        Args:
            model: name or path of the huggingface model to use.
            download_dir: directory to download and load the weights,
                defaults to the default cache dir of huggingface.
            use_np_weights: save a numpy copy of model weights for
                faster loading. This can increase the disk usage by up to 2x.
            use_dummy_weights: use dummy values for model weights.
            dtype: data type for model weights and activations.
                The "auto" option will use FP16 precision for FP32 and FP16
                models, and BF16 precision for BF16 models.
            seed: random seed.
            worker_use_ray: use Ray for distributed serving, will be
                automatically set when using more than 1 GPU.
            pipeline_parallel_size: number of pipeline stages.
            tensor_parallel_size: number of tensor parallel replicas.
            block_size: token block size.
            swap_space: CPU swap space size (GiB) per GPU.
            gpu_memory_utilization: the percentage of GPU memory to be used
                for the model executor.
            max_num_batched_tokens: maximum number of batched tokens per
                iteration.
            max_num_seqs: maximum number of sequences per iteration.
            disable_log_stats: disable logging statistics.
            engine_use_ray: use Ray to start the LLM engine in a separate
                process as the server process.
            disable_log_requests: disable logging requests.
        """
        args = AsyncEngineArgs(**kwargs)
        self.engine = AsyncLLMEngine.from_engine_args(args)

    async def stream_results(self, results_generator) -> AsyncGenerator[bytes, None]:
        num_returned = 0
        async for request_output in results_generator:
            text_outputs = [output.text for output in request_output.outputs]
            assert len(text_outputs) == 1
            text_output = text_outputs[0][num_returned:]
            ret = {"text": text_output}
            yield (json.dumps(ret) + "\n").encode("utf-8")
            num_returned += len(text_output)

    async def may_abort_request(self, request_id) -> None:
        await self.engine.abort(request_id)

    async def __call__(self, request_dict: dict) -> str:
        """Generate a completion for the request.

        The request should be a JSON object with the following fields:
        - prompt: the prompt to use for the generation.
        - stream: whether to stream the results or not.
        - other fields: the sampling parameters (See `SamplingParams` for details).
        """
        # request_dict = await request.json()
        prompt = request_dict.pop("prompt")
        stream = request_dict.pop("stream", False)
        # max_tokens is not applied to this generation; it is echoed back in the
        # response so the downstream summarizer can use it as a sampling parameter.
        max_tokens = request_dict.pop("max_tokens", 1000)
        sampling_params = SamplingParams(**request_dict)
        request_id = random_uuid()
        results_generator = self.engine.generate(
            prompt, sampling_params, request_id)
        if stream:
            background_tasks = BackgroundTasks()
            # Use background_tasks to abort the request
            # if the client disconnects.
            background_tasks.add_task(self.may_abort_request, request_id)
            return StreamingResponse(
                self.stream_results(results_generator), background=background_tasks
            )

        final_output = None
        async for request_output in results_generator:
            final_output = request_output

        assert final_output is not None
        prompt = final_output.prompt
        text_outputs = [
            output.text for output in final_output.outputs]
        ret = {"text": text_outputs, "max_tokens": max_tokens}
        return json.dumps(ret)


@serve.deployment
class VLLMSummarizerDeployment:
    def __init__(self, **kwargs):
        args = AsyncEngineArgs(**kwargs)
        self.engine = AsyncLLMEngine.from_engine_args(args)

    async def __call__(self, response: str) -> str:
        """Generates a summary of a response from another model.

        The response should be a JSON object with the following fields:
        - text: the response returned from another model to summarize
        """
        request_dict = json.loads(response)
        text = request_dict.pop("text")
        prompt = f"Summarize the following text into a single sentence: {text}"
        # Any remaining fields (e.g. max_tokens) are treated as sampling parameters.
        sampling_params = SamplingParams(**request_dict)
        request_id = random_uuid()
        results_generator = self.engine.generate(
            prompt, sampling_params, request_id)

        final_output = None
        async for request_output in results_generator:
            final_output = request_output

        assert final_output is not None
        prompt = final_output.prompt
        text_outputs = [
            output.text for output in final_output.outputs]
        ret = {"text": text_outputs}
        return json.dumps(ret)


@serve.deployment
class MultiModelDeployment:
    def __init__(self, assist_model: DeploymentHandle, summarizer_model: DeploymentHandle):
        self.assistant_model = assist_model
        self.summarizer_model = summarizer_model

    async def __call__(self, request: Request) -> Response:
        model_request = await request.json()
        # Chain the models: the assistant's response handle is passed directly to
        # the summarizer, and Ray Serve resolves it before the summarizer runs.
        assistant_response = self.assistant_model.remote(model_request)
        summarizer_response = await self.summarizer_model.remote(assistant_response)
        return Response(content=summarizer_response)


multi_model = MultiModelDeployment.bind(
    VLLMDeployment.options(ray_actor_options={"num_cpus": 8}).bind(
        model=os.environ['ASSIST_MODEL_ID'],
        tensor_parallel_size=2,
    ),
    VLLMSummarizerDeployment.options(ray_actor_options={"num_cpus": 8}).bind(
        model=os.environ['SUMMARIZER_MODEL_ID'],
        tensor_parallel_size=2,
    )
)