A pub/sub or publish-subscribe service is a messaging pattern in which the publisher (sender) sends messages to subscribers (receivers) by categorizing them into topics or classes, without knowing the specifics of any single subscriber.
On the other side, the subscriber subscribes to a specific class or topic, and receives the messages associated with that topic published by the publisher, without knowing any details about the publisher.
This system provides greater network scalability and can be used in several applications, like streaming analytics or data integration pipelines to ingest and distribute data.
In this guide, I will briefly discuss how you can implement a pub/sub service in Go using Go Patterns. We will implement the service with in-process communication between several Goroutines over channels; because the code is concurrent, channels are how the independently running Goroutines communicate.
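As a warm-up, here is a minimal sketch of two Goroutines communicating over a channel. It is not part of the pub/sub package; the names produce and collect and the sample values are made up for illustration.

```go
package main

import "fmt"

// produce sends each value on out and closes the channel when done.
// The name and the sample values are illustrative only.
func produce(values []string, out chan<- string) {
	for _, v := range values {
		out <- v
	}
	close(out)
}

// collect drains the channel until it is closed and returns what it saw.
func collect(in <-chan string) []string {
	var got []string
	for v := range in {
		got = append(got, v)
	}
	return got
}

func main() {
	ch := make(chan string) // unbuffered: each send waits for a receive
	go produce([]string{"0.94", "0.42"}, ch)
	fmt.Println(collect(ch)) // prints [0.94 0.42]
}
```

The sender and the receiver never reference each other directly; they only share the channel, which is the same decoupling the broker builds on.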
Throughout this guide, we will follow the file structure below. We have created a new package named pubsub and a file called main.go, where we will run the crypto price example:
├── main.go
└── pubsub
    ├── broker.go
    ├── go.mod
    ├── go.sum
    ├── message.go
    └── subscriber.go
Let’s now begin with a simple implementation. First, let’s start by discussing message structure. Here, each message object can have multiple attributes, including the topic and message body:
type Message struct {
	topic string
	body  string
}
Next, let’s talk about subscribers. Subscriber includes a unique identifier string, which is used as the key in the broker's subscriber map (we will discuss this later on). One important attribute that it holds is a channel of messages. The publisher pushes messages to this channel via the Signal() method:
type Subscriber struct {
	id       string          // id of subscriber
	messages chan *Message   // messages channel
	topics   map[string]bool // topics it is subscribed to.
	active   bool            // if given subscriber is active
	mutex    sync.RWMutex    // lock
}
And lastly, the Broker structure consists of all the subscribers, and a map of topics for the subscribers to subscribe to:
type Broker struct {
	subscribers Subscribers            // map of subscribers id:Subscriber
	topics      map[string]Subscribers // map of topic to subscribers
	mut         sync.RWMutex           // mutex lock
}
The Subscribe method subscribes a given subscriber to a given topic. It does so by adding the topic to the Subscriber, then adding an entry for the subscriber ID to the broker's topic map:
func (b *Broker) Subscribe(s *Subscriber, topic string) {
	b.mut.Lock()
	defer b.mut.Unlock()
	// create the per-topic subscriber map on first use.
	if b.topics[topic] == nil {
		b.topics[topic] = Subscribers{}
	}
	s.AddTopic(topic)
	b.topics[topic][s.id] = s
}
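The nil check in Subscribe matters because writing to an entry of a nil map panics in Go, while reading from one is safe. A minimal sketch of the same lazy-initialization pattern on a nested map follows; the index type and the add method are hypothetical stand-ins, not part of the pubsub package.

```go
package main

import "fmt"

// index maps a topic name to the set of subscriber IDs registered for it.
// It is a simplified, hypothetical stand-in for the broker's topic map.
type index map[string]map[string]bool

// add registers id under topic, creating the inner map on first use.
func (ix index) add(topic, id string) {
	if ix[topic] == nil {
		ix[topic] = map[string]bool{}
	}
	ix[topic][id] = true
}

func main() {
	ix := index{}
	ix.add("BITCOIN", "s1")
	ix.add("BITCOIN", "s2")
	ix.add("ETHEREUM", "s1")
	// Reading a missing topic yields a nil inner map, whose length is 0.
	fmt.Println(len(ix["BITCOIN"]), len(ix["ETHEREUM"]), len(ix["SOLANA"])) // prints 2 1 0
}
```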
The Publish method publishes the given message to a given topic. It works by creating a new message object, then pushing it to the channels of all subscribers that have subscribed to the topic. Messages are pushed using the Signal() method, like so:
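func (b *Broker) Publish(topic string, msg string) {
	// publish the message to given topic.
	// hold the read lock while iterating so Subscribe/Unsubscribe cannot
	// modify the map mid-loop; the sends run in their own Goroutines.
	b.mut.RLock()
	defer b.mut.RUnlock()
	for _, s := range b.topics[topic] {
		m := NewMessage(msg, topic)
		// Signal skips inactive subscribers under the subscriber's own lock,
		// so one inactive subscriber no longer stops delivery to the rest.
		go func(s *Subscriber) {
			s.Signal(m)
		}(s)
	}
}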
The Unsubscribe method unsubscribes a subscriber from a given topic. The unsubscribing process deletes the subscriber ID from the specific topic map, and then removes the topic from the list of topics for that subscriber:
func (b *Broker) Unsubscribe(s *Subscriber, topic string) {
	// deleting from the map is a write, so take the write lock.
	b.mut.Lock()
	defer b.mut.Unlock()
	delete(b.topics[topic], s.id)
	s.RemoveTopic(topic)
}
The Signal method pushes a message to the subscriber's message channel. Before pushing, it checks whether the subscriber is still active, since sending on a closed channel would panic:
func (s *Subscriber) Signal(msg *Message) {
	// Sends the message on the subscriber's channel if it is still active.
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	if s.active {
		s.messages <- msg
	}
}
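Because the subscriber channel in this guide is unbuffered, the send in Signal waits until a listener receives the message. One alternative, which this guide does not use, is a buffered channel combined with a non-blocking send that drops the message when the buffer is full. The sketch below shows that idea; trySend and the buffer size of 2 are assumptions made for illustration.

```go
package main

import "fmt"

// trySend attempts to deliver msg without blocking.
// It reports false when the buffer is full and the message was dropped.
// This helper is hypothetical and not part of the pubsub package.
func trySend(ch chan<- string, msg string) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan string, 2) // hypothetical buffer size
	fmt.Println(trySend(ch, "a")) // true
	fmt.Println(trySend(ch, "b")) // true
	fmt.Println(trySend(ch, "c")) // false: buffer is full and nobody is receiving
}
```

The trade-off is that a slow subscriber loses messages instead of holding up a Goroutine per pending message; which behavior is preferable depends on the application.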
RemoveSubscriber removes the given subscriber from the broker. It does so by unsubscribing the subscriber from all topics to which they have subscribed and deleting the subscriber from the main subscriber list:
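func (b *Broker) RemoveSubscriber(s *Subscriber) {
	// unsubscribe from all topics which s is subscribed to.
	for topic := range s.topics {
		b.Unsubscribe(s, topic)
	}
	b.mut.Lock()
	// remove subscriber from the map of subscribers.
	delete(b.subscribers, s.id)
	b.mut.Unlock()
	s.Destruct()
}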
The Destruct method of the subscriber sets active to false and closes the message channel once we are done sending. This is important in Go because closing the channel tells the listener that no more messages will arrive, so its resources can be released after the job is done:
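func (s *Subscriber) Destruct() {
	// destructor for subscriber.
	// this modifies the subscriber, so take the write lock; it also waits
	// for any in-flight Signal call to finish before the channel is closed.
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.active = false
	close(s.messages)
}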
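To see what closing the channel means for a listener, here is a small standalone sketch (the drain function is hypothetical): a range loop over a channel ends once the channel is closed and empty, and a plain receive on a closed channel returns immediately with ok set to false.

```go
package main

import "fmt"

// drain counts messages until the channel is closed.
func drain(ch <-chan string) int {
	n := 0
	for range ch { // the loop exits once ch is closed and empty
		n++
	}
	return n
}

func main() {
	ch := make(chan string, 3)
	ch <- "a"
	ch <- "b"
	close(ch)
	fmt.Println(drain(ch)) // prints 2

	// Receiving from a closed, empty channel does not block.
	_, ok := <-ch
	fmt.Println(ok) // prints false
}
```

This is why a listener that loops forever on a two-value receive should stop when ok is false; otherwise it spins on the closed channel.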
Note that Go follows several distinctive approaches, one of which is “Don’t communicate by sharing memory, share memory by communicating”. But Go is a pragmatic language, so when we have a shared data structure accessed by multiple Goroutines, it is fine to use a lock for the sake of protected access.
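As a sketch of that locking discipline with sync.RWMutex: methods that modify shared state take the exclusive lock with Lock, and methods that only read it take the shared lock with RLock. The registry type below is a hypothetical, simplified stand-in for the subscriber's topic set.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical topic set guarded by a read/write lock.
type registry struct {
	mu     sync.RWMutex
	topics map[string]bool
}

// add mutates the map, so it takes the exclusive write lock.
func (r *registry) add(topic string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.topics[topic] = true
}

// count only reads the map, so the shared read lock is enough.
func (r *registry) count() int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return len(r.topics)
}

func main() {
	r := &registry{topics: map[string]bool{}}
	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			r.add(fmt.Sprintf("topic-%d", i))
		}(i)
	}
	wg.Wait()
	fmt.Println(r.count()) // prints 50
}
```

Taking only RLock around a map write would let two writers run at the same time, which the Go runtime can detect and abort with a "concurrent map writes" error.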
Now that you have some understanding of a few of the important snippets, let’s discuss the final complete code.
Start with pubsub/message.go. In this file, the message structure is defined along with some useful methods: NewMessage(msg string, topic string) (* Message) creates a new message object and returns it, GetTopic() string returns the topic of the given message object, and GetMessageBody() string returns the message body of the given message object:
package pubsub

type Message struct {
	topic string
	body  string
}

func NewMessage(msg string, topic string) *Message {
	// Returns the message object
	return &Message{
		topic: topic,
		body:  msg,
	}
}

func (m *Message) GetTopic() string {
	// returns the topic of the message
	return m.topic
}

func (m *Message) GetMessageBody() string {
	// returns the message body.
	return m.body
}
Next is pubsub/subscriber.go. In this file, the subscriber and its helpful methods are defined:
CreateNewSubscriber() (string, *Subscriber) returns a new Subscriber object
AddTopic(topic string) adds the given topic to the subscriber
RemoveTopic(topic string) removes the given topic from the subscriber
GetTopics()([]string) returns the list of topics that the subscriber has subscribed to
Listen() listens to the subscriber’s message channel and prints each message

package pubsub

import (
	"crypto/rand"
	"fmt"
	"log"
	"sync"
)

type Subscriber struct {
	id       string          // id of subscriber
	messages chan *Message   // messages channel
	topics   map[string]bool // topics it is subscribed to.
	active   bool            // if given subscriber is active
	mutex    sync.RWMutex    // lock
}

func CreateNewSubscriber() (string, *Subscriber) {
	// returns a new subscriber with a random id.
	b := make([]byte, 8)
	_, err := rand.Read(b)
	if err != nil {
		log.Fatal(err)
	}
	id := fmt.Sprintf("%X-%X", b[0:4], b[4:8])
	return id, &Subscriber{
		id:       id,
		messages: make(chan *Message),
		topics:   map[string]bool{},
		active:   true,
	}
}

func (s *Subscriber) AddTopic(topic string) {
	// add topic to the subscriber (a write, so take the write lock)
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.topics[topic] = true
}

func (s *Subscriber) RemoveTopic(topic string) {
	// remove topic from the subscriber (a write, so take the write lock)
	s.mutex.Lock()
	defer s.mutex.Unlock()
	delete(s.topics, topic)
}

func (s *Subscriber) GetTopics() []string {
	// Get all topics of the subscriber
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	topics := []string{}
	for topic := range s.topics {
		topics = append(topics, topic)
	}
	return topics
}

func (s *Subscriber) Destruct() {
	// destructor for subscriber: mark inactive and close the channel.
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.active = false
	close(s.messages)
}

func (s *Subscriber) Signal(msg *Message) {
	// Sends the message on the subscriber's channel if it is still active.
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	if s.active {
		s.messages <- msg
	}
}

func (s *Subscriber) Listen() {
	// Listens to the message channel and prints each message as it arrives.
	// The loop ends when Destruct closes the channel.
	for msg := range s.messages {
		fmt.Printf("Subscriber %s, received: %s from topic: %s\n", s.id, msg.GetMessageBody(), msg.GetTopic())
	}
}
Lastly, we have pubsub/broker.go, where the broker and its methods are defined:
NewBroker() (*Broker) returns a new broker object
AddSubscriber()(*Subscriber) adds a given new subscriber to the broker
RemoveSubscriber(s *Subscriber)() removes the given subscriber from the broker
Broadcast(msg string, topics []string) broadcasts the given message to the given list of topics
GetSubscribers(topic string) int returns the number of subscribers subscribed to the given topic

package pubsub

import (
	"fmt"
	"sync"
)

type Subscribers map[string]*Subscriber

type Broker struct {
	subscribers Subscribers            // map of subscribers id:Subscriber
	topics      map[string]Subscribers // map of topic to subscribers
	mut         sync.RWMutex           // mutex lock
}

func NewBroker() *Broker {
	// returns new broker object
	return &Broker{
		subscribers: Subscribers{},
		topics:      map[string]Subscribers{},
	}
}

func (b *Broker) AddSubscriber() *Subscriber {
	// Add subscriber to the broker.
	b.mut.Lock()
	defer b.mut.Unlock()
	id, s := CreateNewSubscriber()
	b.subscribers[id] = s
	return s
}

func (b *Broker) RemoveSubscriber(s *Subscriber) {
	// remove subscriber from the broker.
	// unsubscribe from all topics which s is subscribed to.
	for topic := range s.topics {
		b.Unsubscribe(s, topic)
	}
	b.mut.Lock()
	// remove subscriber from the map of subscribers.
	delete(b.subscribers, s.id)
	b.mut.Unlock()
	s.Destruct()
}

func (b *Broker) Broadcast(msg string, topics []string) {
	// broadcast message to all topics.
	b.mut.RLock()
	defer b.mut.RUnlock()
	for _, topic := range topics {
		for _, s := range b.topics[topic] {
			m := NewMessage(msg, topic)
			go func(s *Subscriber) {
				s.Signal(m)
			}(s)
		}
	}
}

func (b *Broker) GetSubscribers(topic string) int {
	// get total subscribers subscribed to given topic.
	b.mut.RLock()
	defer b.mut.RUnlock()
	return len(b.topics[topic])
}

func (b *Broker) Subscribe(s *Subscriber, topic string) {
	// subscribe to given topic
	b.mut.Lock()
	defer b.mut.Unlock()
	if b.topics[topic] == nil {
		b.topics[topic] = Subscribers{}
	}
	s.AddTopic(topic)
	b.topics[topic][s.id] = s
	fmt.Printf("%s Subscribed for topic: %s\n", s.id, topic)
}

func (b *Broker) Unsubscribe(s *Subscriber, topic string) {
	// unsubscribe from given topic (a write, so take the write lock)
	b.mut.Lock()
	defer b.mut.Unlock()
	delete(b.topics[topic], s.id)
	s.RemoveTopic(topic)
	fmt.Printf("%s Unsubscribed for topic: %s\n", s.id, topic)
}

func (b *Broker) Publish(topic string, msg string) {
	// publish the message to given topic.
	// hold the read lock while iterating; the sends run in their own Goroutines.
	b.mut.RLock()
	defer b.mut.RUnlock()
	for _, s := range b.topics[topic] {
		m := NewMessage(msg, topic)
		// Signal skips inactive subscribers under the subscriber's own lock.
		go func(s *Subscriber) {
			s.Signal(m)
		}(s)
	}
}
Now that our pub/sub service is created, let’s use it as an example of getting the price update of cryptocurrencies. In this example, the publisher publishes the price value of cryptocurrencies, and whoever subscribes to a particular cryptocurrency shall receive the price update.
Here, the publisher randomly generates a price value for a cryptocurrency and publishes it under the respective topic name (the name of the cryptocurrency). Two subscribers have each subscribed to a set of crypto topics, so they receive the messages once they are available and display them in the console:
package main

import (
	"fmt"
	"math/rand"
	"time"

	"./pubsub" // relative import; requires GO111MODULE=off
)

// available topics
var availableTopics = map[string]string{
	"BTC": "BITCOIN",
	"ETH": "ETHEREUM",
	"DOT": "POLKADOT",
	"SOL": "SOLANA",
}

func pricePublisher(broker *pubsub.Broker) {
	// collect all topic values.
	topicValues := make([]string, 0, len(availableTopics))
	for _, v := range availableTopics {
		topicValues = append(topicValues, v)
	}
	for {
		randValue := topicValues[rand.Intn(len(topicValues))] // pick a random topic.
		msg := fmt.Sprintf("%f", rand.Float64())
		// fmt.Printf("Publishing %s to %s topic\n", msg, randValue)
		go broker.Publish(randValue, msg)
		// Uncomment if you want to broadcast to all topics.
		// go broker.Broadcast(msg, topicValues)
		r := rand.Intn(4)
		time.Sleep(time.Duration(r) * time.Second) // sleep for 0 to 3 seconds.
	}
}

func main() {
	// construct new broker.
	broker := pubsub.NewBroker()
	// create new subscriber
	s1 := broker.AddSubscriber()
	// subscribe s1 to BTC and ETH.
	broker.Subscribe(s1, availableTopics["BTC"])
	broker.Subscribe(s1, availableTopics["ETH"])
	// create new subscriber
	s2 := broker.AddSubscriber()
	// subscribe s2 to ETH and SOL.
	broker.Subscribe(s2, availableTopics["ETH"])
	broker.Subscribe(s2, availableTopics["SOL"])
	go func() {
		// sleep for 3 sec, and then subscribe s2 to topic DOT
		time.Sleep(3 * time.Second)
		broker.Subscribe(s2, availableTopics["DOT"])
	}()
	go func() {
		// sleep for 5 sec, and then unsubscribe s2 from topic SOL
		time.Sleep(5 * time.Second)
		broker.Unsubscribe(s2, availableTopics["SOL"])
		fmt.Printf("Total subscribers for topic ETH is %v\n", broker.GetSubscribers(availableTopics["ETH"]))
	}()
	go func() {
		// sleep for 10 sec, and then remove s2 from the broker
		time.Sleep(10 * time.Second)
		broker.RemoveSubscriber(s2)
		fmt.Printf("Total subscribers for topic ETH is %v\n", broker.GetSubscribers(availableTopics["ETH"]))
	}()
	// Concurrently publish the values.
	go pricePublisher(broker)
	// Concurrently listens from s1.
	go s1.Listen()
	// Concurrently listens from s2.
	go s2.Listen()
	// block until the user presses Enter, to keep the program running
	fmt.Scanln()
	fmt.Println("Done!")
}
The output will look like the following:
❯ GO111MODULE=off go run main.go
208B51C5-1F40B37F Subscribed for topic: BITCOIN
208B51C5-1F40B37F Subscribed for topic: ETHEREUM
60466C8A-3662A48A Subscribed for topic: ETHEREUM
60466C8A-3662A48A Subscribed for topic: SOLANA
Subscriber 60466C8A-3662A48A, received: 0.940509 from topic: ETHEREUM
Subscriber 208B51C5-1F40B37F, received: 0.940509 from topic: ETHEREUM
60466C8A-3662A48A Subscribed for topic: POLKADOT
Subscriber 60466C8A-3662A48A, received: 0.424637 from topic: SOLANA
60466C8A-3662A48A Unsubscribed for topic: SOLANA
Total subscribers for topic ETH is 2
Subscriber 208B51C5-1F40B37F, received: 0.515213 from topic: BITCOIN
Subscriber 60466C8A-3662A48A, received: 0.156519 from topic: ETHEREUM
Subscriber 208B51C5-1F40B37F, received: 0.156519 from topic: ETHEREUM
Subscriber 60466C8A-3662A48A, received: 0.283034 from topic: POLKADOT
Subscriber 60466C8A-3662A48A, received: 0.380657 from topic: POLKADOT
Subscriber 60466C8A-3662A48A, received: 0.218553 from topic: ETHEREUM
Subscriber 208B51C5-1F40B37F, received: 0.218553 from topic: ETHEREUM
60466C8A-3662A48A Unsubscribed for topic: ETHEREUM
60466C8A-3662A48A Unsubscribed for topic: POLKADOT
Total subscribers for topic ETH is 1
Subscriber 208B51C5-1F40B37F, received: 0.865335 from topic: BITCOIN
Subscriber 208B51C5-1F40B37F, received: 0.028303 from topic: ETHEREUM
Subscriber 208B51C5-1F40B37F, received: 0.059121 from topic: ETHEREUM
In this guide, we discussed and demonstrated some of the design choices of a pub/sub service in Go using Goroutines and channels. However, this implementation uses in-process communication between multiple Goroutines over channels, which is different from a distributed pub/sub service. A distributed service additionally requires a sophisticated method for fault tolerance.
The full code for this tutorial can be found here. Happy coding!
3 Replies to "Building a pub/sub service in Go"
pubsub/subscriber.go is missing
the same content of pubsub/message.go is present in pubsub/subscriber.go code section
Thanks for pointing this out! Should be fixed now.
thanks for the post, the link to go-patterns is broken
https://github.com/tmrts/go-patterns