Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions docs/src/generated/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,20 @@
"type": "<class 'int'>",
"validation": {}
},
{
"category": "GENERATION",
"default": "round_robin",
"description": "Session queue mode. Use 'FIFO' for traditional first-in-first-out, or 'round_robin' to serve each user's jobs in turn. In single-user mode, FIFO is always used regardless of this setting.",
"env_var": "INVOKEAI_SESSION_QUEUE_MODE",
"literal_values": [
"FIFO",
"round_robin"
],
"name": "session_queue_mode",
"required": false,
"type": "typing.Literal['FIFO', 'round_robin']",
"validation": {}
},
{
"category": "GENERATION",
"default": false,
Expand Down
20 changes: 12 additions & 8 deletions invokeai/app/api/routers/session_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,11 @@ async def get_queue_item_ids(
queue_id: str = Path(description="The queue id to perform this operation on"),
order_dir: SQLiteDirection = Query(default=SQLiteDirection.Descending, description="The order of sort"),
) -> ItemIdsResult:
"""Gets all queue item ids that match the given parameters. Non-admin users only see their own items."""
"""Gets all queue item ids that match the given parameters. The IDs themselves are not sensitive;
per-item field redaction is performed when the items are fetched via list_all_queue_items or
get_queue_items_by_item_ids."""
try:
user_id = None if current_user.is_admin else current_user.user_id
return ApiDependencies.invoker.services.session_queue.get_queue_item_ids(
queue_id=queue_id, order_dir=order_dir, user_id=user_id
)
return ApiDependencies.invoker.services.session_queue.get_queue_item_ids(queue_id=queue_id, order_dir=order_dir)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Unexpected error while listing all queue item ids: {e}")

Expand Down Expand Up @@ -436,10 +435,15 @@ async def get_queue_status(
current_user: CurrentUserOrDefault,
queue_id: str = Path(description="The queue id to perform this operation on"),
) -> SessionQueueAndProcessorStatus:
"""Gets the status of the session queue. Non-admin users see only their own counts and cannot see current item details unless they own it."""
"""Gets the status of the session queue. Returns global counts plus the calling user's own
pending/in_progress counts (so the UI can show an X/Y badge). Non-admin users cannot see the
current item's identifiers unless they own it."""
try:
user_id = None if current_user.is_admin else current_user.user_id
queue = ApiDependencies.invoker.services.session_queue.get_queue_status(queue_id, user_id=user_id)
queue = ApiDependencies.invoker.services.session_queue.get_queue_status(
queue_id,
user_id=current_user.user_id,
is_admin=current_user.is_admin,
)
processor = ApiDependencies.invoker.services.session_processor.get_status()
return SessionQueueAndProcessorStatus(queue=queue, processor=processor)
except Exception as e:
Expand Down
103 changes: 86 additions & 17 deletions invokeai/app/api/sockets.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,20 +260,37 @@ async def _handle_sub_bulk_download(self, sid: str, data: Any) -> None:
async def _handle_unsub_bulk_download(self, sid: str, data: Any) -> None:
await self._sio.leave_room(sid, BulkDownloadSubscriptionEvent(**data).bulk_download_id)

def _owner_and_admin_sids(self, owner_user_id: str) -> list[str]:
"""Sids belonging to the event's owner or to any admin.

Used as `skip_sid` when broadcasting a sanitized companion event to the queue room,
so the owner and admins (who already received the full event) don't get a second
copy that would clobber their cache with redacted values.
"""
return [
sid
for sid, info in self._socket_users.items()
if info.get("user_id") == owner_user_id or info.get("is_admin")
]

async def _handle_queue_event(self, event: FastAPIEvent[QueueEventBase]):
"""Handle queue events with user isolation.

All queue item events (invocation events AND QueueItemStatusChangedEvent) are
private to the owning user and admins. They carry unsanitized user_id, batch_id,
session_id, origin, destination and error metadata, and must never be broadcast
to the whole queue room — otherwise any other authenticated subscriber could
observe cross-user queue activity.
Queue events split into two routing paths:

RecallParametersUpdatedEvent is also private to the owner + admins.
1. The owner and admins receive the full unsanitized event in their `user:{id}` /
`admin` rooms. The full payload may include batch_id, session_id, origin,
destination, error metadata, etc.

BatchEnqueuedEvent carries the enqueuing user's batch_id/origin/counts and
is also routed privately. QueueClearedEvent is the only queue event that
is still broadcast to the whole queue room.
2. For events that other authenticated users need to know about so their queue list
and badge counts stay in sync (QueueItemStatusChangedEvent and BatchEnqueuedEvent),
a sanitized companion event is also emitted to the full queue room with the
owner's and admins' sids in `skip_sid`. The companion uses `user_id="redacted"`
as a sentinel so the frontend handler knows to do tag invalidation only and skip
per-session side effects.

InvocationEventBase events stay private (owner + admins only). RecallParametersUpdatedEvent
is also private. QueueClearedEvent has no user identity and is broadcast to the queue room.

IMPORTANT: Check InvocationEventBase BEFORE QueueItemEventBase since InvocationEventBase
inherits from QueueItemEventBase. The order of isinstance checks matters!
Expand Down Expand Up @@ -302,10 +319,51 @@ async def _handle_queue_event(self, event: FastAPIEvent[QueueEventBase]):

logger.debug(f"Emitted private invocation event {event_name} to user room {user_room} and admin room")

# Other queue item events (QueueItemStatusChangedEvent) carry unsanitized
# user_id, batch_id, session_id, origin, destination and error metadata.
# They are private to the owning user + admins — never broadcast to the
# full queue room.
# QueueItemStatusChangedEvent: full to owner+admin, sanitized to everyone else in
# the queue room so their queue list, badge, and item caches refresh.
elif isinstance(event_data, QueueItemStatusChangedEvent):
user_room = f"user:{event_data.user_id}"
await self._sio.emit(event=event_name, data=event_data.model_dump(mode="json"), room=user_room)
await self._sio.emit(event=event_name, data=event_data.model_dump(mode="json"), room="admin")

sanitized = event_data.model_copy(
update={
"user_id": "redacted",
"batch_id": "redacted",
"session_id": "redacted",
"origin": None,
"destination": None,
"error_type": None,
"error_message": None,
"error_traceback": None,
}
)
# Strip identifying fields out of the embedded batch_status / queue_status too.
sanitized.batch_status = sanitized.batch_status.model_copy(
update={"batch_id": "redacted", "origin": None, "destination": None}
)
sanitized.queue_status = sanitized.queue_status.model_copy(
update={
"item_id": None,
"session_id": None,
"batch_id": None,
"user_pending": None,
"user_in_progress": None,
}
)
await self._sio.emit(
event=event_name,
data=sanitized.model_dump(mode="json"),
room=event_data.queue_id,
skip_sid=self._owner_and_admin_sids(event_data.user_id),
)

logger.debug(
f"Emitted queue_item_status_changed: full to {user_room}+admin, sanitized to queue {event_data.queue_id}"
)

# Other queue item events (currently none beyond QueueItemStatusChangedEvent that
# carry user_id) stay private to owner + admins.
elif isinstance(event_data, QueueItemEventBase) and hasattr(event_data, "user_id"):
user_room = f"user:{event_data.user_id}"
await self._sio.emit(event=event_name, data=event_data.model_dump(mode="json"), room=user_room)
Expand All @@ -320,14 +378,25 @@ async def _handle_queue_event(self, event: FastAPIEvent[QueueEventBase]):
await self._sio.emit(event=event_name, data=event_data.model_dump(mode="json"), room="admin")
logger.debug(f"Emitted private recall_parameters_updated event to user room {user_room} and admin room")

# BatchEnqueuedEvent carries the enqueuing user's batch_id, origin, and
# enqueued counts. Route it privately to the owner + admins so other
# users do not observe cross-user batch activity.
# BatchEnqueuedEvent: full to owner+admin, sanitized to everyone else in the queue
# room so their badge total and queue list pick up the new items.
elif isinstance(event_data, BatchEnqueuedEvent):
user_room = f"user:{event_data.user_id}"
await self._sio.emit(event=event_name, data=event_data.model_dump(mode="json"), room=user_room)
await self._sio.emit(event=event_name, data=event_data.model_dump(mode="json"), room="admin")
logger.debug(f"Emitted private batch_enqueued event to user room {user_room} and admin room")

sanitized = event_data.model_copy(
update={"user_id": "redacted", "batch_id": "redacted", "origin": None}
)
await self._sio.emit(
event=event_name,
data=sanitized.model_dump(mode="json"),
room=event_data.queue_id,
skip_sid=self._owner_and_admin_sids(event_data.user_id),
)
logger.debug(
f"Emitted batch_enqueued: full to {user_room}+admin, sanitized to queue {event_data.queue_id}"
)

else:
# For remaining queue events (e.g. QueueClearedEvent) that do not
Expand Down
3 changes: 3 additions & 0 deletions invokeai/app/services/config/config_default.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
ATTENTION_SLICE_SIZE = Literal["auto", "balanced", "max", 1, 2, 3, 4, 5, 6, 7, 8]
LOG_FORMAT = Literal["plain", "color", "syslog", "legacy"]
LOG_LEVEL = Literal["debug", "info", "warning", "error", "critical"]
SESSION_QUEUE_MODE = Literal["FIFO", "round_robin"]
CONFIG_SCHEMA_VERSION = "4.0.2"
EXTERNAL_PROVIDER_CONFIG_FIELDS = (
"external_gemini_api_key",
Expand Down Expand Up @@ -108,6 +109,7 @@ class InvokeAIAppConfig(BaseSettings):
force_tiled_decode: Whether to enable tiled VAE decode (reduces memory consumption with some performance penalty).
pil_compress_level: The compress_level setting of PIL.Image.save(), used for PNG encoding. All settings are lossless. 0 = no compression, 1 = fastest with slightly larger filesize, 9 = slowest with smallest filesize. 1 is typically the best setting.
max_queue_size: Maximum number of items in the session queue.
session_queue_mode: Session queue mode. Use 'FIFO' for traditional first-in-first-out, or 'round_robin' to serve each user's jobs in turn. In single-user mode, FIFO is always used regardless of this setting.<br>Valid values: `FIFO`, `round_robin`
clear_queue_on_startup: Empties session queue on startup. If true, disables `max_queue_history`.
max_queue_history: Keep the last N completed, failed, and canceled queue items. Older items are deleted on startup. Set to 0 to prune all terminal items. Ignored if `clear_queue_on_startup` is true.
allow_nodes: List of nodes to allow. Omit to allow all.
Expand Down Expand Up @@ -203,6 +205,7 @@ class InvokeAIAppConfig(BaseSettings):
force_tiled_decode: bool = Field(default=False, description="Whether to enable tiled VAE decode (reduces memory consumption with some performance penalty).")
pil_compress_level: int = Field(default=1, description="The compress_level setting of PIL.Image.save(), used for PNG encoding. All settings are lossless. 0 = no compression, 1 = fastest with slightly larger filesize, 9 = slowest with smallest filesize. 1 is typically the best setting.")
max_queue_size: int = Field(default=10000, gt=0, description="Maximum number of items in the session queue.")
session_queue_mode: SESSION_QUEUE_MODE = Field(default="round_robin", description="Session queue mode. Use 'FIFO' for traditional first-in-first-out, or 'round_robin' to serve each user's jobs in turn. In single-user mode, FIFO is always used regardless of this setting.")
clear_queue_on_startup: bool = Field(default=False, description="Empties session queue on startup. If true, disables `max_queue_history`.")
max_queue_history: Optional[int] = Field(default=None, ge=0, description="Keep the last N completed, failed, and canceled queue items. Older items are deleted on startup. Set to 0 to prune all terminal items. Ignored if `clear_queue_on_startup` is true.")

Expand Down
15 changes: 13 additions & 2 deletions invokeai/app/services/session_queue/session_queue_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,19 @@ def is_full(self, queue_id: str) -> IsFullResult:
pass

@abstractmethod
def get_queue_status(self, queue_id: str, user_id: Optional[str] = None) -> SessionQueueStatus:
"""Gets the status of the queue. If user_id is provided, also includes user-specific counts."""
def get_queue_status(
self,
queue_id: str,
user_id: Optional[str] = None,
is_admin: bool = False,
) -> SessionQueueStatus:
"""Gets the status of the queue.

Always returns global pending/in_progress/etc. counts. When user_id is provided, also
populates user_pending and user_in_progress with that user's own counts (so the UI can
render an X/Y badge). When is_admin is False, the current item's identifiers are hidden
unless the calling user owns the in-progress item.
"""
pass

@abstractmethod
Expand Down
6 changes: 6 additions & 0 deletions invokeai/app/services/session_queue/session_queue_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,12 @@ class SessionQueueStatus(BaseModel):
failed: int = Field(..., description="Number of queue items with status 'error'")
canceled: int = Field(..., description="Number of queue items with status 'canceled'")
total: int = Field(..., description="Total number of queue items")
user_pending: Optional[int] = Field(
default=None, description="Number of pending queue items for the calling user (multiuser only)"
)
user_in_progress: Optional[int] = Field(
default=None, description="Number of in-progress queue items for the calling user (multiuser only)"
)


class SessionQueueCountsByDestination(BaseModel):
Expand Down
Loading
Loading