# -*- coding: utf-8 -*-
# @Author: privacy
# @Date: 2024-06-24 16:33:01
# @Last Modified by: privacy
# @Last Modified time: 2024-07-09 17:55:47
import time
import json
import datetime
import threading
from queue import Queue
from dataclasses import dataclass, asdict

import paho.mqtt.client as mqclient
from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
from unitree_sdk2py.idl.unitree_go.msg.dds_ import SportModeState_

# from tasks import schedule
from tasks import CustomClient
from utils import parse_config


# Maps each subscribed topic to the topic its reply is published on.
# Filled in by the script entry point below and read by on_message().
pub_topic = {}


@dataclass
class MQTTClientInfoX:
    """Connection parameters for the MQTT broker."""
    brokerIP: str
    brokerPort: int
    clientId: str
    user: str
    pwd: str


class SimpleMqttClient(threading.Thread):
    """Thread that owns a paho MQTT client and runs its network loop.

    Configure it with setTopic() / setKeepAlive() and assign ``on_message``
    before calling start(); the paho client itself is created inside the
    thread and exposed as ``clientConn``.
    """

    # Keep-alive (seconds) used when none was configured or an invalid one
    # was passed to setKeepAlive().
    DEFAULT_KEEPALIVE = 30

    def __init__(self, mqttClientInfo: MQTTClientInfoX, clientId: str):
        threading.Thread.__init__(self, name="mqttElevClient_{}".format(time.time()))
        self.mqsCI = mqttClientInfo
        self.on_message = None
        self.keepAliveInterval = None
        self.clientId = clientId
        # Defined up front so the callbacks never hit a missing attribute
        # when setTopic() was not called or the thread has not started yet.
        self.sub_topic = []
        self.clientConn = None

    def setKeepAlive(self, keepAliveInterval: int = None) -> None:
        """Set the MQTT keep-alive interval in seconds.

        ``None`` or a negative value falls back to DEFAULT_KEEPALIVE.
        """
        if keepAliveInterval is None or keepAliveInterval < 0:
            keepAliveInterval = self.DEFAULT_KEEPALIVE
        self.keepAliveInterval = keepAliveInterval

    def setTopic(self, topic: list) -> None:
        """Set the topics to subscribe to (as passed to ``client.subscribe``)."""
        self.sub_topic = topic

    def on_connect(self, client, userdata, flags, reason_code, properties):
        """Subscribe to the configured topics once the broker accepts us."""
        if reason_code.is_failure:
            print(f"Failed to connect: {reason_code}. loop_forever() will retry connection")
            return
        # Always subscribe from the connect callback so the subscriptions
        # are renewed after every reconnect.
        if self.sub_topic:
            client.subscribe(self.sub_topic)
            print('subscribe ==> {}'.format(self.sub_topic))

    def on_disconnect(self, client, userdata, flags, reason_code, properties):
        """Count disconnects in the client's user data; reconnect on the first.

        The user data is used as the disconnect counter. It is ``None``
        unless explicitly set, so anything that is not an int counts as 0
        instead of raising ``TypeError`` inside the network loop.
        """
        disconnects = userdata if isinstance(userdata, int) else 0
        client.user_data_set(disconnects + 1)
        if disconnects == 0:
            try:
                client.reconnect()
            except OSError as e:
                # Broker unreachable right now: do not let the exception
                # escape the callback and terminate the network loop.
                print('reconnect :{}'.format(e))

    def on_subscribe(self, client, userdata, mid, reason_code_list: list, properties):
        """Report the broker's answer for every requested subscription."""
        # When only one channel is subscribed, reason_code_list holds a
        # single entry.
        for reason_code in reason_code_list:
            if reason_code.is_failure:
                print(f"Broker rejected you subscription: {reason_code}")
            else:
                print(f"Broker granted the following QoS: {reason_code.value}")

    def on_log(self, client, userdata, level, message):
        """Print paho's log messages."""
        print(message)

    def run(self):
        """Thread entry point: start the MQTT client."""
        self.startClient(self.clientId)

    def startClient(self, clientId: str):
        """Create the MQTT client, connect (retrying every 5 s) and loop forever.

        Blocks the calling thread inside ``loop_forever()``.
        """
        print("starting mqttElevClient...")
        self.clientConn = mqclient.Client(
            mqclient.CallbackAPIVersion.VERSION2,
            client_id=clientId,
            clean_session=False
        )
        # on_disconnect keeps its disconnect counter in the user data.
        self.clientConn.user_data_set(0)
        self.clientConn.username_pw_set(self.mqsCI.user, self.mqsCI.pwd)
        # The password may legitimately be None; never call len() on it.
        print("MQTT u:p -> {}:{}".format(self.mqsCI.user, '*' * len(self.mqsCI.pwd or '')))

        self.clientConn.on_connect = self.on_connect
        self.clientConn.on_subscribe = self.on_subscribe
        self.clientConn.on_message = self.on_message
        self.clientConn.on_disconnect = self.on_disconnect
        self.clientConn.on_log = self.on_log

        # A keep-alive of None makes connect() fail on every attempt, which
        # would turn the retry loop below into an endless one.
        keepalive = self.keepAliveInterval
        if keepalive is None:
            keepalive = self.DEFAULT_KEEPALIVE

        # ----------------------------------------------------------------------------- #
        while True:
            try:
                print(" connecting to MQTT broker: {}:{}".format(self.mqsCI.brokerIP, self.mqsCI.brokerPort))
                self.clientConn.connect(
                    self.mqsCI.brokerIP,
                    self.mqsCI.brokerPort,
                    keepalive
                )
                break
            except Exception as e:
                # traceback.print_exc()
                print('startClient :{}'.format(e))
                time.sleep(5)
        self.clientConn.loop_forever()


def on_message(client, userdata, msg):
    """Answer an incoming message on the publish topic paired with its topic.

    The reply topic is looked up in the module-level ``pub_topic`` mapping.
    Any failure (invalid JSON, unknown topic, publish error) is printed and
    swallowed so the exception does not escape the callback.
    """
    try:
        topic = msg.topic
        mqttMsg = json.loads(msg.payload.decode('utf-8'))
        print('recv mqttMsg:{}'.format(mqttMsg))
        payload = {
            "sn": pub_topic[topic],
            "t": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            "f": 3,
            "p1": 120,
        }
        payload = json.dumps(payload)
        client.publish(pub_topic[topic], payload=payload, qos=2, retain=False)
    except Exception as e:
        print("mqttMsgRecvCB:{}".format(e))


if __name__ == '__main__':
    queue = Queue()

    services = parse_config('config.yaml')

    brokerIP = services['broker']['IP']
    brokerPort = services['broker']['Port']
    clientId = services['client']['Id']
    user = services['client']['user']
    pwd = services['client']['pwd']

    # Subscription topics (realtime)
    sub_topic = [(rt['sub'], 2) for rt in services['subscribe']['realtime']]
    pub_topic = {rt['sub']: rt['pub'] for rt in services['subscribe']['realtime']}

    mqCI = MQTTClientInfoX(brokerIP, brokerPort, clientId, user, pwd)
    mqttClient = SimpleMqttClient(mqCI, clientId)
    mqttClient.setTopic(sub_topic)
    mqttClient.setKeepAlive(60)
    mqttClient.on_message = on_message
    mqttClient.start()

    # Start scheduled tasks
    # schedule(mqttClient, services['subscribe']['schedule'])
    cc = CustomClient(queue=queue)

    # Message forwarding: publish everything that arrives on the queue.
    while True:
        try:
            data = queue.get()
            mqttClient.clientConn.publish(
                data['topic'],
                payload=data['payload'],
                qos=2,
                retain=False
            )
        except Exception as e:
            print(e)