Kafka Connect: Fix CME in IcebergSinkConfig causing flaky multi-table tests#16438
Conversation
2cf942e to
1fb4bd3
Compare
1fb4bd3 to
8ab75e2
Compare
|
@wombatu-kun can we focus on fixing UT in this PR and leave other changes as follow-up enhancements? |
8ab75e2 to
6c06dbe
Compare
|
@manuzhang CI on the scoped-down version just failed on a different parameterization of the same assertion — The CI artifact only contained the unit-test log — no per-test integration report, no Connect/Kafka container output — so I can't root-cause from this run. Restored both observability commits onto this branch ( |
4a8871d to
232acd5
Compare
|
@manuzhang quick update: after a few CI reruns with the observability commits in place, one of them caught the real failure. Stack trace pinned it down to a thread-safety bug, not the control-topic issue I originally suspected.
Fix is one line: switch This PR now contains:
Dropped the observability commits again as you originally suggested — #16504 carries them as a follow-up. Might also be worth retitling this PR to something like "Kafka Connect: Fix flaky multi-table integration tests by making IcebergSinkConfig.tableConfig thread-safe" if you agree the CME fix is now the primary change. |
|
@wombatu-kun Please update PR description and make it more concise. |
Done |
| private Admin admin; | ||
| private String connectorName; | ||
| private String testTopic; | ||
| private String controlTopic; |
There was a problem hiding this comment.
Do we really need this change after the fix?
I see one comment saying it doesn't fix the problem.
There was a problem hiding this comment.
You're right — it's not needed. The control-topic isolation was my original hypothesis, before the stack trace pinned the flake to the ConcurrentModificationException in IcebergSinkConfig.tableConfig. The one-line Maps.newConcurrentMap() fix eliminates the race at its source, so the per-test control topic only mitigated a symptom. I've dropped that commit and force-pushed; the PR is now just the single-line thread-safety fix (1915c991c).
While you're already in context here: could you also take a look at the companion PR #16504? It adds the per-test integration-test reports and Docker container log capture — without that log capture we would never have surfaced the CME stack trace and pinned down the root cause. The artifacts produced by the commits originally in this PR can be seen in this CI run: https://github.com/apache/iceberg/actions/runs/26213568037.
…ig.tableConfig Coordinator.commit() parallelizes per-table commits via Tasks.foreach.executeWith(exec) (Coordinator.java:173). Each parallel call invokes config.tableConfig(tableIdentifier.toString()) at Coordinator.java:231, which uses HashMap.computeIfAbsent on a non-thread-safe Map (Maps.newHashMap()). When two tables commit concurrently, the second thread's computeIfAbsent observes the first thread's insertion mid-traversal and throws ConcurrentModificationException, killing the Coordinator thread mid-commit. This is the underlying root cause of the TestIntegrationDynamicTable.testIcebergSink and TestIntegrationMultiTable.testIcebergSink flakes: the first table's commit succeeds (snapshot recorded), the second is lost when the Coordinator dies, and the Awaitility hasSize(1) assertion on the missed table times out at 30s. The control-topic isolation in 6c06dbe reduces how often the race fires but does not eliminate it. Switch tableConfigMap from Maps.newHashMap() to Maps.newConcurrentMap() so computeIfAbsent is atomic and safe under concurrent table-commit threads. Single-table sinks (TestIntegration, testDynamicRouteWithTopicRewritingSMT) never hit the race; only multi-table commits do, which matches the historical failure pattern. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
232acd5 to
1915c99
Compare
|
Merged to main. |
Summary
Multi-table integration tests in
kafka-connect-tests(TestIntegrationDynamicTable.testIcebergSink,TestIntegrationMultiTable.testIcebergSink) were flaking ~5/10 runs onassertThat(table.snapshots()).hasSize(1). Root cause is a thread-safety bug inIcebergSinkConfig.tableConfigexposed under parallel multi-table commits.Root cause
tableConfig(...)usesHashMap.computeIfAbsenton a non-thread-safetableConfigMap.Coordinator.commit()parallelizes per-table commits viaTasks.foreach.executeWith(exec)(Coordinator.java:173); each task callsconfig.tableConfig(...)atCoordinator.java:231. On 2+ table commits, the second thread'scomputeIfAbsentsees the first's insertion mid-traversal and throwsConcurrentModificationException, killing the Coordinator after the first table commits but before the second. The AwaitilityhasSize(1)assertion on the lost table then times out. Single-table sinks invokecommitToTableexactly once and never hit the race — matching the historical failure pattern. CME stack trace from CI run #26213568037.Fix
Switch
tableConfigMapfromMaps.newHashMap()toMaps.newConcurrentMap().ConcurrentHashMap.computeIfAbsentis documented atomic per the JDK contract — the race becomes structurally impossible.Related
#16504 — diagnostic CI plumbing (per-test integration test reports + docker container log capture) used to capture the CME stack trace, split out per reviewer request.