main.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. # -*- coding: utf-8 -*-
  2. # @Author: privacy
  3. # @Date: 2024-06-24 16:33:01
  4. # @Last Modified by: privacy
  5. # @Last Modified time: 2024-07-09 17:55:47
  6. import time
  7. import json
  8. import datetime
  9. import threading
  10. from queue import Queue
  11. from dataclasses import dataclass, asdict
  12. import paho.mqtt.client as mqclient
  13. from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
  14. from unitree_sdk2py.idl.unitree_go.msg.dds_ import SportModeState_
  15. # from tasks import schedule
  16. from tasks import CustomClient
  17. from utils import parse_config
  18. @dataclass
  19. class MQTTClientInfoX:
  20. brokerIP: str
  21. brokerPort: int
  22. clientId: str
  23. user: str
  24. pwd: str
  25. class SimpleMqttClient(threading.Thread):
  26. def __init__(self, mqttClientInfo: MQTTClientInfoX, clientId: str):
  27. threading.Thread.__init__(self, name="mqttElevClient_{}".format(time.time()))
  28. self.mqsCI = mqttClientInfo
  29. self.on_message = None
  30. self.keepAliveInterval = None
  31. self.clientId = clientId
  32. def setKeepAlive(self, keepAliveInterval: int = None) -> None:
  33. """设置超时事件
  34. """
  35. self.keepAliveInterval = 30 if keepAliveInterval is None or keepAliveInterval < 0 else keepAliveInterval
  36. def setTopic(self, topic: list) -> None:
  37. """设置订阅主题
  38. """
  39. self.sub_topic = topic
  40. def on_connect(self, client, userdata, flags, reason_code, properties):
  41. if reason_code.is_failure:
  42. print(f"Failed to connect: {reason_code}. loop_forever() will retry connection")
  43. else:
  44. # 我们总是应该从连接回调中订阅,以确保我们的连接在重新连接中一直存在。
  45. client.subscribe(self.sub_topic)
  46. print('subscribe ==> {}'.format(self.sub_topic))
  47. def on_disconnect(self, client, userdata, flags, reason_code, properties):
  48. client.user_data_set(userdata + 1)
  49. if userdata == 0:
  50. client.reconnect()
  51. def on_subscribe(self, client, userdata, mid, reason_code_list: list, properties):
  52. # 如果我们只订阅一个频道,reason_code_list 包含一个单条目
  53. for reason_code in reason_code_list:
  54. if reason_code.is_failure:
  55. print(f"Broker rejected you subscription: {reason_code}")
  56. else:
  57. print(f"Broker granted the following QoS: {reason_code.value}")
  58. def on_log(self, client, userdata, level, message):
  59. print(message)
  60. def run(self):
  61. """继承多线程,重新run方法
  62. """
  63. self.startClient(self.clientId)
  64. def startClient(self, clientId: str):
  65. """启动MQTT客户端
  66. """
  67. print("starting mqttElevClient...")
  68. self.clientConn = mqclient.Client(
  69. mqclient.CallbackAPIVersion.VERSION2,
  70. client_id=clientId,
  71. clean_session=False
  72. )
  73. self.clientConn.username_pw_set(self.mqsCI.user, self.mqsCI.pwd)
  74. print("MQTT u:p -> {}:{}".format(self.mqsCI.user, '*' * len(self.mqsCI.pwd)))
  75. self.clientConn.on_connect = self.on_connect
  76. self.clientConn.on_subscribe = self.on_subscribe
  77. self.clientConn.on_message = self.on_message
  78. self.clientConn.on_disconnect = self.on_disconnect
  79. self.clientConn.on_log = self.on_log
  80. # ----------------------------------------------------------------------------- #
  81. while True:
  82. try:
  83. print(" connecting to MQTT broker: {}:{}".format(self.mqsCI.brokerIP, self.mqsCI.brokerPort))
  84. self.clientConn.connect(
  85. self.mqsCI.brokerIP,
  86. self.mqsCI.brokerPort,
  87. self.keepAliveInterval
  88. )
  89. break
  90. except Exception as e:
  91. # traceback.print_exc()
  92. print('startClient :{}'.format(e))
  93. time.sleep(5)
  94. self.clientConn.loop_forever()
  95. def on_message(client, userdata, msg):
  96. global pub_topic
  97. try:
  98. topic = msg.topic
  99. mqttMsg = json.loads(msg.payload.decode('utf-8'))
  100. print('recv mqttMsg:{}'.format(mqttMsg))
  101. payload = {
  102. "sn": pub_topic[topic],
  103. "t": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
  104. "f": 3,
  105. "p1": 120,
  106. }
  107. payload = json.dumps(payload)
  108. client.publish(pub_topic[topic], payload=payload, qos=2, retain=False)
  109. except Exception as e:
  110. print("mqttMsgRecvCB:{}".format(e))
  111. if __name__ == '__main__':
  112. queue = Queue()
  113. services = parse_config('config.yaml')
  114. brokerIP = services['broker']['IP']
  115. brokerPort = services['broker']['Port']
  116. clientId = services['client']['Id']
  117. user = services['client']['user']
  118. pwd = services['client']['pwd']
  119. # 订阅主题(实时)
  120. global pub_topic
  121. sub_topic = [(rt['sub'], 2) for rt in services['subscribe']['realtime']]
  122. pub_topic = {rt['sub']: rt['pub'] for rt in services['subscribe']['realtime']}
  123. mqCI = MQTTClientInfoX(brokerIP, brokerPort, clientId, user, pwd)
  124. mqttClient = SimpleMqttClient(mqCI, clientId)
  125. mqttClient.setTopic(sub_topic)
  126. mqttClient.setKeepAlive(60)
  127. mqttClient.on_message = on_message
  128. mqttClient.start()
  129. # 启动定时任务
  130. # schedule(mqttClient, services['subscribe']['schedule'])
  131. cc = CustomClient(queue=queue)
  132. # 消息传递
  133. while True:
  134. try:
  135. data = queue.get()
  136. mqttClient.clientConn.publish(
  137. data['topic'],
  138. payload=data['payload'],
  139. qos=2,
  140. retain=False
  141. )
  142. except Exception as e:
  143. print(e)