Name: kafka
Owner: Code Climate
Description: Centralized Kafka client gem (temporary name)
Created: 2015-06-25 22:57:40.0
Updated: 2016-12-02 00:13:54.0
Pushed: 2017-01-05 23:31:18.0
Homepage: null
Size: 109
Language: Ruby
GitHub Committers
User | Most Recent Commit | # Commits |
---|
Other Committers
User | Most Recent Commit | # Commits |
---|
A generic Kafka client tuned for our own usage.
Features:
BSON
-serializedSIGTERM
ire "cc/kafka"
ucer = CC::Kafka::Producer.new("kafka://host:1234/topic", "client-id")
ucer.send_message(foo: :bar, baz: :bat)
ucer.close
umer = CC::Kafka::Consumer.new("client-id", ["kafka://host:1234", "..."], "topic", 0)
umer.on_message do |message|
Given the producer above, message will be
{
"foo" => :bar,
"baz" => :bat,
CC::Kafka::MESSAGE_OFFSET_KEY => "topic-0-1",
}
umer.start
Note: the value for the MESSAGE_OFFSET_KEY
identifies the message's offset
within the given topic and partition as <topic>-<partition>-<offset>
. It can
be used by consumers to tie created data to the message that lead to it and
prevent duplicate processing.
CC::Kafka.offset_model
Must respond to find_for_create!(attributes)
and return an object that
responds to set(attributes)
.
The attributes
used are topic
, partition
, and current
. And the object
returned from find_or_create!
must expose methods for each of these.
A Minidoc
-based module is included that can be included in client code for an offset model implementation that will work for many clients.
s KafkaOffset < Minidoc
ude CC::Kafka::OffsetStorage::Minidoc
Kafka.offset_model = KafkaOffset
Note: This is only necessary if using Consumer
.
Kafka.logger
This is optional and defaults to Logger.new(STDOUT)
. The configured object
must have the same interface as the standard Ruby logger.
Example:
a.logger = Rails.logger
Kafka.statsd
This is optional and defaults to a null object. The configured object should
represent a statsd client and respond to the usual methods, increment
,
time
, etc.
Kafka.ssl_ca_file
Path to a custom SSL Certificate Authority file.
Will result in:
.ca_file = Kafka.ssl_ca_file
Kafka.ssl_pem_file
Path to a custom SSL Certificate (and key) in concatenated, PEM format.
Will result in:
= File.read(Kafka.ssl_pem_file)
.cert = OpenSSL::X509::Certificate.new(pem)
.key = OpenSSL::PKey::RSA.new(pem)
See LICENSE