feat(python-driver): add public API for connection pooling and model dict conversion#2374
feat(python-driver): add public API for connection pooling and model dict conversion#2374uesleilima wants to merge 3 commits intoapache:masterfrom
Conversation
…dict conversion Add two enhancements to the Python driver's public API: 1. Add configure_connection() function that registers AGE agtype adapters on an existing psycopg connection without creating a new one. This enables use with external connection pools (e.g. psycopg_pool) and managed PostgreSQL services where LOAD 'age' may be restricted. Also explicitly export AgeLoader and ClientCursor as public symbols in age/__init__.py. (apache#2369) 2. Add to_dict() methods to Vertex, Edge, and Path model classes for conversion to plain Python dicts. This enables direct JSON serialization with json.dumps() without requiring custom conversion logic. (apache#2371) - Vertex.to_dict() returns {id, label, properties} - Edge.to_dict() returns {id, label, start_id, end_id, properties} - Path.to_dict() returns a list of to_dict() results Closes apache#2369 Closes apache#2371
…plugins - Replace confusing `skip_load` (double-negative) with `load` (positive boolean, default False). The default now correctly matches the intent: no LOAD by default for connection pool / managed PostgreSQL use cases. - Add `load_from_plugins` parameter for parity with setUpAge(). - Fix docstring to accurately describe parameter behavior. - Add 6 unit tests for configure_connection covering: default no-load, explicit load, load_from_plugins, search_path always set, adapter registration, and graph_name check delegation. Made-with: Cursor
|
@uesleilima thanks a lot for your patch. I let Claude code review run over the patch. Can you please address the issues ? Code ReviewSummaryAdds two useful features to the Python driver: a 🔴 Critical Issues
🟡 Suggestions
✅ What Looks Good
Verdict: Request Changes — The silent |
- Move TypeInfo.fetch() inside cursor block so search_path change is visible regardless of transaction isolation mode - Raise ValueError when load_from_plugins=True but load=False - Add type annotations to configure_connection signature - Document shallow-copy semantics in Vertex/Edge to_dict() - Path.to_dict() uses str() fallback for non-AGObj entities to guarantee JSON-serializable output - Add test for AgeNotSet when TypeInfo.fetch returns None - Add test for load_from_plugins=True without load=True - Replace fragile string assertions with assert_called_with/assert_any_call Made-with: Cursor
There was a problem hiding this comment.
Pull request overview
Adds new public APIs to the Python driver to better support external connection management (e.g., pooled connections) and to make model objects easier to serialize/consume in downstream apps.
Changes:
- Introduces
configure_connection()as a public API to configure an existing psycopg connection for AGE (search_path, agtype loaders, optional LOAD, optional graph check). - Adds
to_dict()helpers toVertex,Edge, andPathfor plain-Python dict/list conversion. - Exports
AgeLoader,ClientCursor, andconfigure_connectionfromage/__init__.pyand adds accompanying tests.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| drivers/python/age/age.py | Adds the new configure_connection() API for externally-managed connections. |
| drivers/python/age/models.py | Adds to_dict() methods for Path, Vertex, and Edge. |
| drivers/python/age/init.py | Exposes new public symbols at the package top-level. |
| drivers/python/test_age_py.py | Adds unit tests for to_dict() and configure_connection() and importability checks. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| def to_dict(self) -> list: | ||
| # Non-AGObj elements (e.g. raw dicts/strings from malformed paths) | ||
| # are included as-is via str() to guarantee JSON-serializable output. | ||
| return [ | ||
| e.to_dict() if isinstance(e, AGObj) else str(e) | ||
| for e in self.entities | ||
| ] |
There was a problem hiding this comment.
Path.to_dict() currently coerces any non-AGObj element with str(e). If a path contains a plain dict/list (or other JSON-native types), this converts structured data into a string representation (e.g., dicts become single-quoted strings), which is lossy and not equivalent to JSON serialization. Consider leaving JSON-native types unchanged and only stringifying truly non-serializable objects (or recursively normalizing lists/dicts). Also handle self.entities is None (current default ctor) to avoid a TypeError when iterating.
| def configure_connection( | ||
| conn: psycopg.connection, | ||
| graph_name: str | None = None, | ||
| load: bool = False, | ||
| load_from_plugins: bool = False, | ||
| ) -> None: | ||
| """Register AGE agtype adapters on an existing connection. | ||
|
|
||
| This enables use of AGE with externally-managed connections, such as | ||
| those from psycopg_pool.ConnectionPool. By default the function does | ||
| **not** execute ``LOAD 'age'``, making it safe for managed PostgreSQL | ||
| services (Azure, AWS RDS) where the extension is pre-loaded via | ||
| ``shared_preload_libraries``. | ||
|
|
||
| Performs: | ||
| - ``SET search_path`` to include ``ag_catalog`` | ||
| - Fetches agtype OIDs and registers ``AgeLoader`` | ||
| - Optionally loads the AGE extension (``load=True``) | ||
| - Optionally checks/creates the graph | ||
|
|
||
| Args: | ||
| conn: An existing psycopg connection. | ||
| graph_name: Optional graph name to check/create. | ||
| load: If True, execute ``LOAD 'age'`` (or the plugins path). | ||
| Default False — suitable for environments where AGE is | ||
| already loaded. | ||
| load_from_plugins: If True (and ``load=True``), use | ||
| ``LOAD '$libdir/plugins/age'`` instead of ``LOAD 'age'``. | ||
|
|
||
| Raises: | ||
| ValueError: If ``load_from_plugins=True`` but ``load=False``. | ||
| AgeNotSet: If the agtype type is not found in the database. | ||
| """ | ||
| if load_from_plugins and not load: | ||
| raise ValueError( | ||
| "load_from_plugins=True requires load=True. " | ||
| "Set load=True to enable extension loading." | ||
| ) | ||
|
|
||
| with conn.cursor() as cursor: | ||
| if load: | ||
| if load_from_plugins: | ||
| cursor.execute("LOAD '$libdir/plugins/age';") | ||
| else: | ||
| cursor.execute("LOAD 'age';") | ||
|
|
||
| cursor.execute("SET search_path = ag_catalog, '$user', public;") | ||
|
|
||
| ag_info = TypeInfo.fetch(conn, 'agtype') | ||
|
|
||
| if not ag_info: | ||
| raise AgeNotSet( | ||
| "AGE agtype type not found. Ensure the AGE extension is " | ||
| "installed and loaded in the current database. " | ||
| "Run CREATE EXTENSION age; first." | ||
| ) | ||
|
|
||
| conn.adapters.register_loader(ag_info.oid, AgeLoader) | ||
| conn.adapters.register_loader(ag_info.array_oid, AgeLoader) | ||
|
|
||
| if graph_name is not None: | ||
| checkGraphCreated(conn, graph_name) | ||
|
|
There was a problem hiding this comment.
configure_connection() largely duplicates setUpAge() (LOAD/search_path/agtype TypeInfo fetch/loader registration/graph check). This duplication risks the two code paths drifting over time (e.g., changes to error handling or adapter registration). Consider refactoring so setUpAge() calls configure_connection(load=True, ...) (or vice versa) and share a single implementation.
| class TestModelToDict(unittest.TestCase): | ||
| """Unit tests for Vertex/Edge/Path to_dict() — no DB required.""" | ||
|
|
||
| def test_vertex_to_dict(self): | ||
| v = Vertex(id=123, label="Person", properties={"name": "Alice", "age": 30}) | ||
| d = v.to_dict() | ||
| self.assertEqual(d["id"], 123) | ||
| self.assertEqual(d["label"], "Person") | ||
| self.assertEqual(d["properties"], {"name": "Alice", "age": 30}) | ||
| # Verify it's a plain dict (JSON-serializable) | ||
| json_str = json.dumps(d) | ||
| self.assertIn("Alice", json_str) | ||
|
|
||
| def test_vertex_to_dict_empty_properties(self): | ||
| v = Vertex(id=1, label="Empty", properties=None) | ||
| d = v.to_dict() | ||
| self.assertEqual(d["properties"], {}) | ||
|
|
||
| def test_edge_to_dict(self): | ||
| e = Edge(id=456, label="KNOWS", properties={"since": 2020}) | ||
| e.start_id = 123 | ||
| e.end_id = 789 | ||
| d = e.to_dict() | ||
| self.assertEqual(d["id"], 456) | ||
| self.assertEqual(d["label"], "KNOWS") | ||
| self.assertEqual(d["start_id"], 123) | ||
| self.assertEqual(d["end_id"], 789) | ||
| self.assertEqual(d["properties"], {"since": 2020}) | ||
| json_str = json.dumps(d) | ||
| self.assertIn("KNOWS", json_str) | ||
|
|
||
| def test_path_to_dict(self): | ||
| v1 = Vertex(id=1, label="A", properties={"name": "start"}) | ||
| e = Edge(id=10, label="r", properties={"w": 1}) | ||
| e.start_id = 1 | ||
| e.end_id = 2 | ||
| v2 = Vertex(id=2, label="B", properties={"name": "end"}) | ||
| p = Path([v1, e, v2]) | ||
| d = p.to_dict() | ||
| self.assertEqual(len(d), 3) | ||
| self.assertEqual(d[0]["label"], "A") | ||
| self.assertEqual(d[1]["label"], "r") | ||
| self.assertEqual(d[1]["start_id"], 1) | ||
| self.assertEqual(d[2]["label"], "B") | ||
| # Verify the whole path is JSON-serializable | ||
| json_str = json.dumps(d) | ||
| self.assertIn("start", json_str) | ||
|
|
||
| def test_vertex_to_dict_is_plain_dict(self): | ||
| """to_dict() returns standard dict, not a model object.""" | ||
| v = Vertex(id=1, label="X", properties={"k": "v"}) | ||
| d = v.to_dict() | ||
| self.assertIsInstance(d, dict) | ||
| self.assertIsInstance(d["properties"], dict) | ||
|
|
||
|
|
||
| class TestPublicImports(unittest.TestCase): | ||
| """Verify that public API symbols are importable without type: ignore.""" | ||
|
|
||
| def test_import_configure_connection(self): | ||
| from age import configure_connection | ||
| self.assertTrue(callable(configure_connection)) | ||
|
|
||
| def test_import_age_loader(self): | ||
| from age import AgeLoader | ||
| self.assertIsNotNone(AgeLoader) | ||
|
|
||
| def test_import_client_cursor(self): | ||
| from age import ClientCursor | ||
| self.assertIsNotNone(ClientCursor) | ||
|
|
||
|
|
||
| class TestConfigureConnection(unittest.TestCase): | ||
| """Unit tests for configure_connection() — no DB required.""" | ||
|
|
There was a problem hiding this comment.
The new unit test classes (TestModelToDict, TestPublicImports, TestConfigureConnection) won’t run in CI when executing python test_age_py.py ... because the __main__ block constructs a TestSuite that only includes TestAgeBasic tests. Consider adding these new tests to the explicit suite, or switching this file to unittest.main() (and moving the DB/integration tests behind an opt-in flag) so the new tests are actually executed.
| def configure_connection( | ||
| conn: psycopg.connection, | ||
| graph_name: str | None = None, | ||
| load: bool = False, | ||
| load_from_plugins: bool = False, | ||
| ) -> None: |
There was a problem hiding this comment.
PR description and issue summary mention a skip_load parameter, but the implemented public API uses load: bool = False instead. Please align the PR description/docs with the actual signature (or rename the parameter) to avoid confusion for users upgrading based on the PR notes.
Summary
Two enhancements to the Python driver's public API:
1. Public API for connection pooling / external connection management (#2369)
The
Ageclass creates and owns a single connection internally. There's no supported way to register agtype adapters on externally-managed connections (e.g., frompsycopg_pool.ConnectionPool).Solution: Add
configure_connection(conn, graph_name=None, skip_load=False)that:search_pathtoag_catalogAgeLoaderLOAD 'age'by default (suitable for managed PostgreSQL services)Also explicitly exports
AgeLoader,ClientCursor, andconfigure_connectionas public symbols inage/__init__.py, eliminating the need for# type: ignorewhen accessing driver internals.Usage with connection pool:
2. Vertex/Edge/Path model classes dict/JSON conversion (#2371)
The driver's model classes lack conversion to plain Python data structures, forcing every consumer to write custom normalization.
Solution: Add
to_dict()methods:Vertex.to_dict()→{"id", "label", "properties"}Edge.to_dict()→{"id", "label", "start_id", "end_id", "properties"}Path.to_dict()→ list ofto_dict()resultsAll return plain dicts/lists that work directly with
json.dumps().Tests
Added 8 new unit tests (no DB required):
TestModelToDict(5 tests): vertex, edge, pathto_dict(), empty properties, JSON serializabilityTestPublicImports(3 tests): verifyconfigure_connection,AgeLoader,ClientCursorare importableCloses