AWS Distro for OpenTelemetry
Kafka receiver and exporter
The Kafka receiver and exporter allow you to send and receive telemetry signals (currently traces and metrics) through a Kafka cluster. The exporter produces telemetry signals and submits them to a configured topic. A Kafka receiver can consume from this topic and forward the data to the telemetry pipeline configured in its collector instance. These components are flexible enough that you can have multiple consumer groups for the same topic.
Please find below the documentation for each of these components:
- Upstream Kafka Exporter/Receiver documentation
Notes:
- These components don't create Kafka topics on your behalf. Topics need to be pre-created, or the cluster must be configured to auto-create topics.
- It is good practice to set the `protocol_version` property to match the version of Kafka used in your cluster.
- The Kafka exporter uses a synchronous producer that blocks and does not batch messages; it should therefore be used with the batch and queued retry processors for higher throughput and resiliency.
The ADOT Collector is tested with Kafka versions 2.8.1 and 3.2.0.
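Following the note above about the synchronous producer, a minimal sketch of a pipeline that places the batch processor in front of the Kafka exporter and enables the exporter's queue and retry settings might look like this (the topic, broker address, and tuning values are illustrative, not recommendations):

```yaml
receivers:
  otlp:
    protocols:
      grpc:

processors:
  batch:
    timeout: 5s
    send_batch_size: 1024

exporters:
  kafka:
    protocol_version: 3.2.0
    topic: some-topic
    brokers:
      - broker-1:9092
    # Queue and retry settings for resiliency; values are illustrative.
    sending_queue:
      enabled: true
      queue_size: 5000
    retry_on_failure:
      enabled: true
      max_elapsed_time: 300s

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [kafka]
```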
Integrating with Amazon Managed Streaming for Apache Kafka (MSK)
The Kafka receiver and exporter can be integrated with Amazon Managed Streaming for Apache Kafka (MSK).
You can create an MSK cluster by following this guide. When doing so:
- Select an authentication method supported by the receiver/exporter.
- Enable TLS.

After the cluster is provisioned, you can get the list of brokers by selecting the newly created cluster in the console and clicking the View client information button.
With the list of brokers, you can configure the collector accordingly.
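Alternatively, the broker list can be retrieved with the AWS CLI; this is a sketch, and the cluster ARN below is a placeholder you would replace with your own:

```shell
# Returns the bootstrap broker strings for the cluster (TLS listener on port 9094).
aws kafka get-bootstrap-brokers \
  --cluster-arn arn:aws:kafka:us-west-2:123456789012:cluster/testcluster/abc123
```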
For a Kafka receiver the configuration would look like:
```yaml
kafka/receiver:
  auth:
    tls:
      insecure: false
  protocol_version: 3.2.0
  topic: some-topic
  brokers:
    - b-2.testcluster.abc123.c13.kafka.us-west-2.amazonaws.com:9094
    - b-1.testcluster.abc123.c13.kafka.us-west-2.amazonaws.com:9094
    - b-3.testcluster.abc123.c13.kafka.us-west-2.amazonaws.com:9094
```
For a Kafka exporter, the configuration would look like:
```yaml
kafka/exporter:
  auth:
    tls:
      insecure: false
  protocol_version: 3.2.0
  topic: some-topic
  brokers:
    - b-2.testcluster.abc123.c13.kafka.us-west-2.amazonaws.com:9094
    - b-1.testcluster.abc123.c13.kafka.us-west-2.amazonaws.com:9094
    - b-3.testcluster.abc123.c13.kafka.us-west-2.amazonaws.com:9094
```
With this configuration you decouple the producers (a collector with the Kafka exporter) from the consumers (a collector with the Kafka receiver). This allows you to scale out the consumers when they perform additional processing on the telemetry signals, and to change the destination backend of the consumers without downtime: telemetry signals are still recorded in the Kafka topic while the consumers' configuration is being changed.
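For example, to run multiple independent consumer groups on the same topic, the upstream Kafka receiver exposes a `group_id` setting (it defaults to `otel-collector`). A sketch with two receiver instances, each in its own consumer group, might look like this (the names and broker address are illustrative):

```yaml
receivers:
  kafka/analytics:
    topic: some-topic
    group_id: analytics-consumers
    protocol_version: 3.2.0
    brokers: [broker-1:9092]
  kafka/archival:
    topic: some-topic
    group_id: archival-consumers
    protocol_version: 3.2.0
    brokers: [broker-1:9092]
```

Each consumer group receives its own full copy of the messages published to the topic.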
Example
In this section we describe the simplest possible example of integrating the Kafka receiver with the Kafka exporter. We will use two collector instances, A and B, running on two different EC2 hosts.
We will send telemetry data to collector A, which submits it to a Kafka topic. Collector B consumes from that same topic and receives the telemetry.
Configuration for instance A.
```yaml
receivers:
  statsd:
    endpoint: 0.0.0.0:4567
    aggregation_interval: 5s

exporters:
  kafka/exporter:
    protocol_version: "${extra_data.msk.kafka_version}"
    auth:
      tls:
        insecure: false
    topic: adot-collector-test
    brokers:
      - b-3.aocmskcluster281.test.c13.kafka.us-west-2.amazonaws.com:9094
      - b-1.aocmskcluster281.test.c13.kafka.us-west-2.amazonaws.com:9094
      - b-2.aocmskcluster281.test.c13.kafka.us-west-2.amazonaws.com:9094

service:
  pipelines:
    metrics:
      receivers: [statsd]
      exporters: [kafka/exporter]
```
Configuration for instance B.
```yaml
receivers:
  kafka/receiver:
    topic: adot-collector-test
    protocol_version: "2.8.1"
    auth:
      tls:
        insecure: false
    brokers:
      - b-3.aocmskcluster281.test.c13.kafka.us-west-2.amazonaws.com:9094
      - b-1.aocmskcluster281.test.c13.kafka.us-west-2.amazonaws.com:9094
      - b-2.aocmskcluster281.test.c13.kafka.us-west-2.amazonaws.com:9094

exporters:
  logging:
    loglevel: debug

service:
  pipelines:
    metrics:
      receivers: [kafka/receiver]
      exporters: [logging]
```
On the EC2 instance of collector A, we execute the following command to send telemetry to the statsd receiver:
```shell
echo "kafka.test.example:1|c" | nc -w 1 -u localhost 4567
```
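The payload above uses the plain statsd wire format `name:value|type`, where `c` denotes a counter. As a minimal sketch, the same datagram can be built and sent from Python instead of `nc`; the host, port, and metric name below simply mirror the example:

```python
import socket

def statsd_counter(name: str, value: int) -> bytes:
    """Encode a statsd counter datagram: "<name>:<value>|c"."""
    return f"{name}:{value}|c".encode("ascii")

payload = statsd_counter("kafka.test.example", 1)

# UDP is fire-and-forget, so sending succeeds even if no receiver is listening.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(payload, ("localhost", 4567))
sock.close()
```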
The following output is obtained in collector B:
```
{"level":"info","timestamp":"2023-04-05T03:56:23.531Z","message":"ResourceMetrics #0\nResource SchemaURL: \nScopeMetrics #0\nScopeMetrics SchemaURL: \nInstrumentationScope \nMetric #0\nDescriptor:\n -> Name: kafka.test.example\n -> Description: \n -> Unit: \n -> DataType: Sum\n -> IsMonotonic: false\n -> AggregationTemporality: Delta\nNumberDataPoints #0\nStartTimestamp: 2023-04-05 03:56:18.402491427 +0000 UTC\nTimestamp: 2023-04-05 03:56:23.401572412 +0000 UTC\nValue: 1\n","kind":"exporter","data_type":"metrics","name":"logging"}
```