confluentinc/confluent-kafka-go

Name: confluent-kafka-go

Owner: Confluent Inc.

Description: Confluent's Apache Kafka Golang client

Created: 2016-07-12 22:23:34.0

Updated: 2018-01-17 08:30:35.0

Pushed: 2017-12-15 11:42:08.0

Homepage:

Size: 235

Language: Go


README

Confluent's Golang Client for Apache Kafka™

confluent-kafka-go is Confluent's Golang client for Apache Kafka and the Confluent Platform.

Features:

The Golang bindings provide a high-level Producer and Consumer with support for the balanced consumer groups of Apache Kafka 0.9 and above.

See the API documentation for more information.

License: Apache License v2.0

Examples

High-level balanced consumer

package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost",
		"group.id":          "myGroup",
		"auto.offset.reset": "earliest",
	})

	if err != nil {
		panic(err)
	}

	c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)

	for {
		msg, err := c.ReadMessage(-1)
		if err == nil {
			fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
		} else {
			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
			break
		}
	}

	c.Close()
}

Producer

package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {

	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
	if err != nil {
		panic(err)
	}

	// Delivery report handler for produced messages
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
				} else {
					fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
				}
			}
		}
	}()

	// Produce messages to topic (asynchronously)
	topic := "myTopic"
	for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
		p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(word),
		}, nil)
	}

	// Wait for outstanding message deliveries before shutting down
	p.Flush(15 * 1000)
}

More elaborate examples are available in the examples directory.

Getting Started

Installing librdkafka

This client for Go depends on librdkafka v0.11.0 or later, so you either need to install librdkafka through your OS/distributions package manager, or download and build it from source.

Build from source:

git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure --prefix /usr
make
sudo make install
Install the client
go get -u github.com/confluentinc/confluent-kafka-go/kafka

See the examples for usage details.

Note that the development of librdkafka and the Go client are kept in sync, so if you use HEAD on master of the Go client, you also need to use HEAD on master of librdkafka. See this issue for more details.

API Strands

There are two main API strands: channel based and function based.

Channel Based Consumer

Messages, errors and events are posted on the consumer.Events() channel for the application to read.

Pros:

Cons:

See examples/consumer_channel_example

Function Based Consumer

Messages, errors and events are polled through the consumer.Poll() function.

Pros:

Cons:

See examples/consumer_example

Channel Based Producer

The application writes messages to the channel returned by producer.ProduceChannel(). Delivery reports are emitted on producer.Events() or a specified private channel.

Pros:

Cons:

See examples/producer_channel_example

Function Based Producer

The application calls producer.Produce() to produce messages. Delivery reports are emitted on producer.Events() or a specified private channel.

Pros:

Cons:

See examples/producer_example

Static Builds

NOTE: Requires pkg-config

To link your application statically with librdkafka append -tags static to your application's go build command, e.g.:

$ cd kafkatest/go_verifiable_consumer
$ go build -tags static

This will create a binary with librdkafka statically linked. Note, however, that any librdkafka dependencies (such as ssl, sasl2, lz4, etc., depending on the librdkafka build configuration) will still be linked dynamically and must therefore be present on the target system.

To create a completely static binary, append -tags static_all instead. This requires all dependencies to be available as static libraries (e.g., libsasl2.a). Static libraries are typically not installed by default but are available in the corresponding -dev or -devel packages (e.g., libsasl2-dev).

After a successful static build, verify the dependencies by running ldd ./your_program (or otool -L ./your_program on macOS); librdkafka should not be listed.

Tests

See kafka/README

Contributing

Contributions to the code, examples, documentation, etc., are very much appreciated.

Make your changes, run gofmt and the tests, push your branch, create a PR, and sign the CLA.
