123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- # -*- coding: utf-8 -*-
- # @Author: privacy
- # @Date: 2024-06-24 16:33:01
- # @Last Modified by: privacy
- # @Last Modified time: 2024-07-09 17:55:47
- import time
- import json
- import datetime
- import threading
- from queue import Queue
- from dataclasses import dataclass, asdict
- import paho.mqtt.client as mqclient
- from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
- from unitree_sdk2py.idl.unitree_go.msg.dds_ import SportModeState_
- # from tasks import schedule
- from tasks import CustomClient
- from utils import parse_config
- @dataclass
- class MQTTClientInfoX:
- brokerIP: str
- brokerPort: int
- clientId: str
- user: str
- pwd: str
- class SimpleMqttClient(threading.Thread):
- def __init__(self, mqttClientInfo: MQTTClientInfoX, clientId: str):
- threading.Thread.__init__(self, name="mqttElevClient_{}".format(time.time()))
- self.mqsCI = mqttClientInfo
- self.on_message = None
- self.keepAliveInterval = None
- self.clientId = clientId
- def setKeepAlive(self, keepAliveInterval: int = None) -> None:
- """设置超时事件
- """
- self.keepAliveInterval = 30 if keepAliveInterval is None or keepAliveInterval < 0 else keepAliveInterval
- def setTopic(self, topic: list) -> None:
- """设置订阅主题
- """
- self.sub_topic = topic
- def on_connect(self, client, userdata, flags, reason_code, properties):
- if reason_code.is_failure:
- print(f"Failed to connect: {reason_code}. loop_forever() will retry connection")
- else:
- # 我们总是应该从连接回调中订阅,以确保我们的连接在重新连接中一直存在。
- client.subscribe(self.sub_topic)
- print('subscribe ==> {}'.format(self.sub_topic))
- def on_disconnect(self, client, userdata, flags, reason_code, properties):
- client.user_data_set(userdata + 1)
- if userdata == 0:
- client.reconnect()
- def on_subscribe(self, client, userdata, mid, reason_code_list: list, properties):
- # 如果我们只订阅一个频道,reason_code_list 包含一个单条目
- for reason_code in reason_code_list:
- if reason_code.is_failure:
- print(f"Broker rejected you subscription: {reason_code}")
- else:
- print(f"Broker granted the following QoS: {reason_code.value}")
- def on_log(self, client, userdata, level, message):
- print(message)
- def run(self):
- """继承多线程,重新run方法
- """
- self.startClient(self.clientId)
- def startClient(self, clientId: str):
- """启动MQTT客户端
- """
- print("starting mqttElevClient...")
- self.clientConn = mqclient.Client(
- mqclient.CallbackAPIVersion.VERSION2,
- client_id=clientId,
- clean_session=False
- )
- self.clientConn.username_pw_set(self.mqsCI.user, self.mqsCI.pwd)
- print("MQTT u:p -> {}:{}".format(self.mqsCI.user, '*' * len(self.mqsCI.pwd)))
- self.clientConn.on_connect = self.on_connect
- self.clientConn.on_subscribe = self.on_subscribe
- self.clientConn.on_message = self.on_message
- self.clientConn.on_disconnect = self.on_disconnect
- self.clientConn.on_log = self.on_log
- # ----------------------------------------------------------------------------- #
- while True:
- try:
- print(" connecting to MQTT broker: {}:{}".format(self.mqsCI.brokerIP, self.mqsCI.brokerPort))
- self.clientConn.connect(
- self.mqsCI.brokerIP,
- self.mqsCI.brokerPort,
- self.keepAliveInterval
- )
- break
- except Exception as e:
- # traceback.print_exc()
- print('startClient :{}'.format(e))
- time.sleep(5)
- self.clientConn.loop_forever()
- def on_message(client, userdata, msg):
- global pub_topic
- try:
- topic = msg.topic
- mqttMsg = json.loads(msg.payload.decode('utf-8'))
- print('recv mqttMsg:{}'.format(mqttMsg))
- payload = {
- "sn": pub_topic[topic],
- "t": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
- "f": 3,
- "p1": 120,
- }
- payload = json.dumps(payload)
- client.publish(pub_topic[topic], payload=payload, qos=2, retain=False)
- except Exception as e:
- print("mqttMsgRecvCB:{}".format(e))
- if __name__ == '__main__':
- queue = Queue()
- services = parse_config('config.yaml')
- brokerIP = services['broker']['IP']
- brokerPort = services['broker']['Port']
- clientId = services['client']['Id']
- user = services['client']['user']
- pwd = services['client']['pwd']
- # 订阅主题(实时)
- global pub_topic
- sub_topic = [(rt['sub'], 2) for rt in services['subscribe']['realtime']]
- pub_topic = {rt['sub']: rt['pub'] for rt in services['subscribe']['realtime']}
- mqCI = MQTTClientInfoX(brokerIP, brokerPort, clientId, user, pwd)
- mqttClient = SimpleMqttClient(mqCI, clientId)
- mqttClient.setTopic(sub_topic)
- mqttClient.setKeepAlive(60)
- mqttClient.on_message = on_message
- mqttClient.start()
- # 启动定时任务
- # schedule(mqttClient, services['subscribe']['schedule'])
- cc = CustomClient(queue=queue)
- # 消息传递
- while True:
- try:
- data = queue.get()
- mqttClient.clientConn.publish(
- data['topic'],
- payload=data['payload'],
- qos=2,
- retain=False
- )
- except Exception as e:
- print(e)
|