返回
RocketMQ 5.x如何使用GRPC方式发送消费消息
后端
2024-01-01 15:48:31
RocketMQ已经作为云原生的分布式消息中间件被越来越多的企业接受使用,它满足高吞吐、低延时、高可用、高可靠及分布式等特性。而在分布式场景下,系统之间的数据交互十分频繁。本文以RocketMQ的最新版本5.x 为例,详细介绍了如何使用GRPC方式发送消费消息。
RocketMQ 5.x新增了proxy模式部署方式,也就是支持了GRPC的消费方式消费。使用Go语言作为开发语言,在了解了RocketMQ 5.x的基本概念和原理后,我们可以开始尝试使用GRPC方式发送消费消息。
准备工作
在开始之前,我们需要确保以下条件已经满足:
- 安装并运行RocketMQ服务器。
- 安装并运行GRPC客户端。
- 确保客户端和服务器之间能够相互通信。
发送消息
package main
import (
"context"
"fmt"
"io"
"log"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
)
func main() {
// 创建一个GRPC生产者客户端
producer, err := rocketmq.NewProducer(
rocketmq.WithNameServer([]string{"127.0.0.1:9876"}),
rocketmq.WithNamespace("GID_GID"),
rocketmq.WithGRPCChannel(func() (*rocketmq.GRPCChannel, error) {
return rocketmq.DefaultGRPCChannel()
}),
)
if err != nil {
log.Fatalf("create producer error: %v", err)
}
// 创建一个topic
topic := "TopicTest"
// 发送消息
for i := 0; i < 10; i++ {
msg := &primitive.Message{
Topic: topic,
Body: []byte(fmt.Sprintf("Hello RocketMQ %d", i)),
}
res, err := producer.SendSync(context.Background(), msg)
if err != nil {
log.Fatalf("send message error: %v", err)
}
fmt.Printf("send message success: %v\n", res)
}
// 关闭生产者客户端
producer.Close()
}
消费消息
package main
import (
"context"
"fmt"
"io"
"log"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
)
func main() {
// 创建一个GRPC消费者客户端
consumer, err := rocketmq.NewPushConsumer(
rocketmq.WithNameServer([]string{"127.0.0.1:9876"}),
rocketmq.WithNamespace("GID_GID"),
rocketmq.WithGRPCChannel(func() (*rocketmq.GRPCChannel, error) {
return rocketmq.DefaultGRPCChannel()
}),
)
if err != nil {
log.Fatalf("create consumer error: %v", err)
}
// 订阅主题
err = consumer.Subscribe(context.Background(), "TopicTest", func(ctx context.Context, msgs ...*primitive.MessageExt) error {
for _, msg := range msgs {
fmt.Printf("consume message: %v\n", msg)
}
return nil
})
if err != nil {
log.Fatalf("subscribe error: %v", err)
}
// 启动消费者客户端
err = consumer.Start()
if err != nil {
log.Fatalf("start consumer error: %v", err)
}
// 等待消费者客户端关闭
select {}
// 关闭消费者客户端
consumer.Shutdown()
}
总结
本文详细介绍了如何在RocketMQ 5.x中使用GRPC方式发送消费消息。通过使用goland作为开发语言,我们可以轻松地编写代码来实现消息的发送和消费。希望本文对您有所帮助。