By eric | Published May 20, 2024 | Updated May 13, 2025
Vector is a sturdy solution for moving data between different systems. In Vector parlance, you move data from a source to a sink, and in the middle you can apply transforms. In this post, we are going to move data from a cluster of Apache Kafka brokers to a ClickHouse database, but most of it will be applicable to any source or sink. Vector has a huge and ever-increasing library of sources and sinks.
Some prerequisites:
— One or several Kafka brokers are operational. If not, have a look at [1] before going any further.
— The brokers are receiving messages of some kind.
— You've got a ClickHouse server running. If that's not the case, visit [2] and [3], and, if you are using Ubuntu, [4].
Install Vector using a script:
curl --proto '=https' --tlsv1.2 -sSfL https://sh.vector.dev | bash
Other installation methods are available, including Docker images; see [5].
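Once the script finishes, a quick sanity check confirms that the binary is available (depending on how the installer set up your shell, you may need to add ~/.vector/bin to your PATH first):
$ vector --version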
Next, we set up Vector with a configuration file, vector.yaml, placed in ~/.vector/config. The configuration file defines the sources, transformations, and sinks. A simple example:
# Source: Kafka
sources:
  kafka:
    type: kafka
    bootstrap_servers: "broker_ip:port,another_broker_ip:port" # Kafka brokers
    group_id: "consumer_group_name"   # Consumer group ID
    topics: ["topic"]                 # Kafka topic to read from
    auto_offset_reset: "earliest"     # Start reading from the earliest offset

# Transform: Parse the message string and map fields
transforms:
  field_mapping:
    type: remap
    inputs: ["kafka"]
    source: |

# Sink: Console output
sinks:
  console_output:
    type: console        # Console output type
    inputs:
      - field_mapping    # Input from the transformation
    encoding:
      codec: json        # Output format as JSON for easy reading
This configuration does no real transformation yet, and the sink writes to the console rather than to ClickHouse. We do this to build the pipeline stepwise and get each step right before moving on to the next one.
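Vector can also check a configuration without starting the pipeline; vector validate parses the file and verifies that sources, transforms, and sinks reference each other correctly. Here the file is assumed to live at the path used for the runs later in this post:
$ vector validate ~/.vector/config/vector.yaml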
Before we look at what is coming out of Vector, let's look at the original messages on the Kafka broker. Let's use kcat for this. In this example, we use stock data, and the topic is "STK" and the group is "stocks":
$ kcat -b 10.0.0.20:9092 -G stocks STK
This gives us a long list of messages, the last being:
{
"stock_symbol": "MSFT",
"exchange": "SMART",
"currency": "USD",
"transaction_time": "2024-11-13 12:46:03.547902",
"min_tick": 0.01,
"bid": 421.7,
"bid_size": 1300.0,
"bid_exchange": "P",
"ask": 421.99,
"ask_size": 300.0,
"ask_exchange": "PQ",
"last": 421.62,
"last_size": 100.0,
"last_exchange": "P",
"prev_bid": 421.51,
"prev_bid_size": 200.0,
"prev_ask": 421.97,
"prev_ask_size": 200.0,
"prev_last": 421.6,
"volume": 474.0,
"close": 423.03,
"tick_time": "2024-11-13 12:46:03.547902",
"tick_type": 2,
"tick_price": 421.99,
"tic_size": 300.0,
"bboExchange": "9c0001",
"snapshot_permissions": 3
}
% Reached end of topic STK [2] at offset 889601
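As an aside: if you are unsure about topic names or partition counts, kcat can also list broker metadata; a quick check against the same broker:
$ kcat -b 10.0.0.20:9092 -L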
The message itself is straightforward: well-formed JSON with keys and values. If we use a configuration file like the one above and output the messages to the console, what does Vector give us?
$ VECTOR_LOG=debug vector --config ~/.vector/config/vector.yaml
{"headers":{},
"message":"{\"stock_symbol\": \"AAPL\", \"exchange\": \"SMART\", \"currency\": \"USD\", \"transaction_time\": \"2024-11-13 13:10:27.143100\", \"min_tick\": 0.01, \"bid\": 223.5, \"bid_size\": 1000.0, \"bid_exchange\": \"K\", \"ask\": 223.58, \"ask_size\": 400.0, \"ask_exchange\": \"Q\", \"last\": 223.53, \"last_size\": 100.0, \"last_exchange\": \"Q\", \"prev_bid\": 223.51, \"prev_bid_size\": 200.0, \"prev_ask\": 223.53, \"prev_ask_size\": 300.0, \"prev_last\": 223.51, \"volume\": 1241.0, \"close\": 224.23, \"tick_time\": \"2024-11-13 13:10:27.143100\", \"tick_type\": 2, \"tick_price\": 223.58, \"tic_size\": 400.0, \"bboExchange\": \"9c0001\", \"snapshot_permissions\": 3}",
"message_key":null,
"offset":890820,
"partition":0,
"source_type":"kafka",
"timestamp":"2024-11-13T13:10:27.145Z",
"topic":"STK"
}
Almost right, but there is one problem: the payload sits inside a single field, "message", and it is not JSON. It is a JSON-encoded string. This is where Vector transformations shine. You can use Vector's own language, VRL (Vector Remap Language), to reshape virtually anything into the structure and types you want. In this case, let's use VRL's parse_json function to turn the string into proper JSON:
# Transform: Parse the message string and map fields
transforms:
  field_mapping:
    type: remap
    inputs:
      - kafka
    source: |
      .message = parse_json!(.message)
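A note on the ! suffix: parse_json! aborts processing of an event whose message field is not valid JSON, which is fine while experimenting. If you would rather keep malformed events and tag them instead, VRL's error-handling assignment is an alternative; a minimal sketch (the .parse_error field name is just an example, not something our table expects):
parsed, err = parse_json(.message)
if err == null {
    .message = parsed
} else {
    # keep the raw string and record why parsing failed
    .parse_error = err
}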
With this transformation, we get everything in proper JSON form:
{"headers":{},
"message":
{
"ask":148.77,
"ask_exchange":"KPQ",
"ask_size":1400.0,
"bboExchange":"9c0001"
,
"bid":148.73,
"bid_exchange":"Q",
"bid_size":100.0,
"close":148.29,
"currency":"USD",
"exchange":"SMART",
"last":148.75,
"last_exchange":"Z",
"last_size":100.0,
"min_tick":0.01,
"prev_ask":148.75,
"prev_ask_size":1500.0,
"prev_bid":148.71,
"prev_bid_size":1700.0,
"prev_last":148.74,
"snapshot_permissions":3,
"stock_symbol":"NVDA",
"tic_size":1400.0,
"tick_price":148.77,
"tick_time":"2024-11-13 13:33:59.488917",
"tick_type":3,
"transaction_time":"2024-11-13 13:33:59.488917",
"volume":53409.0
},
"message_key":null,
"offset":890562,
"partition":1,
"source_type":"kafka",
"timestamp":"2024-11-13T13:33:59.493Z",
"topic":"STK"
}
There is still room for improvement, though. The payload is nested one level below the Kafka metadata, and some of that metadata is superfluous. Going back to the transformation:
transforms:
  field_mapping:
    type: remap
    inputs:
      - kafka
    source: |
      del(.headers)
      del(.message_key)
      del(.source_type)
      del(.topic)
      .message = parse_json!(.message)
      .message.offset = .offset
      .message.partition = .partition
      .message.timestamp = .timestamp
      . = .message
First, we get rid of metadata that is most likely not needed. Next, we copy the metadata that probably is needed (offset, partition, timestamp) into the message. As a last step, we replace the whole event with the contents of .message, so the payload becomes the top level.
{
"ask":421.31,
"ask_exchange":"PQ",
"ask_size":200.0,
"bboExchange":"9c0001",
"bid":421.14,
"bid_exchange":"QU",
"bid_size":200.0,
"close":423.03,
"currency":"USD",
"exchange":"SMART",
"last":421.3,
"last_exchange":"D",
"last_size":700.0,
"min_tick":0.01,
"offset":896727,
"partition":2,
"prev_ask":421.35,
"prev_ask_size":100.0,
"prev_bid":421.3,
"prev_bid_size":600.0,
"prev_last":421.35,
"snapshot_permissions":3,
"stock_symbol":"MSFT",
"tic_size":200.0,
"tick_price":421.31,
"tick_time":"2024-11-13 14:11:41.050312",
"tick_type":3,
"timestamp":"2024-11-13T14:11:41.061Z",
"transaction_time":"2024-11-13 14:11:41.050312",
"volume":1337.0
}
Looking pretty good.
You may have an existing sink, or you may be creating one from scratch. Either way, you will have to consider how best to fit the data to it. In our case, we already have a sink, a ClickHouse table. It looks like this:
:) DESCRIBE TABLE stock_ticker_data;
DESCRIBE TABLE stock_ticker_data
Query id: 48295571-3bf2-4e55-98e8-0c7d9b6f12df
┌─name─────────────────┬─type─────────────────┐
1. │ transaction_id │ UUID
2. │ stock_symbol │ String
3. │ exchange │ String
4. │ currency │ String
5. │ transaction_time │ DateTime64(6, 'UTC')
6. │ min_tick │ Float32
7. │ bid │ Float32
8. │ bid_size │ Float32
9. │ bid_exchange │ String
10. │ ask │ Float32
11. │ ask_size │ Float32
12. │ ask_exchange │ String
13. │ last │ Float32
14. │ last_size │ Float32
15. │ last_exchange │ String
16. │ prev_bid │ Float32
17. │ prev_bid_size │ Float32
18. │ prev_ask │ Float32
19. │ prev_ask_size │ Float32
20. │ prev_last │ Float32
21. │ volume │ Float32
22. │ close │ Float32
23. │ tick_time │ DateTime64(6, 'UTC')
24. │ tick_type │ Int32
25. │ tick_price │ Float32
26. │ tic_size │ Float32
27. │ bboExchange │ String
28. │ snapshot_permissions │ Int32
29. │ kafka_partition │ Int32
30. │ kafka_offset │ Int64
31. │ kafka_timestamp │ DateTime64(6, 'UTC')
└──────────────────────┴──────────────────────┘
31 rows in set. Elapsed: 0.004 sec.
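For reference, DDL along these lines would create a table with those columns. The column list follows the DESCRIBE output above; the MergeTree engine and the ORDER BY key are assumptions, since DESCRIBE does not show them:
CREATE TABLE stock_ticker_data
(
    transaction_id UUID,
    stock_symbol String,
    exchange String,
    currency String,
    transaction_time DateTime64(6, 'UTC'),
    min_tick Float32,
    bid Float32,
    bid_size Float32,
    bid_exchange String,
    ask Float32,
    ask_size Float32,
    ask_exchange String,
    last Float32,
    last_size Float32,
    last_exchange String,
    prev_bid Float32,
    prev_bid_size Float32,
    prev_ask Float32,
    prev_ask_size Float32,
    prev_last Float32,
    volume Float32,
    close Float32,
    tick_time DateTime64(6, 'UTC'),
    tick_type Int32,
    tick_price Float32,
    tic_size Float32,
    bboExchange String,
    snapshot_permissions Int32,
    kafka_partition Int32,
    kafka_offset Int64,
    kafka_timestamp DateTime64(6, 'UTC')
)
ENGINE = MergeTree
ORDER BY (stock_symbol, tick_time);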
One field is missing from the data coming out of the transformation: the table requires a unique identifier, transaction_id, in addition to the fields already in the message. There is another problem as well: the kafka_timestamp column expects a different timestamp format than the RFC 3339 .timestamp produced by the Kafka source. Let's resolve both issues:
transforms:
  field_mapping:
    type: remap
    inputs:
      - kafka
    source: |
      # Get keys and values from the message payload
      .message = parse_json!(.message)
      # Add Kafka metadata
      .message.kafka_partition = .partition
      .message.kafka_offset = .offset
      # Parse the date added by Kafka
      kafka_time_parsed = parse_timestamp!(.timestamp, "%+")
      .message.kafka_timestamp = format_timestamp!(kafka_time_parsed, "%Y-%m-%d %H:%M:%S%.6f")
      # Add a unique identifier for each tick
      .message.transaction_id = uuid_v4()
      # Make the payload the top-level event
      . = .message
      # Get rid of superfluous content
      del(.partition)
      del(.offset)
      del(.timestamp)
      del(.message)
Output:
{
"ask":421.84,
"ask_exchange":"Q",
"ask_size":200.0,
"bboExchange":"9c0001",
"bid":421.64,
"bid_exchange":"QNL",
"bid_size":800.0,
"close":423.03,
"currency":"USD",
"exchange":"SMART",
"kafka_offset":899336,
"kafka_partition":2,
"kafka_timestamp":"2024-11-13 14:30:50.475000",
"last":421.74,
"last_exchange":"Q",
"last_size":100.0,
"min_tick":0.01,
"prev_ask":421.83,
"prev_ask_size":800.0,
"prev_bid":421.63,
"prev_bid_size":200.0,
"prev_last":421.73,
"snapshot_permissions":3,
"stock_symbol":"MSFT",
"tic_size":100.0,
"tick_price":421.74,
"tick_time":"2024-11-13 14:30:50.474435",
"tick_type":4,
"transaction_id":"8c0a8d79-c2c1-4047-82d9-4627ac632204",
"transaction_time":"2024-11-13 14:30:50.474435",
"volume":4336.0
}
All timestamps are now uniformly formatted and match the DateTime64(6, 'UTC') type used in the ClickHouse table.
It's time to connect to ClickHouse and start sending messages to it from Kafka. We can have multiple sinks, and as long as we are in a trial-and-error phase, it is a good idea to output both to the console and to ClickHouse. Let's just add another sink under sinks:
# Sink: Console output
sinks:
  console_output:
    type: console                 # Console output type
    inputs:
      - field_mapping             # Input from the transformation
    encoding:
      codec: json                 # Output format as JSON for easy reading

  clickhouse:
    type: clickhouse              # Sink type for ClickHouse
    inputs: ["field_mapping"]     # Input from the transformation step
    endpoint: "your_server:port"  # Replace with your ClickHouse server
    database: "default"           # Your ClickHouse database
    table: "stock_ticker_data"    # The target table in ClickHouse
    skip_unknown_fields: true
    compression: "gzip"           # Compression for data transfer
    batch:
      max_bytes: 10485760         # Max batch size in bytes (10MB)
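Before starting Vector, it is worth confirming that the endpoint configured in the sink is reachable at all. ClickHouse's HTTP interface answers Ok. on its ping handler; here assuming the default HTTP port 8123:
$ curl http://your_server:8123/ping
Ok.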
So how do we make sure the data is reaching ClickHouse? Before we run Vector, let's confirm that the table is empty:
:) SELECT * FROM stock_ticker_data ORDER BY tick_time;
SELECT *
FROM stock_ticker_data
ORDER BY tick_time ASC
Query id: 0fa77f02-9912-42c0-94c7-05e064c4c591
Ok.
0 rows in set. Elapsed: 0.002 sec.
Fine. We start Vector:
$ VECTOR_LOG=debug vector --config ~/.vector/config/vector.yaml
Then we go back to ClickHouse for a look:
clickhouse :) select * from stock_ticker_data ORDER BY tick_time DESC LIMIT 5
SELECT *
FROM stock_ticker_data
ORDER BY tick_time DESC
LIMIT 5
Query id: 50fa30c2-1ebd-4a3c-9f38-4e948f2dcf47
Connecting to localhost:9000 as user default.
Connected to ClickHouse server version 25.4.2.
Row 1:
──────
stock_symbol: NVDA
exchange: SMART
currency: USD
transaction_time: 2025-05-13 14:01:55.737032
min_tick: 0.01
bid: 127.62
bid_size: 5700
bid_exchange: KPQVZNUH
ask: 127.63
ask_size: 2300
ask_exchange: AKPQZNUH
last: 127.61
last_size: 200
last_exchange: K
prev_bid: 127.65
prev_bid_size: 1700
prev_ask: 127.66
prev_ask_size: 900
prev_last: 127.62
volume: 589418
close: 123
tick_type: 8
tick_price: -1
tic_size: 589418
bboExchange: 9c0001
snapshot_permissions: 3
kafka_partition: 1
kafka_offset: 2133488 -- 2.13 million
kafka_timestamp: 2025-05-13 14:01:55.737000
transaction_id: 3382377c-d88b-456d-b1d8-b57b54ee1f9f
tick_time: 2025-05-13 14:01:55.737032
Row 2:
──────
stock_symbol: NVDA
exchange: SMART
currency: USD
transaction_time: 2025-05-13 14:01:55.730362
min_tick: 0.01
bid: 127.62
bid_size: 5700
bid_exchange: KPQVZNUH
ask: 127.63
ask_size: 2300
ask_exchange: AKPQZNUH
last: 127.61
last_size: 200
last_exchange: K
prev_bid: 127.65
prev_bid_size: 1700
prev_ask: 127.66
prev_ask_size: 900
prev_last: 127.62
volume: 589402
close: 123
tick_type: 4
tick_price: 127.61
tic_size: 200
bboExchange: 9c0001
snapshot_permissions: 3
kafka_partition: 0
kafka_offset: 2134866 -- 2.13 million
kafka_timestamp: 2025-05-13 14:01:55.734000
transaction_id: ca9e92e4-2155-4633-af00-a4f706570d0d
tick_time: 2025-05-13 14:01:55.730362
Row 3:
──────
stock_symbol: MSFT
exchange: SMART
currency: USD
transaction_time: 2025-05-13 14:01:55.480371
min_tick: 0.01
bid: 450.01
bid_size: 300
bid_exchange: PZ
ask: 450.13
ask_size: 100
ask_exchange: Q
last: 450.1
last_size: 100
last_exchange: P
prev_bid: 450.02
prev_bid_size: 200
prev_ask: 450.14
prev_ask_size: 200
prev_last: 450.09
volume: 29543
close: 449.26
tick_type: 0
tick_price: 450.01
tic_size: 300
bboExchange: 9c0001
snapshot_permissions: 3
kafka_partition: 0
kafka_offset: 2134865 -- 2.13 million
kafka_timestamp: 2025-05-13 14:01:55.481000
transaction_id: bd0c29cb-b113-4475-850b-986d20ddef15
tick_time: 2025-05-13 14:01:55.480371
Row 4:
──────
stock_symbol: NVDA
exchange: SMART
currency: USD
transaction_time: 2025-05-13 14:01:55.475400
min_tick: 0.01
bid: 127.62
bid_size: 5700
bid_exchange: KPQVZNUH
ask: 127.63
ask_size: 2300
ask_exchange: AKPQZNUH
last: 127.62
last_size: 200
last_exchange: D
prev_bid: 127.65
prev_bid_size: 1700
prev_ask: 127.66
prev_ask_size: 900
prev_last: 127.65
volume: 589402
close: 123
tick_type: 8
tick_price: -1
tic_size: 589402
bboExchange: 9c0001
snapshot_permissions: 3
kafka_partition: 2
kafka_offset: 2131817 -- 2.13 million
kafka_timestamp: 2025-05-13 14:01:55.480000
transaction_id: 7532f7ae-1532-4765-a3f7-a6be7648dc91
tick_time: 2025-05-13 14:01:55.475400
Row 5:
──────
stock_symbol: NVDA
exchange: SMART
currency: USD
transaction_time: 2025-05-13 14:01:55.475400
min_tick: 0.01
bid: 127.62
bid_size: 5700
bid_exchange: KPQVZNUH
ask: 127.63
ask_size: 2300
ask_exchange: AKPQZNUH
last: 127.62
last_size: 200
last_exchange: D
prev_bid: 127.65
prev_bid_size: 1700
prev_ask: 127.66
prev_ask_size: 900
prev_last: 127.65
volume: 589402
close: 123
tick_type: 5
tick_price: 127.62
tic_size: 200
bboExchange: 9c0001
snapshot_permissions: 3
kafka_partition: 0
kafka_offset: 2134864 -- 2.13 million
kafka_timestamp: 2025-05-13 14:01:55.479000
transaction_id: dc44598c-01d2-4577-a01a-edd800c8e45a
tick_time: 2025-05-13 14:01:55.475400
5 rows in set. Elapsed: 0.080 sec. Processed 2.27 million rows, 28.92 MB (453.90 million rows/s., 7.24 GB/s.)
Peak memory usage: 955.99 KiB.
We're up and running. For more details on how to integrate Kafka, Vector, and ClickHouse, have a look at [6], [7], and [8].
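A simple way to keep an eye on ingestion afterwards is to group by the Kafka metadata we carried along; if every partition (0, 1, and 2 for this topic) shows up and the latest timestamps keep advancing, the pipeline is healthy:
SELECT kafka_partition, count() AS ticks, max(kafka_timestamp) AS latest
FROM stock_ticker_data
GROUP BY kafka_partition
ORDER BY kafka_partition;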
It is easy to misconfigure something, as there are many moving parts. One typical error is Vector failing due to faults in the network configuration:
WARN http: vector::internal_events::http_client: HTTP error. error=error trying to connect: tcp connect error: Connection refused (os error 111)
This typically happens when the ClickHouse server is not running, when the Vector and ClickHouse configurations do not match, when firewall or network settings block the connection, or some combination of the above.
1) The ClickHouse server is not running:
clickhouse-server # systemctl status clickhouse-server
If it is not running:
clickhouse-server # systemctl start clickhouse-server
If it is not enabled:
clickhouse-server # systemctl enable clickhouse-server
2) There is a mismatch between the ClickHouse and the Vector configuration:
The default HTTP port for ClickHouse is 8123, and that is the interface Vector's clickhouse sink talks to. Make sure this is a port ClickHouse is actually listening on:
# netstat -tulpn | grep LISTEN
tcp 0 0 0.0.0.0:8123 0.0.0.0:* LISTEN 16065/clickhouse-se
tcp 0 0 0.0.0.0:9009 0.0.0.0:* LISTEN 16065/clickhouse-se
tcp 0 0 0.0.0.0:9005 0.0.0.0:* LISTEN 16065/clickhouse-se
tcp 0 0 0.0.0.0:9004 0.0.0.0:* LISTEN 16065/clickhouse-se
tcp 0 0 0.0.0.0:9000 0.0.0.0:* LISTEN 16065/clickhouse-se
tcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN 1135/sshd: /usr/sbi
If netstat doesn't return anything similar to the above, you need to modify the ports in the Vector or ClickHouse configuration so that they correspond. In addition, check the network interface setup on the ClickHouse server (/etc/clickhouse-server/config.xml). If only one of IPv6 and IPv4 is available, then:
<listen_try>0</listen_try>
Moreover, configure the listen host only for the protocol that is available. For IPv4 only, listening on all interfaces:
<listen_host>0.0.0.0</listen_host>
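After changing /etc/clickhouse-server/config.xml, restart the server and check the listeners again:
clickhouse-server # systemctl restart clickhouse-server
clickhouse-server # netstat -tulpn | grep clickhouse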
3) Firewall rules or network configurations are blocking the connection:
Check that the port you have chosen above is open:
Rocky/Alma:
clickhouse-server # firewall-cmd --zone=public --list-all
Debian (with ufw installed)/Ubuntu:
clickhouse-server # ufw status
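If the port turns out to be closed, it can be opened like this, assuming the default HTTP port 8123 (adjust to whatever port you chose above):
Rocky/Alma:
clickhouse-server # firewall-cmd --zone=public --add-port=8123/tcp --permanent
clickhouse-server # firewall-cmd --reload
Debian (with ufw installed)/Ubuntu:
clickhouse-server # ufw allow 8123/tcp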
[1] Bare Metal Kafka Using KRaft - https://www.ericeikrem.com/bare-metal-kafka/
[2] ClickHouse Quickstart - https://clickhouse.com/docs/en/getting-started/quick-start
[3] Install ClickHouse - https://clickhouse.com/docs/en/install
[4] How To Install and Use ClickHouse on Ubuntu 20.04 - https://www.digitalocean.com/community/tutorials/how-to-install-and-use-clickhouse-on-ubuntu-20-04
[5] Vector Quickstart - https://vector.dev/docs/setup/quickstart/
[6] Using Vector With Kafka and ClickHouse - https://clickhouse.com/docs/en/integrations/kafka/kafka-vector
[7] Integrating Vector With ClickHouse - https://clickhouse.com/docs/en/integrations/vector
[8] Integrating Kafka with ClickHouse - https://clickhouse.com/docs/en/integrations/kafka