# Kafka Proxy Rust Forever (kprf)

HTTP Kafka producer-proxy based on librdkafka & warp.

> **NOTE:** This is a non-stable alpha version, so the API and configuration may change.
## Inspiration

At CityMobil we rely heavily on data processing: it is widely used for pricing, statistics collection, etc. Most of this data is produced from php-fpm.

Producing data to Kafka requires each php-fpm worker to explicitly create its own internal librdkafka thread pool, which is quite a heavy operation.

One of the easiest solutions is to use an external HTTP proxy. This is where kafka-proxy (or `kprf`) comes into play.
## Setup

`rustc` 1.51.0 is required to build kafka-proxy.

- To build the debug version of kafka-proxy:

  ```sh
  make # builds the debug version
  ```

  The debug binary is stored at `target/debug/kprf`.

- To build the release version of kafka-proxy:

  ```sh
  make build_release
  ```

  The release binary is stored at `target/release/kprf`.

- To run kafka-proxy (the release version is recommended for production environments):

  ```sh
  target/release/kprf --config=config_example.yaml
  ```
## QuickStart

The following examples assume kafka-proxy is set up and running.

```sh
# Try asynchronous producing
curl 'http://127.0.0.1:4242/push' \
    -H 'Content-Type: application/json' \
    -d '{"records": [{"topic": "SOME_TOPIC", "data": "{\"a\": \"b\"}"}], "wait_for_send": false}'
```

Possible success response:

```json
{"status": "ok", "errors": []}
```

Possible error response:

```json
{"status": "error", "errors": [{"status": "error", "error": "some_message"}, {"status": "ok", "error": ""}]}
```
JSON fields:

- `records` – describes the records to produce.
- `records[i].topic` – describes the Kafka topic to produce to.
- `records[i].data` – describes the string data to produce. Can be JSON, XML or anything else.
- `wait_for_send` – describes whether the HTTP client has to wait for the delivery result. If `false`, the message is produced asynchronously; otherwise, synchronously. Default value is `false`.
## Configuration

At the moment, the following options from librdkafka are supported:

- `kafka.brokers` – alias for `bootstrap.servers` from librdkafka. Default value is an empty array.
- `kafka.user` – alias for `sasl.username` from librdkafka. Default value is an empty string.
- `kafka.password` – alias for `sasl.password` from librdkafka. Default value is an empty string.
- `kafka.message_max_bytes` – alias for `message.max.bytes` from librdkafka. Default value is 1 MiB.
- `kafka.queue_buffering_max_messages` – alias for `queue.buffering.max.messages` from librdkafka. Default value is `100000`.
- `kafka.queue_buffering_max_ms` – alias for `queue.buffering.max.ms` from librdkafka. Default value is `10`.
- `kafka.queue_buffering_max_kbytes` – alias for `queue.buffering.max.kbytes` from librdkafka. Default value is `1048576`.
- `kafka.retries` – alias for `retries` from librdkafka. Default value is `3`.
- `kafka.message_timeout_ms` – alias for `message.timeout.ms` from librdkafka. Default value is `2000`.
- `kafka.request_timeout_ms` – alias for `request.timeout.ms` from librdkafka. Default value is `30000`.
- `kafka.request_required_acks` – alias for `request.required.acks` from librdkafka. Default value is `-1`.

Other options:

- `http.port` – port of the HTTP server for producing messages. Default value is `4242`.
- `http.metrics_port` – port of the HTTP server for metrics. Default value is `8088`.
- `output_file` – output file for logging. Default value is `/dev/stdout`.
- `ratelimit.enabled` – enables or disables rate limits. Default value is `false`.
- `ratelimit.rules` – rules for rate limits. Default value is `[]`.
### Example configuration

```yaml
kafka:
  brokers:
    - '127.0.0.1:9092'
  request_required_acks: 1
  queue_buffering_max_ms: 20
  queue_buffering_max_kbytes: 2048 # 2 MiB
http:
  port: 4242
  metrics_port: 8088
output_file: "/dev/stdout"
ratelimit:                            # ratelimit settings
  enabled: true                       # enables or disables ratelimiting
  rules:                              # list of rules for concrete topics
    - topic_name: "some_topic_name"   # concrete topic name
      max_requests_per_minute: 42     # maximum requests per minute allowed for this topic
```
## Benchmarks

We obtained the following results with `wrk` on version v0.0.1-alpha, with a 4 KiB message size.
External host configuration:

- CPU: 2-core Intel Core Processor (Broadwell), 2095 MHz
- RAM: 4 GiB DDR4
- Virtualization: VT-x
- Hypervisor vendor: KVM
Kafka-proxy configuration:

```yaml
kafka:
  brokers:
    - "some_kafka_broker_1.some_external_server"
    - "some_kafka_broker_2.some_external_server"
    - "some_kafka_broker_3.some_external_server"
    - "some_kafka_broker_4.some_external_server"
    - "some_kafka_broker_5.some_external_server"
  request_required_acks: 1
  queue_buffering_max_ms: 10
  queue_buffering_max_kbytes: 2048 # 2 MiB
http:
  port: 4242
```
- Asynchronous producing:

  ```sh
  wrk -s wrk.lua -t256 -c256 -d120s 'http://some_external_host:4242/push'
  ```

  ```
  Running 2m test @ http://some_external_host:4242/push
    256 threads and 256 connections
    Thread Stats   Avg      Stdev     Max   +/- Stdev
      Latency    20.72ms   11.53ms  145.52ms   72.16%
      Req/Sec    49.57     21.97     1.02k     87.15%
    864241 requests in 1.14m, 108.79MB read
  Requests/sec:  12672.49
  Transfer/sec:      1.60MB
  ```
- Synchronous producing:

  ```
    256 threads and 256 connections
    Thread Stats   Avg      Stdev     Max   +/- Stdev
      Latency    31.41ms   88.92ms    1.01s    96.82%
      Req/Sec    59.35     13.91    111.00     74.87%
    1765717 requests in 2.00m, 222.28MB read
  Requests/sec:  14701.92
  Transfer/sec:      1.85MB
  ```
## Metrics

All metrics are exposed in OpenMetrics (Prometheus) format on port `8088` (non-configurable at v0.0.1-alpha).

At the moment (v0.1.0), the following metrics are available:
- `http_requests_duration` – histogram of HTTP request durations. The label `push/async` is set when the message is produced asynchronously, and `push/sync` otherwise; the `code` label is set according to the HTTP response status.
- `kafka_internal_queue_size` – gauge of the internal Kafka queue size, per topic.
- `kafka_message_send_duration` – histogram of the time until the delivery result callback is received for a Kafka message, per topic.
- `kafka_sent_messages` – counter of total Kafka messages sent, per topic.
- `kafka_errors_count` – counter of total Kafka errors, per topic.
- `ratelimit_messages_count` – counter of total rate-limited messages, per topic.
librdkafka metrics:

- `kafka_producer_reply_queue_size` – operations (callbacks, events, etc.) waiting in queue.
- `kafka_producer_current_messages_in_queue` – current number of messages in producer queues.
- `kafka_producer_current_messages_in_queue_bytes` – current total size of messages in producer queues.
- `kafka_producer_total_requests_count` – total number of requests sent to brokers.
- `kafka_producer_total_bytes_sent` – total number of bytes transmitted to brokers.
- `kafka_producer_total_responses_received` – total number of responses received from brokers.
- `kafka_producer_total_bytes_received` – total number of bytes received from brokers.
- `kafka_producer_total_messages_sent` – total number of messages transmitted (produced) to brokers.
- `kafka_producer_total_messages_bytes_sent` – total number of message bytes transmitted (produced) to brokers.
- `kafka_producer_metadata_cache_topics_count` – number of topics in the metadata cache.
- `kafka_producer_broker_state` – broker state (INIT, DOWN, CONNECT, AUTH, APIVERSION_QUERY, AUTH_HANDSHAKE, UP, UPDATE).
- `kafka_producer_broker_state_age` – time since the last broker state change, in microseconds.
- `kafka_producer_broker_outbuf_count` – number of requests awaiting transmission to the broker.
- `kafka_producer_broker_outbuf_msg_count` – number of messages awaiting transmission to the broker.
- `kafka_producer_broker_waitresp_count` – number of requests in flight to the broker that are awaiting a response.
- `kafka_producer_broker_waitresp_msg_count` – number of messages in flight to the broker that are awaiting a response.
- `kafka_producer_broker_requests_sent` – total number of requests sent to the broker.
- `kafka_producer_broker_requests_sent_bytes` – total number of bytes sent to the broker.
- `kafka_producer_broker_transmission_errors` – total number of transmission errors.
- `kafka_producer_broker_request_retries` – total number of request retries.
- `kafka_producer_request_timeouts` – total number of requests that timed out.
- `kafka_producer_broker_responses_count` – total number of responses received from the broker.
- `kafka_producer_broker_bytes_received` – total number of bytes received from the broker.
- `kafka_producer_broker_errors_count` – total number of received errors.
- `kafka_producer_topic_metadata_age` – age of the client's metadata for this topic, in milliseconds.
- `kafka_producer_topic_batchsize_avg` – rolling window statistics for batch sizes, in bytes.
- `kafka_producer_topic_batchcount_avg` – rolling window statistics for batch message counts.
- `kprf_app_metadata` – KPRF application metadata (commit_hash, version).
## Further improvements

- **Write-Ahead Log** – a write-ahead log could be a good improvement when the usage pattern is asynchronous producing: the client does not know whether its records were successfully sent to Kafka, and adding a WAL would improve durability.