Kafka receiver and exporter

The Kafka receiver and exporter allow you to receive and send telemetry signals (currently traces and metrics) from and to a Kafka cluster. The exporter produces telemetry signals and submits them to a configured topic. A Kafka receiver can consume from this topic and forward the data to the telemetry pipeline configured in its collector instance. These components are flexible enough that you can have multiple consumer groups for the same topic.
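
For instance, you can fan the same stream out to multiple independent pipelines by giving each Kafka receiver its own consumer group, since every consumer group receives a full copy of the topic's messages. A minimal sketch, assuming a local broker; the group_id values are arbitrary names chosen for this illustration:

receivers:
  kafka/pipeline-a:
    brokers:
      - localhost:9092
    topic: some-topic
    group_id: consumers-a   # each consumer group gets its own copy of the stream
  kafka/pipeline-b:
    brokers:
      - localhost:9092
    topic: some-topic
    group_id: consumers-b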

Upstream Kafka Exporter/Receiver documentation

The documentation for each of these components can be found below:

  • Kafka receiver: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/kafkareceiver
  • Kafka exporter: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/kafkaexporter

Notes:

  • These components don't create Kafka topics on your behalf. Topics must be pre-created, or the cluster must be configured to auto-create them.
  • It is a good practice to set the protocol_version property to match the version of Kafka used in your cluster.
  • The Kafka exporter uses a synchronous producer that blocks and does not batch messages; it should therefore be used with the batch and queued retry processors for higher throughput and resiliency (see the sketch after these notes).
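
As an illustration of the last note, here is a minimal sketch pairing the exporter with the batch processor; in recent collector versions the queued retry behavior is provided by the exporter's own sending_queue and retry_on_failure settings, which is what this sketch uses. The receiver, broker, and values are placeholders, not recommendations:

receivers:
  otlp:
    protocols:
      grpc:
processors:
  batch:
exporters:
  kafka/exporter:
    brokers:
      - localhost:9092
    topic: some-topic
    sending_queue:
      enabled: true
      queue_size: 5000
    retry_on_failure:
      enabled: true
      initial_interval: 5s
      max_elapsed_time: 120s
service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [kafka/exporter]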

The ADOT collector is tested with Kafka versions 2.8.1 and 3.2.0.

Integrating with Amazon Managed Streaming for Apache Kafka (MSK)

The Kafka receiver and exporter can be integrated with Amazon Managed Streaming for Apache Kafka (MSK).

You can create an MSK cluster by following the Amazon MSK getting started guide (https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html).

  • Select the authentication method according to the options supported by the receivers/exporters (an IAM-based sketch follows this list).
  • Enable TLS.
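
For example, if the cluster uses IAM access control, these components can authenticate with the AWS_MSK_IAM SASL mechanism instead of TLS client authentication. A minimal exporter sketch, assuming an IAM listener (MSK IAM listeners typically use port 9098); the broker address and region are placeholders:

exporters:
  kafka/exporter:
    brokers:
      - b-1.examplecluster.abc123.c13.kafka.us-west-2.amazonaws.com:9098
    topic: some-topic
    auth:
      sasl:
        mechanism: AWS_MSK_IAM
        aws_msk:
          region: us-west-2
          broker_addr: b-1.examplecluster.abc123.c13.kafka.us-west-2.amazonaws.com:9098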

After the cluster is provisioned, you can get the list of brokers by selecting the newly created cluster in the console and then clicking the View client information button.

With the list of brokers, you can configure the collector accordingly.
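
If you prefer the AWS CLI to the console, the same broker list can be retrieved with the get-bootstrap-brokers command; the TLS brokers are returned in the BootstrapBrokerStringTls field of the response. The cluster ARN below is a placeholder:

aws kafka get-bootstrap-brokers \
  --cluster-arn arn:aws:kafka:us-west-2:123456789012:cluster/testcluster/abcd1234-ab12-cd34-ef56-abcdef123456-1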

For a Kafka receiver, the configuration would look like:

kafka/receiver:
  auth:
    tls:
      insecure: false
  protocol_version: 3.2.0
  topic: some-topic
  brokers:
    - b-2.testcluster.abc123.c13.kafka.us-west-2.amazonaws.com:9094
    - b-1.testcluster.abc123.c13.kafka.us-west-2.amazonaws.com:9094
    - b-3.testcluster.abc123.c13.kafka.us-west-2.amazonaws.com:9094

For a Kafka exporter, the configuration would look like:

kafka/exporter:
  auth:
    tls:
      insecure: false
  protocol_version: 3.2.0
  topic: some-topic
  brokers:
    - b-2.testcluster.abc123.c13.kafka.us-west-2.amazonaws.com:9094
    - b-1.testcluster.abc123.c13.kafka.us-west-2.amazonaws.com:9094
    - b-3.testcluster.abc123.c13.kafka.us-west-2.amazonaws.com:9094

With such a configuration you decouple producers (a collector with the Kafka exporter) from consumers (a collector with the Kafka receiver). This allows you to scale out consumers in case they perform some kind of processing on the telemetry signals, and to change the destination backend of the consumers without downtime: telemetry signals keep being recorded in the Kafka topic while the consumers' configuration is being changed.
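
To scale out the consuming side, each replica of the consuming collector can run the same receiver configuration with a shared group_id: Kafka then balances the topic's partitions across the members of the group, so each message is processed by exactly one replica. A minimal sketch of that shared configuration; the group name is arbitrary:

receivers:
  kafka/receiver:
    topic: some-topic
    group_id: adot-consumers   # shared by every replica, so partitions are split among them
    brokers:
      - b-1.testcluster.abc123.c13.kafka.us-west-2.amazonaws.com:9094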

Example

In this section we describe the simplest possible example integrating the Kafka receiver with the Kafka exporter. We will use two collector instances, A and B, running on two different EC2 hosts.

We are going to send telemetry data to collector A, which will submit it to a Kafka topic. Collector B will consume from this same topic and receive the telemetry.

Configuration for instance A:

receivers:
  statsd:
    endpoint: 0.0.0.0:4567
    aggregation_interval: 5s
exporters:
  kafka/exporter:
    protocol_version: "2.8.1"
    auth:
      tls:
        insecure: false
    topic: adot-collector-test
    brokers:
      - b-3.aocmskcluster281.test.c13.kafka.us-west-2.amazonaws.com:9094
      - b-1.aocmskcluster281.test.c13.kafka.us-west-2.amazonaws.com:9094
      - b-2.aocmskcluster281.test.c13.kafka.us-west-2.amazonaws.com:9094
service:
  pipelines:
    metrics:
      receivers: [statsd]
      exporters: [kafka/exporter]

Configuration for instance B:

receivers:
  kafka/receiver:
    topic: adot-collector-test
    protocol_version: "2.8.1"
    auth:
      tls:
        insecure: false
    brokers:
      - b-3.aocmskcluster281.test.c13.kafka.us-west-2.amazonaws.com:9094
      - b-1.aocmskcluster281.test.c13.kafka.us-west-2.amazonaws.com:9094
      - b-2.aocmskcluster281.test.c13.kafka.us-west-2.amazonaws.com:9094
exporters:
  logging:
    loglevel: debug
service:
  pipelines:
    metrics:
      receivers: [kafka/receiver]
      exporters: [logging]

On the EC2 instance of collector A, we execute the following command to send telemetry to the statsd receiver:

echo "kafka.test.example:1|c" | nc -w 1 -u localhost 4567

The following output is obtained on collector B:

{"level":"info","timestamp":"2023-04-05T03:56:23.531Z","message":"ResourceMetrics #0\nResource SchemaURL: \nScopeMetrics #0\nScopeMetrics SchemaURL: \nInstrumentationScope \nMetric #0\nDescriptor:\n -> Name: kafka.test.example\n -> Description: \n -> Unit: \n -> DataType: Sum\n -> IsMonotonic: false\n -> AggregationTemporality: Delta\nNumberDataPoints #0\nStartTimestamp: 2023-04-05 03:56:18.402491427 +0000 UTC\nTimestamp: 2023-04-05 03:56:23.401572412 +0000 UTC\nValue: 1\n","kind":"exporter","data_type":"metrics","name":"logging"}