datadog-kafka-connect-logs is a Kafka Connector for sending records from Kafka as logs to the Datadog Logs Intake API.
It is a plugin meant to be installed on a Kafka Connect cluster running alongside a Kafka broker.
- Kafka version 1.0.0 and above.
- Java 8 and above.
- Confluent Platform 4.0.x and above (optional).
To install the plugin, you need a working instance of Kafka Connect connected to a Kafka broker. See also Confluent's documentation for setting this up.
See Confluent's documentation and the connector's page on Confluent Hub.
Download the latest version from the GitHub releases page. Also see Confluent's documentation on installing community connectors.
- Clone the repo from https://github.com/DataDog/datadog-kafka-connect-logs
- Verify that a Java 8 JRE or JDK is installed.
- Run mvn clean compile package. This builds the jar in the /target directory. The file name has the format datadog-kafka-connect-logs-[VERSION].jar.
- The zip file for use on Confluent Hub can be found in target/components/packages.
- To install the plugin, place the plugin's jar file (see the previous section on how to download or build it) in or under the location specified in plugin.path. If you use Confluent Platform, run confluent-hub install target/components/packages/<connector-zip-file>.
- Restart your Kafka Connect instance.
- Run the following command to manually create connector tasks. Adjust topics to configure the Kafka topic to be ingested and set your Datadog api_key.
curl localhost:8083/connectors -X POST -H "Content-Type: application/json" -d '{
  "name": "datadog-kafka-connect-logs",
  "config": {
    "connector.class": "com.datadoghq.connect.logs.DatadogLogsSinkConnector",
    "datadog.api_key": "<YOUR_API_KEY>",
    "tasks.max": "3",
    "topics": "<YOUR_TOPIC>"
  }
}'
- You can verify that data is ingested to the Datadog platform by searching for source:kafka-connect in the Log Explorer tab.
- Use the following commands to check status, and manage connectors and tasks:
# List active connectors
curl http://localhost:8083/connectors
# Get datadog-kafka-connect-logs connector info
curl http://localhost:8083/connectors/datadog-kafka-connect-logs
# Get datadog-kafka-connect-logs connector config info
curl http://localhost:8083/connectors/datadog-kafka-connect-logs/config
# Delete datadog-kafka-connect-logs connector
curl http://localhost:8083/connectors/datadog-kafka-connect-logs -X DELETE
# Get datadog-kafka-connect-logs connector task info
curl http://localhost:8083/connectors/datadog-kafka-connect-logs/tasks
See the Confluent documentation for additional REST examples.
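The same REST calls can be prepared from a script. The following is a minimal sketch using only the Python standard library; it assumes the Kafka Connect REST API is reachable at localhost:8083 as in the curl examples, and the helper names (create_connector_request, delete_connector_request, send) are hypothetical, not part of this project.

```python
import json
import urllib.request

# Assumption: Kafka Connect REST API address, taken from the curl examples.
CONNECT_URL = "http://localhost:8083"
CONNECTOR_NAME = "datadog-kafka-connect-logs"


def create_connector_request(api_key, topics, tasks_max=3, base_url=CONNECT_URL):
    """Build (but do not send) the POST request that creates the connector."""
    payload = {
        "name": CONNECTOR_NAME,
        "config": {
            "connector.class": "com.datadoghq.connect.logs.DatadogLogsSinkConnector",
            "datadog.api_key": api_key,
            "tasks.max": str(tasks_max),
            "topics": topics,
        },
    }
    return urllib.request.Request(
        f"{base_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def delete_connector_request(name=CONNECTOR_NAME, base_url=CONNECT_URL):
    """Build (but do not send) the DELETE request that removes the connector."""
    return urllib.request.Request(f"{base_url}/connectors/{name}", method="DELETE")


def send(request):
    """Send a prepared request and decode the JSON response body, if any."""
    with urllib.request.urlopen(request) as response:
        body = response.read()
    return json.loads(body) if body else None
```

Building the request separately from sending it lets you inspect the JSON payload before anything reaches the cluster. To actually create the connector, call send(create_connector_request("<YOUR_API_KEY>", "<YOUR_TOPIC>")) against a running Kafka Connect instance.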
After Kafka Connect is brought up on every host, all of the Kafka Connect instances will form a cluster automatically. A REST call can be executed against one of the cluster instances, and the configuration will automatically propagate to all instances in the cluster.
Name | Description | Default Value |
---|---|---|
name | Connector name. A consumer group with this name will be created with tasks to be distributed evenly across the connector cluster nodes. | |
connector.class | The Java class used to perform connector jobs. Keep the default unless you modify the connector. | com.datadoghq.connect.logs.DatadogLogsSinkConnector |
tasks.max | The number of tasks generated to handle data collection jobs in parallel. The tasks will be spread evenly across all Datadog Kafka Connector nodes. | |
topics | Comma separated list of Kafka topics for Datadog to consume. Example: prod-topic1,prod-topic2,prod-topic3 | |
datadog.api_key | The API key of your Datadog platform. | |
Name | Description | Default Value |
---|---|---|
datadog.site | The site of the Datadog intake to send logs to (for example, 'datadoghq.eu' to send data to the EU site). | datadoghq.com |
datadog.url | Custom Datadog URL endpoint where your logs will be sent. datadog.url takes precedence over datadog.site. Example: http-intake.logs.datadoghq.com:443 | |
datadog.tags | Tags associated with your logs in a comma separated tag:value format. | |
datadog.service | The name of the application or service generating the log events. | |
datadog.hostname | The name of the originating host of the log. | |
datadog.proxy.url | Proxy endpoint when logs are not directly forwarded to Datadog. | |
datadog.proxy.port | Proxy port when logs are not directly forwarded to Datadog. | |
datadog.retry.max | The number of retries before the output plugin stops. | 5 |
datadog.retry.backoff_ms | The time in milliseconds to wait following an error before a retry attempt is made. | 3000 |
datadog.add_published_date | Valid settings are true or false. When set to true, the timestamp is retrieved from the Kafka record and passed to Datadog as published_date. | |
datadog.parse_record_headers | Valid settings are true or false. When set to true, Kafka record headers are parsed and passed to Datadog as a kafkaheaders object. | false |
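To see how these parameters fit together, here is a minimal sketch that merges user settings with the defaults listed in the tables and checks that nothing essential is missing. It assumes the five parameters in the first table are the required ones; build_config and intake_target are hypothetical helper names, and the connector itself performs its own validation.

```python
# Assumption: the parameters in the first table are treated as required.
REQUIRED_KEYS = ("name", "connector.class", "tasks.max", "topics", "datadog.api_key")

# Defaults copied from the "Default Value" column of the tables.
DEFAULTS = {
    "connector.class": "com.datadoghq.connect.logs.DatadogLogsSinkConnector",
    "datadog.site": "datadoghq.com",
    "datadog.retry.max": "5",
    "datadog.retry.backoff_ms": "3000",
    "datadog.parse_record_headers": "false",
}


def _to_str(value):
    """Render a value the way a properties file would (booleans in lowercase)."""
    if isinstance(value, bool):
        return "true" if value else "false"
    return str(value)


def build_config(settings):
    """Merge user settings over the defaults and fail on missing required keys."""
    config = {**DEFAULTS, **{key: _to_str(value) for key, value in settings.items()}}
    missing = [key for key in REQUIRED_KEYS if not config.get(key)]
    if missing:
        raise ValueError("missing required settings: " + ", ".join(missing))
    return config


def intake_target(config):
    """Return the setting that decides the endpoint: datadog.url wins over datadog.site."""
    if config.get("datadog.url"):
        return ("datadog.url", config["datadog.url"])
    return ("datadog.site", config["datadog.site"])
```

For example, build_config({"name": "datadog-kafka-connect-logs", "tasks.max": 3, "topics": "prod-topic1", "datadog.api_key": "<YOUR_API_KEY>"}) returns a flat dictionary of string values that already contains the default site and retry settings.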
To improve performance of the connector, you can try the following options:
- Update the number of records fetched per poll by setting consumer.override.max.poll.records in the plugin configuration. This plugin sends batches of records synchronously with each poll, so a low number of records per poll will reduce throughput. Consider setting this to 500 or 1000.
- Increase the number of parallel tasks by adjusting the tasks.max parameter. Only do this if the hardware is underutilized, such as low CPU, low memory usage, and low data injection throughput. Do not set more tasks than partitions.
- Increase hardware resources on cluster nodes in case of resource exhaustion, such as high CPU or high memory usage.
- Increase the number of Kafka Connect nodes.
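The first two options can be expressed as a small helper that produces the settings to merge into the connector configuration. This is a sketch only: tuned_settings is a hypothetical name, and the partition count is something you would look up for your own topics.

```python
def tuned_settings(partition_count, desired_tasks, max_poll_records=500):
    """Return tuning overrides, capping tasks.max at the number of partitions."""
    if partition_count < 1 or desired_tasks < 1 or max_poll_records < 1:
        raise ValueError("all arguments must be positive integers")
    return {
        # Never run more tasks than there are partitions to consume.
        "tasks.max": str(min(desired_tasks, partition_count)),
        # Larger polls mean larger synchronous batches per request.
        "consumer.override.max.poll.records": str(max_poll_records),
    }


# Ten tasks requested for a six-partition topic: the cap brings tasks.max down to 6.
print(tuned_settings(partition_count=6, desired_tasks=10))
```

Capping the task count in code keeps the "no more tasks than partitions" rule from being forgotten when topics are resized.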
Kafka Connect supports Single Message Transforms that let you change the structure or content of a message. To experiment with this feature, try adding these lines to your sink connector configuration:
transforms=addExtraField
transforms.addExtraField.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.addExtraField.static.field=extraField
transforms.addExtraField.static.value=extraValue
If you restart the sink connector and send some more test messages, each new record should have an extraField field with value extraValue. For a more in-depth video, see Confluent's documentation.
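To make the effect of this transform concrete, the sketch below imitates it in plain Python. It is not Kafka Connect code: parse_properties and insert_static_field are hypothetical helpers, and the record is modeled as a simple dictionary.

```python
SMT_PROPERTIES = """\
transforms=addExtraField
transforms.addExtraField.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.addExtraField.static.field=extraField
transforms.addExtraField.static.value=extraValue
"""


def parse_properties(text):
    """Parse key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if line and not line.startswith("#"):
            key, _, value = line.partition("=")
            props[key.strip()] = value.strip()
    return props


def insert_static_field(record_value, field, value):
    """Return a copy of a map-like record value with one static field added."""
    updated = dict(record_value)
    updated[field] = value
    return updated


props = parse_properties(SMT_PROPERTIES)
prefix = "transforms." + props["transforms"]
record = {"message": "hello"}
transformed = insert_static_field(
    record, props[prefix + ".static.field"], props[prefix + ".static.value"]
)
print(transformed)  # {'message': 'hello', 'extraField': 'extraValue'}
```

The original record is left unchanged and the transformed copy carries the extra field, which is the shape you should expect to see on new log events in Datadog.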
To run the supplied unit tests, run mvn test from the root of the project.
Use Confluent Platform for a batteries-included Kafka environment for local testing. Follow the guide here to install the Confluent Platform.
Then, install the Confluent Kafka Datagen Connector to create sample data of arbitrary types. Install this Datadog Logs Connector by running confluent-hub install target/components/packages/<connector-zip-file>.
In the /test directory, there are some .json configuration files to make it easy to create Connectors. There are configurations for both the Datagen Connector with various datatypes, as well as the Datadog Logs Connector. For the latter, you will need to add a valid Datadog API key once you upload the .json to Confluent Platform.
Once your connectors are set up, you will be able to test them and see relevant data. For performance tests, you can also use the following command packaged with Confluent Platform:
kafka-producer-perf-test --topic perf-test --num-records 2000000 --record-size 100 --throughput 25000 --producer-props bootstrap.servers=localhost:9092 --print-metrics true
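As a rough guide to what this command generates, the arithmetic below estimates run time and data volume from its flags. It is a back-of-the-envelope sketch: perf_test_estimate is a hypothetical helper, and because --throughput is a cap rather than a guarantee, the computed duration is a lower bound.

```python
def perf_test_estimate(num_records, record_size_bytes, throughput_per_sec):
    """Estimate minimum duration, total payload, and payload rate for a perf run."""
    duration_s = num_records / throughput_per_sec
    total_bytes = num_records * record_size_bytes
    bytes_per_sec = throughput_per_sec * record_size_bytes
    return duration_s, total_bytes, bytes_per_sec


# Values taken from the command above.
duration_s, total_bytes, bytes_per_sec = perf_test_estimate(2_000_000, 100, 25_000)
print(f"{duration_s:.0f} s, {total_bytes / 1e6:.0f} MB total, {bytes_per_sec / 1e6:.1f} MB/s")
# 80 s, 200 MB total, 2.5 MB/s
```

Comparing these figures with the log volume that arrives in Datadog gives a quick check on whether the connector keeps up with the producer.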
Datadog Kafka Connect Logs is licensed under the Apache License 2.0. Details can be found in the file LICENSE.