Name: confluent-kafka-go
Owner: Confluent Inc.
Description: Confluent's Apache Kafka Golang client
Created: 2016-07-12 22:23:34.0
Updated: 2018-01-17 08:30:35.0
Pushed: 2017-12-15 11:42:08.0
Size: 235
Language: Go
confluent-kafka-go is Confluent's Golang client for Apache Kafka and the Confluent Platform.
Features:
High performance - confluent-kafka-go is a lightweight wrapper around librdkafka, a finely tuned C client.
Reliability - There are a lot of details to get right when writing an Apache Kafka client. We get them right in one place (librdkafka) and leverage this work across all of our clients (also confluent-kafka-python and confluent-kafka-dotnet).
Supported - Commercial support is offered by Confluent.
Future proof - Confluent, founded by the creators of Kafka, is building a streaming platform with Apache Kafka at its core. It's high priority for us that client features keep pace with core Apache Kafka and components of the Confluent Platform.
The Golang bindings provide a high-level Producer and Consumer with support for the balanced consumer groups of Apache Kafka 0.9 and above.
See the API documentation for more information.
License: Apache License v2.0
High-level balanced consumer
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost",
		"group.id":          "myGroup",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		panic(err)
	}

	c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)

	for {
		msg, err := c.ReadMessage(-1)
		if err == nil {
			fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
		} else {
			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
			break
		}
	}

	c.Close()
}
Producer
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
	if err != nil {
		panic(err)
	}

	// Delivery report handler for produced messages
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
				} else {
					fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
				}
			}
		}
	}()

	// Produce messages to topic (asynchronously)
	topic := "myTopic"
	for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
		p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(word),
		}, nil)
	}

	// Wait for message deliveries before shutting down
	p.Flush(15 * 1000)
}
More elaborate examples are available in the examples directory.
This Go client depends on librdkafka v0.11.0 or later, so you either need to install librdkafka through your OS/distribution's package manager, or download and build it from source.
Install librdkafka-dev from the standard repositories or from Confluent's Deb repository (Debian/Ubuntu).
Install librdkafka-devel using Confluent's YUM repository (RedHat/CentOS).
Install librdkafka from Homebrew (macOS). You may also need to brew install pkg-config if you don't already have it.
Reference the librdkafka.redist NuGet package (Windows).
Build from source:
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure --prefix /usr
make
sudo make install
go get -u github.com/confluentinc/confluent-kafka-go/kafka
See the examples for usage details.
Note that the development of librdkafka and the Go client are kept in sync. If you use HEAD on master of the Go client, you also need to use HEAD on master of librdkafka. See this issue for more details.
There are two main API strands: channel-based and function-based.
Channel-based consumer
Messages, errors and events are posted on the consumer.Events channel for the application to read.
Pros:
Cons:
Events channel buffering (buffer size is controlled by go.events.channel.size).
See examples/consumer_channel_example
Function-based consumer
Messages, errors and events are polled through the consumer.Poll() function.
Pros:
Cons:
Channel-based producer
The application writes messages to the producer.ProduceChannel(). Delivery reports are emitted on the producer.Events channel or a specified private channel.
Pros:
Cons:
See examples/producer_channel_example
Function-based producer
The application calls producer.Produce() to produce messages. Delivery reports are emitted on the producer.Events channel or a specified private channel.
Pros:
Cons:
NOTE: Requires pkg-config
To link your application statically with librdkafka, append -tags static to your application's go build command, e.g.:
$ cd kafkatest/go_verifiable_consumer
$ go build -tags static
This will create a binary with librdkafka statically linked. Note, however, that any librdkafka dependencies (such as ssl, sasl2, lz4, etc., depending on the librdkafka build configuration) will be linked dynamically and are thus required on the target system.
To create a completely static binary, append -tags static_all instead.
This requires all dependencies to be available as static libraries
(e.g., libsasl2.a). Static libraries are typically not installed
by default but are available in the corresponding ..-dev
or ..-devel
packages (e.g., libsasl2-dev).
After a successful static build, verify the dependencies by running
ldd ./your_program
(or otool -L ./your_program
on macOS); librdkafka should not be listed.
See kafka/README
Contributions to the code, examples, documentation, etc. are very much appreciated.
Make your changes, run gofmt and the tests, push your branch, create a PR, and sign the CLA.