Messaging with Kafka

Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a publish-subscribe messaging system with a unique design. Kafka mainly operates based on a topic model. A topic is a category or feed name to which records get published. Topics in Kafka are always multi-subscriber.

This guide walks you through the process of messaging with Apache Kafka using the Ballerina language.

What you'll build

To understand how you can use Kafka for publish-subscribe messaging, let's consider a real-world use case of a product management system. This system includes a product admin portal through which the product administrator can update the price of a product. The price update message should be consumed by a couple of franchisees and an inventory control system so that they can take appropriate actions. Kafka is an ideal messaging system for this scenario. In this use case, once the admin updates the price of a product, the update message is published to a Kafka topic called product-price, to which the franchisees and the inventory control system are subscribed. The following diagram illustrates this use case.

[Diagram: product price update use case]

In this example, the Ballerina Kafka Connector is used to connect Ballerina to Apache Kafka. With this connector, Ballerina can act as both a message publisher and a subscriber.

Optional Requirements

If you want to skip the basics, you can download the git repo and move directly to the “Testing” section, skipping the “Implementation” section.

Create the project structure

Ballerina is a complete programming language that supports custom project structures. Use the following package structure for this guide.

  ├── franchisee1
  │    └── franchisee1.bal
  ├── franchisee2
  │    └── franchisee2.bal
  ├── inventory_control_system
  │    └── inventory_control_system.bal
  └── product_admin_portal
       ├── product_admin_portal.bal
       └── tests
            └── product_admin_portal_test.bal

Let's get started with the implementation of a Kafka service that is subscribed to the Kafka topic product-price. Let's take the inventory_control_system as an example. First, let's see how to add the Kafka configurations for a Kafka subscriber written in the Ballerina language. Refer to the code segment attached below.

A Kafka subscriber in Ballerina needs a kafka:SimpleConsumer endpoint, in which you specify the required configurations for the subscriber.

The bootstrapServers field provides the list of host and port pairs, which are the addresses of the Kafka brokers in a “bootstrap” Kafka cluster.

The groupId field specifies the ID of the consumer group.

The topics field specifies the topics that this consumer must listen to.

The pollingInterval field is the interval at which the consumer polls the topic.
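
The four fields described above fit together roughly as follows. This is a minimal sketch rather than the guide's own listing: the endpoint name, the broker address, the group ID, and the polling interval are placeholder values, and the interval is assumed to be given in milliseconds.

```ballerina
import wso2/kafka;

// Kafka consumer endpoint for the inventory control system.
// All values below are placeholders chosen for illustration.
endpoint kafka:SimpleConsumer consumer {
    // host:port pairs of the brokers used for bootstrapping
    bootstrapServers: "localhost:9092",
    // ID of the consumer group this subscriber joins (hypothetical name)
    groupId: "inventorySystem",
    // Topics this consumer listens to
    topics: ["product-price"],
    // Interval between polls (assumed to be in milliseconds)
    pollingInterval: 1000
};
```

Each franchisee would use its own groupId so that every subscriber receives every price update; consumers sharing a group ID split the messages between them.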

Let's now see the complete implementation of the inventory_control_system, which is a Kafka topic subscriber.
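
As a rough sketch of what such a subscriber can look like, the following binds a service to a kafka:SimpleConsumer endpoint and logs each record it receives. The service name, the configuration values, the onMessage parameter types (kafka:ConsumerAction and kafka:ConsumerRecord[]), and the topic field of the record are assumptions about the connector version that provides kafka:SimpleConsumer; check them against the connector you have installed.

```ballerina
import ballerina/log;
import wso2/kafka;

// Consumer endpoint; values are placeholders for illustration.
endpoint kafka:SimpleConsumer consumer {
    bootstrapServers: "localhost:9092",
    groupId: "inventorySystem",
    topics: ["product-price"],
    pollingInterval: 1000
};

// Hypothetical service name. The service is bound to the consumer endpoint,
// so it receives the records polled from the subscribed topic.
service<kafka:Consumer> inventoryControlService bind consumer {

    // Assumed resource signature: invoked with the batch of polled records.
    onMessage(kafka:ConsumerAction consumerAction, kafka:ConsumerRecord[] records) {
        foreach entry in records {
            // Only the topic name is logged here; decoding the payload
            // depends on the Ballerina version in use.
            log:printInfo("Price update received on topic: " + entry.topic);
        }
    }
}
```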

In the above code, the onMessage resource is triggered whenever a message is published to the specified topic.

To check the implementations of the other subscribers, see franchisee1.bal and franchisee2.bal.

Next, let's focus on the implementation of the product_admin_portal, which acts as the message publisher. It contains an HTTP service through which a product admin updates the price of a product.

First, let's see how to add the Kafka configurations for a Kafka publisher written in the Ballerina language. Refer to the code segment attached below.

A Kafka producer in Ballerina needs a kafka:SimpleProducer endpoint, in which you specify the required configurations for the publisher.
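
A producer endpoint can be sketched in the same way as the consumer endpoint. The example below is deliberately minimal: the endpoint name and broker address are placeholders, and the bootstrapServers field is assumed to be available on kafka:SimpleProducer by analogy with the consumer configuration.

```ballerina
import wso2/kafka;

// Kafka producer endpoint for the product admin portal.
// The endpoint name and the broker address are illustrative placeholders.
endpoint kafka:SimpleProducer kafkaProducer {
    // Assumed field: broker addresses used for bootstrapping
    bootstrapServers: "localhost:9092"
};
```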

Let's now see the complete implementation of the product_admin_portal, which is a Kafka topic publisher. Inline comments are added for better understanding.

import ballerina/http;
import wso2/kafka;

Invoking the service

Writing unit tests

In Ballerina, unit test cases should be in the same package, inside a folder named 'tests'. When writing test functions, follow the convention below.
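
As a rough illustration, a test function placed in the tests folder might look like the following. It assumes the ballerina/test module with its @test:Config annotation and test:assertEquals function; the function name and the value being checked are hypothetical and unrelated to the guide's actual test file.

```ballerina
import ballerina/test;

// A test function is marked with the @test:Config annotation.
@test:Config
function testTopicName() {
    // Hypothetical check: build the topic name and compare it
    // with the name used throughout this guide.
    string topic = "product-" + "price";
    test:assertEquals(topic, "product-price", msg = "Unexpected topic name");
}
```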

This guide contains a unit test for the 'product_admin_portal' service implemented above.

To run the unit tests, navigate to messaging-with-kafka/guide and run the following command.

 ballerina test

When running the unit tests, make sure that Kafka is up and running.

To check the implementation of the test file, refer to product_admin_portal_test.bal.


Once you are done with the development, you can deploy the services using any of the methods listed below.

Deploying locally

As the first step, you can build Ballerina executable archives (.balx) of the services that we developed above. Navigate to messaging-with-kafka/guide and run the following command.

 ballerina build

Deploying on Docker

You can run the service that we developed above as a Docker container. Because the Ballerina platform includes Ballerina_Docker_Extension, which offers native support for running Ballerina programs in containers, you just need to add the corresponding Docker annotations to your service code. Since this guide requires Kafka as a prerequisite, you need a couple more steps to configure it in a Docker container.

Now let's see how we can deploy the product_admin_portal we developed above on Docker. We need to import ballerinax/docker and use the annotation @docker:Config as shown below to enable Docker image generation at build time.
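
A minimal sketch of the annotation usage follows. The registry, image name, and tag are placeholder values, and attaching @docker:Config to the HTTP listener endpoint is one possible placement; the field names are assumptions about the ballerinax/docker version in use. Copying the Kafka connector libraries into the image is not shown.

```ballerina
import ballerina/http;
import ballerinax/docker;

// Image coordinates are illustrative placeholders.
@docker:Config {
    registry: "ballerina.guides.io",
    name: "product_admin_portal",
    tag: "v1.0"
}
// HTTP listener for the admin portal; port 9090 matches the
// container port used when running the image.
endpoint http:Listener listener {
    port: 9090
};
```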

Here we run the Docker image with the flag -p <host_port>:<container_port>, mapping host port 9090 to container port 9090. You can therefore access the service through the host port.

Deploying on Kubernetes
Node Port:

Add an /etc/hosts entry to match the hostname.

Access the service

Ballerina is observable by default, meaning you can easily observe your services, resources, etc. However, observability is disabled by default via configuration. It can be enabled by adding the following configurations to the ballerina.conf file in messaging-with-kafka/guide/.


# Flag to enable Metrics

# Flag to enable Tracing

NOTE: The above configuration is the minimum needed to enable tracing and metrics. With these configurations, default values are loaded for the other metrics and tracing configuration parameters.
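
For reference, a minimal ballerina.conf that turns on both features could look like the sketch below. The section names and the enabled key are assumptions based on the b7a.observability configuration namespace; verify them against the documentation for your Ballerina version.

```
[b7a.observability.metrics]
# Assumed key: turns metrics collection on
enabled=true

[b7a.observability.tracing]
# Assumed key: turns tracing on
enabled=true
```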


You can monitor Ballerina services using the built-in tracing capabilities of Ballerina. We'll use Jaeger as the distributed tracing system. Follow the steps below to use tracing with Ballerina.


Metrics and alerts are built into Ballerina. We will use Prometheus as the monitoring tool. Follow the steps below to set up Prometheus and view metrics for the product_admin_portal service.




NOTE: By default, Ballerina provides the following metrics for the HTTP server connector. You can enter the following expressions in the Prometheus UI.


Ballerina has a log package for logging to the console. You can import the ballerina/log package and start logging. The following section describes how to search, analyze, and visualize logs in real time using the Elastic Stack.

i) Create a file named logstash.conf with the following content

