eric | May 20, 2024, 3:05 p.m.
Vector is a sturdy solution for moving data between different systems. In Vector parlance, you move data from a source to a sink, and in the middle you can apply transforms. In this post, we are going to move data from a cluster of Apache Kafka brokers to a ClickHouse database, but most of the suggestions apply to any source or sink.
Some prerequisites:
- One or several Kafka brokers are operational. If not, take a look at [1] before going any further.
- The brokers are receiving messages of some kind.
- You've got a ClickHouse server going. If that's not the case, take a look at [2], [3] and, if you are using Ubuntu, [4]. A quick sanity check for both prerequisites is shown right after this list.
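Both can be verified from the command line before going further. The broker address below is the one used later in this post; adjust it to your setup, and note that kcat and clickhouse-client need to be installed:
$ kcat -b 10.0.0.20:9092 -L          # list brokers, topics and partitions
$ clickhouse-client --query "SELECT version()"   # confirm ClickHouse answers queries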
Install Vector using a script:
curl --proto '=https' --tlsv1.2 -sSfL https://sh.vector.dev | bash
Other installation methods are available, including Docker images; see also [5].
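The script installs Vector under ~/.vector by default; once the binary is on your PATH, a quick check confirms the installation:
$ vector --version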
Next, we use a configuration file, vector.yaml, to set up Vector. The configuration file, placed in ~/.vector/config/, defines the sources, transforms, and sinks. A simple example:
# Source: Kafka
sources:
  kafka:
    type: kafka
    bootstrap_servers: "broker_ip:port,another_broker_ip:port" # Kafka brokers
    group_id: "consumer_group_name" # Consumer group ID
    topics: ["topic"] # Kafka topic to read from
    auto_offset_reset: "earliest" # Start reading from the earliest offset

# Transform: Parse the message string and map fields
transforms:
  field_mapping:
    type: remap
    inputs: ["kafka"]
    source: |

# Sink: Console output
sinks:
  console_output:
    type: console # Console output type
    inputs:
      - field_mapping # Input from the transformation
    encoding:
      codec: json # Output format as JSON for easy reading
In this first configuration the transform is left empty and the sink is the console, not ClickHouse. We do this to build the pipeline stepwise, getting each step right before moving on to the next one.
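Vector can also check a configuration before running it, which catches indentation mistakes and unknown option names early. Using the config path that appears later in this post:
$ vector validate ~/.vector/config/vector.yaml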
Before we look at what comes out of Vector, let's look at the original messages on the Kafka broker, using kcat. In this example we work with stock data; the topic is "STK" and the consumer group is "stocks":
$ kcat -b 10.0.0.20:9092 -G stocks STK
This gives us a long list of messages, the last being:
{"stock_symbol": "MSFT", "exchange": "SMART", "currency": "USD", "transaction_time": "2024-11-13 12:46:03.547902", "min_tick": 0.01, "bid": 421.7, "bid_size": 1300.0, "bid_exchange": "P", "ask": 421.99, "ask_size": 300.0, "ask_exchange": "PQ", "last": 421.62, "last_size": 100.0, "last_exchange": "P", "prev_bid": 421.51, "prev_bid_size": 200.0, "prev_ask": 421.97, "prev_ask_size": 200.0, "prev_last": 421.6, "volume": 474.0, "close": 423.03, "tick_time": "2024-11-13 12:46:03.547902", "tick_type": 2, "tick_price": 421.99, "tic_size": 300.0, "bboExchange": "9c0001", "snapshot_permissions": 3}
% Reached end of topic STK [2] at offset 889601
This looks straightforward: a well-formatted JSON message with keys and values. If we use a configuration file like the one above and output the messages to the console, what does Vector give us?
$ VECTOR_LOG=debug vector --config ~/.vector/config/vector.yaml
{"headers":{},"message":"{\"stock_symbol\": \"AAPL\", \"exchange\": \"SMART\", \"currency\": \"USD\", \"transaction_time\": \"2024-11-13 13:10:27.143100\", \"min_tick\": 0.01, \"bid\": 223.5, \"bid_size\": 1000.0, \"bid_exchange\": \"K\", \"ask\": 223.58, \"ask_size\": 400.0, \"ask_exchange\": \"Q\", \"last\": 223.53, \"last_size\": 100.0, \"last_exchange\": \"Q\", \"prev_bid\": 223.51, \"prev_bid_size\": 200.0, \"prev_ask\": 223.53, \"prev_ask_size\": 300.0, \"prev_last\": 223.51, \"volume\": 1241.0, \"close\": 224.23, \"tick_time\": \"2024-11-13 13:10:27.143100\", \"tick_type\": 2, \"tick_price\": 223.58, \"tic_size\": 400.0, \"bboExchange\": \"9c0001\", \"snapshot_permissions\": 3}","message_key":null,"offset":890820,"partition":0,"source_type":"kafka","timestamp":"2024-11-13T13:10:27.145Z","topic":"STK"}
This looks almost right, but there is one problem: the payload sits inside a single field, "message", and it is not JSON; it is a JSON string. Here is where Vector transformations shine. You can use Vector's own language, VRL, to transform virtually anything into the structures and types that you want. In this case, let's use VRL's parse_json function to get proper JSON out of the string:
# Transform: Parse the message string and map fields
transforms:
  field_mapping:
    type: remap
    inputs:
      - kafka
    source: |
      .message = parse_json!(.message)
With this transformation, we get everything in proper JSON form:
{"headers":{},"message":{"ask":148.77,"ask_exchange":"KPQ","ask_size":1400.0,"bboExchange":"9c0001","bid":148.73,"bid_exchange":"Q","bid_size":100.0,"close":148.29,"currency":"USD","exchange":"SMART","last":148.75,"last_exchange":"Z","last_size":100.0,"min_tick":0.01,"prev_ask":148.75,"prev_ask_size":1500.0,"prev_bid":148.71,"prev_bid_size":1700.0,"prev_last":148.74,"snapshot_permissions":3,"stock_symbol":"NVDA","tic_size":1400.0,"tick_price":148.77,"tick_time":"2024-11-13 13:33:59.488917","tick_type":3,"transaction_time":"2024-11-13 13:33:59.488917","volume":53409.0},"message_key":null,"offset":890562,"partition":1,"source_type":"kafka","timestamp":"2024-11-13T13:33:59.493Z","topic":"STK"}
There is still room for improvement, though. The payload is nested one level below the Kafka metadata, and some of that metadata is superfluous. Going back to the transformation:
transforms:
  field_mapping:
    type: remap
    inputs:
      - kafka
    source: |
      del(.headers)
      del(.message_key)
      del(.source_type)
      del(.topic)
      .message = parse_json!(.message)
      .message.offset = .offset
      .message.partition = .partition
      .message.timestamp = .timestamp
      . = .message
First, we get rid of metadata in the message that is most likely not needed. Next, we copy the metadata we do want to keep into the message. As a last step, we replace the whole event with the message payload.
{"ask":421.31,"ask_exchange":"PQ","ask_size":200.0,"bboExchange":"9c0001","bid":421.14,"bid_exchange":"QU","bid_size":200.0,"close":423.03,"currency":"USD","exchange":"SMART","last":421.3,"last_exchange":"D","last_size":700.0,"min_tick":0.01,"offset":896727,"partition":2,"prev_ask":421.35,"prev_ask_size":100.0,"prev_bid":421.3,"prev_bid_size":600.0,"prev_last":421.35,"snapshot_permissions":3,"stock_symbol":"MSFT","tic_size":200.0,"tick_price":421.31,"tick_time":"2024-11-13 14:11:41.050312","tick_type":3,"timestamp":"2024-11-13T14:11:41.061Z","transaction_time":"2024-11-13 14:11:41.050312","volume":1337.0}
Looking pretty good.
You may have an existing sink, or you may be creating a sink from scratch. Either way, you will have to look at how best to accommodate the data. In our case, we already have a sink, a ClickHouse table (if you are starting from scratch, a CREATE TABLE sketch follows the description below). It looks like this:
:) DESCRIBE TABLE stock_ticker_data;
DESCRIBE TABLE stock_ticker_data
Query id: 48295571-3bf2-4e55-98e8-0c7d9b6f12df
┌─name─────────────────┬─type─────────────────
1. │ transaction_id │ UUID
2. │ stock_symbol │ String
3. │ exchange │ String
4. │ currency │ String
5. │ transaction_time │ DateTime64(6, 'UTC')
6. │ min_tick │ Float32
7. │ bid │ Float32
8. │ bid_size │ Float32
9. │ bid_exchange │ String
10. │ ask │ Float32
11. │ ask_size │ Float32
12. │ ask_exchange │ String
13. │ last │ Float32
14. │ last_size │ Float32
15. │ last_exchange │ String
16. │ prev_bid │ Float32
17. │ prev_bid_size │ Float32
18. │ prev_ask │ Float32
19. │ prev_ask_size │ Float32
20. │ prev_last │ Float32
21. │ volume │ Float32
22. │ close │ Float32
23. │ tick_time │ DateTime64(6, 'UTC')
24. │ tick_type │ Int32
25. │ tick_price │ Float32
26. │ tic_size │ Float32
27. │ bboExchange │ String
28. │ snapshot_permissions │ Int32
29. │ kafka_partition │ Int32
30. │ kafka_offset │ Int64
31. │ kafka_timestamp │ DateTime64(6, 'UTC')
└──────────────────────┴──────────────────────
31 rows in set. Elapsed: 0.004 sec.
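If you are building this sink from scratch rather than reusing an existing table, a statement along the following lines reproduces the layout above. Only the column names and types come from the DESCRIBE output; the MergeTree engine and the ORDER BY key are assumptions you should adapt to your own query patterns:
CREATE TABLE stock_ticker_data
(
    transaction_id       UUID,
    stock_symbol         String,
    exchange             String,
    currency             String,
    transaction_time     DateTime64(6, 'UTC'),
    min_tick             Float32,
    bid                  Float32,
    bid_size             Float32,
    bid_exchange         String,
    ask                  Float32,
    ask_size             Float32,
    ask_exchange         String,
    last                 Float32,
    last_size            Float32,
    last_exchange        String,
    prev_bid             Float32,
    prev_bid_size        Float32,
    prev_ask             Float32,
    prev_ask_size        Float32,
    prev_last            Float32,
    volume               Float32,
    close                Float32,
    tick_time            DateTime64(6, 'UTC'),
    tick_type            Int32,
    tick_price           Float32,
    tic_size             Float32,
    bboExchange          String,
    snapshot_permissions Int32,
    kafka_partition      Int32,
    kafka_offset         Int64,
    kafka_timestamp      DateTime64(6, 'UTC')
)
ENGINE = MergeTree                      -- engine choice is an assumption
ORDER BY (stock_symbol, tick_time);     -- sorting key is an assumption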
We have a missing field: the database requires a unique identifier, transaction_id, in addition to the fields we already have in the message. There is another problem as well: the format of the .timestamp field coming from the Kafka source does not match the format we want for kafka_timestamp in the table. Let's fix both issues:
transforms:
  field_mapping:
    type: remap
    inputs:
      - kafka
    source: |
      # Get keys and values from the message payload
      .message = parse_json!(.message)
      # Add Kafka metadata
      .message.kafka_partition = .partition
      .message.kafka_offset = .offset
      kafka_time_parsed = parse_timestamp!(.timestamp, "%Y-%m-%d %H:%M:%S%.f")
      kafka_time = format_timestamp!(kafka_time_parsed, "%Y-%m-%d %H:%M:%S%.6f")
      .message.kafka_timestamp = kafka_time
      # Add a unique identifier for each tick
      .message.transaction_id = uuid_v4()
      # Replace the event with the enriched payload
      . = .message
      # Get rid of superfluous content
      del(.partition)
      del(.offset)
      del(.timestamp)
      del(.message)
Output:
{"ask":421.84,"ask_exchange":"Q","ask_size":200.0,"bboExchange":"9c0001","bid":421.64,"bid_exchange":"QNL","bid_size":800.0,"close":423.03,"currency":"USD","exchange":"SMART","kafka_offset":899336,"kafka_partition":2,"kafka_timestamp":"2024-11-13 14:30:50.475000","last":421.74,"last_exchange":"Q","last_size":100.0,"min_tick":0.01,"prev_ask":421.83,"prev_ask_size":800.0,"prev_bid":421.63,"prev_bid_size":200.0,"prev_last":421.73,"snapshot_permissions":3,"stock_symbol":"MSFT","tic_size":100.0,"tick_price":421.74,"tick_time":"2024-11-13 14:30:50.474435","tick_type":4,"transaction_id":"8c0a8d79-c2c1-4047-82d9-4627ac632204","transaction_time":"2024-11-13 14:30:50.474435","volume":4336.0}
All timestamps are now uniformly formatted and work with the DateTime64(6, 'UTC') type used in the ClickHouse table.
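You can confirm that ClickHouse accepts this format by parsing one of the values by hand in clickhouse-client (the literal below is taken from the output above):
:) SELECT toDateTime64('2024-11-13 14:30:50.475000', 6, 'UTC');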
It's time to connect to ClickHouse and start sending messages to it from Kafka. We can have multiple sinks, and as long as we are in a trial-and-error phase, it might be a good idea to output both to the console and to ClickHouse. Let's just add another sink under sinks:
# Sink: Console output
sinks:
  console_output:
    type: console # Console output type
    inputs:
      - field_mapping # Input from the transformation
    encoding:
      codec: json # Output format as JSON for easy reading
  clickhouse:
    type: clickhouse # Sink type for ClickHouse
    inputs: ["field_mapping"] # Input from the transformation step
    endpoint: "your_server:port" # Replace with your ClickHouse server
    database: "default" # Your ClickHouse database
    table: "stock_ticker_data" # The target table in ClickHouse
    skip_unknown_fields: true
    compression: "gzip" # Compression for data transfer
    batch:
      max_bytes: 10485760 # Max batch size in bytes (10MB)
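If your ClickHouse server requires credentials, the sink also accepts an auth block; add something along these lines under the clickhouse entry (the user and password here are placeholders):
    auth:
      strategy: "basic" # HTTP basic authentication
      user: "default" # replace with your ClickHouse user
      password: "your_password" # replace with your password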
So how do we make sure the data is reaching ClickHouse? Before we run it, let's make sure that the table is empty:
:) SELECT * FROM stock_ticker_data ORDER BY tick_time;
SELECT *
FROM stock_ticker_data
ORDER BY tick_time ASC
Query id: 0fa77f02-9912-42c0-94c7-05e064c4c591
Ok.
0 rows in set. Elapsed: 0.002 sec.
Fine. We start Vector:
$ VECTOR_LOG=debug vector --config ~/.vector/config/vector.yaml
Then we go back to ClickHouse for a look:
:) SELECT * FROM stock_ticker_data ORDER BY tick_time DESC LIMIT 10;
SELECT *
FROM stock_ticker_data
ORDER BY tick_time DESC
LIMIT 10
Query id: 3e357216-2a5b-42c1-b9c0-ba19d75569c6
┌─transaction_id───────────────────────┬─stock_symbol─┬─exchange─┬─currency─┬───────────transaction_time─┬─min_tick─┬────bid─┬─bid_size─┬─bid_exchange─┬────ask─┬─ask_size─┬─ask_exchange─┬───last─┬─last_size─┬─last_exchange─┬─prev_bid─┬─prev_bid_size─┬─prev_ask─┬─prev_ask_size─┬─prev_last─┬─volume─┬──close─┬──────────────────tick_time─┬─tick_type─┬─tick_price─┬─tic_size─┬─bboExchange─┬─snapshot_permissions─┬─kafka_partition─┬─kafka_offset─┬────────────kafka_timestamp─┐
1. │ 731a24d3-f8c8-4160-b9fb-26241de40803 │ MSFT │ SMART │ USD │ 2024-11-13 16:29:21.698411 │ 0.01 │ 422.81 │ 100 │ Q │ 422.84 │ 100 │ Q │ 422.81 │ 100 │ D │ 422.8 │ 200 │ 422.86 │ 200 │ 422.82 │ 45871 │ 423.03 │ 2024-11-13 16:29:21.698411 │ 5 │ 422.81 │ 100 │ 9c0001 │ 3 │ 2 │ 971482 │ 2024-11-13 16:29:21.705000 │
2. │ 595ff59f-d4c7-418c-bd62-e48e414b83ee │ MSFT │ SMART │ USD │ 2024-11-13 16:29:21.477296 │ 0.01 │ 422.81 │ 100 │ Q │ 422.84 │ 100 │ Q │ 422.82 │ 600 │ D │ 422.8 │ 200 │ 422.86 │ 200 │ 422.83 │ 45871 │ 423.03 │ 2024-11-13 16:29:21.477296 │ 3 │ 422.84 │ 100 │ 9c0001 │ 3 │ 2 │ 971481 │ 2024-11-13 16:29:21.478000 │
3. │ 41b50e29-27d8-42ff-8d7d-deee3e50f1e8 │ AAPL │ SMART │ USD │ 2024-11-13 16:29:21.460808 │ 0.01 │ 224.39 │ 300 │ QV │ 224.41 │ 1800 │ PQVZLU │ 224.39 │ 900 │ D │ 224.4 │ 200 │ 224.42 │ 800 │ 224.4 │ 139034 │ 224.23 │ 2024-11-13 16:29:21.460808 │ 5 │ 224.39 │ 900 │ 9c0001 │ 3 │ 2 │ 971480 │ 2024-11-13 16:29:21.466000 │
4. │ 158f7959-6daf-47ae-bc61-d79c0771c6d9 │ AAPL │ SMART │ USD │ 2024-11-13 16:29:21.449326 │ 0.01 │ 224.39 │ 300 │ QV │ 224.41 │ 1800 │ PQVZLU │ 224.4 │ 100 │ Q │ 224.4 │ 200 │ 224.42 │ 800 │ 224.41 │ 139034 │ 224.23 │ 2024-11-13 16:29:21.449326 │ 8 │ -1 │ 139034 │ 9c0001 │ 3 │ 2 │ 971479 │ 2024-11-13 16:29:21.454000 │
5. │ 0546e049-1ea4-4df4-916d-7994e7ccb93e │ MSFT │ SMART │ USD │ 2024-11-13 16:29:21.209395 │ 0.01 │ 422.81 │ 100 │ Q │ 422.84 │ 200 │ Q │ 422.82 │ 600 │ D │ 422.8 │ 200 │ 422.86 │ 1000 │ 422.83 │ 45871 │ 423.03 │ 2024-11-13 16:29:21.209395 │ 8 │ -1 │ 45871 │ 9c0001 │ 3 │ 2 │ 971478 │ 2024-11-13 16:29:21.217000 │
6. │ 00d2fc3f-a642-4560-9536-44a351c9e18f │ MSFT │ SMART │ USD │ 2024-11-13 16:29:21.203090 │ 0.01 │ 422.81 │ 100 │ Q │ 422.84 │ 200 │ Q │ 422.82 │ 600 │ D │ 422.8 │ 200 │ 422.86 │ 1000 │ 422.83 │ 45865 │ 423.03 │ 2024-11-13 16:29:21.203090 │ 5 │ 422.82 │ 600 │ 9c0001 │ 3 │ 2 │ 971477 │ 2024-11-13 16:29:21.208000 │
7. │ 4cb43010-a2f0-49d4-8798-05483cfeda01 │ NVDA │ SMART │ USD │ 2024-11-13 16:29:20.952215 │ 0.01 │ 147.33 │ 400 │ QZ │ 147.34 │ 1200 │ JKPQVZNUH │ 147.34 │ 100 │ D │ 147.34 │ 700 │ 147.35 │ 1500 │ 147.36 │ 728558 │ 148.29 │ 2024-11-13 16:29:20.952215 │ 2 │ 147.34 │ 1200 │ 9c0001 │ 3 │ 2 │ 971476 │ 2024-11-13 16:29:20.953000 │
8. │ 51cecf7b-2e01-487c-855d-9eabcf9cc10f │ GTLB │ SMART │ USD │ 2024-11-13 16:29:20.718662 │ 0.01 │ 62.77 │ 200 │ QZ │ 62.94 │ 1000 │ QL │ 62.85 │ 100 │ Q │ 62.76 │ 300 │ 62.95 │ 1300 │ 62.8 │ 8264 │ 59.95 │ 2024-11-13 16:29:20.718662 │ 0 │ 62.77 │ 200 │ 9c0001 │ 3 │ 2 │ 971475 │ 2024-11-13 16:29:20.720000 │
9. │ f21be0aa-0e7d-40e4-a8d9-75a855a623f6 │ NVDA │ SMART │ USD │ 2024-11-13 16:29:20.707354 │ 0.01 │ 147.33 │ 700 │ QVZ │ 147.35 │ 1200 │ PQVLH │ 147.34 │ 100 │ D │ 147.34 │ 900 │ 147.36 │ 1500 │ 147.36 │ 728558 │ 148.29 │ 2024-11-13 16:29:20.707354 │ 3 │ 147.35 │ 1200 │ 9c0001 │ 3 │ 2 │ 971474 │ 2024-11-13 16:29:20.712000 │
10. │ d7efcba1-13b4-4129-b5a7-f3c61d3da259 │ NVDA │ SMART │ USD │ 2024-11-13 16:29:20.707354 │ 0.01 │ 147.33 │ 700 │ QVZ │ 147.35 │ 1200 │ PQVLH │ 147.34 │ 100 │ D │ 147.34 │ 900 │ 147.36 │ 1500 │ 147.36 │ 728558 │ 148.29 │ 2024-11-13 16:29:20.707354 │ 0 │ 147.33 │ 700 │ 9c0001 │ 3 │ 2 │ 971473 │ 2024-11-13 16:29:20.712000 │
└──────────────────────────────────────┴──────────────┴──────────┴──────────┴────────────────────────────┴──────────┴────────┴──────────┴──────────────┴────────┴──────────┴──────────────┴────────┴───────────┴───────────────┴──────────┴───────────────┴──────────┴───────────────┴───────────┴────────┴────────┴────────────────────────────┴───────────┴────────────┴──────────┴─────────────┴──────────────────────┴─────────────────┴──────────────┴────────────────────────────┘
10 rows in set. Elapsed: 0.023 sec. Processed 145.71 thousand rows, 30.95 MB (6.38 million rows/s., 1.36 GB/s.)
Peak memory usage: 132.21 KiB.
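To keep an eye on the flow without dumping full rows, a simple aggregate over the same table does the trick; run it a few times and watch the numbers grow:
:) SELECT count(), max(kafka_timestamp) FROM stock_ticker_data;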
We're up and running. For more details on how to integrate Kafka, Vector, and ClickHouse, take a look at [6], [7], and [8].
[1] Bare Metal Kafka Using KRaft https://www.ericeikrem.com/bare-metal-kafka/
[2] ClickHouse Quickstart https://clickhouse.com/docs/en/getting-started/quick-start
[3] Install ClickHouse https://clickhouse.com/docs/en/install
[4] How To Install and Use ClickHouse on Ubuntu 20.04 https://www.digitalocean.com/community/tutorials/how-to-install-and-use-clickhouse-on-ubuntu-20-04
[5] Vector Quickstart https://vector.dev/docs/setup/quickstart/
[6] Using Vector With Kafka and ClickHouse https://clickhouse.com/docs/en/integrations/kafka/kafka-vector
[7] Integrating Vector With ClickHouse https://clickhouse.com/docs/en/integrations/vector
[8] Integrating Kafka with ClickHouse https://clickhouse.com/docs/en/integrations/kafka