The Parseable Kafka Connector enables log ingestion from Apache Kafka into Parseable, providing a high-performance, scalable, and efficient logging pipeline.
Features
- Consumer & Producer Support: Supports both consuming and producing messages (ready for use with a dead letter topic, DLT).
- Configurable Buffering & Performance Settings: Optimized for high-throughput data processing.
- Security Integration: Supports SSL/TLS and SASL authentication.
- Fault Tolerance & Partitioning: Handles partition balancing, offsets, and error handling.
Configuration Options
General Kafka Configuration
| Parameter | Environment Variable | Default Value | Description | Usage |
| --- | --- | --- | --- | --- |
| | | | Comma-separated list of Kafka bootstrap servers. | Specifies the Kafka brokers the client should connect to. |
| | | | Client ID for Kafka connection. | Identifies the client instance in Kafka logs. |
| | `P_KAFKA_PARTITION_LISTENER_CONCURRENCY` | | Number of parallel threads for Kafka partition listeners. | Determines the number of threads used to process Kafka partitions. |
| | | | Policy for handling bad data. | Determines how the client should handle corrupt or invalid messages. |
- All parameters can be set using command-line arguments or environment variables.
- Environment variables take precedence over default values.
- When configuring both producer and consumer, make sure to specify relevant options in their respective sections.

For more details, refer to Kafka's official documentation.
Consumer Configuration
| Parameter | Environment Variable | Default Value | Description | Usage |
| --- | --- | --- | --- | --- |
| | | | Comma-separated list of topics to consume from. | Specifies the Kafka topics the consumer should subscribe to. |
| | | | The consumer group ID. | Used to group consumers for load balancing and fault tolerance. |
| | | | Size of the buffer for batching records per partition. | Controls the number of messages buffered before processing. |
| | | | Timeout for buffer flush in milliseconds. | Defines the time to wait before flushing buffered messages. |
| `group_instance_id` | | | Group instance ID for static membership. | Useful for maintaining static assignments in consumer groups. |
| | `P_KAFKA_CONSUMER_PARTITION_STRATEGY` | | Partition assignment strategy. | Determines how partitions are assigned among consumers. |
| | `P_KAFKA_CONSUMER_SESSION_TIMEOUT` | | Session timeout in milliseconds. | Time before a consumer is considered inactive. |
| | `P_KAFKA_CONSUMER_HEARTBEAT_INTERVAL` | | Heartbeat interval in milliseconds. | Frequency at which consumers send heartbeats. |
| | | | Maximum poll interval in milliseconds. | Maximum time between poll calls before the consumer is considered dead. |
| | | | Enable auto offset store. | Determines whether offsets are automatically stored after processing messages. |
| | | | Auto offset reset behavior. | Determines whether to start from the beginning (`earliest`) or the end (`latest`) of the topic. |
| | | | Minimum bytes to fetch. | The smallest amount of data the broker should send. |
| | | | Maximum bytes to fetch. | The maximum amount of data fetched in a single request. |
| | | | Maximum wait time for fetch in milliseconds. | Maximum time the broker should wait before sending data. |
| | | | Maximum bytes to fetch per partition. | Limits the maximum data fetched per partition. |
| | | | Minimum messages to queue. | Controls the minimum number of messages buffered in the consumer. |
| | | | Maximum message queue size in KBytes. | Determines the maximum queue size in kilobytes. |
| | | | Enable partition EOF. | Signals when the end of a partition is reached. |
| | | | Check CRCs on messages. | Ensures message integrity by verifying CRCs. |
| | | | Transaction isolation level. | Controls whether uncommitted transactions should be visible to the consumer. |
| | | | Maximum bytes per message. | Defines the largest individual message the consumer can fetch. |
| | | | Statistics interval in milliseconds. | Defines the frequency at which consumer statistics are collected. |
- All parameters can be set using command-line arguments or environment variables.
- Environment variables take precedence over default values.
- Some values, such as `group_instance_id`, are dynamically generated if not explicitly provided.

For more details, refer to Kafka's official documentation on consumer configurations.
Producer Configuration
Note: Producer configuration is not required at the moment; these settings will be needed once dead letter topic (DLT) support is implemented.
| Parameter | Environment Variable | Default Value | Description | Usage |
| --- | --- | --- | --- | --- |
| `--producer-acks` | | | Number of acknowledgments the producer requires. | Determines when a message is considered successfully sent (`0`, `1`, or `all`). |
| | | | Compression type for messages. | Determines how messages are compressed (e.g. `none`, `gzip`, `snappy`, `lz4`, `zstd`). |
| | | | Maximum size of a request in bytes. | Defines the size of batches sent to Kafka. |
| | | | Delay to wait for more messages in the same batch. | Controls the latency vs. throughput trade-off. |
| | | | Local message timeout. | Time before an unacknowledged message is dropped. |
| | | | Maximum number of in-flight requests per connection. | Controls how many messages can be sent without acknowledgment. |
| | | | Maximum size of a message in bytes. | Restricts the maximum message size that can be sent. |
| `--producer-enable-idempotence` | | | Enable idempotent producer. | Ensures exactly-once delivery guarantees. |
| | | | Transaction timeout. | Maximum time for a transaction before it times out. |
| | | | Total bytes of memory the producer can use. | Limits the memory available for buffering messages. |
| | | | Time to wait before retrying a failed request. | Defines the back-off time for retries. |
| | | | Time to wait for a response from brokers. | Limits how long the producer waits for broker acknowledgment. |
| | | | Maximum number of messages allowed on the producer queue. | Prevents excessive message buffering. |
| | | | Maximum total message size sum allowed on the producer queue. | Restricts the producer queue's total size. |
| | | | Maximum time to report success or failure after send. | Defines the upper bound on message delivery time. |
| | | | Maximum number of retries per message. | Controls how many times a message is retried before failing. |
| | | | Maximum back-off time between retries. | Ensures retries are not too frequent. |
- All parameters can be set using command-line arguments or environment variables.
- Environment variables take precedence over default values.
- Certain parameters, such as `--producer-acks` and `--producer-enable-idempotence`, affect message durability and reliability.

For more details, refer to Kafka's official documentation on producer configurations.
Security Configuration
| Parameter | Environment Variable | Default Value | Description | Usage |
| --- | --- | --- | --- | --- |
| `--security-protocol` | `P_KAFKA_SECURITY_PROTOCOL` | | Security protocol used for communication. | Determines whether SSL, SASL, or plaintext is used. |
| `--ssl-ca-location` | `P_KAFKA_SSL_CA_LOCATION` | | CA certificate file path. | Required when using SSL or SASL_SSL. |
| `--ssl-certificate-location` | `P_KAFKA_SSL_CERTIFICATE_LOCATION` | | Client certificate file path. | Required when using SSL or SASL_SSL. |
| `--ssl-key-location` | `P_KAFKA_SSL_KEY_LOCATION` | | Client key file path. | Required when using SSL or SASL_SSL. |
| `--ssl-key-password` | `P_KAFKA_SSL_KEY_PASSWORD` | | SSL key password. | Used if the SSL key is password protected. |
| `--sasl-mechanism` | `P_KAFKA_SASL_MECHANISM` | | SASL authentication mechanism. | Required when using SASL_SSL or SASL_PLAINTEXT (e.g. `PLAIN`, `SCRAM-SHA-512`, `GSSAPI`, `OAUTHBEARER`). |
| `--sasl-username` | `P_KAFKA_SASL_USERNAME` | | SASL username. | Required for `PLAIN` and `SCRAM` mechanisms. |
| `--sasl-password` | `P_KAFKA_SASL_PASSWORD` | | SASL password. | Required for `PLAIN` and `SCRAM` mechanisms. |
| `--kerberos-service-name` | | | Kerberos service name. | Required when using the `GSSAPI` mechanism. |
| `--kerberos-principal` | | | Kerberos principal. | Used for Kerberos authentication. |
| `--kerberos-keytab` | | | Path to Kerberos keytab file. | Required when using Kerberos authentication. |
| `--oauth-token-endpoint` | | | OAuth Bearer token endpoint. | Required when using the `OAUTHBEARER` mechanism. |
| `--oauth-client-id` | | | OAuth client ID. | Used for authentication with an OAuth provider. |
| `--oauth-client-secret` | | | OAuth client secret. | Used to authenticate the OAuth client. |
| `--oauth-scope` | | | OAuth scope. | Defines the permissions requested from the OAuth provider. |
Security Configuration Combinations
Plaintext Communication (No Security)
- `--security-protocol=PLAINTEXT`
- No additional parameters required.

SSL Encryption
- `--security-protocol=SSL`
- Required parameters:
  - `--ssl-ca-location`
  - `--ssl-certificate-location`
  - `--ssl-key-location`
  - `--ssl-key-password` (if the key is password-protected)

SASL Authentication with SSL
- `--security-protocol=SASL_SSL`
- Required parameters:
  - `--sasl-mechanism`
  - `--sasl-username` and `--sasl-password` (for `PLAIN` or `SCRAM` mechanisms)
  - `--kerberos-service-name` and `--kerberos-principal` (for the `GSSAPI` mechanism)
  - `--kerberos-keytab` (if using Kerberos authentication)
  - SSL parameters (if required by the security policy)

SASL Authentication without SSL
- `--security-protocol=SASL_PLAINTEXT`
- Required parameters:
  - `--sasl-mechanism`
  - `--sasl-username` and `--sasl-password` (for `PLAIN` or `SCRAM` mechanisms)
  - `--kerberos-service-name` and `--kerberos-principal` (for the `GSSAPI` mechanism)
  - `--kerberos-keytab` (if using Kerberos authentication)

OAuth Bearer Token Authentication (Not supported yet)
- `--security-protocol=SASL_SSL` or `SASL_PLAINTEXT`
- `--sasl-mechanism=OAUTHBEARER`
- Required parameters:
  - `--oauth-token-endpoint`
  - `--oauth-client-id`
  - `--oauth-client-secret`
  - `--oauth-scope` (if required by the OAuth provider)
Examples
SSL Configuration

```shell
export P_KAFKA_SECURITY_PROTOCOL="SSL"
export P_KAFKA_SSL_CA_LOCATION="/path/to/ca.pem"
export P_KAFKA_SSL_CERTIFICATE_LOCATION="/path/to/client-cert.pem"
export P_KAFKA_SSL_KEY_LOCATION="/path/to/client-key.pem"
export P_KAFKA_SSL_KEY_PASSWORD="my-secure-password"
```
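Before starting the connector with this configuration, it can help to confirm that the files the `P_KAFKA_SSL_*` variables point to actually exist and are readable. The helper below is an illustrative sketch, not part of the connector:

```shell
# Illustrative helper (not part of the connector): verify that the files
# referenced by the P_KAFKA_SSL_* variables exist and are readable.
check_ssl_files() {
  local status=0
  for f in "$P_KAFKA_SSL_CA_LOCATION" \
           "$P_KAFKA_SSL_CERTIFICATE_LOCATION" \
           "$P_KAFKA_SSL_KEY_LOCATION"; do
    if [ ! -r "$f" ]; then
      echo "missing or unreadable: $f"
      status=1
    fi
  done
  return $status
}
```

Run `check_ssl_files` after exporting the variables above; a non-zero exit status means at least one path is wrong.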
SASL Configuration

```shell
export P_KAFKA_SECURITY_PROTOCOL="SASL_SSL"
export P_KAFKA_SASL_MECHANISM="SCRAM-SHA-512"
export P_KAFKA_SASL_USERNAME="my-user"
export P_KAFKA_SASL_PASSWORD="my-password"
```
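The same variables also cover SASL without TLS; switching the security protocol to `SASL_PLAINTEXT` is enough (mechanism and credentials below are illustrative):

```shell
# SASL authentication without TLS: same variables as the SASL_SSL example,
# with the security protocol switched. Username and password are placeholders.
export P_KAFKA_SECURITY_PROTOCOL="SASL_PLAINTEXT"
export P_KAFKA_SASL_MECHANISM="PLAIN"
export P_KAFKA_SASL_USERNAME="my-user"
export P_KAFKA_SASL_PASSWORD="my-password"
```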
For more details, refer to Kafka's official security documentation.
Concurrency and Multi-Instance Processing
The connector uses a dedicated thread per partition, with configurable concurrency:

```shell
export P_KAFKA_PARTITION_LISTENER_CONCURRENCY="2"
```

Thread Assignment Formula:
Threads per ingestor = min(Partitions per ingestor, Configured threads)
Example Scenarios:
- Balanced Configuration:
  - 6 partitions
  - 2 ingest nodes
  - 3 threads per node
  - Result: each thread handles one partition
- Over-threaded Configuration:
  - 4 partitions
  - 2 ingest nodes
  - 4 threads per node
  - Result: 2 threads per node will be idle
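The scenarios above follow directly from the thread assignment formula; the small shell sketch below (the function name is ours, not part of the connector) makes the arithmetic explicit:

```shell
# Compute active listener threads per ingest node:
#   threads = min(partitions / nodes, configured_threads)
# Illustrative helper; assumes partitions divide evenly across nodes.
threads_per_ingestor() {
  local partitions=$1 nodes=$2 configured=$3
  local per_node=$(( partitions / nodes ))
  echo $(( per_node < configured ? per_node : configured ))
}

threads_per_ingestor 6 2 3   # balanced: prints 3 (every thread busy)
threads_per_ingestor 4 2 4   # over-threaded: prints 2 (2 threads idle)
```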
Consumer Group Rebalancing
Rebalance triggers:
- New consumer joins
- Existing consumer leaves
- Network issues
- Partition reassignment
Available strategies:
- Range: Assigns consecutive partitions.
- RoundRobin: Distributes partitions evenly across consumers.
- Sticky: Minimizes reassignments (not recommended for Parseable due to rdkafka issues).
- CooperativeSticky: Controlled, incremental rebalancing (not recommended for Parseable since rdkafka support is lacking).

Recommended configuration (the default):

```shell
export P_KAFKA_CONSUMER_PARTITION_STRATEGY="roundrobin,range"
export P_KAFKA_CONSUMER_SESSION_TIMEOUT="60000"
export P_KAFKA_CONSUMER_HEARTBEAT_INTERVAL="3000"
```
Metrics
All metrics listed in librdkafka's statistics documentation are available at the `/metrics` endpoint.
Architecture and Design
Core Components
- KafkaStreams: Manages consumers and partitions.
- StreamWorker: Processes records per partition.
- ParseableSinkProcessor: Transforms messages and sinks them to Parseable.
- RebalanceListener: Handles partition rebalancing.
- Metrics Collector: Provides Prometheus metrics.
- Security Layer: Configurable authentication.
Error Handling
Error Type | Handling Strategy |
---|---|
Connection Errors | Retries every 1 second |
Fatal Errors | Stops the pipeline |
Auth Errors | Stops the pipeline |
Best Practices
- Performance Tuning:
  - More partitions = better parallelism
  - Use RoundRobin partition assignment
  - Increase partition count for high lag
- Security:
  - Prefer SSL over SASL_PLAINTEXT
  - Rotate certificates regularly
  - Monitor authentication failures