KAFKA-18632: Added few share consumer multibroker tests. #18679
Conversation
smjn commented on Jan 23, 2025
- Added test to check share consumer behavior across share coordinator restart.
- Another simple test built around a complex share consumer.
Thanks for the PR. A handful of comments to address.
```java
@ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "15000"),
@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.min.isr", value = "1"),
```
It seems to me that this override is unnecessary with 3 brokers and the default of 2 is better.
```java
@ClusterConfigProperty(key = "share.coordinator.state.topic.min.isr", value = "1"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3"),
@ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1"),
```
I think this should also be left as the default of 2.
```java
@ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3"),
@ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1"),
@ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "3"),
```
Actually, the default replication factor of all three of these internal topics is 3 also, which should be ideal for this test with 3 brokers.
It is necessary because the class-level ClusterTestDefaults defines it to be 1. If we do not add the override here, it will be set to 1 and re-election will not happen when I shut down a broker in the test.
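For context, the interplay between the two annotation levels looks roughly like this (a hedged sketch; the class name, test method name, and exact annotation attributes are illustrative, not copied from the PR):

```java
// Class-level defaults apply to every test in the class; the state topic
// replication factor is 1 there to keep single-broker tests lightweight.
@ClusterTestDefaults(serverProperties = {
    @ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "1"),
})
class ShareConsumerTest {

    // A multi-broker test must override it at the method level: with a
    // single replica there is nothing to re-elect when a broker is shut down.
    @ClusterTest(brokers = 3, serverProperties = {
        @ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3"),
    })
    void testShareConsumerAfterShareCoordinatorRestart() { /* ... */ }
}
```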
```java
setup();
String topicName = "multipart";
String groupId = "multipartGrp";
createTopic(topicName, 3, 3);
```
Actually, this `createTopic` method knows the topic ID and discards it. I suggest changing the signature of this method to return a `Uuid`, so the topic ID is known without the extra call to `Admin.describeTopics`.
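The suggested signature change might look like the following sketch. The `createAdminClient` helper is an assumption standing in for however the test class obtains its `Admin`; the key point is that `CreateTopicsResult` already exposes the topic ID:

```java
// Hypothetical reworked helper: return the Uuid that createTopics already
// knows, instead of discarding it.
private Uuid createTopic(String topicName, int numPartitions, int replicationFactor) {
    try (Admin admin = createAdminClient()) {
        CreateTopicsResult result = admin.createTopics(
            List.of(new NewTopic(topicName, numPartitions, (short) replicationFactor)));
        result.all().get();
        // Topic ID is available here; no follow-up describeTopics needed.
        return result.topicId(topicName).get();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
```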
```java
// consume messages
try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId)) {
    shareConsumer.subscribe(List.of(topicName));
    alterShareAutoOffsetReset(groupId, "earliest");
```
Experience with other tests tells us that it's best to set the auto-offset reset earlier in the test. It's an asynchronous action that may or may not take effect by the time consumption begins.
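Concretely, the suggestion amounts to moving the call ahead of consumer creation and subscription, along these lines (a sketch using the helpers already present in this test class):

```java
// Set the reset policy first: the underlying AlterConfigs call is
// asynchronous, so issuing it early gives it the best chance to take
// effect before the consumer starts fetching.
alterShareAutoOffsetReset(groupId, "earliest");

try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId)) {
    shareConsumer.subscribe(List.of(topicName));
    // ... poll, acknowledge, and assert ...
}
```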
```java
// get current share coordinator node
SharePartitionKey key = SharePartitionKey.getInstance(groupId, new TopicIdPartition(topicId, tpMulti));
int shareGroupStateTp = Utils.abs(key.asCoordinatorKey().hashCode()) % 3;
List<Integer> curShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).topicNameValues().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME).get()
```
I'd prefer `admin.describeTopics(...).allTopicNames().get().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME)`. I think that yields the same result more tersely.
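The terser form might look like this (a sketch; variable names are illustrative, and `allTopicNames()` resolves every requested topic through a single future):

```java
// Resolve the state topic description in one step via allTopicNames(),
// then read the leader of the relevant state-topic partition.
TopicDescription stateTopic = admin
    .describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME))
    .allTopicNames()
    .get()
    .get(Topic.SHARE_GROUP_STATE_TOPIC_NAME);

int curShareCoordNodeId = stateTopic.partitions().get(shareGroupStateTp).leader().id();
```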
```java
@ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "15000"),
@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.min.isr", value = "1"),
```
Same comments as the previous test: the `min.isr` and replication factor defaults are appropriate without overrides.
```java
// produce messages until we want
executer.execute(() -> {
    while (!prodDone.get()) {
```
This makes a separate producer per loop iteration. I suggest re-using the producer across loop iterations. You can still use try-with-resources.
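The reuse pattern being suggested can be shown with a self-contained sketch. `FakeProducer` is a hypothetical stand-in for `KafkaProducer` (which needs a live cluster); the point is that one try-with-resources wraps the whole loop, so a single instance serves every iteration:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for a KafkaProducer: counts how many instances get created.
class FakeProducer implements AutoCloseable {
    static final AtomicInteger CREATED = new AtomicInteger();
    FakeProducer() { CREATED.incrementAndGet(); }
    void send(String record) { /* no-op for the sketch */ }
    @Override public void close() { }
}

public class ProducerReuse {
    public static void main(String[] args) {
        AtomicBoolean prodDone = new AtomicBoolean(false);
        // One producer for the whole loop, still closed deterministically by
        // try-with-resources; creating it inside the loop would allocate a
        // new instance (and new connections, for a real producer) each pass.
        try (FakeProducer producer = new FakeProducer()) {
            int i = 0;
            while (!prodDone.get()) {
                producer.send("record-" + i++);
                if (i == 5) prodDone.set(true);
            }
        }
        System.out.println(FakeProducer.CREATED.get()); // prints 1
    }
}
```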
```
@@ -2015,4 +2204,88 @@ private void alterShareAutoOffsetReset(String groupId, String newValue) {
            .get(60, TimeUnit.SECONDS), "Failed to alter configs");
    }
}
```
```java
private static class ComplexShareConsumer<K, V> implements Runnable {
```
This really needs a comment explaining what it does.
```java
private static class ComplexShareConsumer<K, V> implements Runnable {
    public static final int POLL_TIMEOUT_MS = 15000;
    public static final int MAX_DELIVERY_COUNT = 5;
```
This 5 is really `ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT`.