15.06.2017       Issue 182 (12.06.2017 - 18.06.2017)       Articles

Introduction to Apache Kafka for Python Programmers


In this blog post, we’re going to get back to basics and walk through how to get started using Apache Kafka with your Python applications.

We will assume some basic knowledge of Kafka. If you’re new to the project, the introduction and design sections of the Apache documentation are an excellent place to start. The Confluent blog is also packed with great information; Jay Kreps’s A Practical Guide to Building a Streaming Platform covers many of the core concepts again, but with a focus on Kafka’s role at a company-wide scale. Also noteworthy are Ben Stopford’s microservices blog posts (The Data Dichotomy, Services on a Backbone of Events) for his unique take on the relationship between applications and data.


For our examples we’ll use Confluent Open Source. This is an Apache 2.0 licensed open source distribution of Kafka that includes connectors for various data systems, a REST layer for Kafka, and a schema registry. On OS X this is easily installed via the tar archive. Instructions for all platforms are available on the Confluent website.

The Confluent Python client confluent-kafka-python leverages the high performance C client librdkafka (also developed and supported by Confluent). Before installing the Python client you’ll need to install the librdkafka shared library and corresponding C header files. If you’re on OS X, the easiest way to achieve this is to install the homebrew librdkafka package:

brew install librdkafka

librdkafka is also available in package form for other platforms, and it is easy to install from source. For more information refer to the installation instructions.

You’ll also need to set up your environment to be able to build Python C extensions, if it’s not already. On OS X, that means installing Xcode via the app store, then installing the command line developer tools as follows:

xcode-select --install

Now you’re ready to install the Python client. This is most conveniently done from PyPI using pip (generally inside a virtual environment):

pip install confluent-kafka
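
One quick way to check that the install worked is to import the package and print the versions of the Python client and of the librdkafka library it loaded. This is a minimal sketch: the helper name client_versions is hypothetical, and it returns None rather than failing when the package cannot be imported.

```python
def client_versions():
    """Return (client_version, librdkafka_version) as strings,
    or None if confluent_kafka cannot be imported."""
    try:
        import confluent_kafka
    except ImportError:
        return None
    # version() and libversion() each return a (string, integer) tuple
    return confluent_kafka.version()[0], confluent_kafka.libversion()[0]


print(client_versions())
```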

Starting Kafka

You can get a single-broker Kafka cluster up and running quickly using default configuration files included with Confluent Open Source.

First, you’ll need to start a Zookeeper instance, which Kafka utilizes for providing various distributed system related services. Assuming you used the zip or tar archive to install Confluent Open Source, you can start Zookeeper from the installation directory as follows:

./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties

Then to start a Kafka broker:

./bin/kafka-server-start ./etc/kafka/server.properties

That’s it! You now have a Kafka broker to play with.

Producing Messages

Here’s a simple program that writes a message with key 'hello' and value 'world' to the Kafka topic mytopic:
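
A minimal sketch of such a program follows. The broker address localhost:9092 and the 30 second flush timeout are assumptions, and send_hello and main are hypothetical helper names, introduced only so that the client is imported when main() is called.

```python
def send_hello(p, topic="mytopic", timeout=30):
    """Queue one message on producer p and wait for delivery.

    Returns the number of messages still undelivered when flush() returns.
    """
    # produce() is asynchronous: it only enqueues the message
    p.produce(topic, key="hello", value="world")
    # flush() blocks until the queue drains or the timeout (seconds) expires
    return p.flush(timeout)


def main():
    from confluent_kafka import Producer

    # Assumed address of the single local broker started earlier
    p = Producer({"bootstrap.servers": "localhost:9092"})
    remaining = send_hello(p)
    if remaining > 0:
        print("{0} message(s) were not delivered".format(remaining))
```

With the broker from the previous section running, calling main() should write the message to mytopic.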

After importing the Producer class from the confluent_kafka package, we construct a Producer instance and assign it to the variable p. The constructor takes a single argument: a dictionary of configuration parameters. Because confluent-kafka uses librdkafka for its underlying implementation, it shares the same set of configuration properties.

The only required property is bootstrap.servers, which is used to specify the address of one or more brokers in your Kafka cluster. In our case, there is only one, but a real-world Kafka cluster may grow to tens or hundreds of nodes. It doesn’t matter which broker(s) you specify here; this setting simply provides a starting point for the client to query the cluster – any broker can answer metadata requests about the cluster.

In the call to the produce method, both the key and value parameters need to be either a byte-like object (in Python 2.x this includes strings), a Unicode object, or None. In Python 3.x, strings are Unicode and will be converted to a sequence of bytes using the UTF-8 encoding. In Python 2.x, objects of type unicode will be encoded using the default encoding. Often, you will want to serialize objects of a particular type before writing them to Kafka. A common pattern for doing this is to subclass Producer and override the produce method with one that performs the required serialization.
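
The subclassing pattern can be sketched as below, using JSON as an assumed serialization format. The names to_json_bytes and make_json_producer are hypothetical; the factory takes the base class as an argument so that the sketch does not depend on how the client is imported, and in practice you would pass confluent_kafka.Producer.

```python
import json


def to_json_bytes(obj):
    """Serialize a JSON-compatible object to UTF-8 encoded bytes."""
    return json.dumps(obj, sort_keys=True).encode("utf-8")


def make_json_producer(base_cls):
    """Return a subclass of base_cls whose produce() JSON-encodes values."""

    class JsonProducer(base_cls):
        def produce(self, topic, value=None, key=None, **kwargs):
            # Leave None untouched so it is still sent as a null value
            payload = None if value is None else to_json_bytes(value)
            super().produce(topic, value=payload, key=key, **kwargs)

    return JsonProducer
```

Keeping the serialization in one overridden method means callers pass ordinary Python objects and never deal with bytes themselves.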

The produce method returns immediately without waiting for confirmation that the message has been successfully produced to Kafka (or otherwise). The flush method blocks until all outstanding produce commands have completed, or the optional timeout (specified as a number of seconds) has been exceeded. You can test to see whether all produce commands have completed by checking the value returned by the flush method: if it is greater than zero, there are still produce commands that have yet to complete. Note that you should typically call flush only at application teardown, not during normal flow of execution, as it will prevent requests from being streamlined in a performant manner.

To be notified when produce commands have completed, you can specify a callback function in the produce call. Here’s an example:
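
A minimal sketch of a delivery callback follows. The broker address, the message contents, the count of 1000 messages and the 0.5 second poll timeout are assumptions; describe_delivery, acked, produce_numbered and main are hypothetical names.

```python
def describe_delivery(err, msg):
    """Build a one-line report from the delivery callback arguments."""
    if err is not None:
        return "Failed to deliver message: {0}: {1}".format(msg.value(), err)
    return "Message produced: {0}".format(msg.value())


def acked(err, msg):
    # Delivery callback: invoked from poll() or flush(), once per message
    print(describe_delivery(err, msg))


def produce_numbered(p, topic="mytopic", count=1000):
    """Produce count messages, serving delivery callbacks as we go."""
    try:
        for n in range(1, count + 1):
            p.produce(topic, "myvalue #{0}".format(n), callback=acked)
            p.poll(0.5)  # runs callbacks for completed produce requests
    except KeyboardInterrupt:
        pass
    # Wait up to 30 seconds for anything still in flight
    return p.flush(30)


def main():
    from confluent_kafka import Producer

    p = Producer({"bootstrap.servers": "localhost:9092"})
    produce_numbered(p)
```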

The callback method has two parameters – the first provides information about any error that occurred whilst producing the message and the second information about the message produced. Callbacks are executed as a side-effect of calls to the poll or flush methods. Unlike the flush method, the poll method always blocks for the specified timeout period (measured in seconds). An advantage of the poll based callback mechanism is that it allows you to keep everything single threaded and easy to reason about.

Consuming Messages

Data is read from Kafka using consumers that are generally working together as part of a consumer group. Consumers subscribe to one or more topics and are automatically assigned to a subset of each topic’s partitions. If consumers are added or removed (perhaps due to failure) from the group, the group will automatically rebalance so that one and only one consumer is ever reading from each partition in each topic of the subscription set. For more detailed information on how consumer groups work, Jason Gustafson’s blog post covering the Java consumer is an excellent reference.

Below is a simple example that creates a Kafka consumer that joins consumer group mygroup and reads messages from its assigned partitions until Ctrl-C is pressed:
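
A minimal sketch of such a consumer follows. The broker address, the client.id value client-1, the 6000 ms session timeout and the 0.1 second poll timeout are assumptions, and run and main are hypothetical helper names. The settings mirror the properties discussed in this post; newer client releases deprecate default.topic.config in favour of setting topic-level properties such as auto.offset.reset at the top level.

```python
SETTINGS = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "mygroup",
    "client.id": "client-1",
    "enable.auto.commit": True,
    "session.timeout.ms": 6000,
    "default.topic.config": {"auto.offset.reset": "smallest"},
}


def run(c, partition_eof, topics=("mytopic",)):
    """Poll consumer c until Ctrl-C, then leave the group cleanly.

    partition_eof is the error code that marks the end of a partition
    (KafkaError._PARTITION_EOF in the real client).
    """
    c.subscribe(list(topics))
    try:
        while True:
            msg = c.poll(0.1)
            if msg is None:
                continue  # timeout expired, nothing to read yet
            err = msg.error()
            if err is None:
                print("Received message: {0}".format(msg.value()))
            elif err.code() == partition_eof:
                print("End of partition reached {0}/{1}".format(
                    msg.topic(), msg.partition()))
            else:
                print("Error occurred: {0}".format(err))
    except KeyboardInterrupt:
        pass
    finally:
        c.close()  # leave the group so it can rebalance right away


def main():
    from confluent_kafka import Consumer, KafkaError

    run(Consumer(SETTINGS), KafkaError._PARTITION_EOF)
```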

A number of configuration parameters are worth noting:

  1. bootstrap.servers: As with the producer, this specifies the initial point of contact with the Kafka cluster.
  2. group.id: The name of the consumer group the consumer is part of. If the consumer group does not yet exist when the consumer is constructed (there are no existing consumers that are part of the group), it will be created automatically. Similarly, if all consumers in a group leave the group, the group will be automatically destroyed.
  3. client.id: Although optional, each consumer in a group should be assigned a unique id – this allows you to differentiate between clients in Kafka error logs and monitoring aggregates.
  4. default.topic.config: A number of topic related configuration properties are grouped together under this top level property. One commonly used topic-level property is auto.offset.reset, which specifies which offset to start reading from if no offsets have yet been committed for a topic/partition. This defaults to latest; however, you will often want this to be smallest so that old messages are not ignored when you first start reading from a topic.
  5. enable.auto.commit: By default, as the consumer reads messages from Kafka, it will periodically commit its current offset (defined as the offset of the next message to be read) for the partitions it is reading from back to Kafka. Often you would like more control over exactly when offsets are committed. In this case you can set enable.auto.commit to False and call the commit method on the consumer. For simplicity, we have left auto offset commit enabled in this example.
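
Manual offset management, as described in the enable.auto.commit item, might look like the sketch below. It assumes a consumer created with enable.auto.commit set to False and a client release in which the commit keyword is named asynchronous (older releases called it async). process_then_commit and the handle callback are hypothetical names, and committing synchronously after every message is chosen for clarity rather than throughput.

```python
def process_then_commit(c, handle, poll_timeout=1.0):
    """Poll once; if a message arrives, handle it and then commit its offset.

    Returns True if a message was processed and committed, False otherwise.
    """
    msg = c.poll(poll_timeout)
    if msg is None or msg.error() is not None:
        return False
    handle(msg)
    # Commit only after processing succeeded, so a crash inside handle()
    # means the message is read again rather than skipped
    c.commit(message=msg, asynchronous=False)
    return True
```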

After constructing the consumer, the subscribe method is called to inform Kafka that we wish to join the consumer group mygroup (specified in the configuration) and read messages from a single topic mytopic. It’s possible to subscribe to more than one topic by specifying more than one topic name in the list provided to the subscribe method. Note that you can’t do this by calling the subscribe method a second time – this would result in the consumer first unsubscribing from the original subscription set and then subscribing to only the topic(s) in the newly specified one.

Having subscribed to a set of topics, we enter the main poll loop. This is wrapped in a try/except block that allows controlled shutdown of the consumer via the close method when the user interrupts program execution. If the close method is omitted, the consumer group would not rebalance immediately – removal of the consumer from the group would occur as per the consumer group failure detection protocol after the session.timeout.ms has elapsed.

On the consumer, the poll method blocks until a Message object is ready for consumption, or until the timeout period (specified in seconds) has elapsed, in which case the return value is None. When a Message object is available, there are essentially three cases to consider, differentiated by the value returned by Message.error():

  1. None: The Message object represents a consumed message. The message key, value and other relevant information can be obtained via the key(), value(), timestamp(), topic(), partition() and offset() methods of the Message object.
  2. KafkaError._PARTITION_EOF: The Message object does not encapsulate any consumed message – it simply signals that the end of a partition has been reached. You can use the partition() and topic() methods to determine the pertinent partition.
  3. Any other value: An error occurred during consumption. Depending on the result of Message.error(), other Message object methods may return valid values. For most error types, use of topic() and partition() is valid.
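
For the first case, the accessor methods can be gathered into a plain dictionary, which is convenient for logging. This is a minimal sketch: message_to_dict is a hypothetical helper, and it relies on timestamp() returning a (timestamp type, timestamp) pair.

```python
def message_to_dict(msg):
    """Collect the fields of a successfully consumed Message."""
    ts_type, ts = msg.timestamp()
    return {
        "topic": msg.topic(),
        "partition": msg.partition(),
        "offset": msg.offset(),
        "key": msg.key(),
        "value": msg.value(),
        "timestamp_type": ts_type,
        "timestamp": ts,
    }
```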


That concludes our introduction on how to integrate Apache Kafka with your Python applications. In order to keep this post to a reasonable length, we’ve omitted some of the more advanced features provided by the library. For example, you can hook into the partition assignment process that happens after you call subscribe on the consumer but before any messages are read. This allows you to do things like pre-load state associated with the partition assignment for joining with the consumed messages. The client also ships with AvroProducer and AvroConsumer classes that allow you to serialize data in Avro format and manage the evolution of the associated schemas using schema registry. For further information, refer to the API documentation, the examples in the github repo, or the user’s guide on our website.
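
Hooking into partition assignment might be sketched as follows. The subscribe method accepts an on_assign callback that receives the consumer and the list of newly assigned partitions; the load_state function and the state dictionary are hypothetical stand-ins for whatever per-partition data your application needs, as are the helper names make_on_assign and subscribe_with_state.

```python
def make_on_assign(state, load_state):
    """Build an on_assign callback that pre-loads state per partition."""

    def on_assign(consumer, partitions):
        for tp in partitions:
            # tp is a TopicPartition with .topic and .partition attributes
            state[(tp.topic, tp.partition)] = load_state(tp.topic, tp.partition)

    return on_assign


def subscribe_with_state(c, topics, state, load_state):
    """Subscribe consumer c so that state is filled in on each assignment."""
    c.subscribe(topics, on_assign=make_on_assign(state, load_state))
```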

For expert advice on deploying or operating Kafka, we offer a range of training and technical consulting services covering all levels of expertise. For large-scale deployments of Kafka we offer Confluent Enterprise, which provides a number of powerful features in addition to those provided by Confluent Open Source as well as enterprise grade support. Finally, a hosted and fully managed version of Apache Kafka is just around the corner with the upcoming Confluent Cloud.
