In this blog post, we’re going to get back to basics and walk through how to get started using Apache Kafka with your Python applications.
We will assume some basic knowledge of Kafka. If you’re new to the project, the introduction and design sections of the Apache documentation are an excellent place to start. The Confluent blog is also packed with great information; Jay Kreps’s A Practical Guide to Building a Streaming Platform covers many of the core concepts again, but with a focus on Kafka’s role at a company-wide scale. Also noteworthy are Ben Stopford’s microservices blog posts (The Data Dichotomy, Services on a Backbone of Events) for his unique take on the relationship between applications and data.
For our examples we’ll use Confluent Open Source. This is an Apache 2.0 licensed open source distribution of Kafka that includes connectors for various data systems, a REST layer for Kafka, and a schema registry. On OS X this is easily installed via the tar archive. Instructions for all platforms are available on the Confluent website.
The Confluent Python client confluent-kafka-python leverages the high performance C client librdkafka (also developed and supported by Confluent). Before installing the Python client, you’ll need to install the librdkafka shared library and corresponding C header files. If you’re on OS X, the easiest way to achieve this is to install the homebrew librdkafka package:
```shell
brew install librdkafka
```
You’ll also need to set up your environment to be able to build Python C extensions, if it isn’t already. On OS X, that means installing Xcode via the app store, then installing the command line developer tools as follows:
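On a recent version of OS X, the command looks like this:

```shell
xcode-select --install
```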
Now you’re ready to install the Python client. This is most conveniently done from PyPI using pip (generally inside a virtual environment):
```shell
pip install confluent-kafka
```
You can get a single-broker Kafka cluster up and running quickly using default configuration files included with Confluent Open Source.
First, you’ll need to start a Zookeeper instance, which Kafka utilizes for providing various distributed system related services. Assuming you used the zip or tar archive to install Confluent Open Source, you can start Zookeeper from the installation directory as follows:
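Assuming the standard layout of the Confluent Open Source zip/tar archive (paths may differ for your version), the command is:

```shell
./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
```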
Then to start a Kafka broker:
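From the same installation directory, the broker can be started with its default properties file (the path is an assumption based on the standard archive layout):

```shell
./bin/kafka-server-start ./etc/kafka/server.properties
```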
That’s it! You now have a Kafka broker to play with.
Here’s a simple program that writes a message with key ‘hello’ and value ‘world’ to the Kafka topic mytopic:
After importing the Producer class from the confluent_kafka package, we construct a Producer instance and assign it to the variable p. The constructor takes a single argument: a dictionary of configuration parameters. Because confluent-kafka uses librdkafka for its underlying implementation, it shares the same set of configuration properties.
The only required property is bootstrap.servers, which is used to specify the address of one or more brokers in your Kafka cluster. In our case there is only one, but a real-world Kafka cluster may grow to tens or hundreds of nodes. It doesn’t matter which broker(s) you specify here; this setting simply provides a starting point for the client to query the cluster – any broker can answer metadata requests about the cluster.
In the call to the produce method, both the key and value parameters need to be either a byte-like object (in Python 2.x this includes strings), a Unicode object, or None. In Python 3.x, strings are Unicode and will be converted to a sequence of bytes using the UTF-8 encoding. In Python 2.x, objects of type unicode will be encoded using the default encoding. Often, you will want to serialize objects of a particular type before writing them to Kafka. A common pattern for doing this is to subclass Producer and override the produce method with one that performs the required serialization.
The produce method returns immediately without waiting for confirmation that the message has been successfully produced to Kafka (or otherwise). The flush method blocks until all outstanding produce commands have completed, or the optional timeout (specified as a number of seconds) has been exceeded. You can test whether all produce commands have completed by checking the value returned by the flush method: if it is greater than zero, there are still produce commands that have yet to complete. Note that you should typically call flush only at application teardown, not during normal flow of execution, as it will prevent requests from being streamlined in a performant manner.
To be notified when produce commands have completed, you can specify a callback function in the produce call. Here’s an example:
The callback method has two parameters – the first provides information about any error that occurred whilst producing the message and the second information about the message produced. Callbacks are executed as a side-effect of calls to the poll or flush methods. Unlike the flush method, the poll method always blocks for the specified timeout period (measured in seconds). An advantage of the poll based callback mechanism is that it allows you to keep everything single threaded and easy to reason about.
Data is read from Kafka using consumers that are generally working together as part of a consumer group. Consumers subscribe to one or more topics and are automatically assigned to a subset of each topic’s partitions. If consumers are added or removed (perhaps due to failure) from the group, the group will automatically rebalance so that one and only one consumer is ever reading from each partition in each topic of the subscription set. For more detailed information on how consumer groups work, Jason Gustafson’s blog post covering the Java consumer is an excellent reference.
Below is a simple example that creates a Kafka consumer that joins consumer group mygroup and reads messages from its assigned partitions until Ctrl-C is pressed:
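A sketch of such a consumer, assuming a local broker (the client.id value is an arbitrary example):

```python
from confluent_kafka import Consumer, KafkaError

c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'client.id': 'client-1',
    'default.topic.config': {'auto.offset.reset': 'smallest'}
})

c.subscribe(['mytopic'])

try:
    while True:
        # Wait up to 0.1 seconds for a message to become available.
        msg = c.poll(0.1)
        if msg is None:
            continue
        elif not msg.error():
            print('Received message: {0}'.format(msg.value()))
        elif msg.error().code() == KafkaError._PARTITION_EOF:
            print('End of partition reached {0}/{1}'
                  .format(msg.topic(), msg.partition()))
        else:
            print('Error occurred: {0}'.format(msg.error().str()))
except KeyboardInterrupt:
    pass
finally:
    # Leave the consumer group cleanly so rebalancing happens immediately.
    c.close()
```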
A number of configuration parameters are worth noting:
bootstrap.servers: As with the producer, this specifies the initial point of contact with the Kafka cluster.
group.id: The name of the consumer group the consumer is part of. If the consumer group does not yet exist when the consumer is constructed (there are no existing consumers that are part of the group), it will be created automatically. Similarly, if all consumers in a group leave the group, the group will be automatically destroyed.
client.id: Although optional, each consumer in a group should be assigned a unique id – this allows you to differentiate between clients in Kafka error logs and monitoring aggregates.
default.topic.config: A number of topic related configuration properties are grouped together under this top level property. One commonly used topic-level property is auto.offset.reset, which specifies which offset to start reading from if there have been no offsets committed to a topic/partition yet. This defaults to latest; however, you will often want it to be smallest so that old messages are not ignored when you first start reading from a topic.
enable.auto.commit: By default, as the consumer reads messages from Kafka, it will periodically commit its current offset (defined as the offset of the next message to be read) for the partitions it is reading from back to Kafka. Often you would like more control over exactly when offsets are committed. In this case you can set enable.auto.commit to False and call the commit method on the consumer. For simplicity, we have left auto offset commit enabled in this example.
After constructing the consumer, the subscribe method is called to inform Kafka that we wish to join the consumer group mygroup (specified in the configuration) and read messages from a single topic mytopic. It’s possible to subscribe to more than one topic by specifying more than one topic name in the list provided to the subscribe method. Note that you can’t do this by calling the subscribe method a second time – this would result in the consumer first unsubscribing from the original subscription set and then subscribing to only the topic(s) in the newly specified one.
Having subscribed to a set of topics, we enter the main poll loop. This is wrapped in a try/except block that allows controlled shutdown of the consumer via the close method when the user interrupts program execution. If the close method is omitted, the consumer group would not rebalance immediately – removal of the consumer from the group would occur as per the consumer group failure detection protocol after the session.timeout.ms period has elapsed.
On the consumer, the poll method blocks until a Message object is ready for consumption, or until the timeout period (specified in seconds) has elapsed, in which case the return value is None. When a Message object is available, there are essentially three cases to consider, differentiated by the value returned by Message.error():
- None: The Message object represents a consumed message. The message key, value and other relevant information can be obtained via the key(), value(), timestamp(), topic(), partition() and offset() methods of the Message object.
- KafkaError._PARTITION_EOF: The Message object does not encapsulate any consumed message – it simply signals that the end of a partition has been reached. You can use the topic() and partition() methods to determine the pertinent partition.
- Any other value: An error occurred during consumption. Depending on the result of error(), other Message object methods may still return valid values. For most error types, use of the consumer should be abandoned.
That concludes our introduction on how to integrate Apache Kafka with your Python applications. In order to keep this post to a reasonable length, we’ve omitted some of the more advanced features provided by the library. For example, you can hook into the partition assignment process that happens after you call subscribe on the consumer but before any messages are read. This allows you to do things like pre-load state associated with the partition assignment for joining with the consumed messages. The client also ships with AvroProducer and AvroConsumer classes that allow you to serialize data in Avro format and manage the evolution of the associated schemas using schema registry. For further information, refer to the API documentation, the examples in the github repo, or the user’s guide on our website.
For expert advice on deploying or operating Kafka, we offer a range of training and technical consulting services covering all levels of expertise. For large-scale deployments of Kafka we offer Confluent Enterprise, which provides a number of powerful features in addition to those provided by Confluent Open Source, as well as enterprise grade support. Finally, a hosted and fully managed version of Apache Kafka is just around the corner with the upcoming Confluent Cloud.