Messaging with Kafka

Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a publish-subscribe messaging system with a unique design. Kafka mainly operates based on a topic model. A topic is a category or feed name to which records get published. Topics in Kafka are always multi-subscriber.

This guide walks you through the process of messaging with Apache Kafka using the Ballerina language.

The following are the sections available in this guide.

- What you'll build
- Prerequisites
- Implementation
- Testing
- Deployment
- Observability

What you'll build

To understand how you can use Kafka for publish-subscribe messaging, let's consider a real-world use case of a product management system. This product management system consists of a product admin portal through which a product administrator can update the price of a product. The price update message should be consumed by a couple of franchisees and an inventory control system so that they can take appropriate action. Kafka is an ideal messaging system for this scenario. In this particular use case, once the admin updates the price of a product, the update message is published to a Kafka topic called product-price, to which the franchisees and the inventory control system are subscribed. The following diagram illustrates this use case.

[Diagram: Kafka publish-subscribe messaging in the product management system]

In this example, the Ballerina Kafka Connector is used to connect Ballerina to Apache Kafka. With this Kafka Connector, Ballerina can act as both a message publisher and a subscriber.

Prerequisites
Optional Requirements
Implementation

If you want to skip the basics, you can download the Git repository and move directly to the “Testing” section, skipping the “Implementation” section.

Create the project structure

Ballerina is a complete programming language that supports custom project structures. Use the following package structure for this guide.

messaging-with-kafka
 └── guide
      ├── franchisee1
      │    └── franchisee1.bal
      ├── franchisee2
      │    └── franchisee2.bal
      ├── inventory_control_system
      │    └── inventory_control_system.bal
      └── product_admin_portal
           ├── product_admin_portal.bal
           └── tests
                └── product_admin_portal_test.bal
Implementation

Let's get started with the implementation of a Kafka service, which subscribes to the Kafka topic product-price. Let's take the inventory_control_system as an example. First, let's see how to add the Kafka configurations for a Kafka subscriber written in Ballerina. Refer to the code segment below.

Kafka subscriber configurations
// Kafka consumer endpoint
endpoint kafka:SimpleConsumer consumer {
    bootstrapServers: "localhost:9092, localhost:9093",
    // Consumer group ID
    groupId: "inventorySystemd",
    // Listen from topic 'product-price'
    topics: ["product-price"],
    // Poll every 1 second
    pollingInterval: 1000
};

A Kafka subscriber written in Ballerina requires a kafka:SimpleConsumer endpoint in which you specify the required configurations for the subscriber.

The bootstrapServers field provides the list of host/port pairs, which are the addresses of the Kafka brokers in a "bootstrap" Kafka cluster.

The groupId field specifies the ID of the consumer group.

The topics field specifies the topics this consumer listens to.

The pollingInterval field is the interval, in milliseconds, at which the consumer polls the topic for new records.

Let's now see the complete implementation of the inventory_control_system, which is a Kafka topic subscriber.

inventory_control_system.bal
import ballerina/log;
import wso2/kafka;

// Kafka consumer endpoint
endpoint kafka:SimpleConsumer consumer {
    bootstrapServers: "localhost:9092, localhost:9093",
    // Consumer group ID
    groupId: "inventorySystemd",
    // Listen from topic 'product-price'
    topics: ["product-price"],
    // Poll every 1 second
    pollingInterval: 1000
};

// Kafka service that listens from the topic 'product-price'
// 'inventoryControlService' subscribes to new product price updates from
// the product admin and updates the database.
service<kafka:Consumer> kafkaService bind consumer {
    // Triggered whenever a message is added to the subscribed topic
    onMessage(kafka:ConsumerAction consumerAction, kafka:ConsumerRecord[] records) {
        // A set of Kafka records is dispatched to the service; process them one by one
        foreach record in records {
            blob serializedMsg = record.value;
            // Convert the serialized message to a string message
            string msg = serializedMsg.toString("UTF-8");
            log:printInfo("New message received from the product admin");
            // Log the retrieved Kafka record
            log:printInfo("Topic: " + record.topic + "; Received Message: " + msg);
            // Mock logic
            // Update the database with the new price for the specified product
            log:printInfo("Database updated with the new price of the product");
        }
    }
}

In the above code, the onMessage resource is triggered whenever a message is published to the subscribed topic.

To check the implementations of the other subscribers, see franchisee1.bal and franchisee2.bal; a sketch of the pattern they follow appears below.
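Both franchisee subscribers follow the same pattern as the inventory control system: a kafka:SimpleConsumer endpoint bound to a service that listens on the product-price topic. The sketch below illustrates that pattern; the consumer group ID and log message are illustrative assumptions, not copied from franchisee1.bal.

// Illustrative sketch of a franchisee subscriber (see franchisee1.bal for the real code)
import ballerina/log;
import wso2/kafka;

endpoint kafka:SimpleConsumer consumer {
    bootstrapServers: "localhost:9092, localhost:9093",
    // Hypothetical consumer group ID for this franchisee
    groupId: "franchisee1",
    topics: ["product-price"],
    pollingInterval: 1000
};

service<kafka:Consumer> kafkaService bind consumer {
    onMessage(kafka:ConsumerAction consumerAction, kafka:ConsumerRecord[] records) {
        foreach record in records {
            blob serializedMsg = record.value;
            // Deserialize and act on the price update
            string msg = serializedMsg.toString("UTF-8");
            log:printInfo("Franchisee received price update: " + msg);
        }
    }
}

Note that each subscriber uses its own consumer group ID, so every group receives a copy of each published message.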

Let's next focus on the implementation of the product_admin_portal, which acts as the message publisher. It contains an HTTP service through which a product admin can update the price of a product.

First, let's see how to add the Kafka configurations for a Kafka publisher written in Ballerina. Refer to the code segment below.

Kafka producer configurations
// Kafka producer endpoint
endpoint kafka:SimpleProducer kafkaProducer {
    bootstrapServers: "localhost:9092",
    clientID: "basic-producer",
    acks: "all",
    noRetries: 3
};

A Kafka producer in Ballerina requires a kafka:SimpleProducer endpoint in which you specify the required configurations for the publisher: the broker addresses (bootstrapServers), a client ID, the acknowledgement mode (acks), and the number of retries (noRetries).

Let's now see the complete implementation of the product_admin_portal, which is a Kafka topic publisher. Inline comments are added for better understanding.

product_admin_portal.bal
import ballerina/http;
import wso2/kafka;

// Constants to store admin credentials
@final string ADMIN_USERNAME = "Admin";
@final string ADMIN_PASSWORD = "Admin";

// Kafka producer endpoint
endpoint kafka:SimpleProducer kafkaProducer {
    bootstrapServers: "localhost:9092",
    clientID: "basic-producer",
    acks: "all",
    noRetries: 3
};

// HTTP service endpoint
endpoint http:Listener listener {
    port: 9090
};

@http:ServiceConfig {basePath: "/product"}
service<http:Service> productAdminService bind listener {

    @http:ResourceConfig {methods: ["POST"], consumes: ["application/json"],
        produces: ["application/json"]}
    updatePrice(endpoint client, http:Request request) {
        http:Response response;
        json reqPayload;
        float newPriceAmount;

        // Try parsing the JSON payload from the request
        match request.getJsonPayload() {
            // Valid JSON payload
            json payload => reqPayload = payload;
            // NOT a valid JSON payload
            any => {
                response.statusCode = 400;
                response.setJsonPayload({"Message": "Not a valid JSON payload"});
                _ = client->respond(response);
                done;
            }
        }

        json username = reqPayload.Username;
        json password = reqPayload.Password;
        json productName = reqPayload.Product;
        json newPrice = reqPayload.Price;

        // If payload parsing fails, send a "Bad Request" message as the response
        if (username == null || password == null || productName == null ||
            newPrice == null) {
            response.statusCode = 400;
            response.setJsonPayload({"Message": "Bad Request - Invalid payload"});
            _ = client->respond(response);
            done;
        }

        // Convert the price value to float
        var result = <float>newPrice.toString();
        match result {
            float value => {
                newPriceAmount = value;
            }
            error err => {
                response.statusCode = 400;
                response.setJsonPayload({"Message": "Invalid amount specified"});
                _ = client->respond(response);
                done;
            }
        }

        // If the credentials do not match the admin credentials,
        // send an "Access Forbidden" response message
        if (username.toString() != ADMIN_USERNAME ||
            password.toString() != ADMIN_PASSWORD) {
            response.statusCode = 403;
            response.setJsonPayload({"Message": "Access Forbidden"});
            _ = client->respond(response);
            done;
        }

        // Construct and serialize the message to be published to the Kafka topic
        json priceUpdateInfo = {"Product": productName, "UpdatedPrice": newPriceAmount};
        blob serializedMsg = priceUpdateInfo.toString().toBlob("UTF-8");

        // Produce the message and publish it to the Kafka topic
        kafkaProducer->send(serializedMsg, "product-price", partition = 0);
        // Send a success status to the admin request
        response.setJsonPayload({"Status": "Success"});
        _ = client->respond(response);
    }
}

Testing
Invoking the service
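The original invocation steps are not reproduced here. Assuming Kafka is running locally with the product-price topic available and the four Ballerina services are started, you can send a price update to the product admin portal as follows (this mirrors the sample request used in the Deployment section):

$ curl -v -X POST -d \
'{"Username":"Admin", "Password":"Admin", "Product":"ABC", "Price":100.00}' \
"http://localhost:9090/product/updatePrice" -H "Content-Type:application/json"

On success, the portal responds with {"Status":"Success"}, and each subscriber logs the received price update.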
Writing unit tests

In Ballerina, unit test cases should be in the same package, inside a folder named 'tests'. When writing test functions, follow the convention of annotating each test function with @test:Config, as in the sketch below.
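A minimal sketch of such a test function, written against the 0.970-era Ballerina APIs used in this guide (the endpoint, payload values, and assertions are illustrative, not copied from the actual test file):

import ballerina/http;
import ballerina/test;

// Hypothetical client endpoint pointing at the product admin portal
endpoint http:Client httpEndpoint {
    url: "http://localhost:9090/product"
};

@test:Config
function testProductAdminPortal() {
    // Construct a sample price update request
    json payload = {"Username": "Admin", "Password": "Admin", "Product": "ABC", "Price": 100.00};
    http:Request req;
    req.setJsonPayload(payload);
    // Invoke the service and check for a successful response
    var response = httpEndpoint->post("/updatePrice", request = req);
    match response {
        http:Response resp => test:assertEquals(resp.statusCode, 200,
            msg = "Product admin portal did not respond with 200 OK");
        error err => test:assertFail(msg = "Failed to call the endpoint");
    }
}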

This guide contains a unit test for the 'product_admin_portal' service implemented above.

To run the unit tests, navigate to messaging-with-kafka/guide and run the following command.

$ ballerina test

When running the unit tests, make sure that Kafka is up and running.

To check the implementation of the test file, refer to product_admin_portal_test.bal.

Deployment

Once you are done with the development, you can deploy the services using any of the methods listed below.

Deploying locally

As the first step, you can build Ballerina executable archives (.balx) of the services that we developed above. Navigate to messaging-with-kafka/guide and run the following command.

$ ballerina build
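Assuming the build succeeds, each service can then be run from its generated archive. The target path below follows the standard Ballerina build layout and is an assumption, not taken from the original text:

$ ballerina run target/product_admin_portal.balx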
Deploying on Docker

You can run the service that we developed above as a Docker container. As the Ballerina platform includes the Ballerina Docker Extension, which offers native support for running Ballerina programs in containers, you just need to add the corresponding Docker annotations to your service code. Since this guide requires Kafka as a prerequisite, a few more steps are needed to configure Kafka in the Docker container.

Now let's see how we can deploy the product_admin_portal we developed above on Docker. We need to import ballerinax/docker and use the annotation @docker:Config as shown below to enable Docker image generation at build time.

product_admin_portal.bal
import ballerinax/docker;
// Other imports

// Constants to store admin credentials

// Kafka producer endpoint

@docker:Config {
    registry: "ballerina.guides.io",
    name: "product_admin_portal",
    tag: "v1.0"
}

@docker:CopyFiles {
    files: [{source: <path_to_Kafka_connector_jars>,
            target: "/ballerina/runtime/bre/lib"}]
}

@docker:Expose {}
endpoint http:Listener listener {
    port: 9090
};

@http:ServiceConfig {basePath: "/product"}
service<http:Service> productAdminService bind listener {
Now, building the package also creates the corresponding Docker image:

$ ballerina build product_admin_portal

Run the following command to start a Docker container:

$ docker run -d -p 9090:9090 ballerina.guides.io/product_admin_portal:v1.0

Here we run the Docker image with the flag -p <host_port>:<container_port>, so the host port 9090 maps to the container port 9090. Therefore, you can access the service through the host port.
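To verify that the container is up, you can send the same sample price update to the host port (this assumes the Kafka brokers configured in the code are reachable from inside the container):

$ curl -v -X POST -d \
'{"Username":"Admin", "Password":"Admin", "Product":"ABC", "Price":100.00}' \
"http://localhost:9090/product/updatePrice" -H "Content-Type:application/json"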

Deploying on Kubernetes
Similarly, you can deploy the product_admin_portal on Kubernetes. Import ballerinax/kubernetes and add the @kubernetes annotations shown below; they generate the Kubernetes artifacts at build time.

product_admin_portal.bal
import ballerinax/kubernetes;
// Other imports

// Constants to store admin credentials

// Kafka producer endpoint

@kubernetes:Ingress {
    hostname: "ballerina.guides.io",
    name: "ballerina-guides-product-admin-portal",
    path: "/"
}

@kubernetes:Service {
    serviceType: "NodePort",
    name: "ballerina-guides-product-admin-portal"
}

@kubernetes:Deployment {
    image: "ballerina.guides.io/product_admin_portal:v1.0",
    name: "ballerina-guides-product-admin-portal",
    copyFiles: [{target: "/ballerina/runtime/bre/lib",
                source: <path_to_Kafka_connector_jars>}]
}

endpoint http:Listener listener {
    port: 9090
};

@http:ServiceConfig {basePath: "/product"}
service<http:Service> productAdminService bind listener {
Now, building the package also generates the Kubernetes artifacts:

$ ballerina build product_admin_portal

Run the following command to deploy the Kubernetes artifacts:

$ kubectl apply -f ./target/product_admin_portal/kubernetes

deployment.extensions "ballerina-guides-product-admin-portal" created
ingress.extensions "ballerina-guides-product-admin-portal" created
service "ballerina-guides-product-admin-portal" created

Verify that the Kubernetes deployment, service, pods, and ingress are running properly using the following commands:

$ kubectl get service
$ kubectl get deploy
$ kubectl get pods
$ kubectl get ingress

Node Port:

$ curl -v -X POST -d \
'{"Username":"Admin", "Password":"Admin", "Product":"ABC", "Price":100.00}' \
"http://localhost:<Node_Port>/product/updatePrice" -H "Content-Type:application/json"

Ingress:

Add an /etc/hosts entry to match the hostname.

127.0.0.1 ballerina.guides.io

Access the service

$ curl -v -X POST -d \
'{"Username":"Admin", "Password":"Admin", "Product":"ABC", "Price":100.00}' \
"http://ballerina.guides.io/product/updatePrice" -H "Content-Type:application/json"
Observability

Ballerina is observable by default, meaning you can easily observe your services, resources, and so on. However, observability is disabled by default in the configuration. You can enable it by adding the following configurations to the ballerina.conf file in messaging-with-kafka/guide/.

[b7a.observability]

[b7a.observability.metrics]
# Flag to enable Metrics
enabled=true

[b7a.observability.tracing]
# Flag to enable Tracing
enabled=true

NOTE: The above configuration is the minimum needed to enable tracing and metrics. With these configurations, default values are loaded for the other metrics and tracing configuration parameters.

Tracing

You can monitor Ballerina services using the built-in tracing capabilities of Ballerina. We'll use Jaeger as the distributed tracing system. Follow the steps below to use tracing with Ballerina.
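The original setup steps are not reproduced here. A common way to run Jaeger locally is its all-in-one Docker image; the image name and port mappings below are Jaeger's published defaults, not taken from this guide:

$ docker run -d -p 5775:5775/udp -p 6831:6831/udp -p 6832:6832/udp \
-p 5778:5778 -p 16686:16686 -p 14268:14268 jaegertracing/all-in-one:latest

Once the container is running, the Jaeger UI is available at http://localhost:16686.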

Metrics

Metrics and alerts are built in with Ballerina. We will use Prometheus as the monitoring tool. Follow the steps below to set up Prometheus and view metrics for the product_admin_portal service. Add the following configurations to the ballerina.conf file:

[b7a.observability.metrics]
enabled=true
provider="micrometer"

[b7a.observability.metrics.micrometer]
registry.name="prometheus"

[b7a.observability.metrics.prometheus]
port=9700
hostname="0.0.0.0"
descriptions=false
step="PT1M"

NOTE: Ballerina exposes a set of default metrics for the HTTP server connector; you can query them by entering the corresponding expressions in the Prometheus UI.
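The Prometheus setup steps are not reproduced here. A minimal prometheus.yml scrape configuration matching the metrics port configured above might look like the following; the job name and target host are assumptions:

global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'ballerina'
    static_configs:
      # Scrape the metrics endpoint Ballerina exposes on port 9700 (configured above)
      - targets: ['localhost:9700']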

Logging

Ballerina has a log package for logging to the console. You can import the ballerina/log package and start logging. The following section describes how to search, analyze, and visualize logs in real time using the Elastic Stack.
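The Logstash container in step iii) below is linked to an Elasticsearch container named elasticsearch, so Elasticsearch (and Kibana, for visualization) must be running first. The commands below are an assumption based on the Elastic 6.2.2 images used later in this section:

$ docker run -d -p 9200:9200 -p 9300:9300 -h elasticsearch --name elasticsearch \
docker.elastic.co/elasticsearch/elasticsearch:6.2.2

$ docker run -d -p 5601:5601 -h kibana --name kibana --link elasticsearch:elasticsearch \
docker.elastic.co/kibana/kibana:6.2.2

Kibana is then available at http://localhost:5601.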

i) Create a file named logstash.conf with the following content

input {
  beats {
    port => 5044
  }
}

filter {
  grok {
    match => {
      "message" => "%{TIMESTAMP_ISO8601:date}%{SPACE}%{WORD:logLevel}%{SPACE}
      \[%{GREEDYDATA:package}\]%{SPACE}\-%{SPACE}%{GREEDYDATA:logMessage}"
    }
  }
}

output {
  elasticsearch {
    hosts => "elasticsearch:9200"
    index => "store"
    document_type => "store_logs"
  }
}
ii) Save the above logstash.conf inside a directory named {SAMPLE_ROOT}/pipeline

iii) Start the Logstash container; replace {SAMPLE_ROOT} with your directory name

$ docker run -h logstash --name logstash --link elasticsearch:elasticsearch \
-it --rm -v ~/{SAMPLE_ROOT}/pipeline:/usr/share/logstash/pipeline/ \
-p 5044:5044 docker.elastic.co/logstash/logstash:6.2.2

iv) Create a file named filebeat.yml with the following content

filebeat.prospectors:
- type: log
  paths:
    - /usr/share/filebeat/ballerina.log
output.logstash:
  hosts: ["logstash:5044"]

NOTE: Modify the ownership of the filebeat.yml file using $ chmod go-w filebeat.yml

v) Save the above filebeat.yml inside a directory named {SAMPLE_ROOT}/filebeat

vi) Start the Filebeat container; replace {SAMPLE_ROOT} with your directory name

$ docker run -v {SAMPLE_ROOT}/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml \
-v {SAMPLE_ROOT}/guide/product_admin_portal/ballerina.log:/usr/share\
/filebeat/ballerina.log --link logstash:logstash docker.elastic.co/beats/filebeat:6.2.2
