c.py 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839
  1. #!/usr/bin/python
  2. # -*- coding=utf-8 -*-
  3. # @Create Time: 2024-01-10 16:16:57
  4. # @Last Modified time:2024-01-10 16:16:57
  5. import sys, time
  6. from rocketmq.client import PushConsumer, ConsumeStatus
  7. topicName = 'test'
  8. groupName = 'log'
  9. nameserver = 'localhost:9876'
  10. TAGS = 'XXX'
  11. # 消息处理回调
  12. def callback(msg):
  13. # 模拟业务
  14. print('Received message. messageId: ', msg.id, ' body: ', msg.body, msg.get_property('property'))
  15. # 消费成功回复CONSUME_SUCCESS
  16. return ConsumeStatus.CONSUME_SUCCESS
  17. # 消费成功回复消息状态
  18. # return ConsumeStatus.RECONSUME_LATER
  19. def start_consume_message():
  20. # 初始化消费者,并设置消费者组信息
  21. consumer = PushConsumer(groupName)
  22. # 设置服务地址
  23. consumer.set_name_server_address(nameserver)
  24. # 订阅topic
  25. consumer.subscribe(topicName, callback, TAGS)
  26. print(' [Consumer] Waiting for messages.')
  27. # 启动消费者
  28. consumer.start()
  29. while True:
  30. time.sleep(30)
  31. # 资源释放
  32. consumer.shutdown()
  33. if __name__ == '__main__':
  34. start_consume_message()