123456789101112131415161718192021222324252627282930313233343536373839 |
- #!/usr/bin/python
- # -*- coding=utf-8 -*-
- # @Create Time: 2024-01-10 16:16:57
- # @Last Modified time:2024-01-10 16:16:57
- import sys, time
- from rocketmq.client import PushConsumer, ConsumeStatus
# Connection settings for the demo consumer. The names are kept as-is because
# the functions in this file reference them directly.

# Topic passed to consumer.subscribe().
topicName = 'test'
# Consumer group name passed to the PushConsumer constructor.
groupName = 'log'
# Address passed to consumer.set_name_server_address(), in host:port form.
nameserver = 'localhost:9876'
# Third argument to consumer.subscribe(); presumably the tag filter
# expression -- confirm against the client documentation.
TAGS = 'XXX'
- # 消息处理回调
def callback(msg):
    """Handle a single message handed to the consumer callback.

    Prints the message id, the body and the value of the message property
    named ``'property'``, then reports the message as consumed.

    Args:
        msg: The received message; must expose ``id``, ``body`` and
            ``get_property(name)``.

    Returns:
        ``ConsumeStatus.CONSUME_SUCCESS``, unconditionally.
    """
    # Placeholder for real business logic: just dump the message contents.
    message_id = msg.id
    body = msg.body
    extra = msg.get_property('property')
    print('Received message. messageId: ', message_id, ' body: ', body, extra)

    # NOTE(review): returning ConsumeStatus.RECONSUME_LATER instead would
    # presumably request redelivery of the message -- confirm against the
    # client documentation before relying on it.
    return ConsumeStatus.CONSUME_SUCCESS
def start_consume_message():
    """Start a push consumer and block until the process is interrupted.

    Creates a ``PushConsumer`` for ``groupName``, points it at ``nameserver``,
    subscribes ``callback`` to ``topicName`` with ``TAGS``, starts it, and
    then sleeps in a loop so the process stays alive.

    The consumer is always shut down when the wait loop exits, whether by
    ``KeyboardInterrupt`` or any other exception. The exception itself still
    propagates to the caller.
    """
    # Create the consumer and bind it to its consumer group.
    consumer = PushConsumer(groupName)
    # Point the consumer at the name server.
    consumer.set_name_server_address(nameserver)
    # Register the callback for the topic.
    consumer.subscribe(topicName, callback, TAGS)
    print(' [Consumer] Waiting for messages.')
    consumer.start()
    try:
        # Nothing else to do on this thread: idle until interrupted.
        while True:
            time.sleep(30)
    finally:
        # Previously this call sat after the infinite loop and was
        # unreachable, so the consumer was never released on exit.
        consumer.shutdown()
# Script entry point: run the consumer only when executed directly, not when
# this module is imported.
if __name__ == '__main__':
    start_consume_message()
|