还记得上次项目验收的时候吗?客户盯着监控界面,突然问了句:"这设备到底是在线还是离线?"——数据停在5分钟前,界面毫无反应。尴尬。
工业现场不比办公室。PLC断电、网线松动、RS485干扰...这些都是家常便饭。我见过最离谱的情况:生产线运行了半天,监控系统早就断线了,结果数据全丢。老板那个火啊!
今天咱们聊点实在的——用Tkinter做个工业级的断线重连提示灯。不仅要能显示状态,还得自动重连、记录日志。说白了,就是让设备掉线这事儿"看得见、摸得着、能追溯"。
这篇文章会给你:
第一痛:掉线悄无声息设备断了半小时,界面还显示"连接中"。用户根本不知道出了问题,等发现时数据早凉透了。
第二痛:重连机制不靠谱有些程序断线后就彻底死了。必须手动重启软件,甚至重启电脑。这在无人值守的场景简直是灾难。
第三痛:故障无法追溯客户投诉说"昨天下���3点设备掉线了",你翻遍日志也找不到记录。谁信你的辩解?
不就是个灯吗?哪有那么复杂。——如果你这么想,那就大错特错了。
工业级的提示灯至少要包含:
先来个最简单的。适合快速验证想法,或者给老板演示用。
import tkinter as tkimport randomimport threadingimport timeclassSimpleLED:def__init__(self, root):self.root = rootself.root.title("断线提示灯-简化版")# 创建画布显示LED灯self.canvas = tk.Canvas(root, width=100, height=100, bg='white')self.canvas.pack(pady=20)self.led = self.canvas.create_oval(20, 20, 80, 80, fill='green')# 状态标签self.status_label = tk.Label(root, text="设备在线", font=("微软雅黑", 14))self.status_label.pack()self.is_connected = Trueself.start_monitor()defstart_monitor(self):"""启动监控线程"""defmonitor():whileTrue:# 模拟检测连接状态(实际项目中替换为真实检测逻辑)self.is_connected = random.choice([True, True, True, False]) # 75%在线概率ifself.is_connected:self.canvas.itemconfig(self.led, fill='green')self.status_label.config(text="设备在线", fg='green')else:self.canvas.itemconfig(self.led, fill='red')self.status_label.config(text="设备离线", fg='red') time.sleep(2) # 每2秒检测一次 thread = threading.Thread(target=monitor, daemon=True) thread.start()if __name__ == "__main__": root = tk.Tk() app = SimpleLED(root) root.mainloop()
运行起来你会看到一个圆形LED灯,绿色表示在线,红色表示离线。每2秒自动刷新一次状态。
适用场景:实验室测试、功能演示、原型验证。
致命缺陷:
但是!这代码够简洁,5分钟就能看懂改好。有时候,简单就是最大的优势。
简单版只能看,不能用。现在咱们加点硬货——断线自动重连。
import tkinter as tkfrom tkinter import ttkimport socketimport threadingimport timefrom datetime import datetimeclassIndustrialLED:def__init__(self, root, host='192.168.1.100', port=502):self.root = rootself.root.title("工业断线重连提示灯")self.root.geometry("400x300")# 连接参数self.host = hostself.port = portself.socket = Noneself.is_connected = Falseself.reconnect_count = 0self.max_reconnect = 10# UI布局self.setup_ui()# 启动连接self.start_connection()defsetup_ui(self):"""搭建界面"""# LED指示灯区域 led_frame = tk.Frame(self.root) led_frame.pack(pady=20)self.canvas = tk.Canvas(led_frame, width=80, height=80, bg='#f0f0f0', highlightthickness=0)self.canvas.pack(side=tk.LEFT, padx=20)self.led = self.canvas.create_oval(10, 10, 70, 70, fill='gray', outline='#333', width=2)# 状态文字 status_frame = tk.Frame(led_frame) status_frame.pack(side=tk.LEFT)self.status_label = tk.Label(status_frame, text="未连接", font=("微软雅黑", 16, "bold"), fg='gray')self.status_label.pack(anchor='w')self.detail_label = tk.Label(status_frame, text=f"{self.host}:{self.port}", font=("Consolas", 10), fg='#666')self.detail_label.pack(anchor='w')# 操作按钮 btn_frame = tk.Frame(self.root) btn_frame.pack(pady=10)self.reconnect_btn = tk.Button(btn_frame, text="手动重连", command=self.manual_reconnect, bg='#4CAF50', fg='white', font=("微软雅黑", 10), padx=20)self.reconnect_btn.pack(side=tk.LEFT, padx=5)self.disconnect_btn = tk.Button(btn_frame, text="断开连接", command=self.manual_disconnect, bg='#f44336', fg='white', font=("微软雅黑", 10), padx=20)self.disconnect_btn.pack(side=tk.LEFT, padx=5)# 日志显示区 log_frame = tk.Frame(self.root) log_frame.pack(fill=tk.BOTH, expand=True, padx=20, pady=10) tk.Label(log_frame, text="连接日志:", font=("微软雅黑", 9)).pack(anchor='w')self.log_text = tk.Text(log_frame, height=8, font=("Consolas", 9), bg='#1e1e1e', fg='#d4d4d4', insertbackground='white')self.log_text.pack(fill=tk.BOTH, expand=True)# 滚动条 scrollbar = tk.Scrollbar(self.log_text) scrollbar.pack(side=tk.RIGHT, fill=tk.Y)self.log_text.config(yscrollcommand=scrollbar.set) scrollbar.config(command=self.log_text.yview)deflog(self, message, level='INFO'):"""写日志(带颜色)""" timestamp = datetime.now().strftime("%H:%M:%S") color_map = {'INFO': '#4CAF50', 'ERROR': '#f44336', 'WARN': '#FF9800'}self.log_text.insert(tk.END, f"[{timestamp}] ", 'time')self.log_text.insert(tk.END, f"{message}\n", level)self.log_text.tag_config('time', foreground='#888')self.log_text.tag_config(level, foreground=color_map.get(level, '#d4d4d4'))self.log_text.see(tk.END) # 自动滚到最新defconnect_device(self):"""尝试连接设备"""try:self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)self.socket.settimeout(3) # 3秒超时self.socket.connect((self.host, self.port))self.is_connected = Trueself.reconnect_count = 0# 更新UIself.canvas.itemconfig(self.led, fill='#4CAF50') # 绿色self.status_label.config(text="设备在线", fg='#4CAF50')self.log(f"连接成功 -> {self.host}:{self.port}", 'INFO')returnTrueexcept Exception as e:self.is_connected = Falseself.canvas.itemconfig(self.led, fill='#f44336') # 红色self.status_label.config(text="连接失败", fg='#f44336')self.log(f"连接失败: {str(e)}", 'ERROR')returnFalsedefstart_connection(self):"""启动连接线程"""defconnection_loop():whileTrue:ifnotself.is_connected:# 显示重连状态self.canvas.itemconfig(self.led, fill='#FF9800') # 黄色self.status_label.config(text="重连中...", fg='#FF9800')ifself.reconnect_count < self.max_reconnect:self.reconnect_count += 1self.log(f"第 {self.reconnect_count} 次重连尝试...", 'WARN')self.connect_device()else:self.log("重连次数超限,停止尝试", 'ERROR') time.sleep(30) # 等30秒后重置计数self.reconnect_count = 0 time.sleep(5) # 每5秒检测一次 thread = threading.Thread(target=connection_loop, daemon=True) thread.start()defmanual_reconnect(self):"""手动重连按钮"""self.reconnect_count = 0self.log("用户手动触发重连", 'INFO') threading.Thread(target=self.connect_device, daemon=True).start()defmanual_disconnect(self):"""手动断开"""ifself.socket:self.socket.close()self.is_connected = Falseself.canvas.itemconfig(self.led, fill='gray')self.status_label.config(text="已断开", fg='gray')self.log("用户手动断开连接", 'WARN')if __name__ == "__main__": root = tk.Tk()# 实际使用时替换为真实的PLC或设备IP和端口 app = IndustrialLED(root, host='127.0.0.1', port=8080) root.mainloop()
看到了吗?画风完全变了。
三色LED灯:
自动重连机制:断线后每5秒尝试重连,最多10次。超限后暂停30秒再试,避免无限循环把设备打爆。
实时日志:每次连接/断开都有记录,带时间戳。而且日志是彩色的!绿色INFO、红色ERROR、橙色WARN——一眼就能看出问题。
手动干预:两个按钮——"手动重连"和"断开连接"。测试的时候特别好用,不用改代码就能模拟各种场景。
我在一个水处理项目中用过类似方案。现场有12台PLC,每台都要监控连接状态。用这个提示灯,运维人员打开界面就知道哪台设备掉线了。配合日志功能,还能回溯故障时间,写维修报告特别方便。
踩过的坑:最初没加socket.settimeout(3),结果设备响应慢的时候程序直接卡死。加了超时后,3秒���响应直接判定为离线,用户体验提升明显。
还有个坑——daemon=True一定要加!不然关闭窗口时线程不会停止,程序成了僵尸进程。
前面两个方案解决了"能用"的问题。但要上生产环境,还得再加点料。
核心升级点:
import tkinter as tk from tkinter import messagebox, ttk import sqlite3 import json import threading import time from datetime import datetime, timedelta from pymodbus.client.tcp import ModbusTcpClient from pymodbus.exceptions import ModbusException import logging import os classProductionLED: def__init__(self, root, config_file='led_config.json'): self.root = root self.root.title("生产线LED监控系统") self.root.geometry("800x600") # 加载配置 self.load_config(config_file) # 初始化日志系统 self.setup_logging() # 初始化数据库 self.init_database() # 连接状态 self.is_connected = Falseself.modbus_client = Noneself.last_heartbeat = Noneself.offline_duration = 0self.reconnect_attempts = 0# LED状态 self.led_status = {} self.setup_ui() self.start_heartbeat() self.connect_to_device() defload_config(self, config_file): """加载配置文件"""try: withopen(config_file, 'r', encoding='utf-8') as f: self.config = json.load(f) except FileNotFoundError: # 创建默认配置 self.config = { "host": "192.168.1.100", "port": 502, "heartbeat_interval": 5, "reconnect_interval": 10, "max_reconnect_attempts": 10, "alert_threshold": 30, "led_registers": { "red": 0, "yellow": 1, "green": 2, "status": 10 } } withopen(config_file, 'w', encoding='utf-8') as f: json.dump(self.config, f, indent=4, ensure_ascii=False) defsetup_logging(self): """配置日志系统""" logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('led_monitor.log', encoding='utf-8'), logging.StreamHandler() ] ) self.logger = logging.getLogger(__name__) definit_database(self): """初始化SQLite数据库"""self.conn = sqlite3.connect('connection_log.db', check_same_thread=False) cursor = self.conn.cursor() # 检查表是否存在以及结构 cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='connection_events'") table_exists = cursor.fetchone() isnotNoneif table_exists: # 检查是否有 duration 字段 cursor.execute("PRAGMA table_info(connection_events)") columns = [column[1] for column in cursor.fetchall()] if'duration'notin columns: cursor.execute("ALTER TABLE connection_events ADD COLUMN duration INTEGER DEFAULT 0") else: # 创建连接事件表 cursor.execute(''' CREATE TABLE connection_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, event_type TEXT NOT NULL, device_ip TEXT, message TEXT, duration INTEGER DEFAULT 0 ) ''') # LED状态表处理类似 cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='led_status'") ifnot cursor.fetchone(): cursor.execute(''' CREATE TABLE led_status ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, red_status INTEGER, yellow_status INTEGER, green_status INTEGER, device_status INTEGER ) ''') self.conn.commit() defsetup_ui(self): """设置用户界面"""# 主框架 main_frame = ttk.Frame(self.root) main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10) # 连接状态框架 status_frame = ttk.LabelFrame(main_frame, text="连接状态") status_frame.pack(fill=tk.X, pady=(0, 10)) self.status_label = ttk.Label(status_frame, text="未连接", font=('Arial', 12, 'bold')) self.status_label.pack(pady=10) self.ip_label = ttk.Label(status_frame, text=f"设备IP: {self.config['host']}:{self.config['port']}") self.ip_label.pack() self.heartbeat_label = ttk.Label(status_frame, text="心跳: --") self.heartbeat_label.pack() # LED状态框架 led_frame = ttk.LabelFrame(main_frame, text="LED状态监控") led_frame.pack(fill=tk.X, pady=(0, 10)) # LED指示器 led_indicators = tk.Frame(led_frame) led_indicators.pack(pady=10) self.led_canvas = {} colors = ['red', 'yellow', 'green'] for i, color inenumerate(colors): frame = tk.Frame(led_indicators) frame.grid(row=0, column=i, padx=20) ttk.Label(frame, text=color.upper()).pack() canvas = tk.Canvas(frame, width=60, height=60, bg='white') canvas.pack(pady=5) self.led_canvas[color] = canvas self.draw_led(canvas, False) # 控制按钮 control_frame = ttk.LabelFrame(main_frame, text="设备控制") control_frame.pack(fill=tk.X, pady=(0, 10)) button_frame = tk.Frame(control_frame) button_frame.pack(pady=10) ttk.Button(button_frame, text="连接", command=self.manual_connect).pack(side=tk.LEFT, padx=5) ttk.Button(button_frame, text="断开", command=self.disconnect).pack(side=tk.LEFT, padx=5) ttk.Button(button_frame, text="测试红灯", command=lambda: self.control_led('red', True)).pack(side=tk.LEFT, padx=5) ttk.Button(button_frame, text="关闭所有", command=self.turn_off_all).pack(side=tk.LEFT, padx=5) # 日志框架 log_frame = ttk.LabelFrame(main_frame, text="运行日志") log_frame.pack(fill=tk.BOTH, expand=True) # 日志文本框 log_text_frame = tk.Frame(log_frame) log_text_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) self.log_text = tk.Text(log_text_frame, height=10) scrollbar = ttk.Scrollbar(log_text_frame, orient=tk.VERTICAL, command=self.log_text.yview) self.log_text.configure(yscrollcommand=scrollbar.set) self.log_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) scrollbar.pack(side=tk.RIGHT, fill=tk.Y) # 日志按钮 log_button_frame = tk.Frame(log_frame) log_button_frame.pack(fill=tk.X, padx=5, pady=5) ttk.Button(log_button_frame, text="清空日志", command=self.clear_log).pack(side=tk.LEFT, padx=5) ttk.Button(log_button_frame, text="导出日志", command=self.export_log).pack(side=tk.LEFT, padx=5) ttk.Button(log_button_frame, text="查看统计", command=self.show_statistics).pack(side=tk.LEFT, padx=5) defdraw_led(self, canvas, is_on, color='red'): """绘制LED指示器""" canvas.delete("all") fill_color = color if is_on else'gray' canvas.create_oval(10, 10, 50, 50, fill=fill_color, outline='black', width=2) if is_on: canvas.create_oval(20, 20, 40, 40, fill='white', outline='') deflog_message(self, message, level='INFO'): """记录日志消息""" timestamp = datetime.now().strftime('%H:%M:%S') log_entry = f"[{timestamp}] {level}: {message}\n"# 更新UI日志 self.log_text.insert(tk.END, log_entry) self.log_text.see(tk.END) # 系统日志 if level == 'ERROR': self.logger.error(message) elif level == 'WARNING': self.logger.warning(message) else: self.logger.info(message) deflog_to_db(self, event_type, message, duration=0): """写入数据库日志"""try: cursor = self.conn.cursor() cursor.execute(''' INSERT INTO connection_events (timestamp, event_type, device_ip, message, duration) VALUES (?, ?, ?, ?, ?) ''', (datetime.now().isoformat(), event_type, self.config['host'], message, duration)) self.conn.commit() except Exception as e: self.logger.error(f"数据库写入失败: {e}") defconnect_to_device(self): """连接到设备"""defconnect(): try: self.modbus_client = ModbusTcpClient(self.config['host'], port=self.config['port']) ifself.modbus_client.connect(): self.is_connected = Trueself.reconnect_attempts = 0self.last_heartbeat = datetime.now() self.root.after(0, lambda: self.status_label.config(text="已连接", foreground='green')) self.log_message(f"成功连接到设备 {self.config['host']}:{self.config['port']}") self.log_to_db('CONNECT', '设备连接成功') else: raise Exception("连接失败") except Exception as e: self.is_connected = Falseself.root.after(0, lambda: self.status_label.config(text="连接失败", foreground='red')) self.log_message(f"连接失败: {e}", 'ERROR') self.log_to_db('CONNECT_FAIL', f'连接失败: {e}') # 自动重连 self.schedule_reconnect() threading.Thread(target=connect, daemon=True).start() defschedule_reconnect(self): """调度重连"""ifself.reconnect_attempts < self.config['max_reconnect_attempts']: self.reconnect_attempts += 1self.log_message(f"将在 {self.config['reconnect_interval']} 秒后尝试第 {self.reconnect_attempts} 次重连") defdelayed_reconnect(): time.sleep(self.config['reconnect_interval']) ifnotself.is_connected: self.connect_to_device() threading.Thread(target=delayed_reconnect, daemon=True).start() else: self.log_message("已达到最大重连次数,停止重连", 'ERROR') self.log_to_db('RECONNECT_FAILED', '达到最大重连次数') defmanual_connect(self): """手动连接"""self.reconnect_attempts = 0self.connect_to_device() defdisconnect(self): """断开连接"""ifself.modbus_client andself.is_connected: self.modbus_client.close() self.is_connected = Falseself.status_label.config(text="已断开", foreground='orange') self.log_message("手动断开连接") self.log_to_db('DISCONNECT', '手动断开连接') defstart_heartbeat(self): """启动心跳检测"""defheartbeat_loop(): whileTrue: ifself.is_connected andself.modbus_client: success = self.send_heartbeat() if success: self.last_heartbeat = datetime.now() self.offline_duration = 0# 更新UI self.root.after(0, lambda: self.heartbeat_label.config( text=f"心跳: {self.last_heartbeat.strftime('%H:%M:%S')}" )) # 读取LED状态 self.read_led_status() else: self.offline_duration += self.config['heartbeat_interval'] # 超过阈值告警 ifself.offline_duration >= self.config['alert_threshold']: self.show_alert() self.is_connected = Falseself.root.after(0, lambda: self.status_label.config(text="连接超时", foreground='red')) self.schedule_reconnect() time.sleep(self.config['heartbeat_interval']) threading.Thread(target=heartbeat_loop, daemon=True).start() defsend_heartbeat(self): """发送心跳包"""try: # 读取状态寄存器作为心跳 result = self.modbus_client.read_holding_registers( address=self.config['led_registers']['status'], count=1, device_id=1 ) returnnot result.isError() except Exception as e: self.log_message(f"心跳检测失败: {e}", 'ERROR') returnFalsedefread_led_status(self): """读取LED状态"""try: for color in ['red', 'yellow', 'green']: register = self.config['led_registers'][color] result = self.modbus_client.read_coils(address=register, count=1, device_id=1) ifnot result.isError(): status = result.bits[0] self.led_status[color] = status # 更新UI self.root.after(0, lambda c=color, s=status: self.draw_led(self.led_canvas[c], s, c)) # 记录状态到数据库 self.save_led_status_to_db() except Exception as e: self.log_message(f"读取LED状态失败: {e}", 'ERROR') defsave_led_status_to_db(self): """保存LED状态到数据库"""try: cursor = self.conn.cursor() cursor.execute(''' INSERT INTO led_status (timestamp, red_status, yellow_status, green_status, device_status) VALUES (?, ?, ?, ?, ?) ''', ( datetime.now().isoformat(), int(self.led_status.get('red', False)), int(self.led_status.get('yellow', False)), int(self.led_status.get('green', False)), 1ifself.is_connected else0 )) self.conn.commit() except Exception as e: self.logger.error(f"保存LED状态失败: {e}") defcontrol_led(self, color, state): """控制LED"""ifnotself.is_connected: messagebox.showwarning("警告", "设备未连接") returntry: register = self.config['led_registers'][color] result = self.modbus_client.write_coil(address= register,value= state, device_id=1) ifnot result.isError(): action = "开启"if state else"关闭"self.log_message(f"{action}{color.upper()} LED成功") # 立即更新状态显示 self.read_led_status() else: self.log_message(f"控制LED失败: {result}", 'ERROR') except Exception as e: self.log_message(f"LED控制异常: {e}", 'ERROR') defturn_off_all(self): """关闭所有LED"""for color in ['red', 'yellow', 'green']: self.control_led(color, False) defshow_alert(self): """弹窗告警"""defshow_alert_dialog(): messagebox.showerror( "连接异常", f"设备 {self.config['host']} 已离线超过 {self.offline_duration} 秒!\n请检查网络连接或设备状态。" ) self.root.after(0, show_alert_dialog) self.log_to_db('ALERT', f'离线超过{self.offline_duration}秒', self.offline_duration) self.log_message(f"告警: 设备离线超过 {self.offline_duration} 秒", 'WARNING') defclear_log(self): """清空日志"""self.log_text.delete(1.0, tk.END) defexport_log(self): """导出日志"""try: cursor = self.conn.cursor() cursor.execute('SELECT * FROM connection_events ORDER BY timestamp DESC LIMIT 1000') events = cursor.fetchall() filename = f"connection_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"withopen(filename, 'w', encoding='utf-8') as f: f.write("ID,时间戳,事件类型,设备IP,消息,持续时间\n") for event in events: f.write(f"{event[0]},{event[1]},{event[2]},{event[3]},{event[4]},{event[5]}\n") messagebox.showinfo("导出成功", f"日志已导出到 {filename}") self.log_message(f"日志导出成功: {filename}") except Exception as e: messagebox.showerror("导出失败", f"导出日志失败: {e}") self.log_message(f"导出日志失败: {e}", 'ERROR') defshow_statistics(self): """显示统计信息"""try: cursor = self.conn.cursor() # 连接统计 cursor.execute('SELECT COUNT(*) FROM connection_events WHERE event_type = "CONNECT"') connect_count = cursor.fetchone()[0] cursor.execute('SELECT COUNT(*) FROM connection_events WHERE event_type = "ALERT"') alert_count = cursor.fetchone()[0] cursor.execute('SELECT AVG(duration) FROM connection_events WHERE event_type = "ALERT" AND duration > 0') avg_downtime = cursor.fetchone()[0] or0# 今日统计 today = datetime.now().date() cursor.execute('SELECT COUNT(*) FROM connection_events WHERE DATE(timestamp) = ? AND event_type = "ALERT"', (today,)) today_alerts = cursor.fetchone()[0] stats_message = f"""设备连接统计: ━━━━━━━━━━━━━━━━━━━━━━ 总连接次数: {connect_count}总告警次数: {alert_count}平均故障时长: {avg_downtime:.1f} 秒 今日告警次数: {today_alerts}━━━━━━━━━━━━━━━━━━━━━━""" messagebox.showinfo("连接统计", stats_message) except Exception as e: messagebox.showerror("统计失败", f"获取统计信息失败: {e}") def__del__(self): """析构函数"""ifhasattr(self, 'conn'): self.conn.close() ifhasattr(self, 'modbus_client') andself.modbus_client: self.modbus_client.close() defmain(): root = tk.Tk() app = ProductionLED(root) defon_closing(): if messagebox.askokcancel("退出", "确定要退出程序吗?"): root.destroy() root.protocol("WM_DELETE_WINDOW", on_closing) root.mainloop() if __name__ == "__main__": main()
配置文件 led_config.json 示例:
{"host":"127.0.0.1","port":502,"heartbeat_interval":5,"reconnect_interval":10,"max_reconnect_attempts":10,"alert_threshold":30,"led_registers":{"red":0,"yellow":1,"green":2,"status":10},"device_info":{"name":"生产线LED控制器","model":"LED-CTRL-001","location":"产线A区"}}你可能会说:不就是个提示灯吗,至于吗?
至于。
真实的工业环境,网络可能抖动、设备可能假死(连接在但不响应)、客户可能要求出具故障报告。这时候:
我见过一个项目,因为没做心跳检测,设备死机了半天都没发现。等客户投诉时,已经损失了几十万。
工业软件不等于丑。几个简单技巧让你的界面看起来专业10倍:
1. LED灯加动画效果
defled_blink(self, color):"""LED闪烁效果"""for _ inrange(3):self.canvas.itemconfig(self.led, fill=color)self.root.update() time.sleep(0.1)self.canvas.itemconfig(self.led, fill='gray')self.root.update() time.sleep(0.1)2. 使用ttk主题
from tkinter import ttkstyle = ttk.Style()style.theme_use('clam') # 或 'alt', 'default', 'classic'3. 给LED加光晕效果
# 在Canvas上叠加两个圆,外圆半透明self.canvas.create_oval(5, 5, 75, 75, fill='', outline=color, width=5, stipple='gray50')self.canvas.create_oval(20, 20, 60, 60, fill=color)错误示范:
# 直接在子线程修改UI — 会崩溃!defmonitor():self.status_label.config(text="在线") # ❌正确做法:
defmonitor():self.root.after(0, lambda: self.status_label.config(text="在线")) # ✅Tkinter不是线程安全的。子线程要修改UI,必须通过after()方法。
每次重连前,一定要先关闭旧的socket:
ifself.socket:try:self.socket.close()except:passself.socket = socket.socket(...)不然会出现"Address already in use"错误,特别是在Windows上。
如果用文件记录日志,记得做日志轮转:
from logging.handlers import RotatingFileHandlerhandler = RotatingFileHandler('app.log', maxBytes=10*1024*1024, backupCount=5)我见过一个程序跑了一年,日志文件20GB,直接把硬盘撑爆。
掌握了断线重连,可以继续深挖:
进阶方向1: 通信协议集成把示例中的socket替换为实际协议 —— Modbus TCP、OPC UA、Ethernet/IP。推荐库:pymodbus、opcua。
进阶方向2: 多设备集群监控用ttk.Treeview做设备列表,每个设备一个LED灯。适合管理几十台PLC的场景。
进阶方向3: Web化部署用Flask把状态暴露成REST API,前端用Vue/React做仪表盘。这样手机也能看设备状态了。
评论区聊聊:
实战挑战: 试试给代码加个"连接速度测试"功能 —— 显示ping值和丢包率。提示:用subprocess调用系统ping命令。
最后,收藏前记得点个赞~ 下次遇到设备掉线,直接翻出这篇文章,10分钟搞定提示灯功能。
把这三个方案的代码存到你的工具库,改改参数就能用。工业软件开发,就是要这种"拿来即用"的实在货。