Kafka Sink

The Kafka sink sends generated data to Kafka topics using the confluent_kafka Python package. It's useful for:

  • Real-time data streaming
  • Event-driven architectures
  • Integration with Kafka-based data pipelines

Configuration

The Kafka sink accepts any configuration parameter that confluent_kafka.Producer supports. At minimum, it requires bootstrap.servers and topic:

{
  "sink": {
    "type": "kafka",
    "params": {
      "bootstrap.servers": "your-kafka-bootstrap-server",
      "topic": "topic_name"
    }
  }
}
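
As an illustration, the check below confirms that a sink definition carries both required parameters before it is used. It is a minimal sketch that assumes the configuration has already been loaded into a Python dict; the helper name missing_kafka_params is hypothetical and not part of the tool.

```python
REQUIRED_PARAMS = ("bootstrap.servers", "topic")


def missing_kafka_params(config):
    """Return the required Kafka sink parameters that are absent or empty."""
    sink = config.get("sink", {})
    if sink.get("type") != "kafka":
        raise ValueError("sink type is not 'kafka'")
    params = sink.get("params", {})
    return [name for name in REQUIRED_PARAMS if not params.get(name)]


config = {
    "sink": {
        "type": "kafka",
        "params": {
            "bootstrap.servers": "your-kafka-bootstrap-server",
            "topic": "topic_name",
        },
    }
}

# An empty list means both required parameters are present.
print(missing_kafka_params(config))
```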

Additional Configuration

You can add any other configuration parameter supported by the confluent_kafka package. The example below covers a common case:

Secured Kafka Connection

{
  "sink": {
    "type": "kafka",
    "params": {
      "bootstrap.servers": "your-kafka-bootstrap-server",
      "topic": "topic_name",
      "security.protocol": "SASL_SSL",
      "sasl.mechanism": "PLAIN",
      "sasl.username": "your-api-key",
      "sasl.password": "your-api-secret"
    }
  }
}
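
If you would rather not store the API key and secret in the configuration file, one option is to generate the sink definition from environment variables. The sketch below assumes a small helper script of your own; the function name and the variable names KAFKA_API_KEY and KAFKA_API_SECRET are hypothetical.

```python
import json
import os


def secured_sink_config(bootstrap_servers, topic, env=os.environ):
    """Build a SASL_SSL sink definition with credentials read from `env`."""
    return {
        "sink": {
            "type": "kafka",
            "params": {
                "bootstrap.servers": bootstrap_servers,
                "topic": topic,
                "security.protocol": "SASL_SSL",
                "sasl.mechanism": "PLAIN",
                # Hypothetical variable names; a KeyError is raised if unset.
                "sasl.username": env["KAFKA_API_KEY"],
                "sasl.password": env["KAFKA_API_SECRET"],
            },
        }
    }


# A plain dict stands in for the real environment in this demonstration.
demo_env = {"KAFKA_API_KEY": "your-api-key", "KAFKA_API_SECRET": "your-api-secret"}
demo_config = secured_sink_config("your-kafka-bootstrap-server", "topic_name", demo_env)
print(json.dumps(demo_config, indent=2))
```

Passing the environment as an argument keeps the helper easy to test without touching real credentials.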

Example Usage

Here's a complete example that generates user events and sends them to a Kafka topic:

{
  "schema": {
    "id": "$uuid",
    "name": "$name",
    "email": "$email",
    "timestamp": "$timestamp"
  },
  "sink": {
    "type": "kafka",
    "params": {
      "bootstrap.servers": "kafka1:9092,kafka2:9092",
      "topic": "user_events",
      "security.protocol": "SASL_SSL",
      "sasl.mechanism": "PLAIN",
      "sasl.username": "your-api-key",
      "sasl.password": "your-api-secret"
    }
  }
}

Message Format

Each generated record is sent as a JSON string whose fields follow the schema. With the schema from the example above, a message looks like this:

{
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "name": "John Doe",
  "email": "john.doe@example.com",
  "timestamp": 1710503445
}
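
To make the format concrete, the sketch below encodes a record as a JSON payload and decodes it again the way a consumer might. It assumes UTF-8 encoding and that the timestamp field holds Unix epoch seconds; neither is confirmed by this page, and the function names are illustrative.

```python
import json
from datetime import datetime, timezone


def encode_record(record):
    """Serialize a record to a JSON payload (UTF-8 is an assumption)."""
    return json.dumps(record).encode("utf-8")


def decode_message(value):
    """Parse a message payload back into a Python dict."""
    return json.loads(value.decode("utf-8"))


record = {
    "id": "550e8400-e29b-41d4-a716-446655440000",
    "name": "John Doe",
    "email": "john.doe@example.com",
    "timestamp": 1710503445,
}

payload = encode_record(record)
decoded = decode_message(payload)

# Assumes the timestamp is Unix epoch seconds.
sent_at = datetime.fromtimestamp(decoded["timestamp"], tz=timezone.utc)
print(decoded["name"], sent_at.isoformat())
```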

Notes

  • The sink uses the confluent_kafka Python package internally to connect to Kafka
  • All configuration parameters except topic are passed directly to confluent_kafka.Producer
  • The topic parameter is required; it is handled separately from the producer configuration
  • For a complete list of supported configuration parameters, refer to the confluent_kafka documentation
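
The notes above can be sketched roughly as follows. This is not the sink's actual source code: split_sink_params and send_records are hypothetical names, and the sketch only assumes that topic is removed from the parameters before the remainder is handed to confluent_kafka.Producer. The Producer import happens inside the function, so the splitting logic can be tried without the package installed.

```python
import json


def split_sink_params(params):
    """Separate the topic from the settings intended for the producer."""
    producer_config = dict(params)  # copy so the caller's dict is untouched
    topic = producer_config.pop("topic", None)
    if not topic:
        raise ValueError("the 'topic' parameter is required")
    return topic, producer_config


def send_records(params, records):
    """Send each record to Kafka as a JSON string (needs confluent-kafka)."""
    from confluent_kafka import Producer  # imported lazily on purpose

    topic, producer_config = split_sink_params(params)
    producer = Producer(producer_config)
    for record in records:
        producer.produce(topic, value=json.dumps(record))
        producer.poll(0)  # serve delivery callbacks without blocking
    producer.flush()  # wait until all queued messages are delivered


topic, producer_config = split_sink_params(
    {"bootstrap.servers": "kafka1:9092,kafka2:9092", "topic": "user_events"}
)
print(topic, producer_config)
```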