效果图




以下是仅针对SQLite3优化的tkinter数据库可视化管理工具完整代码,精简了多数据库适配逻辑,聚焦SQLite的核心特性(文件型数据库),保留配置保存、Treeview展示、SQL编辑器、增删改查、导出、表结构对比等全部要求功能,代码更轻量化且易运行:
pip install pandas openpyxl sqlparse # tkinter自带,无需额外安装
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox, filedialog
import sqlite3
import pandas as pd
import sqlparse
import configparser
import os
classSQLiteVisualManager:
def__init__(self, root):
self.root = root
self.root.title("SQLite3 数据库可视化管理工具")
self.root.geometry("1300x800")
# 全局变量
self.conn = None# 数据库连接对象
self.current_db_path = ""# 当前连接的SQLite文件路径
self.current_table = None# 当前选中的表名
# 初始化界面
self._create_ui()
# ---------------------- 界面布局 ----------------------
def_create_ui(self):
# 1. 顶部数据库连接区
self._create_conn_frame()
# 2. 左侧表结构/列表区
self._create_table_frame()
# 3. 中间SQL编辑器区
self._create_sql_editor_frame()
# 4. 右侧数据展示区
self._create_data_display_frame()
# 5. 底部功能按钮区
self._create_func_buttons_frame()
def_create_conn_frame(self):
"""创建数据库连接面板"""
conn_frame = ttk.LabelFrame(self.root, text="SQLite数据库连接")
conn_frame.pack(fill="x", padx=5, pady=5)
# 连接控件
ttk.Label(conn_frame, text="数据库文件:").grid(row=0, column=0, padx=5, pady=5)
self.db_path_entry = ttk.Entry(conn_frame, width=50)
self.db_path_entry.grid(row=0, column=1, padx=5, pady=5)
ttk.Button(conn_frame, text="选择文件", command=self._select_db_file).grid(row=0, column=2, padx=5, pady=5)
ttk.Button(conn_frame, text="连接数据库", command=self._connect_db).grid(row=0, column=3, padx=5, pady=5)
ttk.Button(conn_frame, text="保存配置", command=self._save_config).grid(row=0, column=4, padx=5, pady=5)
ttk.Button(conn_frame, text="加载配置", command=self._load_config).grid(row=0, column=5, padx=5, pady=5)
def_create_table_frame(self):
"""创建表列表+表结构展示面板"""
table_frame = ttk.LabelFrame(self.root, text="数据表管理")
table_frame.pack(side="left", fill="y", padx=5, pady=5, width=250)
# 表列表Treeview
ttk.Label(table_frame, text="数据表列表:").pack(anchor="w", padx=5)
self.table_tree = ttk.Treeview(table_frame, show="tree", height=15)
self.table_tree.pack(fill="x", padx=5, pady=5)
self.table_tree.bind("<<TreeviewSelect>>", self._on_table_select)
# 表结构展示区
ttk.Label(table_frame, text="表结构:").pack(anchor="w", padx=5)
self.struct_text = scrolledtext.ScrolledText(table_frame, height=10, font=("Consolas", 9))
self.struct_text.pack(fill="both", padx=5, pady=5)
def_create_sql_editor_frame(self):
"""创建SQL编辑器面板"""
sql_frame = ttk.LabelFrame(self.root, text="SQL编辑器")
sql_frame.pack(fill="x", padx=5, pady=5)
# SQL编辑区
self.sql_text = scrolledtext.ScrolledText(sql_frame, width=80, height=10, font=("Consolas", 10))
self.sql_text.pack(fill="x", padx=5, pady=5)
# 编辑器功能按钮
btn_frame = ttk.Frame(sql_frame)
btn_frame.pack(fill="x", padx=5, pady=2)
ttk.Button(btn_frame, text="执行SQL", command=self._execute_sql).pack(side="left", padx=5)
ttk.Button(btn_frame, text="语法高亮", command=self._highlight_sql).pack(side="left", padx=5)
ttk.Button(btn_frame, text="清空编辑器", command=lambda: self.sql_text.delete(1.0, tk.END)).pack(side="left", padx=5)
def_create_data_display_frame(self):
"""创建数据展示面板"""
data_frame = ttk.LabelFrame(self.root, text="数据展示")
data_frame.pack(fill="both", padx=5, pady=5, expand=True)
# 数据展示Treeview(带滚动条)
self.data_tree = ttk.Treeview(data_frame, show="headings")
v_scroll = ttk.Scrollbar(data_frame, orient="vertical", command=self.data_tree.yview)
h_scroll = ttk.Scrollbar(data_frame, orient="horizontal", command=self.data_tree.xview)
self.data_tree.configure(yscrollcommand=v_scroll.set, xscrollcommand=h_scroll.set)
self.data_tree.pack(side="left", fill="both", expand=True, padx=5, pady=5)
v_scroll.pack(side="right", fill="y")
h_scroll.pack(side="bottom", fill="x")
def_create_func_buttons_frame(self):
"""创建功能按钮面板"""
func_frame = ttk.Frame(self.root)
func_frame.pack(fill="x", padx=5, pady=5)
# 数据操作按钮
ttk.Button(func_frame, text="新增行", command=self._add_row).pack(side="left", padx=5)
ttk.Button(func_frame, text="修改选中行", command=self._edit_row).pack(side="left", padx=5)
ttk.Button(func_frame, text="删除选中行", command=self._del_row).pack(side="left", padx=5)
# 导出/对比按钮
ttk.Button(func_frame, text="导出为Excel/CSV", command=self._export_data).pack(side="left", padx=5)
ttk.Button(func_frame, text="表结构对比", command=self._compare_table_struct).pack(side="left", padx=5)
# ---------------------- 数据库连接相关 ----------------------
def_select_db_file(self):
"""选择SQLite数据库文件"""
file_path = filedialog.askopenfilename(
filetypes=[("SQLite文件", "*.db *.sqlite *.sqlite3"), ("所有文件", "*.*")],
title="选择SQLite数据库文件"
)
if file_path:
self.db_path_entry.delete(0, tk.END)
self.db_path_entry.insert(0, file_path)
def_connect_db(self):
"""连接SQLite数据库"""
self.current_db_path = self.db_path_entry.get().strip()
ifnot self.current_db_path ornot os.path.exists(self.current_db_path):
messagebox.showerror("错误", "请选择有效的SQLite数据库文件!")
return
try:
# 建立数据库连接(支持读写,自动创建不存在的表)
self.conn = sqlite3.connect(self.current_db_path)
self.conn.execute("PRAGMA foreign_keys = ON") # 开启外键约束
# 加载数据表列表
self._load_tables()
messagebox.showinfo("成功", f"已成功连接:{os.path.basename(self.current_db_path)}")
except Exception as e:
messagebox.showerror("连接失败", f"错误原因:{str(e)}")
self.conn = None
def_save_config(self):
"""保存数据库连接配置"""
ifnot self.current_db_path:
messagebox.showwarning("警告", "请先连接数据库!")
return
# 选择配置文件保存路径
config_path = filedialog.asksaveasfilename(
defaultextension=".ini",
filetypes=[("配置文件", "*.ini"), ("所有文件", "*.*")],
title="保存连接配置"
)
ifnot config_path:
return
# 写入配置
config = configparser.ConfigParser()
config["SQLite"] = {"db_path": self.current_db_path}
try:
with open(config_path, "w", encoding="utf-8") as f:
config.write(f)
messagebox.showinfo("成功", f"配置已保存至:{config_path}")
except Exception as e:
messagebox.showerror("错误", f"保存失败:{str(e)}")
def_load_config(self):
"""加载数据库连接配置"""
config_path = filedialog.askopenfilename(
filetypes=[("配置文件", "*.ini"), ("所有文件", "*.*")],
title="加载连接配置"
)
ifnot config_path:
return
config = configparser.ConfigParser()
try:
config.read(config_path, encoding="utf-8")
db_path = config["SQLite"]["db_path"]
if os.path.exists(db_path):
self.db_path_entry.delete(0, tk.END)
self.db_path_entry.insert(0, db_path)
messagebox.showinfo("成功", "配置加载完成,请点击【连接数据库】")
else:
messagebox.showerror("错误", "配置文件中的数据库文件不存在!")
except Exception as e:
messagebox.showerror("加载失败", f"错误原因:{str(e)}")
def_load_tables(self):
"""加载数据库中的所有表"""
self.table_tree.delete(*self.table_tree.get_children())
cursor = self.conn.cursor()
# 查询所有用户表(排除系统表)
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name NOT LIKE 'sqlite_%'
ORDER BY name
""")
tables = [t[0] for t in cursor.fetchall()]
cursor.close()
# 插入到Treeview
for table in tables:
self.table_tree.insert("", "end", text=table, values=[table])
# ---------------------- 表选择与数据展示 ----------------------
def_on_table_select(self, event):
"""选中表后展示表结构和数据"""
selected = self.table_tree.selection()
ifnot selected ornot self.conn:
return
# 获取选中的表名
self.current_table = self.table_tree.item(selected[0])["text"]
# 展示表结构
self._show_table_struct()
# 展示表数据
self._show_table_data()
def_show_table_struct(self):
"""展示选中表的结构"""
self.struct_text.delete(1.0, tk.END)
cursor = self.conn.cursor()
# 查询表结构(SQLite专属)
cursor.execute(f"PRAGMA table_info({self.current_table})")
struct_data = cursor.fetchall() # (cid, name, type, notnull, dflt_value, pk)
cursor.close()
# 格式化展示
self.struct_text.insert(tk.END, f"表名:{self.current_table}\n")
self.struct_text.insert(tk.END, "字段名\t类型\t是否非空\t默认值\t是否主键\n")
self.struct_text.insert(tk.END, "-"*50 + "\n")
for col in struct_data:
cid, name, typ, notnull, dflt, pk = col
self.struct_text.insert(tk.END, f"{name}\t{typ}\t{bool(notnull)}\t{dflt}\t{bool(pk)}\n")
# 锁定文本框(只读)
self.struct_text.config(state="disabled")
def_show_table_data(self):
"""展示选中表的所有数据"""
self.data_tree.delete(*self.data_tree.get_children())
cursor = self.conn.cursor()
try:
# 查询表数据
cursor.execute(f"SELECT * FROM {self.current_table}")
rows = cursor.fetchall()
columns = [desc[0] for desc in cursor.description] if cursor.description else []
# 设置Treeview列
self.data_tree["columns"] = columns
for col in columns:
self.data_tree.heading(col, text=col)
self.data_tree.column(col, width=100, anchor="center")
# 插入数据行
for row in rows:
self.data_tree.insert("", "end", values=row)
except Exception as e:
messagebox.showerror("加载失败", f"数据加载错误:{str(e)}")
finally:
cursor.close()
# ---------------------- SQL编辑器功能 ----------------------
def_execute_sql(self):
"""执行SQL语句"""
ifnot self.conn:
messagebox.showwarning("警告", "请先连接数据库!")
return
sql = self.sql_text.get(1.0, tk.END).strip()
ifnot sql:
messagebox.showwarning("警告", "请输入SQL语句!")
return
cursor = self.conn.cursor()
try:
# 支持批量执行SQL(分号分隔)
for stmt in sqlparse.split(sql):
if stmt.strip():
cursor.execute(stmt)
self.conn.commit()
# 处理查询类SQL,展示结果
if sql.strip().upper().startswith(("SELECT", "PRAGMA", "SHOW", "DESC")):
rows = cursor.fetchall()
columns = [desc[0] for desc in cursor.description] if cursor.description else []
# 更新数据展示区
self.data_tree.delete(*self.data_tree.get_children())
self.data_tree["columns"] = columns
for col in columns:
self.data_tree.heading(col, text=col)
self.data_tree.column(col, width=100, anchor="center")
for row in rows:
self.data_tree.insert("", "end", values=row)
messagebox.showinfo("执行成功", f"查询到 {len(rows)} 条数据")
else:
# 增删改类SQL,刷新表列表和数据
self._load_tables()
if self.current_table:
self._show_table_data()
messagebox.showinfo("执行成功", "SQL语句执行完成,影响行数:{}".format(cursor.rowcount))
except Exception as e:
self.conn.rollback()
messagebox.showerror("执行失败", f"错误原因:{str(e)}")
finally:
cursor.close()
def_highlight_sql(self):
"""SQL语法高亮"""
sql = self.sql_text.get(1.0, tk.END)
ifnot sql.strip():
return
# 清空原有标签
for tag in self.sql_text.tag_names():
self.sql_text.tag_delete(tag)
# 定义标签样式
self.sql_text.tag_configure("keyword", foreground="#0000FF", font=("Consolas", 10, "bold")) # 关键字-蓝色
self.sql_text.tag_configure("string", foreground="#FF0000") # 字符串-红色
self.sql_text.tag_configure("comment", foreground="#008000") # 注释-绿色
self.sql_text.tag_configure("function", foreground="#800080") # 函数-紫色
# 解析SQL并标记
pos = 1.0
parsed = sqlparse.parse(sql)
for stmt in parsed:
for token in stmt.tokens:
token_text = token.value
token_len = len(token_text)
end_pos = self.sql_text.index(f"{pos} + {token_len} chars")
# 标记不同类型的token
if token.ttype is sqlparse.tokens.Keyword:
self.sql_text.tag_add("keyword", pos, end_pos)
elif token.ttype is sqlparse.tokens.String:
self.sql_text.tag_add("string", pos, end_pos)
elif token.ttype is sqlparse.tokens.Comment:
self.sql_text.tag_add("comment", pos, end_pos)
elif token.ttype is sqlparse.tokens.Name.Function:
self.sql_text.tag_add("function", pos, end_pos)
pos = end_pos
# ---------------------- 数据增删改查 ----------------------
def_add_row(self):
"""可视化新增行"""
ifnot self.current_table ornot self.conn:
messagebox.showwarning("警告", "请先选中数据表并连接数据库!")
return
# 获取表字段信息
cursor = self.conn.cursor()
cursor.execute(f"PRAGMA table_info({self.current_table})")
columns = [(col[1], col[2]) for col in cursor.fetchall()] # (字段名, 字段类型)
cursor.close()
# 创建新增窗口
add_win = tk.Toplevel(self.root)
add_win.title(f"新增 {self.current_table} 行数据")
add_win.geometry("400x300")
add_win.resizable(False, False)
# 生成输入框
entries = []
for i, (col_name, col_type) in enumerate(columns):
ttk.Label(add_win, text=f"{col_name} ({col_type}):").grid(row=i, column=0, padx=5, pady=5, sticky="e")
entry = ttk.Entry(add_win, width=30)
entry.grid(row=i, column=1, padx=5, pady=5)
entries.append((col_name, entry))
# 保存新增数据
defsave_data():
values = []
cols = []
for col_name, entry in entries:
val = entry.get().strip()
if val:
cols.append(col_name)
values.append(val)
ifnot cols:
messagebox.showwarning("警告", "至少填写一个字段!")
return
try:
# 构造插入SQL
placeholders = ", ".join(["?"] * len(cols))
sql = f"INSERT INTO {self.current_table} ({', '.join(cols)}) VALUES ({placeholders})"
cursor = self.conn.cursor()
cursor.execute(sql, values)
self.conn.commit()
cursor.close()
# 刷新数据展示
self._show_table_data()
add_win.destroy()
messagebox.showinfo("成功", "数据新增完成!")
except Exception as e:
self.conn.rollback()
messagebox.showerror("失败", f"新增错误:{str(e)}")
ttk.Button(add_win, text="保存", command=save_data).grid(row=len(columns), column=0, columnspan=2, pady=10)
def_edit_row(self):
"""可视化修改选中行"""
selected = self.data_tree.selection()
ifnot selected ornot self.current_table ornot self.conn:
messagebox.showwarning("警告", "请先选中要修改的数据行!")
return
# 获取选中行数据和表字段
row_vals = self.data_tree.item(selected[0])["values"]
cursor = self.conn.cursor()
cursor.execute(f"PRAGMA table_info({self.current_table})")
columns = [(col[1], col[2]) for col in cursor.fetchall()] # (字段名, 类型)
cursor.execute(f"PRAGMA table_info({self.current_table})")
pk_col = [col[1] for col in cursor.fetchall() if col[5] == 1] # 主键字段名
cursor.close()
ifnot pk_col:
messagebox.showerror("错误", "未识别到主键,无法修改!")
return
pk_name = pk_col[0]
pk_val = row_vals[columns.index((pk_name, ""))] # 主键值
# 创建修改窗口
edit_win = tk.Toplevel(self.root)
edit_win.title(f"修改 {self.current_table} 行数据")
edit_win.geometry("400x300")
edit_win.resizable(False, False)
# 生成输入框(默认填充原有值)
entries = []
for i, ((col_name, col_type), val) in enumerate(zip(columns, row_vals)):
ttk.Label(edit_win, text=f"{col_name} ({col_type}):").grid(row=i, column=0, padx=5, pady=5, sticky="e")
entry = ttk.Entry(add_win, width=30)
entry.insert(0, val)
entry.grid(row=i, column=1, padx=5, pady=5)
entries.append((col_name, entry))
# 保存修改
defsave_edit():
update_cols = []
update_vals = []
for col_name, entry in entries:
new_val = entry.get().strip()
if new_val and new_val != str(row_vals[columns.index((col_name, ""))]):
update_cols.append(f"{col_name} = ?")
update_vals.append(new_val)
ifnot update_cols:
messagebox.showwarning("警告", "未修改任何字段!")
return
try:
sql = f"UPDATE {self.current_table} SET {', '.join(update_cols)} WHERE {pk_name} = ?"
cursor = self.conn.cursor()
cursor.execute(sql, update_vals + [pk_val])
self.conn.commit()
cursor.close()
self._show_table_data()
edit_win.destroy()
messagebox.showinfo("成功", "数据修改完成!")
except Exception as e:
self.conn.rollback()
messagebox.showerror("失败", f"修改错误:{str(e)}")
ttk.Button(edit_win, text="保存修改", command=save_edit).grid(row=len(columns), column=0, columnspan=2, pady=10)
def_del_row(self):
"""删除选中行"""
selected = self.data_tree.selection()
ifnot selected ornot self.current_table ornot self.conn:
messagebox.showwarning("警告", "请先选中要删除的数据行!")
return
ifnot messagebox.askyesno("确认删除", "确定要删除选中的行吗?此操作不可恢复!"):
return
# 获取主键信息
row_vals = self.data_tree.item(selected[0])["values"]
cursor = self.conn.cursor()
cursor.execute(f"PRAGMA table_info({self.current_table})")
columns = [(col[1], col[2]) for col in cursor.fetchall()]
pk_col = [col[1] for col in cursor.fetchall() if col[5] == 1]
cursor.close()
ifnot pk_col:
messagebox.showerror("错误", "未识别到主键,无法删除!")
return
pk_name = pk_col[0]
pk_val = row_vals[columns.index((pk_name, ""))]
try:
# 执行删除
sql = f"DELETE FROM {self.current_table} WHERE {pk_name} = ?"
cursor = self.conn.cursor()
cursor.execute(sql, [pk_val])
self.conn.commit()
cursor.close()
# 刷新数据
self.data_tree.delete(selected[0])
messagebox.showinfo("成功", "数据删除完成!")
except Exception as e:
self.conn.rollback()
messagebox.showerror("失败", f"删除错误:{str(e)}")
# ---------------------- 数据导出 ----------------------
def_export_data(self):
"""导出查询结果为Excel/CSV"""
ifnot self.data_tree["columns"] ornot self.conn:
messagebox.showwarning("警告", "暂无数据可导出!")
return
# 选择导出格式和路径
file_path = filedialog.asksaveasfilename(
defaultextension=".xlsx",
filetypes=[("Excel文件", "*.xlsx"), ("CSV文件", "*.csv")],
title="导出数据"
)
ifnot file_path:
return
# 提取数据
columns = self.data_tree["columns"]
rows = []
for item in self.data_tree.get_children():
rows.append(self.data_tree.item(item)["values"])
# 生成DataFrame并导出
df = pd.DataFrame(rows, columns=columns)
try:
if file_path.endswith(".xlsx"):
df.to_excel(file_path, index=False, engine="openpyxl")
else:
df.to_csv(file_path, index=False, encoding="utf-8-sig")
messagebox.showinfo("成功", f"数据已导出至:{file_path}")
except Exception as e:
messagebox.showerror("失败", f"导出错误:{str(e)}")
# ---------------------- 表结构对比 ----------------------
def_compare_table_struct(self):
"""多表结构对比"""
ifnot self.conn:
messagebox.showwarning("警告", "请先连接数据库!")
return
# 获取所有表名
cursor = self.conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
tables = [t[0] for t in cursor.fetchall()]
cursor.close()
if len(tables) < 2:
messagebox.showwarning("警告", "数据库中至少需要2张表才能对比!")
return
# 创建对比窗口
cmp_win = tk.Toplevel(self.root)
cmp_win.title("表结构对比")
cmp_win.geometry("500x200")
cmp_win.resizable(False, False)
# 选择要对比的表
ttk.Label(cmp_win, text="表1:").grid(row=0, column=0, padx=5, pady=10, sticky="e")
table1_combo = ttk.Combobox(cmp_win, values=tables, width=20)
table1_combo.grid(row=0, column=1, padx=5, pady=10)
ttk.Label(cmp_win, text="表2:").grid(row=1, column=0, padx=5, pady=10, sticky="e")
table2_combo = ttk.Combobox(cmp_win, values=tables, width=20)
table2_combo.grid(row=1, column=1, padx=5, pady=10)
# 执行对比
defdo_compare():
t1 = table1_combo.get()
t2 = table2_combo.get()
ifnot t1 ornot t2 or t1 == t2:
messagebox.showwarning("警告", "请选择两个不同的表!")
return
# 获取两张表的结构
cursor = self.conn.cursor()
cursor.execute(f"PRAGMA table_info({t1})")
t1_struct = {col[1]: (col[2], col[3], col[5]) for col in cursor.fetchall()} # 字段名: (类型, 非空, 主键)
cursor.execute(f"PRAGMA table_info({t2})")
t2_struct = {col[1]: (col[2], col[3], col[5]) for col in cursor.fetchall()}
cursor.close()
# 分析差异
diffs = []
# 表1独有字段
for col, (typ, notnull, pk) in t1_struct.items():
if col notin t2_struct:
diffs.append(f"【{t1} 独有】字段:{col} | 类型:{typ} | 非空:{bool(notnull)} | 主键:{bool(pk)}")
else:
# 字段存在但属性不同
t2_typ, t2_notnull, t2_pk = t2_struct[col]
if (typ, notnull, pk) != (t2_typ, t2_notnull, t2_pk):
diffs.append(f"【字段属性差异】{col} | {t1}({typ}, 非空:{bool(notnull)}, 主键:{bool(pk)}) | {t2}({t2_typ}, 非空:{bool(t2_notnull)}, 主键:{bool(t2_pk)})")
# 表2独有字段
for col, (typ, notnull, pk) in t2_struct.items():
if col notin t1_struct:
diffs.append(f"【{t2} 独有】字段:{col} | 类型:{typ} | 非空:{bool(notnull)} | 主键:{bool(pk)}")
# 展示对比结果
result = "\n\n".join(diffs) if diffs else"两张表的结构完全一致!"
result_win = tk.Toplevel(self.root)
result_win.title(f"{t1} VS {t2} 对比结果")
result_win.geometry("600x400")
result_text = scrolledtext.ScrolledText(result_win, font=("Consolas", 10))
result_text.pack(fill="both", expand=True, padx=5, pady=5)
result_text.insert(tk.END, result)
result_text.config(state="disabled") # 只读
ttk.Button(cmp_win, text="开始对比", command=do_compare).grid(row=2, column=0, columnspan=2, pady=10)
if __name__ == "__main__":
root = tk.Tk()
app = SQLiteVisualManager(root)
root.mainloop()
PRAGMA table_info获取表字段信息,格式化展示字段名、类型、主键等 | |
sqlparse实现SQL语句自动格式化;ttkbootstrap替换原生ttk,优化UI样式。utf-8-sig编码,避免中文乱码;openpyxl缺失,执行pip install openpyxl即可。 运行有问题,修复版如下
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox, filedialog
import sqlite3
import pandas as pd
import sqlparse
import configparser
import os
classSQLiteVisualManager:
def__init__(self, root):
self.root = root
self.root.title("SQLite3 数据库可视化管理工具")
self.root.geometry("1300x800")
# 全局变量
self.conn = None# 数据库连接对象
self.current_db_path = ""# 当前连接的SQLite文件路径
self.current_table = None# 当前选中的表名
# 初始化界面
self._create_ui()
# ---------------------- 界面布局 ----------------------
def_create_ui(self):
# 1. 顶部数据库连接区
self._create_conn_frame()
# 2. 左侧表结构/列表区
self._create_table_frame()
# 3. 中间SQL编辑器区
self._create_sql_editor_frame()
# 4. 右侧数据展示区
self._create_data_display_frame()
# 5. 底部功能按钮区
self._create_func_buttons_frame()
def_create_conn_frame(self):
"""创建数据库连接面板"""
conn_frame = ttk.LabelFrame(self.root, text="SQLite数据库连接")
conn_frame.pack(fill="x", padx=5, pady=5)
# 连接控件
ttk.Label(conn_frame, text="数据库文件:").grid(row=0, column=0, padx=5, pady=5)
self.db_path_entry = ttk.Entry(conn_frame, width=50)
self.db_path_entry.grid(row=0, column=1, padx=5, pady=5)
ttk.Button(conn_frame, text="选择文件", command=self._select_db_file).grid(row=0, column=2, padx=5, pady=5)
ttk.Button(conn_frame, text="连接数据库", command=self._connect_db).grid(row=0, column=3, padx=5, pady=5)
ttk.Button(conn_frame, text="保存配置", command=self._save_config).grid(row=0, column=4, padx=5, pady=5)
ttk.Button(conn_frame, text="加载配置", command=self._load_config).grid(row=0, column=5, padx=5, pady=5)
def_create_table_frame(self):
"""创建表列表+表结构展示面板(修复pack的width参数问题)"""
table_frame = ttk.LabelFrame(self.root, text="数据表管理")
# 移除width参数,改用configure设置宽度
table_frame.pack(side="left", fill="y", padx=5, pady=5)
table_frame.configure(width=250) # 正确设置LabelFrame宽度的方式
table_frame.pack_propagate(False) # 禁止组件撑开Frame,固定宽度
# 表列表Treeview
ttk.Label(table_frame, text="数据表列表:").pack(anchor="w", padx=5)
self.table_tree = ttk.Treeview(table_frame, show="tree", height=15)
self.table_tree.pack(fill="x", padx=5, pady=5)
self.table_tree.bind("<<TreeviewSelect>>", self._on_table_select)
# 表结构展示区
ttk.Label(table_frame, text="表结构:").pack(anchor="w", padx=5)
self.struct_text = scrolledtext.ScrolledText(table_frame, height=10, font=("Consolas", 9))
self.struct_text.pack(fill="both", padx=5, pady=5)
def_create_sql_editor_frame(self):
"""创建SQL编辑器面板"""
sql_frame = ttk.LabelFrame(self.root, text="SQL编辑器")
sql_frame.pack(fill="x", padx=5, pady=5)
# SQL编辑区
self.sql_text = scrolledtext.ScrolledText(sql_frame, width=80, height=10, font=("Consolas", 10))
self.sql_text.pack(fill="x", padx=5, pady=5)
# 编辑器功能按钮
btn_frame = ttk.Frame(sql_frame)
btn_frame.pack(fill="x", padx=5, pady=2)
ttk.Button(btn_frame, text="执行SQL", command=self._execute_sql).pack(side="left", padx=5)
ttk.Button(btn_frame, text="语法高亮", command=self._highlight_sql).pack(side="left", padx=5)
ttk.Button(btn_frame, text="清空编辑器", command=lambda: self.sql_text.delete(1.0, tk.END)).pack(side="left", padx=5)
def_create_data_display_frame(self):
"""创建数据展示面板"""
data_frame = ttk.LabelFrame(self.root, text="数据展示")
data_frame.pack(fill="both", padx=5, pady=5, expand=True)
# 数据展示Treeview(带滚动条)
self.data_tree = ttk.Treeview(data_frame, show="headings")
v_scroll = ttk.Scrollbar(data_frame, orient="vertical", command=self.data_tree.yview)
h_scroll = ttk.Scrollbar(data_frame, orient="horizontal", command=self.data_tree.xview)
self.data_tree.configure(yscrollcommand=v_scroll.set, xscrollcommand=h_scroll.set)
self.data_tree.pack(side="left", fill="both", expand=True, padx=5, pady=5)
v_scroll.pack(side="right", fill="y")
h_scroll.pack(side="bottom", fill="x")
def_create_func_buttons_frame(self):
"""创建功能按钮面板"""
func_frame = ttk.Frame(self.root)
func_frame.pack(fill="x", padx=5, pady=5)
# 数据操作按钮
ttk.Button(func_frame, text="新增行", command=self._add_row).pack(side="left", padx=5)
ttk.Button(func_frame, text="修改选中行", command=self._edit_row).pack(side="left", padx=5)
ttk.Button(func_frame, text="删除选中行", command=self._del_row).pack(side="left", padx=5)
# 导出/对比按钮
ttk.Button(func_frame, text="导出为Excel/CSV", command=self._export_data).pack(side="left", padx=5)
ttk.Button(func_frame, text="表结构对比", command=self._compare_table_struct).pack(side="left", padx=5)
# ---------------------- 数据库连接相关 ----------------------
def_select_db_file(self):
"""选择SQLite数据库文件"""
file_path = filedialog.askopenfilename(
filetypes=[("SQLite文件", "*.db *.sqlite *.sqlite3"), ("所有文件", "*.*")],
title="选择SQLite数据库文件"
)
if file_path:
self.db_path_entry.delete(0, tk.END)
self.db_path_entry.insert(0, file_path)
def_connect_db(self):
"""连接SQLite数据库"""
self.current_db_path = self.db_path_entry.get().strip()
ifnot self.current_db_path ornot os.path.exists(self.current_db_path):
messagebox.showerror("错误", "请选择有效的SQLite数据库文件!")
return
try:
# 建立数据库连接(支持读写,自动创建不存在的表)
self.conn = sqlite3.connect(self.current_db_path)
self.conn.execute("PRAGMA foreign_keys = ON") # 开启外键约束
# 加载数据表列表
self._load_tables()
messagebox.showinfo("成功", f"已成功连接:{os.path.basename(self.current_db_path)}")
except Exception as e:
messagebox.showerror("连接失败", f"错误原因:{str(e)}")
self.conn = None
def_save_config(self):
"""保存数据库连接配置"""
ifnot self.current_db_path:
messagebox.showwarning("警告", "请先连接数据库!")
return
# 选择配置文件保存路径
config_path = filedialog.asksaveasfilename(
defaultextension=".ini",
filetypes=[("配置文件", "*.ini"), ("所有文件", "*.*")],
title="保存连接配置"
)
ifnot config_path:
return
# 写入配置
config = configparser.ConfigParser()
config["SQLite"] = {"db_path": self.current_db_path}
try:
with open(config_path, "w", encoding="utf-8") as f:
config.write(f)
messagebox.showinfo("成功", f"配置已保存至:{config_path}")
except Exception as e:
messagebox.showerror("错误", f"保存失败:{str(e)}")
def_load_config(self):
"""加载数据库连接配置"""
config_path = filedialog.askopenfilename(
filetypes=[("配置文件", "*.ini"), ("所有文件", "*.*")],
title="加载连接配置"
)
ifnot config_path:
return
config = configparser.ConfigParser()
try:
config.read(config_path, encoding="utf-8")
db_path = config["SQLite"]["db_path"]
if os.path.exists(db_path):
self.db_path_entry.delete(0, tk.END)
self.db_path_entry.insert(0, db_path)
messagebox.showinfo("成功", "配置加载完成,请点击【连接数据库】")
else:
messagebox.showerror("错误", "配置文件中的数据库文件不存在!")
except Exception as e:
messagebox.showerror("加载失败", f"错误原因:{str(e)}")
def_load_tables(self):
"""加载数据库中的所有表"""
self.table_tree.delete(*self.table_tree.get_children())
cursor = self.conn.cursor()
# 查询所有用户表(排除系统表)
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name NOT LIKE 'sqlite_%'
ORDER BY name
""")
tables = [t[0] for t in cursor.fetchall()]
cursor.close()
# 插入到Treeview
for table in tables:
self.table_tree.insert("", "end", text=table, values=[table])
# ---------------------- 表选择与数据展示 ----------------------
def_on_table_select(self, event):
"""选中表后展示表结构和数据"""
selected = self.table_tree.selection()
ifnot selected ornot self.conn:
return
# 获取选中的表名
self.current_table = self.table_tree.item(selected[0])["text"]
# 展示表结构
self._show_table_struct()
# 展示表数据
self._show_table_data()
def_show_table_struct(self):
"""展示选中表的结构"""
self.struct_text.delete(1.0, tk.END)
cursor = self.conn.cursor()
# 查询表结构(SQLite专属)
cursor.execute(f"PRAGMA table_info({self.current_table})")
struct_data = cursor.fetchall() # (cid, name, type, notnull, dflt_value, pk)
cursor.close()
# 格式化展示
self.struct_text.insert(tk.END, f"表名:{self.current_table}\n")
self.struct_text.insert(tk.END, "字段名\t类型\t是否非空\t默认值\t是否主键\n")
self.struct_text.insert(tk.END, "-"*50 + "\n")
for col in struct_data:
cid, name, typ, notnull, dflt, pk = col
self.struct_text.insert(tk.END, f"{name}\t{typ}\t{bool(notnull)}\t{dflt}\t{bool(pk)}\n")
# 锁定文本框(只读)
self.struct_text.config(state="disabled")
def_show_table_data(self):
"""展示选中表的所有数据"""
self.data_tree.delete(*self.data_tree.get_children())
cursor = self.conn.cursor()
try:
# 查询表数据
cursor.execute(f"SELECT * FROM {self.current_table}")
rows = cursor.fetchall()
columns = [desc[0] for desc in cursor.description] if cursor.description else []
# 设置Treeview列
self.data_tree["columns"] = columns
for col in columns:
self.data_tree.heading(col, text=col)
self.data_tree.column(col, width=100, anchor="center")
# 插入数据行
for row in rows:
self.data_tree.insert("", "end", values=row)
except Exception as e:
messagebox.showerror("加载失败", f"数据加载错误:{str(e)}")
finally:
cursor.close()
# ---------------------- SQL编辑器功能 ----------------------
def_execute_sql(self):
"""执行SQL语句"""
ifnot self.conn:
messagebox.showwarning("警告", "请先连接数据库!")
return
sql = self.sql_text.get(1.0, tk.END).strip()
ifnot sql:
messagebox.showwarning("警告", "请输入SQL语句!")
return
cursor = self.conn.cursor()
try:
# 支持批量执行SQL(分号分隔)
for stmt in sqlparse.split(sql):
if stmt.strip():
cursor.execute(stmt)
self.conn.commit()
# 处理查询类SQL,展示结果
if sql.strip().upper().startswith(("SELECT", "PRAGMA", "SHOW", "DESC")):
rows = cursor.fetchall()
columns = [desc[0] for desc in cursor.description] if cursor.description else []
# 更新数据展示区
self.data_tree.delete(*self.data_tree.get_children())
self.data_tree["columns"] = columns
for col in columns:
self.data_tree.heading(col, text=col)
self.data_tree.column(col, width=100, anchor="center")
for row in rows:
self.data_tree.insert("", "end", values=row)
messagebox.showinfo("执行成功", f"查询到 {len(rows)} 条数据")
else:
# 增删改类SQL,刷新表列表和数据
self._load_tables()
if self.current_table:
self._show_table_data()
messagebox.showinfo("执行成功", "SQL语句执行完成,影响行数:{}".format(cursor.rowcount))
except Exception as e:
self.conn.rollback()
messagebox.showerror("执行失败", f"错误原因:{str(e)}")
finally:
cursor.close()
def_highlight_sql(self):
"""SQL语法高亮"""
sql = self.sql_text.get(1.0, tk.END)
ifnot sql.strip():
return
# 清空原有标签
for tag in self.sql_text.tag_names():
self.sql_text.tag_delete(tag)
# 定义标签样式
self.sql_text.tag_configure("keyword", foreground="#0000FF", font=("Consolas", 10, "bold")) # 关键字-蓝色
self.sql_text.tag_configure("string", foreground="#FF0000") # 字符串-红色
self.sql_text.tag_configure("comment", foreground="#008000") # 注释-绿色
self.sql_text.tag_configure("function", foreground="#800080") # 函数-紫色
# 解析SQL并标记
pos = 1.0
parsed = sqlparse.parse(sql)
for stmt in parsed:
for token in stmt.tokens:
token_text = token.value
token_len = len(token_text)
end_pos = self.sql_text.index(f"{pos} + {token_len} chars")
# 标记不同类型的token
if token.ttype is sqlparse.tokens.Keyword:
self.sql_text.tag_add("keyword", pos, end_pos)
elif token.ttype is sqlparse.tokens.String:
self.sql_text.tag_add("string", pos, end_pos)
elif token.ttype is sqlparse.tokens.Comment:
self.sql_text.tag_add("comment", pos, end_pos)
elif token.ttype is sqlparse.tokens.Name.Function:
self.sql_text.tag_add("function", pos, end_pos)
pos = end_pos
# ---------------------- 数据增删改查 ----------------------
def_add_row(self):
"""可视化新增行"""
ifnot self.current_table ornot self.conn:
messagebox.showwarning("警告", "请先选中数据表并连接数据库!")
return
# 获取表字段信息
cursor = self.conn.cursor()
cursor.execute(f"PRAGMA table_info({self.current_table})")
columns = [(col[1], col[2]) for col in cursor.fetchall()] # (字段名, 字段类型)
cursor.close()
# 创建新增窗口
add_win = tk.Toplevel(self.root)
add_win.title(f"新增 {self.current_table} 行数据")
add_win.geometry("400x300")
add_win.resizable(False, False)
# 生成输入框
entries = []
for i, (col_name, col_type) in enumerate(columns):
ttk.Label(add_win, text=f"{col_name} ({col_type}):").grid(row=i, column=0, padx=5, pady=5, sticky="e")
entry = ttk.Entry(add_win, width=30)
entry.grid(row=i, column=1, padx=5, pady=5)
entries.append((col_name, entry))
# 保存新增数据
defsave_data():
values = []
cols = []
for col_name, entry in entries:
val = entry.get().strip()
if val:
cols.append(col_name)
values.append(val)
ifnot cols:
messagebox.showwarning("警告", "至少填写一个字段!")
return
try:
# 构造插入SQL
placeholders = ", ".join(["?"] * len(cols))
sql = f"INSERT INTO {self.current_table} ({', '.join(cols)}) VALUES ({placeholders})"
cursor = self.conn.cursor()
cursor.execute(sql, values)
self.conn.commit()
cursor.close()
# 刷新数据展示
self._show_table_data()
add_win.destroy()
messagebox.showinfo("成功", "数据新增完成!")
except Exception as e:
self.conn.rollback()
messagebox.showerror("失败", f"新增错误:{str(e)}")
ttk.Button(add_win, text="保存", command=save_data).grid(row=len(columns), column=0, columnspan=2, pady=10)
def_edit_row(self):
"""可视化修改选中行(修复输入框绑定窗口的笔误)"""
selected = self.data_tree.selection()
ifnot selected ornot self.current_table ornot self.conn:
messagebox.showwarning("警告", "请先选中要修改的数据行!")
return
# 获取选中行数据和表字段
row_vals = self.data_tree.item(selected[0])["values"]
cursor = self.conn.cursor()
cursor.execute(f"PRAGMA table_info({self.current_table})")
columns = [(col[1], col[2]) for col in cursor.fetchall()] # (字段名, 类型)
cursor.execute(f"PRAGMA table_info({self.current_table})")
pk_col = [col[1] for col in cursor.fetchall() if col[5] == 1] # 主键字段名
cursor.close()
ifnot pk_col:
messagebox.showerror("错误", "未识别到主键,无法修改!")
return
pk_name = pk_col[0]
# 修正:正确获取主键字段的索引
pk_index = [i for i, (col, _) in enumerate(columns) if col == pk_name][0]
pk_val = row_vals[pk_index]
# 创建修改窗口
edit_win = tk.Toplevel(self.root)
edit_win.title(f"修改 {self.current_table} 行数据")
edit_win.geometry("400x300")
edit_win.resizable(False, False)
# 生成输入框(默认填充原有值)
entries = []
for i, ((col_name, col_type), val) in enumerate(zip(columns, row_vals)):
ttk.Label(edit_win, text=f"{col_name} ({col_type}):").grid(row=i, column=0, padx=5, pady=5, sticky="e")
entry = ttk.Entry(edit_win, width=30) # 修复:原代码错误绑定到add_win
entry.insert(0, val)
entry.grid(row=i, column=1, padx=5, pady=5)
entries.append((col_name, entry))
# 保存修改
defsave_edit():
update_cols = []
update_vals = []
for col_name, entry in entries:
new_val = entry.get().strip()
# 跳过主键(避免修改主键)
if col_name == pk_name:
continue
if new_val and new_val != str(row_vals[columns.index((col_name, col_type))]):
update_cols.append(f"{col_name} = ?")
update_vals.append(new_val)
ifnot update_cols:
messagebox.showwarning("警告", "未修改任何字段!")
return
try:
sql = f"UPDATE {self.current_table} SET {', '.join(update_cols)} WHERE {pk_name} = ?"
cursor = self.conn.cursor()
cursor.execute(sql, update_vals + [pk_val])
self.conn.commit()
cursor.close()
self._show_table_data()
edit_win.destroy()
messagebox.showinfo("成功", "数据修改完成!")
except Exception as e:
self.conn.rollback()
messagebox.showerror("失败", f"修改错误:{str(e)}")
ttk.Button(edit_win, text="保存修改", command=save_edit).grid(row=len(columns), column=0, columnspan=2, pady=10)
def_del_row(self):
"""删除选中行"""
selected = self.data_tree.selection()
ifnot selected ornot self.current_table ornot self.conn:
messagebox.showwarning("警告", "请先选中要删除的数据行!")
return
ifnot messagebox.askyesno("确认删除", "确定要删除选中的行吗?此操作不可恢复!"):
return
# 获取主键信息
row_vals = self.data_tree.item(selected[0])["values"]
cursor = self.conn.cursor()
cursor.execute(f"PRAGMA table_info({self.current_table})")
columns = [(col[1], col[2]) for col in cursor.fetchall()]
pk_col = [col[1] for col in cursor.fetchall() if col[5] == 1]
cursor.close()
ifnot pk_col:
messagebox.showerror("错误", "未识别到主键,无法删除!")
return
pk_name = pk_col[0]
pk_index = [i for i, (col, _) in enumerate(columns) if col == pk_name][0]
pk_val = row_vals[pk_index]
try:
# 执行删除
sql = f"DELETE FROM {self.current_table} WHERE {pk_name} = ?"
cursor = self.conn.cursor()
cursor.execute(sql, [pk_val])
self.conn.commit()
cursor.close()
# 刷新数据
self.data_tree.delete(selected[0])
messagebox.showinfo("成功", "数据删除完成!")
except Exception as e:
self.conn.rollback()
messagebox.showerror("失败", f"删除错误:{str(e)}")
# ---------------------- 数据导出 ----------------------
def_export_data(self):
"""导出查询结果为Excel/CSV"""
ifnot self.data_tree["columns"] ornot self.conn:
messagebox.showwarning("警告", "暂无数据可导出!")
return
# 选择导出格式和路径
file_path = filedialog.asksaveasfilename(
defaultextension=".xlsx",
filetypes=[("Excel文件", "*.xlsx"), ("CSV文件", "*.csv")],
title="导出数据"
)
ifnot file_path:
return
# 提取数据
columns = self.data_tree["columns"]
rows = []
for item in self.data_tree.get_children():
rows.append(self.data_tree.item(item)["values"])
# 生成DataFrame并导出
df = pd.DataFrame(rows, columns=columns)
try:
if file_path.endswith(".xlsx"):
df.to_excel(file_path, index=False, engine="openpyxl")
else:
df.to_csv(file_path, index=False, encoding="utf-8-sig")
messagebox.showinfo("成功", f"数据已导出至:{file_path}")
except Exception as e:
messagebox.showerror("失败", f"导出错误:{str(e)}")
# ---------------------- 表结构对比 ----------------------
def_compare_table_struct(self):
"""多表结构对比"""
ifnot self.conn:
messagebox.showwarning("警告", "请先连接数据库!")
return
# 获取所有表名
cursor = self.conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
tables = [t[0] for t in cursor.fetchall()]
cursor.close()
if len(tables) < 2:
messagebox.showwarning("警告", "数据库中至少需要2张表才能对比!")
return
# 创建对比窗口
cmp_win = tk.Toplevel(self.root)
cmp_win.title("表结构对比")
cmp_win.geometry("500x200")
cmp_win.resizable(False, False)
# 选择要对比的表
ttk.Label(cmp_win, text="表1:").grid(row=0, column=0, padx=5, pady=10, sticky="e")
table1_combo = ttk.Combobox(cmp_win, values=tables, width=20)
table1_combo.grid(row=0, column=1, padx=5, pady=10)
ttk.Label(cmp_win, text="表2:").grid(row=1, column=0, padx=5, pady=10, sticky="e")
table2_combo = ttk.Combobox(cmp_win, values=tables, width=20)
table2_combo.grid(row=1, column=1, padx=5, pady=10)
# 执行对比
defdo_compare():
t1 = table1_combo.get()
t2 = table2_combo.get()
ifnot t1 ornot t2 or t1 == t2:
messagebox.showwarning("警告", "请选择两个不同的表!")
return
# 获取两张表的结构
cursor = self.conn.cursor()
cursor.execute(f"PRAGMA table_info({t1})")
t1_struct = {col[1]: (col[2], col[3], col[5]) for col in cursor.fetchall()} # 字段名: (类型, 非空, 主键)
cursor.execute(f"PRAGMA table_info({t2})")
t2_struct = {col[1]: (col[2], col[3], col[5]) for col in cursor.fetchall()}
cursor.close()
# 分析差异
diffs = []
# 表1独有字段
for col, (typ, notnull, pk) in t1_struct.items():
if col notin t2_struct:
diffs.append(f"【{t1} 独有】字段:{col} | 类型:{typ} | 非空:{bool(notnull)} | 主键:{bool(pk)}")
else:
# 字段存在但属性不同
t2_typ, t2_notnull, t2_pk = t2_struct[col]
if (typ, notnull, pk) != (t2_typ, t2_notnull, t2_pk):
diffs.append(f"【字段属性差异】{col} | {t1}({typ}, 非空:{bool(notnull)}, 主键:{bool(pk)}) | {t2}({t2_typ}, 非空:{bool(t2_notnull)}, 主键:{bool(t2_pk)})")
# 表2独有字段
for col, (typ, notnull, pk) in t2_struct.items():
if col notin t1_struct:
diffs.append(f"【{t2} 独有】字段:{col} | 类型:{typ} | 非空:{bool(notnull)} | 主键:{bool(pk)}")
# 展示对比结果
result = "\n\n".join(diffs) if diffs else"两张表的结构完全一致!"
result_win = tk.Toplevel(self.root)
result_win.title(f"{t1} VS {t2} 对比结果")
result_win.geometry("600x400")
result_text = scrolledtext.ScrolledText(result_win, font=("Consolas", 10))
result_text.pack(fill="both", expand=True, padx=5, pady=5)
result_text.insert(tk.END, result)
result_text.config(state="disabled") # 只读
ttk.Button(cmp_win, text="开始对比", command=do_compare).grid(row=2, column=0, columnspan=2, pady=10)
if __name__ == "__main__":
root = tk.Tk()
app = SQLiteVisualManager(root)
root.mainloop()
pack方法width参数错误:
table_frame.pack(side="left", fill="y", padx=5, pady=5, width=250)table_frame.pack(side="left", fill="y", padx=5, pady=5),再通过table_frame.configure(width=250)设置宽度,并添加table_frame.pack_propagate(False)禁止内部组件撑开Frame,保证宽度固定。_edit_row方法输入框绑定错误:
entry = ttk.Entry(add_win, width=30)(错误绑定到新增窗口)entry = ttk.Entry(edit_win, width=30)(正确绑定到修改窗口)主键索引获取逻辑优化:
columns.index((pk_name, ""))获取主键索引,可能因元组匹配失败报错;[i for i, (col, _) in enumerate(columns) if col == pk_name][0]精准获取主键字段的索引,避免匹配错误。pip install pandas openpyxl sqlparse;_tkinter.TclError报错;以下为你生成一个可直接用于测试的SQLite数据库文件(test.db),包含两张测试表(用户表、商品表)及示例数据,能完美适配上述可视化工具的所有功能(增删改查、导出、表结构对比等)。
运行该脚本会自动在当前目录生成 test.db 文件,包含预设表和数据:
import sqlite3
import os
# 数据库文件路径(可修改为你想要的路径,如D:\ysp-test-2024\markitdown\day\202512\test.db)
DB_PATH = "test.db"
# 若文件已存在,先删除(避免重复创建)
if os.path.exists(DB_PATH):
os.remove(DB_PATH)
# 连接数据库(自动创建文件)
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# 1. 创建用户表(user_info)
cursor.execute('''
CREATE TABLE user_info (
id INTEGER PRIMARY KEY AUTOINCREMENT, -- 主键自增
username TEXT NOT NULL, -- 用户名(非空)
age INTEGER, -- 年龄
gender TEXT, -- 性别
create_time TEXT DEFAULT (datetime('now', 'localtime')) -- 创建时间(默认当前本地时间)
)
''')
# 插入用户示例数据
user_data = [
("张三", 25, "男"),
("李四", 30, "女"),
("王五", 28, "男"),
("赵六", 22, "女")
]
cursor.executemany("INSERT INTO user_info (username, age, gender) VALUES (?, ?, ?)", user_data)
# 2. 创建商品表(product_info)
cursor.execute('''
CREATE TABLE product_info (
product_id INTEGER PRIMARY KEY AUTOINCREMENT, -- 商品ID(主键自增)
product_name TEXT NOT NULL, -- 商品名(非空)
price REAL NOT NULL, -- 价格(非空)
stock INTEGER DEFAULT 0, -- 库存(默认0)
category TEXT -- 商品分类
)
''')
# 插入商品示例数据
product_data = [
("小米14手机", 3999.99, 100, "数码产品"),
("华为Mate60", 4999.99, 80, "数码产品"),
("Nike运动鞋", 599.99, 200, "服饰"),
("苹果AirPods Pro", 1799.00, 150, "数码产品")
]
cursor.executemany("INSERT INTO product_info (product_name, price, stock, category) VALUES (?, ?, ?, ?)", product_data)
# 提交事务并关闭连接
conn.commit()
cursor.close()
conn.close()
print(f"✅ 数据库文件生成成功!路径:{os.path.abspath(DB_PATH)}")
print("📋 包含表:user_info(用户表)、product_info(商品表),可直接用于可视化工具测试")
save_test_db.py;python save_test_db.py
test.db 文件(大小约几KB)。user_info | ||
product_info |
test.db 文件;user_info 和 product_info 两张表;SELECT * FROM user_info WHERE age > 25);user_info和product_info)。若需生成适配该数据库的配置文件(sqlite_config.ini),内容如下:
[SQLite]
# 替换为你的test.db实际路径
db_path = D:\ysp-test-2024\markitdown\day\202512\test.db
remark = 测试用SQLite数据库(包含user_info和product_info表)
save_test_db.py 中的 DB_PATH,需确保路径存在(如D:\ysp-test-2024\markitdown\day\202512\需先创建);