Name: kafka-python
Owner: Crowdstrike
Description: Kafka protocol support in Python
Created: 2014-07-18 17:42:44.0
Updated: 2017-04-26 14:31:08.0
Pushed: 2015-10-17 22:05:05.0
Homepage: null
Size: 1236
Language: Python
This module provides low-level protocol support for Apache Kafka as well as high-level consumer and producer classes. Request batching is supported by the protocol, as is broker-aware request routing. Gzip and Snappy compression are also supported for message sets.
http://kafka.apache.org/
On Freenode IRC at #kafka-python, as well as #apache-kafka
For general discussion of kafka-client design and implementation (not python specific), see https://groups.google.com/forum/m/#!forum/kafka-clients
Copyright 2014, David Arthur under Apache License, v2.0. See LICENSE
The current stable version of this package is 0.9.2 and is compatible with:

Kafka broker versions: 0.8.0, 0.8.1, and 0.8.1.1 (the versions exercised by the integration tests below)

Python versions: 2.7 and pypy (the interpreters exercised by tox below)
from kafka import KafkaClient, SimpleProducer, SimpleConsumer

# To send messages synchronously
kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)

# Note that the application is responsible for encoding messages to type str
producer.send_messages("my-topic", "some message")
producer.send_messages("my-topic", "this method", "is variadic")

# Send unicode message
producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8'))

# To send messages asynchronously
# WARNING: current implementation does not guarantee message delivery on failure!
# messages can get dropped! Use at your own risk! Or help us improve with a PR!
producer = SimpleProducer(kafka, async=True)
producer.send_messages("my-topic", "async message")

# To wait for acknowledgements
# ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
#                         a local log before sending response
# ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
#                            by all in sync replicas before sending a response
producer = SimpleProducer(kafka, async=False,
                          req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
                          ack_timeout=2000)

response = producer.send_messages("my-topic", "another message")
if response:
    print(response[0].error)
    print(response[0].offset)

# To send messages in batch. You can use any of the available
# producers for doing this. The following producer will collect
# messages in batch and send them to Kafka after 20 messages are
# collected or every 60 seconds
# Notes:
# * If the producer dies before the messages are sent, there will be losses
# * Call producer.stop() to send the messages and cleanup
producer = SimpleProducer(kafka, batch_send=True,
                          batch_send_every_n=20,
                          batch_send_every_t=60)

# To consume messages
consumer = SimpleConsumer(kafka, "my-group", "my-topic")
for message in consumer:
    # message is an OffsetAndMessage namedtuple; the payload is a raw
    # byte string -- decode if necessary!
    # e.g., for unicode: `message.message.value.decode('utf-8')`
    print(message)

kafka.close()
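Beyond plain iteration, SimpleConsumer can fetch bounded batches and reposition its offset. A minimal sketch; the get_messages signature and the fseek-like seek(offset, whence) semantics (0 = head, 1 = current, 2 = tail) are assumptions based on this release's consumer API:

from kafka import KafkaClient, SimpleConsumer

kafka = KafkaClient("localhost:9092")
consumer = SimpleConsumer(kafka, "my-group", "my-topic")

# Fetch a bounded batch instead of iterating forever
for message in consumer.get_messages(count=10, block=True, timeout=5):
    print(message)

# Reposition the consumer (assumed fseek-like semantics)
consumer.seek(0, 0)   # relative to head: rewind to the earliest offset
consumer.seek(-5, 2)  # relative to tail: jump to the last 5 messages

kafka.close()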
from kafka import KafkaClient, KeyedProducer, HashedPartitioner, RoundRobinPartitioner

kafka = KafkaClient("localhost:9092")

# HashedPartitioner is default
producer = KeyedProducer(kafka)
producer.send("my-topic", "key1", "some message")
producer.send("my-topic", "key2", "this method")

producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
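A custom partitioner can be plugged in the same way. A minimal sketch, assuming the Partitioner base class in kafka.partitioner (which HashedPartitioner and RoundRobinPartitioner extend) exposes a partition(key, partitions) hook; FirstBytePartitioner is a hypothetical name for illustration:

from kafka import KafkaClient, KeyedProducer
from kafka.partitioner import Partitioner

class FirstBytePartitioner(Partitioner):
    # hypothetical partitioner: `partitions` is the list of partition
    # ids available for the topic
    def partition(self, key, partitions):
        if not key:
            return partitions[0]
        # route by the first byte of the (str) key
        return partitions[ord(key[0]) % len(partitions)]

kafka = KafkaClient("localhost:9092")
producer = KeyedProducer(kafka, partitioner=FirstBytePartitioner)
producer.send("my-topic", "key1", "routed by first byte")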
from kafka import KafkaClient, MultiProcessConsumer

kafka = KafkaClient("localhost:9092")

# This will split the number of partitions among two processes
consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)

# This will spawn processes such that each handles 2 partitions max
consumer = MultiProcessConsumer(kafka, "my-group", "my-topic",
                                partitions_per_proc=2)

for message in consumer:
    print(message)

for message in consumer.get_messages(count=5, block=True, timeout=4):
    print(message)
from kafka import KafkaClient, create_message
from kafka.protocol import KafkaProtocol
from kafka.common import ProduceRequest

kafka = KafkaClient("localhost:9092")

req = ProduceRequest(topic="my-topic", partition=1,
                     messages=[create_message("some message")])
resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
kafka.close()

resps[0].topic      # "my-topic"
resps[0].partition  # 1
resps[0].error      # 0 (hopefully)
resps[0].offset     # offset of the first message sent in this request
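The compressed message sets mentioned in the overview can be produced at this level too. A minimal sketch, assuming create_gzip_message is exported from kafka alongside create_message (a create_snappy_message counterpart is assumed as well):

from kafka import KafkaClient, create_gzip_message
from kafka.common import ProduceRequest

kafka = KafkaClient("localhost:9092")

# Pack several payloads into one gzip-compressed message set; brokers
# store the set compressed and consumers decompress it transparently.
req = ProduceRequest(topic="my-topic", partition=0,
                     messages=[create_gzip_message(["batched message 1",
                                                    "batched message 2"])])
resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
kafka.close()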
Install with your favorite package manager
Pip:
pip install kafka-python
Releases are also listed at https://github.com/mumrah/kafka-python/releases
git clone https://github.com/mumrah/kafka-python
pip install ./kafka-python
Setuptools:
git clone https://github.com/mumrah/kafka-python
easy_install ./kafka-python
Using setup.py directly:
git clone https://github.com/mumrah/kafka-python
cd kafka-python
python setup.py install
Download and build Snappy from http://code.google.com/p/snappy/downloads/list
Ubuntu:
apt-get install libsnappy-dev
OSX:
brew install snappy
From Source:
wget http://snappy.googlecode.com/files/snappy-1.0.5.tar.gz
tar xzvf snappy-1.0.5.tar.gz
cd snappy-1.0.5
./configure
make
sudo make install
Install the python-snappy module:
pip install python-snappy
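To confirm the bindings are visible to kafka-python, a quick check, assuming the has_gzip/has_snappy helpers in kafka.codec present in this release:

from kafka.codec import has_gzip, has_snappy

# Both should print True once libsnappy and python-snappy are installed;
# gzip support comes from the Python standard library.
print(has_gzip())
print(has_snappy())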
# Run protocol tests only
tox -- -v test.test_protocol

# Test with pypy only
tox -e pypy

# Run only 1 test, and use python 2.7
tox -e py27 -- -v --with-id --collect-only

# pick a test number from the list like #102
tox -e py27 -- -v --with-id 102
The integration tests will actually start up a real local Zookeeper instance and Kafka brokers, and send messages to them using the client.
First, get the kafka binaries for integration testing:
./build_integration.sh
By default, the build_integration.sh script will download binary distributions for all supported Kafka versions. To test against the latest source build, set KAFKA_VERSION=trunk and optionally set SCALA_VERSION (it defaults to 2.8.0, but 2.10.1 is recommended).
SCALA_VERSION=2.10.1 KAFKA_VERSION=trunk ./build_integration.sh
Then, to run the tests against supported Kafka versions, simply set the KAFKA_VERSION env variable to the server build you want to use for testing:
KAFKA_VERSION=0.8.0 tox
KAFKA_VERSION=0.8.1 tox
KAFKA_VERSION=0.8.1.1 tox
KAFKA_VERSION=trunk tox