Skip to content

Kafka Connect: Fix CME in IcebergSinkConfig causing flaky multi-table tests#16438

Merged
pvary merged 1 commit into
apache:mainfrom
wombatu-kun:fix/flaky-kafka-connect-integration-test
May 22, 2026
Merged

Kafka Connect: Fix CME in IcebergSinkConfig causing flaky multi-table tests#16438
pvary merged 1 commit into
apache:mainfrom
wombatu-kun:fix/flaky-kafka-connect-integration-test

Conversation

@wombatu-kun
Copy link
Copy Markdown
Contributor

@wombatu-kun wombatu-kun commented May 20, 2026

Summary

Multi-table integration tests in kafka-connect-tests (TestIntegrationDynamicTable.testIcebergSink, TestIntegrationMultiTable.testIcebergSink) were flaking ~5/10 runs on assertThat(table.snapshots()).hasSize(1). Root cause is a thread-safety bug in IcebergSinkConfig.tableConfig exposed under parallel multi-table commits.

Root cause

tableConfig(...) uses HashMap.computeIfAbsent on a non-thread-safe tableConfigMap. Coordinator.commit() parallelizes per-table commits via Tasks.foreach.executeWith(exec) (Coordinator.java:173); each task calls config.tableConfig(...) at Coordinator.java:231. On 2+ table commits, the second thread's computeIfAbsent sees the first's insertion mid-traversal and throws ConcurrentModificationException, killing the Coordinator after the first table commits but before the second. The Awaitility hasSize(1) assertion on the lost table then times out. Single-table sinks invoke commitToTable exactly once and never hit the race — matching the historical failure pattern. CME stack trace from CI run #26213568037.

Fix

Switch tableConfigMap from Maps.newHashMap() to Maps.newConcurrentMap(). ConcurrentHashMap.computeIfAbsent is documented atomic per the JDK contract — the race becomes structurally impossible.

Related

#16504 — diagnostic CI plumbing (per-test integration test reports + docker container log capture) used to capture the CME stack trace, split out per reviewer request.

@wombatu-kun wombatu-kun marked this pull request as draft May 20, 2026 05:39
@wombatu-kun wombatu-kun reopened this May 20, 2026
@wombatu-kun wombatu-kun changed the title Kafka Connect: Increase integration test timeout to reduce flakiness Kafka Connect: Wait for connector removal in stopConnector to fix integration test flakiness May 20, 2026
@wombatu-kun wombatu-kun marked this pull request as ready for review May 20, 2026 05:53
@wombatu-kun wombatu-kun requested a review from manuzhang May 20, 2026 10:47
@manuzhang manuzhang requested review from bryanck, nastra and pvary May 20, 2026 10:52
@wombatu-kun wombatu-kun marked this pull request as draft May 20, 2026 11:02
@wombatu-kun wombatu-kun force-pushed the fix/flaky-kafka-connect-integration-test branch from 2cf942e to 1fb4bd3 Compare May 20, 2026 12:28
@wombatu-kun wombatu-kun changed the title Kafka Connect: Wait for connector removal in stopConnector to fix integration test flakiness Kafka Connect: Fix flaky integration tests by isolating iceberg.control.topic per test May 20, 2026
@wombatu-kun wombatu-kun marked this pull request as ready for review May 20, 2026 12:34
@wombatu-kun wombatu-kun force-pushed the fix/flaky-kafka-connect-integration-test branch from 1fb4bd3 to 8ab75e2 Compare May 21, 2026 04:12
@wombatu-kun
Copy link
Copy Markdown
Contributor Author

Looks like the final version really works. Could somebody review it and merge?

PRs CI failed without this fix: #16453 #16349 #16433 #16499

@manuzhang
Copy link
Copy Markdown
Member

@wombatu-kun can we focus on fixing UT in this PR and leave other changes as follow-up enhancements?

@wombatu-kun wombatu-kun force-pushed the fix/flaky-kafka-connect-integration-test branch from 8ab75e2 to 6c06dbe Compare May 21, 2026 07:19
@github-actions github-actions Bot removed the INFRA label May 21, 2026
@wombatu-kun wombatu-kun marked this pull request as draft May 21, 2026 07:24
@wombatu-kun wombatu-kun reopened this May 21, 2026
@github-actions github-actions Bot added the INFRA label May 21, 2026
@wombatu-kun
Copy link
Copy Markdown
Contributor Author

@manuzhang CI on the scoped-down version just failed on a different parameterization of the same assertion — TestIntegrationDynamicTable.testIcebergSink [1] null (the unpartitioned/no-branch case, runs first in the class), IntegrationTestBase.java:255, only on the Java 21 job. The original commit message documented failures concentrated on [2] test_branch, so this looks like a second cause and not the cross-test control-topic contamination the fix addresses.

The CI artifact only contained the unit-test log — no per-test integration report, no Connect/Kafka container output — so I can't root-cause from this run. Restored both observability commits onto this branch (ed0d5c3e, 8ab75e24) so the next CI run produces actionable diagnostics. Happy to re-split once we've understood the new failure. (I'll leave #16504 open in parallel for now.)

@wombatu-kun wombatu-kun force-pushed the fix/flaky-kafka-connect-integration-test branch from 4a8871d to 232acd5 Compare May 21, 2026 08:23
@github-actions github-actions Bot removed the INFRA label May 21, 2026
@wombatu-kun
Copy link
Copy Markdown
Contributor Author

@manuzhang quick update: after a few CI reruns with the observability commits in place, one of them caught the real failure. Stack trace pinned it down to a thread-safety bug, not the control-topic issue I originally suspected.

Coordinator.commit() parallelizes per-table commits via Tasks.foreach.executeWith(exec). Each parallel task calls IcebergSinkConfig.tableConfig(...), which until now used HashMap.computeIfAbsent on a non-thread-safe map. On multi-table commits (TestIntegrationDynamicTable.testIcebergSink, TestIntegrationMultiTable.testIcebergSink — both write to 2 tables), the second thread's computeIfAbsent would see the first thread's insert mid-traversal and throw ConcurrentModificationException, killing the Coordinator after the first table committed but before the second. That's why the failure was always on the 2-table tests and always presented as "snapshot count 0 instead of 1" after the 30s Awaitility timeout.

Fix is one line: switch tableConfigMap to Maps.newConcurrentMap(). Stack trace and CI run for reference: https://github.com/apache/iceberg/actions/runs/26213568037.

This PR now contains:

  1. 6c06dbeef — isolate iceberg.control.topic per test (defense-in-depth; reduces how often the race fires under stale control-topic events, but doesn't eliminate it)
  2. 232acd55 — the actual fix (ConcurrentModificationException in IcebergSinkConfig.tableConfig)

Dropped the observability commits again as you originally suggested — #16504 carries them as a follow-up.

Might also be worth retitling this PR to something like "Kafka Connect: Fix flaky multi-table integration tests by making IcebergSinkConfig.tableConfig thread-safe" if you agree the CME fix is now the primary change.

@wombatu-kun wombatu-kun marked this pull request as ready for review May 21, 2026 08:26
@manuzhang
Copy link
Copy Markdown
Member

@wombatu-kun Please update PR description and make it more concise.

@wombatu-kun wombatu-kun changed the title Kafka Connect: Fix flaky integration tests by isolating iceberg.control.topic per test Kafka Connect: Fix CME in IcebergSinkConfig causing flaky multi-table tests May 21, 2026
@wombatu-kun
Copy link
Copy Markdown
Contributor Author

@wombatu-kun Please update PR description and make it more concise.

Done

pvary
pvary previously approved these changes May 22, 2026
@pvary pvary dismissed their stale review May 22, 2026 07:41

I need clarification of tests

private Admin admin;
private String connectorName;
private String testTopic;
private String controlTopic;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this change after the fix?

I see one comment saying it doesn't fix the problem.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right — it's not needed. The control-topic isolation was my original hypothesis, before the stack trace pinned the flake to the ConcurrentModificationException in IcebergSinkConfig.tableConfig. The one-line Maps.newConcurrentMap() fix eliminates the race at its source, so the per-test control topic only mitigated a symptom. I've dropped that commit and force-pushed; the PR is now just the single-line thread-safety fix (1915c991c).

While you're already in context here: could you also take a look at the companion PR #16504? It adds the per-test integration-test reports and Docker container log capture — without that log capture we would never have surfaced the CME stack trace and pinned down the root cause. The artifacts produced by the commits originally in this PR can be seen in this CI run: https://github.com/apache/iceberg/actions/runs/26213568037.

…ig.tableConfig

Coordinator.commit() parallelizes per-table commits via Tasks.foreach.executeWith(exec) (Coordinator.java:173). Each parallel call invokes config.tableConfig(tableIdentifier.toString()) at Coordinator.java:231, which uses HashMap.computeIfAbsent on a non-thread-safe Map (Maps.newHashMap()). When two tables commit concurrently, the second thread's computeIfAbsent observes the first thread's insertion mid-traversal and throws ConcurrentModificationException, killing the Coordinator thread mid-commit. This is the underlying root cause of the TestIntegrationDynamicTable.testIcebergSink and TestIntegrationMultiTable.testIcebergSink flakes: the first table's commit succeeds (snapshot recorded), the second is lost when the Coordinator dies, and the Awaitility hasSize(1) assertion on the missed table times out at 30s. The control-topic isolation in 6c06dbe reduces how often the race fires but does not eliminate it.

Switch tableConfigMap from Maps.newHashMap() to Maps.newConcurrentMap() so computeIfAbsent is atomic and safe under concurrent table-commit threads. Single-table sinks (TestIntegration, testDynamicRouteWithTopicRewritingSMT) never hit the race; only multi-table commits do, which matches the historical failure pattern.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@wombatu-kun wombatu-kun force-pushed the fix/flaky-kafka-connect-integration-test branch from 232acd5 to 1915c99 Compare May 22, 2026 08:26
@manuzhang manuzhang requested a review from pvary May 22, 2026 08:36
@pvary pvary merged commit 26169f7 into apache:main May 22, 2026
26 checks passed
@pvary
Copy link
Copy Markdown
Contributor

pvary commented May 22, 2026

Merged to main.
Thanks @wombatu-kun for the fix and @manuzhang for the review!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants