-
Notifications
You must be signed in to change notification settings - Fork 83
[feat][client] Support null value messages #563
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -1045,5 +1045,130 @@ TEST(ReaderTest, testReaderWithZeroMessageListenerThreads) { | |||||||||||||||||||||||||||||||
| client.close(); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| TEST(ReaderTest, testReadCompactedWithNullValue) { | ||||||||||||||||||||||||||||||||
| Client client(serviceUrl); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| const std::string topicName = | ||||||||||||||||||||||||||||||||
| "persistent://public/default/testReadCompactedWithNullValue-" + std::to_string(time(nullptr)); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| Producer producer; | ||||||||||||||||||||||||||||||||
| ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| // Send messages with keys | ||||||||||||||||||||||||||||||||
| ASSERT_EQ(ResultOk, | ||||||||||||||||||||||||||||||||
| producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1").build())); | ||||||||||||||||||||||||||||||||
| ASSERT_EQ(ResultOk, | ||||||||||||||||||||||||||||||||
| producer.send(MessageBuilder().setPartitionKey("key2").setContent("value2").build())); | ||||||||||||||||||||||||||||||||
| ASSERT_EQ(ResultOk, | ||||||||||||||||||||||||||||||||
| producer.send(MessageBuilder().setPartitionKey("key3").setContent("value3").build())); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| // Send a tombstone (null value) for key2 | ||||||||||||||||||||||||||||||||
| auto tombstone = MessageBuilder().setPartitionKey("key2").setNullValue().build(); | ||||||||||||||||||||||||||||||||
| ASSERT_TRUE(tombstone.hasNullValue()); | ||||||||||||||||||||||||||||||||
| ASSERT_EQ(tombstone.getLength(), 0); | ||||||||||||||||||||||||||||||||
| ASSERT_EQ(ResultOk, producer.send(tombstone)); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| // Update key1 with a new value | ||||||||||||||||||||||||||||||||
| ASSERT_EQ(ResultOk, | ||||||||||||||||||||||||||||||||
| producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1-updated").build())); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| // Trigger compaction via admin API | ||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||
| std::string compactUrl = | ||||||||||||||||||||||||||||||||
| adminUrl + "admin/v2/persistent/public/default/testReadCompactedWithNullValue-" + | ||||||||||||||||||||||||||||||||
| std::to_string(time(nullptr)) + "/compaction"; | ||||||||||||||||||||||||||||||||
| // Note: Compaction is async, we just trigger it | ||||||||||||||||||||||||||||||||
| makePutRequest(compactUrl, ""); | ||||||||||||||||||||||||||||||||
|
Comment on lines
+1077
to
+1081
|
||||||||||||||||||||||||||||||||
| std::string compactUrl = | |
| adminUrl + "admin/v2/persistent/public/default/testReadCompactedWithNullValue-" + | |
| std::to_string(time(nullptr)) + "/compaction"; | |
| // Note: Compaction is async, we just trigger it | |
| makePutRequest(compactUrl, ""); | |
| // Build compaction URL directly from topicName to avoid mismatches | |
| std::string topicPath = topicName; | |
| std::size_t schemePos = topicPath.find("://"); | |
| if (schemePos != std::string::npos) { | |
| topicPath.erase(schemePos, 3); // remove "://", e.g., "persistent://public/..." -> "persistent/public/..." | |
| } | |
| std::string compactUrl = adminUrl + "admin/v2/" + topicPath + "/compaction"; | |
| // Note: Compaction is async, we just trigger it, but assert that the request is accepted | |
| int res = makePutRequest(compactUrl, ""); | |
| ASSERT_FALSE(res != 204 && res != 409); |
Copilot
AI
Mar 30, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This read loop can silently succeed without validating behavior: it breaks when hasMessageAvailable is false or readNext fails, and the final assertion can still pass even if no messages were read at all. Prefer asserting readNext succeeds while hasMessageAvailable==true (or reading a known number of messages), and then assert on concrete outcomes (e.g., key1 == "value1-updated" and key2 is tombstone/absent after compaction).
| Result res = reader.readNext(msg, 3000); | |
| if (res != ResultOk) { | |
| break; | |
| } | |
| ASSERT_EQ(ResultOk, reader.readNext(msg, 3000)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MessageBuilder::setNullValue()only flips the metadata flag; it doesn't clear any previously-set payload, and the varioussetContent(...)overloads don't clearnull_value. This allows building messages with non-empty payload butnull_value=true(or vice versa), which will be interpreted incorrectly by consumers/table views. Make the two states mutually exclusive by clearing the payload/key-value when setting null value, and clearingnull_valuewhenever content is set.