diff --git a/docs/03_features/201_rule_catalog.md b/docs/03_features/201_rule_catalog.md index a0b324d..020ecf5 100644 --- a/docs/03_features/201_rule_catalog.md +++ b/docs/03_features/201_rule_catalog.md @@ -92,6 +92,11 @@ LLMs and are not blocking by default. a reasoning-tree intent docstring. - `PY-AGENT-R008`: broad branch packages should split into focused subpackages or document the facade and owner map for agent repair loops. +- `PY-AGENT-R009`: public functions with deeply nested control flow should + expose the algorithm shape through guard clauses, explicit dispatch, + `match/case`, or small named pipeline steps. +- `PY-AGENT-R010`: public functions with broad linear statement blocks should + split into named helpers or pipeline steps that are easier for agents to edit. ## Reasoning Tree Policy @@ -115,6 +120,26 @@ folders surface as owner-map advice without becoming a raw child-count gate. Packages that already expose an explicit public facade are treated as having an owner map. +`PY-AGENT-R009` is backed by parser-owned function control-flow facts, not by +harness string scanning. The parser records branch count, loop count, maximum +nesting, loop nesting, terminal `else` opportunities, and repeated literal +dispatch chains for each function symbol during the normal AST collection pass. +The harness turns those facts into a compact repair hint when a public function +hides its algorithm behind nested `if`/loop structure. The rule stays advisory +by default so teams can tune or promote it after seeing their project shape. + +The implementation lives under `python_lang_project_harness.agent_readability` +because the target reader is the repair agent, not a human style reviewer. The +goal is short, explicit algorithm surfaces that an LLM can use from the +reasoning tree: guard clauses instead of nested `else`, `match/case` or dispatch +tables instead of literal branch ladders, and small named pipeline steps instead +of one broad loop body. Performance remains parser-first: the harness only +consumes `PythonFunctionControlFlow` facts and does not run a second AST parse. +`PY-AGENT-R010` complements `PY-AGENT-R009`: the former catches long flat +procedure-like public functions, while the latter catches nested control-flow +shape. This keeps the advice compact and avoids telling the agent the same +thing twice. + `render_python_reasoning_tree()` exposes the same tree as compact text for LLM repair loops. It includes an `[imports]` section for parser-resolved project-internal edges, a compact `[project]` section for declared package diff --git a/src/python_lang_parser/__init__.py b/src/python_lang_parser/__init__.py index f250c53..e2ade21 100644 --- a/src/python_lang_parser/__init__.py +++ b/src/python_lang_parser/__init__.py @@ -48,6 +48,7 @@ PythonDiagnosticSeverity, PythonExportContract, PythonExportContractKind, + PythonFunctionControlFlow, PythonImport, PythonModuleReport, PythonModuleShape, @@ -69,6 +70,7 @@ "PythonDiagnosticSeverity", "PythonExportContract", "PythonExportContractKind", + "PythonFunctionControlFlow", "PythonImport", "PythonModuleReport", "PythonModuleShape", diff --git a/src/python_lang_parser/_ast_collector.py b/src/python_lang_parser/_ast_collector.py index 5439ffb..7a09a1d 100644 --- a/src/python_lang_parser/_ast_collector.py +++ b/src/python_lang_parser/_ast_collector.py @@ -17,6 +17,7 @@ unparse, ) from ._call_effects import call_effect +from ._control_flow import collect_function_control_flow from ._export_model import PythonExportContract, PythonExportContractKind from ._exports import literal_string_sequence from .model import ( @@ -227,6 +228,11 @@ def _visit_symbol( if isinstance(node, ast.ClassDef) else () ), + control_flow=( + None + if isinstance(node, ast.ClassDef) + else collect_function_control_flow(node) + ), docstring=ast.get_docstring(node), has_annotations=symbol_has_annotations(node), is_public=is_public_name(node.name), diff --git a/src/python_lang_parser/_control_flow.py b/src/python_lang_parser/_control_flow.py new file mode 100644 index 0000000..18450b6 --- /dev/null +++ b/src/python_lang_parser/_control_flow.py @@ -0,0 +1,305 @@ +"""Parser-owned function control-flow shape collection.""" + +from __future__ import annotations + +import ast +from dataclasses import dataclass + +from ._ast_names import unparse +from .model import PythonFunctionControlFlow + + +def collect_function_control_flow( + node: ast.FunctionDef | ast.AsyncFunctionDef, +) -> PythonFunctionControlFlow: + """Return compact control-flow shape facts for one function body.""" + + collector = _ControlFlowCollector() + collector.visit_statements(node.body, nesting_depth=0, loop_depth=0) + return PythonFunctionControlFlow( + statement_count=collector.statement_count, + max_block_statement_count=collector.max_block_statement_count, + branch_count=collector.branch_count, + loop_count=collector.loop_count, + match_count=collector.match_count, + return_count=collector.return_count, + terminal_else_count=collector.terminal_else_count, + max_nesting_depth=collector.max_nesting_depth, + max_loop_nesting_depth=collector.max_loop_nesting_depth, + max_literal_dispatch_chain=collector.max_literal_dispatch_chain, + nested_control_flow_count=collector.nested_control_flow_count, + ) + + +@dataclass(slots=True) +class _ControlFlowCollector: + statement_count: int = 0 + max_block_statement_count: int = 0 + branch_count: int = 0 + loop_count: int = 0 + match_count: int = 0 + return_count: int = 0 + terminal_else_count: int = 0 + max_nesting_depth: int = 0 + max_loop_nesting_depth: int = 0 + max_literal_dispatch_chain: int = 0 + nested_control_flow_count: int = 0 + + def visit_statements( + self, + statements: list[ast.stmt], + *, + nesting_depth: int, + loop_depth: int, + ) -> None: + self.max_block_statement_count = max( + self.max_block_statement_count, + len(statements), + ) + for statement in statements: + self.visit_statement( + statement, + nesting_depth=nesting_depth, + loop_depth=loop_depth, + ) + + def visit_statement( + self, + statement: ast.stmt, + *, + nesting_depth: int, + loop_depth: int, + ) -> None: + self.statement_count += 1 + if isinstance(statement, ast.If): + self._visit_if( + statement, + nesting_depth=nesting_depth, + loop_depth=loop_depth, + ) + return + if isinstance(statement, ast.For | ast.AsyncFor | ast.While): + self._visit_loop( + statement, + nesting_depth=nesting_depth, + loop_depth=loop_depth, + ) + return + if isinstance(statement, ast.Match): + self._visit_match( + statement, + nesting_depth=nesting_depth, + loop_depth=loop_depth, + ) + return + if isinstance(statement, ast.Try): + self._visit_try( + statement, + nesting_depth=nesting_depth, + loop_depth=loop_depth, + ) + return + if isinstance(statement, ast.With | ast.AsyncWith): + self._visit_block( + statement.body, + nesting_depth=nesting_depth + 1, + loop_depth=loop_depth, + ) + return + if isinstance(statement, ast.Return): + self.return_count += 1 + + def _visit_if( + self, + statement: ast.If, + *, + nesting_depth: int, + loop_depth: int, + ) -> None: + depth = nesting_depth + 1 + self._record_control_flow(depth) + self.branch_count += 1 + self.max_literal_dispatch_chain = max( + self.max_literal_dispatch_chain, + _literal_dispatch_chain_count(statement), + ) + if statement.orelse and _body_has_terminal_exit(statement.body): + self.terminal_else_count += 1 + self.visit_statements( + statement.body, + nesting_depth=depth, + loop_depth=loop_depth, + ) + if len(statement.orelse) == 1 and isinstance(statement.orelse[0], ast.If): + self.visit_statement( + statement.orelse[0], + nesting_depth=nesting_depth, + loop_depth=loop_depth, + ) + return + self.visit_statements( + statement.orelse, + nesting_depth=depth, + loop_depth=loop_depth, + ) + + def _visit_loop( + self, + statement: ast.For | ast.AsyncFor | ast.While, + *, + nesting_depth: int, + loop_depth: int, + ) -> None: + depth = nesting_depth + 1 + next_loop_depth = loop_depth + 1 + self._record_control_flow(depth) + self.loop_count += 1 + self.max_loop_nesting_depth = max( + self.max_loop_nesting_depth, + next_loop_depth, + ) + self.visit_statements( + statement.body, + nesting_depth=depth, + loop_depth=next_loop_depth, + ) + self.visit_statements( + statement.orelse, + nesting_depth=depth, + loop_depth=next_loop_depth, + ) + + def _visit_match( + self, + statement: ast.Match, + *, + nesting_depth: int, + loop_depth: int, + ) -> None: + depth = nesting_depth + 1 + self._record_control_flow(depth) + self.match_count += 1 + self.branch_count += len(statement.cases) + for case in statement.cases: + self.visit_statements( + case.body, + nesting_depth=depth, + loop_depth=loop_depth, + ) + + def _visit_try( + self, + statement: ast.Try, + *, + nesting_depth: int, + loop_depth: int, + ) -> None: + depth = nesting_depth + 1 + self._record_control_flow(depth) + self.branch_count += len(statement.handlers) + self.branch_count += 1 if statement.orelse else 0 + self.branch_count += 1 if statement.finalbody else 0 + self.visit_statements( + statement.body, + nesting_depth=depth, + loop_depth=loop_depth, + ) + for handler in statement.handlers: + self.visit_statements( + handler.body, + nesting_depth=depth, + loop_depth=loop_depth, + ) + self.visit_statements( + statement.orelse, + nesting_depth=depth, + loop_depth=loop_depth, + ) + self.visit_statements( + statement.finalbody, + nesting_depth=depth, + loop_depth=loop_depth, + ) + + def _visit_block( + self, + statements: list[ast.stmt], + *, + nesting_depth: int, + loop_depth: int, + ) -> None: + self.max_nesting_depth = max(self.max_nesting_depth, nesting_depth) + self.visit_statements( + statements, + nesting_depth=nesting_depth, + loop_depth=loop_depth, + ) + + def _record_control_flow(self, depth: int) -> None: + self.max_nesting_depth = max(self.max_nesting_depth, depth) + if depth >= 3: + self.nested_control_flow_count += 1 + + +def _body_has_terminal_exit(statements: list[ast.stmt]) -> bool: + if not statements: + return False + return _statement_has_terminal_exit(statements[-1]) + + +def _statement_has_terminal_exit(statement: ast.stmt) -> bool: + if isinstance(statement, ast.Return | ast.Raise | ast.Break | ast.Continue): + return True + if isinstance(statement, ast.If): + return ( + bool(statement.orelse) + and _body_has_terminal_exit(statement.body) + and _body_has_terminal_exit(statement.orelse) + ) + if isinstance(statement, ast.Match): + return bool(statement.cases) and all( + _body_has_terminal_exit(case.body) for case in statement.cases + ) + return False + + +def _literal_dispatch_chain_count(statement: ast.If) -> int: + subject = _literal_dispatch_subject(statement.test) + if subject is None: + return 0 + count = 0 + current: ast.If | None = statement + while current is not None: + if _literal_dispatch_subject(current.test) != subject: + break + count += 1 + current = ( + current.orelse[0] + if len(current.orelse) == 1 and isinstance(current.orelse[0], ast.If) + else None + ) + return count + + +def _literal_dispatch_subject(expression: ast.expr) -> str | None: + if not isinstance(expression, ast.Compare): + return None + if len(expression.ops) != 1 or len(expression.comparators) != 1: + return None + operator = expression.ops[0] + comparator = expression.comparators[0] + if isinstance(operator, ast.Eq | ast.Is) and _is_literal_dispatch_value(comparator): + return unparse(expression.left) + if isinstance(operator, ast.In) and _is_literal_container(comparator): + return unparse(expression.left) + return None + + +def _is_literal_dispatch_value(expression: ast.expr) -> bool: + return isinstance(expression, ast.Constant) + + +def _is_literal_container(expression: ast.expr) -> bool: + return isinstance(expression, ast.Tuple | ast.Set | ast.List) and all( + _is_literal_dispatch_value(item) for item in expression.elts + ) diff --git a/src/python_lang_parser/_pyproject_metadata.py b/src/python_lang_parser/_pyproject_metadata.py index 97c5353..1121dae 100644 --- a/src/python_lang_parser/_pyproject_metadata.py +++ b/src/python_lang_parser/_pyproject_metadata.py @@ -24,14 +24,27 @@ def parse_python_project_metadata( root = Path(project_root) pyproject_path = root / "pyproject.toml" + payload = _read_pyproject_payload(pyproject_path) + if payload is None: + return None + return _metadata_from_payload(root, pyproject_path, payload) + + +def _read_pyproject_payload(pyproject_path: Path) -> dict[str, Any] | None: if not pyproject_path.exists(): return None try: - payload = tomllib.loads(pyproject_path.read_text(encoding="utf-8")) + return tomllib.loads(pyproject_path.read_text(encoding="utf-8")) except (OSError, tomllib.TOMLDecodeError, UnicodeDecodeError): return None + +def _metadata_from_payload( + root: Path, + pyproject_path: Path, + payload: dict[str, Any], +) -> PythonProjectMetadata: has_project_table = isinstance(payload.get("project"), dict) has_build_system_table = isinstance(payload.get("build-system"), dict) project = _table(payload.get("project")) diff --git a/src/python_lang_parser/_reasoning_tree_imports.py b/src/python_lang_parser/_reasoning_tree_imports.py index d9da266..e39085e 100644 --- a/src/python_lang_parser/_reasoning_tree_imports.py +++ b/src/python_lang_parser/_reasoning_tree_imports.py @@ -22,46 +22,85 @@ def python_reasoning_tree_import_edges( modules_by_namespace = _modules_by_namespace(modules) edges: list[PythonReasoningTreeImportEdge] = [] seen: set[tuple[str, tuple[str, ...], str, str, int, int]] = set() + for edge in _iter_import_edges( + modules, + namespaces=namespaces, + modules_by_namespace=modules_by_namespace, + ): + key = _edge_key(edge) + if key in seen: + continue + seen.add(key) + edges.append(edge) + return tuple(sorted(edges, key=lambda item: (item.importer_path, item.line))) + + +def _iter_import_edges( + modules: tuple[PythonReasoningTreeModuleInfo, ...], + *, + namespaces: set[tuple[str, ...]], + modules_by_namespace: dict[tuple[str, ...], PythonReasoningTreeModuleInfo], +) -> tuple[PythonReasoningTreeImportEdge, ...]: + edges: list[PythonReasoningTreeImportEdge] = [] for module in modules: if not module.namespace: continue for import_record in module.report.imports: - for import_name, bound_name, imported_namespace in _resolved_imports( + for resolved in _resolved_imports( module, import_record, namespaces=namespaces, ): - if imported_namespace == module.namespace: - continue - imported_module = modules_by_namespace.get(imported_namespace) - if imported_module is None: - continue - key = ( - module.path, - imported_namespace, - import_name, - import_record.scope, - import_record.location.line, - import_record.location.column, + edge = _import_edge( + module, + import_record, + resolved, + modules_by_namespace=modules_by_namespace, ) - if key in seen: + if edge is None: continue - seen.add(key) - edges.append( - PythonReasoningTreeImportEdge( - importer_path=module.path, - importer_namespace=module.namespace, - imported_path=imported_module.path, - imported_namespace=imported_namespace, - import_name=import_name, - bound_name=bound_name, - scope=import_record.scope, - line=import_record.location.line, - column=import_record.location.column, - is_relative=import_record.level > 0, - ) - ) - return tuple(sorted(edges, key=lambda item: (item.importer_path, item.line))) + edges.append(edge) + return tuple(edges) + + +def _import_edge( + module: PythonReasoningTreeModuleInfo, + import_record: PythonImport, + resolved: tuple[str, str, tuple[str, ...]], + *, + modules_by_namespace: dict[tuple[str, ...], PythonReasoningTreeModuleInfo], +) -> PythonReasoningTreeImportEdge | None: + import_name, bound_name, imported_namespace = resolved + if imported_namespace == module.namespace: + return None + imported_module = modules_by_namespace.get(imported_namespace) + if imported_module is None: + return None + return PythonReasoningTreeImportEdge( + importer_path=module.path, + importer_namespace=module.namespace, + imported_path=imported_module.path, + imported_namespace=imported_namespace, + import_name=import_name, + bound_name=bound_name, + scope=import_record.scope, + line=import_record.location.line, + column=import_record.location.column, + is_relative=import_record.level > 0, + ) + + +def _edge_key( + edge: PythonReasoningTreeImportEdge, +) -> tuple[str, tuple[str, ...], str, str, int, int]: + return ( + edge.importer_path, + edge.imported_namespace, + edge.import_name, + edge.scope, + edge.line, + edge.column, + ) def _resolved_imports( diff --git a/src/python_lang_parser/_symbol_model.py b/src/python_lang_parser/_symbol_model.py index a7bc62d..0e0ace0 100644 --- a/src/python_lang_parser/_symbol_model.py +++ b/src/python_lang_parser/_symbol_model.py @@ -33,6 +33,28 @@ class PythonCallEffect(StrEnum): DEBUG_BREAKPOINT = "debug_breakpoint" +@dataclass(frozen=True, slots=True) +class PythonFunctionControlFlow: + """Parser-owned control-flow shape facts for one function body.""" + + statement_count: int = 0 + max_block_statement_count: int = 0 + branch_count: int = 0 + loop_count: int = 0 + match_count: int = 0 + return_count: int = 0 + terminal_else_count: int = 0 + max_nesting_depth: int = 0 + max_loop_nesting_depth: int = 0 + max_literal_dispatch_chain: int = 0 + nested_control_flow_count: int = 0 + + def to_dict(self) -> dict[str, object]: + """Return a JSON-compatible representation.""" + + return asdict(self) + + @dataclass(frozen=True, slots=True) class PythonImport: """One import statement collected from a Python module.""" @@ -66,6 +88,7 @@ class PythonSymbol: end_line: int | None decorators: tuple[str, ...] = field(default_factory=tuple) base_classes: tuple[str, ...] = field(default_factory=tuple) + control_flow: PythonFunctionControlFlow | None = None docstring: str | None = None has_annotations: bool = False is_public: bool = False @@ -78,6 +101,9 @@ def to_dict(self) -> dict[str, object]: payload["kind"] = self.kind.value payload["decorators"] = list(self.decorators) payload["base_classes"] = list(self.base_classes) + payload["control_flow"] = ( + None if self.control_flow is None else self.control_flow.to_dict() + ) return payload diff --git a/src/python_lang_parser/model.py b/src/python_lang_parser/model.py index 26da8f6..42c6958 100644 --- a/src/python_lang_parser/model.py +++ b/src/python_lang_parser/model.py @@ -23,6 +23,7 @@ PythonAssignmentTarget, PythonCall, PythonCallEffect, + PythonFunctionControlFlow, PythonImport, PythonModuleShape, PythonNameBinding, @@ -112,6 +113,7 @@ def to_dict(self) -> dict[str, Any]: "PythonDiagnosticSeverity", "PythonExportContract", "PythonExportContractKind", + "PythonFunctionControlFlow", "PythonImport", "PythonModuleReport", "PythonModuleShape", diff --git a/src/python_lang_project_harness/__init__.py b/src/python_lang_project_harness/__init__.py index 0238011..8e8f620 100644 --- a/src/python_lang_project_harness/__init__.py +++ b/src/python_lang_project_harness/__init__.py @@ -10,6 +10,7 @@ PythonDiagnosticSeverity, PythonExportContract, PythonExportContractKind, + PythonFunctionControlFlow, PythonImport, PythonModuleReport, PythonModuleShape, @@ -146,6 +147,7 @@ "PythonDiagnosticSeverity", "PythonExportContract", "PythonExportContractKind", + "PythonFunctionControlFlow", "PythonHarnessConfig", "PythonHarnessFinding", "PythonHarnessReport", diff --git a/src/python_lang_project_harness/_agent_policy.py b/src/python_lang_project_harness/_agent_policy.py index 8108c6b..7974d30 100644 --- a/src/python_lang_project_harness/_agent_policy.py +++ b/src/python_lang_project_harness/_agent_policy.py @@ -21,6 +21,10 @@ from ._agent_reasoning_tree import agent_reasoning_tree_findings from ._model import PythonHarnessFinding, PythonRulePackDescriptor from ._source import path_location +from .agent_readability import ( + agent_algorithm_shape_findings, + agent_function_compactness_findings, +) if TYPE_CHECKING: from collections.abc import Iterable, Sequence @@ -55,6 +59,8 @@ def evaluate(self, report: PythonModuleReport) -> Iterable[PythonHarnessFinding] findings: list[PythonHarnessFinding] = [] findings.extend(_module_docstring_findings(report, self.pack_id)) findings.extend(_public_callable_annotation_findings(report, self.pack_id)) + findings.extend(agent_algorithm_shape_findings(report, self.pack_id)) + findings.extend(agent_function_compactness_findings(report, self.pack_id)) return tuple(findings) def evaluate_project_modules( diff --git a/src/python_lang_project_harness/_agent_policy_catalog.py b/src/python_lang_project_harness/_agent_policy_catalog.py index 958e932..1f465be 100644 --- a/src/python_lang_project_harness/_agent_policy_catalog.py +++ b/src/python_lang_project_harness/_agent_policy_catalog.py @@ -17,6 +17,8 @@ PY_AGENT_R006 = "PY-AGENT-R006" PY_AGENT_R007 = "PY-AGENT-R007" PY_AGENT_R008 = "PY-AGENT-R008" +PY_AGENT_R009 = "PY-AGENT-R009" +PY_AGENT_R010 = "PY-AGENT-R010" _RULE_LABELS = { "language": "python", @@ -87,6 +89,22 @@ requirement="Split crowded branch packages into focused subpackages, or document the facade and owner map so agents do not treat one folder as one responsibility.", labels=dict(_RULE_LABELS), ), + PythonHarnessRule( + rule_id=PY_AGENT_R009, + pack_id=AGENT_POLICY_PACK_ID, + severity=PythonDiagnosticSeverity.INFO, + title="Function hides algorithm behind nested control flow", + requirement="Flatten deeply nested if/loop logic into guard clauses, explicit dispatch, match/case, or small named pipeline steps so agents can reason about the algorithm shape.", + labels=dict(_RULE_LABELS), + ), + PythonHarnessRule( + rule_id=PY_AGENT_R010, + pack_id=AGENT_POLICY_PACK_ID, + severity=PythonDiagnosticSeverity.INFO, + title="Function owns a broad linear algorithm surface", + requirement="Split long public functions with broad linear statement blocks into small named helpers or pipeline steps so agents can edit one algorithm responsibility at a time.", + labels=dict(_RULE_LABELS), + ), ) _RULE_BY_ID = {rule.rule_id: rule for rule in _RULES} diff --git a/src/python_lang_project_harness/_discovery.py b/src/python_lang_project_harness/_discovery.py index be62312..75847c4 100644 --- a/src/python_lang_project_harness/_discovery.py +++ b/src/python_lang_project_harness/_discovery.py @@ -25,21 +25,37 @@ def discover_python_files( ) discovered: list[Path] = [] seen: set[Path] = set() + for candidate in _iter_python_file_candidates( + paths, + ignored_dir_names=ignored_names, + ): + _append_unique_path(discovered, seen, candidate) + return tuple(sorted(discovered, key=lambda item: item.as_posix())) + + +def _iter_python_file_candidates( + paths: Sequence[str | Path], + *, + ignored_dir_names: frozenset[str], +) -> tuple[Path, ...]: + candidates: list[Path] = [] for raw_path in paths: path = Path(raw_path) if path.is_file(): if path.suffix == ".py": - _append_unique_path(discovered, seen, path) + candidates.append(path) continue if path.is_dir(): - for candidate in path.rglob("*.py"): + candidates.extend( + candidate + for candidate in path.rglob("*.py") if is_scannable_python_file( candidate, scan_root=path, - ignored_dir_names=ignored_names, - ): - _append_unique_path(discovered, seen, candidate) - return tuple(sorted(discovered, key=lambda item: item.as_posix())) + ignored_dir_names=ignored_dir_names, + ) + ) + return tuple(candidates) def python_project_harness_paths( diff --git a/src/python_lang_project_harness/_project_config.py b/src/python_lang_project_harness/_project_config.py index 4002889..e820278 100644 --- a/src/python_lang_project_harness/_project_config.py +++ b/src/python_lang_project_harness/_project_config.py @@ -32,8 +32,13 @@ def read_python_project_harness_config( ) -> PythonHarnessConfig | None: """Read `[tool.python-lang-project-harness]` from `pyproject.toml`.""" - root = Path(project_root) - pyproject_path = root / "pyproject.toml" + table = _read_project_config_table(Path(project_root) / "pyproject.toml") + if not table: + return None + return PythonHarnessConfig(**_harness_config_kwargs(table)) + + +def _read_project_config_table(pyproject_path: Path) -> dict[str, Any] | None: if not pyproject_path.exists(): return None @@ -45,7 +50,10 @@ def read_python_project_harness_config( table = _table(_table(payload.get("tool")).get(_TOOL_TABLE_NAME)) if not table: return None + return table + +def _harness_config_kwargs(table: dict[str, Any]) -> dict[str, object]: kwargs: dict[str, object] = {} _put_bool(kwargs, table, "include_tests") _put_string_tuple(kwargs, table, "source_dir_names") @@ -56,7 +64,7 @@ def read_python_project_harness_config( _put_string_frozenset(kwargs, table, "blocking_rule_ids") _put_severity_frozenset(kwargs, table, "blocking_severities") _put_verification_policy(kwargs, table) - return PythonHarnessConfig(**kwargs) + return kwargs def _put_bool( diff --git a/src/python_lang_project_harness/agent_readability/__init__.py b/src/python_lang_project_harness/agent_readability/__init__.py new file mode 100644 index 0000000..0f26d9a --- /dev/null +++ b/src/python_lang_project_harness/agent_readability/__init__.py @@ -0,0 +1,8 @@ +"""Agent-readable code-shape policies built from parser facts.""" + +from __future__ import annotations + +from .algorithm_shape import agent_algorithm_shape_findings +from .function_compactness import agent_function_compactness_findings + +__all__ = ["agent_algorithm_shape_findings", "agent_function_compactness_findings"] diff --git a/src/python_lang_project_harness/agent_readability/algorithm_shape.py b/src/python_lang_project_harness/agent_readability/algorithm_shape.py new file mode 100644 index 0000000..e98750c --- /dev/null +++ b/src/python_lang_project_harness/agent_readability/algorithm_shape.py @@ -0,0 +1,108 @@ +"""Agent-facing algorithm-shape advice from parser-owned function facts.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from python_lang_parser import ( + python_symbol_is_public_callable_boundary, + python_symbol_is_public_class, +) + +from .._agent_policy_catalog import PY_AGENT_R009, agent_policy_rule +from .._model import PythonHarnessFinding + +if TYPE_CHECKING: + from python_lang_parser import ( + PythonFunctionControlFlow, + PythonModuleReport, + PythonSymbol, + ) + + +def agent_algorithm_shape_findings( + report: PythonModuleReport, + pack_id: str, +) -> tuple[PythonHarnessFinding, ...]: + """Return machine-readability advice for public algorithm boundaries.""" + + findings: list[PythonHarnessFinding] = [] + rule = agent_policy_rule(PY_AGENT_R009) + public_class_scopes = _public_class_scopes(report) + for symbol in report.symbols: + control_flow = symbol.control_flow + if ( + not _is_agent_algorithm_boundary( + symbol, + public_class_scopes=public_class_scopes, + ) + or control_flow is None + ): + continue + profile = _agent_algorithm_profile(control_flow) + if not profile: + continue + findings.append( + PythonHarnessFinding( + rule_id=rule.rule_id, + pack_id=pack_id, + severity=rule.severity, + title=rule.title, + summary=_summary(symbol, control_flow), + location=symbol.location, + requirement=f"{rule.requirement} Signals: {', '.join(profile)}.", + source_line=report.source_line(symbol.location.line), + label="make this algorithm shape explicit", + labels=dict(rule.labels), + ) + ) + return tuple(findings) + + +def _is_agent_algorithm_boundary( + symbol: PythonSymbol, + *, + public_class_scopes: frozenset[str], +) -> bool: + return python_symbol_is_public_callable_boundary( + symbol, + public_class_scopes=public_class_scopes, + ) + + +def _public_class_scopes(report: PythonModuleReport) -> frozenset[str]: + return frozenset( + symbol.qualified_name + for symbol in report.symbols + if python_symbol_is_public_class(symbol) + ) + + +def _agent_algorithm_profile( + control_flow: PythonFunctionControlFlow, +) -> tuple[str, ...]: + indicators: list[str] = [] + if control_flow.max_nesting_depth >= 4: + indicators.append("deep control-flow nesting") + if control_flow.max_loop_nesting_depth >= 2 and control_flow.branch_count >= 3: + indicators.append("nested loops mixed with branches") + if control_flow.max_literal_dispatch_chain >= 4 and control_flow.match_count == 0: + indicators.append("literal dispatch chain without match/case") + if control_flow.terminal_else_count >= 2 and control_flow.max_nesting_depth >= 3: + indicators.append("else blocks after terminal branches") + if control_flow.nested_control_flow_count >= 4: + indicators.append("many nested control-flow blocks") + return tuple(indicators) + + +def _summary( + symbol: PythonSymbol, + control_flow: PythonFunctionControlFlow, +) -> str: + return ( + f"{symbol.qualified_name} has nesting depth " + f"{control_flow.max_nesting_depth}, loop nesting " + f"{control_flow.max_loop_nesting_depth}, " + f"{control_flow.branch_count} branches, and " + f"literal dispatch chain {control_flow.max_literal_dispatch_chain}." + ) diff --git a/src/python_lang_project_harness/agent_readability/function_compactness.py b/src/python_lang_project_harness/agent_readability/function_compactness.py new file mode 100644 index 0000000..45acf45 --- /dev/null +++ b/src/python_lang_project_harness/agent_readability/function_compactness.py @@ -0,0 +1,121 @@ +"""Agent-facing compact-function advice from parser-owned function facts.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from python_lang_parser import ( + python_symbol_is_public_callable_boundary, + python_symbol_is_public_class, +) + +from .._agent_policy_catalog import PY_AGENT_R010, agent_policy_rule +from .._model import PythonHarnessFinding + +if TYPE_CHECKING: + from python_lang_parser import ( + PythonFunctionControlFlow, + PythonModuleReport, + PythonSymbol, + ) + +_MAX_PUBLIC_FUNCTION_LINES = 72 +_MAX_LINEAR_BLOCK_STATEMENTS = 14 +_MAX_TOTAL_STATEMENTS = 22 +_MAX_NESTING_FOR_LINEAR_RULE = 2 + + +def agent_function_compactness_findings( + report: PythonModuleReport, + pack_id: str, +) -> tuple[PythonHarnessFinding, ...]: + """Return machine-readability advice for broad public function bodies.""" + + findings: list[PythonHarnessFinding] = [] + rule = agent_policy_rule(PY_AGENT_R010) + public_class_scopes = _public_class_scopes(report) + for symbol in report.symbols: + control_flow = symbol.control_flow + if ( + not _is_agent_function_boundary( + symbol, + public_class_scopes=public_class_scopes, + ) + or control_flow is None + ): + continue + line_span = _symbol_line_span(symbol) + profile = _compactness_profile(control_flow, line_span=line_span) + if not profile: + continue + findings.append( + PythonHarnessFinding( + rule_id=rule.rule_id, + pack_id=pack_id, + severity=rule.severity, + title=rule.title, + summary=_summary(symbol, control_flow, line_span=line_span), + location=symbol.location, + requirement=f"{rule.requirement} Signals: {', '.join(profile)}.", + source_line=report.source_line(symbol.location.line), + label="split this broad function into named algorithm steps", + labels=dict(rule.labels), + ) + ) + return tuple(findings) + + +def _is_agent_function_boundary( + symbol: PythonSymbol, + *, + public_class_scopes: frozenset[str], +) -> bool: + return python_symbol_is_public_callable_boundary( + symbol, + public_class_scopes=public_class_scopes, + ) + + +def _public_class_scopes(report: PythonModuleReport) -> frozenset[str]: + return frozenset( + symbol.qualified_name + for symbol in report.symbols + if python_symbol_is_public_class(symbol) + ) + + +def _compactness_profile( + control_flow: PythonFunctionControlFlow, + *, + line_span: int, +) -> tuple[str, ...]: + if control_flow.max_nesting_depth > _MAX_NESTING_FOR_LINEAR_RULE: + return () + indicators: list[str] = [] + if ( + line_span >= _MAX_PUBLIC_FUNCTION_LINES + and control_flow.statement_count >= _MAX_TOTAL_STATEMENTS + ): + indicators.append("long public function body") + if control_flow.max_block_statement_count >= _MAX_LINEAR_BLOCK_STATEMENTS: + indicators.append("large linear statement block") + return tuple(indicators) + + +def _symbol_line_span(symbol: PythonSymbol) -> int: + if symbol.end_line is None: + return 0 + return max(1, symbol.end_line - symbol.location.line + 1) + + +def _summary( + symbol: PythonSymbol, + control_flow: PythonFunctionControlFlow, + *, + line_span: int, +) -> str: + return ( + f"{symbol.qualified_name} spans {line_span} lines with " + f"{control_flow.statement_count} statements and a " + f"{control_flow.max_block_statement_count}-statement linear block." + ) diff --git a/tests/unit/harness/test_agent_algorithm_policy.py b/tests/unit/harness/test_agent_algorithm_policy.py new file mode 100644 index 0000000..7088d4f --- /dev/null +++ b/tests/unit/harness/test_agent_algorithm_policy.py @@ -0,0 +1,118 @@ +from __future__ import annotations + +from dataclasses import replace +from typing import TYPE_CHECKING + +from snapshot_support import assert_snapshot, normalize_temp_root + +from python_lang_project_harness import ( + render_python_lang_harness, + run_python_lang_harness, +) + +if TYPE_CHECKING: + from pathlib import Path + + +def test_py_agent_r009_algorithm_shape_snapshot(tmp_path: Path) -> None: + source = tmp_path / "service.py" + source.write_text( + ''' +"""Service algorithms.""" + + +def classify(kind: str, rows: list[object]) -> int: + for row in rows: + if row: + if kind == "alpha": + return 1 + elif kind == "beta": + return 2 + elif kind == "gamma": + return 3 + elif kind == "delta": + return 4 + else: + return 0 + return -1 +''', + encoding="utf-8", + ) + + report = run_python_lang_harness([source]) + filtered = replace( + report, + findings=tuple( + finding for finding in report.findings if finding.rule_id == "PY-AGENT-R009" + ), + ) + rendered = normalize_temp_root(render_python_lang_harness(filtered), tmp_path) + + assert filtered.findings, "expected PY-AGENT-R009 finding" + assert_snapshot( + "unit_test__agent_policy_snapshot__py_agent_r009_algorithm_shape", + rendered, + source="tests/unit/harness/test_agent_algorithm_policy.py", + ) + + +def test_py_agent_r009_accepts_explicit_match_dispatch(tmp_path: Path) -> None: + source = tmp_path / "service.py" + source.write_text( + ''' +"""Service algorithms.""" + + +def classify(kind: str) -> int: + match kind: + case "alpha": + return 1 + case "beta": + return 2 + case "gamma": + return 3 + case "delta": + return 4 + case _: + return 0 +''', + encoding="utf-8", + ) + + report = run_python_lang_harness([source]) + + assert not any(finding.rule_id == "PY-AGENT-R009" for finding in report.findings) + + +def test_py_agent_r010_function_compactness_snapshot(tmp_path: Path) -> None: + source = tmp_path / "service.py" + source.write_text(_broad_linear_function_source(), encoding="utf-8") + + report = run_python_lang_harness([source]) + filtered = replace( + report, + findings=tuple( + finding for finding in report.findings if finding.rule_id == "PY-AGENT-R010" + ), + ) + rendered = normalize_temp_root(render_python_lang_harness(filtered), tmp_path) + + assert filtered.findings, "expected PY-AGENT-R010 finding" + assert_snapshot( + "unit_test__agent_policy_snapshot__py_agent_r010_function_compactness", + rendered, + source="tests/unit/harness/test_agent_algorithm_policy.py", + ) + + +def _broad_linear_function_source() -> str: + lines = [ + '"""Service algorithms."""', + "", + "", + "def summarize(value: int) -> int:", + ] + for index in range(15): + lines.append(f" step_{index} = value + {index}") + lines.append(" return step_0") + return "\n".join(lines) + "\n" diff --git a/tests/unit/harness/test_agent_policy.py b/tests/unit/harness/test_agent_policy.py index 5f91be2..9e321c8 100644 --- a/tests/unit/harness/test_agent_policy.py +++ b/tests/unit/harness/test_agent_policy.py @@ -259,5 +259,7 @@ def test_agent_policy_descriptor_and_catalog_are_stable() -> None: "PY-AGENT-R006", "PY-AGENT-R007", "PY-AGENT-R008", + "PY-AGENT-R009", + "PY-AGENT-R010", ] assert {rule.severity for rule in rules} == {PythonDiagnosticSeverity.INFO} diff --git a/tests/unit/harness/test_parser_boundary_contract.py b/tests/unit/harness/test_parser_boundary_contract.py index 94ce09c..b76f1a9 100644 --- a/tests/unit/harness/test_parser_boundary_contract.py +++ b/tests/unit/harness/test_parser_boundary_contract.py @@ -123,3 +123,21 @@ def test_harness_pyproject_metadata_comes_from_parser_boundary() -> None: source = path.read_text(encoding="utf-8") assert "import tomllib" not in source, path assert "tomllib." not in source, path + + +def test_agent_readability_policy_consumes_parser_function_facts() -> None: + readability_root = ( + _PROJECT_ROOT / "src" / "python_lang_project_harness" / "agent_readability" + ) + + for path in sorted(readability_root.glob("*.py")): + source = path.read_text(encoding="utf-8") + assert "import ast" not in source, path + assert "ast." not in source, path + + combined = "\n".join( + path.read_text(encoding="utf-8") + for path in sorted(readability_root.glob("*.py")) + ) + assert "symbol.control_flow" in combined + assert "PythonFunctionControlFlow" in combined diff --git a/tests/unit/harness/test_policy_contract.py b/tests/unit/harness/test_policy_contract.py index 4fa3f81..66e5f9b 100644 --- a/tests/unit/harness/test_policy_contract.py +++ b/tests/unit/harness/test_policy_contract.py @@ -43,6 +43,8 @@ "unit_test__agent_policy_snapshot__py_agent_r006_value_conflict.snap", "unit_test__agent_policy_snapshot__py_agent_r007_branch_intent.snap", "unit_test__agent_policy_snapshot__py_agent_r008_branch_surface.snap", + "unit_test__agent_policy_snapshot__py_agent_r009_algorithm_shape.snap", + "unit_test__agent_policy_snapshot__py_agent_r010_function_compactness.snap", "unit_test__policy_snapshot__py_mod_r001_wildcard_import.snap", "unit_test__policy_snapshot__py_mod_r002_bare_print.snap", "unit_test__policy_snapshot__py_mod_r003_facade_all.snap", @@ -141,6 +143,8 @@ def test_rule_catalogs_expose_stable_rule_ids() -> None: "PY-AGENT-R006", "PY-AGENT-R007", "PY-AGENT-R008", + "PY-AGENT-R009", + "PY-AGENT-R010", ] assert [rule.rule_id for rule in python_test_layout_rules()] == [ "PY-TEST-R001", diff --git a/tests/unit/python_lang_parser/test_control_flow.py b/tests/unit/python_lang_parser/test_control_flow.py new file mode 100644 index 0000000..97deb24 --- /dev/null +++ b/tests/unit/python_lang_parser/test_control_flow.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +from pathlib import Path + +from python_lang_parser import parse_python_source + +_PROJECT_ROOT = next( + parent + for parent in Path(__file__).resolve().parents + if (parent / "pyproject.toml").exists() and (parent / "src").exists() +) + + +def test_parse_python_source_collects_function_control_flow_shape() -> None: + report = parse_python_source( + ''' +"""Control-flow fixture.""" + + +def classify(kind: str, rows: list[object]) -> int: + for row in rows: + if row: + if kind == "alpha": + return 1 + elif kind == "beta": + return 2 + elif kind == "gamma": + return 3 + elif kind == "delta": + return 4 + else: + return 0 + return -1 +''', + path="control_flow.py", + ) + + control_flow = report.symbols[0].control_flow + + assert control_flow is not None + assert control_flow.statement_count == 12 + assert control_flow.max_block_statement_count == 2 + assert control_flow.branch_count == 5 + assert control_flow.loop_count == 1 + assert control_flow.return_count == 6 + assert control_flow.terminal_else_count == 4 + assert control_flow.max_nesting_depth == 3 + assert control_flow.max_loop_nesting_depth == 1 + assert control_flow.max_literal_dispatch_chain == 4 + + +def test_control_flow_collector_avoids_module_wide_ast_walks() -> None: + source = ( + _PROJECT_ROOT / "src" / "python_lang_parser" / "_control_flow.py" + ).read_text(encoding="utf-8") + + assert "ast.walk" not in source + assert "generic_visit" not in source diff --git a/tests/unit/snapshots/unit_test__agent_policy_snapshot__py_agent_r009_algorithm_shape.snap b/tests/unit/snapshots/unit_test__agent_policy_snapshot__py_agent_r009_algorithm_shape.snap new file mode 100644 index 0000000..183807e --- /dev/null +++ b/tests/unit/snapshots/unit_test__agent_policy_snapshot__py_agent_r009_algorithm_shape.snap @@ -0,0 +1,10 @@ +--- +source: tests/unit/harness/test_agent_algorithm_policy.py +expression: rendered +--- +[advice] +[PY-AGENT-R009] Info: Function hides algorithm behind nested control flow + ,-[ $TEMP/service.py:5:1 ] + 5 | def classify(kind: str, rows: list[object]) -> int: + | `- make this algorithm shape explicit + |Required: Flatten deeply nested if/loop logic into guard clauses, explicit dispatch, match/case, or small named pipeline steps so agents can reason about the algorithm shape. Signals: literal dispatch chain without match/case, else blocks after terminal branches, many nested control-flow blocks. diff --git a/tests/unit/snapshots/unit_test__agent_policy_snapshot__py_agent_r010_function_compactness.snap b/tests/unit/snapshots/unit_test__agent_policy_snapshot__py_agent_r010_function_compactness.snap new file mode 100644 index 0000000..5828f02 --- /dev/null +++ b/tests/unit/snapshots/unit_test__agent_policy_snapshot__py_agent_r010_function_compactness.snap @@ -0,0 +1,10 @@ +--- +source: tests/unit/harness/test_agent_algorithm_policy.py +expression: rendered +--- +[advice] +[PY-AGENT-R010] Info: Function owns a broad linear algorithm surface + ,-[ $TEMP/service.py:4:1 ] + 4 | def summarize(value: int) -> int: + | `- split this broad function into named algorithm steps + |Required: Split long public functions with broad linear statement blocks into small named helpers or pipeline steps so agents can edit one algorithm responsibility at a time. Signals: large linear statement block. diff --git a/tests/unit/test_public_api.py b/tests/unit/test_public_api.py index 0102d7b..f857074 100644 --- a/tests/unit/test_public_api.py +++ b/tests/unit/test_public_api.py @@ -9,6 +9,7 @@ def test_root_package_reexports_parser_fact_models() -> None: assert harness_api.PythonCallEffect is parser_api.PythonCallEffect assert harness_api.PythonExportContract is parser_api.PythonExportContract assert harness_api.PythonExportContractKind is parser_api.PythonExportContractKind + assert harness_api.PythonFunctionControlFlow is parser_api.PythonFunctionControlFlow assert harness_api.PythonModuleShape is parser_api.PythonModuleShape assert harness_api.PythonProjectDependency is parser_api.PythonProjectDependency assert harness_api.PythonProjectEntryPoint is parser_api.PythonProjectEntryPoint @@ -93,6 +94,8 @@ def test_root_package_reexports_parser_fact_models() -> None: assert "PythonProjectDependency" in parser_api.__all__ assert "PythonPytestOptions" in parser_api.__all__ assert "PythonReasoningTreeImportEdge" in parser_api.__all__ + assert "PythonFunctionControlFlow" in parser_api.__all__ + assert "PythonFunctionControlFlow" in harness_api.__all__ assert "parse_python_project_metadata" in parser_api.__all__ assert "python_reasoning_tree_facts" in harness_api.__all__ assert "python_scope_is_public" in harness_api.__all__