Python实现OPC UA订阅与MQTT集成
# python
from datetime import datetime
import json
import os
import string
import time
import logging
import threading
from logging.handlers import TimedRotatingFileHandler
import random
from opcua import Client
import paho.mqtt.client as mqtt
# Characters from which the random MQTT client id is drawn.
num = string.ascii_letters + string.digits
client_id = "".join(random.sample(num, 10)) # Random 10-character id, so MQTT client ids do not collide at login
# MQTT broker address and credentials
HOST = "127.0.0.1"
PORT = 1883
username = "admin"
password = "public"
# Topic for data messages and topic for heartbeat messages
pub_topic = "hdx/pub123_fk"
heartbeat_topic = "hdx/heartbeat_QX"
# Connection flags: state_mqtt is written by the MQTT callbacks, state_opcua by opcua_client
state_mqtt = False
state_opcua = False
# Globally shared data structure and the lock that guards it
out_data = {}
out_lock = threading.Lock()
# OPC UA endpoint and the browse names (below "0:Objects") whose children are subscribed
SERVER_URL = "opc.tcp://HDX-XG:53530/OPCUA/SimulationServer"
NODE_LIST = ["3:Simulation", "3:Simulation1", "3:Simulation2"]
class MyLogFilter(logging.Filter):
    """Logging filter that lets through only records from this application's loggers."""

    def filter(self, record):
        """Return True when the record's logger name starts with an allowed prefix."""
        allowed_prefixes = (
            'main', 'opcua_worker', 'heartbeat', 'publisher', 'SubHandler', 'mqtt',
        )
        return any(record.name.startswith(prefix) for prefix in allowed_prefixes)
def setup_logging(log_dir='logs', filename='Run_CX.log', level=logging.INFO, console_level=None):
    """Configure the root logger with a daily-rotating file handler and a console handler.

    Calling the function again with the same log file is safe: no second file
    handler is created, and an existing stdout/stderr console handler is
    reconfigured instead of duplicated.

    Args:
        log_dir: Directory for the log file; created when missing.
        filename: Name of the log file inside ``log_dir``.
        level: Level applied to the root logger.
        console_level: Level for the console handler; defaults to ``level``.
    """
    # Local import: this script's import block does not bring in ``sys``,
    # and the console-handler detection below needs sys.stdout / sys.stderr.
    import sys

    os.makedirs(log_dir, exist_ok=True)
    log_path = os.path.join(log_dir, filename)
    abs_log_path = os.path.abspath(log_path)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(name)s - %(lineno)d - %(threadName)s - %(message)s')

    root = logging.getLogger()
    root.setLevel(level)

    # Only open the log file when no equivalent handler is installed yet;
    # constructing the handler opens the file, so building it first and then
    # discarding it would leak an open file handle.
    already_installed = any(
        isinstance(h, TimedRotatingFileHandler) and getattr(h, "baseFilename", None) == abs_log_path
        for h in root.handlers
    )
    if not already_installed:
        # Rotate at midnight and keep 30 rotated files.
        file_handler = TimedRotatingFileHandler(log_path, when='midnight', interval=1, backupCount=30, encoding='utf-8')
        file_handler.setFormatter(formatter)
        file_handler.setLevel(logging.INFO)
        root.addHandler(file_handler)

    console_level = level if console_level is None else console_level
    existing_console = next(
        (
            h for h in root.handlers
            if isinstance(h, logging.StreamHandler) and getattr(h, "stream", None) in (sys.stdout, sys.stderr)
        ),
        None,
    )
    if existing_console is None:
        console = logging.StreamHandler(sys.stdout)
        console.setFormatter(formatter)
        console.setLevel(console_level)
        root.addHandler(console)
    else:
        existing_console.setFormatter(formatter)
        existing_console.setLevel(console_level)
    # To restrict output to the application's own loggers, attach
    # MyLogFilter() to the handlers configured above.
# 连接mqtt服务器成功回调
def mqtt_on_connect(client, userdata, flags, rc):
    """paho-mqtt connect callback: mirror the connection result into ``state_mqtt``.

    A return code of 0 marks the broker connection as up; any other code marks
    it as down and is logged as a warning.
    """
    global state_mqtt
    mqtt_log = logging.getLogger("mqtt")
    succeeded = rc == 0
    if not succeeded:
        state_mqtt = False
        mqtt_log.warning("MQTT 连接返回码 %s", rc)
        return
    state_mqtt = True
    mqtt_log.info("连接 MQTT 成功")
    # Topic subscriptions belong here, so they are issued after each successful
    # connect, for example:
    #     client.subscribe("your/topic/name", qos=0)
    #     client.subscribe([("topic1", 0), ("topic2", 1)])
# 订阅成功回调
def mqtt_on_subscribe(client, userdata, mid, granted_qos):
    """paho-mqtt subscribe callback: log the acknowledged subscription.

    Logs through the "mqtt" logger (like the other MQTT callbacks in this file)
    instead of ``print`` so the event reaches the configured log handlers.

    Args:
        client: The MQTT client instance that received the acknowledgement.
        userdata: User data attached to the client (unused).
        mid: Message id of the subscribe request.
        granted_qos: QoS levels granted by the broker.
    """
    logging.getLogger("mqtt").info("订阅成功: mid=%s, qos=%s", mid, granted_qos)
def mqtt_on_message(client, userdata, msg):
    """paho-mqtt message callback: log topic, QoS and the UTF-8 decoded payload.

    Any failure while reading or decoding the message is logged with its
    traceback and not propagated.
    """
    mqtt_log = logging.getLogger("mqtt")
    try:
        topic, qos, text = msg.topic, msg.qos, msg.payload.decode('utf-8')
        mqtt_log.info("收到消息: topic=%s, qos=%s, payload=%s", topic, qos, text)
        # Application-specific handling of the received message goes here.
    except Exception as exc:
        mqtt_log.exception("处理消息异常: %s", exc)
# 服务器断开回调
def mqtt_on_disconnect(client, userdata, rc):
    """paho-mqtt disconnect callback: mark the broker connection as down and log ``rc``."""
    global state_mqtt
    state_mqtt = False
    mqtt_log = logging.getLogger("mqtt")
    mqtt_log.warning("MQTT 断开 rc=%s", rc)
def connect_timer():
    """Publish one heartbeat message and re-arm a 5-second timer that calls this function again.

    The heartbeat is only sent while ``state_mqtt`` is true. Its JSON payload
    carries the current time, the suffix of ``heartbeat_topic`` after the last
    underscore, and the OPC UA connection flag ``state_opcua``.

    Publishing errors are logged through the "heartbeat" logger and never stop
    the timer chain.
    """
    logger = logging.getLogger("heartbeat")
    try:
        if state_mqtt:
            payload = json.dumps({
                "time": datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'),
                "name": heartbeat_topic.split('_')[-1],
                "heartbeat_plc": state_opcua
            })
            client.publish(heartbeat_topic, payload=payload, qos=0)
    except Exception:
        logger.exception("心跳发送异常")
    timer = threading.Timer(5, connect_timer)
    # Daemon timer: a pending heartbeat must not keep the process alive once
    # the main thread has finished (for example after Ctrl+C).
    timer.daemon = True
    timer.start()
class SubHandler(object):
    """Subscription handler that copies data-change notifications into ``out_data``."""

    def __init__(self, lock):
        """Remember the lock that guards ``out_data`` and fetch the "SubHandler" logger."""
        self.logger = logging.getLogger("SubHandler")
        self.lock = lock

    def datachange_notification(self, node, value, event_data):
        """Store ``value`` under the node's id string and log the change.

        The key is ``node.nodeid.to_string()`` when the node has a ``nodeid``
        attribute; otherwise, or when that conversion fails, ``str(node)``.
        """
        try:
            if hasattr(node, "nodeid"):
                key = node.nodeid.to_string()
            else:
                key = str(node)
        except Exception:
            key = str(node)
        stamp = time.strftime('%Y-%m-%d %H:%M:%S')
        self.logger.warning("节点【%s】, 值【%s】, 时间【%s】", key, value, stamp)
        with self.lock:
            out_data[key] = value
def opcua_client(server_url=SERVER_URL, node_list=NODE_LIST):
    """Run the OPC UA client loop until interrupted: connect, subscribe, monitor, reconnect.

    For every browse name in ``node_list`` the children of that node (looked up
    below "0:Objects") are subscribed for data changes; notifications are
    written into ``out_data`` by ``SubHandler``. While connected, the node
    "ns=0;i=2267" is read every 5 seconds and a value below 200 or a read
    error triggers a teardown and reconnect. ``state_opcua`` is True only while
    a session is considered healthy.

    Args:
        server_url: OPC UA endpoint URL.
        node_list: Browse names whose children are subscribed.
    """
    global state_opcua
    logger = logging.getLogger("opcua_client")
    handler = SubHandler(out_lock)
    client = Client(url=server_url)

    def _integer_handles(result):
        """Flatten a subscribe result into the list of usable integer handles."""
        # Subscribing a list of nodes yields a list of results, one per node.
        # NOTE(review): python-opcua appears to put a status-code object in
        # place of the integer handle for a node that could not be monitored;
        # confirm against the library version in use.
        items = result if isinstance(result, (list, tuple)) else [result]
        usable = []
        for item in items:
            if isinstance(item, int) and not isinstance(item, bool):
                usable.append(item)
            else:
                logger.warning("节点订阅未成功: %s", item)
        return usable

    def _teardown(subscription, handles):
        """Best-effort removal of the subscription followed by a disconnect."""
        if subscription is not None:
            for h in handles:
                try:
                    subscription.unsubscribe(h)
                except Exception:
                    logger.exception("unsubscribe handle %s 失败", h)
            try:
                subscription.delete()
                logger.info("unsubscribed")
            except Exception:
                logger.exception("删除 subscription 失败")
        try:
            client.disconnect()
            logger.info("disconnected")
        except Exception:
            logger.exception("disconnect error")

    try:
        while True:
            subscription = None
            handles = []

            # --- connect ---
            try:
                logger.info("connecting to %s...", server_url)
                client.connect()
                client.load_type_definitions()
                state_opcua = True
                logger.info("connected")
            except Exception as e:
                state_opcua = False
                logger.exception("connection error, retry in 5s: %s", e)
                # connect() may have succeeded before load_type_definitions()
                # failed; drop any half-open session before retrying.
                try:
                    client.disconnect()
                except Exception:
                    logger.debug("disconnect after failed connect raised", exc_info=True)
                time.sleep(5)
                continue

            # --- build the node groups and subscribe ---
            try:
                project_main = client.get_root_node().get_child(["0:Objects"])
                nodes_groups = []
                for node in node_list:
                    try:
                        children = project_main.get_child([node]).get_children()
                        nodes_groups.append(children)
                        # Placeholder entry keyed by the browse name without its namespace index.
                        with out_lock:
                            out_data[str(node.split(':')[-1])] = {}
                    except Exception as e:
                        logger.exception("获取节点 %s 失败: %s", node, e)
                if any(nodes_groups):
                    subscription = client.create_subscription(200, handler)
                    for group in nodes_groups:
                        if group:
                            handles.extend(_integer_handles(subscription.subscribe_data_change(group)))
                    logger.info("订阅句柄: %s", handles)
                else:
                    logger.warning("没有可订阅的节点")
            except Exception as e:
                state_opcua = False
                logger.exception("subscription error: %s", e)
                # The teardown is guarded, so a failing disconnect cannot
                # escape and terminate the reconnect loop.
                _teardown(subscription, handles)
                time.sleep(2)
                continue

            # --- monitor the service level until the session looks unhealthy ---
            try:
                while True:
                    try:
                        service_level = client.get_node("ns=0;i=2267").get_value()
                        logger.debug("service level: %s", service_level)
                        if service_level < 200:
                            logger.warning("service level low (%s), reconnecting", service_level)
                            break
                    except Exception as e:
                        logger.exception("读取 service level 失败: %s", e)
                        break
                    time.sleep(5)
            finally:
                # The heartbeat must report the PLC link as down from the
                # moment the session is abandoned until the next connect.
                state_opcua = False
                _teardown(subscription, handles)
            time.sleep(5)
    except KeyboardInterrupt:
        logger.info("用户中断,退出")
    finally:
        state_opcua = False
        try:
            client.disconnect()
        except Exception:
            # Usually already disconnected by the teardown above.
            logger.debug("final disconnect raised", exc_info=True)
class PublisherWorker(threading.Thread):
    """Background thread that publishes every entry of ``out_data`` to MQTT every 0.5 seconds."""

    def __init__(self, threadID, name, counter):
        """Store the identification fields; they are not used by ``run``.

        Args:
            threadID: Caller-chosen identifier, kept as ``self.threadID``.
            name: Thread name.
            counter: Caller-chosen value, kept as ``self.counter``.
        """
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter
        self.out_data = {}
        # Daemon thread: the endless publish loop must not keep the process
        # alive after the main thread has ended.
        self.daemon = True

    def run(self):
        """Publish one JSON message per ``out_data`` entry while MQTT is connected.

        A failure for one entry (for example a value that cannot be serialized
        to JSON) is logged and does not stop the thread or the other entries.
        """
        logger = logging.getLogger("publisher")
        while True:
            if state_mqtt:
                # Copy under the lock so the subscription handler can keep
                # writing while the messages are built and sent.
                with out_lock:
                    snapshot = dict(out_data)
                for key, value in snapshot.items():
                    try:
                        payload = json.dumps({
                            "time": datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'),
                            "name": str(key),
                            "data": value
                        })
                        client.publish(pub_topic, payload=payload, qos=0)
                    except Exception:
                        logger.exception("mqtt推送异常")
            time.sleep(0.5)  # publish interval
if __name__ == "__main__":
    setup_logging(log_dir='logs', filename='Run_CX.log')
    # MQTT client identified by the random client id. The callbacks in this
    # file use the VERSION1 signatures, so switching to VERSION2 would require
    # changing them as well.
    client = mqtt.Client(
        client_id=client_id,
        callback_api_version=mqtt.CallbackAPIVersion.VERSION1
    )
    # Credentials used for the broker login
    client.username_pw_set(username, password)
    # Callbacks
    client.on_connect = mqtt_on_connect
    client.on_disconnect = mqtt_on_disconnect
    # client.reconnect_delay_set(min_delay=1, max_delay=120) tunes the reconnect back-off.
    # A broker that is down at start-up must not abort the program; the first
    # connection error is logged and start-up continues.
    # NOTE(review): this relies on the paho network loop retrying the initial
    # connection; confirm for the installed paho-mqtt version.
    try:
        client.connect(host=HOST, port=PORT, keepalive=60)
    except Exception:
        logging.getLogger("main").exception("MQTT 初次连接失败,仍将启动并自动重试")
    client.loop_start()  # unlike client.loop_forever(), this does not block
    connect_timer()  # start the heartbeat
    # Start the publisher thread
    publisherWorker = PublisherWorker(1, "publisher_worker-Thread", 1)
    publisherWorker.start()
    opcua_client()
# python
import os
import sys
import time
import json
import random
import string
import logging
import threading
from datetime import datetime
from logging.handlers import TimedRotatingFileHandler
import queue
from opcua import Client
import paho.mqtt.client as mqtt
# Configuration: MQTT broker, credentials, topics, OPC UA endpoint and subscribed browse names
HOST = "127.0.0.1"
PORT = 1883
USERNAME = "admin"
PASSWORD = "public"
PUB_TOPIC = "hdx/pub123_fk"
HEARTBEAT_TOPIC = "hdx/heartbeat_QX"
SERVER_URL = "opc.tcp://HDX-XG:53530/OPCUA/SimulationServer"
NODE_LIST = ["3:Simulation", "3:Simulation1", "3:Simulation2"]
# Random 10-character MQTT client id
CLIENT_ID = "".join(random.sample(string.ascii_letters + string.digits, 10))
# Shared state: out_data is guarded by out_lock; the state_* flags are written
# by the MQTT callbacks and the OPC UA worker; stop_event tells the workers to exit
out_data = {} # nodeid -> latest value
out_lock = threading.Lock()
state_opcua = False
state_mqtt = False
stop_event = threading.Event()
class MyLogFilter(logging.Filter):
    """Logging filter that lets through only records from this application's loggers."""

    def filter(self, record):
        """Return True when the record's logger name starts with an allowed prefix."""
        allowed_prefixes = (
            'main', 'opcua_worker', 'heartbeat', 'publisher', 'SubHandler', 'mqtt',
        )
        return any(record.name.startswith(prefix) for prefix in allowed_prefixes)
def setup_logging(log_dir='logs', filename='Run_CX.log', level=logging.INFO, console_level=None):
    """Configure the root logger with a daily-rotating file handler and a console handler.

    Calling the function again with the same log file is safe: no second file
    handler is created, and an existing stdout/stderr console handler is
    reconfigured instead of duplicated.

    Args:
        log_dir: Directory for the log file; created when missing.
        filename: Name of the log file inside ``log_dir``.
        level: Level applied to the root logger.
        console_level: Level for the console handler; defaults to ``level``.
    """
    os.makedirs(log_dir, exist_ok=True)
    log_path = os.path.join(log_dir, filename)
    abs_log_path = os.path.abspath(log_path)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(name)s - %(lineno)d - %(threadName)s - %(message)s')

    root = logging.getLogger()
    root.setLevel(level)

    # Only open the log file when no equivalent handler is installed yet;
    # constructing the handler opens the file, so building it first and then
    # discarding it would leak an open file handle.
    already_installed = any(
        isinstance(h, TimedRotatingFileHandler) and getattr(h, "baseFilename", None) == abs_log_path
        for h in root.handlers
    )
    if not already_installed:
        # Rotate at midnight and keep 30 rotated files.
        file_handler = TimedRotatingFileHandler(log_path, when='midnight', interval=1, backupCount=30, encoding='utf-8')
        file_handler.setFormatter(formatter)
        file_handler.setLevel(logging.INFO)
        root.addHandler(file_handler)

    console_level = level if console_level is None else console_level
    existing_console = next(
        (
            h for h in root.handlers
            if isinstance(h, logging.StreamHandler) and getattr(h, "stream", None) in (sys.stdout, sys.stderr)
        ),
        None,
    )
    if existing_console is None:
        console = logging.StreamHandler(sys.stdout)
        console.setFormatter(formatter)
        console.setLevel(console_level)
        root.addHandler(console)
    else:
        existing_console.setFormatter(formatter)
        existing_console.setLevel(console_level)
    # To restrict output to the application's own loggers, attach
    # MyLogFilter() to the handlers configured above.
class SubHandler(object):
    """Subscription handler that copies data-change notifications into ``out_data``."""

    def __init__(self, lock):
        """Remember the lock that guards ``out_data`` and fetch the "SubHandler" logger."""
        self.logger = logging.getLogger("SubHandler")
        self.lock = lock

    def datachange_notification(self, node, value, event_data):
        """Store ``value`` under the node's id string.

        The key is ``node.nodeid.to_string()`` when the node has a ``nodeid``
        attribute; otherwise, or when that conversion fails, ``str(node)``.
        """
        try:
            if hasattr(node, "nodeid"):
                key = node.nodeid.to_string()
            else:
                key = str(node)
        except Exception:
            key = str(node)
        self.logger.debug("datachange %s -> %s", key, value)
        with self.lock:
            out_data[key] = value
def opcua_worker(server_url, node_list, stop_evt):
    """Worker loop: connect to the OPC UA server, subscribe, monitor, and reconnect until stopped.

    For every browse name in ``node_list`` the children of that node (looked up
    below "0:Objects") are subscribed for data changes; notifications are
    written into ``out_data`` by ``SubHandler``. While connected, the node
    "ns=0;i=2267" is read every 5 seconds and a value below 200 or a read
    error triggers a teardown and reconnect. ``state_opcua`` is True only while
    a session is in use.

    Args:
        server_url: OPC UA endpoint URL.
        node_list: Browse names whose children are subscribed.
        stop_evt: Event-like object; the worker exits once it is set. All
            waiting is done on this event so shutdown is not delayed by sleeps.
    """
    global state_opcua
    logger = logging.getLogger("opcua_worker")
    handler = SubHandler(out_lock)

    def _integer_handles(result):
        """Flatten a subscribe result into the list of usable integer handles."""
        # Subscribing a list of nodes yields a list of results, one per node.
        # NOTE(review): python-opcua appears to put a status-code object in
        # place of the integer handle for a node that could not be monitored;
        # confirm against the library version in use.
        items = result if isinstance(result, (list, tuple)) else [result]
        usable = []
        for item in items:
            if isinstance(item, int) and not isinstance(item, bool):
                usable.append(item)
            else:
                logger.warning("Node subscription not accepted: %s", item)
        return usable

    while not stop_evt.is_set():
        # Fresh per-attempt state, so a failed reconnect can never clean up
        # the subscription of an earlier session.
        client = None
        connected = False
        subscription = None
        handles = []
        retry_delay = 1
        try:
            client = Client(url=server_url)
            logger.info("Connecting to OPC UA %s", server_url)
            client.connect()
            connected = True
            client.load_type_definitions()
            state_opcua = True
            logger.info("OPC UA connected")

            # Locate the nodes to subscribe.
            nodes_groups = []
            try:
                project_main = client.get_root_node().get_child(["0:Objects"])
            except Exception:
                project_main = None
                logger.exception("获取根节点失败")
            if project_main is not None:
                for n in node_list:
                    try:
                        children = project_main.get_child([n]).get_children()
                        nodes_groups.append(children)
                        # Placeholder entry keyed by the browse name without its namespace index.
                        with out_lock:
                            out_data[str(n.split(':')[-1])] = {}
                    except Exception:
                        logger.exception("无法获取节点 %s 的子项", n)

            if not any(nodes_groups):
                logger.warning("没有找到可订阅的节点, 等待后重试")
                retry_delay = 6
            else:
                subscription = client.create_subscription(200, handler)
                try:
                    for group in nodes_groups:
                        if group:
                            handles.extend(_integer_handles(subscription.subscribe_data_change(group)))
                    logger.info("Subscribed handles: %s", handles)
                except Exception:
                    logger.exception("订阅节点失败")

                # Monitor the service level until the session looks unhealthy.
                while not stop_evt.is_set():
                    try:
                        service_level = client.get_node("ns=0;i=2267").get_value()
                        logger.debug("service level: %s", service_level)
                        if service_level < 200:
                            logger.warning("Service level low: %s, 将重连", service_level)
                            break
                    except Exception:
                        logger.exception("读取 service level 失败,准备重连")
                        break
                    stop_evt.wait(5)
        except Exception:
            logger.exception("OPC UA 主循环异常,短暂等待后重试")
            retry_delay = 6
        finally:
            state_opcua = False
            # Best-effort cleanup of the subscription and the connection.
            if subscription is not None:
                for h in handles:
                    try:
                        subscription.unsubscribe(h)
                    except Exception:
                        logger.exception("取消订阅句柄 %s 失败", h)
                try:
                    subscription.delete()
                except Exception:
                    logger.exception("删除 subscription 失败")
            if connected:
                try:
                    client.disconnect()
                    logger.info("OPC UA disconnected")
                except Exception:
                    logger.exception("断开或清理失败")
        # Pause before the next attempt; returns early when stop is requested.
        stop_evt.wait(retry_delay)
    logger.info("opcua_worker 退出")
def mqtt_on_connect(client, userdata, flags, rc):
    """paho-mqtt connect callback: mirror the connection result into ``state_mqtt``.

    A return code of 0 marks the broker connection as up; any other code marks
    it as down and is logged as a warning.
    """
    global state_mqtt
    mqtt_log = logging.getLogger("mqtt")
    succeeded = rc == 0
    if not succeeded:
        state_mqtt = False
        mqtt_log.warning("MQTT 连接返回码 %s", rc)
        return
    state_mqtt = True
    mqtt_log.info("连接 MQTT 成功")
    # Topic subscriptions belong here, so they are issued after each successful
    # connect, for example:
    #     client.subscribe("your/topic/name", qos=0)
    #     client.subscribe([("topic1", 0), ("topic2", 1)])
def mqtt_on_message(client, userdata, msg):
    """paho-mqtt message callback: log topic, QoS and the UTF-8 decoded payload.

    Any failure while reading or decoding the message is logged with its
    traceback and not propagated.
    """
    mqtt_log = logging.getLogger("mqtt")
    try:
        topic, qos, text = msg.topic, msg.qos, msg.payload.decode('utf-8')
        mqtt_log.info("收到消息: topic=%s, qos=%s, payload=%s", topic, qos, text)
        # Application-specific handling of the received message goes here.
    except Exception as exc:
        mqtt_log.exception("处理消息异常: %s", exc)
def mqtt_on_subscribe(client, userdata, mid, granted_qos):
    """paho-mqtt subscribe callback: log the message id and the granted QoS."""
    logging.getLogger("mqtt").info("订阅成功: mid=%s, qos=%s", mid, granted_qos)
def mqtt_on_disconnect(client, userdata, rc):
    """paho-mqtt disconnect callback: mark the broker connection as down and log ``rc``."""
    global state_mqtt
    state_mqtt = False
    mqtt_log = logging.getLogger("mqtt")
    mqtt_log.warning("MQTT 断开 rc=%s", rc)
def heartbeat_worker(mqtt_client, stop_evt, interval=5):
    """Publish a heartbeat message every ``interval`` seconds until ``stop_evt`` is set.

    A heartbeat is sent only while ``state_mqtt`` is true. Its JSON payload
    holds the current time, the part of ``HEARTBEAT_TOPIC`` after the last
    underscore, and the OPC UA connection flag. Publishing errors are logged
    and do not end the loop.
    """
    hb_log = logging.getLogger("heartbeat")
    while True:
        if stop_evt.is_set():
            break
        try:
            if state_mqtt:
                body = {
                    "time": datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'),
                    "name": HEARTBEAT_TOPIC.split('_')[-1],
                    "heartbeat_plc": state_opcua
                }
                mqtt_client.publish(HEARTBEAT_TOPIC, payload=json.dumps(body), qos=0)
                hb_log.debug("Heartbeat published")
        except Exception:
            hb_log.exception("心跳发送异常")
        # Waiting on the event (not sleeping) lets a stop request end the pause early.
        stop_evt.wait(interval)
    hb_log.info("heartbeat_worker 退出")
def publisher_worker(mqtt_client, stop_evt, poll_interval=0.5):
    """Publish changed ``out_data`` entries to ``PUB_TOPIC`` until ``stop_evt`` is set.

    Every ``poll_interval`` seconds a snapshot of ``out_data`` is taken under
    ``out_lock`` and each entry whose value differs from the last successfully
    published one is sent as a JSON message (time, name, data).

    Each entry is handled independently:
      * a value that cannot be serialized to JSON is logged once and skipped
        until it changes, without blocking the remaining entries;
      * a publish that raises or reports a non-zero result code is retried on
        the next poll.

    Args:
        mqtt_client: Object with a paho-style ``publish(topic, payload=, qos=)``.
        stop_evt: Event-like object; the worker exits once it is set.
        poll_interval: Seconds between polls.
    """
    logger = logging.getLogger("publisher")
    never_sent = object()  # distinguishes "not sent yet" from a real None value
    last_sent = {}
    while not stop_evt.is_set():
        try:
            if state_mqtt:
                with out_lock:
                    current = dict(out_data)
                for key, val in current.items():
                    if last_sent.get(key, never_sent) == val:
                        continue
                    try:
                        msg = json.dumps({
                            "time": datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'),
                            "name": str(key),
                            "data": val
                        })
                    except (TypeError, ValueError):
                        logger.exception("Value of %s is not JSON serializable; skipped until it changes", key)
                        # Remember the value so the same error is not logged on every poll.
                        last_sent[key] = val
                        continue
                    try:
                        info = mqtt_client.publish(PUB_TOPIC, payload=msg, qos=0)
                    except Exception:
                        logger.exception("MQTT 发布失败")
                        continue
                    # paho reports failures such as "not connected" through the
                    # result code instead of raising; 0 means success.
                    rc = getattr(info, "rc", 0)
                    if rc != 0:
                        logger.warning("MQTT publish of %s returned rc=%s; will retry", key, rc)
                        continue
                    logger.debug("Published %s -> %s", key, msg)
                    last_sent[key] = val
        except Exception:
            logger.exception("publisher_worker 异常")
        stop_evt.wait(poll_interval)
    logger.info("publisher_worker 退出")
def main():
    """Start logging, the MQTT client and the three worker threads, then wait for Ctrl+C.

    On KeyboardInterrupt the stop event is set; afterwards the MQTT network
    loop is stopped, the client disconnected, and each worker thread joined
    with a 3-second timeout.
    """
    setup_logging(log_dir='logs', filename='Run_CX.log', level=logging.INFO, console_level=logging.DEBUG)
    log = logging.getLogger("main")

    # MQTT client
    mqtt_client = mqtt.Client(client_id=CLIENT_ID, callback_api_version=mqtt.CallbackAPIVersion.VERSION1)
    mqtt_client.username_pw_set(USERNAME, PASSWORD)
    mqtt_client.on_connect = mqtt_on_connect
    mqtt_client.on_disconnect = mqtt_on_disconnect
    try:
        mqtt_client.connect(HOST, PORT, keepalive=60)
    except Exception:
        log.exception("MQTT 初次连接失败,仍将启动并自动重试")
    mqtt_client.loop_start()

    # Worker threads: (target, thread name, positional arguments)
    worker_specs = (
        (opcua_worker, "OPCUA-Thread", (SERVER_URL, NODE_LIST, stop_event)),
        (heartbeat_worker, "Heartbeat-Thread", (mqtt_client, stop_event)),
        (publisher_worker, "Publisher-Thread", (mqtt_client, stop_event)),
    )
    threads = []
    for target, thread_name, thread_args in worker_specs:
        worker = threading.Thread(target=target, name=thread_name, args=thread_args, daemon=True)
        threads.append(worker)
        worker.start()

    try:
        while True:
            time.sleep(0.5)
    except KeyboardInterrupt:
        log.info("收到中断信号,准备退出")
        stop_event.set()
    finally:
        # Stop the MQTT loop and disconnect
        try:
            mqtt_client.loop_stop()
            mqtt_client.disconnect()
        except Exception:
            log.exception("MQTT 清理异常")
        # Give each worker a bounded time to finish
        for worker in threads:
            worker.join(timeout=3)
        log.info("进程退出完毕")


if __name__ == "__main__":
    main()
# python
import os
import sys
import time
import json
import random
import string
import logging
import threading
from datetime import datetime
from logging.handlers import TimedRotatingFileHandler
import queue
from concurrent.futures import ThreadPoolExecutor
from opcua import Client
import paho.mqtt.client as mqtt
# Configuration: MQTT broker, credentials, topics, OPC UA endpoint and subscribed browse names
HOST = "127.0.0.1"
PORT = 1883
USERNAME = "admin"
PASSWORD = "public"
PUB_TOPIC = "hdx/pub123_fk"
HEARTBEAT_TOPIC = "hdx/heartbeat_QX"
SERVER_URL = "opc.tcp://HDX-XG:53530/OPCUA/SimulationServer"
NODE_LIST = ["3:Simulation", "3:Simulation1", "3:Simulation2"]
# Random 10-character MQTT client id
CLIENT_ID = "".join(random.sample(string.ascii_letters + string.digits, 10))
# Shared state: out_data is guarded by out_lock; the state_* flags are written
# by the MQTT callbacks and the OPC UA worker; stop_event tells the workers to exit
out_data = {} # nodeid -> latest value
out_lock = threading.Lock()
state_opcua = False
state_mqtt = False
stop_event = threading.Event()
class MyLogFilter(logging.Filter):
    """Logging filter that lets through only records from this application's loggers."""

    def filter(self, record):
        """Return True when the record's logger name starts with an allowed prefix."""
        allowed_prefixes = (
            'main', 'opcua_worker', 'heartbeat', 'publisher', 'SubHandler', 'mqtt',
        )
        return any(record.name.startswith(prefix) for prefix in allowed_prefixes)
def setup_logging(log_dir='logs', filename='Run_CX.log', level=logging.INFO, console_level=None):
    """Configure the root logger with a daily-rotating file handler and a console handler.

    Calling the function again with the same log file is safe: no second file
    handler is created, and an existing stdout/stderr console handler is
    reconfigured instead of duplicated.

    Args:
        log_dir: Directory for the log file; created when missing.
        filename: Name of the log file inside ``log_dir``.
        level: Level applied to the root logger.
        console_level: Level for the console handler; defaults to ``level``.
    """
    os.makedirs(log_dir, exist_ok=True)
    log_path = os.path.join(log_dir, filename)
    abs_log_path = os.path.abspath(log_path)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(name)s - %(lineno)d - %(threadName)s - %(message)s')

    root = logging.getLogger()
    root.setLevel(level)

    # Only open the log file when no equivalent handler is installed yet;
    # constructing the handler opens the file, so building it first and then
    # discarding it would leak an open file handle.
    already_installed = any(
        isinstance(h, TimedRotatingFileHandler) and getattr(h, "baseFilename", None) == abs_log_path
        for h in root.handlers
    )
    if not already_installed:
        # Rotate at midnight and keep 30 rotated files.
        file_handler = TimedRotatingFileHandler(log_path, when='midnight', interval=1, backupCount=30, encoding='utf-8')
        file_handler.setFormatter(formatter)
        file_handler.setLevel(logging.INFO)
        root.addHandler(file_handler)

    console_level = level if console_level is None else console_level
    existing_console = next(
        (
            h for h in root.handlers
            if isinstance(h, logging.StreamHandler) and getattr(h, "stream", None) in (sys.stdout, sys.stderr)
        ),
        None,
    )
    if existing_console is None:
        console = logging.StreamHandler(sys.stdout)
        console.setFormatter(formatter)
        console.setLevel(console_level)
        root.addHandler(console)
    else:
        existing_console.setFormatter(formatter)
        existing_console.setLevel(console_level)
    # To restrict output to the application's own loggers, attach
    # MyLogFilter() to the handlers configured above.
class SubHandler(object):
    """Subscription handler that copies data-change notifications into ``out_data``."""

    def __init__(self, lock):
        """Remember the lock that guards ``out_data`` and fetch the "SubHandler" logger."""
        self.logger = logging.getLogger("SubHandler")
        self.lock = lock

    def datachange_notification(self, node, value, event_data):
        """Store ``value`` under the node's id string.

        The key is ``node.nodeid.to_string()`` when the node has a ``nodeid``
        attribute; otherwise, or when that conversion fails, ``str(node)``.
        """
        try:
            if hasattr(node, "nodeid"):
                key = node.nodeid.to_string()
            else:
                key = str(node)
        except Exception:
            key = str(node)
        self.logger.debug("datachange %s -> %s", key, value)
        with self.lock:
            out_data[key] = value
def opcua_worker(server_url, node_list, stop_evt):
    """Worker loop: connect to the OPC UA server, subscribe, monitor, and reconnect until stopped.

    For every browse name in ``node_list`` the children of that node (looked up
    below "0:Objects") are subscribed for data changes; notifications are
    written into ``out_data`` by ``SubHandler``. While connected, the node
    "ns=0;i=2267" is read every 5 seconds and a value below 200 or a read
    error triggers a teardown and reconnect. ``state_opcua`` is True only while
    a session is in use.

    Args:
        server_url: OPC UA endpoint URL.
        node_list: Browse names whose children are subscribed.
        stop_evt: Event-like object; the worker exits once it is set. All
            waiting is done on this event so shutdown is not delayed by sleeps.
    """
    global state_opcua
    logger = logging.getLogger("opcua_worker")
    handler = SubHandler(out_lock)

    def _integer_handles(result):
        """Flatten a subscribe result into the list of usable integer handles."""
        # Subscribing a list of nodes yields a list of results, one per node.
        # NOTE(review): python-opcua appears to put a status-code object in
        # place of the integer handle for a node that could not be monitored;
        # confirm against the library version in use.
        items = result if isinstance(result, (list, tuple)) else [result]
        usable = []
        for item in items:
            if isinstance(item, int) and not isinstance(item, bool):
                usable.append(item)
            else:
                logger.warning("Node subscription not accepted: %s", item)
        return usable

    while not stop_evt.is_set():
        # Fresh per-attempt state, so a failed reconnect can never clean up
        # the subscription of an earlier session.
        client = None
        connected = False
        subscription = None
        handles = []
        retry_delay = 1
        try:
            client = Client(url=server_url)
            logger.info("Connecting to OPC UA %s", server_url)
            client.connect()
            connected = True
            client.load_type_definitions()
            state_opcua = True
            logger.info("OPC UA connected")

            # Locate the nodes to subscribe.
            nodes_groups = []
            try:
                project_main = client.get_root_node().get_child(["0:Objects"])
            except Exception:
                project_main = None
                logger.exception("获取根节点失败")
            if project_main is not None:
                for n in node_list:
                    try:
                        children = project_main.get_child([n]).get_children()
                        nodes_groups.append(children)
                        # Placeholder entry keyed by the browse name without its namespace index.
                        with out_lock:
                            out_data[str(n.split(':')[-1])] = {}
                    except Exception:
                        logger.exception("无法获取节点 %s 的子项", n)

            if not any(nodes_groups):
                logger.warning("没有找到可订阅的节点, 等待后重试")
                retry_delay = 6
            else:
                subscription = client.create_subscription(200, handler)
                try:
                    for group in nodes_groups:
                        if group:
                            handles.extend(_integer_handles(subscription.subscribe_data_change(group)))
                    logger.info("Subscribed handles: %s", handles)
                except Exception:
                    logger.exception("订阅节点失败")

                # Monitor the service level until the session looks unhealthy.
                while not stop_evt.is_set():
                    try:
                        service_level = client.get_node("ns=0;i=2267").get_value()
                        logger.debug("service level: %s", service_level)
                        if service_level < 200:
                            logger.warning("Service level low: %s, 将重连", service_level)
                            break
                    except Exception:
                        logger.exception("读取 service level 失败,准备重连")
                        break
                    stop_evt.wait(5)
        except Exception:
            logger.exception("OPC UA 主循环异常,短暂等待后重试")
            retry_delay = 6
        finally:
            state_opcua = False
            # Best-effort cleanup of the subscription and the connection.
            if subscription is not None:
                for h in handles:
                    try:
                        subscription.unsubscribe(h)
                    except Exception:
                        logger.exception("取消订阅句柄 %s 失败", h)
                try:
                    subscription.delete()
                except Exception:
                    logger.exception("删除 subscription 失败")
            if connected:
                try:
                    client.disconnect()
                    logger.info("OPC UA disconnected")
                except Exception:
                    logger.exception("断开或清理失败")
        # Pause before the next attempt; returns early when stop is requested.
        stop_evt.wait(retry_delay)
    logger.info("opcua_worker 退出")
def mqtt_on_connect(client, userdata, flags, rc):
    """paho-mqtt connect callback: record the connection state and subscribe to "testTopic".

    A return code of 0 marks the broker connection as up and issues the
    subscription; any other code marks it as down and is logged as a warning.
    A failing subscribe call is logged and not propagated.
    """
    global state_mqtt
    mqtt_log = logging.getLogger("mqtt")
    succeeded = rc == 0
    if not succeeded:
        state_mqtt = False
        mqtt_log.warning("MQTT 连接返回码 %s", rc)
        return
    state_mqtt = True
    mqtt_log.info("连接 MQTT 成功")
    # Subscriptions are issued here, after each successful connect.
    # Several topics can be requested at once:
    #     client.subscribe([("topic1", 0), ("topic2", 1)])
    try:
        client.subscribe("testTopic", qos=0)
        mqtt_log.info("订阅主题成功")
    except Exception as exc:
        mqtt_log.exception("订阅主题失败: %s", exc)
def mqtt_on_message(client, userdata, msg):
    """paho-mqtt message callback: log topic, QoS and the UTF-8 decoded payload.

    Any failure while reading or decoding the message is logged with its
    traceback and not propagated.
    """
    mqtt_log = logging.getLogger("mqtt")
    try:
        topic, qos, text = msg.topic, msg.qos, msg.payload.decode('utf-8')
        mqtt_log.info("收到消息: topic=%s, qos=%s, payload=%s", topic, qos, text)
        # Application-specific handling of the received message goes here.
    except Exception as exc:
        mqtt_log.exception("处理消息异常: %s", exc)
def mqtt_on_subscribe(client, userdata, mid, granted_qos):
    """paho-mqtt subscribe callback: log the message id and the granted QoS."""
    logging.getLogger("mqtt").info("订阅成功: mid=%s, qos=%s", mid, granted_qos)
def mqtt_on_disconnect(client, userdata, rc):
    """paho-mqtt disconnect callback: mark the broker connection as down and log ``rc``."""
    global state_mqtt
    state_mqtt = False
    mqtt_log = logging.getLogger("mqtt")
    mqtt_log.warning("MQTT 断开 rc=%s", rc)
def heartbeat_worker(mqtt_client, stop_evt, interval=5):
    """Publish a heartbeat message every ``interval`` seconds until ``stop_evt`` is set.

    A heartbeat is sent only while ``state_mqtt`` is true. Its JSON payload
    holds the current time, the part of ``HEARTBEAT_TOPIC`` after the last
    underscore, and the OPC UA connection flag. Publishing errors are logged
    and do not end the loop.
    """
    hb_log = logging.getLogger("heartbeat")
    while True:
        if stop_evt.is_set():
            break
        try:
            if state_mqtt:
                body = {
                    "time": datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'),
                    "name": HEARTBEAT_TOPIC.split('_')[-1],
                    "heartbeat_plc": state_opcua
                }
                mqtt_client.publish(HEARTBEAT_TOPIC, payload=json.dumps(body), qos=0)
                hb_log.debug("Heartbeat published")
        except Exception:
            hb_log.exception("心跳发送异常")
        # Waiting on the event (not sleeping) lets a stop request end the pause early.
        stop_evt.wait(interval)
    hb_log.info("heartbeat_worker 退出")
def publisher_worker(mqtt_client, stop_evt, poll_interval=0.5):
    """Publish changed ``out_data`` entries to ``PUB_TOPIC`` until ``stop_evt`` is set.

    Every ``poll_interval`` seconds a snapshot of ``out_data`` is taken under
    ``out_lock`` and each entry whose value differs from the last successfully
    published one is sent as a JSON message (time, name, data).

    Each entry is handled independently:
      * a value that cannot be serialized to JSON is logged once and skipped
        until it changes, without blocking the remaining entries;
      * a publish that raises or reports a non-zero result code is retried on
        the next poll.

    Args:
        mqtt_client: Object with a paho-style ``publish(topic, payload=, qos=)``.
        stop_evt: Event-like object; the worker exits once it is set.
        poll_interval: Seconds between polls.
    """
    logger = logging.getLogger("publisher")
    never_sent = object()  # distinguishes "not sent yet" from a real None value
    last_sent = {}
    while not stop_evt.is_set():
        try:
            if state_mqtt:
                with out_lock:
                    current = dict(out_data)
                for key, val in current.items():
                    if last_sent.get(key, never_sent) == val:
                        continue
                    try:
                        msg = json.dumps({
                            "time": datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'),
                            "name": str(key),
                            "data": val
                        })
                    except (TypeError, ValueError):
                        logger.exception("Value of %s is not JSON serializable; skipped until it changes", key)
                        # Remember the value so the same error is not logged on every poll.
                        last_sent[key] = val
                        continue
                    try:
                        info = mqtt_client.publish(PUB_TOPIC, payload=msg, qos=0)
                    except Exception:
                        logger.exception("MQTT 发布失败")
                        continue
                    # paho reports failures such as "not connected" through the
                    # result code instead of raising; 0 means success.
                    rc = getattr(info, "rc", 0)
                    if rc != 0:
                        logger.warning("MQTT publish of %s returned rc=%s; will retry", key, rc)
                        continue
                    logger.debug("Published %s -> %s", key, msg)
                    last_sent[key] = val
        except Exception:
            logger.exception("publisher_worker 异常")
        stop_evt.wait(poll_interval)
    logger.info("publisher_worker 退出")
def main():
    """Start logging, the MQTT client and the three pooled workers, then wait for Ctrl+C.

    The workers (OPC UA, heartbeat, publisher) run in a ThreadPoolExecutor.
    However the wait loop ends, the stop event is set before the executor
    context is left: leaving that context waits for the workers, so an unset
    stop event would block shutdown forever. The MQTT client is torn down and
    the final message logged only after the workers have finished.
    """
    setup_logging(log_dir='logs', filename='Run_CX.log', level=logging.INFO, console_level=logging.DEBUG)
    log = logging.getLogger("main")

    # MQTT client
    mqtt_client = mqtt.Client(client_id=CLIENT_ID, callback_api_version=mqtt.CallbackAPIVersion.VERSION1)
    mqtt_client.username_pw_set(USERNAME, PASSWORD)
    mqtt_client.on_connect = mqtt_on_connect
    mqtt_client.on_subscribe = mqtt_on_subscribe
    mqtt_client.on_message = mqtt_on_message
    mqtt_client.on_disconnect = mqtt_on_disconnect
    try:
        mqtt_client.connect(HOST, PORT, keepalive=60)
    except Exception:
        log.exception("MQTT 初次连接失败,仍将启动并自动重试")
    mqtt_client.loop_start()

    futures = []
    try:
        # Thread pool that owns the three worker loops
        with ThreadPoolExecutor(max_workers=3, thread_name_prefix="Worker") as executor:
            futures.append(executor.submit(opcua_worker, SERVER_URL, NODE_LIST, stop_event))
            futures.append(executor.submit(heartbeat_worker, mqtt_client, stop_event))
            futures.append(executor.submit(publisher_worker, mqtt_client, stop_event))
            try:
                while True:
                    time.sleep(0.5)
            except KeyboardInterrupt:
                log.info("收到中断信号,准备退出")
            finally:
                # Must happen before the with-block exits (see docstring).
                stop_event.set()
    finally:
        # Surface a worker that died with an unexpected exception.
        for future in futures:
            if future.done() and not future.cancelled():
                error = future.exception()
                if error is not None:
                    log.error("Worker terminated with an exception", exc_info=error)
        # Stop the MQTT loop and disconnect
        try:
            mqtt_client.loop_stop()
            mqtt_client.disconnect()
        except Exception:
            log.exception("MQTT 清理异常")
        log.info("进程退出完毕")


if __name__ == "__main__":
    main()

# node = NodeId(1007, 3) # 测试节点是否存在
# client.get_node(node)
# client.get_node(NodeId(1007, 3))
# client.get_node("ns=3;s=1008")
# client.get_node("ns=3;i=1001").get_value()
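# The two forms below are meant to be equivalent. The first fetches the node directly by its id (the usual, more convenient way); the second starts at the root node and walks down to the child one level at a time.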
# project_main = client.get_node("ns=0;s=Objects").get_children()
# project_main = client.get_root_node().get_child(["0:Objects"]).get_children()