主页 > 互联网  > 

Ubuntu部署ktransformers

Ubuntu部署ktransformers
准备工作

一台服务器

内存(RAM):500G

GPU:48G(NVIDIA4090)

系统:Ubuntu20.04(github的文档好像用的是22.04)

第一步:下载权重文件

1.下载hfd

wget https://hf-mirror.com/hfd/hfd.sh
chmod a+x hfd.sh

2.设置环境变量

export HF_ENDPOINT=https://hf-mirror.com

3.下载模型(需要梯子,需要带上huggingface的token)

./hfd.sh gpt2

4.下载数据集(需要梯子,需要带上huggingface的token)

./hfd.sh wikitext --dataset

5.下载大文件(需要梯子,文件很大,大约四五百G)

./hfd.sh unsloth/DeepSeek-R1-GGUF --include DeepSeek-R1-Q4_K_M/*

第二步:拉代码,编译代码

1.使用Anaconda3安装Python3.11

conda create --name ktransformers python=3.11
conda activate ktransformers
conda install -c conda-forge libstdcxx-ng

2.安装其他依赖

pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu126
pip3 install packaging ninja cpufeature numpy
sudo add-apt-repository ppa:ubuntu-toolchain-r/test
sudo apt-get update
sudo apt-get install --only-upgrade libstdc++6
pip install flash-attn --no-build-isolation

3.查看显卡版本及cuda版本

以下两条指令显示的CUDA版本需要一致,若不一致,系统会以nvcc --version的为准

nvcc --version
nvidia-smi

4.拉代码

git clone https://github.com/kvcache-ai/ktransformers.git

cd ktransformers

git submodule init

git submodule update

5.编译

export USE_NUMA=1
make dev_install

第三步:运行

python ktransformers/local_chat.py --model_path deepseek-ai/DeepSeek-R1 --gguf_path /home/dpkj/deepseek/DeepSeek-R1-GGUF/DeepSeek-R1-Q4_K_M/ --cpu_infer 50 --cache_lens 1536 --max_new_tokens 8192

# --model_path:模型位置,不需要修改
# --gguf_path:前面下载的大文件,模型文件位置,按照实际情况而定
# --cpu_infer:用于推理的CPU核心数量(不是百分比),如果服务器不是DDR5双路CPU,可以适量调低此数值

其他启动参数

python -m ktransformers.local_chat --model_path deepseek-ai/DeepSeek-R1 --gguf_path /root/DeepSeek-R1-GGUF/DeepSeek-R1-Q4_K_M/ --cpu_infer 53 --cache_lens 1536

python ./ktransformers/local_chat.py --model_path deepseek-ai/DeepSeek-R1 --gguf_path /home/shadeform/DeepSeek-R1-GGUF/DeepSeek-R1-Q4_K_M/ --cpu_infer 53 --cache_lens 1536 --optimize_config_path ktransformers/optimize/optimize_rules/DeepSeek-V3-Chat-multi-gpu-marlin.yaml

python -m ktransformers.local_chat --model_path deepseek-ai/DeepSeek-R1 --gguf_path /root/autodl-tmp/DeepSeek-R1-GGUF/DeepSeek-R1-Q4_K_M/ --cpu_infer 128 --cache_lens 1536 --max_new_tokens 8192 --optimize_config_path ./ktransformers/optimize/optimize_rules/DeepSeek-V3-Chat-multi-gpu-marlin-4.yaml

ktransformers --model_path deepseek-ai/DeepSeek-R1 --gguf_path /root/autodl-tmp/DeepSeek-R1-GGUF/DeepSeek-R1-Q4_K_M/ --cpu_infer 65 --cache_lens 1536 --max_new_tokens 8192 --port 6006 --optimize_config_path ./ktransformers/optimize/optimize_rules/DeepSeek-V3-Chat-multi-gpu-marlin-4.yaml

curl -X 'POST' \
    'http://localhost:6006/v1/chat/completions' \
    -H 'accept: application/json' \
    -H 'Content-Type: application/json' \
    -d '{
        "messages": [
            {
                "content": "tell a joke",
                "role": "user"
            }
        ],
        "model": "ktranformers-model",
        "stream": true
    }'

外传 1. 使用API方式调用

新建文件:chat_openai.py

# --- Standard library -------------------------------------------------------
import argparse
import json
import logging
import os
import sys
import time
from typing import Any, Dict, List, Optional

# --- Web framework ----------------------------------------------------------
import uvicorn
from fastapi import FastAPI, HTTPException, Request, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel

# Configure logging for the whole process and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Put the parent of this file's directory at the front of sys.path before the
# ktransformers imports below are resolved.
project_dir = os.path.dirname(os.path.dirname(__file__))
sys.path.insert(0, project_dir)

import torch
from transformers import (
    AutoConfig,
    AutoModelForCausalLM,
    AutoTokenizer,
    GenerationConfig,
    TextStreamer,
)
from ktransformers.models.modeling_deepseek import DeepseekV2ForCausalLM
from ktransformers.models.modeling_deepseek_v3 import DeepseekV3ForCausalLM
from ktransformers.models.modeling_llama import LlamaForCausalLM
from ktransformers.models.modeling_mixtral import MixtralForCausalLM
from ktransformers.models.modeling_qwen2_moe import Qwen2MoeForCausalLM
from ktransformers.optimize.optimize import optimize_and_load_gguf
from ktransformers.server.config.config import Config
from ktransformers.util.utils import prefill_and_generate

# Architecture name (as found in config.architectures) -> model class to build.
custom_models = {
    "DeepseekV2ForCausalLM": DeepseekV2ForCausalLM,
    "DeepseekV3ForCausalLM": DeepseekV3ForCausalLM,
    "Qwen2MoeForCausalLM": Qwen2MoeForCausalLM,
    "LlamaForCausalLM": LlamaForCausalLM,
    "MixtralForCausalLM": MixtralForCausalLM,
}

# Directory "optimize/optimize_rules" located next to this file.
ktransformer_rules_dir = os.path.join(
    os.path.dirname(os.path.abspath(__file__)), "optimize", "optimize_rules"
)

# Architecture name -> rule file name inside ktransformer_rules_dir.
_default_rule_files = {
    "DeepseekV2ForCausalLM": "DeepSeek-V2-Chat.yaml",
    "DeepseekV3ForCausalLM": "DeepSeek-V3-Chat.yaml",
    "Qwen2MoeForCausalLM": "Qwen2-57B-A14B-Instruct.yaml",
    "LlamaForCausalLM": "Internlm2_5-7b-Chat-1m.yaml",
    "MixtralForCausalLM": "Mixtral.yaml",
}
default_optimize_rules = {
    arch_name: os.path.join(ktransformer_rules_dir, rule_file)
    for arch_name, rule_file in _default_rule_files.items()
}

# Global holding the initialized model; assigned by create_app().
chat_model = None

class OpenAIChat:
    """Chat-completion wrapper around a model loaded via ``optimize_and_load_gguf``.

    The constructor loads the tokenizer and config from ``model_path``, builds
    the model on the ``meta`` device and passes it to ``optimize_and_load_gguf``
    together with the optimize rule file and the GGUF path.
    """

    def __init__(
        self,
        model_path: str,
        optimize_rule_path: str = None,
        gguf_path: str = None,
        cpu_infer: int = Config().cpu_infer,
        use_cuda_graph: bool = True,
        mode: str = "normal",
    ):
        """Load tokenizer, config and model.

        Args:
            model_path: Passed to ``from_pretrained`` for the tokenizer,
                the model config and the generation config.
            optimize_rule_path: Optimize rule file. When ``None``, the entry
                of ``default_optimize_rules`` for the model architecture is
                used if one exists; otherwise ``None`` is passed through.
            gguf_path: Forwarded to ``optimize_and_load_gguf``.
            cpu_infer: Written to ``Config().cpu_infer``.
            use_cuda_graph: Forwarded to ``prefill_and_generate`` on each call.
            mode: ``"long_context"`` forces float16 and requires
                ``LlamaForCausalLM``; any other value uses the config's dtype.
        """
        torch.set_grad_enabled(False)
        Config().cpu_infer = cpu_infer

        self.tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
        config = AutoConfig.from_pretrained(model_path, trust_remote_code=True)
        arch = config.architectures[0]
        self.streamer = TextStreamer(self.tokenizer, skip_prompt=True) if not Config().cpu_infer else None
        if mode == 'long_context':
            assert arch == "LlamaForCausalLM", "Only LlamaForCausalLM supports long_context mode"
            torch.set_default_dtype(torch.float16)
        else:
            torch.set_default_dtype(config.torch_dtype)

        # Build the module structure on the meta device (no real weights yet).
        with torch.device("meta"):
            if arch in custom_models:
                if "Qwen2Moe" in arch:
                    config._attn_implementation = "flash_attention_2"
                if "Llama" in arch:
                    config._attn_implementation = "eager"
                if "Mixtral" in arch:
                    config._attn_implementation = "flash_attention_2"
                model = custom_models[arch](config)
            else:
                model = AutoModelForCausalLM.from_config(
                    config, trust_remote_code=True, attn_implementation="flash_attention_2"
                )

        if optimize_rule_path is None:
            if arch in default_optimize_rules:
                optimize_rule_path = default_optimize_rules[arch]
        # NOTE(review): for an architecture without a default rule file,
        # optimize_rule_path is still None here -- confirm the loader accepts that.

        optimize_and_load_gguf(model, optimize_rule_path, gguf_path, config)

        try:
            model.generation_config = GenerationConfig.from_pretrained(model_path)
        except Exception:
            # Was a bare ``except:``, which also swallowed KeyboardInterrupt
            # and SystemExit. Fall back to fixed defaults, but say so.
            logger.warning(
                "Could not load generation config from %s; using fallback defaults",
                model_path,
                exc_info=True,
            )
            model.generation_config = GenerationConfig(
                max_length=128,
                temperature=0.7,
                top_p=0.9,
                do_sample=True
            )

        if model.generation_config.pad_token_id is None:
            model.generation_config.pad_token_id = model.generation_config.eos_token_id

        model.eval()
        self.model = model
        self.use_cuda_graph = use_cuda_graph
        self.mode = mode
        logger.info("Model loaded successfully!")

    def create_chat_completion(
        self,
        messages: List[Dict[str, str]],
        temperature: float = 0.7,
        max_tokens: int = 1000,
        top_p: float = 0.9,
        force_think: bool = False,
    ) -> Dict:
        """Generate one assistant reply for ``messages``.

        Args:
            messages: Chat messages given to the tokenizer's chat template.
            temperature: Sampling temperature; ``None`` keeps the model's value.
            max_tokens: Forwarded to ``prefill_and_generate``.
            top_p: Nucleus sampling value; ``None`` keeps the model's value.
            force_think: When true, the encoded ``"<think>\\n"`` string is
                appended to the prompt tokens.

        Returns:
            A dict with ``choices`` (one assistant message holding the decoded
            text) and ``usage`` (prompt, completion and total token counts).
        """
        input_tensor = self.tokenizer.apply_chat_template(
            messages, add_generation_prompt=True, return_tensors="pt"
        )

        if force_think:
            token_thinks = torch.tensor([self.tokenizer.encode("<think>\\n", add_special_tokens=False)],
                                        device=input_tensor.device)
            input_tensor = torch.cat([input_tensor, token_thinks], dim=1)

        # The original built a separate GenerationConfig here and never used
        # it, so temperature/top_p had no effect. Apply them to the model's
        # own generation config instead.
        # NOTE(review): assumes prefill_and_generate samples according to
        # model.generation_config -- TODO confirm against ktransformers.
        sampling = self.model.generation_config
        if temperature is not None:
            sampling.temperature = temperature
        if top_p is not None:
            sampling.top_p = top_p
        sampling.do_sample = True  # Required for temperature/top_p to apply

        generated = prefill_and_generate(
            self.model,
            self.tokenizer,
            input_tensor.cuda(),
            max_tokens,
            self.use_cuda_graph,
            self.mode,
            force_think
        )

        # Convert token IDs to text
        generated_text = self.tokenizer.decode(generated, skip_special_tokens=True)

        prompt_tokens = input_tensor.shape[1]
        completion_tokens = len(generated)
        return {
            "choices": [{
                "message": {
                    "role": "assistant",
                    "content": generated_text
                }
            }],
            "usage": {
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "total_tokens": prompt_tokens + completion_tokens
            }
        }

class ChatMessage(BaseModel):
    """A single chat message: who said it and what was said."""

    role: str
    content: str

class ChatCompletionRequest(BaseModel):
    """Request body schema for a chat completion call."""

    # Declared as ChatMessage models so that every entry is validated.
    messages: List[ChatMessage]
    model: str = "default-model"
    temperature: Optional[float] = 0.7
    top_p: Optional[float] = 0.9
    max_tokens: Optional[int] = 1000
    stream: Optional[bool] = False
    force_think: Optional[bool] = True

class ChatCompletionResponse(BaseModel):
    """Response body schema for a chat completion call."""

    id: str = "chatcmpl-default"
    # Was the garbled literal "chat pletion"; OpenAI-compatible clients expect
    # the object type "chat.completion" for a non-streaming completion.
    object: str = "chat.completion"
    created: int = 0
    model: str = "default-model"
    choices: List[Dict[str, Any]]
    usage: Dict[str, int]

# FastAPI application object; routes and middleware below attach to it.
app = FastAPI(title="KVCache.AI API Server")


@app.get("/health")
async def health_check():
    """Return a fixed payload reporting the service as healthy."""
    health_payload = {"status": "healthy"}
    return health_payload


@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    """Add an ``X-Process-Time`` header with the handling time in seconds."""
    began_at = time.time()
    outgoing = await call_next(request)
    elapsed = time.time() - began_at
    outgoing.headers["X-Process-Time"] = f"{elapsed:.4f}s"
    return outgoing


# CORS: every origin, method and header is allowed, with credentials.
_cors_options = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)

@app.post("/v1/chat/completions", response_model=ChatCompletionResponse)
async def chat_completion(request: ChatCompletionRequest):
    """Handle a chat completion request using the module-level ``chat_model``.

    Returns:
        A dict shaped like ``ChatCompletionResponse`` with a single choice.

    Raises:
        HTTPException: 503 when ``chat_model`` has not been set, 500 when
            generation or response assembly fails.
    """
    # NOTE(review): request.stream is accepted by the schema but not used
    # here; the reply is always a single JSON body.
    if chat_model is None:
        # Checked outside the try so it is not rewrapped as a 500 below.
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Model is not loaded"
        )

    try:
        # messages are Pydantic model instances; convert them to plain dicts
        messages = [m.model_dump() for m in request.messages]
        response = chat_model.create_chat_completion(
            messages=messages,
            temperature=request.temperature,
            max_tokens=request.max_tokens,
            top_p=request.top_p,
            force_think=request.force_think
        )

        return {
            "id": f"chatcmpl-{int(time.time())}",
            # Was the garbled literal "chat pletion".
            "object": "chat.completion",
            "created": int(time.time()),
            "model": request.model,
            "choices": [{
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": response['choices'][0]['message']['content']
                },
                "finish_reason": "stop"
            }],
            "usage": response['usage']
        }
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("API Error: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Internal server error: {str(e)}"
        ) from e

def create_app(model_path: str, gguf_path: str, cpu_infer:int, optimize_rule_path: Optional[str] = None):
    """Build the ``OpenAIChat`` model, store it in ``chat_model``, return ``app``."""
    global chat_model
    model_options = {
        "model_path": model_path,
        "gguf_path": gguf_path,
        "optimize_rule_path": optimize_rule_path,
        "cpu_infer": cpu_infer,
    }
    chat_model = OpenAIChat(**model_options)
    return app

def main():
    """Parse command-line options, load the model and serve ``app`` with uvicorn."""
    cli = argparse.ArgumentParser(description="KVCache.AI API Server")
    # (flag, add_argument keyword arguments), registered in this order.
    option_table = [
        ("--model_path", {"type": str, "required": True, "help": "HuggingFace模型路径"}),
        ("--gguf_path", {"type": str, "required": True, "help": "GGUF模型文件路径"}),
        ("--optimize_rule_path", {"type": str, "help": "优化规则文件路径"}),
        ("--port", {"type": int, "default": 8000, "help": "服务端口号"}),
        ("--cpu_infer", {"type": int, "default": 10, "help": "使用cpu数量"}),
        ("--host", {"type": str, "default": "0.0.0.0", "help": "绑定地址"}),
    ]
    for flag, spec in option_table:
        cli.add_argument(flag, **spec)
    args = cli.parse_args()

    # create_app returns the module-level ``app`` after loading the model.
    application = create_app(
        model_path=args.model_path,
        gguf_path=args.gguf_path,
        optimize_rule_path=args.optimize_rule_path,
        cpu_infer=args.cpu_infer,
    )

    server_options = {
        "host": args.host,
        "port": args.port,
        "loop": "uvloop",
        "http": "httptools",
        "timeout_keep_alive": 300,
        "log_level": "info",
        "access_log": False,
    }
    uvicorn.run(application, **server_options)


if __name__ == "__main__":
    main()

文件放置位置:ktransformers/chat_openai.py(仓库的 ktransformers 目录下,与 local_chat.py 同级)

安装依赖:

pip install protobuf uvicorn httptools
pip install uvloop

启动:

python ktransformers/chat_openai.py --model_path deepseek-ai/DeepSeek-R1 --gguf_path /home/dpkj/deepseek/DeepSeek-R1-GGUF/DeepSeek-R1-Q4_K_M/

2.使用open-WEBUI进行可视化对接

# 使用Pip下载OPEN-WEBUI

pip install open-webui
# 下载完成后开启服务
open-webui serve
# 启动成功后,在 Open-WebUI 中添加如下函数(Function)代码:

import json
import os
from typing import Iterator, List, Union

import requests
from pydantic import BaseModel, Field

# Flip to True to print request details and errors.
DEBUG = False

class Pipe:
    """Pipe that relays chat requests to a self-hosted FastAPI chat endpoint."""

    # (connect, read) timeout passed to requests: give up quickly when the
    # server cannot be reached, but never cut off a long-running generation.
    _REQUEST_TIMEOUT = (10, None)

    class Valves(BaseModel):
        openai_API_KEY: str = Field(default="none")  # Optional API key if needed
        DEFAULT_MODEL: str = Field(default="DeepSeek-R1")  # Default model identifier

    def __init__(self):
        """Set identifiers, read valves from the environment, set the endpoint."""
        self.id = "DeepSeek-R1"
        self.type = "manifold"
        self.name = "KT: "
        self.valves = self.Valves(
            **{
                "openai_API_KEY": os.getenv("openai_API_KEY", "none"),
                "DEFAULT_MODEL": os.getenv("openai_DEFAULT_MODEL", "DeepSeek-R1"),
            }
        )
        # Self-hosted FastAPI server details
        self.api_url = (
            "http://localhost:8000/v1/chat/completions"  # FastAPI server endpoint
        )
        self.headers = {"Content-Type": "application/json"}

    def get_openai_models(self):
        """Return available models - a fixed single-entry list."""
        return [{"id": "KT", "name": "DeepSeek-R1"}]

    def pipes(self) -> List[dict]:
        """Return the model list from ``get_openai_models``."""
        return self.get_openai_models()

    @staticmethod
    def _flatten_message(message: dict) -> dict:
        """Reduce one incoming message to ``{"role", "content"}`` with text content."""
        if not isinstance(message.get("content"), list):
            # Handle simple text messages
            return {"role": message["role"], "content": message["content"]}

        # Multi-part content: join the parts into a single text
        text_parts = []
        for content in message["content"]:
            if content["type"] == "text":
                text_parts.append(content["text"])
            elif content["type"] == "image_url":
                # Image parts are replaced by a textual note carrying the URL
                text_parts.append(f"[Image: {content['image_url']['url']}]")
        return {"role": message["role"], "content": "".join(text_parts)}

    @staticmethod
    def _line_to_text(line: str) -> str:
        """Turn one response line into the text to hand back to the caller.

        Lines starting with ``data:`` are returned untouched, as before. A line
        that is a complete chat-completion JSON document (what a server sends
        when it answers a stream request with one JSON body) is reduced to
        its message content. Anything else is returned unchanged.
        """
        if line.startswith("data:"):
            return line
        try:
            content = json.loads(line)["choices"][0]["message"]["content"]
        except (ValueError, KeyError, IndexError, TypeError):
            return line
        return content if isinstance(content, str) else line

    def _stream(self, payload: dict) -> Iterator[str]:
        """Post ``payload`` and yield the response line by line."""
        try:
            # The context manager releases the connection once iteration ends.
            with requests.post(
                self.api_url,
                json=payload,
                headers=self.headers,
                stream=True,
                timeout=self._REQUEST_TIMEOUT,
            ) as response:
                if response.status_code != 200:
                    # Same error format as the non-streaming branch.
                    yield f"Error: {response.status_code}, {response.text}"
                    return
                for line in response.iter_lines():
                    if line:
                        yield self._line_to_text(line.decode("utf-8"))
        except Exception as e:
            if DEBUG:
                print(f"Streaming error: {e}")
            yield f"Error during streaming: {str(e)}"

    def pipe(self, body: dict) -> Union[str, Iterator[str]]:
        """Forward a chat request to the FastAPI endpoint.

        Args:
            body: Request dict. ``messages`` is required; ``temperature``,
                ``top_p``, ``max_tokens``, ``stream`` and ``stop`` are optional.

        Returns:
            An iterator of text chunks when ``body["stream"]`` is truthy,
            otherwise the reply text. Errors come back as an ``"Error: ..."``
            string rather than an exception.
        """
        try:
            # The default model is always used; body["model"] is not read
            model_id = self.valves.DEFAULT_MODEL
            messages = [self._flatten_message(m) for m in body["messages"]]

            if DEBUG:
                print("FastAPI API request:")
                print(" Model:", model_id)
                print(" Messages:", json.dumps(messages, indent=2))

            # One flag drives both the payload and the branch below. The
            # original defaulted to True in the payload but False for the
            # branch, so the two could disagree.
            stream = bool(body.get("stream", False))

            # Prepare the API call parameters
            payload = {
                "model": model_id,
                "messages": messages,
                "temperature": body.get("temperature", 0.7),
                "top_p": body.get("top_p", 0.9),
                "max_tokens": body.get("max_tokens", 8192),
                "stream": stream,
            }

            # Add stop sequences if provided
            if body.get("stop"):
                payload["stop"] = body["stop"]

            if stream:
                return self._stream(payload)

            # Regular response
            response = requests.post(
                self.api_url,
                json=payload,
                headers=self.headers,
                timeout=self._REQUEST_TIMEOUT,
            )
            if response.status_code != 200:
                return f"Error: {response.status_code}, {response.text}"
            return (
                response.json()
                .get("choices", [{}])[0]
                .get("message", {})
                .get("content", "")
            )
        except Exception as e:
            if DEBUG:
                print(f"Error in pipe method: {e}")
            return f"Error: {e}"

    def health_check(self) -> bool:
        """Check if the local FastAPI service answers a minimal chat request."""
        try:
            # Simple health check with a basic prompt
            response = requests.post(
                self.api_url,
                json={
                    "model": self.valves.DEFAULT_MODEL,
                    "messages": [{"role": "user", "content": "Hello"}],
                    "max_tokens": 5,
                },
                headers=self.headers,
                timeout=self._REQUEST_TIMEOUT,
            )
            return response.status_code == 200
        except Exception as e:
            if DEBUG:
                print(f"Health check failed: {e}")
            return False

完~
标签:

Ubuntu部署ktransformers由讯客互联互联网栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“Ubuntu部署ktransformers