Kafka Connector

The Parseable Kafka Connector enables log ingestion from Apache Kafka into Parseable, providing a high-performance, scalable, and efficient logging pipeline.

Features

  • Consumer & Producer Support: Supports both consuming and producing messages (the producer side is ready for dead letter topic (DLT) support).

  • Configurable Buffering & Performance Settings: Optimized for high-throughput data processing.

  • Security Integration: Supports SSL/TLS and SASL authentication.

  • Fault Tolerance & Partitioning: Handles partition balancing, offsets, and error handling.

Configuration Options

General Kafka Configuration

| Parameter | Environment Variable | Default Value | Description | Usage |
| --- | --- | --- | --- | --- |
| --bootstrap-servers | P_KAFKA_BOOTSTRAP_SERVERS | localhost:9092 | Comma-separated list of Kafka bootstrap servers. | Specifies the Kafka brokers the client should connect to. |
| --client-id | P_KAFKA_CLIENT_ID | parseable-connect | Client ID for the Kafka connection. | Identifies the client instance in Kafka logs. |
| --partition-listener-concurrency | P_KAFKA_PARTITION_LISTENER_CONCURRENCY | 2 | Number of parallel threads for Kafka partition listeners. | Determines the number of threads used to process Kafka partitions. |
| --bad-data-policy | P_CONNECTOR_BAD_DATA_POLICY | fail | Policy for handling bad data. | Determines how the client handles corrupt or invalid messages. Options: fail, drop (not yet supported), dlt (not yet supported). |

  • All parameters can be set using command-line arguments or environment variables.

  • Environment variables take precedence over default values.

  • When configuring both producer and consumer, make sure to specify relevant options in their respective sections.

For more details, refer to Kafka's official documentation.
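
As an illustration, the general options above combine like this (a minimal sketch; the broker addresses are placeholders):

export P_KAFKA_BOOTSTRAP_SERVERS="broker-1:9092,broker-2:9092"   # placeholder broker addresses
export P_KAFKA_CLIENT_ID="parseable-connect"
export P_KAFKA_PARTITION_LISTENER_CONCURRENCY="2"
export P_CONNECTOR_BAD_DATA_POLICY="fail"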

Consumer Configuration

| Parameter | Environment Variable | Default Value | Description | Usage |
| --- | --- | --- | --- | --- |
| --consumer-topics | P_KAFKA_CONSUMER_TOPICS | None | Comma-separated list of topics to consume from. | Specifies the Kafka topics the consumer subscribes to. |
| --consumer-group-id | P_KAFKA_CONSUMER_GROUP_ID | parseable-connect-cg | The consumer group ID. | Groups consumers for load balancing and fault tolerance. |
| --buffer-size | P_KAFKA_CONSUMER_BUFFER_SIZE | 10000 | Size of the buffer for batching records per partition. | Controls the number of messages buffered before processing. |
| --buffer-timeout | P_KAFKA_CONSUMER_BUFFER_TIMEOUT | 10000ms | Timeout for buffer flush in milliseconds. | Defines the time to wait before flushing buffered messages. |
| --consumer-group-instance-id | P_KAFKA_CONSUMER_GROUP_INSTANCE_ID | parseable-connect-cg-ii-{random} | Group instance ID for static membership. | Useful for maintaining static assignments in consumer groups. |
| --consumer-partition-strategy | P_KAFKA_CONSUMER_PARTITION_STRATEGY | roundrobin,range | Partition assignment strategy. | Determines how partitions are assigned among consumers. |
| --consumer-session-timeout | P_KAFKA_CONSUMER_SESSION_TIMEOUT | 60000 | Session timeout in milliseconds. | Time before a consumer is considered inactive. |
| --consumer-heartbeat-interval | P_KAFKA_CONSUMER_HEARTBEAT_INTERVAL | 3000 | Heartbeat interval in milliseconds. | Frequency at which consumers send heartbeats. |
| --consumer-max-poll-interval | P_KAFKA_CONSUMER_MAX_POLL_INTERVAL | 300000 | Maximum poll interval in milliseconds. | Maximum time between poll calls before the consumer is considered dead. |
| --consumer-enable-auto-offset-store | P_KAFKA_CONSUMER_ENABLE_AUTO_OFFSET_STORE | true | Enable auto offset store. | Determines whether offsets are automatically stored after processing messages. |
| --consumer-auto-offset-reset | P_KAFKA_CONSUMER_AUTO_OFFSET_RESET | earliest | Auto offset reset behavior. | Determines whether consumption starts from the earliest or the latest offset. |
| --consumer-fetch-min-bytes | P_KAFKA_CONSUMER_FETCH_MIN_BYTES | 1 | Minimum bytes to fetch. | The smallest amount of data the broker should send. |
| --consumer-fetch-max-bytes | P_KAFKA_CONSUMER_FETCH_MAX_BYTES | 52428800 | Maximum bytes to fetch. | The maximum amount of data fetched in a single request. |
| --consumer-fetch-max-wait | P_KAFKA_CONSUMER_FETCH_MAX_WAIT | 500 | Maximum wait time for fetch in milliseconds. | Maximum time the broker should wait before sending data. |
| --consumer-max-partition-fetch-bytes | P_KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES | 1048576 | Maximum bytes to fetch per partition. | Limits the maximum data fetched per partition. |
| --consumer-queued-min-messages | P_KAFKA_CONSUMER_QUEUED_MIN_MESSAGES | 100000 | Minimum messages to queue. | Controls the minimum number of messages buffered in the consumer. |
| --consumer-queued-max-messages-kbytes | P_KAFKA_CONSUMER_QUEUED_MAX_MESSAGES_KBYTES | 65536 | Maximum message queue size in kilobytes. | Determines the maximum queue size in kilobytes. |
| --consumer-enable-partition-eof | P_KAFKA_CONSUMER_ENABLE_PARTITION_EOF | false | Enable partition EOF. | Signals when the end of a partition is reached. |
| --consumer-check-crcs | P_KAFKA_CONSUMER_CHECK_CRCS | false | Check CRCs on messages. | Ensures message integrity by verifying CRCs. |
| --consumer-isolation-level | P_KAFKA_CONSUMER_ISOLATION_LEVEL | read_committed | Transaction isolation level. | Controls whether uncommitted transactions are visible to the consumer. |
| --consumer-fetch-message-max-bytes | P_KAFKA_CONSUMER_FETCH_MESSAGE_MAX_BYTES | 1048576 | Maximum bytes per message. | Defines the largest individual message the consumer can fetch. |
| --consumer-stats-interval | P_KAFKA_CONSUMER_STATS_INTERVAL | 10000 | Statistics interval in milliseconds. | Defines the frequency at which consumer statistics are collected. |

  • All parameters can be set using command-line arguments or environment variables.

  • Environment variables take precedence over default values.

  • Some values, such as group_instance_id, are dynamically generated if not explicitly provided.

For more details, refer to Kafka's official documentation on consumer configurations.
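
For example, a minimal consumer setup combines the topic, group, and buffering options like this (a sketch; the topic names are placeholders):

export P_KAFKA_BOOTSTRAP_SERVERS="localhost:9092"
export P_KAFKA_CONSUMER_TOPICS="app-logs,audit-logs"    # placeholder topic names
export P_KAFKA_CONSUMER_GROUP_ID="parseable-connect-cg"
export P_KAFKA_CONSUMER_BUFFER_SIZE="10000"
export P_KAFKA_CONSUMER_BUFFER_TIMEOUT="10000"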

Producer Configuration

Note: Producer configuration is not required at the moment. These settings will become relevant once dead letter topic (DLT) support is implemented.

| Parameter | Environment Variable | Default Value | Description | Usage |
| --- | --- | --- | --- | --- |
| --producer-acks | P_KAFKA_PRODUCER_ACKS | all | Number of acknowledgments the producer requires. | Determines when a message is considered successfully sent (0, 1, all). |
| --producer-compression-type | P_KAFKA_PRODUCER_COMPRESSION_TYPE | lz4 | Compression type for messages. | Determines how messages are compressed (none, gzip, snappy, lz4, zstd). |
| --producer-batch-size | P_KAFKA_PRODUCER_BATCH_SIZE | 16384 | Maximum size of a request in bytes. | Defines the size of batches sent to Kafka. |
| --producer-linger-ms | P_KAFKA_PRODUCER_LINGER_MS | 5 | Delay to wait for more messages in the same batch. | Controls the latency vs. throughput trade-off. |
| --producer-message-timeout-ms | P_KAFKA_PRODUCER_MESSAGE_TIMEOUT_MS | 120000 | Local message timeout. | Time before an unacknowledged message is dropped. |
| --producer-max-inflight | P_KAFKA_PRODUCER_MAX_INFLIGHT | 5 | Maximum number of in-flight requests per connection. | Controls how many messages can be sent without acknowledgment. |
| --producer-message-max-bytes | P_KAFKA_PRODUCER_MESSAGE_MAX_BYTES | 1048576 | Maximum size of a message in bytes. | Restricts the maximum message size that can be sent. |
| --producer-enable-idempotence | P_KAFKA_PRODUCER_ENABLE_IDEMPOTENCE | true | Enable idempotent producer. | Ensures exactly-once delivery guarantees. |
| --producer-transaction-timeout-ms | P_KAFKA_PRODUCER_TRANSACTION_TIMEOUT_MS | 60000 | Transaction timeout. | Maximum time for a transaction before it times out. |
| --producer-buffer-memory | P_KAFKA_PRODUCER_BUFFER_MEMORY | 33554432 | Total bytes of memory the producer can use. | Limits the memory available for buffering messages. |
| --producer-retry-backoff-ms | P_KAFKA_PRODUCER_RETRY_BACKOFF_MS | 100 | Time to wait before retrying a failed request. | Defines the back-off time between retries. |
| --producer-request-timeout-ms | P_KAFKA_PRODUCER_REQUEST_TIMEOUT_MS | 30000 | Time to wait for a response from brokers. | Limits how long the producer waits for broker acknowledgment. |
| --producer-queue-buffering-max-messages | P_KAFKA_PRODUCER_QUEUE_BUFFERING_MAX_MESSAGES | 100000 | Maximum number of messages allowed on the producer queue. | Prevents excessive message buffering. |
| --producer-queue-buffering-max-kbytes | P_KAFKA_PRODUCER_QUEUE_BUFFERING_MAX_KBYTES | 1048576 | Maximum total message size allowed on the producer queue, in kilobytes. | Restricts the producer queue's total size. |
| --producer-delivery-timeout-ms | P_KAFKA_PRODUCER_DELIVERY_TIMEOUT_MS | 120000 | Maximum time to report success or failure after send. | Defines the upper bound on message delivery time. |
| --producer-max-retries | P_KAFKA_PRODUCER_MAX_RETRIES | 2147483647 | Maximum number of retries per message. | Controls how many times a message is retried before failing. |
| --producer-retry-backoff-max-ms | P_KAFKA_PRODUCER_RETRY_BACKOFF_MAX_MS | 1000 | Maximum back-off time between retries. | Ensures retries are not too frequent. |

  • All parameters can be set using command-line arguments or environment variables.

  • Environment variables take precedence over default values.

  • Certain parameters, such as --producer-acks and --producer-enable-idempotence, affect message durability and reliability.

For more details, refer to Kafka's official documentation on producer configurations.
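
Once DLT support requires it, a durability-focused producer setup might look like this sketch (values mirror the defaults in the table above):

export P_KAFKA_PRODUCER_ACKS="all"                  # wait for all in-sync replicas
export P_KAFKA_PRODUCER_ENABLE_IDEMPOTENCE="true"   # exactly-once delivery guarantees
export P_KAFKA_PRODUCER_COMPRESSION_TYPE="lz4"
export P_KAFKA_PRODUCER_LINGER_MS="5"               # small batching delay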

Security Configuration

| Parameter | Environment Variable | Default Value | Description | Usage |
| --- | --- | --- | --- | --- |
| --security-protocol | P_KAFKA_SECURITY_PROTOCOL | PLAINTEXT | Security protocol used for communication. | Determines whether SSL, SASL, or plaintext is used. |
| --ssl-ca-location | P_KAFKA_SSL_CA_LOCATION | None | CA certificate file path. | Required when using SSL or SASL_SSL. |
| --ssl-certificate-location | P_KAFKA_SSL_CERTIFICATE_LOCATION | None | Client certificate file path. | Required when using SSL or SASL_SSL. |
| --ssl-key-location | P_KAFKA_SSL_KEY_LOCATION | None | Client key file path. | Required when using SSL or SASL_SSL. |
| --ssl-key-password | P_KAFKA_SSL_KEY_PASSWORD | None | SSL key password. | Used if the SSL key is password protected. |
| --sasl-mechanism | P_KAFKA_SASL_MECHANISM | None | SASL authentication mechanism. | Required when using SASL_SSL or SASL_PLAINTEXT (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, OAUTHBEARER). |
| --sasl-username | P_KAFKA_SASL_USERNAME | None | SASL username. | Required for PLAIN or SCRAM SASL mechanisms. |
| --sasl-password | P_KAFKA_SASL_PASSWORD | None | SASL password. | Required for PLAIN or SCRAM SASL mechanisms. |
| --kerberos-service-name | P_KAFKA_KERBEROS_SERVICE_NAME | None | Kerberos service name. | Required when using the GSSAPI SASL mechanism. |
| --kerberos-principal | P_KAFKA_KERBEROS_PRINCIPAL | None | Kerberos principal. | Used for Kerberos authentication. |
| --kerberos-keytab | P_KAFKA_KERBEROS_KEYTAB | None | Path to the Kerberos keytab file. | Required when using Kerberos authentication. |
| --oauth-token-endpoint | P_KAFKA_OAUTH_TOKEN_ENDPOINT | None | OAuth Bearer token endpoint. | Required when using the OAUTHBEARER SASL mechanism. |
| --oauth-client-id | P_KAFKA_OAUTH_CLIENT_ID | None | OAuth client ID. | Used for authentication with an OAuth provider. |
| --oauth-client-secret | P_KAFKA_OAUTH_CLIENT_SECRET | None | OAuth client secret. | Used to authenticate the OAuth client. |
| --oauth-scope | P_KAFKA_OAUTH_SCOPE | None | OAuth scope. | Defines the permissions requested from the OAuth provider. |

Security Configuration Combinations

Plaintext Communication (No Security)

  • --security-protocol=PLAINTEXT

  • No additional parameters required.

SSL Encryption

  • --security-protocol=SSL

  • Required parameters:

    • --ssl-ca-location

    • --ssl-certificate-location

    • --ssl-key-location

    • --ssl-key-password (if the key is password-protected)

SASL Authentication with SSL

  • --security-protocol=SASL_SSL

  • Required parameters:

    • --sasl-mechanism

    • --sasl-username and --sasl-password (for PLAIN or SCRAM mechanisms)

    • --kerberos-service-name and --kerberos-principal (for GSSAPI mechanism)

    • --kerberos-keytab (if using Kerberos authentication)

    • SSL parameters (if required by the security policy)

SASL Authentication without SSL

  • --security-protocol=SASL_PLAINTEXT

  • Required parameters:

    • --sasl-mechanism

    • --sasl-username and --sasl-password (for PLAIN or SCRAM mechanisms)

    • --kerberos-service-name and --kerberos-principal (for GSSAPI mechanism)

    • --kerberos-keytab (if using Kerberos authentication)

OAuth Bearer Token Authentication (Not supported yet)

  • --security-protocol=SASL_SSL or SASL_PLAINTEXT

  • --sasl-mechanism=OAUTHBEARER

  • Required parameters:

    • --oauth-token-endpoint

    • --oauth-client-id

    • --oauth-client-secret

    • --oauth-scope (if required by the OAuth provider)

Examples

SSL Configuration

export P_KAFKA_SECURITY_PROTOCOL="SSL"
export P_KAFKA_SSL_CA_LOCATION="/path/to/ca.pem"
export P_KAFKA_SSL_CERTIFICATE_LOCATION="/path/to/client-cert.pem"
export P_KAFKA_SSL_KEY_LOCATION="/path/to/client-key.pem"
export P_KAFKA_SSL_KEY_PASSWORD="my-secure-password"

SASL Configuration

export P_KAFKA_SECURITY_PROTOCOL="SASL_SSL"
export P_KAFKA_SASL_MECHANISM="SCRAM-SHA-512"
export P_KAFKA_SASL_USERNAME="my-user"
export P_KAFKA_SASL_PASSWORD="my-password"
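
Kerberos (GSSAPI) Configuration

For GSSAPI, the Kerberos parameters from the table above combine as in this sketch (the service name, principal, and keytab path are placeholders):

export P_KAFKA_SECURITY_PROTOCOL="SASL_SSL"
export P_KAFKA_SASL_MECHANISM="GSSAPI"
export P_KAFKA_KERBEROS_SERVICE_NAME="kafka"                 # placeholder service name
export P_KAFKA_KERBEROS_PRINCIPAL="parseable@EXAMPLE.COM"    # placeholder principal
export P_KAFKA_KERBEROS_KEYTAB="/path/to/client.keytab"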

For more details, refer to Kafka's official security documentation.

Concurrency and Multi-Instance Processing

The connector uses a dedicated thread per partition with configurable concurrency:

export P_KAFKA_PARTITION_LISTENER_CONCURRENCY="2"

Thread Assignment Formula:

Threads per ingest node = min(partitions assigned to the node, configured threads); a worked example follows the scenarios below.

Example Scenarios:

  1. Balanced Configuration:

    • 6 partitions

    • 2 ingest nodes

    • 3 threads per node

    • Result: Each thread handles one partition

  2. Over-threaded Configuration:

    • 4 partitions

    • 2 ingest nodes

    • 4 threads per node

    • Result: 2 threads per node will be idle
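
Applying the formula to the balanced scenario above, the sizing works out as follows:

# 6 partitions / 2 ingest nodes = 3 partitions per node
# threads per node = min(3 partitions, 3 configured threads) = 3, so no thread is idle
export P_KAFKA_PARTITION_LISTENER_CONCURRENCY="3"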

Consumer Group Rebalancing

Rebalance triggers:

  • New consumer joins

  • Existing consumer leaves

  • Network issues

  • Partition reassignment

Available strategies:

  • Range: Assigns consecutive partitions

  • RoundRobin: Distributes evenly

  • Sticky: Minimizes reassignments (not recommended for Parseable due to known rdkafka issues)

  • CooperativeSticky: Controlled, incremental rebalance (not recommended for Parseable because rdkafka support is lacking)

Recommended configuration (these are the defaults):

export P_KAFKA_CONSUMER_PARTITION_STRATEGY="roundrobin,range"
export P_KAFKA_CONSUMER_SESSION_TIMEOUT="60000"
export P_KAFKA_CONSUMER_HEARTBEAT_INTERVAL="3000"

Metrics

All metrics listed in librdkafka's statistics documentation are available at the /metrics endpoint.
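
For example, assuming Parseable is serving on localhost:8000 (adjust the host and port to your deployment), the Kafka metrics can be inspected with:

curl -s http://localhost:8000/metrics | grep -i kafka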

Architecture and Design

Core Components

  1. KafkaStreams: Manages consumers and partitions.

  2. StreamWorker: Processes records per partition.

  3. ParseableSinkProcessor: Transforms messages and sinks them to Parseable.

  4. RebalanceListener: Handles partition rebalancing.

  5. Metrics Collector: Provides Prometheus metrics.

  6. Security Layer: Configurable authentication.

Error Handling

| Error Type | Handling Strategy |
| --- | --- |
| Connection Errors | Retries every 1 second |
| Fatal Errors | Stops the pipeline |
| Auth Errors | Stops the pipeline |

Best Practices

  1. Performance Tuning:

    • More partitions = Better parallelism

    • Use RoundRobin partition assignment

    • Increase partition count for high lag

  2. Security:

    • Prefer SSL over SASL_PLAINTEXT

    • Rotate certificates regularly

    • Monitor authentication failures
