CrowdStrike/kafka-python

Name: kafka-python

Owner: CrowdStrike

Description: Kafka protocol support in Python

Created: 2014-07-18 17:42:44

Updated: 2017-04-26 14:31:08

Pushed: 2015-10-17 22:05:05

Size: 1236 KB

Language: Python

README

Kafka Python client

This module provides low-level protocol support for Apache Kafka as well as high-level consumer and producer classes. Request batching is supported by the protocol as well as broker-aware request routing. Gzip and Snappy compression is also supported for message sets.
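
Compression is enabled per producer. Below is a minimal sketch; the codec argument on SimpleProducer and the CODEC_GZIP constant in kafka.protocol are assumptions based on the 0.9.x API, so check your installed version:

```python
from kafka import KafkaClient, SimpleProducer
from kafka.protocol import CODEC_GZIP  # CODEC_SNAPPY needs python-snappy (see below)

kafka = KafkaClient("localhost:9092")

# Message sets are gzip-compressed client-side before being sent to the broker
producer = SimpleProducer(kafka, codec=CODEC_GZIP)
producer.send_messages("my-topic", "a compressed message")
kafka.close()
```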

http://kafka.apache.org/

On Freenode IRC at #kafka-python, as well as #apache-kafka

For general discussion of kafka-client design and implementation (not python specific), see https://groups.google.com/forum/m/#!forum/kafka-clients

License

Copyright 2014, David Arthur under Apache License, v2.0. See LICENSE

Status

The current stable version of this package is 0.9.2 and is compatible with:

Kafka broker versions
- 0.8.0
- 0.8.1
- 0.8.1.1

Python versions
- 2.7
- pypy

Usage

High level
```python
from kafka import KafkaClient, SimpleProducer, SimpleConsumer

# To send messages synchronously
kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)

# Note that the application is responsible for encoding messages to type str
producer.send_messages("my-topic", "some message")
producer.send_messages("my-topic", "this method", "is variadic")

# Send unicode message
producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8'))

# To send messages asynchronously
# WARNING: current implementation does not guarantee message delivery on failure!
# messages can get dropped! Use at your own risk! Or help us improve with a PR!
producer = SimpleProducer(kafka, async=True)
producer.send_messages("my-topic", "async message")

# To wait for acknowledgements
# ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
#                         a local log before sending response
# ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
#                            by all in sync replicas before sending a response
producer = SimpleProducer(kafka, async=False,
                          req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
                          ack_timeout=2000)

response = producer.send_messages("my-topic", "another message")

if response:
    print(response[0].error)
    print(response[0].offset)

# To send messages in batch. You can use any of the available
# producers for doing this. The following producer will collect
# messages in batch and send them to Kafka after 20 messages are
# collected or every 60 seconds
# Notes:
# * If the producer dies before the messages are sent, there will be losses
# * Call producer.stop() to send the messages and cleanup
producer = SimpleProducer(kafka, batch_send=True,
                          batch_send_every_n=20,
                          batch_send_every_t=60)

# To consume messages
consumer = SimpleConsumer(kafka, "my-group", "my-topic")
for message in consumer:
    # message is raw byte string -- decode if necessary!
    # e.g., for unicode: `message.decode('utf-8')`
    print(message)

kafka.close()
```
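
The high-level consumer can also reposition itself within the log. A short sketch, assuming the SimpleConsumer.seek(offset, whence) and get_messages() methods of the 0.9.x API:

```python
from kafka import KafkaClient, SimpleConsumer

kafka = KafkaClient("localhost:9092")
consumer = SimpleConsumer(kafka, "my-group", "my-topic")

consumer.seek(0, 0)  # rewind to the beginning of the partitions
consumer.seek(0, 2)  # or skip to the end, consuming only new messages

# Bounded consumption: up to 10 messages, waiting at most 5 seconds
for message in consumer.get_messages(count=10, block=True, timeout=5):
    print(message)

kafka.close()
```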
Keyed messages
```python
from kafka import KafkaClient, KeyedProducer, HashedPartitioner, RoundRobinPartitioner

kafka = KafkaClient("localhost:9092")

# HashedPartitioner is default
producer = KeyedProducer(kafka)
producer.send("my-topic", "key1", "some message")
producer.send("my-topic", "key2", "this method")

producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
```
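
A custom partitioner can also be supplied. This sketch assumes the 0.9.x Partitioner base class in kafka.partitioner, whose partition(key, partitions) method picks the destination partition; the FirstBytePartitioner name is purely illustrative:

```python
from kafka import KafkaClient, KeyedProducer
from kafka.partitioner import Partitioner

class FirstBytePartitioner(Partitioner):
    # Hypothetical example: route each message by the first byte of its key
    def partition(self, key, partitions):
        return partitions[ord(key[0]) % len(partitions)]

kafka = KafkaClient("localhost:9092")
producer = KeyedProducer(kafka, partitioner=FirstBytePartitioner)
producer.send("my-topic", "key1", "routed by the key's first byte")
```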
Multiprocess consumer
```python
from kafka import KafkaClient, MultiProcessConsumer

kafka = KafkaClient("localhost:9092")

# This will split the number of partitions among two processes
consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)

# This will spawn processes such that each handles 2 partitions max
consumer = MultiProcessConsumer(kafka, "my-group", "my-topic",
                                partitions_per_proc=2)

for message in consumer:
    print(message)

for message in consumer.get_messages(count=5, block=True, timeout=4):
    print(message)
```
Low level
```python
from kafka import KafkaClient, create_message
from kafka.protocol import KafkaProtocol
from kafka.common import ProduceRequest

kafka = KafkaClient("localhost:9092")

req = ProduceRequest(topic="my-topic", partition=1,
                     messages=[create_message("some message")])
resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
kafka.close()

resps[0].topic      # "my-topic"
resps[0].partition  # 1
resps[0].error      # 0 (hopefully)
resps[0].offset     # offset of the first message sent in this request
```
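
The low-level client can fetch as well as produce. A sketch of the read path, assuming the 0.9.x FetchRequest tuple in kafka.common and KafkaClient.send_fetch_request:

```python
from kafka import KafkaClient
from kafka.common import FetchRequest

kafka = KafkaClient("localhost:9092")

# Fetch up to 100KB of messages from partition 1, starting at offset 0
req = FetchRequest(topic="my-topic", partition=1, offset=0, max_bytes=100 * 1024)
resps = kafka.send_fetch_request(payloads=[req], fail_on_error=True)

for offset_and_message in resps[0].messages:
    print(offset_and_message.offset, offset_and_message.message.value)

kafka.close()
```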

Install

Install with your favorite package manager

Latest Release

Pip:

```shell
pip install kafka-python
```

Releases are also listed at https://github.com/mumrah/kafka-python/releases

Bleeding-Edge
```shell
git clone https://github.com/mumrah/kafka-python
pip install ./kafka-python
```

Setuptools:

```shell
git clone https://github.com/mumrah/kafka-python
easy_install ./kafka-python
```

Using setup.py directly:

```shell
git clone https://github.com/mumrah/kafka-python
cd kafka-python
python setup.py install
```
Optional Snappy install
Install Development Libraries

Download and build Snappy from http://code.google.com/p/snappy/downloads/list

Ubuntu:

```shell
apt-get install libsnappy-dev
```

OSX:

```shell
brew install snappy
```

From Source:

```shell
wget http://snappy.googlecode.com/files/snappy-1.0.5.tar.gz
tar xzvf snappy-1.0.5.tar.gz
cd snappy-1.0.5
./configure
make
sudo make install
```
Install Python Module

Install the python-snappy module

```shell
pip install python-snappy
```

Tests

Run the unit tests

```shell
tox
```

Run a subset of unit tests
```shell
# run protocol tests only
tox -- -v test.test_protocol
```

```shell
# test with pypy only
tox -e pypy
```

```shell
# Run only 1 test, and use python 2.7
tox -e py27 -- -v --with-id --collect-only
# pick a test number from the list like #102
tox -e py27 -- -v --with-id 102
```

Run the integration tests

The integration tests will actually start up a real, local Zookeeper instance and Kafka brokers, and send messages in using the client.

First, get the kafka binaries for integration testing:

```shell
./build_integration.sh
```

By default, the build_integration.sh script will download binary distributions for all supported Kafka versions. To test against the latest source build, set KAFKA_VERSION=trunk and optionally set SCALA_VERSION (defaults to 2.8.0, but 2.10.1 is recommended).

```shell
SCALA_VERSION=2.10.1 KAFKA_VERSION=trunk ./build_integration.sh
```

Then, to run the tests against a supported Kafka version, simply set the KAFKA_VERSION env variable to the server build you want to use for testing:

```shell
KAFKA_VERSION=0.8.0 tox
KAFKA_VERSION=0.8.1 tox
KAFKA_VERSION=0.8.1.1 tox
KAFKA_VERSION=trunk tox
```
