写一套完整可直接部署、前后端一体的实现方案: 技术栈:Flask后端 + 原生HTML+TailwindCSS前端,仅依赖阿里百炼SK; 完整页面布局:
- Skill管理菜单:上传Skill文件夹、查看列表、启用/禁用/删除Skill
- 右侧主区域: 上方对话展示窗口,底部输入框+发送按钮; 自动流程:用户提问 → 阿里百炼Function Calling做意图识别 → 判断是否调用已启用的自定义Skill → 本地执行工具 → 模型汇总回复。
一、整体架构&存储设计
目录结构
skill_chat_system/
├── app.py # Flask后端全部接口
├── static/
│ └── index.html # 完整前端UI(侧边栏+聊天区)
├── skills_store/ # 上传的Skill根目录
│ ├── skill1/
│ │ ├── tool.json # function calling描述schema
│ │ └── run.py # 工具执行逻辑
└── chat_db.json # 本地JSON持久化:会话列表、历史消息、Skill启用状态
持久化chat_db.json结构
{
"skills": {
"skill1": {"enable": true, "upload_time": "2026-06-12"}
},
"sessions": [
{
"session_id": "uuidxxx",
"title": "天气查询对话",
"top": false,
"create_time": "...",
"messages": [{"role":"user","content":"..."}, ...]
}
],
"current_session_id": "uuidxxx"
}
核心逻辑链路
- 左侧上传Skill文件夹(支持zip压缩包上传,后端解压到指定目录);
- 后台记录每个Skill启用/禁用状态,被禁用的不会塞进tools列表传给百炼;
- 后端读取仅启用的Skill拼装tools数组传给阿里百炼;
- 模型自动意图识别:无工具调用直接回复;有工具调用则本地执行对应Skill代码;
- 工具结果回传给大模型生成最终回答,整条消息存入当前会话历史;
- 侧边栏支持:会话置顶、单条/整会话删除、Skill启用禁用删除。
二、后端完整代码 app.py
先安装依赖:
pip install flask requests
(zipfile、uuid 属于Python标准库,无需安装)
import os
import json
import uuid
import zipfile
import shutil
from datetime import datetime
import requests
from flask import Flask, request, render_template_string, jsonify
from werkzeug.utils import secure_filename
app = Flask(__name__)
# ===================== Configuration (replace with your own values) =====================
# The key is taken from the BAILIAN_API_KEY environment variable when it is set,
# so the real secret does not have to live in the source file; the literal is
# only a placeholder fallback.
BAILIAN_API_KEY = os.environ.get("BAILIAN_API_KEY", "sk-你的阿里百炼SK")
BAILIAN_MODEL = "qwen-turbo"
API_URL = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation"
SKILL_ROOT = "./skills_store"
DB_PATH = "./chat_db.json"
ALLOW_EXT = {"json", "py", "yaml", "yml"}
# Make sure the skill root exists before any upload is handled.
os.makedirs(SKILL_ROOT, exist_ok=True)
# 初始化本地数据库
def init_db():
    """Create the JSON database file with an empty structure if it is missing.

    An existing file is left untouched.
    """
    if os.path.exists(DB_PATH):
        return
    init_data = {
        "skills": {},
        "sessions": [],
        "current_session_id": "",
    }
    with open(DB_PATH, "w", encoding="utf-8") as f:
        json.dump(init_data, f, ensure_ascii=False, indent=2)
def load_db():
    """Read and return the whole database from DB_PATH as parsed JSON.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the file does not contain valid JSON.
    """
    with open(DB_PATH, "r", encoding="utf-8") as f:
        return json.load(f)
def save_db(data):
    """Serialize *data* as JSON and store it at DB_PATH.

    The content is written to a sibling temporary file first and then moved
    over DB_PATH, so a failure while serializing does not truncate the
    existing database.

    Args:
        data: JSON-serializable object holding the full database content.
    """
    tmp_path = DB_PATH + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        # The file object must be the second positional argument of json.dump.
        json.dump(data, f, ensure_ascii=False, indent=2)
    os.replace(tmp_path, DB_PATH)
# Create the database file at import time so it exists before load_db() is used.
init_db()
# ===================== Skill工具解析 & 执行 =====================
def parse_skill_schema(skill_name):
    """Load the function-calling schema (tool.json) of one skill.

    Args:
        skill_name: Folder name of the skill below SKILL_ROOT.

    Returns:
        The parsed schema dict, or None when tool.json is missing, unreadable,
        not valid JSON, or does not contain a ``function`` object with a
        string ``name`` (callers index ``schema["function"]["name"]``).
    """
    schema_file = os.path.join(SKILL_ROOT, skill_name, "tool.json")
    try:
        with open(schema_file, "r", encoding="utf-8") as f:
            schema = json.load(f)
    except (OSError, ValueError):
        # ValueError covers both JSON and text-decoding errors.
        return None
    if not isinstance(schema, dict):
        return None
    func = schema.get("function")
    if not isinstance(func, dict) or not isinstance(func.get("name"), str):
        return None
    return schema
# 动态执行skill函数
def exec_skill_func(skill_name, func_name, args):
    """Load a skill's run.py and call one of its functions.

    The module is loaded from its file path on every call instead of through
    ``import run``: a plain import is cached in ``sys.modules`` under the name
    ``run``, so every skill after the first would silently execute the first
    skill's code.

    NOTE(review): run.py is uploaded by users and is executed here without any
    sandboxing; only deploy where uploads are trusted.

    Args:
        skill_name: Folder name of the skill below SKILL_ROOT.
        func_name: Name of the function inside run.py to call.
        args: Dict of keyword arguments for the function (None means no arguments).

    Returns:
        Whatever the skill function returns, or an error message string when
        run.py is missing or loading/calling it raises.
    """
    import importlib.util
    import sys

    skill_dir = os.path.join(SKILL_ROOT, skill_name)
    run_path = os.path.join(skill_dir, "run.py")
    if not os.path.exists(run_path):
        return f"Skill {skill_name} 执行文件不存在"

    # Keep the skill folder importable while run.py executes so it can import
    # sibling modules, then remove it again so sys.path does not grow per call.
    sys.path.insert(0, skill_dir)
    try:
        spec = importlib.util.spec_from_file_location(f"skill_run_{skill_name}", run_path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        target_func = getattr(module, func_name)
        return target_func(**(args or {}))
    except Exception as e:
        # Boundary around third-party skill code: report the failure as text.
        return f"Skill执行异常:{str(e)}"
    finally:
        if skill_dir in sys.path:
            sys.path.remove(skill_dir)
# ===================== 接口1:首页渲染 =====================
@app.route("/")
def index():
    """Serve the single-page frontend (static/index.html).

    The file is sent as a static file rather than rendered as a template: the
    page contains no template variables, and sending it as-is avoids leaving a
    file handle open and avoids Jinja interpreting braces in the page's
    JavaScript.
    """
    return app.send_static_file("index.html")
# ===================== Skill管理接口 =====================
# 上传zip压缩包Skill文件夹
def _extract_skill_zip(zip_path, dest_dir):
    """Extract whitelisted files from *zip_path* into *dest_dir*.

    Only members whose extension is in ALLOW_EXT are written; macOS resource
    entries (``__MACOSX/``) are ignored.

    Raises:
        ValueError: if a member would be written outside *dest_dir*.
        zipfile.BadZipFile: if the archive is not a valid zip.
    """
    dest_root = os.path.realpath(dest_dir)
    with zipfile.ZipFile(zip_path, "r") as zf:
        for member in zf.infolist():
            if member.is_dir() or member.filename.startswith("__MACOSX/"):
                continue
            ext = os.path.splitext(member.filename)[1].lstrip(".").lower()
            if ext not in ALLOW_EXT:
                continue
            target = os.path.realpath(os.path.join(dest_root, member.filename))
            if os.path.commonpath([dest_root, target]) != dest_root:
                raise ValueError(f"非法路径:{member.filename}")
            os.makedirs(os.path.dirname(target), exist_ok=True)
            with zf.open(member) as src, open(target, "wb") as dst:
                shutil.copyfileobj(src, dst)


def _flatten_single_top_dir(dest_dir):
    """If *dest_dir* holds exactly one sub-folder and nothing else, lift its content up.

    Zipping a folder "as a whole" wraps every file in that folder; without this
    step tool.json would sit one level too deep to be found.
    """
    entries = os.listdir(dest_dir)
    if len(entries) != 1:
        return
    inner = os.path.join(dest_dir, entries[0])
    if not os.path.isdir(inner):
        return
    # Rename first so an inner entry with the same name as the wrapper cannot clash.
    staging = os.path.join(dest_dir, f"_inner_{uuid.uuid4().hex}")
    os.rename(inner, staging)
    for name in os.listdir(staging):
        shutil.move(os.path.join(staging, name), os.path.join(dest_dir, name))
    os.rmdir(staging)


@app.route("/api/skill/upload", methods=["POST"])
def skill_upload():
    """Receive a zipped skill, unpack it under SKILL_ROOT and register it as enabled.

    Form fields: ``target_folder`` (skill folder name, passed through
    secure_filename) and ``skill_zip`` (the uploaded archive).

    Returns:
        JSON ``{"code": 0, "msg": ...}`` on success, ``{"code": 1, "msg": ...}``
        when input is missing, the folder already exists, extraction fails or
        the archive contains no tool.json.
    """
    target_folder = secure_filename(request.form.get("target_folder", "").strip())
    zip_file = request.files.get("skill_zip")
    if not target_folder or not zip_file:
        return jsonify({"code":1, "msg":"文件夹名和zip包不能为空"})
    save_dir = os.path.join(SKILL_ROOT, target_folder)
    if os.path.exists(save_dir):
        return jsonify({"code":1, "msg":"该Skill文件夹已存在"})
    os.makedirs(save_dir)
    zip_tmp_path = os.path.join(save_dir, "_tmp.zip")
    try:
        zip_file.save(zip_tmp_path)
        _extract_skill_zip(zip_tmp_path, save_dir)
        os.remove(zip_tmp_path)
        _flatten_single_top_dir(save_dir)
    except (zipfile.BadZipFile, OSError, ValueError) as e:
        # Leave no half-extracted skill behind.
        shutil.rmtree(save_dir, ignore_errors=True)
        return jsonify({"code":1, "msg":f"解压失败:{str(e)}"})
    if not os.path.exists(os.path.join(save_dir, "tool.json")):
        # Without tool.json the skill can never be offered to the model.
        shutil.rmtree(save_dir, ignore_errors=True)
        return jsonify({"code":1, "msg":"压缩包内未找到 tool.json"})
    # Record the skill as enabled in the database.
    db = load_db()
    db["skills"][target_folder] = {
        "enable": True,
        "upload_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    }
    save_db(db)
    return jsonify({"code":0, "msg":f"Skill {target_folder} 上传成功"})
# 获取skill列表
@app.route("/api/skill/list", methods=["GET"])
def skill_list():
    """Return every registered skill with its enabled flag, description and upload time.

    The description comes from the skill's tool.json; "无描述" is used when the
    schema cannot be loaded or carries no usable description.
    """
    db = load_db()
    skill_info = []
    for name, info in db["skills"].items():
        schema = parse_skill_schema(name)
        desc = None
        if isinstance(schema, dict) and isinstance(schema.get("function"), dict):
            desc = schema["function"].get("description")
        skill_info.append({
            "name": name,
            "enable": info["enable"],
            "desc": desc if desc else "无描述",
            "upload_time": info["upload_time"]
        })
    return jsonify({"code":0, "data": skill_info})
# 启用/禁用skill
@app.route("/api/skill/toggle", methods=["POST"])
def skill_toggle():
    """Enable or disable one skill.

    JSON body: ``skill_name`` (str) and ``enable`` (truthy/falsy).
    Returns ``{"code": 1, ...}`` when the skill is not registered.
    """
    # silent=True: a missing or malformed body becomes an empty dict instead of an error page.
    data = request.get_json(silent=True) or {}
    skill_name = data.get("skill_name")
    db = load_db()
    if not isinstance(skill_name, str) or skill_name not in db["skills"]:
        return jsonify({"code":1, "msg":"Skill不存在"})
    # Store a real boolean so chat() and the frontend see a consistent type.
    db["skills"][skill_name]["enable"] = bool(data.get("enable"))
    save_db(db)
    return jsonify({"code":0})
# 删除skill
@app.route("/api/skill/delete", methods=["POST"])
def skill_delete():
    """Delete one skill: remove its folder under SKILL_ROOT and its database entry.

    JSON body: ``skill_name`` (str).
    Returns ``{"code": 1, ...}`` when the skill is not registered.
    """
    data = request.get_json(silent=True) or {}
    skill_name = data.get("skill_name")
    db = load_db()
    # Only names already registered in the database are accepted, so the path
    # joined below cannot be an arbitrary client-supplied string.
    if not isinstance(skill_name, str) or skill_name not in db["skills"]:
        return jsonify({"code":1, "msg":"Skill不存在"})
    skill_path = os.path.join(SKILL_ROOT, skill_name)
    if os.path.exists(skill_path):
        shutil.rmtree(skill_path)
    del db["skills"][skill_name]
    save_db(db)
    return jsonify({"code":0})
# ===================== 会话&历史管理接口 =====================
# 新建对话
@app.route("/api/session/new", methods=["POST"])
def session_new():
    """Create an empty chat session, make it the current one and return its id."""
    db = load_db()
    sid = str(uuid.uuid4())
    new_session = {
        "session_id": sid,
        "title": f"对话{len(db['sessions'])+1}",
        "top": False,
        "create_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "messages": []
    }
    db["sessions"].append(new_session)
    db["current_session_id"] = sid
    save_db(db)
    return jsonify({"code":0, "session_id": sid})
# 获取会话列表
@app.route("/api/session/list", methods=["GET"])
def session_list():
    """Return all sessions, pinned ones first and newest first within each group.

    The response also carries the id of the current session under ``current``.
    """
    db = load_db()
    # With reverse=True the larger key comes first, so the key must be the
    # pinned flag itself (True > False); negating it would push pinned
    # sessions to the bottom.
    sessions = sorted(
        db["sessions"],
        key=lambda x: (bool(x["top"]), x["create_time"]),
        reverse=True,
    )
    return jsonify({"code":0, "data": sessions, "current": db["current_session_id"]})
# 切换当前会话
@app.route("/api/session/switch", methods=["POST"])
def session_switch():
    """Make an existing session the current one.

    JSON body: ``session_id``. Returns ``{"code": 1, ...}`` for an unknown id so
    the current-session pointer never refers to a session that does not exist.
    """
    data = request.get_json(silent=True) or {}
    sid = data.get("session_id")
    db = load_db()
    if not any(s["session_id"] == sid for s in db["sessions"]):
        return jsonify({"code":1, "msg":"会话不存在"})
    db["current_session_id"] = sid
    save_db(db)
    return jsonify({"code":0})
# 置顶/取消置顶会话
@app.route("/api/session/top", methods=["POST"])
def session_top():
    """Pin or unpin one session.

    JSON body: ``session_id`` and ``top`` (truthy/falsy).
    Returns ``{"code": 1, ...}`` when no session has that id.
    """
    data = request.get_json(silent=True) or {}
    sid = data.get("session_id")
    db = load_db()
    for item in db["sessions"]:
        if item["session_id"] == sid:
            # Stored as a real boolean because session_list sorts on this flag.
            item["top"] = bool(data.get("top"))
            save_db(db)
            return jsonify({"code":0})
    return jsonify({"code":1, "msg":"会话不存在"})
# 删除整个会话
@app.route("/api/session/del", methods=["POST"])
def session_del():
    """Delete a whole session.

    JSON body: ``session_id``. When the deleted session was the current one,
    the first remaining session becomes current, or "" when none is left.
    """
    data = request.get_json(silent=True) or {}
    sid = data.get("session_id")
    db = load_db()
    db["sessions"] = [x for x in db["sessions"] if x["session_id"] != sid]
    if db["current_session_id"] == sid:
        db["current_session_id"] = db["sessions"][0]["session_id"] if db["sessions"] else ""
    save_db(db)
    return jsonify({"code":0})
# 获取当前会话消息
@app.route("/api/message/get", methods=["GET"])
def msg_get():
    """Return the messages of the current session.

    Returns ``{"code": 0, "msgs": [...]}``, or ``{"code": 1, ...}`` when the
    current session id matches no stored session.
    """
    db = load_db()
    sid = db["current_session_id"]
    for s in db["sessions"]:
        if s["session_id"] == sid:
            return jsonify({"code":0, "msgs": s["messages"]})
    return jsonify({"code":1, "msg":"无当前会话"})
# ===================== 核心聊天接口:意图识别+Skill调用 =====================
def _call_bailian(messages, tools=None):
    """Send one generation request and return the first choice's message dict.

    ``tools`` is only put into the request when it is non-empty. Returns None
    on a network error, a non-JSON response, or a response without choices.
    """
    parameters = {"result_format": "message"}
    if tools:
        parameters["tools"] = tools
    payload = {
        "model": BAILIAN_MODEL,
        "input": {"messages": messages},
        "parameters": parameters,
    }
    headers = {
        "Authorization": f"Bearer {BAILIAN_API_KEY}",
        "Content-Type": "application/json"
    }
    try:
        # A timeout keeps a stalled upstream call from blocking the worker forever.
        resp = requests.post(API_URL, json=payload, headers=headers, timeout=60)
        res_json = resp.json()
    except (requests.RequestException, ValueError):
        return None
    if not isinstance(res_json, dict):
        return None
    choices = (res_json.get("output") or {}).get("choices") or []
    if not choices:
        return None
    return choices[0].get("message")


@app.route("/api/chat", methods=["POST"])
def chat():
    """Answer one user question, calling an enabled skill when the model asks for it.

    Flow: build the tools list from enabled skills, send history + question to
    the model, and either return its direct answer or execute the requested
    skill function locally and ask the model to phrase the final answer from
    the tool result. Only the first tool call of a reply is handled. The user
    question and the final answer are appended to the current session; a
    session is created when none is current.

    JSON body: ``query`` (str).
    Returns JSON with ``code`` 0 and ``reply`` (plus skill details when a skill
    ran), or ``code`` 1 and ``msg`` on failure.
    """
    data = request.get_json(silent=True) or {}
    user_q = str(data.get("query") or "").strip()
    if not user_q:
        return jsonify({"code":1, "msg":"问题不能为空"})
    db = load_db()
    sid = db["current_session_id"]

    # Collect the schemas of enabled skills; disabled ones are never sent to the model.
    tools = []
    skill_name_mapping = {}
    for sk_name, sk_info in db["skills"].items():
        if not sk_info["enable"]:
            continue
        sch = parse_skill_schema(sk_name)
        if isinstance(sch, dict) and isinstance(sch.get("function"), dict) and sch["function"].get("name"):
            tools.append(sch)
            skill_name_mapping[sch["function"]["name"]] = sk_name

    target_session = next((s for s in db["sessions"] if s["session_id"] == sid), None)
    if target_session is None:
        # First message without any session: create one instead of failing.
        target_session = {
            "session_id": str(uuid.uuid4()),
            "title": f"对话{len(db['sessions'])+1}",
            "top": False,
            "create_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "messages": []
        }
        db["sessions"].append(target_session)
        db["current_session_id"] = target_session["session_id"]

    messages = list(target_session["messages"]) + [{"role":"user", "content": user_q}]

    reply_msg = _call_bailian(messages, tools)
    if reply_msg is None:
        return jsonify({"code":1, "msg":"模型调用失败"})

    # No tool call: the model answered directly.
    if not reply_msg.get("tool_calls"):
        final_content = reply_msg.get("content") or ""
        target_session["messages"].append({"role":"user", "content": user_q})
        target_session["messages"].append({"role":"assistant", "content": final_content})
        save_db(db)
        return jsonify({
            "code":0,
            "need_skill": False,
            "reply": final_content
        })

    # Tool call requested.
    tool_call = reply_msg["tool_calls"][0]
    func_name = tool_call["function"]["name"]
    real_skill_name = skill_name_mapping.get(func_name)
    if real_skill_name is None:
        # The model named a function that is not among the enabled skills.
        return jsonify({"code":1, "msg":f"模型请求了未启用的工具:{func_name}"})
    try:
        func_args = json.loads(tool_call["function"].get("arguments") or "{}")
    except ValueError:
        return jsonify({"code":1, "msg":"工具参数解析失败"})
    if not isinstance(func_args, dict):
        return jsonify({"code":1, "msg":"工具参数解析失败"})

    skill_ret = exec_skill_func(real_skill_name, func_name, func_args)
    # default=str: a skill may return objects json cannot encode; they are
    # deliberately passed on as text rather than crashing the request.
    skill_ret_text = json.dumps(skill_ret, ensure_ascii=False, default=str)

    # Second round: let the model phrase the answer from the tool result.
    msg_round2 = messages + [reply_msg, {
        "role": "tool",
        "name": func_name,
        "content": skill_ret_text
    }]
    final_msg = _call_bailian(msg_round2)
    if final_msg is None:
        return jsonify({"code":1, "msg":"模型调用失败"})
    final_ans = final_msg.get("content") or ""

    # Persist the exchange.
    target_session["messages"].append({"role":"user", "content": user_q})
    target_session["messages"].append({"role":"assistant", "content": final_ans})
    save_db(db)
    return jsonify({
        "code":0,
        "need_skill": True,
        "skill_name": real_skill_name,
        "func": func_name,
        "args": func_args,
        # Round-tripped through JSON so the response is always serializable.
        "skill_ret": json.loads(skill_ret_text),
        "reply": final_ans
    })
if __name__ == "__main__":
    # NOTE(review): host 0.0.0.0 listens on all network interfaces, and this
    # app executes uploaded run.py files; use 127.0.0.1 unless remote access
    # is intended.
    app.run(host="0.0.0.0", port=5000, debug=False)
三、前端页面 static/index.html
左右固定布局,左侧完整菜单(新建对话、Skill管理、历史会话),右侧聊天区,全部JS接口对接后端:
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>百炼Skill管理+意图调用系统</title>
<script src="https://cdn.tailwindcss.com"></script>
<!-- type="text/tailwindcss" is required for the Play CDN to process @apply -->
<style type="text/tailwindcss">
.sidebar-item-active{@apply bg-blue-100 border-l-4 border-blue-500}
</style>
</head>
<body class="h-screen flex overflow-hidden">
<!-- Left sidebar -->
<div class="w-72 bg-gray-50 border-r flex flex-col flex-shrink-0">
  <div class="p-3 border-b">
    <button onclick="newSession()" class="w-full bg-blue-600 text-white py-2 rounded">+ 新建对话</button>
  </div>
  <!-- Collapsible skill management panel -->
  <div class="border-b">
    <div class="p-3 font-bold flex justify-between items-center cursor-pointer" onclick="toggleSkillPanel()">
      <span>📦 Skill管理</span>
      <span id="skillArrow">▼</span>
    </div>
    <div id="skillPanel" class="px-3 pb-3 hidden">
      <div class="mb-2">
        <label class="text-sm">目标文件夹:</label>
        <input id="skillFolderName" class="border w-full px-2 py-1 text-sm mt-1">
      </div>
      <div class="mb-2">
        <label class="text-sm">上传Skill压缩包(zip):</label>
        <input type="file" id="skillZip" accept=".zip" class="text-sm mt-1">
      </div>
      <button onclick="uploadSkill()" class="w-full bg-green-500 text-white py-1 rounded text-sm">上传Skill</button>
      <div class="mt-3 max-h-52 overflow-auto border p-2" id="skillListWrap"></div>
    </div>
  </div>
  <!-- Conversation history -->
  <div class="flex-1 overflow-auto flex flex-col">
    <div class="p-3 font-bold border-b">📜 对话历史</div>
    <div id="sessionListWrap" class="flex-1 overflow-auto p-2"></div>
  </div>
</div>
<!-- Right chat area -->
<div class="flex-1 flex flex-col">
  <!-- flex flex-col so the self-end / self-start classes on message bubbles take effect -->
  <div id="chatBox" class="flex-1 overflow-auto p-4 space-y-3 border-b flex flex-col"></div>
  <div class="p-3 flex gap-2">
    <textarea id="userInput" class="flex-1 border p-2 resize-none h-24" placeholder="输入问题,模型自动判断是否调用已启用的Skill"></textarea>
    <button onclick="sendMsg()" class="bg-blue-600 text-white px-4 rounded">发送</button>
  </div>
</div>
<script>
// Whether the skill management panel is currently expanded.
let skillPanelOpen = false;

/** Expand or collapse the skill panel and flip the arrow indicator. */
function toggleSkillPanel() {
  skillPanelOpen = !skillPanelOpen;
  document.getElementById("skillPanel").classList.toggle("hidden", !skillPanelOpen);
  document.getElementById("skillArrow").innerText = skillPanelOpen ? "▲" : "▼";
}
// Skill上传
/**
 * Upload the selected zip under the entered folder name, show the server's
 * message, then refresh the skill list.
 */
async function uploadSkill() {
  const folder = document.getElementById("skillFolderName").value.trim();
  const zip = document.getElementById("skillZip").files[0];
  if (!folder || !zip) { alert("填写文件夹并选择zip包"); return; }
  const fd = new FormData();
  fd.append("target_folder", folder);
  fd.append("skill_zip", zip);
  try {
    const res = await fetch("/api/skill/upload", {method: "POST", body: fd});
    const ret = await res.json();
    alert(ret.msg);
  } catch (err) {
    // Network failure or a non-JSON error page from the server.
    alert("上传失败:" + err);
  }
  refreshSkillList();
}
// 刷新Skill列表
/**
 * Reload the skill list from the server and render one row per skill with
 * enable/disable and delete buttons.
 *
 * Rows are built with DOM nodes and textContent instead of an HTML string:
 * the description comes from an uploaded tool.json and must not be
 * interpreted as markup.
 */
async function refreshSkillList() {
  const res = await fetch("/api/skill/list");
  const data = await res.json();
  const wrap = document.getElementById("skillListWrap");
  const skills = data.data || [];
  if (skills.length === 0) {
    wrap.innerHTML = "<div class='text-gray-400 text-sm'>暂无Skill</div>";
    return;
  }
  wrap.innerHTML = "";
  skills.forEach(item => {
    const row = document.createElement("div");
    row.className = "border-b py-2 text-sm";

    const nameEl = document.createElement("div");
    nameEl.className = "font-medium";
    nameEl.textContent = item.name;

    const descEl = document.createElement("div");
    descEl.className = "text-gray-500 text-xs line-clamp-1";
    descEl.textContent = item.desc;

    const actions = document.createElement("div");
    actions.className = "flex gap-1 mt-1";

    const toggleBtn = document.createElement("button");
    toggleBtn.className = (item.enable ? "bg-orange-400" : "bg-gray-400") + " text-white px-1 text-xs rounded";
    toggleBtn.textContent = item.enable ? "禁用" : "启用";
    toggleBtn.addEventListener("click", () => toggleSkill(item.name, !item.enable));

    const delBtn = document.createElement("button");
    delBtn.className = "bg-red-500 text-white px-1 text-xs rounded";
    delBtn.textContent = "删除";
    delBtn.addEventListener("click", () => delSkill(item.name));

    actions.append(toggleBtn, delBtn);
    row.append(nameEl, descEl, actions);
    wrap.appendChild(row);
  });
}
/** Ask the server to enable or disable a skill, then refresh the list. */
async function toggleSkill(name, enable) {
  await fetch("/api/skill/toggle", {
    method: "POST",
    headers: {"Content-Type": "application/json"},
    body: JSON.stringify({skill_name: name, enable: enable})
  });
  refreshSkillList();
}
/** After confirmation, ask the server to delete a skill, then refresh the list. */
async function delSkill(name) {
  if (!confirm(`确定删除Skill ${name}?`)) return;
  await fetch("/api/skill/delete", {
    method: "POST",
    headers: {"Content-Type": "application/json"},
    body: JSON.stringify({skill_name: name})
  });
  refreshSkillList();
}
// 会话管理
/** Create a new session on the server, then reload the session list and chat box. */
async function newSession() {
  await fetch("/api/session/new", {method: "POST"});
  refreshSessionList();
  loadChatMsg();
}
/**
 * Reload the session list and render one row per session with pin/unpin and
 * delete buttons; the current session is highlighted.
 *
 * Rows are built with DOM nodes and textContent so stored titles are never
 * interpreted as markup.
 */
async function refreshSessionList() {
  const res = await fetch("/api/session/list");
  const data = await res.json();
  const wrap = document.getElementById("sessionListWrap");
  wrap.innerHTML = "";
  (data.data || []).forEach(s => {
    const active = s.session_id === data.current;
    const row = document.createElement("div");
    row.className = "p-2 rounded cursor-pointer my-1 " + (active ? "sidebar-item-active" : "");
    row.addEventListener("click", () => switchSession(s.session_id));

    const titleRow = document.createElement("div");
    titleRow.className = "flex justify-between items-center";
    const title = document.createElement("span");
    title.className = "text-sm";
    title.textContent = (s.top ? "📌 " : "") + s.title;
    titleRow.appendChild(title);

    const actions = document.createElement("div");
    actions.className = "flex gap-1 mt-1";

    const topBtn = document.createElement("button");
    topBtn.className = "text-xs px-1 border rounded";
    topBtn.textContent = s.top ? "取消置顶" : "置顶";
    topBtn.addEventListener("click", ev => {
      // Keep the click from also switching to this session.
      ev.stopPropagation();
      topSession(s.session_id, !s.top);
    });

    const delBtn = document.createElement("button");
    delBtn.className = "text-xs px-1 border rounded text-red-500";
    delBtn.textContent = "删除";
    delBtn.addEventListener("click", ev => {
      ev.stopPropagation();
      delSession(s.session_id);
    });

    actions.append(topBtn, delBtn);
    row.append(titleRow, actions);
    wrap.appendChild(row);
  });
}
/** Make the given session current on the server, then reload list and messages. */
async function switchSession(sid) {
  await fetch("/api/session/switch", {
    method: "POST",
    headers: {"Content-Type": "application/json"},
    body: JSON.stringify({session_id: sid})
  });
  refreshSessionList();
  loadChatMsg();
}
/** Pin (top = true) or unpin (top = false) a session, then reload the list. */
async function topSession(sid, top) {
  await fetch("/api/session/top", {
    method: "POST",
    headers: {"Content-Type": "application/json"},
    body: JSON.stringify({session_id: sid, top: top})
  });
  refreshSessionList();
}
/** After confirmation, delete a whole session, then reload list and messages. */
async function delSession(sid) {
  if (!confirm("删除整个对话?")) return;
  await fetch("/api/session/del", {
    method: "POST",
    headers: {"Content-Type": "application/json"},
    body: JSON.stringify({session_id: sid})
  });
  refreshSessionList();
  loadChatMsg();
}
// 加载当前会话消息
/**
 * Load the current session's messages and render them in the chat box.
 *
 * The server answers without a msgs field when there is no current session;
 * that case renders an empty chat box. Message text is set through
 * textContent so it is never interpreted as markup.
 */
async function loadChatMsg() {
  const res = await fetch("/api/message/get");
  const data = await res.json();
  const box = document.getElementById("chatBox");
  box.innerHTML = "";
  (data.msgs || []).forEach(m => {
    const bubble = document.createElement("div");
    const side = m.role === "user" ? "bg-blue-50 self-end" : "bg-gray-100 self-start";
    // whitespace-pre-wrap keeps line breaks that textContent would otherwise collapse.
    bubble.className = side + " p-3 rounded max-w-[80%] whitespace-pre-wrap";
    bubble.textContent = m.content;
    box.appendChild(bubble);
  });
  box.scrollTop = box.scrollHeight;
}
// 发送提问
/**
 * Send the typed question to /api/chat and append the exchange to the chat box:
 * the user bubble, an optional note about the triggered skill, and the reply
 * (or the error message when the request fails).
 *
 * All text is inserted through textContent so neither the user's input nor
 * the model's reply is interpreted as markup.
 */
async function sendMsg() {
  const input = document.getElementById("userInput");
  const q = input.value.trim();
  if (!q) return;
  const box = document.getElementById("chatBox");

  // Append one text block to the chat box and keep it scrolled to the bottom.
  const addBlock = (cls, text) => {
    const el = document.createElement("div");
    el.className = cls;
    el.textContent = text;
    box.appendChild(el);
    box.scrollTop = box.scrollHeight;
  };

  addBlock("bg-blue-50 p-3 rounded max-w-[80%] self-end whitespace-pre-wrap", q);
  input.value = "";

  let ret;
  try {
    const res = await fetch("/api/chat", {
      method: "POST",
      headers: {"Content-Type": "application/json"},
      body: JSON.stringify({query: q})
    });
    ret = await res.json();
  } catch (err) {
    // Network failure or a non-JSON error page from the server.
    addBlock("text-red-500", "请求失败:" + err);
    return;
  }
  if (ret.code !== 0) {
    addBlock("text-red-500", ret.msg);
    return;
  }
  if (ret.need_skill) {
    addBlock(
      "text-orange-600 text-sm border-l-2 border-orange-400 pl-2 whitespace-pre-wrap",
      `意图识别触发Skill【${ret.skill_name}】\n调用参数:${JSON.stringify(ret.args)}`
    );
  }
  addBlock("bg-gray-100 p-3 rounded max-w-[80%] self-start whitespace-pre-wrap", ret.reply);
  refreshSessionList();
}
// 页面初始化
// On page load, populate the skill list, the session list and the chat box.
window.onload = function () {
  refreshSkillList();
  refreshSessionList();
  loadChatMsg();
};
</script>
</body>
</html>
四、Skill打包规范(关键)
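- tool.json:Function Calling描述schema(模型意图识别依据);
- run.py:工具执行逻辑,其中的函数名必须与 tool.json 里 function.name 一致,参数名与 parameters.properties 中的字段一致;
- 把 tool.json 和 run.py 压缩为 zip包(两个文件需位于压缩包根目录),前端上传这个zip,填写目标Skill名称;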
示例 tool.json
{
"type": "function",
"function": {
"name": "get_city_weather",
"description": "查询指定城市实时天气",
"parameters": {
"type": "object",
"required": ["city"],
"properties": {
"city": {"type":"string","description":"城市名,如杭州、北京"}
}
}
}
}
配套 run.py
def get_city_weather(city):
    """Return a fixed demo weather sentence for *city* (no real lookup is done)."""
    return f"{city} 今日多云,气温22~28℃"
五、全部功能清单(已实现)
左侧菜单栏
右侧聊天区
- 需要调用已启用Skill:自动提取参数→本地执行工具→结果回传模型整理回答
六、部署启动步骤
- 创建目录结构,把app.py、static/index.html放对位置;
- 修改app.py里 BAILIAN_API_KEY 为你的真实SK;
- 在项目目录下执行 python app.py 启动服务;
- 浏览器打开 http://127.0.0.1:5000 即可完整使用。
七、扩展优化建议
- 安全加固:添加上传zip大小限制、解压路径穿越校验、禁止危险Python代码执行;
- 日志:增加调用日志,记录每一次意图识别、Skill调用记录。