KAFKA-18632: Added few share consumer multibroker tests. #18679

Open. Wants to merge 5 commits into base: trunk

@smjn smjn commented Jan 23, 2025

  • Added a test that checks share consumer behavior across a share coordinator restart.
  • Added another simple test built on a more complex share consumer (ComplexShareConsumer).

@github-actions github-actions bot added triage PRs from the community core Kafka Broker tests Test fixes (including flaky tests) labels Jan 23, 2025
@AndrewJSchofield AndrewJSchofield added KIP-932 Queues for Kafka ci-approved and removed triage PRs from the community labels Jan 24, 2025

@AndrewJSchofield AndrewJSchofield left a comment

Thanks for the PR. A handful of comments to address.

@ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "15000"),
@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.min.isr", value = "1"),

It seems to me that this override is unnecessary with 3 brokers and the default of 2 is better.

@ClusterConfigProperty(key = "share.coordinator.state.topic.min.isr", value = "1"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3"),
@ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1"),

I think this should also be left at the default of 2.

@ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3"),
@ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1"),
@ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "3"),

Actually, the default replication factor of all three of these internal topics is 3 also, which should be ideal for this test with 3 brokers.

@smjn smjn Jan 24, 2025

The override is necessary because the class-level ClusterTestDefaults sets the replication factor to 1. Without the override here, it stays at 1, and re-election will not happen when I shut down a broker in the test.
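To make the re-election argument concrete, here is a toy model of one partition's replica set. It is not Kafka code: the class and method names are hypothetical, and it assumes every replica on a running broker is in sync. It only illustrates why a replication factor of 1 leaves no candidate leader once the hosting broker is shut down, while a replication factor of 3 with a min.isr of 2 still has a leader and still accepts acks=all writes after one broker stops.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of a single partition's replicas; a sketch, not Kafka's election logic. */
public class ReplicaModel {
    private final Set<Integer> replicas; // ids of brokers hosting a replica
    private final int minIsr;            // in-sync replicas required for acks=all writes

    public ReplicaModel(List<Integer> replicas, int minIsr) {
        this.replicas = new HashSet<>(replicas);
        this.minIsr = minIsr;
    }

    /** Replicas on brokers that are still running (assumed to be in sync). */
    public Set<Integer> liveReplicas(Set<Integer> stoppedBrokers) {
        Set<Integer> live = new HashSet<>(replicas);
        live.removeAll(stoppedBrokers);
        return live;
    }

    /** A new leader can only be chosen from a surviving replica. */
    public boolean canElectLeader(Set<Integer> stoppedBrokers) {
        return !liveReplicas(stoppedBrokers).isEmpty();
    }

    /** acks=all writes need at least minIsr in-sync replicas. */
    public boolean canAcceptAcksAllWrites(Set<Integer> stoppedBrokers) {
        return liveReplicas(stoppedBrokers).size() >= minIsr;
    }

    public static void main(String[] args) {
        Set<Integer> stopped = Set.of(0); // shut down broker 0
        ReplicaModel rf1 = new ReplicaModel(List.of(0), 1);
        ReplicaModel rf3 = new ReplicaModel(List.of(0, 1, 2), 2);
        System.out.println("RF=1 leader available: " + rf1.canElectLeader(stopped));
        System.out.println("RF=3 leader available: " + rf3.canElectLeader(stopped));
        System.out.println("RF=3, min.isr=2 accepts writes: " + rf3.canAcceptAcksAllWrites(stopped));
    }
}
```

In this model the min.isr default of 2 only becomes a problem if two of the three brokers are stopped at once, which the test described here does not do.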

setup();
String topicName = "multipart";
String groupId = "multipartGrp";
createTopic(topicName, 3, 3);

Actually this createTopic method knows the topic ID and discards it. I suggest changing the signature of this method to return a Uuid and then the topic ID is known without the extra call to Admin.describeTopics.

// consume messages
try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId)) {
shareConsumer.subscribe(List.of(topicName));
alterShareAutoOffsetReset(groupId, "earliest");

Experience with other tests tells us that it's best to set the auto-offset reset earlier in the test. It's an asynchronous action that may or may not take effect by the time consumption begins.

// get current share coordinator node
SharePartitionKey key = SharePartitionKey.getInstance(groupId, new TopicIdPartition(topicId, tpMulti));
int shareGroupStateTp = Utils.abs(key.asCoordinatorKey().hashCode()) % 3;
List<Integer> curShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).topicNameValues().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME).get()

I'd prefer admin.describeTopics(...).allTopicNames().get().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME). I think that yields the same result more tersely.
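For readers following the partition computation in the snippet above, here is a standalone sketch of how a coordinator key can be mapped to a share-group state topic partition. It uses only the JDK: `safeAbs` is a hypothetical stand-in for the `Utils.abs` call in the diff, written on the assumption that it guards against `Integer.MIN_VALUE`, and the key string layout is invented for illustration. The modulus 3 corresponds to the `share.coordinator.state.topic.num.partitions` value of 3 configured for the test.

```java
/** Sketch of hashing a coordinator key onto a state-topic partition; names are hypothetical. */
public class StatePartitionSketch {
    /** Overflow-safe absolute value: Math.abs(Integer.MIN_VALUE) stays negative, so map it to 0. */
    public static int safeAbs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    /** Maps a coordinator key to one of numPartitions partitions. */
    public static int partitionFor(String coordinatorKey, int numPartitions) {
        return safeAbs(coordinatorKey.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        // Hypothetical key; the "group:topicId:partition" layout is an assumption.
        String key = "multipartGrp:someTopicId:0";
        System.out.println("state partition = " + partitionFor(key, 3));
    }
}
```

Deriving the modulus from the configured partition count, rather than hard-coding 3, would keep the test correct if the config value changes.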

@ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "15000"),
@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.min.isr", value = "1"),

Same comments as for the previous test: the min.isr and replication factor defaults are appropriate without overrides.


// produce messages until told to stop
executer.execute(() -> {
while (!prodDone.get()) {

This makes a separate producer per loop iteration. I suggest re-using the producer across loop iterations. You can still use try-with-resources.
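A minimal sketch of the suggested shape, with the try-with-resources block wrapped around the loop rather than inside it. `FakeProducer` is a hypothetical stand-in for the real producer so that the example runs with the JDK alone; the flag name `prodDone` mirrors the diff, and everything else is assumed for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class ReusedProducerSketch {
    /** Hypothetical stand-in for a producer; records how often it is created and closed. */
    static final class FakeProducer implements AutoCloseable {
        private final AtomicInteger closed;

        FakeProducer(AtomicInteger created, AtomicInteger closed) {
            this.closed = closed;
            created.incrementAndGet();
        }

        void send(String value) {
            // A real producer would send the record here.
        }

        @Override
        public void close() {
            closed.incrementAndGet();
        }
    }

    /** Produces on a background thread until stopped; returns {producersCreated, producersClosed}. */
    public static int[] produceUntilStopped(int minSends) {
        AtomicInteger created = new AtomicInteger();
        AtomicInteger closed = new AtomicInteger();
        AtomicBoolean prodDone = new AtomicBoolean(false);
        CountDownLatch enoughSent = new CountDownLatch(minSends);
        ExecutorService executor = Executors.newSingleThreadExecutor();

        executor.execute(() -> {
            // One producer for the whole loop; it is still closed automatically at the end.
            try (FakeProducer producer = new FakeProducer(created, closed)) {
                while (!prodDone.get()) {
                    producer.send("msg");
                    enoughSent.countDown();
                }
            }
        });

        try {
            // Wait until at least minSends messages went out, then stop the loop.
            if (!enoughSent.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("producer loop did not make progress");
            }
            prodDone.set(true);
            executor.shutdown();
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("producer loop did not stop");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            prodDone.set(true);
            executor.shutdownNow();
        }
        return new int[] {created.get(), closed.get()};
    }

    public static void main(String[] args) {
        int[] counts = produceUntilStopped(5);
        System.out.println("created=" + counts[0] + ", closed=" + counts[1]);
    }
}
```

However many messages the loop sends, only one producer is constructed and it is closed exactly once when the loop exits.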

@@ -2015,4 +2204,88 @@ private void alterShareAutoOffsetReset(String groupId, String newValue) {
.get(60, TimeUnit.SECONDS), "Failed to alter configs");
}
}

private static class ComplexShareConsumer<K, V> implements Runnable {

This class really needs a comment explaining what it does.


private static class ComplexShareConsumer<K, V> implements Runnable {
public static final int POLL_TIMEOUT_MS = 15000;
public static final int MAX_DELIVERY_COUNT = 5;

This 5 is really ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT.

@smjn smjn requested a review from AndrewJSchofield January 24, 2025 11:55