Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions src/panel_reactflow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2166,21 +2166,18 @@ def _handle_msg(self, msg: dict[str, Any]) -> None:
if edge is None:
return
self.add_edge(edge)
self._emit("edge_added", msg)
case "node_deleted":
node_ids = msg.get("node_ids") or []
if msg.get("node_id"):
node_ids = list(set(node_ids) | {msg.get("node_id")})
for node_id in node_ids:
self.remove_node(node_id)
self._emit("node_deleted", msg)
case "edge_deleted":
edge_ids = msg.get("edge_ids") or []
if msg.get("edge_id"):
edge_ids = list(set(edge_ids) | {msg.get("edge_id")})
for edge_id in edge_ids:
self.remove_edge(edge_id)
self._emit("edge_deleted", msg)
case "node_clicked":
node_id = msg.get("node_id")
if node_id is None:
Expand Down
171 changes: 171 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,177 @@ def test_reactflow_events_and_selection() -> None:
assert flow.edges[0]["data"]["weight"] == 0.25


def test_handle_msg_edge_added_emits_once() -> None:
"""Frontend connect sends edge_added; Python must not double-emit (add_edge already emits)."""
flow = ReactFlow(
nodes=[
{"id": "n1", "position": {"x": 0, "y": 0}, "data": {}},
{"id": "n2", "position": {"x": 1, "y": 1}, "data": {}},
],
edges=[],
)
events: list[dict] = []
flow.on("edge_added", events.append)
flow._handle_msg(
{
"type": "edge_added",
"edge": {"id": "n1->n2", "source": "n1", "target": "n2"},
},
)
assert len(events) == 1
assert events[0]["type"] == "edge_added"
assert events[0]["edge"]["id"] == "n1->n2"
assert len(flow.edges) == 1


def test_handle_msg_node_moved_emits_once() -> None:
flow = ReactFlow(nodes=[{"id": "n1", "position": {"x": 0, "y": 0}, "data": {}}])
events: list[dict] = []
flow.on("node_moved", events.append)
flow._handle_msg({"type": "node_moved", "node_id": "n1", "position": {"x": 10, "y": 20}})
assert len(events) == 1
assert events[0]["type"] == "node_moved"
assert events[0]["node_id"] == "n1"
assert events[0]["position"] == {"x": 10, "y": 20}
assert flow.nodes[0]["position"] == {"x": 10, "y": 20}


def test_handle_msg_selection_changed_emits_once() -> None:
flow = ReactFlow(
nodes=[
{"id": "n1", "position": {"x": 0, "y": 0}, "data": {}},
{"id": "n2", "position": {"x": 1, "y": 1}, "data": {}},
],
edges=[{"id": "e1", "source": "n1", "target": "n2", "data": {}}],
)
events: list[dict] = []
flow.on("selection_changed", events.append)
flow._handle_msg({"type": "selection_changed", "nodes": ["n1"], "edges": ["e1"]})
assert len(events) == 1
assert events[0]["type"] == "selection_changed"
assert flow.selection == {"nodes": ["n1"], "edges": ["e1"]}


def test_handle_msg_node_clicked_emits_once() -> None:
flow = ReactFlow(nodes=[{"id": "n1", "position": {"x": 0, "y": 0}, "data": {}}])
events: list[dict] = []
flow.on("node_clicked", events.append)
flow._handle_msg({"type": "node_clicked", "node_id": "n1", "button": 0})
assert len(events) == 1
assert events[0]["type"] == "node_clicked"
assert events[0]["node_id"] == "n1"


def test_handle_msg_sync_emits_once() -> None:
flow = ReactFlow()
events: list[dict] = []
flow.on("sync", events.append)
msg = {
"type": "sync",
"nodes": [
{
"id": "n1",
"position": {"x": 0, "y": 0},
"type": "panel",
"data": {},
"selected": False,
}
],
"edges": [],
}
flow._handle_msg(msg)
assert len(events) == 1
assert events[0]["type"] == "sync"
assert [n["id"] for n in flow.nodes] == ["n1"]


def test_handle_msg_node_deleted_emits_once_for_single_node() -> None:
"""remove_node already emits; _handle_msg must not emit a duplicate batch message."""
flow = ReactFlow(
nodes=[
{"id": "n1", "position": {"x": 0, "y": 0}, "data": {}},
{"id": "n2", "position": {"x": 1, "y": 1}, "data": {}},
],
edges=[{"id": "e1", "source": "n1", "target": "n2", "data": {}}],
)
events: list[dict] = []
flow.on("node_deleted", events.append)
flow._handle_msg(
{
"type": "node_deleted",
"node_id": "n1",
"node_ids": ["n1"],
"deleted_edges": ["e1"],
},
)
assert len(events) == 1
assert events[0]["type"] == "node_deleted"
assert events[0]["node_id"] == "n1"
assert events[0]["deleted_edges"] == ["e1"]
assert [n["id"] for n in flow.nodes] == ["n2"]
assert flow.edges == []


def test_handle_msg_node_deleted_one_event_per_node_when_batch() -> None:
flow = ReactFlow(
nodes=[
{"id": "n1", "position": {"x": 0, "y": 0}, "data": {}},
{"id": "n2", "position": {"x": 1, "y": 1}, "data": {}},
],
edges=[],
)
events: list[dict] = []
flow.on("node_deleted", events.append)
flow._handle_msg(
{
"type": "node_deleted",
"node_id": None,
"node_ids": ["n1", "n2"],
"deleted_edges": [],
},
)
assert len(events) == 2
assert {e["node_id"] for e in events} == {"n1", "n2"}
assert flow.nodes == []


def test_handle_msg_edge_deleted_emits_once() -> None:
"""remove_edge already emits; _handle_msg must not emit a duplicate batch message."""
flow = ReactFlow(
nodes=[
{"id": "n1", "position": {"x": 0, "y": 0}, "data": {}},
{"id": "n2", "position": {"x": 1, "y": 1}, "data": {}},
],
edges=[{"id": "e1", "source": "n1", "target": "n2", "data": {}}],
)
events: list[dict] = []
flow.on("edge_deleted", events.append)
flow._handle_msg({"type": "edge_deleted", "edge_id": "e1", "edge_ids": ["e1"]})
assert len(events) == 1
assert events[0]["type"] == "edge_deleted"
assert events[0]["edge_id"] == "e1"
assert flow.edges == []


def test_handle_msg_edge_deleted_one_event_per_edge_when_batch() -> None:
flow = ReactFlow(
nodes=[
{"id": "n1", "position": {"x": 0, "y": 0}, "data": {}},
{"id": "n2", "position": {"x": 1, "y": 1}, "data": {}},
],
edges=[
{"id": "e1", "source": "n1", "target": "n2", "data": {}},
{"id": "e2", "source": "n2", "target": "n1", "data": {}},
],
)
events: list[dict] = []
flow.on("edge_deleted", events.append)
flow._handle_msg({"type": "edge_deleted", "edge_id": None, "edge_ids": ["e1", "e2"]})
assert len(events) == 2
assert {e["edge_id"] for e in events} == {"e1", "e2"}
assert flow.edges == []


@nx_available
def test_reactflow_to_networkx() -> None:
flow = ReactFlow()
Expand Down
Loading