

Python,速成心法
敲代码,查资料,问Ai
练习,探索,总结,优化

------★数据库历史教程列表★-------
Python数据库教程: SQLite3增删改查操作(tkinter)
Python项目源码57:数据格式转换工具1.0(csv+json+excel+sqlite3)
Python项目源码69:Excel数据筛选器1.0(tkinter+sqlite3+pandas)
Python数据库教程02:sqlite3批量删除+某字段数据内容替换(tkinter)
下面是一个完整的 Python 程序,使用 tkinter 图形界面展示 SQLite 中聚合函数(COUNT、SUM、AVG、GROUP_CONCAT 等)的综合用法。程序会在内存中创建示例数据表,并提供多个按钮,点击后显示对应的聚合查询结果。

↓ 完整源码如下 ↓
# -*- coding: utf-8 -*-# @Author : 小红牛# 微信公众号:wdPythonimport sqlite3import tkinter as tkfrom tkinter import ttk, messagebox# memory创建内存数据库并填充数据def init_db():conn = sqlite3.connect(':memory:')cursor = conn.cursor()cursor.execute('''CREATE TABLE employees (id INTEGER PRIMARY KEY,name TEXT,department TEXT,salary INTEGER,bonus INTEGER,hire_date TEXT)''')data = [(1, 'Alice', 'Sales', 60000, 5000, '2020-03-01'),(2, 'Bob', 'Sales', 55000, 4000, '2021-07-15'),(3, 'Carol', 'IT', 80000, 6000, '2019-11-20'),(4, 'Dave', 'IT', 75000, 7000, '2020-09-10'),(5, 'Eve', 'HR', 50000, 3000, '2022-01-05')]cursor.executemany('INSERT INTO employees VALUES (?,?,?,?,?,?)', data)conn.commit()return conn# ----------------------------- 各种聚合查询(返回结果字符串) -----------------------------def query_basic_group(conn):cursor = conn.cursor()cursor.execute('''SELECTdepartment,COUNT(*) AS employee_count,SUM(salary) AS total_salary,AVG(salary) AS avg_salary,MAX(bonus) AS max_bonusFROM employeesGROUP BY department''')rows = cursor.fetchall()if not rows:return "无数据"result = "{:<10} {:<8} {:<12} {:<10} {:<10}\n".format("部门", "人数", "总薪资", "平均薪资", "最高奖金")result += "-" * 55 + "\n"for row in rows:result += "{:<10} {:<8} {:<12} {:<10.2f} {:<10}\n".format(row[0], row[1], row[2], row[3], row[4])return resultdef query_having(conn):cursor = conn.cursor()cursor.execute('''SELECT department, AVG(salary) AS avg_salaryFROM employeesGROUP BY departmentHAVING avg_salary > 60000''')rows = cursor.fetchall()if not rows:return "没有平均薪资 > 60000 的部门"result = "{:<10} {:<10}\n".format("部门", "平均薪资")result += "-" * 25 + "\n"for row in rows:result += "{:<10} {:<10.2f}\n".format(row[0], row[1])return resultdef query_group_concat(conn):cursor = conn.cursor()cursor.execute('''SELECTdepartment,GROUP_CONCAT(name, '; ')FROM (SELECT name, department FROM employees ORDER BY name)GROUP BY department''')rows = cursor.fetchall()result = "{:<10} {:<30}\n".format("部门", "成员列表")result += "-" * 45 + "\n"for row in rows:result += "{:<10} {:<30}\n".format(row[0], row[1])return resultdef query_conditional(conn):cursor = conn.cursor()cursor.execute('''SELECTdepartment,SUM(CASE WHEN salary > 60000 THEN 1 ELSE 0 END) AS high_salary_count,SUM(CASE WHEN salary <= 60000 THEN 1 ELSE 0 END) AS low_salary_countFROM employeesGROUP BY department''')rows = cursor.fetchall()result = "{:<10} {:<18} {:<18}\n".format("部门", "薪资>60000人数", "薪资<=60000人数")result += "-" * 50 + "\n"for row in rows:result += "{:<10} {:<18} {:<18}\n".format(row[0], row[1], row[2])return resultdef query_date_group(conn):cursor = conn.cursor()cursor.execute('''SELECTstrftime('%Y', hire_date) AS hire_year,COUNT(*) AS hired,AVG(salary) AS avg_salaryFROM employeesGROUP BY hire_yearORDER BY hire_year''')rows = cursor.fetchall()result = "{:<10} {:<8} {:<10}\n".format("入职年份", "人数", "平均薪资")result += "-" * 35 + "\n"for row in rows:result += "{:<10} {:<8} {:<10.2f}\n".format(row[0], row[1], row[2])return resultdef query_nested_avg(conn):cursor = conn.cursor()cursor.execute('''SELECT department, AVG(salary) AS dept_avgFROM employeesGROUP BY departmentHAVING dept_avg > (SELECT AVG(salary) FROM employees)''')rows = cursor.fetchall()if not rows:return "没有这样的部门"result = "{:<10} {:<10}\n".format("部门", "部门平均薪资")result += "-" * 25 + "\n"for row in rows:result += "{:<10} {:<10.2f}\n".format(row[0], row[1])return result# ----------------------------- 获取原始数据表格字符串 -----------------------------def get_original_data_str(conn):cursor = conn.cursor()cursor.execute("SELECT * FROM employees")rows = cursor.fetchall()welcome = "当前 employees 表中的原始数据:\n"welcome += "=" * 60 + "\n"header = "{:<4} {:<6} {:<10} {:<8} {:<6} {:<12}\n".format("ID", "姓名", "部门", "薪资", "奖金", "入职日期")welcome += header + "-" * 60 + "\n"for row in rows:welcome += "{:<4} {:<6} {:<10} {:<8} {:<6} {:<12}\n".format(row[0], row[1], row[2], row[3], row[4], row[5])return welcome# ----------------------------- tkinter GUI 界面 -----------------------------class AggDemoApp:def __init__(self, root, conn):self.root = rootself.conn = connself.root.title("SQLite 聚合函数综合示例 - 原始数据固定对比")self.root.geometry("750x600")# 顶部说明标签title = tk.Label(root, text="SQLite 聚合查询演示 (原始数据始终保留,方便对比)",font=("Arial", 12, "bold"), fg="blue")title.pack(pady=5)# ---------- 原始数据区域(只读) ----------original_frame = tk.LabelFrame(root, text="原始数据 (employees 表)", font=("Arial", 10, "bold"))original_frame.pack(fill=tk.BOTH, expand=False, padx=10, pady=5)orig_scroll = tk.Scrollbar(original_frame)orig_scroll.pack(side=tk.RIGHT, fill=tk.Y)self.original_text = tk.Text(original_frame, wrap=tk.NONE, yscrollcommand=orig_scroll.set,font=("Courier", 9), height=8, state=tk.NORMAL)self.original_text.pack(fill=tk.BOTH, expand=True)orig_scroll.config(command=self.original_text.yview)# 填充原始数据self.original_text.insert(tk.END, get_original_data_str(self.conn))self.original_text.config(state=tk.DISABLED) # 设为只读# ---------- 按钮区域 (两行) ----------btn_container = tk.Frame(root)btn_container.pack(pady=10)# 第一行按钮row1 = tk.Frame(btn_container)row1.pack(pady=2)buttons_row1 = [("基础分组统计 (COUNT/SUM/AVG/MAX)", self.show_basic),("HAVING 过滤 (平均薪资>60000)", self.show_having),("GROUP_CONCAT 字符串聚合", self.show_concat)]for text, cmd in buttons_row1:btn = tk.Button(row1, text=text, command=cmd, width=40, bg="lightgray")btn.pack(side=tk.LEFT, padx=5)# 第二行按钮row2 = tk.Frame(btn_container)row2.pack(pady=2)buttons_row2 = [("条件聚合 (CASE WHEN 分段)", self.show_conditional),("按入职年份统计 (日期处理)", self.show_date),("嵌套聚合 (部门 > 公司平均)", self.show_nested)]for text, cmd in buttons_row2:btn = tk.Button(row2, text=text, command=cmd, width=28, bg="lightgray")btn.pack(side=tk.LEFT, padx=5)# ---------- 查询结果显示区域 ----------result_frame = tk.LabelFrame(root, text="查询结果", font=("Arial", 10, "bold"))result_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=5)res_scroll = tk.Scrollbar(result_frame)res_scroll.pack(side=tk.RIGHT, fill=tk.Y)self.result_text = tk.Text(result_frame, wrap=tk.NONE, yscrollcommand=res_scroll.set,font=("Courier", 10))self.result_text.pack(fill=tk.BOTH, expand=True)res_scroll.config(command=self.result_text.yview)# 初始显示提示self.result_text.insert(tk.END, "点击上方按钮,查询结果将显示在此处。原始数据始终保留在上方区域,方便对比。")def clear_result(self):self.result_text.delete(1.0, tk.END)def show_query(self, query_func, query_title):self.clear_result()try:result = query_func(self.conn)output = f"【{query_title}】\n\n{result}"self.result_text.insert(tk.END, output)except Exception as e:messagebox.showerror("查询错误", str(e))self.result_text.insert(tk.END, f"错误: {e}")def show_basic(self):self.show_query(query_basic_group, "基础分组统计")def show_having(self):self.show_query(query_having, "HAVING 过滤 (部门平均薪资 > 60000)")def show_concat(self):self.show_query(query_group_concat, "GROUP_CONCAT 字符串聚合 (成员列表)")def show_conditional(self):self.show_query(query_conditional, "条件聚合 (薪资高于/低于60000的人数)")def show_date(self):self.show_query(query_date_group, "按入职年份统计")def show_nested(self):self.show_query(query_nested_avg, "嵌套聚合 (部门平均薪资 > 公司整体平均薪资)")# ----------------------------- 主程序入口 -----------------------------if __name__ == "__main__":root = tk.Tk()db_conn = init_db()app = AggDemoApp(root, db_conn)root.mainloop()
完毕!!感谢您的收看
------★★历史博文集合★★------
