Support service name override for DSM checkpoints in Spark context #8077

Draft: wants to merge 4 commits into master
12 changes: 12 additions & 0 deletions dd-java-agent/instrumentation/spark/spark_2.12/build.gradle
@@ -41,13 +41,24 @@ dependencies {
testImplementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "$sparkVersion"
testImplementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: "$sparkVersion"

testImplementation group: 'org.apache.spark', name:"spark-sql-kafka-0-10_$scalaVersion", version: "$sparkVersion"
testImplementation group: 'org.apache.kafka', name: "kafka_$scalaVersion", version: '2.4.0'
testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '2.4.0'
testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.4.0.RELEASE'
testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '2.4.0.RELEASE'

testRuntimeOnly project(':dd-java-agent:instrumentation:kafka-clients-0.11')

test_spark24Implementation group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: "2.4.8"
test_spark24Implementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "2.4.8"
test_spark24Implementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: "2.4.8"
test_spark24Implementation group: 'org.apache.spark', name:"spark-sql-kafka-0-10_$scalaVersion", version: "2.4.8"

test_spark32Implementation group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: "3.2.4"
test_spark32Implementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "3.2.4"
test_spark32Implementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: "3.2.4"
test_spark24Implementation group: 'org.apache.spark', name:"spark-sql-kafka-0-10_$scalaVersion", version: "3.2.4"

// We do not support netty versions older than this because of a change to the number of parameters to the
// PooledByteBufAllocator constructor. See this PR where the new constructor (the only one we support) was introduced:
// https://github.com/netty/netty/pull/10267
@@ -56,6 +67,7 @@ dependencies {
latestDepTestImplementation group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: '+'
latestDepTestImplementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: '+'
latestDepTestImplementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: '+'
latestDepTestImplementation group: 'org.apache.spark', name:"spark-sql-kafka-0-10_$scalaVersion", version: "+"
}

tasks.named("test").configure {
@@ -20,6 +20,7 @@ public String[] helperClassNames() {
packageName + ".RemoveEldestHashMap",
packageName + ".SparkAggregatedTaskMetrics",
packageName + ".SparkConfAllowList",
packageName + ".SparkConfUtils",
packageName + ".SparkSQLUtils",
packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
packageName + ".SparkSQLUtils$AccumulatorWithStage",
@@ -0,0 +1,89 @@ (new file: SparkStreamingKafkaTest.groovy)
import datadog.trace.agent.test.AgentTestRunner
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.spark.api.java.function.VoidFunction2
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.junit.Rule
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.test.EmbeddedKafkaBroker
import org.springframework.kafka.test.rule.EmbeddedKafkaRule
import org.springframework.kafka.test.utils.KafkaTestUtils

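/**
* Verifies that DSM checkpoints produced by a Spark Structured Streaming job reading from
* and writing to Kafka are reported with the Spark application name as the service name.
*/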
class SparkStreamingKafkaTest extends AgentTestRunner {
static final SOURCE_TOPIC = "source"
static final SINK_TOPIC = "sink"

@Override
boolean isDataStreamsEnabled() {
return true
}

@Rule
EmbeddedKafkaRule kafkaRule = new EmbeddedKafkaRule(1, false, 1, SOURCE_TOPIC, SINK_TOPIC)
EmbeddedKafkaBroker embeddedKafka = kafkaRule.embeddedKafka

@Override
void configurePreAgent() {
super.configurePreAgent()
injectSysConfig("dd.integration.spark.enabled", "true")
injectSysConfig("dd.integration.kafka.enabled", "true")
}

def "test dsm checkpoints are correctly set"() {
setup:
def appName = "test-app"
def sparkSession = SparkSession.builder()
.config("spark.master", "local[2]")
.config("spark.driver.bindAddress", "localhost")
.appName(appName)
.getOrCreate()

def producerProps = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString())
def producer = new DefaultKafkaProducerFactory<Integer, String>(producerProps).createProducer()

when:
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<>(SOURCE_TOPIC, i, i.toString()))
}
producer.flush()

def df = sparkSession
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", embeddedKafka.getBrokersAsString())
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "false")
.option("subscribe", SOURCE_TOPIC)
.load()

def query = df
.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", embeddedKafka.getBrokersAsString())
.option("checkpointLocation", "/tmp/" + System.currentTimeMillis().toString())
.option("topic", SINK_TOPIC)
.trigger(Trigger.Once())
.foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
@Override
void call(Dataset<Row> rowDataset, Long aLong) throws Exception {
rowDataset.show()
rowDataset.write()
}
})
.start()

query.processAllAvailable()

then:
query.stop()
producer.close()

// check that checkpoints were written with the service name override set to the Spark application name
assert TEST_DATA_STREAMS_WRITER.payloads.size() > 0
assert TEST_DATA_STREAMS_WRITER.services.size() == 1
assert TEST_DATA_STREAMS_WRITER.services.get(0) == appName
}
}
@@ -20,6 +20,7 @@ public String[] helperClassNames() {
packageName + ".RemoveEldestHashMap",
packageName + ".SparkAggregatedTaskMetrics",
packageName + ".SparkConfAllowList",
packageName + ".SparkConfUtils",
packageName + ".SparkSQLUtils",
packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
packageName + ".SparkSQLUtils$AccumulatorWithStage",
@@ -4,10 +4,12 @@
import static datadog.trace.core.datastreams.TagsProcessor.PARTITION_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.TOPIC_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;
import static datadog.trace.instrumentation.spark.SparkConfUtils.getDatabricksClusterName;
import static datadog.trace.instrumentation.spark.SparkConfUtils.getIsRunningOnDatabricks;
import static datadog.trace.instrumentation.spark.SparkConfUtils.getServiceNameOverride;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import datadog.trace.api.Config;
import datadog.trace.api.DDTags;
import datadog.trace.api.DDTraceId;
import datadog.trace.api.sampling.PrioritySampling;
@@ -110,8 +112,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {

private final boolean isRunningOnDatabricks;
private final String databricksClusterName;
private final String databricksServiceName;
private final String sparkServiceName;
private final String serviceNameOverride;

private boolean lastJobFailed = false;
private String lastJobFailedMessage;
@@ -130,10 +131,9 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
this.appId = appId;
this.sparkVersion = sparkVersion;

isRunningOnDatabricks = sparkConf.contains("spark.databricks.sparkContextId");
databricksClusterName = sparkConf.get("spark.databricks.clusterUsageTags.clusterName", null);
databricksServiceName = getDatabricksServiceName(sparkConf, databricksClusterName);
sparkServiceName = getSparkServiceName(sparkConf, isRunningOnDatabricks);
isRunningOnDatabricks = getIsRunningOnDatabricks(sparkConf);
databricksClusterName = getDatabricksClusterName(sparkConf);
serviceNameOverride = getServiceNameOverride(sparkConf);

// If the JVM exits with System.exit(code), it bypasses the code closing the application span
//
@@ -924,10 +924,8 @@ private AgentTracer.SpanBuilder buildSparkSpan(String spanName, Properties prope
AgentTracer.SpanBuilder builder =
tracer.buildSpan(spanName).withSpanType("spark").withTag("app_id", appId);

if (databricksServiceName != null) {
builder.withServiceName(databricksServiceName);
} else if (sparkServiceName != null) {
builder.withServiceName(sparkServiceName);
if (serviceNameOverride != null) {
builder.withServiceName(serviceNameOverride);
}

addPropertiesTags(builder, properties);
@@ -1153,45 +1151,6 @@ private static String getBatchIdFromBatchKey(String batchKey) {
return batchKey.substring(batchKey.lastIndexOf(".") + 1);
}

private static String getDatabricksServiceName(SparkConf conf, String databricksClusterName) {
if (Config.get().isServiceNameSetByUser()) {
return null;
}

String serviceName = null;
String runName = getDatabricksRunName(conf);
if (runName != null) {
serviceName = "databricks.job-cluster." + runName;
} else if (databricksClusterName != null) {
serviceName = "databricks.all-purpose-cluster." + databricksClusterName;
}

return serviceName;
}

private static String getSparkServiceName(SparkConf conf, boolean isRunningOnDatabricks) {
// If config is not set or running on databricks, not changing the service name
if (!Config.get().useSparkAppNameAsService() || isRunningOnDatabricks) {
return null;
}

// Keep service set by user, except if it is only "spark" or "hadoop" that can be set by USM
String serviceName = Config.get().getServiceName();
if (Config.get().isServiceNameSetByUser()
&& !"spark".equals(serviceName)
&& !"hadoop".equals(serviceName)) {
log.debug("Service '{}' explicitly set by user, not using the application name", serviceName);
return null;
}

String sparkAppName = conf.get("spark.app.name", null);
if (sparkAppName != null) {
log.info("Using Spark application name '{}' as the Datadog service name", sparkAppName);
}

return sparkAppName;
}

private static void reportKafkaOffsets(
final String appName, final AgentSpan span, final SourceProgress progress) {
if (!span.traceConfig().isDataStreamsEnabled()
@@ -1234,34 +1193,4 @@ private static void reportKafkaOffsets(
}
}
}

private static String getDatabricksRunName(SparkConf conf) {
String allTags = conf.get("spark.databricks.clusterUsageTags.clusterAllTags", null);
if (allTags == null) {
return null;
}

try {
// Using the jackson JSON lib used by spark
// https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.12/3.5.0
JsonNode jsonNode = objectMapper.readTree(allTags);

for (JsonNode node : jsonNode) {
String key = node.get("key").asText();
if ("RunName".equals(key)) {
// Databricks jobs launched by Azure Data Factory have an uuid at the end of the name
return removeUuidFromEndOfString(node.get("value").asText());
}
}
} catch (Exception ignored) {
}

return null;
}

@SuppressForbidden // called at most once per spark application
private static String removeUuidFromEndOfString(String input) {
return input.replaceAll(
"_[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$", "");
}
}
@@ -0,0 +1,104 @@ (new file: SparkConfUtils.java)
package datadog.trace.instrumentation.spark;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import datadog.trace.api.Config;
import de.thetaphi.forbiddenapis.SuppressForbidden;
import org.apache.spark.SparkConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkConfUtils {
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final Logger log = LoggerFactory.getLogger(SparkConfUtils.class);

public static boolean getIsRunningOnDatabricks(SparkConf sparkConf) {
return sparkConf.contains("spark.databricks.sparkContextId");
}

public static String getDatabricksClusterName(SparkConf sparkConf) {
return sparkConf.get("spark.databricks.clusterUsageTags.clusterName", null);
}

public static String getDatabricksServiceName(SparkConf conf, String databricksClusterName) {
if (Config.get().isServiceNameSetByUser()) {
return null;
}

String serviceName = null;
String runName = getDatabricksRunName(conf);
if (runName != null) {
serviceName = "databricks.job-cluster." + runName;
} else if (databricksClusterName != null) {
serviceName = "databricks.all-purpose-cluster." + databricksClusterName;
}

return serviceName;
}

public static String getSparkServiceName(SparkConf conf, boolean isRunningOnDatabricks) {
// If the config is not set or we are running on Databricks, do not change the service name
if (!Config.get().useSparkAppNameAsService() || isRunningOnDatabricks) {
return null;
}

// Keep the service name set by the user, except when it is just "spark" or "hadoop", which can be set by USM
String serviceName = Config.get().getServiceName();
if (Config.get().isServiceNameSetByUser()
&& !"spark".equals(serviceName)
&& !"hadoop".equals(serviceName)) {
log.debug("Service '{}' explicitly set by user, not using the application name", serviceName);
return null;
}

String sparkAppName = conf.get("spark.app.name", null);
if (sparkAppName != null) {
log.info("Using Spark application name '{}' as the Datadog service name", sparkAppName);
}

return sparkAppName;
}

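/**
* Returns the service name override applied to Spark spans and DSM checkpoints: a
* Databricks-derived name (job-cluster run name or all-purpose cluster name) takes
* precedence over the Spark application name; returns null when no override applies.
*/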
public static String getServiceNameOverride(SparkConf conf) {
boolean isRunningOnDatabricks = getIsRunningOnDatabricks(conf);
String databricksClusterName = getDatabricksClusterName(conf);
String databricksServiceName = getDatabricksServiceName(conf, databricksClusterName);
String sparkServiceName = getSparkServiceName(conf, isRunningOnDatabricks);

return databricksServiceName != null ? databricksServiceName : sparkServiceName;
}

private static String getDatabricksRunName(SparkConf conf) {
String allTags = conf.get("spark.databricks.clusterUsageTags.clusterAllTags", null);
System.out.println("### AllTags: " + allTags);
if (allTags == null) {
return null;
}

try {
// Using the jackson JSON lib used by spark
// https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.12/3.5.0
JsonNode jsonNode = objectMapper.readTree(allTags);

for (JsonNode node : jsonNode) {
String key = node.get("key").asText();
System.out.println("### Key node: " + key + ", value: " + node.get("value").asText());
if ("RunName".equals(key)) {
System.out.println("### Key value: " + node.get("value").asText());
// Databricks jobs launched by Azure Data Factory have an uuid at the end of the name
return removeUuidFromEndOfString(node.get("value").asText());
}
}
} catch (Exception e) {
log.debug("Failed to parse the Databricks cluster tags", e);
}

return null;
}

@SuppressForbidden // called at most once per spark application
private static String removeUuidFromEndOfString(String input) {
return input.replaceAll(
"_[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$", "");
}
}
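
A minimal sketch of how the override resolves, assuming the SparkConfUtils class above and the Datadog tracer classes are on the classpath (they normally live inside the agent) and that no service name has been set explicitly by the user; the class name and configuration values below are illustrative only:

import org.apache.spark.SparkConf;

import datadog.trace.instrumentation.spark.SparkConfUtils;

public class ServiceNameOverrideSketch {
  public static void main(String[] args) {
    // Databricks job cluster: the RunName tag wins and the trailing Azure Data Factory UUID
    // is stripped, so the override resolves to "databricks.job-cluster.nightly_etl"
    // (assuming no service name was explicitly configured by the user).
    SparkConf databricksConf = new SparkConf()
        .set("spark.databricks.sparkContextId", "1234567890")
        .set("spark.databricks.clusterUsageTags.clusterAllTags",
            "[{\"key\":\"RunName\",\"value\":\"nightly_etl_123e4567-e89b-12d3-a456-426614174000\"}]");
    System.out.println(SparkConfUtils.getServiceNameOverride(databricksConf));

    // Plain Spark application: the app name is used only when
    // Config.get().useSparkAppNameAsService() is enabled; otherwise the override is null.
    SparkConf plainConf = new SparkConf().set("spark.app.name", "my-streaming-app");
    System.out.println(SparkConfUtils.getServiceNameOverride(plainConf));
  }
}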