Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions docs/03_features/201_rule_catalog.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ LLMs and are not blocking by default.
a reasoning-tree intent docstring.
- `PY-AGENT-R008`: broad branch packages should split into focused subpackages
or document the facade and owner map for agent repair loops.
- `PY-AGENT-R009`: public functions with deeply nested control flow should
expose the algorithm shape through guard clauses, explicit dispatch,
`match/case`, or small named pipeline steps.
- `PY-AGENT-R010`: public functions with broad linear statement blocks should
split into named helpers or pipeline steps that are easier for agents to edit.

## Reasoning Tree Policy

Expand All @@ -115,6 +120,26 @@ folders surface as owner-map advice without becoming a raw child-count gate.
Packages that already expose an explicit public facade are treated as having
an owner map.

`PY-AGENT-R009` is backed by parser-owned function control-flow facts, not by
harness string scanning. The parser records branch count, loop count, maximum
nesting, loop nesting, terminal `else` opportunities, and repeated literal
dispatch chains for each function symbol during the normal AST collection pass.
The harness turns those facts into a compact repair hint when a public function
hides its algorithm behind nested `if`/loop structure. The rule stays advisory
by default so teams can tune or promote it after seeing their project shape.

The implementation lives under `python_lang_project_harness.agent_readability`
because the target reader is the repair agent, not a human style reviewer. The
goal is short, explicit algorithm surfaces that an LLM can use from the
reasoning tree: guard clauses instead of nested `else`, `match/case` or dispatch
tables instead of literal branch ladders, and small named pipeline steps instead
of one broad loop body. Performance remains parser-first: the harness only
consumes `PythonFunctionControlFlow` facts and does not run a second AST parse.
`PY-AGENT-R010` complements `PY-AGENT-R009`: `PY-AGENT-R010` catches long, flat,
procedure-like public functions, while `PY-AGENT-R009` catches nested
control-flow shape. Keeping the two rules disjoint keeps the advice compact and
avoids telling the agent the same thing twice.

`render_python_reasoning_tree()` exposes the same tree as compact text for LLM
repair loops. It includes an `[imports]` section for parser-resolved
project-internal edges, a compact `[project]` section for declared package
Expand Down
2 changes: 2 additions & 0 deletions src/python_lang_parser/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
PythonDiagnosticSeverity,
PythonExportContract,
PythonExportContractKind,
PythonFunctionControlFlow,
PythonImport,
PythonModuleReport,
PythonModuleShape,
Expand All @@ -69,6 +70,7 @@
"PythonDiagnosticSeverity",
"PythonExportContract",
"PythonExportContractKind",
"PythonFunctionControlFlow",
"PythonImport",
"PythonModuleReport",
"PythonModuleShape",
Expand Down
6 changes: 6 additions & 0 deletions src/python_lang_parser/_ast_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
unparse,
)
from ._call_effects import call_effect
from ._control_flow import collect_function_control_flow
from ._export_model import PythonExportContract, PythonExportContractKind
from ._exports import literal_string_sequence
from .model import (
Expand Down Expand Up @@ -227,6 +228,11 @@ def _visit_symbol(
if isinstance(node, ast.ClassDef)
else ()
),
control_flow=(
None
if isinstance(node, ast.ClassDef)
else collect_function_control_flow(node)
),
docstring=ast.get_docstring(node),
has_annotations=symbol_has_annotations(node),
is_public=is_public_name(node.name),
Expand Down
305 changes: 305 additions & 0 deletions src/python_lang_parser/_control_flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,305 @@
"""Parser-owned function control-flow shape collection."""

from __future__ import annotations

import ast
from dataclasses import dataclass

from ._ast_names import unparse
from .model import PythonFunctionControlFlow


def collect_function_control_flow(
    node: ast.FunctionDef | ast.AsyncFunctionDef,
) -> PythonFunctionControlFlow:
    """Walk one function body and return its control-flow shape facts.

    Args:
        node: The function or async-function definition whose ``body`` is
            walked.

    Returns:
        A ``PythonFunctionControlFlow`` populated from the counters gathered
        by a fresh ``_ControlFlowCollector``.
    """

    shape = _ControlFlowCollector()
    # The function body itself is depth 0; each control-flow statement adds one.
    shape.visit_statements(node.body, nesting_depth=0, loop_depth=0)

    return PythonFunctionControlFlow(
        # Size facts.
        statement_count=shape.statement_count,
        max_block_statement_count=shape.max_block_statement_count,
        # Per-kind counters.
        branch_count=shape.branch_count,
        loop_count=shape.loop_count,
        match_count=shape.match_count,
        return_count=shape.return_count,
        terminal_else_count=shape.terminal_else_count,
        # Nesting and dispatch-shape facts.
        max_nesting_depth=shape.max_nesting_depth,
        max_loop_nesting_depth=shape.max_loop_nesting_depth,
        max_literal_dispatch_chain=shape.max_literal_dispatch_chain,
        nested_control_flow_count=shape.nested_control_flow_count,
    )


@dataclass(slots=True)
class _ControlFlowCollector:
statement_count: int = 0
max_block_statement_count: int = 0
branch_count: int = 0
loop_count: int = 0
match_count: int = 0
return_count: int = 0
terminal_else_count: int = 0
max_nesting_depth: int = 0
max_loop_nesting_depth: int = 0
max_literal_dispatch_chain: int = 0
nested_control_flow_count: int = 0

def visit_statements(
self,
statements: list[ast.stmt],
*,
nesting_depth: int,
loop_depth: int,
) -> None:
self.max_block_statement_count = max(
self.max_block_statement_count,
len(statements),
)
for statement in statements:
self.visit_statement(
statement,
nesting_depth=nesting_depth,
loop_depth=loop_depth,
)

def visit_statement(
self,
statement: ast.stmt,
*,
nesting_depth: int,
loop_depth: int,
) -> None:
self.statement_count += 1
if isinstance(statement, ast.If):
self._visit_if(
statement,
nesting_depth=nesting_depth,
loop_depth=loop_depth,
)
return
if isinstance(statement, ast.For | ast.AsyncFor | ast.While):
self._visit_loop(
statement,
nesting_depth=nesting_depth,
loop_depth=loop_depth,
)
return
if isinstance(statement, ast.Match):
self._visit_match(
statement,
nesting_depth=nesting_depth,
loop_depth=loop_depth,
)
return
if isinstance(statement, ast.Try):
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Handle except blocks when collecting control-flow facts*

except* statements are represented as ast.TryStar, but this dispatcher only routes ast.Try into _visit_try. As a result, functions that use except* have most nested statements/branches skipped (only the top-level statement is counted), which underreports control-flow complexity and can suppress PY-AGENT-R009/PY-AGENT-R010 advice for those functions.

Useful? React with 👍 / 👎.

self._visit_try(
statement,
nesting_depth=nesting_depth,
loop_depth=loop_depth,
)
return
if isinstance(statement, ast.With | ast.AsyncWith):
self._visit_block(
statement.body,
nesting_depth=nesting_depth + 1,
loop_depth=loop_depth,
)
return
if isinstance(statement, ast.Return):
self.return_count += 1

def _visit_if(
self,
statement: ast.If,
*,
nesting_depth: int,
loop_depth: int,
) -> None:
depth = nesting_depth + 1
self._record_control_flow(depth)
self.branch_count += 1
self.max_literal_dispatch_chain = max(
self.max_literal_dispatch_chain,
_literal_dispatch_chain_count(statement),
)
if statement.orelse and _body_has_terminal_exit(statement.body):
self.terminal_else_count += 1
self.visit_statements(
statement.body,
nesting_depth=depth,
loop_depth=loop_depth,
)
if len(statement.orelse) == 1 and isinstance(statement.orelse[0], ast.If):
self.visit_statement(
statement.orelse[0],
nesting_depth=nesting_depth,
loop_depth=loop_depth,
)
return
self.visit_statements(
statement.orelse,
nesting_depth=depth,
loop_depth=loop_depth,
)

def _visit_loop(
self,
statement: ast.For | ast.AsyncFor | ast.While,
*,
nesting_depth: int,
loop_depth: int,
) -> None:
depth = nesting_depth + 1
next_loop_depth = loop_depth + 1
self._record_control_flow(depth)
self.loop_count += 1
self.max_loop_nesting_depth = max(
self.max_loop_nesting_depth,
next_loop_depth,
)
self.visit_statements(
statement.body,
nesting_depth=depth,
loop_depth=next_loop_depth,
)
self.visit_statements(
statement.orelse,
nesting_depth=depth,
loop_depth=next_loop_depth,
)

def _visit_match(
self,
statement: ast.Match,
*,
nesting_depth: int,
loop_depth: int,
) -> None:
depth = nesting_depth + 1
self._record_control_flow(depth)
self.match_count += 1
self.branch_count += len(statement.cases)
for case in statement.cases:
self.visit_statements(
case.body,
nesting_depth=depth,
loop_depth=loop_depth,
)

def _visit_try(
self,
statement: ast.Try,
*,
nesting_depth: int,
loop_depth: int,
) -> None:
depth = nesting_depth + 1
self._record_control_flow(depth)
self.branch_count += len(statement.handlers)
self.branch_count += 1 if statement.orelse else 0
self.branch_count += 1 if statement.finalbody else 0
self.visit_statements(
statement.body,
nesting_depth=depth,
loop_depth=loop_depth,
)
for handler in statement.handlers:
self.visit_statements(
handler.body,
nesting_depth=depth,
loop_depth=loop_depth,
)
self.visit_statements(
statement.orelse,
nesting_depth=depth,
loop_depth=loop_depth,
)
self.visit_statements(
statement.finalbody,
nesting_depth=depth,
loop_depth=loop_depth,
)

def _visit_block(
self,
statements: list[ast.stmt],
*,
nesting_depth: int,
loop_depth: int,
) -> None:
self.max_nesting_depth = max(self.max_nesting_depth, nesting_depth)
self.visit_statements(
statements,
nesting_depth=nesting_depth,
loop_depth=loop_depth,
)

def _record_control_flow(self, depth: int) -> None:
self.max_nesting_depth = max(self.max_nesting_depth, depth)
if depth >= 3:
self.nested_control_flow_count += 1


def _body_has_terminal_exit(statements: list[ast.stmt]) -> bool:
if not statements:
return False
return _statement_has_terminal_exit(statements[-1])


def _statement_has_terminal_exit(statement: ast.stmt) -> bool:
if isinstance(statement, ast.Return | ast.Raise | ast.Break | ast.Continue):
return True
if isinstance(statement, ast.If):
return (
bool(statement.orelse)
and _body_has_terminal_exit(statement.body)
and _body_has_terminal_exit(statement.orelse)
)
if isinstance(statement, ast.Match):
return bool(statement.cases) and all(
_body_has_terminal_exit(case.body) for case in statement.cases
)
return False


def _literal_dispatch_chain_count(statement: ast.If) -> int:
    """Count the leading ``if``/``elif`` links that test one subject against literals.

    Returns ``0`` when the first test is not a literal dispatch. Otherwise the
    count covers ``statement`` plus every directly following ``elif`` whose
    test dispatches on the same subject, stopping at the first link that does
    not.
    """
    anchor = _literal_dispatch_subject(statement.test)
    if anchor is None:
        return 0

    links = 1
    tail = statement.orelse
    # An ``elif`` is a lone ``ast.If`` inside the previous link's ``orelse``.
    while len(tail) == 1 and isinstance(tail[0], ast.If):
        link = tail[0]
        if _literal_dispatch_subject(link.test) != anchor:
            break
        links += 1
        tail = link.orelse
    return links


def _literal_dispatch_subject(expression: ast.expr) -> str | None:
if not isinstance(expression, ast.Compare):
return None
if len(expression.ops) != 1 or len(expression.comparators) != 1:
return None
operator = expression.ops[0]
comparator = expression.comparators[0]
if isinstance(operator, ast.Eq | ast.Is) and _is_literal_dispatch_value(comparator):
return unparse(expression.left)
if isinstance(operator, ast.In) and _is_literal_container(comparator):
return unparse(expression.left)
return None


def _is_literal_dispatch_value(expression: ast.expr) -> bool:
    """Return whether ``expression`` is an ``ast.Constant`` node."""
    if isinstance(expression, ast.Constant):
        return True
    return False


def _is_literal_container(expression: ast.expr) -> bool:
    """Return whether ``expression`` is a tuple/set/list display of literals only.

    An empty tuple or list display counts as a literal container.
    """
    if not isinstance(expression, (ast.Tuple, ast.Set, ast.List)):
        return False
    for element in expression.elts:
        if not _is_literal_dispatch_value(element):
            return False
    return True
Loading
Loading