|
@@ -0,0 +1,303 @@
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
+# @Author: privacy
|
|
|
+# @Date: 2024-07-05 15:02:27
|
|
|
+# @Last Modified by: privacy
|
|
|
+# @Last Modified time: 2024-07-12 15:43:27
|
|
|
+"""
|
|
|
+octet = types.uint8
|
|
|
+unsigned long = types.uint32
|
|
|
+unsigned short = types.uint16
|
|
|
+"""
|
|
|
+import time
|
|
|
+
|
|
|
+from unitree_sdk2py.core.channel import (
|
|
|
+ ChannelFactoryInitialize,
|
|
|
+ ChannelSubscriber,
|
|
|
+ ChannelPublisher
|
|
|
+)
|
|
|
+
|
|
|
+from slam_sdk2py.idl.graph_msgs.msg.dds_ import *
|
|
|
+
|
|
|
+class slamDemo:
|
|
|
+ def __init__(self):
|
|
|
+ self.index = 0
|
|
|
+ # self.currentOdom
|
|
|
+ self.node_name = 0
|
|
|
+
|
|
|
+ ChannelFactoryInitialize(0, "eth0")
|
|
|
+
|
|
|
+ self.pubQtCommand = ChannelPublisher("rt/qt_command", QtCommand_)
|
|
|
+ self.pubQtNode = ChannelPublisher("rt/qt_add_node", QtNode_)
|
|
|
+ self.pubQtEdge = ChannelPublisher("rt/qt_add_edge", QtEdge_)
|
|
|
+
|
|
|
+ self.pubQtCommand.Init()
|
|
|
+ self.pubQtNode.Init()
|
|
|
+ self.pubQtEdge.Init()
|
|
|
+
|
|
|
+ # self.subQtNotice = ChannelSubscriber("rt/qt_notice")
|
|
|
+ # self.subOdommetry = ChannelSubscriber("rt/lio_sam_ros2/mapping/re_location_odometry")
|
|
|
+
|
|
|
+ def startMapping(self) -> None:
|
|
|
+ """开启建图"""
|
|
|
+ self.index += 1
|
|
|
+ send_msg = QtCommand_()
|
|
|
+ send_msg.command = 3
|
|
|
+ send_msg.attribute = 2
|
|
|
+ send_msg.seq = "index:" + str(self.index) + ";"
|
|
|
+
|
|
|
+ if self.pubQtCommand.Write(send_msg, 0.5):
|
|
|
+ print("Publish success. msg:", send_msg)
|
|
|
+ else:
|
|
|
+ print("Waitting for subscriber.")
|
|
|
+
|
|
|
+ time.sleep(1)
|
|
|
+
|
|
|
+ def endMapping(self, floor_index=0, pcdmap_index=0) -> None:
|
|
|
+ """结束建图"""
|
|
|
+ self.index += 1
|
|
|
+ send_msg = QtCommand_()
|
|
|
+ send_msg.command = 4
|
|
|
+ send_msg.attribute = 0
|
|
|
+ send_msg.seq = "index:" + str(self.index) + ";"
|
|
|
+ send_msg.floor_index.append(floor_index)
|
|
|
+ send_msg.pcdmap_index.append(pcdmap_index)
|
|
|
+
|
|
|
+ if self.pubQtCommand.Write(send_msg, 0.5):
|
|
|
+ print("Publish success. msg:", send_msg)
|
|
|
+ else:
|
|
|
+ print("Waitting for subscriber.")
|
|
|
+
|
|
|
+ time.sleep(1)
|
|
|
+
|
|
|
+ # def addNode(self):
|
|
|
+ # """增加导航拓扑点"""
|
|
|
+ # send_msg = QtNode_()
|
|
|
+ # send_msg.seq = "index:" + str(self.index) + ";"
|
|
|
+
|
|
|
+ def addEdge(self, edge_name: int, start_node: int, end_node: int) -> None:
|
|
|
+ """增加导航拓扑边"""
|
|
|
+ self.index += 1
|
|
|
+ send_msg = QtEdge_()
|
|
|
+ send_msg.seq = "index:" + str(index) + ";"
|
|
|
+ send_msg.edge.edge_name_().append(edge_name);
|
|
|
+ send_msg.edge.start_node_name.append(start_node)
|
|
|
+ send_msg.edge.end_node_name.append(end_node)
|
|
|
+ send_msg.edge.dog_speed.append(0.5)
|
|
|
+
|
|
|
+ send_msg.edge.dog_stats.append(0)
|
|
|
+ send_msg.edge.edge_length.append(0)
|
|
|
+ send_msg.edge.dog_back_stats.append(0)
|
|
|
+ send_msg.edge.edge_state.append(0)
|
|
|
+ send_msg.edge.edge_state_1.append(0)
|
|
|
+ send_msg.edge.edge_state_2.append(0)
|
|
|
+ send_msg.edge.edge_state_3.append(0)
|
|
|
+ send_msg.edge.edge_state_4.append(0)
|
|
|
+
|
|
|
+ if self.pubQtEdge.Write(send_msg):
|
|
|
+ print("Publish success. msg:", send_msg)
|
|
|
+ else:
|
|
|
+ print("Waitting for subscriber.")
|
|
|
+
|
|
|
+ time.sleep(1)
|
|
|
+
|
|
|
+ """删除拓扑点"""
|
|
|
+
|
|
|
+ """删除拓扑边"""
|
|
|
+
|
|
|
+ def query_nodes(self):
|
|
|
+ """查询点信息"""
|
|
|
+ send_msg = QtCommand_()
|
|
|
+ pass
|
|
|
+
|
|
|
+ def query_edges(self):
|
|
|
+ """查询边信息"""
|
|
|
+ pass
|
|
|
+
|
|
|
+ def startRelocation(self) -> None:
|
|
|
+ """开启重定位"""
|
|
|
+ self.index += 1
|
|
|
+ send_msg = QtCommand_()
|
|
|
+ send_msg.seq = "index:" + str(self.index) + ";"
|
|
|
+ send_msg.command = 6
|
|
|
+ send_msg.attribute = 2
|
|
|
+
|
|
|
+ if self.pubQtCommand.Write(send_msg, 0.5):
|
|
|
+ print("Publish success. msg:", send_msg)
|
|
|
+ else:
|
|
|
+ print("Waitting for subscriber.")
|
|
|
+
|
|
|
+ time.sleep(1)
|
|
|
+
|
|
|
+ def initPost(self):
|
|
|
+ """重定位初始化"""
|
|
|
+ self.index += 1
|
|
|
+ send_msg = QtCommand_()
|
|
|
+ send_msg.seq = "index:" + str(self.index) + ";"
|
|
|
+ send_msg.command = 7
|
|
|
+ send_msg.quaternion_x = 0
|
|
|
+ send_msg.quaternion_y = 0
|
|
|
+ send_msg.quaternion_z = 0
|
|
|
+ send_msg.quaternion_w = 1
|
|
|
+ send_msg.translation_x = 0
|
|
|
+ send_msg.translation_y = 0
|
|
|
+ send_msg.translation_z = 0
|
|
|
+
|
|
|
+ if self.pubQtCommand.Write(send_msg, 0.5):
|
|
|
+ print("Publish success. msg:", send_msg)
|
|
|
+ else:
|
|
|
+ print("Waitting for subscriber.")
|
|
|
+
|
|
|
+ time.sleep(1)
|
|
|
+
|
|
|
+ def startNavigation(self):
|
|
|
+ """开启导航"""
|
|
|
+ self.index += 1
|
|
|
+ send_msg = QtCommand_()
|
|
|
+ send_msg.seq = "index:" + str(self.index) + ";"
|
|
|
+ send_msg.command = 8
|
|
|
+
|
|
|
+ if self.pubQtCommand.Write(send_msg, 0.5):
|
|
|
+ print("Publish success. msg:", send_msg)
|
|
|
+ else:
|
|
|
+ print("Waitting for subscriber.")
|
|
|
+
|
|
|
+ time.sleep(1)
|
|
|
+
|
|
|
+ def defaultMultiNavigation(self):
|
|
|
+ """多点循环导航(默认点)"""
|
|
|
+ send_msg = QtCommand_()
|
|
|
+ send_msg.seq = "index:" + str(self.index) + ";"
|
|
|
+ send_msg.command = 10
|
|
|
+
|
|
|
+ if self.pubQtCommand.Write(send_msg, 0.5):
|
|
|
+ print("Publish success. msg:", send_msg)
|
|
|
+ else:
|
|
|
+ print("Waitting for subscriber.")
|
|
|
+
|
|
|
+ time.sleep(1)
|
|
|
+
|
|
|
+ def setMultiNavigation(self, nodes: list):
|
|
|
+ """ 多点循环导航(设置点)"""
|
|
|
+ self.index += 1
|
|
|
+ send_msg = QtCommand_()
|
|
|
+ send_msg.seq = "index:" + str(self.index) + ";"
|
|
|
+ send_msg.command = 11
|
|
|
+ for node in nodes:
|
|
|
+ send_msg.node_edge_name.append(node)
|
|
|
+
|
|
|
+ if self.pubQtCommand.Write(send_msg, 0.5):
|
|
|
+ print("Publish success. msg:", send_msg)
|
|
|
+ else:
|
|
|
+ print("Waitting for subscriber.")
|
|
|
+
|
|
|
+ time.sleep(1)
|
|
|
+
|
|
|
+ def deleteAllNode(self) -> None:
|
|
|
+ self.index += 1
|
|
|
+ send_msg = QtCommand_()
|
|
|
+ send_msg.seq = "index:" + str(self.index) + ";"
|
|
|
+ send_msg.command = 1
|
|
|
+ send_msg.attribute = 1
|
|
|
+ send_msg.node_edge_name.append(999)
|
|
|
+
|
|
|
+ if self.pubQtCommand.Write(send_msg, 0.5):
|
|
|
+ print("Publish success. msg:", send_msg)
|
|
|
+ else:
|
|
|
+ print("Waitting for subscriber.")
|
|
|
+
|
|
|
+ time.sleep(1)
|
|
|
+
|
|
|
+ def deleteAllEdge(self) -> None:
|
|
|
+ self.index += 1
|
|
|
+ send_msg = QtCommand_()
|
|
|
+ send_msg.seq = "index:" + str(self.index) + ";"
|
|
|
+ # 1为删除指令
|
|
|
+ send_msg.command = 1
|
|
|
+ # 2为边
|
|
|
+ send_msg.attribute = 2
|
|
|
+ send_msg.node_edge_name.append(999)
|
|
|
+
|
|
|
+ if self.pubQtCommand.Write(send_msg, 0.5):
|
|
|
+ print("Publish success. msg:", send_msg)
|
|
|
+ else:
|
|
|
+ print("Waitting for subscriber.")
|
|
|
+
|
|
|
+ time.sleep(1)
|
|
|
+
|
|
|
+ def pauseNavigation(self) -> None:
|
|
|
+ """暂停导航"""
|
|
|
+ self.index += 1
|
|
|
+ send_msg = QtCommand_()
|
|
|
+ send_msg.seq = "index:" + str(self.index) + ";"
|
|
|
+ send_msg.command = 13
|
|
|
+
|
|
|
+ if self.pubQtCommand.Write(send_msg, 0.5):
|
|
|
+ print("Publish success. msg:", send_msg)
|
|
|
+ else:
|
|
|
+ print("Waitting for subscriber.")
|
|
|
+
|
|
|
+ time.sleep(1)
|
|
|
+
|
|
|
+ def recoverNavigation(self) -> None:
|
|
|
+ """恢复导航"""
|
|
|
+ self.index += 1
|
|
|
+ send_msg = QtCommand_()
|
|
|
+ send_msg.seq = "index:" + str(self.index) + ";"
|
|
|
+ send_msg.command = 14
|
|
|
+
|
|
|
+ if self.pubQtCommand.Write(send_msg, 0.5):
|
|
|
+ print("Publish success. msg:", send_msg)
|
|
|
+ else:
|
|
|
+ print("Waitting for subscriber.")
|
|
|
+
|
|
|
+ time.sleep(1)
|
|
|
+
|
|
|
+ def resturnStartNode(self) -> None:
|
|
|
+ """返回起点(默认第一个点为起点)"""
|
|
|
+ self.index += 1
|
|
|
+ send_msg = QtCommand_()
|
|
|
+ send_msg.seq = "index:" + str(self.index) + ";"
|
|
|
+ send_msg.command = 15
|
|
|
+
|
|
|
+ if self.pubQtCommand.Write(send_msg, 0.5):
|
|
|
+ print("Publish success. msg:", send_msg)
|
|
|
+ else:
|
|
|
+ print("Waitting for subscriber.")
|
|
|
+
|
|
|
+ time.sleep(1)
|
|
|
+
|
|
|
+ def closeAllNodes(self) -> None:
|
|
|
+ """关闭所有节点"""
|
|
|
+ self.index += 1
|
|
|
+ send_msg = QtCommand_()
|
|
|
+ send_msg.seq = "index:" + str(self.index) + ";"
|
|
|
+ send_msg.command = 99
|
|
|
+
|
|
|
+ if self.pubQtCommand.Write(send_msg, 0.5):
|
|
|
+ print("Publish success. msg:", send_msg)
|
|
|
+ else:
|
|
|
+ print("Waitting for subscriber.")
|
|
|
+
|
|
|
+ time.sleep(1)
|
|
|
+
|
|
|
+ # def addNodeAndEdge(self):
|
|
|
+ # self.index += 1
|
|
|
+ # send_msg = QtNode_()
|
|
|
+ # timeNow = time.time()
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ # ChannelFactoryInitialize(0, "eth0")
|
|
|
+ # sub = ChannelSubscriber("rt/qt_command", SLAMState_)
|
|
|
+ # sub.Init(SLAMStateHandler, 10)
|
|
|
+
|
|
|
+ # while True:
|
|
|
+ # time.sleep(10.0)
|
|
|
+
|
|
|
+ slamTest = slamDemo()
|
|
|
+
|
|
|
+ while True:
|
|
|
+ time.sleep(10.0)
|
|
|
+
|
|
|
+
|
|
|
+##########################################
|