# Kafka Sink
The Kafka sink sends generated data to Kafka topics using the `confluent_kafka` Python package. It's useful for:
- Real-time data streaming
- Event-driven architectures
- Integration with Kafka-based data pipelines
## Configuration
The Kafka sink accepts any configuration parameters that are valid for the `confluent_kafka` producer, plus a `topic` parameter naming the destination topic. The minimum required configuration is:
```json
{
  "sink": {
    "type": "kafka",
    "params": {
      "bootstrap.servers": "your-kafka-bootstrap-server",
      "topic": "topic_name"
    }
  }
}
```
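The Notes below state that all parameters are passed directly to the `confluent_kafka.Producer` while `topic` is handled separately. A minimal sketch of that wiring, assuming the sink simply pops the `topic` key before constructing the producer (an assumption about the internals, not the actual implementation):

```python
import json

from confluent_kafka import Producer

# Params as they appear in the sink configuration above.
params = {
    "bootstrap.servers": "your-kafka-bootstrap-server",
    "topic": "topic_name",
}

# "topic" is not a producer setting, so it is separated out before the
# remaining parameters are handed to the Producer (assumed behavior).
topic = params.pop("topic")
producer = Producer(params)

# Each generated record is serialized to JSON and produced to the topic.
record = {"id": "example-id"}
producer.produce(topic, value=json.dumps(record).encode("utf-8"))
producer.flush()
```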
## Additional Configuration
You can add any additional configuration parameters supported by the `confluent_kafka` package. Here are some common examples:
### Secured Kafka Connection
```json
{
  "sink": {
    "type": "kafka",
    "params": {
      "bootstrap.servers": "your-kafka-bootstrap-server",
      "topic": "topic_name",
      "security.protocol": "SASL_SSL",
      "sasl.mechanism": "PLAIN",
      "sasl.username": "your-api-key",
      "sasl.password": "your-api-secret"
    }
  }
}
```
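If you want to sanity-check the credentials before running a full generation job, one option is to build a producer with the same settings in plain Python and request cluster metadata, which fails fast on bad connections or credentials. This is a hedged sketch, not part of the sink itself; the server and credentials are placeholders:

```python
from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "your-kafka-bootstrap-server",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "your-api-key",
    "sasl.password": "your-api-secret",
})

# A metadata request round-trips to the broker, so it surfaces
# authentication and connectivity errors within the timeout.
metadata = producer.list_topics(timeout=10)
print(sorted(metadata.topics))
```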
## Example Usage
Here's a complete example that generates user events and sends them to a Kafka topic:
```json
{
  "schema": {
    "id": "$uuid",
    "name": "$name",
    "email": "$email",
    "timestamp": "$timestamp"
  },
  "sink": {
    "type": "kafka",
    "params": {
      "bootstrap.servers": "kafka1:9092,kafka2:9092",
      "topic": "user_events",
      "security.protocol": "SASL_SSL",
      "sasl.mechanism": "PLAIN",
      "sasl.username": "your-api-key",
      "sasl.password": "your-api-secret"
    }
  }
}
```
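For reference, what this configuration asks the sink to do is roughly equivalent to the standalone script below. It is a sketch under the assumption that one JSON message is produced per generated record; the sample event values are illustrative, and the delivery callback is optional:

```python
import json

from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "kafka1:9092,kafka2:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "your-api-key",
    "sasl.password": "your-api-secret",
})

def delivery_report(err, msg):
    """Report whether each message reached the broker."""
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}")

event = {
    "id": "550e8400-e29b-41d4-a716-446655440000",
    "name": "John Doe",
    "email": "john.doe@example.com",
    "timestamp": 1710503445,
}
producer.produce("user_events", json.dumps(event).encode("utf-8"),
                 callback=delivery_report)
producer.flush()  # blocks until outstanding messages are delivered
```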
## Message Format
Messages are sent as JSON strings with the following structure:
```json
{
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "name": "John Doe",
  "email": "john.doe@example.com",
  "timestamp": 1710503445
}
```
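To confirm what actually lands on the topic, you can read messages back and parse them as JSON. A minimal consumer sketch; the group id is hypothetical, the broker addresses reuse the example above, and a secured cluster would also need the same `security.*` and `sasl.*` settings shown earlier:

```python
import json

from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "kafka1:9092,kafka2:9092",
    "group.id": "user-events-inspector",  # hypothetical group id
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["user_events"])

try:
    msg = consumer.poll(timeout=10.0)
    if msg is None:
        print("No message received")
    elif msg.error():
        print(f"Consumer error: {msg.error()}")
    else:
        # Messages are JSON strings, so they decode into plain dicts.
        event = json.loads(msg.value())
        print(event["id"], event["email"])
finally:
    consumer.close()
```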
## Notes
- The sink uses the `confluent_kafka` Python package internally to connect to Kafka.
- All configuration parameters are passed directly to the `confluent_kafka.Producer`.
- The `topic` parameter is required but is handled separately from the producer configuration.
- For a complete list of supported configuration parameters, refer to the confluent_kafka documentation.