sprivacy vor 1 Jahr
Commit
03d795269b
9 geänderte Dateien mit 986 neuen und 0 gelöschten Zeilen
  1. 51 0
      doc/README.md
  2. 17 0
      src/config.yaml
  3. 17 0
      src/config2.yaml
  4. 323 0
      src/learn.py
  5. 155 0
      src/main.py
  6. 91 0
      src/tasks.py
  7. 250 0
      src/test.py
  8. 36 0
      src/utils.py
  9. 46 0
      src/videoclass.py

+ 51 - 0
doc/README.md

@@ -0,0 +1,51 @@
+
+
+
+
+EC20模块
+---
+
+0、安装前置包
+
+```bash
+sudo apt-get install minicom
+sudo apt-get install gpsd gpsd-clients python-gps
+```
+
+1、开启EC20的GPS功能
+
+```bash
+lsusb
+```
+
+2、查看串口转usb是否正常
+
+```bash
+ls -l /dev/ttyUSB*
+```
+
+3、打开ttyUSB2串口启动GPS功能
+
+```bash
+sudo minicom -D /dev/ttyUSB2
+
+AT+QGPS=1
+```
+
+4、查看GPS数据
+
+```bash
+sudo minicom -D /dev/ttyUSB1
+```
+
+5、配置gpsd
+
+```bash
+sudo gpsd /dev/ttyUSB1 -N -D 9 -F /var/run/gpsd.sock -S 3333
+```
+
+6、监听gpsd
+
+```bash
+cgps -s localhost:3333
+```

+ 17 - 0
src/config.yaml

@@ -0,0 +1,17 @@
+broker:
+  IP: '127.0.0.1'
+  Port: 1883
+client:
+  Id: 'mq_test1'
+  user: 'hangge'
+  pwd: '123'
+subscribe:
+  schedule:
+    - dog1/status
+    - dog1/sports
+    - dog1/fanlt
+    - dog1/battery
+    - dog1/online
+  realtime:
+    - sub: dog1/command/send
+      pub: dog1/command/incept

+ 17 - 0
src/config2.yaml

@@ -0,0 +1,17 @@
+broker:
+  IP: '127.0.0.1'
+  Port: 1883
+client:
+  Id: 'mq_test2'
+  user: 'hangge'
+  pwd: '123'
+subscribe:
+  schedule:
+    - dog2/status
+    - dog2/sports
+    - dog2/fanlt
+    - dog2/battery
+    - dog2/online
+  realtime:
+    - sub: dog2/command/send
+      pub: dog2/command/incept

+ 323 - 0
src/learn.py

@@ -0,0 +1,323 @@
+# -*- coding: utf-8 -*-
+# @Author: privacy
+# @Date:   2024-06-20 11:40:45
+# @Last Modified by:   privacy
+# @Last Modified time: 2024-06-24 14:54:31
+from abc import ABC, abstractmethod
+import logging
+from queue import Queue
+
+import paho.mqtt.client as mqtt
+
+
+class ReflexArc(ABC):
+    """反射弧类"""
+    def __init__(self):
+        pass
+
+    @abstractmethod
+    def receptor(self):
+        """感受器
+        """
+        pass
+
+    @abstractmethod
+    def neure(self):
+        """神经元
+        """
+        pass
+
+    @abstractmethod
+    def effector(self):
+        """效应器,
+        """
+        pass
+
+
+class MqttCell(ReflexArc):
+    def __init__(self):
+        super().__init__()
+        self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
+
+    def on_connect(self, userdata, flags, reason_code, properties):
+        if reason_code.is_failure:
+            print(f"Failed to connect: {reason_code}. loop_forever() will retry connection")
+        else:
+            self.client.subscribe("test")
+
+    def receptor(self):
+        """感受器
+        """
+        self.client.on_connect = on_connect
+
+    def nerue(self):
+        pass
+
+
+
+class MqttClient(object):
+    """发布订阅类"""
+    def __init__(self, mqtt_host: str = "localhost", mqtt_port: int = 1883, mqtt_keepalive: int = 60):
+        super(MqttClient, self).__init__()
+        self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
+        self.client.on_connect = self.on_connect
+        self.client.on_message = self.on_message
+        self.client.username_message = self.username_message
+        self.client.on_publish = self.on_publish
+        self.client.connect(mqtt_host, mqtt_port, mqtt_keepalive)  # 600 为 keepalive 的时间间隔
+        self.client.loop_forever()  # 保持连接
+
+    def on_connect(self, userdata, flags, rc):
+        """连接主题(成功,失败)都会调用此函数
+        @userdata => 在Client()或userdata_set()中设置的私有用户数据
+        @flags => 代理发送的响应标志
+        @rc => 连接结果
+            0:连接成功
+            1:连接被拒绝-协议版本不正确
+            2:连接被拒绝-客户端标识符无效
+            3:连接被拒绝-服务器不可用
+            4:连接被拒绝-用户名或密码错误
+            5:连接被拒绝-未授权
+        @reasonCode  => mqttv5.0原因码:reasonCode类的实例。
+        @properties  =>  从代理返回的mqttv5.0属性
+        """
+        print("Connected with result code: " + str(rc))
+        # 订阅
+        self.client.subscribe("mqtt11")
+
+    def on_subscribe(self, userdata, mid, granted_qos):
+        """订阅回调
+        @mid => 匹配从相应的subscribe()调用
+        @grated_qos   => 给出代理的qos级别的整数列表,为每个不同的订阅请求授予。
+        """
+        print("On Subscribed: qos = %d" % granted_qos)
+        pass
+
+    def on_message(self, userdata, msg):
+        """消息回调
+        @msg  =>  MQTTMessage的实例。这是一个包含成员主题、负载、qos和保留的类
+        使用message_callback_add()定义将调用的多个回调,用于特定主题筛选器
+        """
+        print("on_message topic:" + msg.topic + " message:" + str(msg.payload.decode('utf-8')))
+    
+    def message_callback_add(self, sub, callback):
+        """注册特定主题消息回调
+        @sub  =>   sub即subscribe, 也就是client.subscribe() 方法中的订阅的topic,可以是通配符匹配订阅
+        @callback  => 自定义回调函数,回调参数与on_message()相同即可,参数的意义也是一样的。
+        """
+        pass
+
+    def username_message(self, userdata, msg):
+        """自定义回调函数
+        """
+        print("username_message  topic:" + msg.topic)
+
+    def message_callback_remove(self, sub):
+        """删除注册的特定回调
+        """
+        pass
+
+    def on_publish(self, userdata, mid):
+        """发布消息
+        @mid  => 匹配从相应的publish()调用,以允许跟踪传出消息。
+        """
+        print("On onPublish: qos = %d" % mid)
+        # self.client.publish(topic='mqtt11', payload='amazing', qos=0, retain=False)
+
+    def on_unsubscribe(self, userdata, mid):
+        """取消订阅
+        @mid  =>  匹配从相应的unsubscribe()调用。
+        """
+        print("On unSubscribed: qos = %d" % mid)
+        pass
+
+    def on_disconnect(self, userdata, rc):
+        """断开链接
+        @rc => 断开连接的结果
+        """
+        print("Unexpected disconnection rc = " + str(rc))
+        pass
+
+    def on_socket_open(self, userdata, socket):
+        """套接字打开
+        @socket  =>  刚打开的socket
+        """
+        pass
+
+    def on_socket_close(self, userdata, socket):
+        """套接字关闭
+        """
+        pass
+
+    def on_socket_register_write(self, userdata, socket):
+        """套接字写入
+        """
+        pass
+
+    def on_socket_unregister_write(self, userdata, socket):
+        """套接字注销写入
+        """
+        pass
+
+
+
+class Neuro(object):
+    def __init__(self):
+        super(Neuro, self).__init__()
+
+    def run(self):
+        pass
+
+
+class Brain(object):
+    def __init__(self):
+        super(Brain, self).__init__()
+
+    def run(self):
+        pass
+
+
+class Texture(MqttClient):
+    def __init__(self):
+        super().__init__()
+
+    def run(self):
+        self.client.publish(topic='mqtt11', payload='amazing', qos=0, retain=False)
+
+
+class Version(object):
+    def __init__(self):
+        super(Version, self).__init__()
+
+    def run(self, src: str = 'udp://127.0.0.1:5600', dst: str = 'rtmp://192.168.1.150:1935/hls/test'):
+        process = (
+            ffmpeg
+            .input(
+                src
+            )
+            .output(
+                dst,
+                c='copy',
+                format='flv',
+            )
+            .run_async(pipe_stdout=True, pipe_stderr=True)
+        )
+        stdout, stderr = process.communicate()
+        print(stdout.decode())
+        print(stderr.decode())
+
+
+
+
+
+import datetime
+import functools
+import json
+import time
+import traceback
+import paho.mqtt.client as mqclient
+import threading
+from dataclasses import dataclass
+
+@dataclass
+class MQTTClientInfoX:
+    brokerIP: str
+    brokerPort: int
+    clientId: str
+    user: str
+    pwd: str
+
+
+class SimpleMqttClientTest(threading.Thread):
+    def __init__(self, mqttClientInfo: MQTTClientInfoX, clientId):
+        threading.Thread.__init__(self, name="mqttElevClient_{}".format(time.time()))
+        self.mqsCI = mqttClientInfo
+        self.connectedCB = None
+        self.disconnectedCB = None
+        self.mqttMsgRecvCB = None
+        self.keepAliveInterval = None
+        self.clientId = clientId
+        self._isConnected = False
+        self.sn = 1
+
+    def setKeepAlive(self, keepAliveInterval: int):
+        self.keepAliveInterval = keepAliveInterval
+
+    def publishE(self, topic, payload, qos=0, retain=False):
+        self.clientConn.publish(topic, payload, qos=qos, retain=False)
+
+    def setCallback(self, connectedCB, un, mqttMsgRecvCB):
+        self.on_subscribe = connectedCB
+        self.mqttMsgRecvCB = mqttMsgRecvCB
+
+    def on_connect(self, client, userdata, flags, rc):
+        """一旦连接成功, 回调此方法"""
+        rc_status = ["连接成功", "协议版本不正确", "客户端标识符无效", "服务器不可用", "用户名或密码不正确", "未经授权"]
+        print("connect:", rc_status[rc])
+
+    def run(self):
+        self.startClient(self.clientId)
+
+    def startClient(self, clientId):
+
+        print("starting mqttElevClient...")
+        self.clientConn = mqclient.Client(client_id=clientId, clean_session=False)
+        self.clientConn.username_pw_set(self.mqsCI.user, self.mqsCI.pwd)
+        print("MQTT u:p -> {}:{}".format(self.mqsCI.user, '*' * len(self.mqsCI.pwd)))
+
+        self.clientConn.on_connect = functools.partial(self.on_connect)
+        self.clientConn.on_subscribe = functools.partial(self.on_subscribe)
+        self.clientConn.on_message = functools.partial(self.mqttMsgRecvCB)
+        # self.clientConn.on_disconnect = functools.partial(self.on_disconnect)
+        # self.clientConn.on_log = functools.partial(self.on_log)
+        keepAliveTime = 30 if self.keepAliveInterval is None or self.keepAliveInterval < 0 else self.keepAliveInterval
+
+        # -----------------------------------------------------------------------------
+        while True:
+            try:
+                print(" connecting to MQTT broker: {}:{}".format(self.mqsCI.brokerIP, self.mqsCI.brokerPort))
+                self.clientConn.connect(self.mqsCI.brokerIP, self.mqsCI.brokerPort, keepAliveTime)
+                break
+            except Exception as e:
+                # traceback.print_exc()
+                print('startClient :{}'.format(e))
+                time.sleep(5)
+        self.clientConn.loop_forever()
+
+
+def mqttMsgRecvCB(client, userdata, msg):
+    try:
+        mqttMsg = json.loads(msg.payload.decode('utf-8'))
+        print('recv mqttMsg:{}'.format(mqttMsg))
+    except Exception as e:
+        print("mqttMsgRecvCB:{}".format(e))
+
+
+def connectedCB(client, userdata, topic):
+    sub_topic1 = topic
+    client.subscribe(sub_topic1)
+    print('connectedCB subscribe ==> {}'.format(sub_topic1))
+
+
+if __name__ == '__main__':
+    brokerIP = '127.0.0.1'
+    brokerPort = 1883
+    clientId = 'mq_test1'
+    user = 'hangge'
+    pwd = '123'
+    topic = '/demo/test/1'
+
+    mqCI = MQTTClientInfoX(brokerIP, brokerPort, clientId, user, pwd)
+    mqttClient = SimpleMqttClientTest(mqCI, clientId)
+    mqttClient.setCallback(functools.partial(connectedCB, topic=topic), None, mqttMsgRecvCB)
+    mqttClient.setKeepAlive(30)
+    mqttClient.start()
+
+    # while 1:
+    #     try:
+    #         time.sleep(1)
+    #         cmd = {"sn": 10001, "t": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), "f": 3, "p1": 120, }
+    #         cmd_s = json.dumps(cmd)
+    #         mqttClient.publishE(topic, payload=cmd_s, qos=0, retain=False)
+    #     except Exception as e:
+    #         print(e)

+ 155 - 0
src/main.py

@@ -0,0 +1,155 @@
+# -*- coding: utf-8 -*-
+# @Author: privacy
+# @Date:   2024-06-24 16:33:01
+# @Last Modified by:   privacy
+# @Last Modified time: 2024-07-05 15:27:11
+import time
+import json
+import datetime
+import threading
+from dataclasses import dataclass, asdict
+
+import paho.mqtt.client as mqclient
+from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
+from unitree_sdk2py.idl.unitree_go.msg.dds_ import SportModeState_
+
+from tasks import schedule
+from utils import parse_config
+
+
+@dataclass
+class MQTTClientInfoX:
+    brokerIP: str
+    brokerPort: int
+    clientId: str
+    user: str
+    pwd: str
+
+
+class SimpleMqttClient(threading.Thread):
+    def __init__(self, mqttClientInfo: MQTTClientInfoX, clientId: str):
+        threading.Thread.__init__(self, name="mqttElevClient_{}".format(time.time()))
+        self.mqsCI = mqttClientInfo
+        self.on_message = None
+        self.keepAliveInterval = None
+        self.clientId = clientId
+
+    def setKeepAlive(self, keepAliveInterval: int = None) -> None:
+        """设置超时事件
+        """
+        self.keepAliveInterval = 30 if keepAliveInterval is None or keepAliveInterval < 0 else keepAliveInterval
+
+    def setTopic(self, topic: list) -> None:
+        """设置订阅主题
+        """
+        self.sub_topic = topic
+
+    def on_connect(self, client, userdata, flags, reason_code, properties):
+        if reason_code.is_failure:
+            print(f"Failed to connect: {reason_code}. loop_forever() will retry connection")
+        else:
+            # 我们总是应该从连接回调中订阅,以确保我们的连接在重新连接中一直存在。
+            client.subscribe(self.sub_topic)
+            print('subscribe ==> {}'.format(self.sub_topic))
+
+    def on_disconnect(self, client, userdata, flags, reason_code, properties):
+        client.user_data_set(userdata + 1)
+        if userdata == 0:
+            client.reconnect()
+
+    def on_subscribe(self, client, userdata, mid, reason_code_list: list, properties):
+        # 如果我们只订阅一个频道,reason_code_list 包含一个单条目
+        for reason_code in reason_code_list:
+            if reason_code.is_failure:
+                print(f"Broker rejected you subscription: {reason_code}")
+            else:
+                print(f"Broker granted the following QoS: {reason_code.value}")
+
+    def on_log(self, client, userdata, level, message):
+        print(message)
+
+    def run(self):
+        """继承多线程,重新run方法
+        """
+        self.startClient(self.clientId)
+
+    def startClient(self, clientId: str):
+        """启动MQTT客户端
+        """
+        print("starting mqttElevClient...")
+        self.clientConn = mqclient.Client(
+            mqclient.CallbackAPIVersion.VERSION2,
+            client_id=clientId,
+            clean_session=False
+        )
+        self.clientConn.username_pw_set(self.mqsCI.user, self.mqsCI.pwd)
+        print("MQTT u:p -> {}:{}".format(self.mqsCI.user, '*' * len(self.mqsCI.pwd)))
+
+        self.clientConn.on_connect = self.on_connect
+        self.clientConn.on_subscribe = self.on_subscribe
+        self.clientConn.on_message = self.on_message
+        self.clientConn.on_disconnect = self.on_disconnect
+        self.clientConn.on_log = self.on_log
+
+        # ----------------------------------------------------------------------------- #
+        while True:
+            try:
+                print(" connecting to MQTT broker: {}:{}".format(self.mqsCI.brokerIP, self.mqsCI.brokerPort))
+                self.clientConn.connect(
+                    self.mqsCI.brokerIP,
+                    self.mqsCI.brokerPort,
+                    self.keepAliveInterval
+                )
+                break
+            except Exception as e:
+                # traceback.print_exc()
+                print('startClient :{}'.format(e))
+                time.sleep(5)
+
+        self.clientConn.loop_forever()
+
+
+def on_message(client, userdata, msg):
+    global pub_topic
+    try:
+        topic = msg.topic
+        mqttMsg = json.loads(msg.payload.decode('utf-8'))
+
+        print('recv mqttMsg:{}'.format(mqttMsg))
+
+        payload = {
+            "sn": pub_topic[topic],
+            "t": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
+            "f": 3,
+            "p1": 120,
+        }
+        payload = json.dumps(payload)
+
+        client.publish(pub_topic[topic], payload=payload, qos=2, retain=False)
+    except Exception as e:
+        print("mqttMsgRecvCB:{}".format(e))
+
+
+if __name__ == '__main__':
+    services = parse_config('config2.yaml')
+
+    brokerIP = services['broker']['IP']
+    brokerPort = services['broker']['Port']
+    clientId = services['client']['Id']
+    user = services['client']['user']
+    pwd = services['client']['pwd']
+
+    # 订阅主题(实时)
+    global pub_topic
+    sub_topic = [(rt['sub'], 2) for rt in services['subscribe']['realtime']]
+    pub_topic = {rt['sub']: rt['pub'] for rt in services['subscribe']['realtime']}
+
+    mqCI = MQTTClientInfoX(brokerIP, brokerPort, clientId, user, pwd)
+    mqttClient = SimpleMqttClient(mqCI, clientId)
+    mqttClient.setTopic(sub_topic)
+    mqttClient.setKeepAlive(60)
+    mqttClient.on_message = on_message
+    mqttClient.start()
+
+    # 启动定时任务
+    schedule(mqttClient, services['subscribe']['schedule'])

+ 91 - 0
src/tasks.py

@@ -0,0 +1,91 @@
+# -*- coding: utf-8 -*-
+# @Author: privacy
+# @Date:   2024-06-24 17:04:17
+# @Last Modified by:   privacy
+# @Last Modified time: 2024-07-05 14:46:37
+import time
+import json
+import datetime
+
+from unitree_sdk2py.core.channel import ChannelFactoryInitialize, ChannelSubscriber
+from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowState_
+from unitree_sdk2py.idl.unitree_go.msg.dds_ import SportModeState_
+
+
+####################### 用户超类定义起始 #################################
+
+
+class CustomClient:
+    def __init__(self):
+        super(CustomClient, self).__init__()
+        ChannelFactoryInitialize(0, 'enp2s0')
+        sublowstate = ChannelSubscriber("rt/lowstate", LowState_)
+        sublowstate.Init(self.LowStateHandler, 10)
+        subsportmodestate = ChannelSubscriber("rt/sportmodestate", SportModeState_)
+        subsportmodestate.Init(self.HighStateHandler, 10)
+
+    def LowStateHandler(self, msg: LowState_):
+        payload = {
+            'bit_flag': msg.bit_flag,
+            'adc_reel': msg.adc_reel,
+            'temperature_ntc1': msg.temperature_ntc1,
+            'temperature_ntc2': msg.temperature_ntc2,
+            'power_v': msg.power_v,
+            'power_a': msg.power_a
+        }
+        print(payload)
+        payload = {
+            'status': msg.bms_state.status,
+            'soc': msg.bms_state.soc,
+            'current': msg.bms_state.current,
+            'cycle': msg.bms_state.cycle,
+            'bq_ntc': msg.bms_state.bq_ntc,
+            'mcu_ntc': msg.bms_state.mcu_ntc
+        }
+        print(payload)
+
+
+    def HighStateHandler(self, msg: SportModeState_):
+        payload = {
+            'state': msg.mode,
+            'bodyHeight': msg.body_height,
+            'footRaiseHeight': msg.foot_raise_height,
+            'gait': msg.gait_type,
+            'dance': msg.progress
+        }
+        print(payload)
+
+
+####################### 用户超类定义结束 #################################
+
+
+
+####################### 配置文件解析 - 定时任务 #######################
+
+
+def schedule(mqttClient, topics: list):
+    """定时任务"""
+    while True:
+        for topic in topics:
+            try:
+                time.sleep(0.1)
+                payload = {
+                    "sn": topic, 
+                    "t": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
+                    "f": 3,
+                    "p1": 120,
+                }
+                payload = json.dumps(payload)
+                mqttClient.clientConn.publish(
+                    topic,
+                    payload=payload,
+                    qos=2,
+                    retain=False
+                )
+            except Exception as e:
+                print(e)
+        time.sleep(10)
+
+
+####################### 定时任务定义结束 #################################
+

+ 250 - 0
src/test.py

@@ -0,0 +1,250 @@
+# -*- coding: utf-8 -*-
+# @Author: privacy
+# @Date:   2024-06-17 09:34:40
+# @Last Modified by:   privacy
+# @Last Modified time: 2024-07-04 13:33:13
+
+# subprocess 模块允许我们启动一个新进程,并连接到它们的输入/输出/错误管道,从而获取返回值。
+# from ffmpeg import FFmpeg
+
+# 服务
+# docker run -d -p 1935:1935 --name nginx-rtmp tiangolo/nginx-rtmp
+# 本地调试地址
+# ffplay -f flv http://127.0.0.1:8081
+ 
+# import asyncio
+
+# import ffmpeg
+# from ffmpeg import Progress
+# from ffmpeg.asyncio import FFmpeg
+
+
+# async def main():
+#     ffmpeg = (
+#         FFmpeg()
+#         # .option("y")
+#         # .input("udp://127.0.0.1:5600")
+#         # .input("in.mp4")
+#         .output(
+#             # "out.mp4"
+#             "rtmp://192.168.1.150:1935/hls/test",
+#             f="flv"
+#         )
+#     )
+
+#     @ffmpeg.on("progress")
+#     def on_progress(progress: Progress):
+#         print(progress)
+
+#     await ffmpeg.execute()
+
+# if __name__ == '__main__':
+#     asyncio.run(main())
+
+
+#################################START#########################################
+
+"""
+import asyncio
+
+import ffmpeg
+
+
+async def main():
+    process = (
+        ffmpeg
+        .input(
+            'udp://127.0.0.1:5600'
+        )
+        .output(
+            'rtmp://192.168.1.150:1935/hls/test',
+            c='copy',
+            format='flv',
+            # vcodec='libx264'
+        )
+        # .overwrite_output()
+        .run_async(pipe_stdout=True, pipe_stderr=True)
+    )
+    stdout, stderr = process.communicate()
+    print(stdout.decode())
+    print(stderr.decode())
+
+if __name__ == '__main__':
+    asyncio.run(main())
+"""
+
+#################################END#########################################
+
+
+# import subprocess
+
+
+# rtmp_url = 'rtmp://192.168.1.150:1935/hls/test'
+# # video_path = 'udp://127.0.0.1:5600'
+# video_path = 'in.mp4'
+
+# ffmpeg_command = [
+#     'ffmpeg',
+#     '-re',
+#     '-i', video_path,
+#     '-c', 'copy',
+#     # '-c:v', 'libx264',
+#     '-f', 'flv',
+#     rtmp_url
+# ]
+
+# print(ffmpeg_command)
+
+# process = subprocess.Popen(ffmpeg_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+# # 打印FFmpeg的输出和错误信息
+# stdout, stderr = process.communicate()
+# print(stdout.decode())
+# print(stderr.decode())
+
+
+
+
+#############################################################################
+
+
+import datetime
+import functools
+import json
+import time
+import traceback
+import paho.mqtt.client as mqclient
+import threading
+from dataclasses import dataclass
+
+@dataclass
+class MQTTClientInfoX:
+    brokerIP: str
+    brokerPort: int
+    clientId: str
+    user: str
+    pwd: str
+
+
+class SimpleMqttClientTest(threading.Thread):
+    def __init__(self, mqttClientInfo: MQTTClientInfoX, clientId, subtopics: list):
+        threading.Thread.__init__(self, name="mqttElevClient_{}".format(time.time()))
+        self.mqsCI = mqttClientInfo
+        self.keepAliveInterval = None
+        self.clientId = clientId
+        self.subtopics = subtopics
+
+    def setKeepAlive(self, keepAliveInterval: int):
+        self.keepAliveInterval = keepAliveInterval
+
+    def on_connect(self, client, userdata, flags, reason_code, properties):
+        """一旦连接成功, 回调此方法"""
+        if userdata == 0:
+            print("First connection:")
+        elif userdata == 1:
+            print("Second connection:")
+        elif userdata == 2:
+            print("Third connection (with clean session=True):")
+        print("    Session present: " + str(flags.session_present))
+        print("    Connection result: " + str(reason_code))
+
+        for topic in self.subtopics:
+            client.subscribe(topic, 2)
+        # rc_status = ["连接成功", "协议版本不正确", "客户端标识符无效", "服务器不可用", "用户名或密码不正确", "未经授权"]
+        # print("connect:", rc_status[rc])
+
+    def on_connect_fail(self, client, userdata):
+        print("Connect failed")
+
+    def on_message(self, client, userdata, msg):
+        print(msg.topic + " " + str(msg.qos) + " " + str(msg.payload))
+
+    def on_publish(self, client, userdata, mid, reason_codes, properties):
+        print("mid: "+str(mid))
+
+    def on_subscribe(self, client, userdata, mid, reason_code_list, properties):
+        print("Subscribed: " + str(mid) + " " + str(reason_code_list))
+
+    def on_disconnect(self, client, userdata, flags, reason_code, properties):
+        client.user_data_set(userdata + 1)
+        if userdata == 0:
+            client.reconnect()
+
+    def on_log(self, client, userdata, level, message):
+        print(message)
+
+    def run(self):
+        self.startClient(self.clientId)
+
+    def startClient(self, clientId):
+
+        print("starting mqttElevClient...")
+        self.clientConn = mqclient.Client(mqclient.CallbackAPIVersion.VERSION2, client_id=clientId, clean_session=False)
+        self.clientConn.username_pw_set(self.mqsCI.user, self.mqsCI.pwd)
+        print("MQTT u:p -> {}:{}".format(self.mqsCI.user, '*' * len(self.mqsCI.pwd)))
+
+        self.clientConn.on_connect = self.on_connect
+        self.clientConn.on_connect_fail = self.on_connect_fail
+        self.clientConn.on_disconnect = self.on_disconnect
+        self.clientConn.on_publish = self.on_publish
+        self.clientConn.on_subscribe = self.on_subscribe
+        self.clientConn.on_message = self.on_message
+        self.clientConn.on_log = self.on_log
+
+        keepAliveTime = 30 if self.keepAliveInterval is None or self.keepAliveInterval < 0 else self.keepAliveInterval
+
+        # -----------------------------------------------------------------------------
+        while True:
+            try:
+                print(" connecting to MQTT broker: {}:{}".format(self.mqsCI.brokerIP, self.mqsCI.brokerPort))
+                self.clientConn.connect(self.mqsCI.brokerIP, self.mqsCI.brokerPort, keepAliveTime)
+                break
+            except Exception as e:
+                # traceback.print_exc()
+                print('startClient :{}'.format(e))
+                time.sleep(5)
+        self.clientConn.loop_forever()
+
+
+if __name__ == '__main__':
+    brokerIP = '180.76.147.97'
+    brokerPort = 1883
+    clientId = 'mq_test1'
+    user = 'hangge'
+    pwd = '123'
+    subtopics = ['/huzhouip/new']
+    pubtopic = '/huzhouip/get'
+
+    mqCI = MQTTClientInfoX(brokerIP, brokerPort, clientId, user, pwd)
+    mqttClient = SimpleMqttClientTest(mqCI, clientId, subtopics)
+    mqttClient.setKeepAlive(30)
+    mqttClient.start()
+
+    time.sleep(1)
+
+    mqttClient.clientConn.publish(pubtopic, payload=json.dumps({"msg": "w"}), qos=2, retain=False)
+
+
+
+#######################################################################
+
+
+# import cv2
+
+# # gstreamer_str = "udpsrc address=230.1.1.1 port=1720 multicast-iface=<interface_name> ! application/x-rtp, media=video, encoding-name=H264 ! rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! video/x-raw,width=1280,height=720,format=BGR ! appsink drop=1"
+# gstreamer_str = "udpsrc address=239.0.0.1 port=54546 multicast-iface=192.168.1.155 ! application/x-rtp, media=video, encoding-name=H264 ! rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! video/x-raw,width=1280,height=720,format=BGR ! appsink drop=1"
+# cap = cv2.VideoCapture(gstreamer_str, cv2.CAP_GSTREAMER)
+# # cap = cv2.VideoCapture(0)
+
+# print(cap.isOpened())
+
+# while(cap.isOpened()):
+#     ret, frame = cap.read()
+#     if ret:
+#         cv2.imshow("Input via Gstreamer", frame)
+#         if cv2.waitKey(25) & 0xFF == ord('q'):
+#             break
+#     else:
+#         break
+
+# cap.release()
+# cv2.destroyAllWindows()

+ 36 - 0
src/utils.py

@@ -0,0 +1,36 @@
+# -*- coding: utf-8 -*-
+# @Author: privacy
+# @Date:   2024-07-03 09:25:50
+# @Last Modified by:   privacy
+# @Last Modified time: 2024-07-08 10:48:44
+import fcntl
+import struct
+import socket
+
+import yaml
+
+
+def parse_config(config_path: str = 'config.yaml') -> dict:
+    with open(config_path, 'r', encoding='utf-8') as fp:
+        result = yaml.load(fp, Loader=yaml.FullLoader)
+    return result
+
+
+def get_interface_ip(interface_name: str):
+    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    try:
+        # 获取接口的IP地址
+        return socket.inet_ntoa(fcntl.ioctl(
+            s.fileno(),
+            0x8915,  # SIOCGIFADDR
+            struct.pack('256s', interface_name[:15].encode('utf-8'))
+        )[20:24])
+    except Exception as e:
+        print(f"Could not get IP address for {interface_name}: {e}")
+        return None
+    finally:
+        s.close()
+
+
+if __name__ == '__main__':
+    print(parse_config())

+ 46 - 0
src/videoclass.py

@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+# @Author: privacy
+# @Date:   2024-07-02 13:55:20
+# @Last Modified by:   privacy
+# @Last Modified time: 2024-07-05 14:55:46
+import subprocess
+
+from utils import get_interface_ip
+
+
+class VideoServer:
+
+    def __init__(self, cmd = """ D:/software/gstreamer/1.0/msvc_x86_64/bin/gst-launch-1.0 udpsrc address=230.1.1.1 port=1720 ! queue ! decodebin ! autovideosink """):
+        super(VideoServer, self).__init__()
+        self.p = None
+        self.cmd = cmd.split()
+
+    def run(self):
+        print(self.cmd)
+        self.p = subprocess.Popen(self.cmd)
+
+    def stop(self):
+        self.p.terminate()
+        self.p.wait()
+
+
+if __name__ == '__main__':
+    gst = "D:/desktop/xianrobot/venv/Lib/site-packages/gnome/gst-launch.exe"
+    rtmp_addr = "rtmp://192.168.1.150:1935/live/stream"
+
+    # 获取 eth0 网卡的 IP 地址
+    eth0_ip = get_interface_ip('eth0')
+
+    if eth0_ip:
+        print(f"The IP address of eth0 is: {eth0_ip}")
+    else:
+        print("Could not find the IP address of eth0.")
+
+    cmd = f""" {gst} udpsrc address=230.1.1.1 port=1720 ! queue ! decodebin ! autovideosink """
+    cmd = f""" {gst} udpsrc address=230.1.1.1 port=1720 ! queue ! decodebin ! x264enc speed-preset=ultrafast tune=zerolatency ! flvmux ! rtmpsink location="{rtmp_addr}" """
+    # cmd = """ gst-launch-1.0 udpsrc address=230.1.1.1 port=1720 multicast-iface=eth0 ! queue ! application/x-rtp, media=video, encoding-name=H264 ! rtph264depay ! h264parse ! flvmux ! rtmpsink location="{rtmp_addr}" """
+
+    cmd = f""" ffmpeg -i udp://230.1.1.1:1720?multicast=1 -localaddr={eth0_ip} -c copy -tune zerolatency -sc_threshold 499 -profile high -preset ultrafast -f flv "{rtmp_addr}" """
+
+    vs = VideoServer(cmd)
+    vs.run()