
Fix file identifier in rescheduleTasksFromReplica#1493

Open
ianton-ru wants to merge 3 commits into antalya-26.1 from
bugfix/antalya-26.1/task_reschedule_fix

Conversation


@ianton-ru ianton-ru commented Mar 9, 2026

Changelog category (leave one):

  • Bug Fix (user-visible misbehavior in an official stable release)

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):

Fixes #1486

The file identifier for distributed tasks changed between 25.8 and 26.1.
In frontport #1414 this was missed, and rescheduleTasksFromReplica continued to use the old variant.

Also fixes unsynchronized access to the replica_to_files_to_be_processed class member.

Documentation entry for user-facing changes

...

CI/CD Options

Exclude tests:

  • Fast test
  • Integration Tests
  • Stateless tests
  • Stateful tests
  • Performance tests
  • All with ASAN
  • All with TSAN
  • All with MSAN
  • All with UBSAN
  • All with Coverage
  • All with Aarch64
  • All Regression
  • Disable CI Cache

Regression jobs to run:

  • Fast suites (mostly <1h)
  • Aggregate Functions (2h)
  • Alter (1.5h)
  • Benchmark (30m)
  • ClickHouse Keeper (1h)
  • Iceberg (2h)
  • LDAP (1h)
  • Parquet (1.5h)
  • RBAC (1.5h)
  • SSL Server (1h)
  • S3 (2h)
  • Tiered Storage (2h)

@ianton-ru (Author)

@codex review


github-actions bot commented Mar 9, 2026

Workflow [PR], commit [604c788]

@chatgpt-codex-connector

Codex Review: Something went wrong. Try again later by commenting “@codex review”.

We were unable to download your code in a timely manner.
ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you:

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

@ianton-ru (Author)

@codex review

@chatgpt-codex-connector

Codex Review: Didn't find any major issues. 👍


@vzakaznikov (Collaborator)

Audit Review

No confirmed defects in the PR. The fix correctly centralizes file identifier logic and resolves the key mismatch in rescheduleTasksFromReplica. It also fixes a secondary inconsistency in getPreQueuedFile where isArchive() was not checked (only send_over_whole_archive was), which could cause lookup mismatches when bucket info is present.

Pre-existing risk: data race on replica_to_files_to_be_processed

getReplicaForFile() iterates replica_to_files_to_be_processed without holding the mutex when called from getMatchingFileFromIterator:

// line 197, StorageObjectStorageStableTaskDistributor.cpp — no lock held
size_t file_replica_idx = getReplicaForFile(file_identifier);

Meanwhile rescheduleTasksFromReplica can concurrently erase() from the same map under the lock:

// line 319, StorageObjectStorageStableTaskDistributor.cpp — lock held
replica_to_files_to_be_processed.erase(number_of_current_replica);

Concurrent read iteration + erase on std::unordered_map is UB. Not introduced by this PR, but worth noting.
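The safe shape is to take the same mutex on both paths. A minimal standalone sketch (the `ReplicaFiles` type and its methods are invented for illustration; they are not the actual ClickHouse code):

```cpp
#include <cassert>
#include <list>
#include <mutex>
#include <string>
#include <unordered_map>

// Illustrative pattern: the reader that iterates the shared map and the
// writer that erases from it hold the same mutex, so the iteration can
// never race the erase.
struct ReplicaFiles
{
    std::mutex mutex;
    std::unordered_map<size_t, std::list<std::string>> replica_to_files;

    void addFile(size_t replica, std::string file)
    {
        std::lock_guard<std::mutex> lock(mutex);
        replica_to_files[replica].push_back(std::move(file));
    }

    // Reader path: iteration happens entirely inside the critical section,
    // analogous to getReplicaForFile.
    size_t totalFiles()
    {
        std::lock_guard<std::mutex> lock(mutex);
        size_t total = 0;
        for (const auto & [replica, files] : replica_to_files)
            total += files.size();
        return total;
    }

    // Writer path: erase under the same lock, analogous to
    // rescheduleTasksFromReplica.
    void dropReplica(size_t replica)
    {
        std::lock_guard<std::mutex> lock(mutex);
        replica_to_files.erase(replica);
    }
};
```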

Minor style nit

getFileIdentifier takes ObjectInfoPtr by value (unnecessary atomic ref-count bump). const ObjectInfoPtr & would be slightly more efficient.

Confidence: High (9/10) — a test exercising rescheduleTasksFromReplica with archive or bucket-info files would raise it further.

@vzakaznikov (Collaborator)

Follow-up audit note: additional pre-existing UB/crash risks in StorageObjectStorageStableTaskDistributor (not introduced by this PR).

1) Unsynchronized unordered_map iteration/read vs erase

Container: replica_to_files_to_be_processed

Read path (no lock):

  • getReplicaForFile() iterates map:
    • src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp lines ~84-106
  • Called from getMatchingFileFromIterator() without holding mutex:
    • same file line ~197

Write path (with lock):

  • rescheduleTasksFromReplica() erases from map:
    • same file line ~319 (replica_to_files_to_be_processed.erase(...))

Why this is a defect: concurrent iteration/read + erase on std::unordered_map is undefined behavior and may crash/corrupt state.

2) Unsynchronized unordered_map pointer/iterator lifetime hazard in getNextTask

Container: replica_to_files_to_be_processed

Read path (no lock):

  • getNextTask() obtains iterator/pointer-like handle from find():
    • same file lines ~52-53
  • Later uses it to mutate list:
    • line ~70 (processed_file_list_ptr->second.push_back(file))

Concurrent write path (with lock):

  • rescheduleTasksFromReplica() may erase same key:
    • line ~319

Why this is a defect: if another thread erases the entry between find() and later push_back(), processed_file_list_ptr becomes invalid (UB/use-after-erase class behavior).

Minimal interleaving that can trigger UB

  1. T1 enters getNextTask(replica=X), reads processed_file_list_ptr = find(X) (no lock).
  2. T2 detects connection loss for same replica and enters rescheduleTasksFromReplica(X), acquires lock, erases map entry for X.
  3. T1 later executes processed_file_list_ptr->second.push_back(file) using invalid iterator/handle.

Suggested fix direction

  • Guard all accesses (find/iterate/read/write/erase) to replica_to_files_to_be_processed with the same mutex, or
  • Snapshot active replica ids under lock before hashing/iteration, and never keep iterators/refs across unlocks.

Given both issues are same shared-state root cause, fixing together would likely be safest.
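The second fix direction above can be sketched as follows (names and types are illustrative only, not the actual distributor code): copy the active replica ids while holding the lock, then do any hashing or iteration on the snapshot, so no iterator into the shared map ever outlives the critical section.

```cpp
#include <cassert>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

// Snapshot the keys of the shared map under the lock; callers then work on
// the returned copy, never on iterators into the map itself.
std::vector<size_t> snapshotReplicas(
    std::mutex & mutex,
    const std::unordered_map<size_t, int> & replica_map)
{
    std::lock_guard<std::mutex> lock(mutex);
    std::vector<size_t> ids;
    ids.reserve(replica_map.size());
    for (const auto & [replica, files] : replica_map)
        ids.push_back(replica);
    return ids;
}
```

A concurrent erase after the snapshot is taken can at worst make the snapshot stale (the hashed-to replica may be gone), which the caller must re-check under the lock; it can no longer invalidate anything the reader holds.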

@ianton-ru (Author)

Audit Report: PR #1493 — Fix file identifier in rescheduleTasksFromReplica (latest)

Scope: Altinity/ClickHouse PR #1493
Branch: bugfix/antalya-26.1/task_reschedule_fix → antalya-26.1
Audit standard: .cursor/skills/audit-review/SKILL.md + .cursor/rules/audit-review.mdc
PR state audited: 2 commits (file identifier fix + unsynchronized access fix)


1. Scope and partitions

  • Two logical partitions (both in same files):
    1. File identifier centralization — Introduce getFileIdentifier() and use it everywhere for the unprocessed_files key and the getReplicaForFile() input so reschedule and assignment use the same key (fixes #1486: “Object-storage task rescheduling uses inconsistent task identity, causing task loss/misrouting after replica failure”).
    2. Synchronization of replica_to_files_to_be_processed — Ensure every access (find, iteration in getReplicaForFile, erase, push_back) happens under the same mutex; fix use of stale iterator in getNextTask.
  • Files changed: 2
    • src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp
    • src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h
  • Net diff (both commits): +54 / -25 lines in .cpp; +2 in .h.

2. Call graph

Entrypoints (external):

  • getNextTask(size_t number_of_current_replica) — worker/replica requests next file task.
  • rescheduleTasksFromReplica(size_t number_of_current_replica) — replica marked lost; tasks returned to queue.

Dispatch / flow (current code):

  • getNextTask:
    saveLastNodeActivity under mutex: find(replica), if end throw → unlock → getPreQueuedFile (takes mutex internally) → getMatchingFileFromIterator (mutex for iterator_exhausted and next; getFileIdentifier outside lock; mutex for getReplicaForFile; mutex for emplace) → getAnyUnprocessedFile (mutex for full duration) → if file: mutex, re-find(replica); if end (“replica lost”) then getFileIdentifier, getReplicaForFile, unprocessed_files.emplace, connection_to_files.push_back; else processed_file_list_ptr->second.push_back(file) → return file.
  • rescheduleTasksFromReplica:
    Under mutex for full duration: find(replica), size check, for each file: getFileIdentifier, getReplicaForFile, unprocessed_files.emplace, connection_to_files.push_back, then replica_to_files_to_be_processed.erase(replica).

Shared state and locking:

  • replica_to_files_to_be_processed: All read/write (find, iteration in getReplicaForFile, erase, push_back) occur only while the caller holds mutex. No iterator or pointer is held across unlock.
  • unprocessed_files, connection_to_files, last_node_activity, iterator_exhausted: Same — only touched under mutex in this file.
  • Single mutex; no nested locking → no lock-order or deadlock risk from this class.

Downstream: Iterator next(), logging, profile events.

Error paths: Exceptions thrown under lock or after checks; lock_guard releases on unwind; no half-update of maps.


3. Transition matrix

| Transition | Entry | Processing | State change | Invariant |
| --- | --- | --- | --- | --- |
| Replica asks for task | getNextTask(replica) | Pre-queue → iterator match → unprocessed | Optional: unprocessed_files emplace or list push_back | Key consistency; all map access under mutex |
| Replica lost | rescheduleTasksFromReplica(replica) | Move replica’s list to unprocessed_files by file_identifier, erase replica | replica_to_files_to_be_processed.erase; unprocessed_files.emplace | Same key when re-queued vs later assigned; mutex held |
| Pre-queue serve | getPreQueuedFile | Pop, lookup by getFileIdentifier, erase | unprocessed_files.erase | Key matches emplace key; under mutex |
| Iterator match | getMatchingFileFromIterator | next(), getFileIdentifier, getReplicaForFile under mutex, emplace or return | unprocessed_files.emplace(key, …) | key = getFileIdentifier(file); getReplicaForFile under mutex |
| Exhausted assign | getAnyUnprocessedFile | Iterate unprocessed_files under mutex, getFileIdentifier (log), erase, return | unprocessed_files.erase(it) | Under mutex |

Invariants: (1) Same file → same key everywhere via getFileIdentifier(). (2) Every access to replica_to_files_to_be_processed and unprocessed_files is under mutex; no iterator/ref held across unlock.


4. Logical code-path testing summary

Partition 1 — getFileIdentifier:

  • Branches: send_over_whole_archive && file_object->isArchive() → path-to-archive; else → getAbsolutePathFromObjectInfo(...).value_or(getIdentifier()).
  • All call sites use it for keying and for getReplicaForFile; no mixed path/identifier usage remains.
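The branch structure of the helper can be sketched as follows. This is a standalone illustration under stated assumptions: `FakeObjectInfo` and its fields are invented stand-ins for the real `ObjectInfo`, and the fallback chain mirrors the `value_or` behavior described above.

```cpp
#include <cassert>
#include <string>

// Hypothetical, simplified stand-in for ObjectInfo; an empty absolute_path
// models getAbsolutePathFromObjectInfo returning no value.
struct FakeObjectInfo
{
    bool is_archive = false;
    std::string path_to_archive;
    std::string absolute_path;
    std::string identifier;
};

// Sketch of the centralized key logic: archives are keyed by the archive
// path when whole archives are sent; everything else by absolute path, with
// the identifier as the fallback.
std::string getFileIdentifier(const FakeObjectInfo & info, bool send_over_whole_archive)
{
    if (send_over_whole_archive && info.is_archive)
        return info.path_to_archive;
    if (!info.absolute_path.empty())
        return info.absolute_path;
    return info.identifier;
}
```

Because every producer and consumer of the key calls this one function, an archive file hashed on assignment and the same file re-queued on reschedule necessarily land on the same `unprocessed_files` key.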

Partition 2 — synchronization:

  • getNextTask: (a) First block: lock, find, check end, unlock. (b) After obtaining file: lock, re-find; if end → “replica lost” path (emplace, getReplicaForFile under same lock); else → push_back. No use of the first find’s result after unlock. (c) getReplicaForFile called only at 79, under lock 74.
  • getMatchingFileFromIterator: getReplicaForFile at 205 under lock 204.
  • rescheduleTasksFromReplica: Entire body under one lock; getReplicaForFile at 326 under that lock.
  • getReplicaForFile: Iterates replica_to_files_to_be_processed; always invoked with caller holding mutex → no race.

Expected outcomes: No key mismatch; no data race on shared maps; no use-after-erase; no deadlock (single mutex).


5. Fault categories and category-by-category injection

| Category | Scope | Executed | Outcome | Defects |
| --- | --- | --- | --- | --- |
| File-identifier consistency | getFileIdentifier vs legacy getPath/getIdentifier | Executed | Pass | None; PR fixes mismatch |
| Archive / bucket path | isArchive(), send_over_whole_archive, getIdentifier() | Executed | Pass | None; centralized |
| Data race on replica_to_files_to_be_processed | find/iterate/erase/push_back under mutex | Executed | Pass | None; all call sites hold mutex |
| Iterator/reference lifetime in getNextTask | Re-find under lock before use; no stale ptr | Executed | Pass | None; second commit enforces lock + re-find |
| getReplicaForFile call sites | Must be called only under mutex | Executed | Pass | All three sites (79, 205, 326) under lock |
| Deadlock / lock order | Single mutex | Executed | Pass | N/A; no nested lock |
| Exception / partial update | Throws; lock_guard releases | Executed | Pass | None |
| Integer/signedness | size_t, map keys | Not applicable | — | No changed arithmetic |
| Performance (ref-count) | getFileIdentifier(ObjectInfoPtr by value) | Executed | Low nit | Style only |

Fault-category completion matrix: All in-scope categories executed or N/A; none deferred.


6. Confirmed defects (introduced by this PR)

None.

  • Partition 1: Centralizing file identifier in getFileIdentifier() and using it in rescheduleTasksFromReplica (and everywhere else) fixes the 25.8→26.1 key mismatch; getPreQueuedFile alignment with isArchive() is correct.
  • Partition 2: The synchronization fix correctly (1) wraps the initial “replica exists” check in getNextTask in a lock, (2) re-acquires the lock and re-finds before using the map entry when a file is found (eliminating use of a stale iterator and ensuring getReplicaForFile + emplace/push_back are under mutex), and (3) keeps getReplicaForFile in getMatchingFileFromIterator under an explicit lock. No remaining unsynchronized access to replica_to_files_to_be_processed in the reviewed code.
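The re-find-under-lock pattern in item (2) can be illustrated with a small sketch (function and variable names are invented for this illustration; the real code operates on the distributor's members): after reacquiring the lock, look the replica up again instead of reusing any earlier iterator, and treat a missing entry as "replica lost".

```cpp
#include <cassert>
#include <list>
#include <mutex>
#include <string>
#include <unordered_map>

// Returns true if the file was recorded for the replica; false if the
// replica's entry was erased concurrently, in which case the caller must
// re-queue the file instead of dereferencing a stale iterator.
bool recordFileForReplica(
    std::mutex & mutex,
    std::unordered_map<size_t, std::list<std::string>> & replica_to_files,
    size_t replica,
    const std::string & file)
{
    std::lock_guard<std::mutex> lock(mutex);
    auto it = replica_to_files.find(replica); // re-find under the lock
    if (it == replica_to_files.end())
        return false; // "replica lost" path: caller re-queues the file
    it->second.push_back(file);
    return true;
}
```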

7. Pre-existing / residual (not introduced by this PR)

  • Iceberg path (line 195): When iceberg_read_optimization_enabled, the path set for metadata still uses send_over_whole_archive ? object_info->getPathOrPathToArchiveIfArchive() : object_info->getPath() instead of getFileIdentifier(). This is for the Iceberg command path, not the distribution key; low risk, possible future consistency improvement.
  • Style: getFileIdentifier(ObjectInfoPtr file_object, ...) could take const ObjectInfoPtr & to avoid an extra ref-count; low impact.

8. Coverage accounting and stop-condition status

  • Call-graph nodes: getNextTask, rescheduleTasksFromReplica, getPreQueuedFile, getMatchingFileFromIterator, getAnyUnprocessedFile, getReplicaForFile, getFileIdentifier — all reviewed.
  • Transitions: Request task, replica lost, pre-queue, iterator match, exhausted assign — all covered.
  • Fault categories: All defined categories executed or N/A; matrix complete.
  • Stop condition: Met.

9. Assumptions and limits

  • Static analysis only; no TSAN or runtime tests executed.
  • Only StorageObjectStorageStableTaskDistributor.cpp/.h and direct dependencies (e.g. ObjectInfo::getIdentifier(), getAbsolutePathFromObjectInfo) in scope.
  • No other callers of getReplicaForFile or of replica_to_files_to_be_processed outside this file assumed.

10. Confidence rating and evidence to raise it

  • Confidence: High (9/10).
  • Would raise further: (1) Regression test for reschedule with archive/bucket files. (2) TSAN run on object-storage cluster task distribution. (3) Code comment stating that all accesses to replica_to_files_to_be_processed and unprocessed_files are under mutex.

11. Residual risks and untested paths

  • Iceberg metadata path vs getFileIdentifier (see §7).
  • Minor ref-count style for getFileIdentifier parameter.

Summary (compact)

Audit update for PR #1493 (latest: file identifier fix + unsynchronized access fix):

Confirmed defects: None.

Scope: Two partitions — (1) file identifier centralization and reschedule key fix, (2) mutex protection for all accesses to replica_to_files_to_be_processed and re-find before use in getNextTask. Call graph, transition matrix, and fault categories (identifier consistency, archive/bucket, data race, iterator lifetime, getReplicaForFile call sites, deadlock, exception) executed; all passed or N/A. Pre-existing concurrency issues addressed by the second commit; all map access and getReplicaForFile calls are under the same mutex; getNextTask no longer uses a stale iterator. Assumptions: static analysis only; no TSAN or new tests run.
