Vlado Tesanovic CEO / developer @ innovic.io , open source lover, lifelong learner, github.com/vladotesanovic writing code in my free time.

How to use event-driven programming in Node.js

With the Command Query Responsibility Segregation pattern


The most common approach to building software today is the request/response mechanism on top of a layered (n-tier) architecture, where calls propagate vertically through the layers. Patterns like MVC have become very popular and, in a way, standard when people learn and write software.

Layered architecture is the easiest to adopt and solves many problems, but that does not make it a silver bullet for every problem in the software industry. Some software can be written more expressively using different design patterns. Layered architecture works well for small and medium-sized projects. The tricky part is keeping everything organized and not adding too many layers, or we will end up with Baklava code.

[Figure: Layered architecture]

Alternatively, we have event-driven programming, which is mostly used in front-end development. There, one event can be propagated through the system and many actors can act upon catching it. Data flow is unidirectional, and new features can be added without editing existing components.
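To make the idea concrete, here is a minimal sketch of an in-memory event bus in plain TypeScript. The names SimpleEventBus, BidPlaced, and the two subscribers are hypothetical and do not come from the demo repository. The sketch only aims to show that a second actor can be added without touching the first one.

```typescript
// Hypothetical event payload used only for this sketch.
interface BidPlaced {
  auctionId: string;
  amount: number;
}

type Listener<E> = (event: E) => void;

// Minimal in-memory bus: every subscriber sees every published event.
class SimpleEventBus<E> {
  private readonly listeners: Listener<E>[] = [];

  subscribe(listener: Listener<E>): void {
    this.listeners.push(listener);
  }

  // Events flow one way: from the publisher to the subscribers.
  publish(event: E): void {
    for (const listener of this.listeners) {
      listener(event);
    }
  }
}

const bus = new SimpleEventBus<BidPlaced>();
const log: string[] = [];

// Two independent actors react to the same event.
bus.subscribe((event) => log.push(`mail: new bid of ${event.amount}`));
bus.subscribe((event) => log.push(`activity: bid on ${event.auctionId}`));

bus.publish({ auctionId: 'auction-1', amount: 233 });
// log now holds one entry per subscriber, in subscription order
```

Adding a third feature here would mean one more subscribe call; the publisher and the existing subscribers stay unchanged.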

While event-driven programming is dominant for building user interfaces, we can use it for writing server-side code too. Good use cases are highly asynchronous systems that do not require an immediate response from the server and use different communication channels to publish progress of a request.

Demonstration

In this tutorial, we will not only dispatch events to demonstrate event-driven programming but also implement the CQRS design pattern, which separates code that edits data (commands) from code that reads data (queries).

The main building blocks of our application will be:

  • Commands
  • Handlers
  • Events
  • Queries

Commands are the actions that will either run business logic or dispatch new events. Events will be used to dispatch other commands. We can have event handlers as well. Query actions and query handlers are responsible for querying (reading) items.
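The split between the write side and the read side can be sketched without any framework. Everything below (StoredBid, PlaceBidCommand, HighestBidQuery, and the two handler functions) is a hypothetical illustration, not code from the demo: the command handler is the only place that changes state, and the query handler only reads it.

```typescript
// Hypothetical shapes for this sketch; none of them come from the demo repository.
interface StoredBid {
  auctionId: string;
  userId: string;
  amount: number;
}

interface PlaceBidCommand {
  auctionId: string;
  userId: string;
  amount: number;
}

interface HighestBidQuery {
  auctionId: string;
}

// Shared in-memory storage standing in for a database.
const bidTable: StoredBid[] = [];

// Write side: the only code path allowed to change state. It returns nothing.
function handlePlaceBid(command: PlaceBidCommand): void {
  bidTable.push({ ...command });
}

// Read side: never mutates state, only answers questions about it.
function handleHighestBid(query: HighestBidQuery): StoredBid | undefined {
  let highest: StoredBid | undefined;
  for (const bid of bidTable) {
    if (bid.auctionId !== query.auctionId) continue;
    if (highest === undefined || bid.amount > highest.amount) highest = bid;
  }
  return highest;
}

handlePlaceBid({ auctionId: 'auction-1', userId: 'user-1', amount: 100 });
handlePlaceBid({ auctionId: 'auction-1', userId: 'user-2', amount: 233 });
handlePlaceBid({ auctionId: 'auction-2', userId: 'user-1', amount: 500 });

const highestBid = handleHighestBid({ auctionId: 'auction-1' });
// highestBid is the bid of 233 placed by user-2
```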

Imagine a bidding system where one action can trigger other actions in a defined order, and which we want to make highly asynchronous. We will end up with features like:

  • Check if a bid is the highest one
  • Send email to all interested parties (bidders and the owner)
  • Add a bid in the database
  • Create an activity for that bid
  • Extend the bidding process for two hours upon receiving the latest bid (Bidding Fee Auction)

Here is a diagram of the flow in our system:

[Figure: Flow of the demonstration]

With the CQRS module in place, each event will produce one or more commands, and each command will trigger a new event.

This event-driven system enables the aspect-oriented programming paradigm, which basically means you can add functionality to software without changing existing functionality. In our case, it means chaining new commands and command handlers with events.
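The event-to-command-to-event chain can be sketched in a few lines of framework-free TypeScript. This is a simplified model under stated assumptions, not how the Nestjs CQRS module is implemented: the `kind` tags, the `chainSagas` list, and the `dispatchChain` loop are all hypothetical. Each saga turns an event into commands, and each command handler may emit a follow-up event.

```typescript
// Hypothetical message shapes; the `kind` tags are assumptions for this sketch.
type ChainEvent =
  | { kind: 'BidEvent'; auctionId: string; amount: number }
  | { kind: 'BidEventSuccess'; auctionId: string; amount: number };

type ChainCommand =
  | { kind: 'BidCommand'; auctionId: string; amount: number }
  | { kind: 'MailCommand'; to: string; title: string }
  | { kind: 'PostponeAuctionCommand'; auctionId: string };

// A saga turns one event into zero or more commands.
type ChainSaga = (event: ChainEvent) => ChainCommand[];

const chainSagas: ChainSaga[] = [
  (event) =>
    event.kind === 'BidEvent'
      ? [{ kind: 'BidCommand', auctionId: event.auctionId, amount: event.amount }]
      : [],
  (event) =>
    event.kind === 'BidEventSuccess'
      ? [
          { kind: 'MailCommand', to: 'owner@example.com', title: 'New bid' },
          { kind: 'PostponeAuctionCommand', auctionId: event.auctionId },
        ]
      : [],
];

const executed: string[] = [];

// A handler runs one command and may emit follow-up events.
function handleCommand(command: ChainCommand): ChainEvent[] {
  executed.push(command.kind);
  // In this sketch every bid succeeds; the other commands end the chain.
  return command.kind === 'BidCommand'
    ? [{ kind: 'BidEventSuccess', auctionId: command.auctionId, amount: command.amount }]
    : [];
}

// Process events in order until the chain produces nothing new.
function dispatchChain(initial: ChainEvent): void {
  const queue: ChainEvent[] = [initial];
  while (queue.length > 0) {
    const event = queue.shift()!;
    for (const saga of chainSagas) {
      for (const command of saga(event)) {
        queue.push(...handleCommand(command));
      }
    }
  }
}

dispatchChain({ kind: 'BidEvent', auctionId: 'auction-1', amount: 233 });
// executed is now ['BidCommand', 'MailCommand', 'PostponeAuctionCommand']
```

A new feature, such as creating an activity entry, would be one more entry in `chainSagas`; the existing sagas are left as they are.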

Implementation

We have chosen Nestjs to implement the described solution for our imaginary bidding system.

Nestjs offers a CQRS module in its rich ecosystem. The main building blocks of that module are three injectable classes: EventBus, QueryBus, and CommandBus. Each, as the name implies, can trigger an event, a query, or a command, respectively.

Reading and writing code for this demo requires diving into Nestjs, as there are many concepts to grasp. Nestjs is a feature-rich framework that relies heavily on decorators and observables, and it comes with a module system (similar to the one in Angular), dependency injection, inversion of control, and more.

I’ll try to highlight only the important bits of the code; otherwise, this article would be too long. At the bottom, you will find a link to a Github repository with all the code and a working demo. Here is the directory structure:

[Figure: Structure of the app]

From the main controller (and main route /) we will dispatch BidEvent. In Nestjs, controllers are the route handlers.

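import { Controller, Get } from '@nestjs/common';
import { EventBus, QueryBus } from '@nestjs/cqrs';
// BidEvent and GetAuctionQuery are application classes from the repository

@Controller()
export class AppController {
  constructor(
    private readonly eventBus: EventBus,
    private readonly queryBus: QueryBus,
  ) {}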

  @Get()
  async bid(): Promise<object> {

    // We are hard-coding values here
    // instead of collecting them from a request
    this.eventBus.publish(
      new BidEvent('4ccd1088-b5da-44e2-baa0-ee4e0a58659d', '0ac04f2a-4866-42de-9387-cf392f64cd52', 233),
    );

    return {
      status: 'PENDING',
    };
  }

  @Get('/audiences')
  async getAudiences() {
    const allAudiences = await this.queryBus.execute(new GetAuctionQuery());

    return allAudiences;
  }
}

The real power of our system lies in the BidSaga class. The responsibility of this class (service) is to listen for BidEvents and dispatch commands. Developers with experience in rxjs and in writing effects with the ngrx package will find this code familiar and easy to read.

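import { Injectable } from '@nestjs/common';
import { ICommand, ofType, Saga } from '@nestjs/cqrs';
import { Observable } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

@Injectable()
export class BidSaga {

  // Every BidEvent is translated into a BidCommand
  @Saga()
  createBid = (events$: Observable<any>): Observable<ICommand> => {
    return events$.pipe(
      ofType(BidEvent),
      map((event: BidEvent) => {
        return new BidCommand(event.bidUser, event.auctionID, event.bidAmount);
      }),
    );
  }

  // A successful bid fans out into several follow-up commands
  @Saga()
  createBidSuccess = (events$: Observable<any>): Observable<ICommand> => {
    return events$.pipe(
      ofType(BidEventSuccess),
      // mergeMap (formerly aliased as flatMap) emits each command in the returned array
      mergeMap((event: BidEventSuccess) => {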

        return [
          new MailCommand(event.user.email, {
            title: 'You did it...',
            message: 'Congrats',
          }),
          new PostponeAuctionCommand(event.auctionID),
          // create activity command
        ];
      }),
    );
  }
}

Notice that we created a bidTransactionGUID variable and passed it to BidEvent. That value is used to glue related commands and events together.

As you can see in the code above, BidEvent will dispatch BidCommand. Further on in our code, BidHandler (for BidCommand) will dispatch either BidEventSuccess or BidEventFail.

import { AggregateRoot } from '@nestjs/cqrs';

export class AuctionModel extends AggregateRoot {
  constructor(private readonly auction: IAuctionInterface) {
    super();
  }

  postponeAuction() {
    // validation, etc.

    // postpone it, and return new auction object with postponed date
    const auction = { ...this.auction };

    this.apply(new AuctionEventsPostponed(auction));
  }

  bidOnAuction(userID: string, amount: number) {
    // validation, etc.
    try {

      // business logic
      // upon successful bidding, dispatch new event
      this.apply(new BidEventSuccess(this.auction.id, amount, { email: 'fake@email.com', id: userID }));

    } catch (e) {

      // dispatch bid event fail action
      this.apply(new BidEventFail(e));
    }
  }
}

The model shown above is run through the BidHandler service.
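BidHandler itself is not shown in this article, so the following is only a sketch of the general shape such a handler could take, written without any framework so it can run on its own. SketchAggregate, SketchAuction, SketchBidHandler, the event classes, and the assumed starting bid of 100 are all hypothetical stand-ins. In @nestjs/cqrs the corresponding roles are played by AggregateRoot, EventPublisher.mergeObjectContext(), and the aggregate's commit() method.

```typescript
// Stand-in for an aggregate base class: apply() buffers events, commit() publishes them.
class SketchAggregate {
  private pending: object[] = [];
  publish: (event: object) => void = () => {};

  apply(event: object): void {
    this.pending.push(event);
  }

  commit(): void {
    for (const event of this.pending) this.publish(event);
    this.pending = [];
  }
}

class SketchBidSucceeded {
  auctionId: string;
  amount: number;
  constructor(auctionId: string, amount: number) {
    this.auctionId = auctionId;
    this.amount = amount;
  }
}

class SketchBidFailed {
  reason: string;
  constructor(reason: string) {
    this.reason = reason;
  }
}

class SketchAuction extends SketchAggregate {
  id: string;
  highestBid: number;
  constructor(id: string, highestBid: number) {
    super();
    this.id = id;
    this.highestBid = highestBid;
  }

  // Business rule for this sketch: a bid must beat the current highest bid.
  bidOnAuction(amount: number): void {
    if (amount > this.highestBid) {
      this.highestBid = amount;
      this.apply(new SketchBidSucceeded(this.id, amount));
    } else {
      this.apply(new SketchBidFailed(`bid ${amount} is not above ${this.highestBid}`));
    }
  }
}

// Hypothetical handler: builds the aggregate, runs the business method, commits the events.
class SketchBidHandler {
  published: object[];
  constructor(published: object[]) {
    this.published = published;
  }

  execute(command: { auctionId: string; amount: number }): void {
    const auction = new SketchAuction(command.auctionId, 100); // assumed current highest bid
    auction.publish = (event) => this.published.push(event);
    auction.bidOnAuction(command.amount);
    auction.commit();
  }
}

const publishedEvents: object[] = [];
const bidHandler = new SketchBidHandler(publishedEvents);
bidHandler.execute({ auctionId: 'auction-1', amount: 233 }); // publishes a success event
bidHandler.execute({ auctionId: 'auction-1', amount: 50 }); // publishes a failure event
```

The handler never sends mail or postpones anything itself. It only publishes the outcome, and the sagas decide what happens next.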

After BidEventSuccess is dispatched, new commands will be launched: MailCommand and PostponeAuctionCommand.

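import { Injectable } from '@nestjs/common';
import { ICommand, ofType, Saga } from '@nestjs/cqrs';
import { Observable } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

@Injectable()
export class AuctionSaga {

  @Saga()
  createBid = (events$: Observable<any>): Observable<ICommand> => {
    return events$.pipe(
      ofType(AuctionEventsPostponed),
      // mergeMap (formerly aliased as flatMap) emits each command in the returned array
      mergeMap((event: AuctionEventsPostponed) => {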

        // send emails to all existing bidders
        const bidders = [
          new MailCommand('bidder1@emailid', {
            title: 'Someone made a bid',
            message: 'Hurry up',
          }),
          new MailCommand('bidder2@emailid', {
            title: 'Someone made a bid',
            message: 'Hurry up',
          }),
        ];

        return [
          ...bidders,
          // create activity
        ];
      }),
    );
  }
}

As we can see in the examples above, everything is about dispatching commands and chaining them with new events. A new feature means creating a new command and the new events that are triggered after it.

If anything fails during this process, we can dispatch a cleanup command carrying the bidTransactionGUID to delete everything associated with this bid in the system.
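One possible shape for such a cleanup step is sketched below. The SideEffect record, the CleanUpCommand interface, and handleCleanUp are hypothetical names, and the sketch assumes that every side effect was tagged with the bidTransactionGUID when it was created.

```typescript
// Hypothetical record of a side effect, tagged with the transaction that caused it.
interface SideEffect {
  bidTransactionGUID: string;
  kind: 'bid' | 'activity' | 'mail-queued';
}

// Hypothetical cleanup command carrying only the transaction identifier.
interface CleanUpCommand {
  bidTransactionGUID: string;
}

// Returns the side effects that survive; everything tied to the failed transaction is dropped.
function handleCleanUp(effects: SideEffect[], command: CleanUpCommand): SideEffect[] {
  return effects.filter(
    (effect) => effect.bidTransactionGUID !== command.bidTransactionGUID,
  );
}

const recordedEffects: SideEffect[] = [
  { bidTransactionGUID: 'tx-1', kind: 'bid' },
  { bidTransactionGUID: 'tx-1', kind: 'activity' },
  { bidTransactionGUID: 'tx-2', kind: 'bid' },
];

const remainingEffects = handleCleanUp(recordedEffects, { bidTransactionGUID: 'tx-1' });
// remainingEffects contains only the bid that belongs to tx-2
```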

Conclusion

Applied in the right place and for the right scenario, the event-driven programming paradigm can be a huge win for application architecture. If the flow of your application is determined by events, this approach can be a perfect fit.

Repository: https://github.com/vladotesanovic/cqrs
