Skip to content

Commit 88e763f

Browse files
committed
feat: add cursor-based pagination for messages and posts
Add get_messages_cursor() and get_posts_cursor() methods that use cursor-based pagination (faster, no COUNT query). Update iterators to use cursor mode by default, eliminating duplicate results.
1 parent 14bc2e4 commit 88e763f

3 files changed

Lines changed: 196 additions & 18 deletions

File tree

src/aleph/sdk/client/abstract.py

Lines changed: 65 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,13 @@
4242
from aleph.sdk.utils import extended_json_encoder
4343

4444
from ..query.filters import MessageFilter, PostFilter
45-
from ..query.responses import MessagesResponse, PostsResponse, PriceResponse
45+
from ..query.responses import (
46+
CursorMessagesResponse,
47+
CursorPostsResponse,
48+
MessagesResponse,
49+
PostsResponse,
50+
PriceResponse,
51+
)
4652
from ..types import GenericMessage, StorageEnum
4753
from ..utils import Writable, compute_sha256
4854

@@ -120,26 +126,47 @@ async def get_posts(
120126
"""
121127
raise NotImplementedError("Did you mean to import `AlephHttpClient`?")
122128

129+
@abstractmethod
130+
async def get_posts_cursor(
131+
self,
132+
page_size: int = DEFAULT_PAGE_SIZE,
133+
cursor: str = "",
134+
post_filter: Optional[PostFilter] = None,
135+
ignore_invalid_messages: Optional[bool] = True,
136+
invalid_messages_log_level: Optional[int] = logging.NOTSET,
137+
) -> CursorPostsResponse:
138+
"""
139+
Fetch a list of posts from the network using cursor-based pagination.
140+
141+
:param page_size: Number of items to fetch, max 200 (Default: 200)
142+
:param cursor: Opaque cursor from a previous response's next_cursor. Empty string starts from the beginning.
143+
:param post_filter: Filter to apply to the posts (Default: None)
144+
:param ignore_invalid_messages: Ignore invalid messages (Default: True)
145+
:param invalid_messages_log_level: Log level to use for invalid messages (Default: logging.NOTSET)
146+
"""
147+
raise NotImplementedError("Did you mean to import `AlephHttpClient`?")
148+
123149
async def get_posts_iterator(
124150
self,
125151
post_filter: Optional[PostFilter] = None,
126152
) -> AsyncIterable[PostMessage]:
127153
"""
128-
Fetch all filtered posts, returning an async iterator and fetching them page by page. Might return duplicates
129-
but will always return all posts.
154+
Fetch all filtered posts, returning an async iterator and fetching them
155+
using cursor-based pagination. Does not return duplicates.
130156
131157
:param post_filter: Filter to apply to the posts (Default: None)
132158
"""
133-
page = 1
134-
resp = None
135-
while resp is None or len(resp.posts) > 0:
136-
resp = await self.get_posts(
137-
page=page,
159+
cursor: str = ""
160+
while True:
161+
resp = await self.get_posts_cursor(
162+
cursor=cursor,
138163
post_filter=post_filter,
139164
)
140-
page += 1
141165
for post in resp.posts:
142166
yield post # type: ignore
167+
if resp.next_cursor is None:
168+
break
169+
cursor = resp.next_cursor
143170

144171
@abstractmethod
145172
async def download_file(self, file_hash: str) -> bytes:
@@ -224,26 +251,47 @@ async def get_messages(
224251
"""
225252
raise NotImplementedError("Did you mean to import `AlephHttpClient`?")
226253

254+
@abstractmethod
255+
async def get_messages_cursor(
256+
self,
257+
page_size: int = DEFAULT_PAGE_SIZE,
258+
cursor: str = "",
259+
message_filter: Optional[MessageFilter] = None,
260+
ignore_invalid_messages: Optional[bool] = True,
261+
invalid_messages_log_level: Optional[int] = logging.NOTSET,
262+
) -> CursorMessagesResponse:
263+
"""
264+
Fetch a list of messages from the network using cursor-based pagination.
265+
266+
:param page_size: Number of items to fetch, max 200 (Default: 200)
267+
:param cursor: Opaque cursor from a previous response's next_cursor. Empty string starts from the beginning.
268+
:param message_filter: Filter to apply to the messages
269+
:param ignore_invalid_messages: Ignore invalid messages (Default: True)
270+
:param invalid_messages_log_level: Log level to use for invalid messages (Default: logging.NOTSET)
271+
"""
272+
raise NotImplementedError("Did you mean to import `AlephHttpClient`?")
273+
227274
async def get_messages_iterator(
228275
self,
229276
message_filter: Optional[MessageFilter] = None,
230277
) -> AsyncIterable[AlephMessage]:
231278
"""
232-
Fetch all filtered messages, returning an async iterator and fetching them page by page. Might return duplicates
233-
but will always return all messages.
279+
Fetch all filtered messages, returning an async iterator and fetching
280+
them using cursor-based pagination. Does not return duplicates.
234281
235282
:param message_filter: Filter to apply to the messages
236283
"""
237-
page = 1
238-
resp = None
239-
while resp is None or len(resp.messages) > 0:
240-
resp = await self.get_messages(
241-
page=page,
284+
cursor: str = ""
285+
while True:
286+
resp = await self.get_messages_cursor(
287+
cursor=cursor,
242288
message_filter=message_filter,
243289
)
244-
page += 1
245290
for message in resp.messages:
246291
yield message
292+
if resp.next_cursor is None:
293+
break
294+
cursor = resp.next_cursor
247295

248296
@abstractmethod
249297
async def get_message(

src/aleph/sdk/client/http.py

Lines changed: 114 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,12 @@
5252
RemovedMessageError,
5353
ResourceNotFoundError,
5454
)
55-
from ..query.filters import BalanceFilter, MessageFilter, PostFilter
55+
from ..query.filters import BalanceFilter, MessageFilter, PostFilter, SortBy
5656
from ..query.responses import (
5757
BalanceResponse,
5858
CreditsHistoryResponse,
59+
CursorMessagesResponse,
60+
CursorPostsResponse,
5961
MessagesResponse,
6062
Post,
6163
PostsResponse,
@@ -260,6 +262,56 @@ async def get_posts(
260262
pagination_item=response_json["pagination_item"],
261263
)
262264

265+
async def get_posts_cursor(
266+
self,
267+
page_size: int = 200,
268+
cursor: str = "",
269+
post_filter: Optional[PostFilter] = None,
270+
ignore_invalid_messages: Optional[bool] = True,
271+
invalid_messages_log_level: Optional[int] = logging.NOTSET,
272+
) -> CursorPostsResponse:
273+
ignore_invalid_messages = (
274+
True if ignore_invalid_messages is None else ignore_invalid_messages
275+
)
276+
invalid_messages_log_level = (
277+
logging.NOTSET
278+
if invalid_messages_log_level is None
279+
else invalid_messages_log_level
280+
)
281+
282+
if post_filter and post_filter.sort_by == SortBy.TX_TIME:
283+
raise ValueError(
284+
"sortBy=tx-time is not compatible with cursor-based pagination"
285+
)
286+
287+
page_size = min(page_size, 200)
288+
289+
params: Dict[str, str] = {}
290+
if post_filter:
291+
params = post_filter.as_http_params()
292+
params["cursor"] = cursor
293+
params["pagination"] = str(page_size)
294+
295+
async with self.http_session.get("/api/v0/posts.json", params=params) as resp:
296+
resp.raise_for_status()
297+
response_json = await resp.json()
298+
posts_raw = response_json["posts"]
299+
300+
posts: List[Post] = []
301+
for post_raw in posts_raw:
302+
try:
303+
posts.append(Post.model_validate(post_raw))
304+
except ValidationError as e:
305+
if not ignore_invalid_messages:
306+
raise e
307+
if invalid_messages_log_level:
308+
logger.log(level=invalid_messages_log_level, msg=e)
309+
return CursorPostsResponse(
310+
posts=posts,
311+
pagination_per_page=response_json["pagination_per_page"],
312+
next_cursor=response_json.get("next_cursor"),
313+
)
314+
263315
async def download_file_to_buffer(
264316
self,
265317
file_hash: str,
@@ -425,6 +477,67 @@ async def get_messages(
425477
pagination_item=response_json["pagination_item"],
426478
)
427479

480+
async def get_messages_cursor(
481+
self,
482+
page_size: int = 200,
483+
cursor: str = "",
484+
message_filter: Optional[MessageFilter] = None,
485+
ignore_invalid_messages: Optional[bool] = True,
486+
invalid_messages_log_level: Optional[int] = logging.NOTSET,
487+
) -> CursorMessagesResponse:
488+
ignore_invalid_messages = (
489+
True if ignore_invalid_messages is None else ignore_invalid_messages
490+
)
491+
invalid_messages_log_level = (
492+
logging.NOTSET
493+
if invalid_messages_log_level is None
494+
else invalid_messages_log_level
495+
)
496+
497+
if message_filter and message_filter.sort_by == SortBy.TX_TIME:
498+
raise ValueError(
499+
"sortBy=tx-time is not compatible with cursor-based pagination"
500+
)
501+
502+
page_size = min(page_size, 200)
503+
504+
params: Dict[str, str] = {}
505+
if message_filter:
506+
params = message_filter.as_http_params()
507+
params["cursor"] = cursor
508+
params["pagination"] = str(page_size)
509+
510+
async with self.http_session.get(
511+
"/api/v0/messages.json", params=params
512+
) as resp:
513+
resp.raise_for_status()
514+
response_json = await resp.json()
515+
messages_raw = response_json["messages"]
516+
517+
messages: List[AlephMessage] = []
518+
for message_raw in messages_raw:
519+
try:
520+
message = parse_message(message_raw)
521+
messages.append(message)
522+
except KeyError as e:
523+
if not ignore_invalid_messages:
524+
raise e
525+
logger.log(
526+
level=invalid_messages_log_level,
527+
msg=f"KeyError: Field '{e.args[0]}' not found",
528+
)
529+
except ValidationError as e:
530+
if not ignore_invalid_messages:
531+
raise e
532+
if invalid_messages_log_level:
533+
logger.log(level=invalid_messages_log_level, msg=e)
534+
535+
return CursorMessagesResponse(
536+
messages=messages,
537+
pagination_per_page=response_json["pagination_per_page"],
538+
next_cursor=response_json.get("next_cursor"),
539+
)
540+
428541
@overload
429542
async def get_message( # type: ignore
430543
self,

src/aleph/sdk/query/responses.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,23 @@ class MessagesResponse(PaginationResponse):
7676
pagination_item: str = "messages"
7777

7878

79+
class CursorPaginationResponse(BaseModel):
80+
pagination_per_page: int
81+
next_cursor: Optional[str] = None
82+
83+
84+
class CursorPostsResponse(CursorPaginationResponse):
85+
"""Cursor-paginated response from /api/v0/posts.json"""
86+
87+
posts: List[Post]
88+
89+
90+
class CursorMessagesResponse(CursorPaginationResponse):
91+
"""Cursor-paginated response from /api/v0/messages.json"""
92+
93+
messages: List[AlephMessage]
94+
95+
7996
class PriceResponse(BaseModel):
8097
"""Response from an aleph.im node API on the path /api/v0/price/{item_hash}"""
8198

0 commit comments

Comments
 (0)