Name: afkak
Owner: Sauce Labs
Description: Kafka client written in Twisted Python
Forked from: ciena/afkak
Created: 2016-11-07 10:21:46.0
Updated: 2016-11-07 10:21:48.0
Pushed: 2016-11-21 19:37:56.0
Homepage: null
Size: 530
Language: Python
GitHub Committers
User | Most Recent Commit | # Commits |
---|
Other Committers
User | Most Recent Commit | # Commits |
---|
This module provides low-level protocol support for Apache Kafka as well as high-level consumer and producer classes. Request batching is supported by the protocol as well as broker-aware request routing. Gzip and Snappy compression is also supported for message sets.
Copyright 2013, 2014, 2015 David Arthur under Apache License, v2.0. See LICENSE
Copyright 2014, 2015 Cyan, Inc. under Apache License, v2.0. See LICENSE
Copyright 2015 Ciena Corporation under Apache License, v2.0. See LICENSE
This project began as a port of the kafka-python library to Twisted.
* See Errata
[ Note: This code is not meant to be runable. See producer_example
and consumer_example
for runable example code. ]
afkak.client import KafkaClient
afkak.consumer import Consumer
afkak.producer import Producer
afkak.common import (OFFSET_EARLIEST, PRODUCER_ACK_ALL_REPLICAS,
PRODUCER_ACK_LOCAL_WRITE)
ent = KafkaClient("localhost:9092")
send messages
ucer = Producer(kClient)
producer.send_messages("my-topic", msgs=["some message"])
producer.send_messages("my-topic", msgs=["takes a list", "of messages"])
get confirmations/errors on the sends, add callbacks to the returned deferreds
ddCallbacks(handleResponses, handleErrors)
wait for acknowledgements
ODUCER_ACK_LOCAL_WRITE : server will wait till the data is written to
a local log before sending response
the default ]
ODUCER_ACK_ALL_REPLICAS : server will block until the message is committed
by all in sync replicas before sending a response
ucer = Producer(kClient,
req_acks=Producer.PRODUCER_ACK_LOCAL_WRITE,
ack_timeout=2000)
onseD = producer.send_messages("my-topic", msgs=["message"])
ing twisted's @inlineCallbacks:
onses = yield responseD
esponse:
print(response[0].error)
print(response[0].offset)
send messages in batch: You can use a producer with any of the
rtitioners for doing this. The following producer will collect
ssages in batch and send them to Kafka after 20 messages are
llected or every 60 seconds (whichever comes first). You can
so batch by number of bytes.
tes:
If the producer dies before the messages are sent, the caller would
not have had the callbacks called on the send_messages() returned
deferreds, and so can retry.
Calling producer.stop() before the messages are sent will
rback() the deferred(s) returned from the send_messages call(s)
ucer = Producer(kClient, batch_send=True,
batch_send_every_n=20,
batch_send_every_t=60)
onseD1 = producer.send_messages("my-topic", msgs=["message"])
onseD2 = producer.send_messages("my-topic", msgs=["message 2"])
consume messages
fine a function which takes a list of messages to process and
ssibly returns a deferred which fires when the processing is
mplete.
processor_func(consumer, messages):
# Store_Messages_In_Database may return a deferred
result = store_messages_in_database(messages)
# record last processed message
consumer.commit()
return result
partition = 3 # Consume only from partition 3.
umer = Consumer(kClient, "my-topic", the_partition, processor_func)
consumer.start(OFFSET_EARLIEST) # Start reading at earliest message
e deferred returned by consumer.start() will fire when an error
curs that can't handled by the consumer, or when consumer.stop()
called
d d
umer.stop()
ent.close()
afkak.client import KafkaClient
afkak.producer import Producer
afkak.partitioner import HashedPartitioner, RoundRobinPartitioner
a = KafkaClient("localhost:9092")
e the HashedPartitioner so that the producer will use the optional key
gument on send_messages()
ucer = Producer(kafka, partitioner_class=HashedPartitioner)
ucer.send_messages("my-topic", "key1", ["some message"])
ucer.send_messages("my-topic", "key2", ["this method"])
afkak.client import KafkaClient
a = KafkaClient("localhost:9092")
= ProduceRequest(topic="my-topic", partition=1,
messages=[KafkaProdocol.encode_message("some message")])
s = afkak.send_produce_request(payloads=[req], fail_on_error=True)
a.close()
s[0].topic # "my-topic"
s[0].partition # 1
s[0].error # 0 (hopefully)
s[0].offset # offset of the first message sent in this request
Afkak releases are available on PyPI.
Because the Afkak dependencies Twisted and python-snappy have binary extension modules you will need to install the Python development headers for the interpreter you wish to use. You'll need all of these to run Afkak's tests:
Debian/Ubuntu: | sudo apt-get install build-essential python-dev pypy-dev libsnappy-dev
|
OS X | brew install python pypy snappy
pip install virtualenv |
Then Afkak can be installed with pip as usual:
toxu
The integration tests will actually start up real local ZooKeeper instance and Kafka brokers, and send messages in using the client.
The makefile knows how to download several versions of Kafka. This will run just the integration tests against Kafka 0.8.1.1
A_VER=0.8.1.1 make toxi
toxa
toxik
Under Kafka 0.8.1 sometimes the test_consumer_integration:TestConsumerIntegration.test_consumer test will fail. This is due to an issue with Kafka where it will report the topic metadata including a leader, but will fail with UnknownTopicOrPartition when an attempt to write messages to the topic at the leader.
Due to the way the Kafka API is versioned, there is no way for the client to know the API version of which the server is capable. Afkak uses the version=1 API for the Offset Commit Request API call. Due to this, Afkak is not compatible with versions older than 0.8.2.1 for offset storage.