Bare Metal Kafka Using KRaft

eric | May 16, 2023, 9:09 a.m.

A basic Apache Kafka test setup with three servers, each running three combined broker/controller nodes, using KRaft. The recommended setup for production is at least 3 brokers and 3 controllers. Note: the following recipe is confirmed to be working on Apache Kafka 3.3.2 to 3.7.2 using KRaft, with Java OpenJDK 17.0.13 and Scala 2.13.

Preparations

Get the Kafka Files

If you don't have Scala and Java installed, you will need to install them first. It is probably best to go for Scala 2.13.x. With root:

# scala -version
bash: scala: command not found...

Download the Coursier command-line tool (cs) and use it to set up Scala; see also the Scala intro page:

# curl -fL https://github.com/coursier/coursier/releases/latest/download/cs-x86_64-pc-linux.gz | gzip -d > cs && chmod +x cs && ./cs setup

Use the tool to install the latest version of 2.13:

# ./cs install scala:2.13.15 scalac:2.13.15

# export PATH="$PATH:/home/{your username}/.local/share/coursier/bin"

# scala -version
Scala code runner version 2.13.15 -- Copyright 2002-2024, LAMP/EPFL and Lightbend, Inc.
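
Kafka 3.x also needs a Java runtime; this recipe was tested with OpenJDK 17. Check that Java is available, and install it if needed (the package name below assumes a RHEL-like distribution such as Rocky or Alma):

# java -version

# dnf install java-17-openjdk-headless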

Download the Kafka binaries from the Kafka downloads page (https://kafka.apache.org/downloads).
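The download is a .tgz archive, so extract it first (the exact file name depends on the version you downloaded; kafka_2.13-3.x.x is used as a placeholder throughout):

$ tar -xzf ~/Downloads/kafka_2.13-3.x.x.tgz -C ~/Downloads/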

Move the extracted directory somewhere SELinux will allow you to run it:

$ sudo mkdir -p /usr/local/bin/kafka && sudo mv ~/Downloads/kafka_2.13-3.x.x /usr/local/bin/kafka/

Customize Broker Properties

First off, you need to edit the broker's properties file:

# cp /usr/local/bin/kafka/kafka_2.13-3.x.x/config/kraft/server.properties /usr/local/bin/kafka/kafka_2.13-3.x.x/config/kraft/server1.properties

# vim /usr/local/bin/kafka/kafka_2.13-3.x.x/config/kraft/server1.properties

For this example, we are running three combined broker/controller nodes on each of the three servers, with IPv4 addresses and ports as follows:

- Broker 1: IP-address 10.0.0.20, listener on port 9092, controller on port 19092

- Broker 2: IP-address 10.0.0.20, listener on port 9093, controller on port 19093

- Broker 3: IP-address 10.0.0.20, listener on port 9094, controller on port 19094

- Broker 4: IP-address 10.0.0.22, listener on port 9092, controller on port 19092

- Broker 5: IP-address 10.0.0.22, listener on port 9093, controller on port 19093

- Broker 6: IP-address 10.0.0.22, listener on port 9094, controller on port 19094

- Broker 7: IP-address 10.0.0.24, listener on port 9092, controller on port 19092

- Broker 8: IP-address 10.0.0.24, listener on port 9093, controller on port 19093

- Broker 9: IP-address 10.0.0.24, listener on port 9094, controller on port 19094

The following must be specified in the properties file for Broker 1 (server1.properties):

node.id=1

controller.quorum.voters=1@10.0.0.20:19092,2@10.0.0.20:19093,3@10.0.0.20:19094,4@10.0.0.22:19092,5@10.0.0.22:19093,6@10.0.0.22:19094,7@10.0.0.24:19092,8@10.0.0.24:19093,9@10.0.0.24:19094

listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:19092

advertised.listeners=PLAINTEXT://10.0.0.20:9092

num.partitions=3

offsets.topic.replication.factor=3

transaction.state.log.replication.factor=3

transaction.state.log.min.isr=3

log.dirs=/tmp/kraft-combined-logs/01
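
These settings go on top of the defaults in the copied config/kraft/server.properties. The stock KRaft example file already sets the combined broker/controller role and the controller listener name; if you started from another template, make sure equivalent lines are present (the values below reflect the stock file in recent 3.x releases):

process.roles=broker,controller

controller.listener.names=CONTROLLER

inter.broker.listener.name=PLAINTEXT

listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL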

The following must be specified in the properties file for Broker 2 (server2.properties):

node.id=2

controller.quorum.voters=1@10.0.0.20:19092,2@10.0.0.20:19093,3@10.0.0.20:19094,4@10.0.0.22:19092,5@10.0.0.22:19093,6@10.0.0.22:19094,7@10.0.0.24:19092,8@10.0.0.24:19093,9@10.0.0.24:19094

listeners=PLAINTEXT://0.0.0.0:9093,CONTROLLER://0.0.0.0:19093

advertised.listeners=PLAINTEXT://10.0.0.20:9093

num.partitions=3

offsets.topic.replication.factor=3

transaction.state.log.replication.factor=3

transaction.state.log.min.isr=3

log.dirs=/tmp/kraft-combined-logs/02
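
Following the same pattern, the properties file for Broker 3 (server3.properties) on the first server keeps the same controller.quorum.voters and replication settings and only changes the node id, ports, advertised address and log directory:

node.id=3

listeners=PLAINTEXT://0.0.0.0:9094,CONTROLLER://0.0.0.0:19094

advertised.listeners=PLAINTEXT://10.0.0.20:9094

log.dirs=/tmp/kraft-combined-logs/03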

...and so forth for each node. For this to work, make sure that your firewalls allow traffic on the ports above. On Ubuntu and the like:

# ufw allow 9092/tcp

# ufw allow 19092/tcp

# ufw allow 9093/tcp

# ufw allow 19093/tcp

# ufw allow 9094/tcp

# ufw allow 19094/tcp

# ufw status numbered

Rinse and repeat on each server. On Rocky or Alma:

# firewall-cmd --zone=public --add-port=9092/tcp --permanent

# firewall-cmd --zone=public --add-port=19092/tcp --permanent

# firewall-cmd --zone=public --add-port=9093/tcp --permanent

# firewall-cmd --zone=public --add-port=19093/tcp --permanent

# firewall-cmd --zone=public --add-port=9094/tcp --permanent

# firewall-cmd --zone=public --add-port=19094/tcp --permanent

# firewall-cmd --reload

# firewall-cmd --zone=public --list-all

Output: public (active)
  target: default
  icmp-block-inversion: no
  interfaces: enp1s0 enp9s0
  sources:
  services: cockpit dhcpv6-client ssh
  ports: 3100/tcp 3000/tcp 9080/tcp 9093/tcp 9096/tcp 9092/tcp 9094/tcp 19094/tcp 19093/tcp 19092/tcp
  protocols:
  forward: yes
  masquerade: no
  forward-ports:
  source-ports:
  icmp-blocks:
  rich rules:

 

Create a Kafka User

Add a user that will run Kafka on each server:

$ sudo adduser --system kafkauser

$ sudo usermod -a -G adm kafkauser

$ sudo chown -R kafkauser:kafkauser /usr/local/bin/kafka/kafka_?.??-?.?.?

Replace the ? placeholders with your Scala and Kafka versions.

Generate a Cluster UUID

$ cd /usr/local/bin/kafka/kafka_?.??-?.?.?
$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
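
The same cluster ID must be used when formatting the storage on every server, so print it on the first server and set the same value on the others (the placeholder below stands for whatever ID the first server printed):

server1 $ echo $KAFKA_CLUSTER_ID

server2 $ KAFKA_CLUSTER_ID=<the ID printed on server1>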

Format Log Directories

For each broker on each server:

server1 $ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server1.properties
Output: Formatting /tmp/kraft-combined-logs with metadata.version 3.7-IV4.

...

server2 $ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server4.properties

Output: Formatting /tmp/kraft-combined-logs with metadata.version 3.7-IV4.

...

In addition, kafkauser needs access to the Kafka log directories under /tmp. For each directory:

$ sudo chown kafkauser:kafkauser /tmp/kraft-combined-logs/01

$ sudo chown kafkauser:kafkauser /tmp/kraft-combined-logs/02

...
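
Alternatively, change ownership of the whole directory tree in one go:

$ sudo chown -R kafkauser:kafkauser /tmp/kraft-combined-logs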

Kafka Service

To enable automatic restart in case of failures or system restarts, you can use a process manager like systemd (assuming you are using a Linux-based operating system). Create one systemd service file per Kafka node on each server in the /etc/systemd/system/ directory:

server1 $ sudo touch /etc/systemd/system/kafka_broker01.service

server1 $ sudo touch /etc/systemd/system/kafka_broker02.service

server1 $ sudo touch /etc/systemd/system/kafka_broker03.service

server2 $ sudo touch /etc/systemd/system/kafka_broker04.service

server2 $ sudo touch /etc/systemd/system/kafka_broker05.service

server2 $ sudo touch /etc/systemd/system/kafka_broker06.service

...

Minimum Service File

Add the following contents to the kafka_broker01.service file for Broker 1, modifying the paths and options according to your setup:

[Unit]
Description=Apache Kafka Server
Wants=network.target
After=network.target

[Service]
Type=simple
Restart=always
RestartSec=1
User=kafkauser

ExecStart=/usr/local/bin/kafka/kafka_?.??-?.?.?/bin/kafka-server-start.sh /usr/local/bin/kafka/kafka_?.??-?.?.?/config/kraft/server1.properties
ExecStop=/usr/local/bin/kafka/kafka_?.??-?.?.?/bin/kafka-server-stop.sh

[Install]
WantedBy=multi-user.target

Replace the ? placeholders with your Scala and Kafka versions. Repeat the process for Brokers 2 through 9, replacing server1.properties with server2.properties, and so on.
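
Whenever you create new unit files or edit existing ones, reload systemd so it picks up the changes:

$ sudo systemctl daemon-reload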

Enable & Start

Enable the services and start them on each server:

server1 $ sudo systemctl enable kafka_broker01.service
server1 $ sudo systemctl start kafka_broker01.service
server1 $ sudo systemctl status kafka_broker01.service

...

That's it! Now try getting metadata from the cluster using kcat (formerly kafkacat; it is not bundled with Kafka, so install it separately):

$ kcat -L -b 10.0.0.20:9092

Metadata for all topics (from broker 1: 10.0.0.20:9092/1):
 9 brokers:
  broker 1 at 10.0.0.20:9092
  broker 2 at 10.0.0.20:9093
  broker 3 at 10.0.0.20:9094
  broker 4 at 10.0.0.22:9092
  broker 5 at 10.0.0.22:9093 (controller)
  broker 6 at 10.0.0.22:9094
  broker 7 at 10.0.0.24:9092
  broker 8 at 10.0.0.24:9093
  broker 9 at 10.0.0.24:9094

0 topics:
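
If you do not have kcat at hand, the CLI tools bundled with Kafka work just as well for a quick smoke test: create a topic, produce a message, and read it back (run from the Kafka directory; the topic name is just an example):

$ bin/kafka-topics.sh --create --topic smoke-test --partitions 3 --replication-factor 3 --bootstrap-server 10.0.0.20:9092

$ echo "hello kraft" | bin/kafka-console-producer.sh --topic smoke-test --bootstrap-server 10.0.0.20:9092

$ bin/kafka-console-consumer.sh --topic smoke-test --from-beginning --max-messages 1 --bootstrap-server 10.0.0.20:9092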

Elaborate Service File, Including Service Hardening

It is possible to harden the service significantly. The price is more complexity, including more complex fault-finding. The service file for Broker 1 could look like this:

[Unit]
Description=Apache Kafka Server
Wants=network.target
After=network.target

[Service]
Type=simple
User=kafkauser
Restart=always
RestartSec=1
ExecStart=/usr/local/bin/kafka/kafka_2.13-3.3.2/bin/kafka-server-start.sh /usr/local/bin/kafka/kafka_2.13-3.3.2/config/kraft/server1.properties
ExecStop=/usr/local/bin/kafka/kafka_2.13-3.3.2/bin/kafka-server-stop.sh
NoNewPrivileges=true
PrivateTmp=yes
RestrictNamespaces=uts ipc pid user cgroup
ProtectKernelTunables=yes
ProtectKernelModules=yes
ProtectControlGroups=yes
PrivateUsers=yes
CapabilityBoundingSet=CAP_NET_BIND_SERVICE CAP_DAC_READ_SEARCH

[Install]
WantedBy=multi-user.target

The hardening chiefly consists of privilege restrictions (User=, CapabilityBoundingSet=, NoNewPrivileges=), a private /tmp that is inaccessible to other services (PrivateTmp=), restrictions on creating new namespaces (RestrictNamespaces=, PrivateUsers=), and protection of kernel tunables, kernel module loading and control groups (ProtectKernelTunables=, ProtectKernelModules=, ProtectControlGroups=).

Create a similar file for Broker 2, and so on. Remember to change the properties file name.
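
Assuming the unit files only differ in the properties file name, a small loop can generate the remaining files from the first one (shown here for server 1, where Brokers 2 and 3 live; adjust the numbers on the other servers):

server1 $ for n in 2 3; do sudo sed "s/server1.properties/server${n}.properties/" /etc/systemd/system/kafka_broker01.service | sudo tee /etc/systemd/system/kafka_broker0${n}.service > /dev/null; done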

Enable & Start

Enable the services and start them:

server1 $ sudo systemctl enable kafka_broker01.service
server1 $ sudo systemctl start kafka_broker01.service
server1 $ sudo systemctl status kafka_broker01.service

...

Making Changes

If you make changes to the Kafka setup (for example the properties files or the unit files), you normally need to reformat the Kafka storage, reload the systemd daemon, and restart the service:

$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server1.properties
$ sudo systemctl daemon-reload
$ sudo systemctl restart kafka_broker01.service
$ sudo systemctl status kafka_broker01.service

Troubleshooting

Read-only file system

If you forget to format the storage after changes to the Kafka setup, you tend to get the following message in the error log:

Jul 21 16:50:41 broker02 kafka-server-start.sh[1975]: Could not rename log file '/usr/local/bin/kafka/kafka_2.13-3.3.2/bin/../logs/kafkaServer-gc.log' to '/usr/local/bin/kafka/kafka_2.13-3.3.2/bin/../logs/kafkaServer-gc.log.6' (Read-only file system).

Format the Kafka storage anew as set out above.
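
Since the brokers run under systemd, the logs are easiest to reach through the journal; for example, to jump to the most recent messages for the first broker:

$ journalctl -u kafka_broker01.service -e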

References

Scala Introduction Page - https://docs.scala-lang.org/getting-started/index.html#using-the-scala-installer-recommended-way

Apache Kafka® Quick Start - https://developer.confluent.io/quickstart/kafka-local/

Kafka Command-Line Interface (CLI) Tools - https://docs.confluent.io/kafka/operations-tools/kafka-tools.html

Console Producer and Consumer Basics - https://developer.confluent.io/tutorials/kafka-console-consumer-producer-basics/kafka.html

Running Kafka in Production - https://docs.confluent.io/platform/current/kafka/deployment.html

Running Apache Kafka in Production (Podcast) - https://developer.confluent.io/podcast/running-apache-kafka-in-production/
