返回

RocketMQ 5.x如何使用GRPC方式发送消费消息

后端

RocketMQ已经作为云原生的分布式消息中间件被越来越多的企业接受使用,它满足高吞吐、低延时、高可用、高可靠及分布式等特性。而在分布式场景下,系统之间的数据交互十分频繁。本文以RocketMQ的最新版本5.x 为例,详细介绍了如何使用GRPC方式发送消费消息。

RocketMQ 5.x新增了proxy模式部署方式,也就是支持了GRPC的消费方式消费。使用Go语言作为开发语言,在了解了RocketMQ 5.x的基本概念和原理后,我们可以开始尝试使用GRPC方式发送消费消息。

准备工作

在开始之前,我们需要确保以下条件已经满足:

  • 安装并运行RocketMQ服务器。
  • 安装并运行GRPC客户端。
  • 确保客户端和服务器之间能够相互通信。

发送消息

package main

import (
	"context"
	"fmt"
	"io"
	"log"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
	// 创建一个GRPC生产者客户端
	producer, err := rocketmq.NewProducer(
		rocketmq.WithNameServer([]string{"127.0.0.1:9876"}),
		rocketmq.WithNamespace("GID_GID"),
		rocketmq.WithGRPCChannel(func() (*rocketmq.GRPCChannel, error) {
			return rocketmq.DefaultGRPCChannel()
		}),
	)
	if err != nil {
		log.Fatalf("create producer error: %v", err)
	}

	// 创建一个topic
	topic := "TopicTest"

	// 发送消息
	for i := 0; i < 10; i++ {
		msg := &primitive.Message{
			Topic: topic,
			Body:  []byte(fmt.Sprintf("Hello RocketMQ %d", i)),
		}
		res, err := producer.SendSync(context.Background(), msg)
		if err != nil {
			log.Fatalf("send message error: %v", err)
		}

		fmt.Printf("send message success: %v\n", res)
	}

	// 关闭生产者客户端
	producer.Close()
}

消费消息

package main

import (
	"context"
	"fmt"
	"io"
	"log"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
	// 创建一个GRPC消费者客户端
	consumer, err := rocketmq.NewPushConsumer(
		rocketmq.WithNameServer([]string{"127.0.0.1:9876"}),
		rocketmq.WithNamespace("GID_GID"),
		rocketmq.WithGRPCChannel(func() (*rocketmq.GRPCChannel, error) {
			return rocketmq.DefaultGRPCChannel()
		}),
	)
	if err != nil {
		log.Fatalf("create consumer error: %v", err)
	}

	// 订阅主题
	err = consumer.Subscribe(context.Background(), "TopicTest", func(ctx context.Context, msgs ...*primitive.MessageExt) error {
		for _, msg := range msgs {
			fmt.Printf("consume message: %v\n", msg)
		}

		return nil
	})
	if err != nil {
		log.Fatalf("subscribe error: %v", err)
	}

	// 启动消费者客户端
	err = consumer.Start()
	if err != nil {
		log.Fatalf("start consumer error: %v", err)
	}

	// 等待消费者客户端关闭
	select {}

	// 关闭消费者客户端
	consumer.Shutdown()
}

总结

本文详细介绍了如何在RocketMQ 5.x中使用GRPC方式发送消费消息。通过使用goland作为开发语言,我们可以轻松地编写代码来实现消息的发送和消费。希望本文对您有所帮助。