
Go: Pub/Sub Implementation Using Redis
A simple Go implementation that uses Redis for Pub/Sub
9 December, 2022
This post walks through the steps to implement pub/sub in Go using Redis.
Let's first see what pub/sub is! 🙌
•
Publish/subscribe messaging, or pub/sub messaging, is a form of asynchronous service-to-service communication used in serverless and microservices architectures.
•
In a pub/sub model, any message published to a topic is immediately received by all of the subscribers to the topic.
•
Pub/sub messaging can be used to enable event-driven architectures, or to decouple applications in order to increase performance, reliability and scalability.
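To make the model concrete, here is a minimal in-memory sketch of topic-based fan-out using Go channels. It is not the Redis implementation from this post: `Broker`, `Subscribe`, `Publish` and the topic names are hypothetical, chosen only to illustrate that every subscriber of a topic receives each message while other topics stay untouched.

```go
package main

import (
	"fmt"
	"sync"
)

// Broker is a hypothetical in-memory stand-in for the pub/sub middleman.
type Broker struct {
	mu   sync.RWMutex
	subs map[string][]chan string
}

// NewBroker returns an empty broker.
func NewBroker() *Broker {
	return &Broker{subs: make(map[string][]chan string)}
}

// Subscribe registers a new subscriber on topic and returns its inbox.
func (b *Broker) Subscribe(topic string) <-chan string {
	// Small buffer so Publish does not block in this demo; a full inbox would block.
	ch := make(chan string, 8)
	b.mu.Lock()
	defer b.mu.Unlock()
	b.subs[topic] = append(b.subs[topic], ch)
	return ch
}

// Publish delivers msg to every current subscriber of topic and reports
// how many subscribers received it.
func (b *Broker) Publish(topic, msg string) int {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, ch := range b.subs[topic] {
		ch <- msg
	}
	return len(b.subs[topic])
}

func main() {
	b := NewBroker()
	first := b.Subscribe("orders")
	second := b.Subscribe("orders")
	audit := b.Subscribe("audit") // different topic, receives nothing below

	delivered := b.Publish("orders", "order created")
	fmt.Println(delivered, <-first, <-second, len(audit))
}
```

The publisher only knows the topic name, never the subscribers, which is the decoupling the bullets above describe.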
Current Implementation using Redis

PUB/SUB based on our implementation
•
Redis will act as the middleman here.
•
Redis can be used as a Publisher/Subscriber platform.
•
Publishers can issue messages to any number of subscribers on a channel.
•
These messages are fire-and-forget: if a message is published and no subscribers exist, the message evaporates and cannot be recovered.
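The fire-and-forget behaviour can be sketched without Redis. In the hypothetical `channel` type below (a plain Go stand-in, not Redis itself), `publish` returns the number of subscribers that received the message, much like the integer reply of the Redis PUBLISH command, and nothing is stored for subscribers that arrive later.

```go
package main

import "fmt"

// channel is a hypothetical stand-in for one Redis pub/sub channel.
type channel struct {
	subscribers []chan string
}

// publish hands msg to the subscribers connected right now and returns how
// many received it. Nothing is stored, so with zero subscribers the message is lost.
func (c *channel) publish(msg string) int {
	for _, s := range c.subscribers {
		s <- msg
	}
	return len(c.subscribers)
}

// subscribe attaches a new subscriber; it only sees messages published afterwards.
func (c *channel) subscribe() <-chan string {
	s := make(chan string, 4) // buffered so publish does not block in this demo
	c.subscribers = append(c.subscribers, s)
	return s
}

func main() {
	c := &channel{}
	fmt.Println(c.publish("early")) // no subscribers yet, so the message is dropped

	inbox := c.subscribe()
	fmt.Println(c.publish("late")) // one subscriber receives it
	fmt.Println(<-inbox)           // only "late" ever arrives
}
```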
Prerequisites 🤝
Coding
•
Go - Language of choice. A basic understanding of Go is enough to follow how the code works, and I'll go through the implementation step by step.
•
Redis - Using Redis for PUB/SUB
•
Docker - Used to run the Redis container. I will add a docker-compose.yml file for reference
Utilities required
•
Install Go. Please refer to the official website.
•
Install Docker Desktop. Please refer to the official website.
Setting Up Redis Locally
Docker is used to set up Redis locally for this implementation. The Docker container runs Redis, and because we expose the default ports, it behaves as if Redis were installed locally.
Note: Docker Desktop must be running for docker-compose to work.
The `docker-compose.yml` file runs the Redis Docker container. Please note that the file type is `*.yml` or `*.yaml`.
Wherever you see `${REDIS_PORT}`, the port number is fetched from the environment file.
Execute the file from the command line using:
`.env` refers to the environment file named `.env` in the root directory where the command is executed.
Please create the `.env` file in the same directory as the `docker-compose.yml` file and add the configuration as in the snippet below.
Great, with this Redis is running locally!
Setting up the Go codebase for Pub/Sub functionality 🏃♂️
As seen in the above diagram, we have two services, a publisher and a subscriber, with Redis in between. To reduce complexity and code, we will create APIs to publish messages and to fetch logs from the file to which the subscribed messages are written. A single Go project, i.e. a single module, will handle all the cases.
Let's start with the bulk of the development.
Create a Go module using the command:
Please refer to the official documentation if it's your first time, since it can help you understand the codebase better.
First and foremost, let's install our project dependencies:
•
Gorilla Mux - Go package used to map requests to handler functions
•
Redigo Redis - Go package used to perform operations as a Redis client
•
Viper - A complete configuration solution, used in this project to load `.env` configurations
Please do read the official docs linked below to learn more about each package, as we will be using only some of their functionality.
Let's create the main.go file, which will be the entry point of the Go application.
To do that, use the commands from the terminal to create folders in the given structure. You should already be in the same directory as the go.mod file.
Now let's start setting up our API server to handle GET and POST requests.
•
The GET request returns all the subscriber messages that are written to the log file
•
The POST request publishes a message
The function given above is called from the main function in the main package, as it is the initial point of execution.
Now we need to define the functions GetQueueRecords and PostQueueRecords.
Now let's solve our problems one by one. We need to save the logs to a file after subscription and also read from that file on the GET API call.
We should create functions to do so, not in the main package but in a separate package. Let's create a helper package for all the additional functionality required.
Setting Up the Helper Package 🫱🏻
Since all the files are under helper, each Go file contains the helper functions related to its name.
`config.go` : Provides access to the configuration. Although it is not strictly needed, since we could hard-code the configuration, I went with the better approach of loading the configuration from the `.env` file using the Go package Viper.
It reads the configuration from the `.env` file at the root level and returns the value for the key passed in.
`file.go` : Takes care of creating, opening and removing the file, i.e. file operations using the built-in `os` package. Here we are writing to a file named `log.log`.
`message.go` : Deals with the message passed via the channel and with processing it, which includes saving the message to the file and retrieving it.
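Messages sent over a Redis channel are plain strings or bytes, so a struct has to be serialized before publishing and parsed again on receipt. The sketch below uses JSON for that round trip. The struct name `MsgType` comes from the post, but its fields (`ID`, `Message`) and the helpers `encode` and `decode` are hypothetical, as is the choice of JSON.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MsgType reuses the name from the post; the fields are hypothetical.
type MsgType struct {
	ID      int    `json:"id"`
	Message string `json:"message"`
}

// encode serializes a message into the string that would be published.
func encode(m MsgType) (string, error) {
	raw, err := json.Marshal(m)
	if err != nil {
		return "", err
	}
	return string(raw), nil
}

// decode parses a received payload back into a MsgType.
func decode(payload string) (MsgType, error) {
	var m MsgType
	err := json.Unmarshal([]byte(payload), &m)
	return m, err
}

func main() {
	payload, err := encode(MsgType{ID: 1, Message: "hello"})
	if err != nil {
		panic(err)
	}
	fmt.Println(payload) // {"id":1,"message":"hello"}

	msg, err := decode(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(msg.ID, msg.Message)
}
```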
`redis.go` : Takes care of creating the Redis connection, and of publishing and subscribing to the Redis channel.
Publishing and subscribing are done on the same channel, "get-queue-data". Once subscribed, the service listens on the subscription to receive messages. All messages follow the struct `MsgType` as defined in the helper file `message.go`.
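For a feel of what a client library such as Redigo sends when it subscribes or publishes, here is a small sketch that encodes the two commands in RESP, the Redis wire format in which a request is an array of bulk strings. `encodeCommand` is a hypothetical helper written for this example and is not part of Redigo; the payload "hello" is just sample data.

```go
package main

import (
	"fmt"
	"strings"
)

// encodeCommand renders a command as a RESP array of bulk strings:
// *<count>\r\n followed by $<length>\r\n<argument>\r\n for each argument.
func encodeCommand(args ...string) string {
	var b strings.Builder
	fmt.Fprintf(&b, "*%d\r\n", len(args))
	for _, a := range args {
		fmt.Fprintf(&b, "$%d\r\n%s\r\n", len(a), a)
	}
	return b.String()
}

func main() {
	// The subscriber connection sends this once, then keeps reading pushed messages.
	fmt.Printf("%q\n", encodeCommand("SUBSCRIBE", "get-queue-data"))
	// The publisher connection sends one of these per message.
	fmt.Printf("%q\n", encodeCommand("PUBLISH", "get-queue-data", "hello"))
}
```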
Updated logic in main.go 🫡
Now we need to use all these functions from main.go. Since we are publishing and subscribing to the same Redis instance from a single service, we need to create two connections: one for publishing and another for subscribing. A connection that has entered subscribe mode is reserved for receiving messages, so publishing needs a connection of its own.
A lot of changes were made to main.go, as you can see above. Let's go through each change.
•
The first noticeable change is the use of the `RestAPI` struct to hold the subscriber and publisher Redis connections. This approach was needed because the functions that require those connections are the handler functions inside the `createServer` function, as well as the helper functions.
•
After creating the struct, the `createServer` function became a method of `RestAPI`.
•
Along with `createServer`, the handler functions were also added as methods of the struct, since they also depend on the struct's values for processing.
•
Everything else links back to the helper functions, which were covered earlier.
•
Now coming to the execution of main.go: the log file is created when execution starts and deleted when it stops, via the defer call at the end.
Note: A defer statement defers the execution of a function until the surrounding function returns, meaning it executes just before control is handed back to the caller, i.e. at the very end.
•
Looking at the `createServer` function call, you can see it is run as a goroutine, which makes it non-blocking; it runs concurrently with the rest of main rather than on a dedicated OS thread.
Note: A goroutine is a function that executes concurrently with other goroutines in a program. Goroutines are lightweight threads managed by the Go runtime.
•
Goroutines communicate via channels, but since there was no need for communication between them, the current code does not use any. The `http server` is supposed to run without stopping.
•
The last line before the defer triggers the subscription on the channel, waits for messages sent over the channel, and writes them to the file.
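The control flow described in the points above can be sketched with the standard library alone. In this simplified stand-in, a Go channel replaces the two Redis connections, `createServer` publishes a fixed list of requests instead of serving HTTP, and the function `run` plays the role of main. Only the names `RestAPI` and `createServer` come from the post; the fields, `run` and the event strings are assumptions.

```go
package main

import "fmt"

// RestAPI reuses the struct name from the post; the channel fields are
// hypothetical stand-ins for the publisher and subscriber Redis connections.
type RestAPI struct {
	pub chan<- string
	sub <-chan string
}

// createServer stands in for the HTTP server: it publishes each request and
// closes the publisher side once it has nothing left to send.
func (api *RestAPI) createServer(requests []string) {
	for _, r := range requests {
		api.pub <- r
	}
	close(api.pub)
}

// run mirrors the flow of main: set up, defer the cleanup, start the server
// in a goroutine, then block on the subscription.
func run(requests []string) (events []string) {
	events = append(events, "log file created")
	// The deferred call runs after the loop below finishes, just before run returns.
	defer func() { events = append(events, "log file removed") }()

	link := make(chan string)
	api := &RestAPI{pub: link, sub: link}
	go api.createServer(requests) // non-blocking: run continues immediately

	// Blocks here, handling messages until the publisher side is closed.
	for msg := range api.sub {
		events = append(events, "received "+msg)
	}
	return events
}

func main() {
	for _, e := range run([]string{"a", "b"}) {
		fmt.Println(e)
	}
}
```

In the real service the server never stops, so the subscription loop keeps running until the process is terminated; the sketch ends only so that it can be run as a demo.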
Setup complete, let's run it 🏇
You can run the code in two ways:
•
The first is to run it via the go run command. The `PWD` (present working directory) of the terminal should be the directory containing the main.go file
•
The second is to build the binary and execute it
For Further Details Refer ✌️
I'll have the working code in my GitHub repository. Please have a look if you find anything not working.
Future Improvement ⌛️
This Go project is a part of my main distributed system project. The goal of the project is to reliably implement asynchronous fan-out (broadcast) to multiple subscribers. You can follow the above GitHub link to check for code changes, and the updates will be posted as new shows.
•
Once implemented locally, I will be focusing on the use of cloud services to make it scalable.
•
Pipeline implementations for tests and creation of docker images.
•
Auto-deployment to cloud services