Moving Data From Kafka To ClickHouse With Vector

eric | May 20, 2024, 3:05 p.m.

Vector is a sturdy solution for moving data between different systems. In Vector parlance, you move data from a source to a sink, and in the middle you can apply transforms. In this post, we are going to move data from a cluster of Apache Kafka brokers to a ClickHouse database, but most of the suggestions apply to any source or sink.

Getting Started

Some prerequisites:

- One or several Kafka brokers are operational. If not, take a look at [1] before going any further.

- The brokers are receiving messages of some kind (a quick kcat check is shown below).

- You've got a ClickHouse server running. If that's not the case, take a look at [2], [3] and — if you are using Ubuntu — [4].
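
A quick way to check that the brokers are reachable and that your topic is receiving messages is kcat (assuming kcat is installed and broker_ip:port points at one of your brokers):

kcat -b broker_ip:port -L                  # list brokers, topics and partitions
kcat -b broker_ip:port -C -t topic -c 2    # consume two messages from "topic" and exit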

Install Vector using a script:

curl --proto '=https' --tlsv1.2 -sSfL https://sh.vector.dev | bash

Other installation methods are available, including Docker images; see [5].
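
Once installed, a quick sanity check (the script normally places the binary under ~/.vector/bin, so you may need to add that directory to your PATH):

vector --version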

Next, we set up Vector with a configuration file, in this case ~/.vector/config/vector.yaml. The configuration file defines the sources, transforms, and sinks. A simple example:

# Source: Kafka
sources:
  kafka:
    type: kafka
    bootstrap_servers: "broker_ip:port,another_broker_ip:port" # Kafka brokers
    group_id: "consumer_group_name"                            # Consumer group ID
    topics: ["topic"]                                          # Kafka topic to read from
    auto_offset_reset: "earliest"                              # Start reading from the earliest offset

# Sink: Console output
sinks:
  console_output:
    type: console                 # Console output type
    inputs:
      - kafka                     # Input straight from the Kafka source
    encoding:
      codec: json                 # Output format as JSON for easy reading

This configuration has no transformations yet, and the sink is the console rather than ClickHouse. We do this to build the pipeline stepwise and get each step right before moving on to the next one.
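
Before starting Vector, it is worth letting it check the configuration for us; the validate subcommand catches syntax errors and missing components:

$ vector validate ~/.vector/config/vector.yaml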

Analyzing The Data From The Source

Before we look at what is coming out of Vector, let's look at the original messages on the Kafka broker, using kcat. In this example we use stock data; the topic is "STK" and the consumer group is "stocks":

$ kcat -b 10.0.0.20:9092 -G stocks STK

This gives us a long list of messages, the last being:

{"stock_symbol": "MSFT", "exchange": "SMART", "currency": "USD", "transaction_time": "2024-11-13 12:46:03.547902", "min_tick": 0.01, "bid": 421.7, "bid_size": 1300.0, "bid_exchange": "P", "ask": 421.99, "ask_size": 300.0, "ask_exchange": "PQ", "last": 421.62, "last_size": 100.0, "last_exchange": "P", "prev_bid": 421.51, "prev_bid_size": 200.0, "prev_ask": 421.97, "prev_ask_size": 200.0, "prev_last": 421.6, "volume": 474.0, "close": 423.03, "tick_time": "2024-11-13 12:46:03.547902", "tick_type": 2, "tick_price": 421.99, "tic_size": 300.0, "bboExchange": "9c0001", "snapshot_permissions": 3}
% Reached end of topic STK [2] at offset 889601

This looks straightforward: a well-formatted JSON message with keys and values. If we use a configuration file like the one above and output the messages to the console, what does Vector give us?

$ VECTOR_LOG=debug vector --config ~/.vector/config/vector.yaml

{"headers":{},"message":"{\"stock_symbol\": \"AAPL\", \"exchange\": \"SMART\", \"currency\": \"USD\", \"transaction_time\": \"2024-11-13 13:10:27.143100\", \"min_tick\": 0.01, \"bid\": 223.5, \"bid_size\": 1000.0, \"bid_exchange\": \"K\", \"ask\": 223.58, \"ask_size\": 400.0, \"ask_exchange\": \"Q\", \"last\": 223.53, \"last_size\": 100.0, \"last_exchange\": \"Q\", \"prev_bid\": 223.51, \"prev_bid_size\": 200.0, \"prev_ask\": 223.53, \"prev_ask_size\": 300.0, \"prev_last\": 223.51, \"volume\": 1241.0, \"close\": 224.23, \"tick_time\": \"2024-11-13 13:10:27.143100\", \"tick_type\": 2, \"tick_price\": 223.58, \"tic_size\": 400.0, \"bboExchange\": \"9c0001\", \"snapshot_permissions\": 3}","message_key":null,"offset":890820,"partition":0,"source_type":"kafka","timestamp":"2024-11-13T13:10:27.145Z","topic":"STK"}

This looks almost OK, but there is one problem: the payload sits inside a single field, "message", and it is not JSON; it is a JSON string. Here's where Vector transformations shine. You can use Vector's own language, VRL (Vector Remap Language), to transform virtually anything into the structures and types you want. In this case, let's use VRL's parse_json function to turn the string into proper JSON:

# Transform: Parse the message string and map fields
transforms:
  field_mapping:
    type: remap
    inputs:
      - kafka
    source: |
      .message = parse_json!(.message)

With this transformation, we get everything in proper JSON form:

{"headers":{},"message":{"ask":148.77,"ask_exchange":"KPQ","ask_size":1400.0,"bboExchange":"9c0001","bid":148.73,"bid_exchange":"Q","bid_size":100.0,"close":148.29,"currency":"USD","exchange":"SMART","last":148.75,"last_exchange":"Z","last_size":100.0,"min_tick":0.01,"prev_ask":148.75,"prev_ask_size":1500.0,"prev_bid":148.71,"prev_bid_size":1700.0,"prev_last":148.74,"snapshot_permissions":3,"stock_symbol":"NVDA","tic_size":1400.0,"tick_price":148.77,"tick_time":"2024-11-13 13:33:59.488917","tick_type":3,"transaction_time":"2024-11-13 13:33:59.488917","volume":53409.0},"message_key":null,"offset":890562,"partition":1,"source_type":"kafka","timestamp":"2024-11-13T13:33:59.493Z","topic":"STK"}

There is still room for improvement, though. The payload sits one level below the rest of the event, and some of the metadata is superfluous. Going back to the transformation:

transforms:
  field_mapping:
    type: remap
    inputs:
      - kafka
    source: |
      del(.headers)
      del(.message_key)
      del(.source_type)
      del(.topic)
      .message = parse_json!(.message)
      .message.offset = .offset
      .message.partition = .partition
      .message.timestamp = .timestamp
      . = .message

First, we get rid of the event metadata that is most likely not needed. Then we bring the metadata we do want (offset, partition, and timestamp) into the message. As a last step, we set the whole event equal to the message payload:

{"ask":421.31,"ask_exchange":"PQ","ask_size":200.0,"bboExchange":"9c0001","bid":421.14,"bid_exchange":"QU","bid_size":200.0,"close":423.03,"currency":"USD","exchange":"SMART","last":421.3,"last_exchange":"D","last_size":700.0,"min_tick":0.01,"offset":896727,"partition":2,"prev_ask":421.35,"prev_ask_size":100.0,"prev_bid":421.3,"prev_bid_size":600.0,"prev_last":421.35,"snapshot_permissions":3,"stock_symbol":"MSFT","tic_size":200.0,"tick_price":421.31,"tick_time":"2024-11-13 14:11:41.050312","tick_type":3,"timestamp":"2024-11-13T14:11:41.061Z","transaction_time":"2024-11-13 14:11:41.050312","volume":1337.0}

Looking pretty good.
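
A tip when developing transformations like these: recent Vector versions ship an interactive VRL REPL, which lets you try expressions such as parse_json! against a sample event before putting them in the configuration file:

$ vector vrl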

Analyzing The Data Sink

You may have an existing sink, or you may be creating a sink from scratch. Either way, you need to look at how best to accommodate the data. In our case, we already have a sink, a ClickHouse table. It looks like this:

:) DESCRIBE TABLE stock_ticker_data;

DESCRIBE TABLE stock_ticker_data

Query id: 48295571-3bf2-4e55-98e8-0c7d9b6f12df

    ┌─name─────────────────┬─type─────────────────┐
 1. │ transaction_id       │ UUID                 │
 2. │ stock_symbol         │ String               │
 3. │ exchange             │ String               │
 4. │ currency             │ String               │
 5. │ transaction_time     │ DateTime64(6, 'UTC') │
 6. │ min_tick             │ Float32              │
 7. │ bid                  │ Float32              │
 8. │ bid_size             │ Float32              │
 9. │ bid_exchange         │ String               │
10. │ ask                  │ Float32              │
11. │ ask_size             │ Float32              │
12. │ ask_exchange         │ String               │
13. │ last                 │ Float32              │
14. │ last_size            │ Float32              │
15. │ last_exchange        │ String               │
16. │ prev_bid             │ Float32              │
17. │ prev_bid_size        │ Float32              │
18. │ prev_ask             │ Float32              │
19. │ prev_ask_size        │ Float32              │
20. │ prev_last            │ Float32              │
21. │ volume               │ Float32              │
22. │ close                │ Float32              │
23. │ tick_time            │ DateTime64(6, 'UTC') │
24. │ tick_type            │ Int32                │
25. │ tick_price           │ Float32              │
26. │ tic_size             │ Float32              │
27. │ bboExchange          │ String               │
28. │ snapshot_permissions │ Int32                │
29. │ kafka_partition      │ Int32                │
30. │ kafka_offset         │ Int64                │
31. │ kafka_timestamp      │ DateTime64(6, 'UTC') │
    └──────────────────────┴──────────────────────┘

31 rows in set. Elapsed: 0.004 sec.
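
If you are building the sink table from scratch instead, a CREATE TABLE statement along the following lines would produce the same schema (the MergeTree engine and the ORDER BY key here are assumptions; pick whatever suits your queries):

CREATE TABLE stock_ticker_data
(
    transaction_id       UUID,
    stock_symbol         String,
    exchange             String,
    currency             String,
    transaction_time     DateTime64(6, 'UTC'),
    min_tick             Float32,
    bid                  Float32,
    bid_size             Float32,
    bid_exchange         String,
    ask                  Float32,
    ask_size             Float32,
    ask_exchange         String,
    last                 Float32,
    last_size            Float32,
    last_exchange        String,
    prev_bid             Float32,
    prev_bid_size        Float32,
    prev_ask             Float32,
    prev_ask_size        Float32,
    prev_last            Float32,
    volume               Float32,
    close                Float32,
    tick_time            DateTime64(6, 'UTC'),
    tick_type            Int32,
    tick_price           Float32,
    tic_size             Float32,
    bboExchange          String,
    snapshot_permissions Int32,
    kafka_partition      Int32,
    kafka_offset         Int64,
    kafka_timestamp      DateTime64(6, 'UTC')
)
ENGINE = MergeTree
ORDER BY (stock_symbol, tick_time);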

We have a missing field: the table requires a unique identifier, transaction_id, in addition to the fields we already have in the message. There is another problem as well: the kafka_timestamp column expects the DateTime64(6, 'UTC') format, which does not match the .timestamp value coming from the Kafka source. Let's fix both issues:

transforms:
  field_mapping:
    type: remap
    inputs:
      - kafka
    source: |
      # Get keys and values from the message payload
      .message = parse_json!(.message)
      
      # Add Kafka metadata
      .message.kafka_partition = .partition
      .message.kafka_offset = .offset
      kafka_time_parsed = parse_timestamp!(.timestamp, "%Y-%m-%d %H:%M:%S%.f")
      kafka_time = format_timestamp!(kafka_time_parsed, "%Y-%m-%d %H:%M:%S%.6f")
      .message.kafka_timestamp = kafka_time
      
      # Add a unique identifier for each tick
      .message.transaction_id = uuid_v4()
      
      # Replace the whole event with the payload; this also drops the
      # superfluous top-level fields (.partition, .offset, .timestamp)
      . = .message

Output:

{"ask":421.84,"ask_exchange":"Q","ask_size":200.0,"bboExchange":"9c0001","bid":421.64,"bid_exchange":"QNL","bid_size":800.0,"close":423.03,"currency":"USD","exchange":"SMART","kafka_offset":899336,"kafka_partition":2,"kafka_timestamp":"2024-11-13 14:30:50.475000","last":421.74,"last_exchange":"Q","last_size":100.0,"min_tick":0.01,"prev_ask":421.83,"prev_ask_size":800.0,"prev_bid":421.63,"prev_bid_size":200.0,"prev_last":421.73,"snapshot_permissions":3,"stock_symbol":"MSFT","tic_size":100.0,"tick_price":421.74,"tick_time":"2024-11-13 14:30:50.474435","tick_type":4,"transaction_id":"8c0a8d79-c2c1-4047-82d9-4627ac632204","transaction_time":"2024-11-13 14:30:50.474435","volume":4336.0}

All timestamps are now uniformly formatted and fit the DateTime64(6, 'UTC') columns in the ClickHouse table.
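
If you want to convince yourself that the string format fits, you can let ClickHouse parse one of the formatted timestamps directly:

:) SELECT toDateTime64('2024-11-13 14:30:50.475000', 6, 'UTC');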

Connecting To The Sink

It's time to connect to ClickHouse and start sending it messages from Kafka. We can have multiple sinks, and as long as we are in a trial-and-error phase, it is a good idea to output both to the console and to ClickHouse. Let's just add another sink under sinks:

# Sink: Console output
sinks:
  console_output:
    type: console                # Console output type
    inputs:
      - field_mapping            # Input from the transformation
    encoding:
      codec: json                # Output format as JSON for easy reading

  clickhouse:
    type: clickhouse             # Sink type for ClickHouse
    inputs: ["field_mapping"]    # Input from the transformation step
    endpoint: "http://your_server:8123"  # Replace with your ClickHouse HTTP endpoint
    database: "default"          # Your ClickHouse database
    table: "stock_ticker_data"   # The target table in ClickHouse
    skip_unknown_fields: true
    compression: "gzip"          # Compression for data transfer
    batch:
      max_bytes: 10485760        # Max batch size in bytes (10MB)
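
If your ClickHouse server requires credentials, the sink also takes basic authentication settings; a sketch (the user and password values are placeholders):

  clickhouse:
    # ... as above, plus:
    auth:
      strategy: basic
      user: "clickhouse_user"
      password: "clickhouse_password"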

So how do we make sure this is reaching ClickHouse? Before we run it, let's make sure that the table is empty:

:) SELECT * FROM stock_ticker_data ORDER BY tick_time;

SELECT *
FROM stock_ticker_data
ORDER BY tick_time ASC

Query id: 0fa77f02-9912-42c0-94c7-05e064c4c591

Ok.

0 rows in set. Elapsed: 0.002 sec.

Fine. We start Vector:

$ VECTOR_LOG=debug vector --config ~/.vector/config/vector.yaml

Then we go back to ClickHouse for a look:

:) SELECT * FROM stock_ticker_data ORDER BY tick_time DESC LIMIT 10;

SELECT *
FROM stock_ticker_data
ORDER BY tick_time DESC
LIMIT 10

Query id: 3e357216-2a5b-42c1-b9c0-ba19d75569c6

    ┌─transaction_id───────────────────────┬─stock_symbol─┬─exchange─┬─currency─┬───────────transaction_time─┬─min_tick─┬────bid─┬─bid_size─┬─bid_exchange─┬────ask─┬─ask_size─┬─ask_exchange─┬───last─┬─last_size─┬─last_exchange─┬─prev_bid─┬─prev_bid_size─┬─prev_ask─┬─prev_ask_size─┬─prev_last─┬─volume─┬──close─┬──────────────────tick_time─┬─tick_type─┬─tick_price─┬─tic_size─┬─bboExchange─┬─snapshot_permissions─┬─kafka_partition─┬─kafka_offset─┬────────────kafka_timestamp─┐
 1. │ 731a24d3-f8c8-4160-b9fb-26241de40803 │ MSFT         │ SMART    │ USD      │ 2024-11-13 16:29:21.698411 │     0.01 │ 422.81 │      100 │ Q            │ 422.84 │      100 │ Q            │ 422.81 │       100 │ D             │    422.8 │           200 │   422.86 │           200 │    422.82 │  45871 │ 423.03 │ 2024-11-13 16:29:21.698411 │         5 │     422.81 │      100 │ 9c0001      │                    3 │               2 │       971482 │ 2024-11-13 16:29:21.705000 │
 2. │ 595ff59f-d4c7-418c-bd62-e48e414b83ee │ MSFT         │ SMART    │ USD      │ 2024-11-13 16:29:21.477296 │     0.01 │ 422.81 │      100 │ Q            │ 422.84 │      100 │ Q            │ 422.82 │       600 │ D             │    422.8 │           200 │   422.86 │           200 │    422.83 │  45871 │ 423.03 │ 2024-11-13 16:29:21.477296 │         3 │     422.84 │      100 │ 9c0001      │                    3 │               2 │       971481 │ 2024-11-13 16:29:21.478000 │
 3. │ 41b50e29-27d8-42ff-8d7d-deee3e50f1e8 │ AAPL         │ SMART    │ USD      │ 2024-11-13 16:29:21.460808 │     0.01 │ 224.39 │      300 │ QV           │ 224.41 │     1800 │ PQVZLU       │ 224.39 │       900 │ D             │    224.4 │           200 │   224.42 │           800 │     224.4 │ 139034 │ 224.23 │ 2024-11-13 16:29:21.460808 │         5 │     224.39 │      900 │ 9c0001      │                    3 │               2 │       971480 │ 2024-11-13 16:29:21.466000 │
 4. │ 158f7959-6daf-47ae-bc61-d79c0771c6d9 │ AAPL         │ SMART    │ USD      │ 2024-11-13 16:29:21.449326 │     0.01 │ 224.39 │      300 │ QV           │ 224.41 │     1800 │ PQVZLU       │  224.4 │       100 │ Q             │    224.4 │           200 │   224.42 │           800 │    224.41 │ 139034 │ 224.23 │ 2024-11-13 16:29:21.449326 │         8 │         -1 │   139034 │ 9c0001      │                    3 │               2 │       971479 │ 2024-11-13 16:29:21.454000 │
 5. │ 0546e049-1ea4-4df4-916d-7994e7ccb93e │ MSFT         │ SMART    │ USD      │ 2024-11-13 16:29:21.209395 │     0.01 │ 422.81 │      100 │ Q            │ 422.84 │      200 │ Q            │ 422.82 │       600 │ D             │    422.8 │           200 │   422.86 │          1000 │    422.83 │  45871 │ 423.03 │ 2024-11-13 16:29:21.209395 │         8 │         -1 │    45871 │ 9c0001      │                    3 │               2 │       971478 │ 2024-11-13 16:29:21.217000 │
 6. │ 00d2fc3f-a642-4560-9536-44a351c9e18f │ MSFT         │ SMART    │ USD      │ 2024-11-13 16:29:21.203090 │     0.01 │ 422.81 │      100 │ Q            │ 422.84 │      200 │ Q            │ 422.82 │       600 │ D             │    422.8 │           200 │   422.86 │          1000 │    422.83 │  45865 │ 423.03 │ 2024-11-13 16:29:21.203090 │         5 │     422.82 │      600 │ 9c0001      │                    3 │               2 │       971477 │ 2024-11-13 16:29:21.208000 │
 7. │ 4cb43010-a2f0-49d4-8798-05483cfeda01 │ NVDA         │ SMART    │ USD      │ 2024-11-13 16:29:20.952215 │     0.01 │ 147.33 │      400 │ QZ           │ 147.34 │     1200 │ JKPQVZNUH    │ 147.34 │       100 │ D             │   147.34 │           700 │   147.35 │          1500 │    147.36 │ 728558 │ 148.29 │ 2024-11-13 16:29:20.952215 │         2 │     147.34 │     1200 │ 9c0001      │                    3 │               2 │       971476 │ 2024-11-13 16:29:20.953000 │
 8. │ 51cecf7b-2e01-487c-855d-9eabcf9cc10f │ GTLB         │ SMART    │ USD      │ 2024-11-13 16:29:20.718662 │     0.01 │  62.77 │      200 │ QZ           │  62.94 │     1000 │ QL           │  62.85 │       100 │ Q             │    62.76 │           300 │    62.95 │          1300 │      62.8 │   8264 │  59.95 │ 2024-11-13 16:29:20.718662 │         0 │      62.77 │      200 │ 9c0001      │                    3 │               2 │       971475 │ 2024-11-13 16:29:20.720000 │
 9. │ f21be0aa-0e7d-40e4-a8d9-75a855a623f6 │ NVDA         │ SMART    │ USD      │ 2024-11-13 16:29:20.707354 │     0.01 │ 147.33 │      700 │ QVZ          │ 147.35 │     1200 │ PQVLH        │ 147.34 │       100 │ D             │   147.34 │           900 │   147.36 │          1500 │    147.36 │ 728558 │ 148.29 │ 2024-11-13 16:29:20.707354 │         3 │     147.35 │     1200 │ 9c0001      │                    3 │               2 │       971474 │ 2024-11-13 16:29:20.712000 │
10. │ d7efcba1-13b4-4129-b5a7-f3c61d3da259 │ NVDA         │ SMART    │ USD      │ 2024-11-13 16:29:20.707354 │     0.01 │ 147.33 │      700 │ QVZ          │ 147.35 │     1200 │ PQVLH        │ 147.34 │       100 │ D             │   147.34 │           900 │   147.36 │          1500 │    147.36 │ 728558 │ 148.29 │ 2024-11-13 16:29:20.707354 │         0 │     147.33 │      700 │ 9c0001      │                    3 │               2 │       971473 │ 2024-11-13 16:29:20.712000 │
    └──────────────────────────────────────┴──────────────┴──────────┴──────────┴────────────────────────────┴──────────┴────────┴──────────┴──────────────┴────────┴──────────┴──────────────┴────────┴───────────┴───────────────┴──────────┴───────────────┴──────────┴───────────────┴───────────┴────────┴────────┴────────────────────────────┴───────────┴────────────┴──────────┴─────────────┴──────────────────────┴─────────────────┴──────────────┴────────────────────────────┘

10 rows in set. Elapsed: 0.023 sec. Processed 145.71 thousand rows, 30.95 MB (6.38 million rows/s., 1.36 GB/s.)
Peak memory usage: 132.21 KiB.
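
Another quick sanity check is to count the rows and re-run the count a few seconds later to watch it grow:

:) SELECT count() FROM stock_ticker_data;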

We're up and running. For more details on how to integrate Kafka, Vector, and ClickHouse, take a look at [6], [7], and [8].

References

[1] Bare Metal Kafka Using KRaft - https://www.ericeikrem.com/bare-metal-kafka/

[2] ClickHouse Quickstart - https://clickhouse.com/docs/en/getting-started/quick-start

[3] Install ClickHouse - https://clickhouse.com/docs/en/install

[4] How To Install and Use ClickHouse on Ubuntu 20.04 - https://www.digitalocean.com/community/tutorials/how-to-install-and-use-clickhouse-on-ubuntu-20-04

[5] Vector Quickstart - https://vector.dev/docs/setup/quickstart/

[6] Using Vector With Kafka and ClickHouse - https://clickhouse.com/docs/en/integrations/kafka/kafka-vector

[7] Integrating Vector With ClickHouse - https://clickhouse.com/docs/en/integrations/vector

[8] Integrating Kafka with ClickHouse - https://clickhouse.com/docs/en/integrations/kafka
