从一台老旧设备说起

车间里有台注塑机,跑了八年了。老板问我:能不能实时看到它的温度、压力、转速?数据还得存下来,方便以后查问题。

预算?没有。买SCADA?太贵。

就这样,我用Python捣鼓出了一套方案——Tkinter做界面,Modbus采PLC数据,SQLite存历史记录。整个项目从零到上线,花了三天。踩了不少坑,但最终跑得挺稳。

这篇文章,我把完整的思路和代码都摆出来,你照着做,基本能直接用。

整体架构,先想清楚再动手

很多人一上来就写代码,写着写着发现逻辑乱成一锅粥。我吃过这个亏。

这套系统说白了就三件事:采数据、存数据、显数据。对应三个模块:

  • plc_reader.py— 负责跟PLC通信,拿原始数据

  • db_manager.py— 负责把数据塞进SQLite,查询也在这里

  • main_app.py— Tkinter主界面,把数据展示出来,还要触发定时采集

三个模块各司其职,互相不乱插手。这种结构,后期改起来不会崩。

第一步:跟PLC建立连接

工业现场最常见的协议是Modbus TCP。Python有个库叫pymodbus,用起来很顺手。

bash1 pip install pymodbus

来看PLC读取模块的核心代码:

python1 from pymodbus.client import ModbusTcpClient
2 from pymodbus.exceptions import ModbusException
3 import logging
4
5logger = logging.getLogger(__name__)
6
7 class PLCReader:
8def __init__(self, host: str, port: int = 502):
9 self.host = host
10 self.port = port
11 self.client = ModbusTcpClient(host=host, port=port, timeout=3)
12 self._connected = False
13
14def connect(self) -> bool:
15"""尝试连接PLC,返回是否成功"""
16try:
17 self._connected = self.client.connect
18if self._connected:
19 logger.info(f"已连接PLC: {self.host}:{self.port}")
20return self._connected
21except Exception as e:
22 logger.error(f"连接失败: {e}")
23return False
24
25def read_holding_registers(self, address: int, count: int) -> list | None:
26"""
27 读取保持寄存器
28 address: 起始地址
29 count: 读取数量
30 """
31if not self._connected:
32 logger.warning("PLC未连接,尝试重连...")
33if not self.connect:
34return None
35
36try:
37 result = self.client.read_holding_registers(address, count)
38if result.isError:
39 logger.error(f"读取寄存器失败,地址: {address}")
40return None
41return result.registers
42except ModbusException as e:
43 logger.error(f"Modbus异常: {e}")
44 self._connected = False # 标记断线,下次自动重连
45return None
46
47def parse_data(self, raw_registers: list) -> dict:
48"""
49 把原始寄存器值转换成有意义的工程量
50 具体换算比例要看PLC程序里的定义
51 """
52if not raw_registers or len(raw_registers) < 4:
53return {}
54
55return {
56"temperature": raw_registers[0] / 10.0, # 假设精度0.1°C
57"pressure": raw_registers[1] / 100.0, # 单位 MPa
58"speed": raw_registers[2], # 转速 RPM
59"status_code": raw_registers[3] # 设备状态码
60 }
61
62def close(self):
63 self.client.close
64 self._connected = False
有几个细节值得注意。断线重连这块,我没用复杂的心跳机制,就是读取失败时把

_connected

置为False,下次读取前自动尝试重连。简单粗暴,但在工厂环境里够用了。

寄存器地址和换算比例,一定要跟做PLC程序的工程师确认,这个没有通用答案,每个项目都不一样。

️ 第二步:SQLite数据库设计

SQLite不需要安装,Python自带,零配置,对于这种单机采集系统来说是最合适的选择。

python1 import sqlite3
2 import threading
3 from datetime import datetime
4 from contextlib import contextmanager
5
6 class DBManager:
7def __init__(self, db_path: str = "plc_data.db"):
8 self.db_path = db_path
9 self._lock = threading.Lock # 多线程写入时必须加锁
10 self._init_db
11
12def _init_db(self):
13"""建表,如果表已存在就跳过"""
14with self._get_conn as conn:
15 conn.execute("""
16 CREATE TABLE IF NOT EXISTS plc_records (
17 id INTEGER PRIMARY KEY AUTOINCREMENT,
18 timestamp TEXT NOT ,
19 temperature REAL,
20 pressure REAL,
21 speed INTEGER,
22 status_code INTEGER,
23 device_id TEXT DEFAULT 'PLC_01'
24 )
25 """)
26# 给timestamp建索引,查历史数据时快很多
27 conn.execute("""
28 CREATE INDEX IF NOT EXISTS idx_timestamp
29 ON plc_records (timestamp)
30 """)
31
32@contextmanager
33def _get_conn(self):
34"""用上下文管理器处理连接,自动提交和关闭"""
35 conn = sqlite3.connect(self.db_path)
36 conn.row_factory = sqlite3.Row # 让查询结果支持按列名访问
37try:
38yield conn
39 conn.commit
40except Exception:
41 conn.rollback
42raise
43finally:
44 conn.close
45
46def insert_record(self, data: dict) -> bool:
47"""插入一条采集记录"""
48with self._lock:
49try:
50with self._get_conn as conn:
51 conn.execute("""
52 INSERT INTO plc_records
53 (timestamp, temperature, pressure, speed, status_code)
54 VALUES (?, ?, ?, ?, ?)
55 """, (
56 datetime.now.strftime("%Y-%m-%d %H:%M:%S"),
57 data.get("temperature"),
58 data.get("pressure"),
59 data.get("speed"),
60 data.get("status_code")
61 ))
62return True
63except sqlite3.Error as e:
64print(f"数据库写入失败: {e}")
65return False
66
67def query_recent(self, limit: int = 100) -> list:
68"""查最近N条记录"""
69with self._get_conn as conn:
70 cursor = conn.execute("""
71 SELECT * FROM plc_records
72 ORDER BY id DESC
73 LIMIT ?
74 """, (limit,))
75return [dict(row) for row in cursor.fetchall()]
76
77def query_by_timerange(self, start: str, end: str) -> list:
78"""按时间段查询,start/end格式: '2025-01-01 00:00:00'"""
79with self._get_conn as conn:
80 cursor = conn.execute("""
81 SELECT * FROM plc_records
82 WHERE timestamp BETWEEN ? AND ?
83 ORDER BY timestamp ASC
84 """, (start, end))
85return [dict(row) for row in cursor.fetchall()]
86
87def get_stats(self) -> dict:
88"""统计总记录数和最新一条的时间"""
89with self._get_conn as conn:
90 row = conn.execute("""
91 SELECT COUNT(*) as total,
92 MAX(timestamp) as last_time
93 FROM plc_records
94 """).fetchone
95return dict(row) if row else {}

这里有个坑我要专门说一下:SQLite在多线程环境下写入必须加锁

。Tkinter的定时器回调和主线程是同一个线程,但如果你后面想加后台线程,不加锁迟早出问题。

threading.Lock这一行,别省。

️ 第三步:Tkinter界面,把数据立起来

界面这块,我尽量做得实用,不花哨。一个状态栏显示连接情况,一块实时数据区显示当前值,下面是历史记录表格。

python1 import tkinter as tk
2 from tkinter import ttk, messagebox
3 import threading
4 from plc_reader import PLCReader
5 from db_manager import DBManager
6
7 class PLCMonitorApp:
8def __init__(self, root: tk.Tk):
9 self.root = root
10 self.root.title("PLC实时监控系统 v1.0")
11 self.root.geometry("900x620")
12 self.root.resizable(True, True)
13
14# 初始化核心模块
15 self.plc = PLCReader(host="192.168.1.100", port=502)
16 self.db = DBManager(db_path="plc_data.db")
17
18 self._collecting = False
19 self._collect_interval = 2000 # 毫秒,即2秒采一次
20
21 self._build_ui
22 self._refresh_table # 启动时加载一次历史数据
23
24# ── 界面构建 ──────────────────────────────────────────
25def _build_ui(self):
26# 顶部工具栏
27 toolbar = tk.Frame(self.root, bg="#2c3e50", height=50)
28 toolbar.pack(fill=tk.X)
29 toolbar.pack_propagate(False)
30
31 tk.Label(toolbar, text=" PLC数据监控",
32 bg="#2c3e50", fg="white",
33 font=("微软雅黑", 14, "bold")).pack(side=tk.LEFT, padx=15)
34
35 self.btn_start = tk.Button(
36 toolbar, text="▶ 开始采集",
37 bg="#27ae60", fg="white", relief=tk.FLAT,
38 padx=12, font=("微软雅黑", 10),
39 command=self._toggle_collection
40 )
41 self.btn_start.pack(side=tk.RIGHT, padx=10, pady=8)
42
43# 状态栏
44 self.status_var = tk.StringVar(value="● 未连接")
45 status_bar = tk.Label(
46 self.root, textvariable=self.status_var,
47 bg="#ecf0f1", fg="#7f8c8d",
48 anchor=tk.W, padx=10, font=("微软雅黑", 9)
49 )
50 status_bar.pack(fill=tk.X)
51
52# 实时数据卡片区
53 card_frame = tk.Frame(self.root, bg="#f5f6fa", pady=10)
54 card_frame.pack(fill=tk.X, padx=15)
55
56 self.data_vars = {}
57 fields = [
58 ("温度", "temperature", "°C", "#e74c3c"),
59 ("压力", "pressure", "MPa", "#3498db"),
60 ("转速", "speed", "RPM", "#2ecc71"),
61 ("状态", "status_code", "", "#9b59b6"),
62 ]
63for label, key, unit, color in fields:
64 self._make_card(card_frame, label, key, unit, color)
65
66# 历史记录表格
67 table_frame = tk.LabelFrame(
68 self.root, text=" 历史记录 ",
69 font=("微软雅黑", 10), padx=8, pady=8
70 )
71 table_frame.pack(fill=tk.BOTH, expand=True, padx=15, pady=(0, 10))
72
73 cols = ("时间", "温度(°C)", "压力(MPa)", "转速(RPM)", "状态码")
74 self.tree = ttk.Treeview(table_frame, columns=cols,
75 show="headings", height=12)
76for col in cols:
77 self.tree.heading(col, text=col)
78 self.tree.column(col, width=150, anchor=tk.CENTER)
79
80 scrollbar = ttk.Scrollbar(table_frame, orient=tk.VERTICAL,
81 command=self.tree.yview)
82 self.tree.configure(yscrollcommand=scrollbar.set)
83 self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
84 scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
85
86def _make_card(self, parent, label, key, unit, color):
87"""生成一个数据展示卡片"""
88 card = tk.Frame(parent, bg="white", relief=tk.RAISED,
89 bd=1, width=180, height=80)
90 card.pack(side=tk.LEFT, padx=8, pady=4)
91 card.pack_propagate(False)
92
93 tk.Label(card, text=label, bg="white",
94 fg="#7f8c8d", font=("微软雅黑", 9)).pack(pady=(8, 0))
95
96 var = tk.StringVar(value="--")
97 self.data_vars[key] = var
98 tk.Label(card, textvariable=var, bg="white",
99 fg=color, font=("微软雅黑", 18, "bold")).pack
100
101 tk.Label(card, text=unit, bg="white",
102 fg="#bdc3c7", font=("微软雅黑", 8)).pack
103
104# ── 采集逻辑 ──────────────────────────────────────────
105def _toggle_collection(self):
106if not self._collecting:
107 self._start_collection
108else:
109 self._stop_collection
110
111def _start_collection(self):
112"""连接PLC,启动定时采集"""
113if not self.plc.connect:
114 messagebox.showerror("连接失败",
115"无法连接到PLC,请检查IP地址和网络。")
116return
117
118 self._collecting = True
119 self.btn_start.config(text="⏹ 停止采集", bg="#e74c3c")
120 self.status_var.set("● 采集中...")
121 self._do_collect # 立即采一次,别等第一个interval
122
123def _stop_collection(self):
124 self._collecting = False
125 self.plc.close
126 self.btn_start.config(text="▶ 开始采集", bg="#27ae60")
127 self.status_var.set("● 已停止")
128
129def _do_collect(self):
130"""实际的采集动作,在后台线程执行,避免卡界面"""
131if not self._collecting:
132return
133
134def worker:
135 raw = self.plc.read_holding_registers(address=0, count=4)
136if raw is None:
137 self.root.after(0, lambda: self.status_var.set(
138"⚠ 读取失败,等待重试..."))
139return
140
141 data = self.plc.parse_data(raw)
142 self.db.insert_record(data)
143
144# 更新UI必须回到主线程
145 self.root.after(0, lambda: self._update_ui(data))
146
147 threading.Thread(target=worker, daemon=True).start
148
149# 安排下一次采集
150 self.root.after(self._collect_interval, self._do_collect)
151
152def _update_ui(self, data: dict):
153"""更新实时数据卡片和表格"""
154 self.data_vars["temperature"].set(
155f"{data.get('temperature', '--'):.1f}")
156 self.data_vars["pressure"].set(
157f"{data.get('pressure', '--'):.2f}")
158 self.data_vars["speed"].set(str(data.get("speed", "--")))
159 self.data_vars["status_code"].set(str(data.get("status_code", "--")))
160
161 self.status_var.set(
162f"● 采集中 | 最新: {data.get('temperature')}°C "
163f"{data.get('pressure')}MPa {data.get('speed')}RPM"
164 )
165 self._refresh_table
166
167def _refresh_table(self):
168"""刷新历史记录表格,取最近50条"""
169for row in self.tree.get_children:
170 self.tree.delete(row)
171
172 records = self.db.query_recent(limit=50)
173for rec in records:
174 self.tree.insert("", tk.END, values=(
175 rec["timestamp"],
176 rec["temperature"],
177 rec["pressure"],
178 rec["speed"],
179 rec["status_code"]
180 ))
181
182
183if __name__ == "__main__":
184 root = tk.Tk
185 app = PLCMonitorApp(root)
186 root.mainloop
打开网易新闻 查看精彩图片

⚠️ 那些差点让我崩溃的坑

坑一:Tkinter不是线程安全的。在子线程里直接操作Label.config,有时候能跑,有时候直接崩。解决方案就是root.after(0, callback),把UI更新操作扔回主线程队列。这是铁律,不能省。

坑二:PLC断线没有异常,只有超时。Modbus TCP断线不会立刻抛异常,而是等到超时才报错。所以timeout=3

这个参数要设合理,太长了界面会假死,太短了正常通信也会误判断线。我在实际项目里设的是3秒,基本够用。

坑三:SQLite文件被占用。

有次我在程序还跑着的时候,用DB Browser for SQLite打开数据库文件,结果程序写入报错。这是SQLite的文件锁机制,正常现象。用

WAL模式可以缓解这个问题:

python1 # 在_init_db方法里加上这行
2conn.execute("PRAGMA journal_mode=WAL")
开了WAL模式,读写可以并发,外部工具查数据的时候程序还能继续写入,互不干扰。

坑四:长时间运行内存慢慢涨。这个问题藏得很深。根源是_refresh_table

每次都把Treeview里的数据全删了再重建。数据量一大,这个操作就很耗内存。解决办法是只在有新数据写入时才刷新,而不是每次采集都刷:

python1# 只在insert_record返回True时才调用_refresh_table
2 if self.db.insert_record(data):
3 self.root.after(0, lambda: self._update_ui(data))
项目结构和依赖

整个项目的文件结构很干净:

1plc_monitor/
2├── main_app.py # 主入口
3├── plc_reader.py # PLC通信
4├── db_manager.py # 数据库操作
5├── plc_data.db # 运行后自动生成
6└── requirements.txt

requirements.txt内容:

1pymodbus>=3.5.0
就这一个外部依赖。Tkinter和SQLite都是Python标准库自带的,不用额外安装。
跑起来之前,确认这几件事
一,确认PLC的IP地址和Modbus TCP端口(默认502),改掉代码里

PLCReader(host="192.168.1.100")这行。

二,跟PLC工程师确认寄存器地址和数据换算关系,修改parse_data方法里的换算逻辑。

三,如果手头没有真实PLC做测试,可以用pymodbus自带的模拟器:

bash1 python -m pymodbus.server --host 127.0.0.1 --port 502

PLCReader的host改成127.0.0.1,先把界面和数据库逻辑跑通,再接真机。

这套方案我在三个不同的工厂项目里用过,稳定运行了大半年没出过大问题。代码量不多,逻辑也不复杂,但每一块都是踩过坑之后的结果。
如果你的场景比这复杂——比如要接多台PLC、要做报警推送、或者要把数据同步到云端——这套架构也能扩展,核心思路是一样的。
欢迎在评论区聊聊你在工控数采项目里遇到的具体问题,说不定大家都踩过同一个坑。

#Python #Tkinter #PLC #工业自动化 #SQLite