A pub/sub or publish-subscribe service is a messaging pattern in which the publisher (sender) sends messages to subscribers (receivers) by categorizing them into topics or classes, without knowing the specifics of any single subscriber.
On the other side, the subscriber subscribes to a specific class or topic, and receives the messages associated with that topic published by the publisher, without knowing any details about the publisher.
This system provides greater network scalability and can be used in several applications, like streaming analytics or data integration pipelines to ingest and distribute data.
In this guide, I will briefly discuss how you can implement a pub/sub service in Go using Go Patterns. We will implement the service with in-process communication between several goroutines over channels. Because the code is concurrent, channels are what let the independently running goroutines communicate.
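To make the idea of goroutines talking over a channel concrete, here is a minimal sketch that is separate from the pubsub package built below. The collect helper is a hypothetical name used only for illustration: it starts one goroutine that sends values and gathers them on the calling goroutine.

```go
package main

import "fmt"

// collect is a hypothetical helper: it starts a goroutine that sends each
// value on an unbuffered channel, then gathers them on the calling goroutine.
func collect(values []string) []string {
	ch := make(chan string)
	go func() {
		defer close(ch) // closing tells the receiver no more values are coming
		for _, v := range values {
			ch <- v // blocks until the receiver is ready
		}
	}()
	var got []string
	for v := range ch { // the loop ends when ch is closed
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(collect([]string{"BTC", "ETH"})) // [BTC ETH]
}
```

The sender and receiver never share a variable directly. The channel is the only point of contact, which is the same shape the subscriber's message channel takes later in this guide.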
Throughout this guide, we will follow the file structure below. We have created a new package named pubsub and a file called main.go, where we will run the crypto price example:
├── main.go
└── pubsub
    ├── broker.go
    ├── go.mod
    ├── go.sum
    ├── message.go
    └── subscriber.go
Let’s now begin with a simple implementation. First, let’s start by discussing message structure. Here, each message object can have multiple attributes, including the topic and message body:
type Message struct {
topic string
body string
}
Next, let’s talk about subscribers. Each Subscriber has a unique identifier string, which is used as its key in the broker’s maps (we will discuss this later on). One important attribute that it holds is a channel of messages. The publisher pushes messages to this channel via the Signal() method:
type Subscriber struct {
id string // id of subscriber
messages chan *Message // messages channel
topics map[string]bool // topics it is subscribed to.
active bool // if given subscriber is active
mutex sync.RWMutex // lock
}
And lastly, the Broker structure consists of all the subscribers, and a map of topics for the subscribers to subscribe to:
type Broker struct {
subscribers Subscribers // map of subscribers id:Subscriber
topics map[string]Subscribers // map of topic to subscribers
mut sync.RWMutex // mutex lock
}
The Subscribe method subscribes a given subscriber to a given topic. It does so by adding the topic to the Subscriber, then adding an entry to the broker’s topics map under the subscriber ID:
func (b *Broker) Subscribe(s *Subscriber, topic string) {
b.mut.Lock()
defer b.mut.Unlock()
if b.topics[topic] == nil {
b.topics[topic] = Subscribers{}
}
s.AddTopic(topic)
b.topics[topic][s.id] = s
}
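The nil check in Subscribe matters because writing to a nil inner map panics in Go. The following is a minimal sketch of that lazy-initialization step on its own. The topicIndex type and its subscribe method are hypothetical stand-ins that store plain subscriber IDs instead of *Subscriber values.

```go
package main

import "fmt"

// topicIndex is a simplified stand-in for the broker's topic map:
// topic name -> set of subscriber ids.
type topicIndex map[string]map[string]bool

// subscribe registers id under topic, creating the inner map on first use.
func (t topicIndex) subscribe(id, topic string) {
	if t[topic] == nil {
		t[topic] = map[string]bool{} // writing to a nil inner map would panic
	}
	t[topic][id] = true
}

func main() {
	idx := topicIndex{}
	idx.subscribe("s1", "BITCOIN")
	idx.subscribe("s2", "BITCOIN")
	idx.subscribe("s1", "BITCOIN")   // subscribing twice is harmless
	fmt.Println(len(idx["BITCOIN"])) // 2
}
```

Keying the inner map by subscriber ID is what makes a repeated subscription a no-op rather than a duplicate entry.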
The Publish method publishes the given message to a given topic. It works by creating a new message object, then pushing it to the channels of all subscribers that have subscribed to the topic.
Messages are pushed using the Signal() method, like so:
func (b *Broker) Publish(topic string, msg string) {
// publish the message to given topic.
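b.mut.RLock()
// hold the read lock while iterating so Subscribe/Unsubscribe cannot
// modify the map mid-loop
defer b.mut.RUnlock()
for _, s := range b.topics[topic] {
m := NewMessage(msg, topic)
if !s.active {
// skip inactive subscribers but keep delivering to the rest
continue
}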
go (func(s *Subscriber){
s.Signal(m)
})(s)
}
}
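The fan-out in Publish starts one goroutine per subscriber so that a slow receiver does not hold up the others. Here is a minimal sketch of that loop in isolation. The sub type and fanOut function are hypothetical, and the sync.WaitGroup is added only so the sketch can wait for deliveries and return a deterministic count; Publish itself does not wait.

```go
package main

import (
	"fmt"
	"sync"
)

// sub is a hypothetical, stripped-down subscriber used only for this sketch.
type sub struct {
	active bool
	inbox  chan string
}

// fanOut delivers msg to every active subscriber and returns how many
// deliveries were started. Inactive subscribers are skipped rather than
// treated as a reason to stop the loop.
func fanOut(subs []*sub, msg string) int {
	var wg sync.WaitGroup
	started := 0
	for _, s := range subs {
		if !s.active {
			continue
		}
		started++
		wg.Add(1)
		go func(s *sub) {
			defer wg.Done()
			s.inbox <- msg
		}(s)
	}
	wg.Wait() // the buffered inboxes below let every send complete
	return started
}

func main() {
	subs := []*sub{
		{active: true, inbox: make(chan string, 1)},
		{active: false, inbox: make(chan string, 1)},
		{active: true, inbox: make(chan string, 1)},
	}
	fmt.Println(fanOut(subs, "0.42")) // 2
}
```

Note the difference between continue and return inside the loop: returning on the first inactive subscriber would silently drop the message for everyone after it.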
The Unsubscribe method unsubscribes a subscriber from a given topic. The unsubscribing process deletes the subscriber ID from the specific topic map, and then removes the topic from the list of topics for that subscriber:
func (b *Broker) Unsubscribe(s *Subscriber, topic string) {
// deleting from the map is a write, so take the write lock
b.mut.Lock()
defer b.mut.Unlock()
delete(b.topics[topic], s.id)
s.RemoveTopic(topic)
}
The Signal method pushes a message to the subscriber’s message channel. Before pushing, it checks whether the subscriber is still active, meaning its channel has not been closed:
func (s *Subscriber) Signal(msg *Message) {
// pushes the message onto the subscriber's channel
s.mutex.RLock()
defer s.mutex.RUnlock()
if s.active{
s.messages <- msg
}
}
RemoveSubscriber removes the given subscriber from the broker. It does so by unsubscribing the subscriber from all topics to which they have subscribed and deleting the subscriber from the main subscriber list:
func (b *Broker)RemoveSubscriber(s *Subscriber)(){
for topic := range(s.topics){
b.Unsubscribe(s, topic)
}
b.mut.Lock()
delete(b.subscribers, s.id)
b.mut.Unlock()
s.Destruct()
}
The Destruct method of the subscriber sets active to false and closes the message channel once we are done sending. Closing matters in Go because it tells receivers that no more messages will arrive and lets resources be cleaned up after the job is done:
func (s *Subscriber) Destruct() {
// destructor for subscriber.
// take the write lock: this changes active and must not overlap with Signal
s.mutex.Lock()
defer s.mutex.Unlock()
s.active = false
close(s.messages)
}
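Sending on a closed channel panics, which is why the send and the close have to agree on who holds the lock. The sketch below shows one way to arrange that, using a hypothetical inbox type with send and close methods. The early return that makes close safe to call twice is an addition of this sketch and is not part of the Destruct method above.

```go
package main

import (
	"fmt"
	"sync"
)

// inbox is a hypothetical stand-in for a subscriber's channel plus its guard.
type inbox struct {
	mu     sync.RWMutex
	active bool
	ch     chan string
}

func newInbox() *inbox {
	return &inbox{active: true, ch: make(chan string, 4)}
}

// send queues msg and reports whether it was accepted.
func (i *inbox) send(msg string) bool {
	i.mu.RLock() // many senders may hold the read lock together
	defer i.mu.RUnlock()
	if !i.active {
		return false
	}
	i.ch <- msg
	return true
}

// close marks the inbox inactive and closes the channel exactly once.
func (i *inbox) close() {
	i.mu.Lock() // exclusive: no send can be in progress while we close
	defer i.mu.Unlock()
	if !i.active {
		return
	}
	i.active = false
	close(i.ch)
}

func main() {
	in := newInbox()
	fmt.Println(in.send("first")) // true
	in.close()
	in.close()                     // second call is a no-op instead of a panic
	fmt.Println(in.send("second")) // false
	for msg := range in.ch {       // buffered values stay readable after close
		fmt.Println(msg) // first
	}
}
```

Because close takes the write lock, it waits for any send that is still in progress, so a send can never race with the channel being closed.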
Note that Go has several distinctive idioms, one of which is “Do not communicate by sharing memory; instead, share memory by communicating.” But Go is a pragmatic language, so when a shared data structure is accessed by multiple goroutines, it is fine to use a lock to protect access.
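As a small illustration of locking a shared structure, here is a sketch of a map guarded by a sync.RWMutex. The registry type and the fill function are hypothetical names for this example only. Writers take the exclusive lock, and readers take the shared read lock.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical shared structure guarded by an RWMutex.
type registry struct {
	mu    sync.RWMutex
	items map[string]bool
}

func (r *registry) add(id string) {
	r.mu.Lock() // writers need the exclusive lock
	defer r.mu.Unlock()
	r.items[id] = true
}

func (r *registry) count() int {
	r.mu.RLock() // readers can share the lock with each other
	defer r.mu.RUnlock()
	return len(r.items)
}

// fill adds n distinct ids from n goroutines and returns the final count.
func fill(n int) int {
	r := &registry{items: map[string]bool{}}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			r.add(fmt.Sprintf("sub-%d", i))
		}(i)
	}
	wg.Wait()
	return r.count()
}

func main() {
	fmt.Println(fill(100)) // 100
}
```

Any method that modifies the map, including a delete, counts as a writer and needs Lock rather than RLock.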
Now that you have some understanding of a few of the important snippets, let’s discuss the final complete code.
Start with pubsub/message.go. In this file, the message structure is defined along with some useful methods, such as NewMessage(msg string, topic string) (*Message), which creates a new message object and returns it.
GetTopic() string returns the topic of the given message object, and GetMessageBody() string returns the message body of the given message object:
package pubsub
type Message struct {
topic string
body string
}
func NewMessage(msg string, topic string) (* Message) {
// Returns the message object
return &Message{
topic: topic,
body: msg,
}
}
func (m *Message) GetTopic() string {
// returns the topic of the message
return m.topic
}
func (m *Message) GetMessageBody() string {
// returns the message body.
return m.body
}
Next is pubsub/subscriber.go. In this module, the subscriber and its helpful methods are defined:
CreateNewSubscriber() (string, *Subscriber) returns a new Subscriber object
AddTopic(topic string) adds the given topic to the subscriber
RemoveTopic(topic string) removes the given topic from the subscriber
GetTopics() ([]string) returns the list of topics that the subscriber has subscribed to
Listen() listens to the subscriber’s message channel and prints the message
package pubsub
import (
"crypto/rand"
"fmt"
"log"
"sync"
)
type Subscriber struct {
id string // id of subscriber
messages chan *Message // messages channel
topics map[string]bool // topics it is subscribed to.
active bool // if given subscriber is active
mutex sync.RWMutex // lock
}
func CreateNewSubscriber() (string, *Subscriber) {
// returns a new subscriber.
b := make([]byte, 8)
_, err := rand.Read(b)
if err != nil {
log.Fatal(err)
}
id := fmt.Sprintf("%X-%X", b[0:4], b[4:8])
return id, &Subscriber{
id: id,
messages: make(chan *Message),
topics: map[string]bool{},
active: true,
}
}
func (s *Subscriber) AddTopic(topic string) {
// add topic to the subscriber
s.mutex.Lock() // write lock: the topics map is modified
defer s.mutex.Unlock()
s.topics[topic] = true
}
func (s *Subscriber) RemoveTopic(topic string) {
// remove topic from the subscriber
s.mutex.Lock() // write lock: the topics map is modified
defer s.mutex.Unlock()
delete(s.topics, topic)
}
func (s * Subscriber)GetTopics()([]string){
// Get all topic of the subscriber
s.mutex.RLock()
defer s.mutex.RUnlock()
topics := []string{}
for topic, _ := range s.topics {
topics = append(topics, topic)
}
return topics
}
func (s *Subscriber)Destruct() {
// destructor for subscriber.
// take the write lock: this changes active and must not overlap with Signal
s.mutex.Lock()
defer s.mutex.Unlock()
s.active = false
close(s.messages)
}
func (s *Subscriber)Signal(msg *Message) () {
// pushes the message onto the subscriber's channel
s.mutex.RLock()
defer s.mutex.RUnlock()
if s.active{
s.messages <- msg
}
}
func (s *Subscriber)Listen() {
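// Listens to the message channel, prints each message once received.
// The range loop ends when Destruct closes the channel, so the
// goroutine does not spin forever on a closed channel.
for msg := range s.messages {
fmt.Printf("Subscriber %s, received: %s from topic: %s\n", s.id, msg.GetMessageBody(), msg.GetTopic())
}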
}
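A listener loop needs a way to stop once its channel is closed, because a receive on a closed channel returns immediately with ok set to false. The sketch below shows the comma-ok form of that check on a plain string channel. The drain helper is a hypothetical name used only here.

```go
package main

import "fmt"

// drain is a hypothetical helper that mirrors a listener loop: it receives
// until the channel is closed, then returns everything it saw.
func drain(ch <-chan string) []string {
	var msgs []string
	for {
		msg, ok := <-ch
		if !ok {
			return msgs // closed and empty: stop instead of looping again
		}
		msgs = append(msgs, msg)
	}
}

func main() {
	ch := make(chan string, 2)
	ch <- "0.940509"
	ch <- "0.424637"
	close(ch)
	fmt.Println(drain(ch)) // [0.940509 0.424637]
}
```

A for msg := range ch loop is the shorter way to write the same thing. The explicit form is shown here to make the exit condition visible.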
Lastly, we have pubsub/broker.go, where the broker and its methods are defined:
NewBroker() (*Broker) returns a new broker object
AddSubscriber() (*Subscriber) adds a given new subscriber to the broker
RemoveSubscriber(s *Subscriber) removes the given subscriber from the broker
Broadcast(msg string, topics []string) broadcasts the given message to the given list of topics
GetSubscribers(topic string) int returns the number of subscribers subscribed to the given topic
package pubsub
import (
"fmt"
"sync"
)
type Subscribers map[string]*Subscriber
type Broker struct {
subscribers Subscribers // map of subscribers id:Subscriber
topics map[string]Subscribers // map of topic to subscribers
mut sync.RWMutex // mutex lock
}
func NewBroker() (*Broker){
// returns new broker object
return &Broker{
subscribers: Subscribers{},
topics: map[string]Subscribers{},
}
}
func (b *Broker)AddSubscriber()(*Subscriber){
// Add subscriber to the broker.
b.mut.Lock()
defer b.mut.Unlock()
id, s := CreateNewSubscriber()
b.subscribers[id] = s;
return s
}
func (b *Broker)RemoveSubscriber(s *Subscriber)(){
// remove subscriber from the broker.
// unsubscribe from all topics which s is subscribed to.
for topic := range(s.topics){
b.Unsubscribe(s, topic)
}
b.mut.Lock()
// remove subscriber from list of subscribers.
delete(b.subscribers, s.id)
b.mut.Unlock()
s.Destruct()
}
func (b *Broker)Broadcast(msg string, topics []string){
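// broadcast message to all topics.
b.mut.RLock() // guard the topics map against concurrent Subscribe/Unsubscribe
defer b.mut.RUnlock()
for _, topic := range topics {
for _, s := range b.topics[topic] {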
m:= NewMessage(msg, topic)
go (func(s *Subscriber){
s.Signal(m)
})(s)
}
}
}
func (b *Broker) GetSubscribers(topic string) int {
// get total subscribers subscribed to given topic.
b.mut.RLock()
defer b.mut.RUnlock()
return len(b.topics[topic])
}
func (b *Broker) Subscribe(s *Subscriber, topic string) {
// subscribe to given topic
b.mut.Lock()
defer b.mut.Unlock()
if b.topics[topic] == nil {
b.topics[topic] = Subscribers{}
}
s.AddTopic(topic)
b.topics[topic][s.id] = s
fmt.Printf("%s Subscribed for topic: %s\n", s.id, topic)
}
func (b *Broker) Unsubscribe(s *Subscriber, topic string) {
// unsubscribe to given topic
// deleting from the map is a write, so take the write lock
b.mut.Lock()
defer b.mut.Unlock()
delete(b.topics[topic], s.id)
s.RemoveTopic(topic)
fmt.Printf("%s Unsubscribed for topic: %s\n", s.id, topic)
}
func (b *Broker) Publish(topic string, msg string) {
// publish the message to given topic.
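b.mut.RLock()
// hold the read lock while iterating so Subscribe/Unsubscribe cannot
// modify the map mid-loop
defer b.mut.RUnlock()
for _, s := range b.topics[topic] {
m := NewMessage(msg, topic)
if !s.active {
// skip inactive subscribers but keep delivering to the rest
continue
}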
go (func(s *Subscriber){
s.Signal(m)
})(s)
}
}
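Both Publish and Broadcast range over a map that Subscribe and Unsubscribe modify, so the iteration has to be protected. One option, sketched here under assumed names, is to copy the targets into a slice while holding the read lock and deliver after releasing it. The miniBroker type and its snapshot method are hypothetical and use plain string channels in place of *Subscriber.

```go
package main

import (
	"fmt"
	"sync"
)

// miniBroker is a hypothetical, reduced broker: topic -> subscriber id -> inbox.
type miniBroker struct {
	mu     sync.RWMutex
	topics map[string]map[string]chan string
}

func (b *miniBroker) subscribe(id, topic string, inbox chan string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.topics[topic] == nil {
		b.topics[topic] = map[string]chan string{}
	}
	b.topics[topic][id] = inbox
}

// snapshot copies the inboxes for topic into a slice while holding the read
// lock, so the caller can iterate without touching the shared map.
func (b *miniBroker) snapshot(topic string) []chan string {
	b.mu.RLock()
	defer b.mu.RUnlock()
	inboxes := make([]chan string, 0, len(b.topics[topic]))
	for _, inbox := range b.topics[topic] {
		inboxes = append(inboxes, inbox)
	}
	return inboxes
}

func main() {
	b := &miniBroker{topics: map[string]map[string]chan string{}}
	b.subscribe("s1", "ETHEREUM", make(chan string, 1))
	b.subscribe("s2", "ETHEREUM", make(chan string, 1))

	targets := b.snapshot("ETHEREUM")
	b.subscribe("s3", "ETHEREUM", make(chan string, 1)) // does not affect targets
	for _, inbox := range targets {
		inbox <- "0.42"
	}
	fmt.Println(len(targets)) // 2
}
```

The trade-off is a small allocation per publish in exchange for holding the lock only as long as the copy takes.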
Now that our pub/sub service is created, let’s use it as an example of getting the price update of cryptocurrencies. In this example, the publisher publishes the price value of cryptocurrencies, and whoever subscribes to a particular cryptocurrency shall receive the price update.
Here, the publisher randomly generates a price value for a cryptocurrency and publishes it under the respective topic name (the name of the cryptocurrency). There are two subscribers, each subscribed to a set of crypto topics, so they receive the messages as they become available and display them in the console:
package main
import (
"fmt"
"math/rand"
"time"
"./pubsub"
)
// available topics
var availableTopics = map[string]string{
"BTC": "BITCOIN",
"ETH": "ETHEREUM",
"DOT": "POLKADOT",
"SOL": "SOLANA",
}
func pricePublisher(broker *pubsub.Broker)(){
topicKeys := make([]string, 0, len(availableTopics))
topicValues := make([]string, 0, len(availableTopics))
for k, v := range availableTopics {
topicKeys = append(topicKeys, k)
topicValues = append(topicValues, v)
}
for {
randValue := topicValues[rand.Intn(len(topicValues))] // pick a random topic.
msg := fmt.Sprintf("%f", rand.Float64())
// fmt.Printf("Publishing %s to %s topic\n", msg, randValue)
go broker.Publish(randValue, msg)
// Uncomment if you want to broadcast to all topics.
// go broker.Broadcast(msg, topicValues)
r := rand.Intn(4)
time.Sleep(time.Duration(r) * time.Second) //sleep for random secs.
}
}
func main(){
// construct new broker.
broker := pubsub.NewBroker()
// create new subscriber
s1 := broker.AddSubscriber()
// subscribe BTC and ETH to s1.
broker.Subscribe(s1, availableTopics["BTC"])
broker.Subscribe(s1, availableTopics["ETH"])
// create new subscriber
s2 := broker.AddSubscriber()
// subscribe ETH and SOL to s2.
broker.Subscribe(s2, availableTopics["ETH"])
broker.Subscribe(s2, availableTopics["SOL"])
go (func(){
// sleep for 3 sec, and then subscribe s2 to topic DOT
time.Sleep(3*time.Second)
broker.Subscribe(s2, availableTopics["DOT"])
})()
go (func(){
// sleep for 5 sec, and then unsubscribe s2 from topic SOL
time.Sleep(5*time.Second)
broker.Unsubscribe(s2, availableTopics["SOL"])
fmt.Printf("Total subscribers for topic ETH is %v\n", broker.GetSubscribers(availableTopics["ETH"]))
})()
go (func(){
// sleep for 10 sec, and then remove s2 from the broker
time.Sleep(10*time.Second)
broker.RemoveSubscriber(s2)
fmt.Printf("Total subscribers for topic ETH is %v\n", broker.GetSubscribers(availableTopics["ETH"]))
})()
// Concurrently publish the values.
go pricePublisher(broker)
// Concurrently listens from s1.
go s1.Listen()
// Concurrently listens from s2.
go s2.Listen()
// wait for Enter so that main does not exit while the goroutines run
fmt.Scanln()
fmt.Println("Done!")
}
The output will look like the following:
❯ GO111MODULE=off go run main.go
208B51C5-1F40B37F Subscribed for topic: BITCOIN
208B51C5-1F40B37F Subscribed for topic: ETHEREUM
60466C8A-3662A48A Subscribed for topic: ETHEREUM
60466C8A-3662A48A Subscribed for topic: SOLANA
Subscriber 60466C8A-3662A48A, received: 0.940509 from topic: ETHEREUM
Subscriber 208B51C5-1F40B37F, received: 0.940509 from topic: ETHEREUM
60466C8A-3662A48A Subscribed for topic: POLKADOT
Subscriber 60466C8A-3662A48A, received: 0.424637 from topic: SOLANA
60466C8A-3662A48A Unsubscribed for topic: SOLANA
Total subscribers for topic ETH is 2
Subscriber 208B51C5-1F40B37F, received: 0.515213 from topic: BITCOIN
Subscriber 60466C8A-3662A48A, received: 0.156519 from topic: ETHEREUM
Subscriber 208B51C5-1F40B37F, received: 0.156519 from topic: ETHEREUM
Subscriber 60466C8A-3662A48A, received: 0.283034 from topic: POLKADOT
Subscriber 60466C8A-3662A48A, received: 0.380657 from topic: POLKADOT
Subscriber 60466C8A-3662A48A, received: 0.218553 from topic: ETHEREUM
Subscriber 208B51C5-1F40B37F, received: 0.218553 from topic: ETHEREUM
60466C8A-3662A48A Unsubscribed for topic: ETHEREUM
60466C8A-3662A48A Unsubscribed for topic: POLKADOT
Total subscribers for topic ETH is 1
Subscriber 208B51C5-1F40B37F, received: 0.865335 from topic: BITCOIN
Subscriber 208B51C5-1F40B37F, received: 0.028303 from topic: ETHEREUM
Subscriber 208B51C5-1F40B37F, received: 0.059121 from topic: ETHEREUM
In this guide, we discussed and demonstrated some of the design choices of a pub/sub service in Go using goroutines and channels. However, this implementation uses in-process communication between multiple goroutines over channels, which is different from a distributed pub/sub service. A distributed service also requires a sophisticated method for fault tolerance.
The full code for this tutorial can be found here. Happy coding!
3 Replies to "Building a pub/sub service in Go"
pubsub/subscriber.go is missing
the same content of pubsub/message.go is present in pubsub/subscriber.go code section
Thanks for pointing this out! Should be fixed now.
thanks for the post, the link to go-patterns is broken
https://github.com/tmrts/go-patterns