Kafka Basics

1. Introduction to Kafka

1.1 Kafka Overview

Kafka is a distributed streaming platform.

(1) A streaming platform has three key capabilities:

  • Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
  • Store streams of records in a fault-tolerant, durable way.
  • Process streams of records as they occur.

(2) Kafka is commonly used for two broad classes of applications:

  • Building real-time streaming data pipelines that reliably get data between systems or applications.
  • Building real-time streaming applications that transform or react to streams of data.

To understand how Kafka does these things, let’s dive into the capabilities of Kafka.

(3) First, let’s cover a few concepts:

  • Kafka runs as a cluster on one or more servers called brokers, which can span multiple data centers.
  • The cluster stores streams of records in categories called topics.
  • Each record consists of a key, a value, and a timestamp.

(4) Kafka has four core APIs:

  • The Producer API allows an application to publish a stream of records to one or more Kafka topics.
  • The Consumer API allows an application to subscribe to one or more Kafka topics and process the stream of records produced to them.
  • The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
  • The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector for a relational database might capture every change to a table.

In Kafka, the communication between the client and the server is done through a simple, high-performance, language-agnostic TCP protocol. This protocol is versioned and maintains backward compatibility with older versions. Kafka provides a Java client, but there are clients available in multiple languages.

1.2 Topics and Partitions

Let’s first delve into the core abstraction that Kafka provides for a stream of records - topics.

A topic can be thought of as a category of messages, and each topic is divided into one or more partitions. Each partition is stored as an append-only log.

Topics represent categories or names for publishing records. Kafka topics are always multi-subscriber; in other words, a topic can have zero, one, or multiple consumers subscribing to the data written to it.

For each topic, the Kafka cluster maintains a partitioned log.

Each partition is an ordered, immutable sequence of records that are continuously appended to a structured commit log. Each record in a partition is assigned a sequential ID called an offset, which uniquely identifies each record in the partition.
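The idea of an append-only partition with sequential offsets can be sketched in a few lines of Python. This is a minimal in-memory illustration, not Kafka's storage code: the `Record` and `PartitionLog` classes and their methods are hypothetical names invented for this example.

```python
import time
from dataclasses import dataclass, field
from typing import Any, List, Optional


@dataclass(frozen=True)
class Record:
    """One record: a key, a value, and a timestamp."""
    key: Optional[str]
    value: Any
    timestamp: float


@dataclass
class PartitionLog:
    """Hypothetical in-memory stand-in for one partition's append-only log."""
    records: List[Record] = field(default_factory=list)

    def append(self, key, value, timestamp=None) -> int:
        """Append a record to the end of the log and return its offset."""
        ts = time.time() if timestamp is None else timestamp
        self.records.append(Record(key, value, ts))
        return len(self.records) - 1  # offsets are sequential, starting at 0

    def read(self, offset: int, max_records: int = 10) -> List[Record]:
        """Return up to max_records records starting at the given offset."""
        return self.records[offset:offset + max_records]


log = PartitionLog()
first = log.append("user-1", "page_view")
second = log.append("user-2", "search")
```

Existing records are never modified; new records only ever go to the end, which is why an offset is enough to identify a record within its partition.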

The Kafka cluster durably stores all published records - whether or not they have been consumed - for a configurable retention period. For example, if the retention policy is set to two days, a record is available for consumption for two days after it is published, and is then discarded to free up space. Kafka’s performance is effectively constant with respect to data size, so storing data for a long time is not a problem.
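As a rough illustration of time-based retention, the sketch below drops records older than the retention period regardless of whether anyone has read them. It is a simplification under stated assumptions: `apply_retention` is a hypothetical helper, records are plain `(timestamp, value)` tuples with timestamps in seconds, and a real broker removes whole log segments rather than individual records.

```python
def apply_retention(records, now, retention_seconds):
    """Keep only (timestamp, value) records younger than the retention period.

    Whether a record has been consumed plays no role in the decision.
    """
    cutoff = now - retention_seconds
    return [(ts, value) for ts, value in records if ts >= cutoff]


TWO_DAYS = 2 * 24 * 60 * 60  # 172800 seconds

records = [(0, "old"), (100_000, "recent"), (172_000, "newest")]
kept = apply_retention(records, now=180_000, retention_seconds=TWO_DAYS)
```

With `now=180_000` the cutoff is 7200 seconds, so only the record written at time 0 is discarded.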

In fact, the only metadata retained per consumer is that consumer’s offset, or position, in the log. This offset is controlled by the consumer: normally a consumer advances its offset linearly as it reads records, but because it controls the position, it can consume records in any order it likes. For example, a consumer can reset to an older offset to reprocess past data, or skip ahead to the most recent record and start consuming from “now”.
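A consumer-controlled position can be sketched as follows. The `OffsetConsumer` class is hypothetical and reads from a plain Python list standing in for a partition; it only illustrates that rewinding or skipping ahead is a change to the consumer's own position and leaves the log untouched.

```python
class OffsetConsumer:
    """Hypothetical consumer that tracks its own position in a list-backed log."""

    def __init__(self, log):
        self.log = log
        self.position = 0  # offset of the next record to read

    def poll(self, max_records=2):
        """Read the next batch and advance the position linearly."""
        batch = self.log[self.position:self.position + max_records]
        self.position += len(batch)
        return batch

    def seek(self, offset):
        """Jump to an arbitrary offset, for example to reprocess old data."""
        self.position = offset

    def seek_to_end(self):
        """Skip everything already in the log and consume from 'now'."""
        self.position = len(self.log)


log = ["m0", "m1", "m2", "m3"]
consumer = OffsetConsumer(log)

first_batch = consumer.poll()   # normal linear consumption
consumer.seek(0)                # rewind to an older offset
replayed = consumer.poll()      # the same records are read again
consumer.seek_to_end()          # ignore the backlog
log.append("m4")
latest = consumer.poll()        # only the record written after the seek
```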

The combination of these features means that Kafka consumers are very cheap - they can come and go without much impact on the cluster or on other consumers. For example, you can use the command-line tools to “tail” the contents of any topic without changing what existing consumers consume.

Partitions in the log serve several purposes. First, they allow the log to scale beyond what will fit on a single server. Each individual partition must fit on the servers that host it, but a topic can have many partitions, so it can handle an arbitrary amount of data. Second, they act as the unit of parallelism - more on that later.

1.3 Distribution

The partitions of a topic are distributed across the servers in a Kafka cluster. Each server (Kafka instance) handles the read and write requests for the partitions it hosts. In addition, each partition can be replicated across a configurable number of servers to improve availability.

With replication, the replicas of each partition have to be coordinated. Each partition has one server designated as the “leader” and zero or more servers acting as “followers”. The leader handles all read and write requests for the partition, while the followers passively replicate the leader. If the leader fails, one of the followers takes over as the new leader. Because the leader of a partition carries that partition’s entire request load, and the cluster has as many leaders as it has partitions, Kafka spreads the leaders evenly across the instances to keep overall performance stable.
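The effect of spreading leadership and failing over can be illustrated with a small simulation. This is a simplified sketch, not the broker's election logic: `assign_leaders` and `fail_over` are hypothetical helpers, brokers are plain integers, and the first surviving replica is promoted, whereas a real cluster only promotes replicas that are in sync.

```python
def assign_leaders(num_partitions, brokers):
    """Spread partition leadership round-robin over the brokers."""
    return {p: brokers[p % len(brokers)] for p in range(num_partitions)}


def fail_over(leaders, replicas, failed_broker):
    """Move leadership of partitions led by failed_broker to a surviving replica."""
    updated = dict(leaders)
    for partition, leader in leaders.items():
        if leader == failed_broker:
            survivors = [b for b in replicas[partition] if b != failed_broker]
            # None means the partition has no replica left to lead it.
            updated[partition] = survivors[0] if survivors else None
    return updated


brokers = [0, 1, 2]
leaders = assign_leaders(6, brokers)             # two partitions led by each broker
replicas = {p: list(brokers) for p in leaders}   # every partition replicated on all three
after_failure = fail_over(leaders, replicas, failed_broker=2)
```

After broker 2 fails, the two partitions it led are taken over by broker 0, and every partition still has a leader.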

1.4 Producers and Consumers

1.4.1 Producers

Producers publish data to specified topics. The producer can also determine which partition the message belongs to, using methods like “round-robin” or other algorithms.
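The two partitioning strategies mentioned above can be sketched like this. The `make_partitioner` helper is hypothetical, and CRC32 is used only as a stable stand-in hash; the Java client's default partitioner uses a different hash function, so the partition numbers here will not match a real cluster.

```python
import itertools
import zlib


def make_partitioner(num_partitions):
    """Return a function that picks a partition for a record key."""
    round_robin = itertools.cycle(range(num_partitions))

    def choose(key=None):
        if key is None:
            # No key: rotate through the partitions to balance load.
            return next(round_robin)
        # Keyed: a stable hash sends equal keys to the same partition.
        return zlib.crc32(key.encode("utf-8")) % num_partitions

    return choose


choose_partition = make_partitioner(4)
keyless = [choose_partition() for _ in range(5)]
same_key = {choose_partition("user-42") for _ in range(3)}
```

Because every record with the key `user-42` lands in one partition, the records for that key keep their relative order.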

1.4.2 Consumers

  • Kafka has a single messaging abstraction, the topic. Each consumer belongs to a consumer group, and a group can contain multiple consumers. Each record published to a topic is delivered to one consumer instance within each subscribing group.
  • If all consumer instances belong to the same consumer group, records are effectively load balanced across the instances.
  • If all consumer instances belong to different consumer groups, each record is broadcast to all consumer processes.

Example: a two-server Kafka cluster hosts four partitions (P0-P3) and serves two consumer groups. Consumer group A has two consumer instances, while group B has four.

The way consumption is implemented in Kafka is by dividing the partitions in the log across consumer instances, so that each instance has exclusive consumption of a “fair share” of partitions at any given time. The process of maintaining membership within a group is handled dynamically by the Kafka protocol. If new instances join the group, they will take over some partitions from other members of the group; if an instance dies, its partitions will be redistributed to the remaining instances.
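The "fair share" idea can be sketched with a simple round-robin assignment. This is an illustration only: `assign_partitions` is a hypothetical helper, the consumer names are made up, and real clients choose among several assignment strategies negotiated through the group protocol.

```python
def assign_partitions(partitions, consumers):
    """Deal partitions out round-robin so each consumer gets a fair share."""
    assignment = {consumer: [] for consumer in consumers}
    for i, partition in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment


partitions = ["P0", "P1", "P2", "P3"]

group_a = assign_partitions(partitions, ["C1", "C2"])
group_b = assign_partitions(partitions, ["C3", "C4", "C5", "C6"])

# If C6 dies, re-running the assignment redistributes its partition.
after_crash = assign_partitions(partitions, ["C3", "C4", "C5"])

# A fifth consumer in a four-partition topic receives nothing.
oversized = assign_partitions(partitions, ["D1", "D2", "D3", "D4", "D5"])
```

In every case each partition is owned by exactly one member of the group, which is what preserves per-partition ordering during parallel consumption.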

Kafka only provides a total order over records within a partition, not between different partitions of a topic. For most applications, per-partition ordering combined with the ability to partition data by key is sufficient. If you need a total order over all records, you can use a topic with a single partition, but this means only one consumer process per consumer group.

1.5 Guarantees Provided by Kafka

  • Messages sent by a producer to a particular topic partition are appended in the order they are sent. That is, if records M1 and M2 are sent by the same producer and M1 is sent first, then M1 will have a lower offset than M2 and appear earlier in the log.
  • A consumer instance sees records in the order they are stored in the log.
  • For a topic with replication factor N, up to N-1 Kafka instances can fail without losing any records committed to the log.
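The N-1 failure tolerance in the last guarantee can be checked with a toy model. The `survives` helper below is hypothetical and assumes the record was fully replicated to every listed broker before any of them failed.

```python
def survives(replica_holders, failed_brokers):
    """A committed record survives if at least one broker holding it is alive."""
    return bool(set(replica_holders) - set(failed_brokers))


replica_holders = {0, 1, 2}  # replication factor N = 3

two_failures = survives(replica_holders, failed_brokers={0, 1})       # N-1 brokers down
three_failures = survives(replica_holders, failed_brokers={0, 1, 2})  # all N brokers down
```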

1.6 Kafka as a Messaging System

How does Kafka’s stream concept compare to traditional enterprise messaging systems?

(1) Traditional Messaging Systems

Traditionally, there are two models for messaging: queuing and publish-subscribe. In a queue, a pool of consumers reads from the server and each record goes to one of them; in publish-subscribe, each record is broadcast to all consumers. Each model has strengths and weaknesses. The strength of queuing is that it lets you divide the processing of data over multiple consumer instances, so processing can scale. Unfortunately, queues are not multi-subscriber: once one process reads the data, it is gone. Publish-subscribe lets you broadcast data to multiple processes, but because every message goes to every subscriber, there is no way to scale processing.

Kafka’s concept of consumer groups encompasses these two concepts. Like queues, consumer groups allow you to partition processing into a group of processes (members of the consumer group). And like publish-subscribe, Kafka allows you to broadcast messages to multiple consumer groups.

(2) Advantages of Kafka

The advantage of Kafka’s model is that every topic has both properties - it can scale processing and it is also multi-subscriber - so there is no need to choose one or the other.

Compared to traditional messaging systems, Kafka has stronger ordering guarantees.

Traditional queues retain records on the server in order and distribute records in storage order if multiple consumers consume from the queue. However, even though the server distributes records in order, they are delivered to consumers asynchronously, so they can arrive out of order at different consumers. This effectively means losing the order of records in the presence of parallel consumption. Messaging systems typically solve this problem by having the concept of “exclusive consumer” that allows only one process to consume from the queue, but of course, this means no parallelism in processing.

Kafka does better. By having a notion of parallelism - the partition - within a topic, Kafka is able to provide both ordering guarantees and load balancing over a pool of consumer processes. This is achieved by assigning the partitions in the topic to the consumers in the consumer group, so that each partition is consumed by exactly one consumer in the group. This ensures that the consumer is the only reader of that partition and consumes its data in order. Since there are many partitions, the load is still balanced over many consumer instances. Note, however, that a consumer group cannot have more active consumer instances than there are partitions; any extra instances sit idle.

1.7 Kafka as a Storage System

  • Any message queue that decouples publishing messages from consuming them is effectively acting as a storage system for the in-flight messages. What sets Kafka apart is that it is a very good storage system.
  • Data written to Kafka is written to disk and replicated for fault tolerance. Kafka allows producers to wait for acknowledgment, so that writes are not considered complete until fully replicated, and writes are guaranteed to be present even if the server they were written to fails.
  • Kafka’s disk structure scales well - whether it’s 50 KB or 50 TB of persistent data on a server, Kafka performs the same.
  • By taking storage seriously and allowing clients to control their read position, you can think of Kafka as a specialized distributed file system dedicated to high-performance, low-latency commit log storage, replication, and propagation.
  • For more details on Kafka’s commit log storage and replication design, please refer to https://kafka.apache.org/documentation/#design.

1.8 Kafka for Stream Processing

  • It’s not enough to just read, write, and store data streams; the goal is to achieve real-time processing of streams.
  • In Kafka, a stream processor is anything that takes continual streams of data from input topics, performs some processing on this input, and produces continual streams of data to output topics.
  • For example, a retail application might take in input streams of sales and shipments, and output a stream of reorders and price adjustments computed from this data.
  • Simple processing can be done directly using the producer and consumer APIs. However, for more complex transformations, Kafka provides a fully integrated Streams API. This allows building applications that perform non-trivial processing, such as computing aggregations on streams or joining streams together.
  • The Streams API helps solve the hard problems this type of application faces: handling out-of-order data, reprocessing input as code changes, performing stateful computations, etc.
  • The Streams API builds on core primitives provided by Kafka: it uses the producer and consumer APIs for input, Kafka for state storage, and the same group mechanism for fault tolerance between stream processor instances.
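The retail example above can be sketched as a small stateful stream processor. This is plain Python rather than the Streams API: `reorder_stream`, the event tuples, the starting stock levels, and the threshold are all assumptions made for illustration.

```python
from collections import defaultdict


def reorder_stream(events, stock, threshold):
    """Yield a reorder event whenever a sale drops a product below threshold.

    events is an iterable of (kind, product, quantity) tuples, where kind is
    "sale" or "shipment". The running stock level is the processor's state.
    """
    levels = defaultdict(int, stock)
    for kind, product, quantity in events:
        if kind == "sale":
            levels[product] -= quantity
            if levels[product] < threshold:
                yield ("reorder", product, levels[product])
        elif kind == "shipment":
            levels[product] += quantity


events = [
    ("sale", "widget", 3),       # widget: 10 -> 7
    ("sale", "widget", 4),       # widget: 7 -> 3, below threshold
    ("shipment", "widget", 10),  # widget: 3 -> 13
    ("sale", "gadget", 1),       # gadget: 5 -> 4, below threshold
]
reorders = list(reorder_stream(events, {"widget": 10, "gadget": 5}, threshold=5))
```

The generator consumes one input stream and produces a derived output stream while keeping per-product state, which is the shape of computation the Streams API is designed to make fault tolerant.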

2. Kafka Use Cases

2.1 Messaging

Kafka can replace more traditional message brokers. There are several reasons to use a message broker (separating processing from data generation, buffering unprocessed messages, etc.). Compared to most messaging systems, Kafka has better throughput, built-in partitioning, replication, and fault-tolerance, making it an ideal solution for large-scale messaging applications.

In practice, messaging workloads often have comparatively low throughput, but they may require low end-to-end latency and often depend on the strong durability guarantees Kafka provides.

In this domain, Kafka can compete with traditional messaging systems like ActiveMQ or RabbitMQ.

2.2 Website Activity Tracking

The original use case for Kafka was to rebuild a user activity tracking pipeline as a set of real-time publish-subscribe feeds. Site activity (page views, searches, or other actions a user might take) is published to central topics, with one topic per activity type. These feeds can be used for a range of use cases, including real-time processing, real-time monitoring, and loading into Hadoop or offline data warehousing systems for offline processing and reporting.

Activity tracking is often very high volume, as many activity messages are generated for each user page view.

2.3 Metrics

Kafka is commonly used for operational monitoring data. This involves aggregating statistics from distributed applications to generate centralized summaries of operational data.

2.4 Log Aggregation

Many people use Kafka as a replacement for a log aggregation solution. Log aggregation typically involves collecting physical log files from servers and putting them in a central location (possibly a file server or HDFS) for processing. Kafka abstracts away the details of files and provides a cleaner abstraction of log or event data as a stream of messages. This allows for lower-latency processing and easier support for multiple data sources and distributed data consumption. Compared to log-centric systems like Scribe or Flume, Kafka offers equally good performance, stronger durability guarantees due to replication, and lower end-to-end latency.

2.5 Stream Processing

Many Kafka users process data in pipelines consisting of multiple stages, where raw input data is consumed from Kafka topics and then aggregated, enriched, or otherwise transformed into new topics for further consumption or subsequent processing.

For example, a processing pipeline for recommending news articles might fetch article content from RSS feeds and publish it to an “articles” topic; further processing might normalize or deduplicate this content and publish the cleaned article content to a new topic; the final processing stage might attempt to recommend this content to users. Such processing pipelines are graphs of real-time data streams created from individual topics. Starting from 0.10.0.0, there is a lightweight yet powerful stream processing library called Kafka Streams available in Apache Kafka for performing such data processing as described above. Other open-source stream processing tools include Apache Storm and Apache Samza.
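The staged pipeline described above can be mimicked with chained generators, where each stage plays the role of a consumer of one topic and a producer to the next. The stage functions, the article fields, and the example URLs below are hypothetical; deduplicating by URL is just one possible rule.

```python
def normalize(articles):
    """Stage 1: clean up titles before further processing."""
    for article in articles:
        yield {"title": article["title"].strip().lower(), "url": article["url"]}


def deduplicate(articles):
    """Stage 2: drop articles whose URL has already been seen."""
    seen = set()
    for article in articles:
        if article["url"] not in seen:
            seen.add(article["url"])
            yield article


raw_articles = [
    {"title": "  Kafka 101 ", "url": "https://example.com/a"},
    {"title": "KAFKA 101", "url": "https://example.com/a"},
    {"title": "Streams", "url": "https://example.com/b"},
]
cleaned = list(deduplicate(normalize(raw_articles)))
```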

2.6 Event Sourcing

Event Sourcing is an application design style where state changes are recorded as a sequence of records ordered by time. Kafka’s support for storing large log data makes it an excellent backend for applications built in this style.
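A minimal sketch of the event sourcing idea: the current state is never stored directly but is rebuilt by replaying the ordered event log. The `replay` function and the account events are hypothetical examples.

```python
def replay(events, initial_state=None):
    """Rebuild account balances by applying every event in order."""
    balances = dict(initial_state or {})
    for account, delta in events:
        balances[account] = balances.get(account, 0) + delta
    return balances


events = [("alice", 100), ("bob", 50), ("alice", -30)]

state = replay(events)                # state after all events
state_at_step_2 = replay(events[:2])  # state as of an earlier point in time
```

Because the log keeps every change, any past state can be reconstructed by replaying a prefix of the events.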

2.7 Commit Log

Kafka can serve as an external commit log for distributed systems. The log helps replicate data between nodes and acts as a resynchronization mechanism that lets failed nodes restore their data. Kafka’s log compaction feature supports this use case. In this role, Kafka is similar to the Apache BookKeeper project.
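Log compaction retains at least the most recent record for each key, which is what lets a recovering node rebuild its latest state without reading the full history. The sketch below shows the idea on a list of `(key, value)` pairs; `compact` is a hypothetical helper and ignores details such as delete markers and the uncompacted tail of a real log.

```python
def compact(log):
    """Keep only the most recent value for each key, preserving log order."""
    latest_offset = {key: offset for offset, (key, _) in enumerate(log)}
    return [
        (key, value)
        for offset, (key, value) in enumerate(log)
        if latest_offset[key] == offset
    ]


log = [("k1", "a"), ("k2", "b"), ("k1", "c"), ("k3", "d"), ("k2", "e")]
compacted = compact(log)
```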

3. Kafka Installation

3.1 Download and Install

Go to the official website http://kafka.apache.org/downloads.html to download the desired version. In my case, I downloaded the latest stable version 2.1.0.

Note: Kafka’s console scripts differ between Unix and Windows. On Windows, use bin\windows\ instead of bin/ and change the script extension to .bat.

3.2 Configuring and Starting Zookeeper

The Kafka version used here requires ZooKeeper: without it, neither the Kafka cluster nor the producer and consumer clients will work correctly. The ZooKeeper service must therefore be configured and started first.

(1) ZooKeeper requires a Java environment.

(2) The Kafka download package already includes the ZooKeeper service, so you only need to modify the configuration file and start it.

3.3 Configure Kafka

(1) Modify the configuration file

broker.id=0
listeners=PLAINTEXT://localhost:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

Note: You can modify the configuration file according to your own needs.

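  • broker.id: Unique ID of this broker within the cluster
  • listeners=PLAINTEXT://localhost:9092: Address and port the Kafka service listens on
  • log.dirs: Directory where Kafka stores its data (the partition logs, not application log files)
  • zookeeper.connect: Address of the ZooKeeper service to connect to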
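To double-check a configuration file before starting the broker, the settings can be read with a few lines of Python. This is a simplified sketch: `parse_properties` is a hypothetical helper that handles only `key=value` lines and `#` comments, not the full Java properties syntax, and the sample text repeats a subset of the file shown above.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


sample = """
# subset of config/server.properties
broker.id=0
listeners=PLAINTEXT://localhost:9092
log.dirs=/tmp/kafka-logs
log.retention.hours=168
zookeeper.connect=localhost:2181
"""

props = parse_properties(sample)
retention_days = int(props["log.retention.hours"]) / 24  # 168 hours is one week
```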

(2) Configure environment variables

[root@ ~]# cat /etc/profile.d/kafka.sh
export KAFKA_HOME="/data/kafka_2.11-2.1.0"
export PATH="${KAFKA_HOME}/bin:$PATH"
[root@ ~]# source /etc/profile.d/kafka.sh

(3) Configure the service startup script

[root@ ~]# cat /etc/init.d/kafka
#!/bin/sh
#
# chkconfig: 345 99 01
# description: Kafka
#
# File : Kafka
#
# Description: Starts and stops the Kafka server
#

source /etc/rc.d/init.d/functions

KAFKA_HOME=/data/kafka_2.11-2.1.0
KAFKA_USER=root
export LOG_DIR=/tmp/kafka-logs

[ -e /etc/sysconfig/kafka ] && . /etc/sysconfig/kafka

# See how we were called.
case "$1" in

  start)
    echo -n "Starting Kafka:"
    /sbin/runuser -s /bin/sh $KAFKA_USER -c "nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties > $LOG_DIR/server.out 2> $LOG_DIR/server.err &"
    echo " done."
    exit 0
    ;;

  stop)
    echo -n "Stopping Kafka: "
    # Send SIGTERM so the broker can shut down cleanly; use "hardstop" to force-kill.
    /sbin/runuser -s /bin/sh $KAFKA_USER  -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print \$2}' | xargs kill"
    echo " done."
    exit 0
    ;;
  hardstop)
    echo -n "Stopping (hard) Kafka: "
    /sbin/runuser -s /bin/sh $KAFKA_USER  -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print \$2}' | xargs kill -9"
    echo " done."
    exit 0
    ;;

  status)
    c_pid=`ps -ef | grep kafka.Kafka | grep -v grep | awk '{print $2}'`
    if [ "$c_pid" = "" ] ; then
      echo "Stopped"
      exit 3
    else
      echo "Running $c_pid"
      exit 0
    fi
    ;;

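  restart)
    # "stop" and "start" are case labels, not shell functions, so re-invoke this script.
    "$0" stop
    # Wait for the old broker process to exit before starting a new one.
    while ps -ef | grep kafka.Kafka | grep -v grep > /dev/null; do sleep 1; done
    "$0" start
    ;;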

  *)
    echo "Usage: kafka {start|stop|hardstop|status|restart}"
    exit 1
    ;;

esac

3.4 Start Kafka Service

(1) Start the ZooKeeper service in the background.

zookeeper-server-start.sh -daemon /data/kafka_2.11-2.1.0/config/zookeeper.properties

(2) Start the Kafka service.

service kafka start
service kafka status
ss -nutl

4. Kafka Quick Start

4.1 Creating Topics

Create a topic named “testABC” with only one partition and one replica.

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testABC

If we run the “list topic” command, we can now see the following topic:

kafka-topics.sh --list --zookeeper localhost:2181

4.2 Sending Some Messages

Kafka comes with a command-line client that takes input from a file or standard input and sends it as messages to the Kafka cluster. By default, each line is sent as a separate message.

Run the producer and then type some messages in the console to send to the server.

kafka-console-producer.sh --broker-list localhost:9092 --topic testABC
>This is a message
>This is another message

4.3 Starting the Consumer

Kafka also has a command-line consumer that dumps messages to standard output.

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testABC --from-beginning
This is a message
This is another message

All command-line tools have additional options; running the command without any arguments will display more detailed documentation on how to use them.

5. Setting up a Multi-Broker Kafka Cluster

So far, we have been working with a single broker, but that’s not much fun. For Kafka, a single broker is just a cluster of size 1, so there isn’t much difference other than starting some broker instances. But to get a feel for it, let’s expand our cluster to three nodes (still on our local machine).

5.1 Preparing the Configuration Files

cat config/server-1.properties
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1

cat config/server-2.properties
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2

Note: Each file is a copy of config/server.properties with the properties shown above changed. The broker.id property is the unique and permanent name of each node in the cluster. We have to override the port and log directory only because we are running all the brokers on the same machine, and we do not want them to try to register on the same port or overwrite each other’s data.
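The requirement that brokers on one machine must not share an ID, listener, or data directory can be checked mechanically. The sketch below is an assumption-laden illustration: `find_conflicts` is a hypothetical helper, the configurations are given as dictionaries, and listeners are compared as plain strings, so two different spellings of the same port would not be detected.

```python
def find_conflicts(broker_configs):
    """Return the property names whose values are shared by more than one broker."""
    conflicts = []
    for prop in ("broker.id", "listeners", "log.dirs"):
        values = [config[prop] for config in broker_configs]
        if len(set(values)) != len(values):
            conflicts.append(prop)
    return conflicts


broker_0 = {"broker.id": "0", "listeners": "PLAINTEXT://localhost:9092", "log.dirs": "/tmp/kafka-logs"}
broker_1 = {"broker.id": "1", "listeners": "PLAINTEXT://:9093", "log.dirs": "/tmp/kafka-logs-1"}
broker_2 = {"broker.id": "2", "listeners": "PLAINTEXT://:9094", "log.dirs": "/tmp/kafka-logs-2"}

ok = find_conflicts([broker_0, broker_1, broker_2])

# Copying a config and changing only broker.id leaves two conflicts behind.
careless_copy = dict(broker_0, **{"broker.id": "1"})
bad = find_conflicts([broker_0, careless_copy])
```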

5.2 Start two more Kafka services in the cluster

kafka-server-start.sh -daemon /data/kafka_2.11-2.1.0/config/server-1.properties
kafka-server-start.sh -daemon /data/kafka_2.11-2.1.0/config/server-2.properties

5.3 Performing Operations in a Cluster

(1) Now create a new topic called my-replicated-topic with a replication factor of 3.

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

(2) In a cluster, run the “describe topics” command to see which broker is doing what.

kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Note: The first line provides a summary of all partitions, with each additional line providing information about a specific partition. Since we only have one partition for this topic, there is only one line.

  • “leader” is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
  • “replicas” is the list of nodes that replicate the log for this partition, regardless of whether they are the leader or even currently alive.
  • “isr” is the set of “in-sync” replicas. This is the subset of the replicas list that is currently alive and caught up to the leader.

In my example the output shows Leader: 2, so node 2 is the leader for the only partition of this topic.

(3) You can run the same command on the original topic we created to see its location.

kafka-topics.sh --describe --zookeeper localhost:2181 --topic testABC

(4) Post some messages to our new topic:

kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
>my test message 1
>my test message 2

(5) Now let’s consume these messages:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test message 1
my test message 2

5.4 Testing Cluster Fault Tolerance

(1) Now let’s test the fault tolerance. Broker 2 acts as the leader, so let’s kill it:

ps aux | grep server-2.properties | grep -v grep | awk '{print $2}' | xargs kill

(2) Leadership has switched to one of the followers, and node 2 is no longer in the in-sync replica set:

kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

(3) The messages are still available for consumption even though the leader that originally took the writes is down:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

6. Using Kafka Connect to Import/Export Data

Reading data from the console and writing it back to the console is a convenient starting point, but you will probably want to use data from other sources or export data from Kafka to other systems. For many systems, you can use Kafka Connect to import or export data instead of writing custom integration code.

Kafka Connect is a tool that comes with Kafka for importing and exporting data. It is an extensible tool that runs connectors, implementing custom logic to interact with external systems. In this quick start, we will learn how to use simple connectors to run Kafka Connect, which imports data from files into Kafka topics and exports data from Kafka topics to files.

(1) First, create some seed data for testing:

echo -e "foo\nbar" > test.txt

(2) Next, start two connectors running in standalone mode, meaning they run in a single local dedicated process. Provide three configuration files as parameters.

  • The first one is always the configuration for the Kafka Connect framework, which includes common configurations such as the Kafka broker to connect to and the serialization format of the data.
  • The remaining configuration files specify the connectors to be created. These files include the unique connector names, the connector classes to instantiate, and any additional configurations required by the connectors.

connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

Note: These sample configuration files, which ship with Kafka, use the default local cluster configuration you started earlier and create two connectors: the first is a source connector that reads lines from an input file and produces each line as a message to a Kafka topic, and the second is a sink connector that reads messages from that Kafka topic and writes each one as a line in an output file.

(3) Verify if the import was successful (in a separate terminal)

During the startup process, you will see many log messages, including some indicating the instantiation of the connectors.

① Once the Kafka Connect process starts, the source connector should begin reading lines from the test.txt file and producing them to the connect-test topic, and the sink connector should start reading messages from the connect-test topic and writing them to the test.sink.txt file. We can verify that the data has passed through the entire pipeline by checking the contents of the output file.

cat test.sink.txt
foo
bar

② Please note that the data is stored in the Kafka topic connect-test, so we can also run a console consumer to view the data in the topic (or use custom consumer code to process it):

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
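Each message in the topic is a JSON envelope with a "schema" and a "payload" field, as the console output above shows. A custom consumer would typically unwrap the payload; the sketch below does that for lines already read from the console consumer. The `extract_payloads` helper is hypothetical, and the input lines are copied from the output above.

```python
import json


def extract_payloads(lines):
    """Parse each JSON envelope and return only the payload values."""
    return [json.loads(line)["payload"] for line in lines if line.strip()]


console_output = [
    '{"schema":{"type":"string","optional":false},"payload":"foo"}',
    '{"schema":{"type":"string","optional":false},"payload":"bar"}',
]
payloads = extract_payloads(console_output)
```

The unwrapped payloads match the lines the sink connector wrote to test.sink.txt.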

(4) Continue adding data and verify.

echo "Another line" >> test.txt
cat test.sink.txt
foo
bar
Another line

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
{"schema":{"type":"string","optional":false},"payload":"Another line"}

Reference

  • https://www.cnblogs.com/along21/p/10278100.html