Spaces:
Paused
Paused
| from flask import Flask, request, Response, stream_with_context, jsonify | |
| from openai import OpenAI | |
| import json | |
| import tiktoken | |
| #import httpx | |
| app = Flask(__name__) | |
| # 在请求头中指定你的API密钥名称 | |
| #MY_API_KEY = "sk-gyxzhao" | |
| # 模型的最大上下文长度 | |
| MODEL_MAX_CONTEXT_LENGTH = { | |
| "gpt-4": 8192, | |
| "gpt-4-0613": 8192, | |
| "gpt-4o": 4096, | |
| "gpt-4-turbo": 4096, | |
| "claude-3-opus-20240229": 4096 | |
| } | |
| def calculate_max_tokens(model_name, messages, requested_max_tokens): | |
| if model_name in ["gpt-4", "gpt-4-0613"]: | |
| try: | |
| encoding = tiktoken.encoding_for_model(model_name) | |
| except Exception as e: | |
| print(f"Error getting encoding for model {model_name}: {e}") | |
| encoding = tiktoken.get_encoding("cl100k_base") # 使用通用编码作为后备 | |
| max_context_length = MODEL_MAX_CONTEXT_LENGTH[model_name] | |
| tokens_per_message = 3 # 每个消息的固定令牌数 (role + content + message boundary tokens) | |
| tokens_per_name = 1 # 如果消息中包含'name'字段,增加的令牌数 | |
| messages_length = 3 # 一开始的消息长度 | |
| for message in messages: | |
| messages_length += tokens_per_message | |
| for key, value in message.items(): | |
| messages_length += len(encoding.encode(value)) | |
| if key == 'name': | |
| messages_length += tokens_per_name | |
| #print(f"Message length in tokens: {messages_length}") # 打印消息长度以进行调试 | |
| max_tokens = max_context_length - messages_length | |
| if requested_max_tokens: | |
| max_tokens = min(max_tokens, requested_max_tokens) | |
| return max(100, max_tokens) # 确保max_tokens至少为1 | |
| else: | |
| return MODEL_MAX_CONTEXT_LENGTH.get(model_name, 4096) # 其他模型直接返回对应的最大token数 | |
| def chat(): | |
| try: | |
| # 验证请求头中的API密钥 | |
| auth_header = request.headers.get('Authorization') | |
| if not auth_header or not auth_header.startswith('Bearer '): | |
| return jsonify({"error": "Unauthorized"}), 401 | |
| api_key = auth_header.split(" ")[1] | |
| data = request.json | |
| #print("Received data:", data) # 打印请求体以进行调试 | |
| # 验证请求格式 | |
| if not data or 'messages' not in data or 'model' not in data: | |
| return jsonify({"error": "Missing 'messages' or 'model' in request body"}), 400 | |
| model = data['model'] | |
| messages = data['messages'] | |
| temperature = data.get('temperature', 0.7) # 默认值0.7 | |
| requested_max_tokens = data.get('max_tokens', MODEL_MAX_CONTEXT_LENGTH.get(model, 4096)) | |
| #max_tokens = calculate_max_tokens(model, messages, requested_max_tokens) | |
| top_p = data.get('top_p', 1.0) # 默认值1.0 | |
| n = data.get('n', 1) # 默认值1 | |
| stream = data.get('stream', False) # 默认值False | |
| functions = data.get('functions', None) # Functions for function calling | |
| function_call = data.get('function_call', None) # Specific function call request | |
| # 检查 Claude 模型,调整消息格式 | |
| system_message = None | |
| if model.startswith("claude"): | |
| messages = [msg for msg in messages if msg['role'] != 'system'] | |
| if 'system' in data: | |
| system_message = data['system'] | |
| # 创建每个请求的 OpenAI 客户端实例 | |
| client = OpenAI( | |
| api_key=api_key, | |
| base_url="https://api.aimlapi.com", | |
| ) | |
| # 处理模型响应 | |
| if stream: | |
| # 处理流式响应 | |
| def generate(): | |
| if model.startswith("claude"): | |
| response = client.chat.completions.create( | |
| model=model, | |
| messages=messages, | |
| temperature=temperature, | |
| #max_tokens=max_tokens, | |
| top_p=top_p, | |
| n=n, | |
| functions=functions, | |
| function_call=function_call, | |
| #system=system_message # 传递 system_message 作为顶级参数 | |
| ) | |
| content = response.choices[0].message.content | |
| for i in range(0, len(content), 20): # 每20个字符分成一块 | |
| chunk = content[i:i+20] | |
| yield f"data: {json.dumps({'choices': [{'delta': {'content': chunk}}]})}\n\n" | |
| else: | |
| response = client.chat.completions.create( | |
| model=model, | |
| messages=messages, | |
| temperature=temperature, | |
| #max_tokens=max_tokens, | |
| top_p=top_p, | |
| n=n, | |
| stream=True, | |
| functions=functions, | |
| function_call=function_call | |
| ) | |
| for chunk in response: | |
| yield f"data: {json.dumps(chunk.to_dict())}\n\n" | |
| return Response(stream_with_context(generate()), content_type='text/event-stream') | |
| else: | |
| # 非流式响应 | |
| if model.startswith("claude"): | |
| response = client.chat.completions.create( | |
| model=model, | |
| messages=messages, | |
| temperature=temperature, | |
| #max_tokens=max_tokens, | |
| top_p=top_p, | |
| n=n, | |
| functions=functions, | |
| function_call=function_call, | |
| #system=system_message # 传递 system_message 作为顶级参数 | |
| ) | |
| else: | |
| response = client.chat.completions.create( | |
| model=model, | |
| messages=messages, | |
| temperature=temperature, | |
| #max_tokens=max_tokens, | |
| top_p=top_p, | |
| n=n, | |
| functions=functions, | |
| function_call=function_call, | |
| ) | |
| # 打印响应 | |
| #print("API response:", response) | |
| # 将响应转换为字典 | |
| response_dict = { | |
| "id": response.id, | |
| "object": response.object, | |
| "created": response.created, | |
| "model": response.model, | |
| "choices": [ | |
| { | |
| "message": { | |
| "role": choice.message.role, | |
| "content": choice.message.content | |
| }, | |
| "index": choice.index, | |
| "finish_reason": choice.finish_reason, | |
| "logprobs": choice.logprobs.__dict__ if choice.logprobs else None # 转换ChoiceLogprobs为字典 | |
| } | |
| for choice in response.choices | |
| ], | |
| "usage": { | |
| "prompt_tokens": response.usage.prompt_tokens, | |
| "completion_tokens": response.usage.completion_tokens, | |
| "total_tokens": response.usage.total_tokens | |
| } | |
| } | |
| # 打印JSON格式的响应字典 | |
| #print("Response dict:", json.dumps(response_dict, ensure_ascii=False, indent=2)) | |
| # 确保返回的JSON格式正确 | |
| return jsonify(response_dict), 200 | |
| except Exception as e: | |
| print("Exception:", e) | |
| return jsonify({"error": str(e)}), 500 | |
| if __name__ == "__main__": | |
| app.run(host='0.0.0.0', port=7860, threaded=True) |