eric | May 16, 2023, 9:09 a.m.
A basic Apache Kafka test setup with three servers, each running three combined broker/controller nodes, using KRaft. The recommended setup for production is at least 3 brokers and 3 controllers. Note: The following recipe is confirmed to be working on Apache Kafka 3.3.2 to 3.7.2 using KRaft, with Java openjdk 17.0.13 and Scala 2.13.
If you don't have Scala and Java installed, you will need to install these first. It is probably best to go for Scala 2.13.x. As root:
# scala -version
bash: scala: command not found...
Download the Scala command-line tool and use it to set up Scala; see also the Scala intro page:
# curl -fL https://github.com/coursier/coursier/releases/latest/download/cs-x86_64-pc-linux.gz | gzip -d > cs && chmod +x cs && ./cs setup
Use the tool to install the latest version of 2.13:
# ./cs install scala:2.13.15 scalac:2.13.15
# export PATH="$PATH:/home/{your username}/.local/share/coursier/bin"
# scala -version
Scala code runner version 2.13.15 -- Copyright 2002-2024, LAMP/EPFL and Lightbend, Inc.
Download Kafka binaries from the Kafka downloads page.
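If you prefer to fetch and unpack the archive from the command line, something like the following works. The version number and mirror path are assumptions, so check the downloads page for the current link (older releases move to archive.apache.org), and adjust the path in the mv command below if you downloaded to a different directory:
$ curl -fLO https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
$ tar -xzf kafka_2.13-3.7.2.tgz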
Move the files to somewhere where SELinux will allow you to run them:
$ sudo mkdir -p /usr/local/bin/kafka
$ sudo mv ~/Downloads/kafka_2.13-3.x.x /usr/local/bin/kafka/
First off, you need to edit the broker's properties file:
# cp /usr/local/bin/kafka/kafka_2.13-3.x.x/config/kraft/server.properties /usr/local/bin/kafka/kafka_2.13-3.x.x/config/kraft/server1.properties
# vim /usr/local/bin/kafka/kafka_2.13-3.x.x/config/kraft/server1.properties
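Each broker on a server gets its own copy of the template; on server 1 that means two more copies (the file names follow the node numbering used below):
# cp /usr/local/bin/kafka/kafka_2.13-3.x.x/config/kraft/server.properties /usr/local/bin/kafka/kafka_2.13-3.x.x/config/kraft/server2.properties
# cp /usr/local/bin/kafka/kafka_2.13-3.x.x/config/kraft/server.properties /usr/local/bin/kafka/kafka_2.13-3.x.x/config/kraft/server3.properties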
For this example, we are using three combined broker/controller nodes on each of the three servers, with IPv4 addresses and ports as follows:
- Broker 1: IP-address 10.0.0.20, listener on port 9092, controller on port 19092
- Broker 2: IP-address 10.0.0.20, listener on port 9093, controller on port 19093
- Broker 3: IP-address 10.0.0.20, listener on port 9094, controller on port 19094
- Broker 4: IP-address 10.0.0.22, listener on port 9092, controller on port 19092
- Broker 5: IP-address 10.0.0.22, listener on port 9093, controller on port 19093
- Broker 6: IP-address 10.0.0.22, listener on port 9094, controller on port 19094
- Broker 7: IP-address 10.0.0.24, listener on port 9092, controller on port 19092
- Broker 8: IP-address 10.0.0.24, listener on port 9093, controller on port 19093
- Broker 9: IP-address 10.0.0.24, listener on port 9094, controller on port 19094
The following must be specified in the properties file for Broker 1 (server1.properties); the remaining KRaft settings, such as process.roles=broker,controller and controller.listener.names=CONTROLLER, are already present in the template you copied:
node.id=1
controller.quorum.voters=1@10.0.0.20:19092,2@10.0.0.20:19093,3@10.0.0.20:19094,4@10.0.0.22:19092,5@10.0.0.22:19093,6@10.0.0.22:19094,7@10.0.0.24:19092,8@10.0.0.24:19093,9@10.0.0.24:19094
listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:19092
advertised.listeners=PLAINTEXT://10.0.0.20:9092
num.partitions=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
log.dirs=/tmp/kraft-combined-logs/01
The following must be specified in the properties file for Broker 2 (server2.properties):
node.id=2
controller.quorum.voters=1@10.0.0.20:19092,2@10.0.0.20:19093,3@10.0.0.20:19094,4@10.0.0.22:19092,5@10.0.0.22:19093,6@10.0.0.22:19094,7@10.0.0.24:19092,8@10.0.0.24:19093,9@10.0.0.24:19094
listeners=PLAINTEXT://0.0.0.0:9093,CONTROLLER://0.0.0.0:19093
advertised.listeners=PLAINTEXT://10.0.0.20:9093
num.partitions=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
log.dirs=/tmp/kraft-combined-logs/02
...and so forth for each node. For this to work, make sure that your firewalls are open for the ports above. For Ubuntu and the like:
# ufw allow 9092/tcp
# ufw allow 19092/tcp
# ufw allow 9093/tcp
# ufw allow 19093/tcp
# ufw allow 9094/tcp
# ufw allow 19094/tcp
# ufw status numbered
Rinse and repeat for each port (a scripted version is sketched after the output below). In Rocky or Alma:
# firewall-cmd --zone=public --add-port=9092/tcp --permanent
# firewall-cmd --zone=public --add-port=19092/tcp --permanent
# firewall-cmd --zone=public --add-port=9093/tcp --permanent
# firewall-cmd --zone=public --add-port=19093/tcp --permanent
# firewall-cmd --zone=public --add-port=9094/tcp --permanent
# firewall-cmd --zone=public --add-port=19094/tcp --permanent
# firewall-cmd --reload
# firewall-cmd --zone=public --list-all
Output: public (active)
target: default
icmp-block-inversion: no
interfaces: enp1s0 enp9s0
sources:
services: cockpit dhcpv6-client ssh
ports: 3100/tcp 3000/tcp 9080/tcp 9093/tcp 9096/tcp 9092/tcp 9094/tcp 19094/tcp 19093/tcp 19092/tcp
protocols:
forward: yes
masquerade: no
forward-ports:
source-ports:
icmp-blocks:
rich rules:
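If you would rather script this than repeat the command per port, a small shell loop does the job. This is a sketch for firewalld, run as root; adapt the port list to your setup, and use ufw allow instead on Ubuntu:
for p in 9092 9093 9094 19092 19093 19094; do
  firewall-cmd --zone=public --add-port=${p}/tcp --permanent
done
firewall-cmd --reload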
Add a user that will run Kafka on each server:
$ sudo adduser --system kafkauser
$ sudo usermod -a -G adm kafkauser
$ sudo chown -R kafkauser:kafkauser /usr/local/bin/kafka/kafka_?.??-?.?.?
Replace the ? characters with your Scala and Kafka versions.
$ cd /usr/local/bin/kafka/kafka_?.??-?.?.?
$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
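The same cluster ID must be used on every node in the cluster, so print it on the first server and reuse the value on the others; the ID shown here is only a placeholder:
server1 $ echo $KAFKA_CLUSTER_ID
AbCdEfGhIjKlMnOpQrStUw
server2 $ KAFKA_CLUSTER_ID="AbCdEfGhIjKlMnOpQrStUw"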
For each broker on each server, format the storage with the shared cluster ID:
server1 $ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server1.properties
Output: Formatting /tmp/kraft-combined-logs with metadata.version 3.7-IV4.
...
server2 $ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server4.properties
Output: Formatting /tmp/kraft-combined-logs with metadata.version 3.7-IV4.
...
In addition, kafkauser needs access to the Kafka log directories under /tmp. For each directory:
$ sudo chown kafkauser:kafkauser /tmp/kraft-combined-logs/01
$ sudo chown kafkauser:kafkauser /tmp/kraft-combined-logs/02
...
To enable automatic restart in case of failures or system restarts, you can use a process manager like systemd (assuming you are using a Linux-based operating system). Create a systemd service file per broker on each server, for example a file named kafka_broker01.service in the /etc/systemd/system/ directory.
server1 $ sudo touch /etc/systemd/system/kafka_broker01.service
server1 $ sudo touch /etc/systemd/system/kafka_broker02.service
server1 $ sudo touch /etc/systemd/system/kafka_broker03.service
server2 $ sudo touch /etc/systemd/system/kafka_broker04.service
server2 $ sudo touch /etc/systemd/system/kafka_broker05.service
server2 $ sudo touch /etc/systemd/system/kafka_broker06.service
...
Add the following contents to the kafka_broker01.service file for Broker 1, modifying the paths and options according to your setup:
[Unit]
Description=Apache Kafka Server
Wants=network.target
After=network.target
[Service]
Type=simple
Restart=always
RestartSec=1
User=kafkauser
ExecStart=/usr/local/bin/kafka/kafka_?.??-?.?.?/bin/kafka-server-start.sh /usr/local/bin/kafka/kafka_?.??-?.?.?/config/kraft/server1.properties
ExecStop=/usr/local/bin/kafka/kafka_?.??-?.?.?/bin/kafka-server-stop.sh
[Install]
WantedBy=multi-user.target
Replace the version numbers with your version of Kafka. Repeat the process for Brokers 2 through 9, replacing server1.properties with server2.properties, and so on.
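After creating or editing unit files, reload systemd on each server so the new units are picked up:
server1 $ sudo systemctl daemon-reload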
Enable the services and start them on each server:
server1 $ sudo systemctl enable kafka_broker01.service
server1 $ sudo systemctl start kafka_broker01.service
server1 $ sudo systemctl status kafka_broker01.service
...
That's it! Now try getting metadata from the cluster using kcat:
$ kcat -L -b 10.0.0.20:9092
Metadata for all topics (from broker 1: 10.0.0.20:9092/1):
9 brokers:
broker 1 at 10.0.0.20:9092
broker 2 at 10.0.0.20:9093
broker 3 at 10.0.0.20:9094
broker 4 at 10.0.0.22:9092
broker 5 at 10.0.0.22:9093 (controller)
broker 6 at 10.0.0.22:9094
broker 7 at 10.0.0.24:9092
broker 8 at 10.0.0.24:9093
broker 9 at 10.0.0.24:9094
0 topics:
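As a further smoke test, you can create a topic and push a message through it with the bundled CLI tools and kcat; the topic name test is only an example:
$ bin/kafka-topics.sh --create --topic test --bootstrap-server 10.0.0.20:9092 --partitions 3 --replication-factor 3
$ echo "hello kraft" | kcat -P -b 10.0.0.20:9092 -t test
$ kcat -C -b 10.0.0.20:9092 -t test -o beginning -e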
It is possible to harden the service significantly. The price is added complexity, including more involved fault-finding. The service file for Broker 1 could look like this:
[Unit]
Description=Apache Kafka Server
Wants=network.target
After=network.target
[Service]
Type=simple
User=kafkauser
Restart=always
RestartSec=1
ExecStart=/usr/local/bin/kafka/kafka_2.13-3.3.2/bin/kafka-server-start.sh /usr/local/bin/kafka/kafka_2.13-3.3.2/config/kraft/server1.properties
ExecStop=/usr/local/bin/kafka/kafka_2.13-3.3.2/bin/kafka-server-stop.sh
NoNewPrivileges=true
PrivateTmp=yes
RestrictNamespaces=uts ipc pid user cgroup
ProtectKernelTunables=yes
ProtectKernelModules=yes
ProtectControlGroups=yes
PrivateUsers=strict
CapabilityBoundingSet=CAP_NET_BIND_SERVICE CAP_DAC_READ_SEARCH
[Install]
WantedBy=multi-user.target
The hardening chiefly consists of privilege restrictions (User= and CapabilityBoundingSet=), a private temporary directory inaccessible to other services (PrivateTmp=), limits on namespace creation (RestrictNamespaces=), and protection of kernel tunables, kernel modules, and control groups (ProtectKernelTunables=, ProtectKernelModules=, ProtectControlGroups=).
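To get an overview of how exposed the unit still is, systemd can score it for you:
$ systemd-analyze security kafka_broker01.service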
Create a similar file for Broker 2, and so on. Remember to change the properties file name.
Enable the services and start them:
server1 $ sudo systemctl enable kafka_broker01.service
server1 $ sudo systemctl start kafka_broker01.service
server1 $ sudo systemctl status kafka_broker01.service
...
If you introduce changes to the Kafka setup, you normally need to reformat the Kafka storage, reload the systemd daemon, and restart the service:
$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server1.properties
$ sudo systemctl daemon-reload
$ sudo systemctl restart kafka_broker01.service
$ sudo systemctl status kafka_broker01.service
If you forget to format the storage after changes to the Kafka setup, you tend to get the following message in the error log:
Jul 21 16:50:41 broker02 kafka-server-start.sh[1975]: Could not rename log file '/usr/local/bin/kafka/kafka_2.13-3.3.2/bin/../logs/kafkaServer-gc.log' to '/usr/local/bin/kafka/kafka_2.13-3.3.2/bin/../logs/kafkaServer-gc.log.6' (Read-only file system).
Format the Kafka storage anew as set out above.
Scala Introduction Page - https://docs.scala-lang.org/getting-started/index.html#using-the-scala-installer-recommended-way
Apache Kafka® Quick Start - https://developer.confluent.io/quickstart/kafka-local/
Kafka Command-Line Interface (CLI) Tools - https://docs.confluent.io/kafka/operations-tools/kafka-tools.html
Console Producer and Consumer Basics - https://developer.confluent.io/tutorials/kafka-console-consumer-producer-basics/kafka.html
Running Kafka in Production - https://docs.confluent.io/platform/current/kafka/deployment.html
Running Apache Kafka in Production (Podcast) - https://developer.confluent.io/podcast/running-apache-kafka-in-production/