-
Notifications
You must be signed in to change notification settings - Fork 14.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
(WIP) KAFKA-18616; Refactor Tools's ApiMessageFormatter #18695
base: trunk
Are you sure you want to change the base?
Conversation
@chia7712 What do you think about doing something like this? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes me wonder whether we should strengthen the version validation in the CoordinatorRecordSerdes.
If the version of the value record bumps in the future, formatters that lack version checks may print incorrect data. In such cases, throwing an accurate exception would be more appropriate.
ObjectNode json = new ObjectNode(JsonNodeFactory.instance); | ||
try { | ||
CoordinatorRecord record = deserialize( | ||
consumerRecord.key() != null ? ByteBuffer.wrap(consumerRecord.key()) : null, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line#47 has checked the null, so this check is redundant.
|
||
import java.nio.ByteBuffer; | ||
|
||
/** | ||
* Formatter for use with tools such as console consumer: Consumer should also set exclude.internal.topics to false. | ||
*/ | ||
public class OffsetsMessageFormatter extends ApiMessageFormatter { | ||
public class OffsetsMessageFormatter extends CoordinatorRecordMessageFormatter { | ||
private CoordinatorRecordSerde serde = new GroupCoordinatorRecordSerde(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can move serde
to CoordinatorRecordMessageFormatter
like CoordinatorRecordMessageParser
?
|
||
@Override | ||
protected boolean shouldPrint(short recordType) { | ||
return true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a question, could you help me understand why this is true
instead of CoordinatorRecordType. TRANSACTION_LOG.id() == recordType
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the reason is that there is only one element, TRANSACTION_LOG, in CoordinatorType, so there is no need to add extra checking here.
WIP - This is an attempt to refactor the ApiMessageFormatter to follow what we have done in #18688. The main difference with the current implementation is that unknown versions of values are still deserialized on a best effort basis. The result depends on the schema. This makes me wonder whether we should strengthen the version validation in the CoordinatorRecordSerdes.
Committer Checklist (excluded from commit message)