Kafka Sink

The Kafka sink sends generated data to Kafka topics using the confluent_kafka Python package. It’s useful for:

  • Real-time data streaming
  • Event-driven architectures
  • Integration with Kafka-based data pipelines

Configuration

The Kafka sink accepts any configuration parameters that are valid for the confluent_kafka Python package. The minimum required parameters are:

{
  "sink": {
    "type": "kafka",
    "params": {
      "bootstrap.servers": "your-kafka-bootstrap-server",
      "topic": "topic_name"
    }
  }
}
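Before running a job, it can help to check that both required parameters are present. The following is a minimal sketch of such a check, not part of the tool itself: the helper name missing_kafka_params and the REQUIRED_PARAMS tuple are hypothetical and exist only for illustration.

```python
import json

# The two parameters this page lists as the minimum required ones.
REQUIRED_PARAMS = ("bootstrap.servers", "topic")


def missing_kafka_params(config):
    """Return the required Kafka sink parameters that are absent or empty.

    Hypothetical helper for illustration; it only inspects the config dict.
    """
    sink = config.get("sink", {})
    if sink.get("type") != "kafka":
        raise ValueError("sink type is not 'kafka'")
    params = sink.get("params", {})
    return [key for key in REQUIRED_PARAMS if not params.get(key)]


complete = json.loads(
    '{"sink": {"type": "kafka", "params": {'
    '"bootstrap.servers": "your-kafka-bootstrap-server", "topic": "topic_name"}}}'
)
incomplete = {"sink": {"type": "kafka", "params": {"bootstrap.servers": "host:9092"}}}

print(missing_kafka_params(complete))
print(missing_kafka_params(incomplete))
```

An empty list means the minimum configuration is satisfied; any other result names the parameters still to be filled in.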

Additional Configuration

You can add any other configuration parameter supported by the confluent_kafka package. A common case is a secured connection:

Secured Kafka Connection

{
  "sink": {
    "type": "kafka",
    "params": {
      "bootstrap.servers": "your-kafka-bootstrap-server",
      "topic": "topic_name",
      "security.protocol": "SASL_SSL",
      "sasl.mechanism": "PLAIN",
      "sasl.username": "your-api-key",
      "sasl.password": "your-api-secret"
    }
  }
}

Example Usage

Here’s a complete example that generates user events and sends them to a Kafka topic:

{
  "schema": {
    "id": "$uuid",
    "name": "$name",
    "email": "$email",
    "timestamp": "$timestamp"
  },
  "sink": {
    "type": "kafka",
    "params": {
      "bootstrap.servers": "kafka1:9092,kafka2:9092",
      "topic": "user_events",
      "security.protocol": "SASL_SSL",
      "sasl.mechanism": "PLAIN",
      "sasl.username": "your-api-key",
      "sasl.password": "your-api-secret"
    }
  }
}

Message Format

Messages are sent as JSON strings whose fields follow the schema. For the schema in the example above, a message looks like this:

{
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "name": "John Doe",
  "email": "[email protected]",
  "timestamp": 1710503445
}
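On the consuming side, each message value arrives as bytes and has to be parsed back into a record. The sketch below shows one way to do that, assuming the JSON string is UTF-8 encoded (this page does not state the encoding). The function name decode_message is hypothetical, and the sample payload is built locally rather than read from Kafka.

```python
import json


def decode_message(value):
    """Parse a Kafka message value (bytes) into a Python dict.

    Assumes UTF-8 encoded JSON; adjust if the topic uses another encoding.
    """
    return json.loads(value.decode("utf-8"))


# Stand-in for the bytes a consumer would receive for one message.
sample_value = json.dumps(
    {
        "id": "550e8400-e29b-41d4-a716-446655440000",
        "name": "John Doe",
        "timestamp": 1710503445,
    }
).encode("utf-8")

record = decode_message(sample_value)
print(record["name"], record["timestamp"])
```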

Notes

  • The sink uses the confluent_kafka Python package internally to connect to Kafka
  • All configuration parameters except topic are passed directly to confluent_kafka.Producer
  • The topic parameter is required; the sink handles it separately from the producer configuration
  • For a complete list of supported configuration parameters, refer to the confluent_kafka documentation
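The split between topic and the producer settings described in the notes can be pictured roughly as follows. This is a sketch under assumptions, not the sink's actual source: split_sink_params and send_record are hypothetical names, and send_record needs the confluent_kafka package and a reachable broker, so it is defined here but not called.

```python
import json


def split_sink_params(params):
    """Separate the topic from the settings intended for the producer."""
    producer_config = dict(params)        # copy so the caller's dict is untouched
    topic = producer_config.pop("topic")  # raises KeyError if topic is missing
    return topic, producer_config


def send_record(params, record):
    """Send one record as a JSON string (hypothetical; needs a running broker)."""
    from confluent_kafka import Producer  # imported lazily, only needed when sending

    topic, producer_config = split_sink_params(params)
    producer = Producer(producer_config)
    producer.produce(topic, value=json.dumps(record).encode("utf-8"))
    producer.flush()                      # wait for outstanding deliveries


params = {"bootstrap.servers": "kafka1:9092,kafka2:9092", "topic": "user_events"}
topic, producer_config = split_sink_params(params)
print(topic)
print(producer_config)
```

Copying the dictionary before removing topic keeps the original sink parameters intact, which matters if the same configuration is reused elsewhere.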