Apache Kafka is an event streaming platform that allows software applications to communicate effectively with each other. It’s an excellent choice for connecting small applications, like microservices, together.
In this article, you’ll learn how to build a microservice using Rust with Kafka. We will cover:

- What microservices are and how Kafka connects them
- Setting up a Kafka broker, topics, producers, and consumers
- Building a Kafka consumer and producer with Rust
- Building and testing a text-storing microservice
You can see the final project in this GitHub repo.
To follow along with this article, you’ll need Rust installed on your system and basic knowledge of Rust.
Microservices are small, independent applications that each perform a specific task. They are useful both for decoupling the components of a complex application and for building distributed applications.
Think of a microservice as an object on a larger scale. Like objects, microservices are independent and reusable. The significant difference between the two is how they return data: while objects get and return data directly, microservices publish their data to a shared pool.
Microservices also allow developers to build different parts of an application in different programming languages, and errors in one microservice rarely cause the overall application to crash.
Kafka is an event streaming platform used for reading and writing data that makes it easy to connect microservices. To learn more about Kafka, check out this Kafka real-time data streaming app tutorial.
Before we go into more depth about the platform, let’s get familiar with some Kafka terms: an event is a record of something that happened, along with any associated data; a producer is an application that publishes events to Kafka; a consumer is an application that reads those events; a topic is a named category that organizes related events; and a broker is the server that stores events and serves them to consumers.
To proceed with this article, you will need to set up a Kafka broker. You’ll use a local broker on your system; aside from local brokers, Kafka also supports brokers in the cloud or on a remote system.
To set up Kafka on your system, follow these steps:
First, download the latest Kafka release here and extract the compressed file. After that, open the decompressed folder in your terminal and start the ZooKeeper server with the following command:
$ bin/zookeeper-server-start.sh config/zookeeper.properties
Then, open another terminal session in the decompressed folder and start the Kafka broker:
$ bin/kafka-server-start.sh config/server.properties
The Kafka broker requires an active ZooKeeper instance to function properly. ZooKeeper manages the broker’s metadata, and without it, the broker will generate error messages. (Newer Kafka releases can also run without ZooKeeper in KRaft mode, but this article uses the ZooKeeper-based setup.)
Next, let’s organize published events in the Kafka broker by setting up topics, producers, and consumers.
To create a topic in your local server, run this command:
$ bin/kafka-topics.sh --create --topic topic-name --bootstrap-server localhost:9092
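You can replace topic-name with any name you like. To confirm that the topic was created, you can describe it with the same script:

$ bin/kafka-topics.sh --describe --topic topic-name --bootstrap-server localhost:9092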
Kafka provides a console consumer and a console producer that you can run in the terminal:

# console consumer reading from "topic-name"
$ bin/kafka-console-consumer.sh --topic topic-name --from-beginning --bootstrap-server localhost:9092

# console producer publishing to "topic-name"
$ bin/kafka-console-producer.sh --topic topic-name --bootstrap-server localhost:9092

The first command starts a consumer in the console. --topic topic-name tells the consumer to read from topic-name, and --bootstrap-server localhost:9092 tells Kafka to connect to your local server at localhost:9092. --from-beginning tells the consumer to read all events, starting from the earliest published; you can use --offset earliest to achieve the same result (note that --offset must be combined with --partition). If you want the consumer to only listen for new events, use --offset latest.

The second command starts a producer in the console. The producer publishes events to topic-name, as specified by --topic topic-name, and connects to your local server at localhost:9092, as specified by --bootstrap-server localhost:9092.
Now that you’ve seen producers and consumers in the console, I’ll show you how to build a Kafka producer and consumer with Rust. Kafka-based microservices built with Rust usually have one or more producers and consumers. Let’s get started!
First, you’ll need to create a new Rust project with the following command:
$ cargo new kafka_consumer
Next, add the Kafka library to Cargo.toml:

[dependencies]
kafka = "0.9"
Now that we’ve added the Kafka library, we can build the project to fetch its dependencies:
$ cargo build
Then, paste this code into src/main.rs:

use kafka::consumer::{Consumer, FetchOffset};
use std::str;

fn main() {

    let hosts = vec!["localhost:9092".to_owned()];

    let mut consumer =
        Consumer::from_hosts(hosts)
            .with_topic("topic-name".to_owned())
            .with_fallback_offset(FetchOffset::Latest)
            .create()
            .unwrap();

    loop {
        for ms in consumer.poll().unwrap().iter() {
            for m in ms.messages() {
                // if the consumer receives an event, this block is executed
                println!("{:?}", str::from_utf8(m.value).unwrap());
            }
            consumer.consume_messageset(ms).unwrap();
        }
        consumer.commit_consumed().unwrap();
    }
}
The consumer we created above reads events published to the topic-name topic and prints each event’s message to the console. Here’s how it works in more detail:

First, on line 6, we initialize the list of hosts that the consumer will connect to (in this case, localhost:9092):
let hosts = vec!["localhost:9092".to_owned()];
Next, from line 8 to line 13, we initialize a consumer that is connected to the topic-name topic (line 10) and starts reading events from the latest published (passing FetchOffset::Earliest instead would start from the oldest available):
let mut consumer =
    Consumer::from_hosts(hosts)
        .with_topic("topic-name".to_owned())
        .with_fallback_offset(FetchOffset::Latest)
        .create()
        .unwrap();
Finally, we create an infinite loop that listens for events published to topic-name:
loop {
    for ms in consumer.poll().unwrap().iter() {
        for m in ms.messages() {
            // convert the message from bytes to a string and print it
            println!("{:?}", str::from_utf8(m.value).unwrap());
        }
        consumer.consume_messageset(ms).unwrap();
    }
    consumer.commit_consumed().unwrap();
}
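Every unwrap in this loop will panic the moment the broker becomes unreachable or a message isn’t valid UTF-8. As a minimal sketch of more defensive handling (not part of the original tutorial), you can match on the Result values that the kafka crate returns instead:

loop {
    match consumer.poll() {
        Ok(message_sets) => {
            for ms in message_sets.iter() {
                for m in ms.messages() {
                    // skip, rather than crash on, non-UTF-8 payloads
                    if let Ok(text) = str::from_utf8(m.value) {
                        println!("{:?}", text);
                    }
                }
                if let Err(e) = consumer.consume_messageset(ms) {
                    eprintln!("consume error: {e}");
                }
            }
            if let Err(e) = consumer.commit_consumed() {
                eprintln!("commit error: {e}");
            }
        }
        Err(e) => eprintln!("poll error: {e}"),
    }
}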
Now that we’ve successfully created our consumer, let’s move on to creating our producer.
First, create a new Rust project using this command:
$ cargo new kafka_producer
Then add the Kafka library to Cargo.toml:

[dependencies]
kafka = "0.9"
Build the project to fetch its dependencies:
$ cargo build
Then, paste this code into src/main.rs:

use kafka::producer::{Producer, Record};

fn main() {
    let hosts = vec!["localhost:9092".to_owned()];

    let mut producer =
        Producer::from_hosts(hosts)
            .create()
            .unwrap();

    for i in 0..10 {
        let buf = format!("{i}");
        producer.send(&Record::from_value("topic-name", buf.as_bytes())).unwrap();
        println!("Sent: {i}");
    }
}
The idea is to create a producer that sends the numbers 0 through 9 to the topic-name topic. Here’s how it works piece by piece:

First, we define the list of Kafka servers we are connecting to (localhost:9092):
let hosts = vec!["localhost:9092".to_owned()];
Notice that the Producer::from_hosts function initializes the producer. The initialized producer is connected to the local Kafka server (localhost:9092):
let mut producer =
    Producer::from_hosts(hosts)
        .create()
        .unwrap();
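The builder also exposes delivery settings. Here’s a sketch (optional; the defaults are fine for local testing) that sets how long to wait for an acknowledgement and how many brokers must acknowledge each record:

use std::time::Duration;
use kafka::producer::{Producer, RequiredAcks};

let mut producer =
    Producer::from_hosts(hosts)
        // wait up to one second for each acknowledgement
        .with_ack_timeout(Duration::from_secs(1))
        // require only the leader broker to acknowledge
        .with_required_acks(RequiredAcks::One)
        .create()
        .unwrap();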
Then, we create a loop that runs from zero to nine and sends each number to the topic-name topic:
for i in 0..10 {
    let buf = format!("{i}");
    producer.send(&Record::from_value("topic-name", buf.as_bytes())).unwrap();
    println!("Sent: {i}");
}
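Record::from_value publishes a value without a key. If you want related events to keep their relative order, you can attach a key with Record::from_key_value; events with the same key are typically routed to the same partition. As a small sketch (the key name "counter" is just an example), you could replace the send line inside the loop with:

// events sharing a key are routed to the same partition
let record = Record::from_key_value("topic-name", "counter", buf.as_bytes());
producer.send(&record).unwrap();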
Now that we’ve set up our consumer and producer, let’s run them. The consumer listens for events published to topic-name, and the producer publishes events to topic-name. Starting the consumer before publishing events ensures the events are consumed in real time.
In this section, we will build a microservice that stores and deletes texts. You operate this microservice by sending it events that describe adding or removing a text. Each event is a JSON string with an action key that describes the operation and a value key that provides the necessary data.
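For example, the add and remove events that we’ll later send while testing the microservice look like this:

{ "action": "add", "value": "first text" }
{ "action": "remove", "value": 1 }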
The following diagram showcases how the microservice works in adding text:
As you can see in the diagram, our microservice reads operations to perform from the actions topic and then publishes the list of texts to the texts topic. Now that we have some background, let’s build this microservice.
First, let’s create a new project:
$ cargo new texts-microservice
Add the following dependencies to Cargo.toml:

[dependencies]
kafka = "0.9"
serde_json = "1.0.85"
Build the project using the following command:
$ cargo build
After that, we will create a module each for the consumer, the producer, and the texts. Create the files my_consumer.rs, my_producer.rs, and texts.rs in src.
Now, follow these steps to build the microservice:
First, since we are splitting the microservice into modules, we need to declare them. Add this code block to the main.rs file:
mod my_consumer;
mod my_producer;
mod texts;

use my_consumer::MyConsumer;
use my_producer::MyProducer;
use texts::Texts;
After that, create the main function in main.rs:
fn main() { }
Next, in main.rs’s main function, define the Kafka server you are connecting the microservice to:
let hosts = vec![ "localhost:9092".to_string() ];
Now, initialize the texts, consumer, and producer modules with the following lines in main.rs’s main function:
let mut texts = Texts::new();
let mut consumer = MyConsumer::new( hosts.clone(), "actions".to_string() );
let mut producer = MyProducer::new( hosts );
Next, create a loop that listens for events that the consumer receives:
loop {
    for ms in consumer.consume_events().iter() {
        for m in ms.messages() {
            // the next steps fill in this block
        }
        consumer.consume_messageset(ms);
    }
    consumer.commit_consumed();
}
After that, inside the innermost loop, retrieve each event’s data from the message with this line:
let event_data = MyConsumer::get_event_data(m);
Next, extract the action to perform on the texts from the event data:
let action = event_data["action"].to_string();
After that, use if, else if, and else blocks to perform the action specified by event_data["action"]:
if action == "\"add\"" {
    texts.add_text( event_data["value"].to_string() );
} else if action == "\"remove\"" {
    let index = event_data["value"].to_string().parse::<usize>().unwrap();
    texts.remove_text( index );
} else {
    println!("Invalid action");
}
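Note the escaped quotes: calling to_string() on a serde_json::Value serializes it back to JSON, so a string value keeps its surrounding quotes and "add" arrives as "\"add\"". As an alternative sketch (not what the rest of this tutorial uses), serde_json’s as_str() returns the bare string, which avoids the escaping:

// as_str() strips the JSON quotes, so we match on the bare action name
match event_data["action"].as_str() {
    Some("add") => texts.add_text( event_data["value"].to_string() ),
    Some("remove") => {
        let index = event_data["value"].to_string().parse::<usize>().unwrap();
        texts.remove_text( index );
    }
    _ => println!("Invalid action"),
}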
Finally, at the end of the inner loop, convert the texts to a JSON string and send it to the texts topic with the producer:
producer.send_data_to_topic( "texts", texts.to_json() );
In the end, your main.rs file should look like this:

mod my_consumer;
mod my_producer;
mod texts;

use my_consumer::MyConsumer;
use my_producer::MyProducer;
use texts::Texts;

fn main() {
    let hosts = vec![ "localhost:9092".to_string() ];

    let mut texts = Texts::new();
    let mut consumer = MyConsumer::new( hosts.clone(), "actions".to_string() );
    let mut producer = MyProducer::new( hosts );

    // put here to show that the microservice has started
    println!("Started...");

    loop {
        for ms in consumer.consume_events().iter() {
            for m in ms.messages() {
                // when the consumer receives an event, this block is executed
                let event_data = MyConsumer::get_event_data(m);
                let action = event_data["action"].to_string();

                if action == "\"add\"" {
                    texts.add_text( event_data["value"].to_string() );
                } else if action == "\"remove\"" {
                    let index = event_data["value"].to_string().parse::<usize>().unwrap();
                    texts.remove_text( index );
                } else {
                    println!("Invalid action");
                }

                producer.send_data_to_topic( "texts", texts.to_json() );
            }
            consumer.consume_messageset(ms);
        }
        consumer.commit_consumed();
    }
}
Now, create a my_consumer.rs file in src and paste the following code into it:
use kafka::consumer::{Consumer, FetchOffset, MessageSets, MessageSet, Message};
use std::str;
use serde_json::Value;

pub struct MyConsumer {
    consumer: Consumer
}

impl MyConsumer {

    pub fn new(hosts: Vec<String>, topic: String) -> Self {
        Self {
            consumer: Consumer::from_hosts(hosts)
                .with_topic(topic)
                .with_fallback_offset(FetchOffset::Latest)
                .create()
                .unwrap()
        }
    }

    pub fn get_event_data(m: &Message) -> Value {
        // convert the raw bytes to a string, then parse the JSON
        let event = str::from_utf8(m.value).unwrap().to_string();
        serde_json::from_str(&event).unwrap()
    }

    pub fn consume_events(&mut self) -> MessageSets {
        self.consumer.poll().unwrap()
    }

    pub fn consume_messageset(&mut self, ms: MessageSet) {
        self.consumer.consume_messageset(ms).unwrap();
    }

    pub fn commit_consumed(&mut self) {
        self.consumer.commit_consumed().unwrap();
    }
}
Also, create a my_producer.rs file with this code in it:
use kafka::producer::{Producer, Record};

pub struct MyProducer {
    producer: Producer
}

impl MyProducer {

    pub fn new( hosts: Vec<String> ) -> Self {
        let producer = Producer::from_hosts( hosts )
            .create()
            .unwrap();

        Self { producer }
    }

    pub fn send_data_to_topic(&mut self, topic: &str, data: String ) {
        let record = Record::from_value( topic, data.as_bytes() );
        self.producer.send(&record).unwrap();
    }
}
Finally, create a texts.rs file and paste the following into it:
use serde_json::json;

pub struct Texts {
    texts: Vec<String>
}

impl Texts {

    pub fn new() -> Self {
        Self { texts: vec![] }
    }

    pub fn add_text(&mut self, text: String) {
        self.texts.push(text);
    }

    pub fn remove_text(&mut self, index: usize) {
        self.texts.remove( index );
    }

    pub fn to_json(&self) -> String {
        json!(&self.texts).to_string()
    }
}
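One caveat: Vec::remove panics when the index is out of range, so a malformed remove event can crash the microservice. A defensive variant of remove_text (an optional change, not part of the original code) validates the index first:

pub fn remove_text(&mut self, index: usize) {
    // ignore out-of-range indices instead of panicking
    if index < self.texts.len() {
        self.texts.remove( index );
    }
}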
After building the project, we need to run and test it.
First, create the texts and actions topics:
# create the texts topic
$ bin/kafka-topics.sh --create --topic texts --bootstrap-server localhost:9092

# create the actions topic
$ bin/kafka-topics.sh --create --topic actions --bootstrap-server localhost:9092
Next, set up a console consumer on the texts topic:
$ bin/kafka-console-consumer.sh --topic texts --from-beginning --bootstrap-server localhost:9092
Then, set up a console producer on the actions topic:
$ bin/kafka-console-producer.sh --topic actions --bootstrap-server localhost:9092
Let’s now run the project with:
$ cargo run
Then, write actions to the producer:
> { "action": "add", "value": "first text" } > { "action": "add", "value": "second text" } > { "action": "remove", "value": 1 } > { "action": "add", "value": "third text" }
Finally, let’s see the outcomes of each event in the consumer console:
[ "first text" ] [ "first text", "second text" ] [ "first text" ] [ "first text", "third text" ]
In this article, we covered how to create a basic microservice with Rust and Kafka. Although a single microservice is simple and focuses on one task, multiple or more advanced microservices can combine into an easily maintained, distributed application that handles databases, real-time computing, the Internet of Things, real-time data collection, and fraud detection.