import json
import os

import requests
from flask import Flask, request, Response, after_this_request

app = Flask(__name__)
# Emit non-ASCII characters (e.g. Chinese) verbatim instead of \u escapes.
app.config['JSON_AS_ASCII'] = False


def add_cors_headers(response):
    """Attach permissive CORS headers to *response* and return it.

    NOTE(review): browsers reject ``Access-Control-Allow-Origin: *`` when
    combined with ``Access-Control-Allow-Credentials: true`` for credentialed
    requests. The values are kept as-is because changing the CORS policy is a
    deployment decision, but it should be confirmed that this is intended.
    """
    response.headers['Access-Control-Allow-Origin'] = '*'
    response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, DELETE, PATCH, OPTIONS'
    response.headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization, X-API-Key'
    response.headers['Access-Control-Allow-Credentials'] = 'true'
    return response


@app.before_request
def handle_options():
    """Answer CORS preflight (OPTIONS) requests without hitting any route.

    Returns an empty response carrying the CORS headers for OPTIONS requests,
    and ``None`` (continue normal dispatch) for every other method.
    """
    if request.method == 'OPTIONS':
        return add_cors_headers(Response())
    return None


@app.after_request
def apply_cors(response):
    """Apply the CORS headers to every outgoing response."""
    return add_cors_headers(response)


# Upstream configuration: base URL and the header scheme used for the API key.
CONFIG = {
    "gemini": {
        "base_url": "https://generativelanguage.googleapis.com/v1beta",
        "auth_type": "x-goog-api-key",  # sent as the x-goog-api-key header
    },
    "anthropic": {
        "base_url": "https://api.anthropic.com",
        "auth_type": "x-api-key",
    },
    "openai": {
        "base_url": "https://api.openai.com/v1",
        "auth_type": "authorization",
    },
}

# Path prefixes that select a provider. Each prefix is stripped from the
# incoming path before it is appended to the provider's base URL.
_PROVIDER_PREFIXES = {
    'gemini': ('/v1beta/', '/v1/', '/gemini/'),
    'anthropic': ('/anthropic/', '/claude/'),
    'openai': ('/openai/', '/chatgpt/'),
}

# Query-parameter names accepted as the API key, in lookup priority order.
_API_KEY_PARAMS = {
    'gemini': ('gemini_key', 'key', 'google_key', 'api_key'),
    'anthropic': ('claude_key', 'anthropic_key', 'api_key'),
    'openai': ('openai_key', 'chatgpt_key', 'api_key'),
}

# Query parameters consumed by the proxy itself; never forwarded upstream.
_STRIPPED_QUERY_PARAMS = (
    'gemini_key', 'key', 'google_key', 'api_key',
    'claude_key', 'anthropic_key',
    'openai_key', 'chatgpt_key',
    'stream',
)

# Values of the ``stream`` query parameter that enable streaming.
_TRUTHY_VALUES = ('true', '1', 'yes', 'on')

# Upstream response headers that must not be copied to the client:
# hop-by-hop headers, plus Content-Encoding / Content-Length, which no longer
# describe the body because ``requests`` decodes compressed content before
# this proxy re-emits it.
_EXCLUDED_RESPONSE_HEADERS = frozenset({
    'connection',
    'keep-alive',
    'proxy-authenticate',
    'proxy-authorization',
    'te',
    'trailer',
    'trailers',
    'transfer-encoding',
    'upgrade',
    'content-encoding',
    'content-length',
})


def get_provider_info(path):
    """Return the provider name selected by *path*, or ``None`` if unknown.

    *path* is the request path including its leading slash. The provider is
    chosen by the first matching prefix in ``_PROVIDER_PREFIXES``.
    """
    for provider, prefixes in _PROVIDER_PREFIXES.items():
        if path.startswith(prefixes):
            return provider
    return None


def get_api_key(request_args, provider):
    """Return the API key supplied in the URL query for *provider*.

    The provider's accepted parameter names are tried in priority order and
    the first non-empty value wins, so an empty alias (``?key=&api_key=abc``)
    does not hide a later usable one. Returns ``None`` when no key is given.
    """
    for key_name in _API_KEY_PARAMS.get(provider, ()):
        value = request_args.get(key_name)
        if value:
            return value
    return None


def should_stream(request_args):
    """Return whether the ``stream`` query parameter asks for streaming.

    Streaming is the default when the parameter is absent. Accepted truthy
    spellings (case-insensitive) are ``true``, ``1``, ``yes`` and ``on``;
    anything else disables streaming.
    """
    stream_param = request_args.get('stream', 'true')
    return str(stream_param).lower() in _TRUTHY_VALUES


def build_upstream_url(path, provider, api_key, request_args, use_stream=True):
    """Build the upstream URL for *path* on *provider*.

    The provider prefix is removed from *path*, the remainder is appended to
    the provider's base URL, and the remaining query parameters are re-encoded
    after dropping the proxy-only ones (API-key aliases and ``stream``).

    *api_key* and *use_stream* are accepted for interface compatibility but
    are not used: every provider authenticates through a request header, so
    the key never appears in the upstream URL.
    """
    args = request_args.copy()
    for key in _STRIPPED_QUERY_PARAMS:
        args.pop(key, None)

    # Slice by the actual prefix length. Hard-coded offsets previously left a
    # stray leading "/" for the 8-character "/claude/" and "/openai/"
    # prefixes, producing URLs such as ".../v1//chat/completions".
    upstream_path = path.lstrip('/')
    for prefix in _PROVIDER_PREFIXES.get(provider, ()):
        if path.startswith(prefix):
            upstream_path = path[len(prefix):]
            break

    url = f"{CONFIG[provider]['base_url']}/{upstream_path}"
    if args:
        url += f"?{requests.compat.urlencode(args)}"
    return url


def build_upstream_headers(provider, api_key, content_type):
    """Build the headers sent upstream: Content-Type plus provider auth.

    *content_type* falls back to ``application/json`` when empty. The API key
    is placed in the header named by the provider's ``auth_type`` in
    ``CONFIG``; Anthropic additionally receives a fixed ``anthropic-version``.
    """
    headers = {
        'Content-Type': content_type or 'application/json'
    }
    auth_type = CONFIG[provider]['auth_type']

    if auth_type == 'x-goog-api-key':
        headers['x-goog-api-key'] = api_key
    elif auth_type == 'x-api-key':
        headers['x-api-key'] = api_key
        if provider == 'anthropic':
            headers['anthropic-version'] = '2023-06-01'
    elif auth_type == 'authorization':
        headers['Authorization'] = f'Bearer {api_key}'

    return headers


def _forwardable_headers(response):
    """Return upstream response headers that are safe to relay to the client."""
    return {
        name: value
        for name, value in response.headers.items()
        if name.lower() not in _EXCLUDED_RESPONSE_HEADERS
    }


def stream_response(response):
    """Relay the upstream *response* body to the client chunk by chunk.

    The upstream status code and the forwardable headers are preserved. A
    failure while reading the upstream stream ends the relayed body early
    (best effort) and is logged; the upstream connection is always released.
    """
    def generate():
        try:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    yield chunk
        except Exception as exc:
            # Headers are already sent at this point, so the only option is
            # to stop the body; log it so the truncation is not silent.
            app.logger.warning("Upstream stream aborted: %s", exc)
        finally:
            response.close()

    return Response(
        generate(),
        status=response.status_code,
        headers=_forwardable_headers(response)
    )


def non_stream_response(response):
    """Return the fully-read upstream *response* as a single Flask response.

    The upstream status code and the forwardable headers are preserved. If
    reading the upstream body fails, a 502 JSON error is returned instead.
    """
    try:
        content = response.content
    except Exception as e:
        return json_response({"detail": f"Response error: {str(e)}"}, 502)
    finally:
        response.close()

    return Response(
        content,
        status=response.status_code,
        headers=_forwardable_headers(response)
    )


def json_response(data, status=200):
    """Serialize *data* as UTF-8 JSON (non-ASCII kept verbatim) with *status*."""
    return Response(
        json.dumps(data, ensure_ascii=False),
        status=status,
        content_type='application/json; charset=utf-8'
    )


@app.route('/', methods=['GET'])
def index():
    """Describe the service and its endpoint prefixes."""
    return json_response({
        "service": "AI API Proxy",
        "endpoints": {
            "gemini": ["/v1beta/*", "/v1/*", "/gemini/*"],
            "claude": ["/anthropic/*", "/claude/*"],
            "openai": ["/openai/*", "/chatgpt/*"]
        },
        "auth": "URL parameter (key/api_key)",
        "wechat": "欢迎关注 python学霸公众号"
    })


@app.route('/health', methods=['GET'])
def health():
    """Health-check endpoint."""
    return json_response({"status": "ok"})


def _with_stream_flag(body, use_stream):
    """Return JSON *body* with its top-level ``stream`` field set.

    If *body* is not valid JSON, or is not a JSON object, it is returned
    unchanged.
    """
    try:
        payload = json.loads(body)
    except (ValueError, TypeError):
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        return body
    if not isinstance(payload, dict):
        return body
    payload['stream'] = use_stream
    return json.dumps(payload).encode('utf-8')


def _send_upstream(provider, upstream_url, upstream_headers, use_stream):
    """Forward the current Flask request upstream and return the response."""
    common = {
        'method': request.method,
        'url': upstream_url,
        'timeout': 300,
        'stream': use_stream,
    }

    # GET, DELETE, ...: no request body is forwarded.
    if request.method not in ('POST', 'PUT', 'PATCH'):
        return requests.request(headers=upstream_headers, **common)

    # Only inspect request.files for multipart bodies: touching it on other
    # form types runs the form parser, which consumes the input stream and
    # would leave get_data() below with nothing to forward.
    if request.mimetype == 'multipart/form-data' and request.files:
        files = {
            key: (upload.filename, upload.stream, upload.content_type)
            for key, upload in request.files.items()
        }
        data = dict(request.form.items())
        # requests re-encodes the multipart body with its own boundary, so the
        # client's Content-Type (which names the old boundary) must not be
        # forwarded; requests sets the correct one itself.
        headers = {
            name: value
            for name, value in upstream_headers.items()
            if name.lower() != 'content-type'
        }
        return requests.request(headers=headers, files=files, data=data, **common)

    body = request.get_data()
    # OpenAI and Anthropic select streaming through the JSON body.
    if provider in ('openai', 'anthropic') and body:
        body = _with_stream_flag(body, use_stream)
    return requests.request(headers=upstream_headers, data=body, **common)


@app.route('/<path:path>', methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH'])
def proxy(path):
    """Proxy any request to the provider selected by the path prefix.

    Responds with 400 for an unknown prefix, 401 when no API key is present
    in the query string, 504 on upstream timeout and 502 on any other
    upstream failure. Otherwise the upstream response is relayed, streamed or
    buffered according to the ``stream`` query parameter.
    """
    full_path = f"/{path}"

    provider = get_provider_info(full_path)
    if not provider:
        return json_response({"detail": "Unknown provider"}, 400)

    api_key = get_api_key(request.args, provider)
    if not api_key:
        return json_response({
            "detail": f"{provider.capitalize()} key required. Provide via URL parameter"
        }, 401)

    use_stream = should_stream(request.args)
    upstream_url = build_upstream_url(full_path, provider, api_key, request.args, use_stream)
    content_type = request.headers.get('Content-Type', '')
    upstream_headers = build_upstream_headers(provider, api_key, content_type)

    try:
        response = _send_upstream(provider, upstream_url, upstream_headers, use_stream)
        if use_stream:
            return stream_response(response)
        return non_stream_response(response)
    except requests.exceptions.Timeout:
        return json_response({"detail": "Gateway Timeout"}, 504)
    except requests.exceptions.ConnectionError as e:
        return json_response({"detail": f"Connection Error: {str(e)}"}, 502)
    except Exception as e:
        # Top-level boundary of the proxy: report anything else as 502.
        return json_response({"detail": f"Gateway error: {str(e)}"}, 502)


if __name__ == '__main__':
    port = int(os.environ.get('PORT', 5000))
    app.run(host='0.0.0.0', port=port, debug=False)