Vijit Ail Software Engineer at toothsi. I work with React and NodeJS to build customer-centric products. Reach out to me on LinkedIn or Instagram.

Microservices with NestJS, Kafka, and TypeScript

Microservices architecture is an increasingly popular pattern that builds on the service-oriented architecture (SOA) concept. It comes with trade-offs, but one of its most significant benefits is scalability: each service can be deployed and scaled independently of the others.

This guide will demonstrate how to build microservices using NestJS with Kafka and TypeScript.

Setting up the project workspace

Let’s start by setting up the project workspace for our demo. Before proceeding, ensure that you have Apache Kafka installed and running on your local machine. You can check out this guide for setting up Kafka locally.

You will create an authentication microservice to create and save user details. Along with the auth microservice, you’ll also need an API gateway to emit events from an HTTP API endpoint to the microservice.

Since you’ll be creating multiple backend services, it’s best to have a monorepo to enable code sharing. A monorepo is a single version-controlled code repository that includes various applications and libraries.

Nx is a popular tool for managing monorepos. This framework allows you to architect and scale web applications and services in your monorepo.

Create an Nx workspace by running the following command:

> npx create-nx-workspace@latest

Command to Run Nx Workspace

When prompted, name the workspace nestjs-microservices, specify the nest option, and name the application api-gateway.

Once the workspace is created, install the project dependencies by running the following commands:

> cd nestjs-microservices
> npm i @nestjs/microservices kafkajs class-validator class-transformer

Getting started with the API gateway

The API gateway is the service in a microservices architecture that connects client-side and external applications to the internal services. It handles routing, transforms and aggregates request data, and implements shared logic such as authentication and rate limiting.

User Gateway Kafka Auth Microservice

Typically, when the frontend app calls the sign-up API, the API gateway will emit the create_user event to a Kafka topic of the same name. The auth microservice will consume the create_user event from that topic and create a user from the event payload.

The frontend app won’t interact with the microservice directly in this architectural pattern. Usually, in real-world scenarios, the API gateway also has additional functionalities like logging, analytics, and load balancing.

In the api-gateway app, create the auth module responsible for handling authentication-related requests like sign-up, and add it to the imports array of the gateway’s AppModule so that Nest loads it. Configure the Kafka client using ClientsModule from the @nestjs/microservices package in the auth module.

The ClientsModule exposes a static register() method that takes an array of objects describing microservice transporters as an argument. Each object has a name property, a transport property, which in this case is Transport.KAFKA, and a transporter-specific options property. The name property defines the injection token, which you can use later in your service class to publish events:

// apps/api-gateway/src/auth/auth.module.ts

import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { AuthController } from './auth.controller';
import { AuthService } from './auth.service';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'AUTH_MICROSERVICE',
        transport: Transport.KAFKA,
        options: {
          client: {
            clientId: 'auth',
            brokers: ['localhost:9092'],
          },
          producerOnlyMode: true,
          consumer: {
            groupId: 'auth-consumer',
          },
        },
      },
    ]),
  ],
  providers: [AuthService],
  controllers: [AuthController],
})
export class AuthModule {} 

Since the transport is Kafka, the options property defines the Kafka client object, which includes the clientId and brokers, along with a consumer object that holds the groupId. The groupId names a Kafka consumer group: Kafka delivers the messages on a topic to every group subscribed to it, and within a group each message is handled by only one consumer, so instances that share a groupId split the work instead of each receiving every event.

For example, if two instances of the auth microservice both join the auth-consumer group, each create_user event is processed by only one of them. The producerOnlyMode option will bypass consumer group registration for the api-gateway app, so it functions only as a producer.
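The consumer group behavior can be sketched with a toy in-memory model. This is only an illustration, not the kafkajs or NestJS API: the ToyTopic class and the audit-consumer group are hypothetical, and the round-robin delivery stands in for Kafka's real partition-based assignment.

```typescript
// Toy in-memory model of Kafka consumer groups (not the kafkajs API).
type Handler = (message: string) => void;

class ToyTopic {
  // groupId -> handlers of the consumers that joined that group
  private readonly groups: Record<string, Handler[]> = {};
  // groupId -> running count used to pick the next consumer in the group
  private readonly cursors: Record<string, number> = {};

  subscribe(groupId: string, handler: Handler): void {
    const members = this.groups[groupId] ?? [];
    members.push(handler);
    this.groups[groupId] = members;
  }

  publish(message: string): void {
    // Every group sees the message, but only one member per group handles it.
    for (const groupId of Object.keys(this.groups)) {
      const members = this.groups[groupId];
      const cursor = this.cursors[groupId] ?? 0;
      members[cursor % members.length](message);
      this.cursors[groupId] = cursor + 1;
    }
  }
}

const received: string[] = [];
const createUserTopic = new ToyTopic();

// Two instances of the auth microservice share one group...
createUserTopic.subscribe('auth-consumer', (m) => received.push(`auth-1:${m}`));
createUserTopic.subscribe('auth-consumer', (m) => received.push(`auth-2:${m}`));
// ...while a hypothetical audit service reads the same topic in its own group.
createUserTopic.subscribe('audit-consumer', (m) => received.push(`audit:${m}`));

createUserTopic.publish('user-a');
createUserTopic.publish('user-b');

console.log(received);
// → [ 'auth-1:user-a', 'audit:user-a', 'auth-2:user-b', 'audit:user-b' ]
```

Each event reaches the auth group exactly once, alternating between its two instances, while the separate audit group receives every event.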

Code sharing among microservices

Since the HTTP request payload and the event payload for creating the user will be the same, it’s better to create a shared data transfer object (DTO) that all the services can access in the monorepo.

Create a shared library by running the following command:

> nx g @nrwl/node:lib shared

Next, create a dto folder in the shared library, add a CreateUserDto class with mandatory name and email properties in the create-user.dto.ts file, and re-export it from an index.ts file:

// libs/shared/src/lib/dto/create-user.dto.ts

import { IsEmail, IsNotEmpty, IsString } from 'class-validator';

export class CreateUserDto {
  @IsString()
  @IsNotEmpty()
  name: string;

  @IsEmail()
  @IsNotEmpty()
  email: string;
}

// libs/shared/src/lib/dto/index.ts

export * from './create-user.dto';

Now, add an entry in the paths object (under compilerOptions) for resolving dto files in the tsconfig.base.json file:

{
  ...
  "paths": {
      ...
      "@nestjs-microservices/shared/dto": ["libs/shared/src/lib/dto/index.ts"]
  }
  ...
}

Coming back to the api-gateway app, create the AuthService to publish an event to the auth microservice.

Create an instance of ClientKafka by using the @Inject() decorator, and use the AUTH_MICROSERVICE injection token specified in the name property of the Kafka transport. Using the client instance, you can access the emit() method to publish the create_user event along with the payload:

// apps/api-gateway/src/auth/auth.service.ts

import { Inject, Injectable } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { CreateUserDto } from '@nestjs-microservices/shared/dto';

@Injectable()
export class AuthService {
  constructor(
    @Inject('AUTH_MICROSERVICE') private readonly authClient: ClientKafka
  ) {}

  createUser(createUserDto: CreateUserDto) {
    this.authClient.emit('create_user', JSON.stringify(createUserDto));
  }
}

Next, in the AuthController class, create an API endpoint for creating the user and call the createUser method of the AuthService:

// apps/api-gateway/src/auth/auth.controller.ts

import { Body, Controller, Post, ValidationPipe } from '@nestjs/common';
import { AuthService } from './auth.service';
import { CreateUserDto } from '@nestjs-microservices/shared/dto';

@Controller('auth')
export class AuthController {
  constructor(private readonly authService: AuthService) {}

  @Post('sign-up')
  createUser(@Body(ValidationPipe) createUserDto: CreateUserDto) {
    return this.authService.createUser(createUserDto);
  }
}

Creating the auth microservice

Next, you’ll create the auth microservice to consume the create_user message sent by the API gateway.

To create the auth-microservice app, run the following command:

> nx g @nrwl/nest:app auth-microservice

In the main.ts file of the auth-microservice app, remove the boilerplate code of the bootstrap() function and replace it with the NestFactory.createMicroservice() method.

Pass the AppModule in the first argument and the Kafka transport object in the second argument of the createMicroservice() method. In the groupId property of the consumer object, ensure you’ve specified the same value used in AuthModule of the api-gateway app:

// apps/auth-microservice/src/main.ts

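import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';

import { AppModule } from './app/app.module';

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.KAFKA,
      options: {
        client: {
          brokers: ['localhost:9092'],
        },
        consumer: {
          groupId: 'auth-consumer',
        },
      },
    }
  );
  await app.listen();
}

bootstrap();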

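Next, create a User entity in the shared library to represent user data, and add a paths entry for @nestjs-microservices/shared/entities in tsconfig.base.json that points to libs/shared/src/lib/entities/index.ts, just as you did for the dto folder. Use this entity in the UsersRepository class to perform actions like saving user data and retrieving a single user by id: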

// libs/shared/src/lib/entities/user.entity.ts

export class User {
  id?: number;
  name: string;
  email: string;
}

// libs/shared/src/lib/entities/index.ts

export * from './user.entity';

// apps/auth-microservice/src/app/users.repository.ts

import { Injectable } from '@nestjs/common';
import { User } from '@nestjs-microservices/shared/entities';

@Injectable()
export class UsersRepository {
  private readonly users: User[] = [];

  save(user: User) {
    this.users.push({ ...user, id: this.users.length + 1 });
  }

  findOne(id: number) {
    return this.users.find((u) => u.id === id) || null;
  }
}

In a real NestJS application, you would typically persist users in a database through an ORM such as TypeORM, but for the sake of brevity in this demo, we’ll store the data in memory.
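To see how the in-memory repository behaves, the sketch below repeats the same logic without the @Injectable() decorator so that it runs on its own. The class name InMemoryUsersRepository and the sample users are made up for the illustration.

```typescript
// Mirrors the User entity and UsersRepository shown above, minus the
// NestJS decorator, so the snippet has no dependencies.
interface User {
  id?: number;
  name: string;
  email: string;
}

class InMemoryUsersRepository {
  private readonly users: User[] = [];

  save(user: User): void {
    // Ids are assigned sequentially, starting at 1.
    this.users.push({ ...user, id: this.users.length + 1 });
  }

  findOne(id: number): User | null {
    return this.users.find((u) => u.id === id) || null;
  }
}

const repo = new InMemoryUsersRepository();
repo.save({ name: 'Ada', email: 'ada@example.com' });
repo.save({ name: 'Grace', email: 'grace@example.com' });

const secondUser = repo.findOne(2); // the second saved user gets id 2
const missingUser = repo.findOne(99); // unknown ids yield null

console.log(secondUser, missingUser);
```

Because the data lives in a plain array, it is lost whenever the auth microservice restarts, and lookups for unknown ids return null, which callers need to handle.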

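In the AppService, create the methods createUser() and getUser() to create and find a user, respectively, with the UsersRepository methods. Make sure UsersRepository is listed in the providers array of the auth microservice’s AppModule so that Nest can inject it: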

// apps/auth-microservice/src/app/app.service.ts

import { CreateUserDto } from '@nestjs-microservices/shared/dto';
import { User } from '@nestjs-microservices/shared/entities';
import { Injectable } from '@nestjs/common';
import { UsersRepository } from './users.repository';

@Injectable()
export class AppService {
  constructor(private readonly usersRepository: UsersRepository) {}

  createUser(data: CreateUserDto): void {
    this.usersRepository.save(data);
  }

  getUser(id: number): User | null {
    return this.usersRepository.findOne(id);
  }
}

Now in the app.controller.ts file, create a method to handle the create_user event. Use the @EventPattern() decorator to create an event handler and pass the event name. You can access the event data using the @Payload() decorator and, similar to the @Body() decorator, you can use the ValidationPipe to validate the payload object:

// apps/auth-microservice/src/app/app.controller.ts

import { CreateUserDto } from '@nestjs-microservices/shared/dto';
import { Controller, ParseIntPipe, ValidationPipe } from '@nestjs/common';
import { EventPattern, MessagePattern, Payload } from '@nestjs/microservices';

import { AppService } from './app.service';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  @EventPattern('create_user')
  handleUserCreate(@Payload(ValidationPipe) data: CreateUserDto) {
    this.appService.createUser(data);
  }
}

Adding the payments microservice

Now that the auth microservice is up and running, let’s add a payments microservice to process payments for the user accounts and complete the whole architecture.

Auth Microservice Architecture for Payment Processing

The first step is to create an API endpoint in the api-gateway app that the frontend app can call to initiate the payment. In the request body, you’ll need the userId and the payment amount.

The API gateway will emit an event called process_payment with the payment data, which the payment microservice will read. The payment microservice will require user details to process the payment. Since you’re only passing the userId, the rest of the data will be retrieved from the auth service by publishing the get_user event.

In the api-gateway app, create the payments module, import it in the gateway’s AppModule, and register the Kafka client the same way you did for the auth module. For the payments module, the groupId will be payment-consumer:

// apps/api-gateway/src/payment/payments.module.ts

import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { PaymentController } from './payment.controller';
import { PaymentService } from './payment.service';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'PAYMENT_MICROSERVICE',
        transport: Transport.KAFKA,
        options: {
          client: {
            clientId: 'payment',
            brokers: ['localhost:9092'],
          },
          consumer: {
            groupId: 'payment-consumer',
          },
        },
      },
    ]),
  ],
  providers: [PaymentService],
  controllers: [PaymentController],
})
export class PaymentModule {}

Then, create the MakePaymentDto in the shared library to represent the payload for processing the payment, and export it from the dto folder’s index.ts file:

// libs/shared/src/lib/dto/make-payment.dto.ts

import { IsNotEmpty, IsNumber } from 'class-validator';

export class MakePaymentDto {
  @IsNotEmpty()
  @IsNumber()
  userId: number;

  @IsNotEmpty()
  @IsNumber()
  amount: number;
}

Next, in the PaymentController, add a POST API to create an entry point for publishing the process_payment event:

// apps/api-gateway/src/payment/payment.controller.ts

import { Body, Controller, Post, ValidationPipe } from '@nestjs/common';
import { PaymentService } from './payment.service';
import { MakePaymentDto } from '@nestjs-microservices/shared/dto';

@Controller('payments')
export class PaymentController {
  constructor(private readonly paymentService: PaymentService) {}

  @Post('pay')
  makePayment(@Body(ValidationPipe) makePaymentDto: MakePaymentDto) {
    return this.paymentService.makePayment(makePaymentDto);
  }
}

In the PaymentService, inject the ClientKafka using the @Inject() decorator and the PAYMENT_MICROSERVICE as the injection token. Now, emit the process_payment event using the Kafka client:

// apps/api-gateway/src/payment/payment.service.ts

import { Inject, Injectable } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { MakePaymentDto } from '@nestjs-microservices/shared/dto';

@Injectable()
export class PaymentService {
  constructor(
    @Inject('PAYMENT_MICROSERVICE') private readonly paymentClient: ClientKafka
  ) {}

  makePayment(makePaymentDto: MakePaymentDto) {
    this.paymentClient.emit('process_payment', JSON.stringify(makePaymentDto));
  }
}

The API gateway portion of the payments integration is complete. Now, let’s build the payments microservice app.

Building the microservice app

Start by running the following command to create the payments-microservice app:

> nx g @nrwl/nest:app payments-microservice

Similar to the auth-microservice, replace the content of the main.ts file with the createMicroservice() method. Here you’ll also define groupId with the value 'payment-consumer':

// apps/payments-microservice/src/main.ts

import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';

import { AppModule } from './app/app.module';

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.KAFKA,
      options: {
        client: {
          brokers: ['localhost:9092'],
        },
        consumer: {
          groupId: 'payment-consumer',
        },
      },
    }
  );
  await app.listen();
}

bootstrap();

Now, register a Kafka client for emitting events to the auth-microservice in the app.module.ts file:

// apps/payments-microservice/src/app/app.module.ts

import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';

import { AppController } from './app.controller';
import { AppService } from './app.service';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'AUTH_MICROSERVICE',
        transport: Transport.KAFKA,
        options: {
          client: {
            clientId: 'auth',
            brokers: ['localhost:9092'],
          },
          consumer: {
            groupId: 'auth-consumer',
          },
        },
      },
    ]),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}

In the AppService, inject the Kafka client for publishing events to the auth-microservice app. When you emit the get_user event, you’ll need to listen for the response from the auth microservice; this is different from what you did for the create_user event.

With Kafka communication, the response to a message is returned on a separate reply topic, which NestJS handles out of the box. Nest automatically sends the reply on a topic named after the original pattern with a .reply suffix, in this case get_user.reply. This type of communication between microservices is known as the request-response pattern.
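The idea of replying on a companion topic can be sketched with a toy in-memory broker. This is not how NestJS or kafkajs is implemented: ToyBroker, ToyRequester, and the usersById lookup are hypothetical, and matching a reply to its request through a correlation id is an assumption made for the sketch.

```typescript
// Toy in-memory broker illustrating request-response over two topics.
interface ToyMessage {
  correlationId: string;
  value: unknown;
}
type Listener = (message: ToyMessage) => void;

class ToyBroker {
  private readonly listeners: Record<string, Listener[]> = {};

  subscribe(topicName: string, listener: Listener): void {
    const existing = this.listeners[topicName] ?? [];
    existing.push(listener);
    this.listeners[topicName] = existing;
  }

  publish(topicName: string, message: ToyMessage): void {
    (this.listeners[topicName] ?? []).forEach((listener) => listener(message));
  }
}

class ToyRequester {
  private nextId = 0;
  private readonly pending: Record<string, (reply: unknown) => void> = {};
  private readonly broker: ToyBroker;

  constructor(broker: ToyBroker) {
    this.broker = broker;
  }

  // Loosely analogous to subscribeToResponseOf(): listen on "<pattern>.reply".
  subscribeToResponseOf(pattern: string): void {
    this.broker.subscribe(`${pattern}.reply`, ({ correlationId, value }) => {
      const onReply = this.pending[correlationId];
      if (onReply) {
        delete this.pending[correlationId];
        onReply(value);
      }
    });
  }

  // Loosely analogous to send(): publish the request and remember the callback.
  send(pattern: string, value: unknown, onReply: (reply: unknown) => void): void {
    const correlationId = String(++this.nextId);
    this.pending[correlationId] = onReply;
    this.broker.publish(pattern, { correlationId, value });
  }
}

const broker = new ToyBroker();
const usersById: Record<number, { name: string }> = { 1: { name: 'Ada' } };

// Responder side: handle get_user and publish the result to get_user.reply.
broker.subscribe('get_user', ({ correlationId, value }) => {
  const { userId } = value as { userId: number };
  broker.publish('get_user.reply', {
    correlationId,
    value: usersById[userId] ?? null,
  });
});

const requester = new ToyRequester(broker);
requester.subscribeToResponseOf('get_user');

const replies: unknown[] = [];
requester.send('get_user', { userId: 1 }, (reply) => replies.push(reply));
requester.send('get_user', { userId: 2 }, (reply) => replies.push(reply));

console.log(replies); // → [ { name: 'Ada' }, null ]
```

The requester has to be listening on the reply topic before it sends anything, which is why the subscription is set up first.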

To configure this communication in the AppService, you’ll need to implement the OnModuleInit interface and use the onModuleInit() lifecycle method to subscribe to the response of the get_user event using the subscribeToResponseOf() method. You don’t have to manually add any handlers for the get_user.reply event since Nest takes care of that.

// apps/payments-microservice/src/app/app.service.ts

import { MakePaymentDto } from '@nestjs-microservices/shared/dto';
import { User } from '@nestjs-microservices/shared/entities';
import { Inject, Injectable, OnModuleInit } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';

@Injectable()
export class AppService implements OnModuleInit {
  constructor(
    @Inject('AUTH_MICROSERVICE') private readonly authClient: ClientKafka
  ) {}

  processPayment(makePaymentDto: MakePaymentDto) {
    const { userId, amount } = makePaymentDto;
    console.log('process payment');
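    // send() returns an Observable; the request goes out once we subscribe
    this.authClient
      .send('get_user', JSON.stringify({ userId }))
      .subscribe((user: User | null) => {
        // The auth microservice replies with null when the id is unknown
        if (!user) {
          console.log(`user ${userId} not found, skipping payment`);
          return;
        }
        console.log(
          `process payment for user ${user.name} - amount: ${amount}`
        );
      });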
  }

  onModuleInit() {
    this.authClient.subscribeToResponseOf('get_user');
  }
}

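Instead of using the emit() method to publish the get_user event, use the send() method. It returns an Observable that emits the reply, so the request is only sent once you subscribe, and the callback passed to subscribe() receives the user returned by the auth microservice.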

Now, finish off the payments microservice by adding the event handler for the process_payment event in its AppController:

// apps/payments-microservice/src/app/app.controller.ts

import { MakePaymentDto } from '@nestjs-microservices/shared/dto';
import { Controller, ValidationPipe } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';

import { AppService } from './app.service';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  @EventPattern('process_payment')
  handleProcessPayment(@Payload(ValidationPipe) data: MakePaymentDto) {
    this.appService.processPayment(data);
  }
}

Next, switch back to the AppController of the auth-microservice app and add the handler for the get_user event to complete the request-response communication. For the request-response messaging pattern, you’ll need to use the @MessagePattern() decorator to consume the event:

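// apps/auth-microservice/src/app/app.controller.ts

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  // ...existing handleUserCreate() handler for create_user

  // The returned value is sent back to the caller as the reply
  @MessagePattern('get_user')
  handleGetUser(@Payload('userId', ParseIntPipe) userId: number) {
    return this.appService.getUser(userId);
  }
}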

Running and testing the services

To test and see all the services in action, you’ll need to run the following commands individually on separate terminals:

> nx serve api-gateway
> nx serve auth-microservice
> nx serve payments-microservice

Now that the microservices are up and running, let’s test them with Postman.

Call the /api/auth/sign-up API to create the user:

Testing NestJS and Kafka Microservices with Postman

Next, call the /api/payments/pay API to process the payment with userId and the amount:

Calling the Microservices API

Console Message Confirming Microservices Set Up

If the communication is set up correctly between the microservices, you should see a console message appear on the terminal like that shown above.
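If you prefer a script over Postman, the two requests can be described as plain objects. This is a minimal sketch: the base URL (port 3333) is an assumption, so use whatever address nx serve api-gateway prints, and buildPost and the sample payloads are hypothetical.

```typescript
// Assumed base URL: adjust the port to the one the api-gateway app reports.
const BASE_URL = 'http://localhost:3333/api';

interface JsonRequest {
  url: string;
  method: 'POST';
  headers: Record<string, string>;
  body: string;
}

// Builds a JSON POST request description for a gateway endpoint.
function buildPost(path: string, payload: object): JsonRequest {
  return {
    url: `${BASE_URL}${path}`,
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(payload),
  };
}

// Matches CreateUserDto: name and email are mandatory.
const signUpRequest = buildPost('/auth/sign-up', {
  name: 'Ada',
  email: 'ada@example.com',
});

// Matches MakePaymentDto: both fields must be numbers.
const payRequest = buildPost('/payments/pay', { userId: 1, amount: 100 });

console.log(signUpRequest.url, payRequest.url);
```

With the three services running, each object can be passed to an HTTP client, for example the fetch function built into Node.js 18 and later, using the url as the first argument and the remaining fields as options. Send the sign-up request first so that user 1 exists when the payment is processed.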

Conclusion

This article investigated building a robust application using a microservices architecture with NestJS and Kafka. We demonstrated how these technologies can be used together to create a scalable, reliable, and easy-to-maintain system. This stack is worth considering if you’re looking for a way to build microservices that are easy to develop and deploy.

You can explore microservices further by adding retry logic to the Kafka communication so that critical data is not lost if a microservice is down.

For your reference, you can find the complete code from this guide on GitHub.

