返回

dapr实战(三):发布订阅与观察者模式的异同

后端

前言

在上一篇文章中,我们介绍了Dapr中的服务之间调用( Service-to-service invocation )和状态管理( State management )功能。本文将介绍剩下的几个功能:发布订阅、密钥管理和服务发现。

发布订阅是一种常见的分布式系统模式,它允许组件通过松散耦合的方式进行通信。在发布订阅模式中,发布者将事件发布到主题,而订阅者则订阅这些主题。当主题中有新的事件时,订阅者会收到通知并对事件做出响应。

观察者模式是一种设计模式,它定义了一种一对多的依赖关系,其中一个对象(主题)可以有多个依赖对象(观察者)。当主题的状态发生改变时,它会通知所有观察者,而观察者则会对这一改变做出响应。

发布订阅与观察者模式的异同

发布订阅和观察者模式都是一种松散耦合的通信机制,它们允许组件通过非同步的方式进行通信。然而,这两者之间也存在一些差异。

  • 发布订阅是一种一对多的通信机制,而观察者模式则是一种一对多的通信机制。 在发布订阅模式中,一个发布者可以向多个订阅者发布事件,而一个订阅者也可以订阅多个主题。在观察者模式中,一个主题只能有一个观察者。
  • 发布订阅是一种异步通信机制,而观察者模式则是一种同步通信机制。 在发布订阅模式中,发布者将事件发布到主题后,并不等待订阅者对事件做出响应。而在观察者模式中,主题在通知观察者之前会等待观察者做出响应。
  • 发布订阅是一种基于事件的通信机制,而观察者模式则是一种基于状态的通信机制。 在发布订阅模式中,发布者只关心将事件发布到主题,而不需要关心订阅者如何处理这些事件。而在观察者模式中,主题需要关心观察者如何处理状态的改变。

发布订阅在分布式系统中的应用场景

发布订阅在分布式系统中有着广泛的应用场景,一些常见的应用场景包括:

  • 事件通知: 发布订阅可以用于在分布式系统中发送事件通知。例如,当一个微服务发生故障时,它可以将故障事件发布到一个主题,而其他微服务可以通过订阅该主题来收到故障通知。
  • 消息传递: 发布订阅可以用于在分布式系统中传递消息。例如,一个微服务可以将需要处理的消息发布到一个主题,而另一个微服务可以通过订阅该主题来接收这些消息。
  • 数据同步: 发布订阅可以用于在分布式系统中同步数据。例如,一个微服务可以将数据变更发布到一个主题,而其他微服务可以通过订阅该主题来同步这些数据变更。

Dapr中的发布订阅功能

Dapr中的发布订阅功能基于Apache Kafka实现。Dapr提供了一个名为"pubsub"的组件,它可以用来创建和管理主题。此外,Dapr还提供了一个名为"dapr-kafka"的SDK,它可以用来编写发布订阅应用程序。

Dapr中的发布订阅功能非常简单易用。创建一个主题只需要几行代码,发布和订阅事件也只需要几行代码。此外,Dapr还提供了丰富的特性,比如事件重试、事件顺序保证和事件死信队列等。

Dapr发布订阅功能的实战示例

下面是一个使用Dapr发布订阅功能的实战示例。在这个示例中,我们将创建一个主题,然后创建一个发布者应用程序和一个订阅者应用程序。

创建主题

首先,我们需要创建一个主题。我们可以使用以下命令来创建一个名为"my-topic"的主题:

dapr create topic my-topic

创建发布者应用程序

接下来,我们需要创建一个发布者应用程序。发布者应用程序将向"my-topic"主题发布事件。我们可以使用以下代码来创建发布者应用程序:

package main

import (
	"context"
	"fmt"
	"github.com/dapr/go-sdk/service/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	for i := 0; i < 10; i++ {
		err = client.Publish(ctx, "my-topic", &pubsub.Message{
			Data: []byte(fmt.Sprintf("Hello Dapr! %d", i)),
		})
		if err != nil {
			panic(err)
		}
	}
}

创建订阅者应用程序

最后,我们需要创建一个订阅者应用程序。订阅者应用程序将从"my-topic"主题订阅事件。我们可以使用以下代码来创建订阅者应用程序:

package main

import (
	"context"
	"fmt"
	"github.com/dapr/go-sdk/service/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	err = client.Subscribe(ctx, "my-topic", func(ctx context.Context, e *pubsub.Message) error {
		fmt.Println(string(e.Data))
		return nil
	})
	if err != nil {
		panic(err)
	}

	// 阻塞 main goroutine
	select {}
}

总结

发布订阅是一种非常重要的分布式系统模式,它可以用来实现事件通知、消息传递和数据同步等功能。Dapr中的发布订阅功能基于Apache Kafka实现,它非常简单易用,并且提供了丰富的特性。

在本文中,我们介绍了发布订阅和观察者模式的异同,以及发布订阅在分布式系统中的应用场景。此外,我们还介绍了Dapr中的发布订阅功能,并提供了一个实战示例。