Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions gigl/distributed/utils/networking.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,8 @@ def write_readiness_signal(readiness_uri: Uri) -> None:
def wait_for_readiness_signal(
readiness_uri: Uri,
timeout: float = 3600.0,
poll_interval: float = 10.0,
log_every_n_attempts: int = 10,
poll_interval_s: float = 10.0,
log_every_n_attempts: int = 30,
) -> None:
"""Poll for a readiness sentinel file before initiating RPC connections.

Expand All @@ -243,13 +243,16 @@ def wait_for_readiness_signal(
readiness_uri: The URI to poll for the sentinel file.
Supports both GcsUri (production) and LocalUri (testing).
timeout: Maximum time in seconds to wait for the signal. Defaults to 3600.
poll_interval: Time in seconds between poll attempts. Defaults to 10.
poll_interval_s: Time in seconds between poll attempts. Defaults to 10.
log_every_n_attempts: Number of attempts between log messages. Defaults to 30.
e.g. with poll_interval set to 10, and log_every_n_attempts set to 30, we will log every 300 seconds (5 minutes).


Raises:
TimeoutError: If the readiness signal is not found within the timeout.
"""
logger.info(
f"Waiting for readiness signal at {readiness_uri} (timeout={timeout}s, poll_interval={poll_interval}s)"
f"Waiting for readiness signal at {readiness_uri} (timeout={timeout}s, poll_interval={poll_interval_s}s)"
)
file_loader = FileLoader()
start_time = time.monotonic()
Expand All @@ -265,10 +268,10 @@ def wait_for_readiness_signal(
)
if attempt % log_every_n_attempts == 0:
logger.info(
f"Readiness signal not yet available at {readiness_uri}. Elapsed: {elapsed:.0f}s. Retrying in {poll_interval}s..."
f"Readiness signal not yet available at {readiness_uri}. Elapsed: {elapsed:.0f}s. Retrying in {poll_interval_s}s... Expect the next log message in {log_every_n_attempts * poll_interval_s}s"
)
attempt += 1
time.sleep(poll_interval)
time.sleep(poll_interval_s)


def get_graph_store_info() -> GraphStoreInfo:
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/distributed/utils/networking_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ def test_wait_for_readiness_signal(self) -> None:
self.addCleanup(temp_dir.cleanup)
readiness_uri = LocalUri(temp_dir.name) / "readiness.txt"
with self.assertRaises(TimeoutError):
wait_for_readiness_signal(readiness_uri, timeout=0.1, poll_interval=0.01)
wait_for_readiness_signal(readiness_uri, timeout=0.1, poll_interval_s=0.01)

write_readiness_signal(readiness_uri)

Expand Down