Competitor-Informed Bug Fixes Implementation Plan¶
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Fix 22 bugs in selectools identified by cross-referencing 95+ closed bug reports from Agno (39k stars) and 60+ from PraisonAI (6.9k stars) against selectools v0.21.0 code.
Architecture: TDD per bug — write failing regression test first, implement minimal fix, verify test passes, commit. Each bug is isolated enough to be fixed independently and tested independently. Bugs are grouped by severity; HIGH severity are shipping blockers.
Tech Stack: Python 3.9+, pytest, threading, asyncio, typing (Literal, Union, Optional).
Branch: v0.22.0-competitor-bug-fixes
Test command: pytest tests/ -x -q (or targeted: pytest tests/path/test.py::test_name -v)
Bug Inventory¶
HIGH severity (6) — shipping blockers¶
| ID | Bug | File | Competitor |
|---|---|---|---|
| BUG-01 | Streaming run()/arun() silently drops ToolCall objects | _provider_caller.py:217-236, 472-509 | Agno #6757 |
| BUG-02 | typing.Literal crashes @tool() creation | tools/decorators.py:16-46 | Agno #6720 |
| BUG-03 | asyncio.run() in 8 sync wrappers crashes in event loops | graph.py:479, 1059, supervisor.py:240, 4 patterns, pipeline.py:486 | PraisonAI #1165 |
| BUG-04 | HITL InterruptRequest dropped in parallel groups | orchestration/graph.py:1246 | Agno #4921 |
| BUG-05 | HITL InterruptRequest dropped in subgraphs | orchestration/graph.py:1315 | Agno #4921 |
| BUG-06 | ConversationMemory has no threading.Lock | memory.py | PraisonAI #1164 |
MEDIUM severity (9)¶
| ID | Bug | File | Competitor |
|---|---|---|---|
| BUG-07 | <think> tag content leaks into conversation history | providers/anthropic_provider.py:107-143 | Agno #6878 |
| BUG-08 | ChromaDB/Pinecone/Qdrant no batch size limits | rag/stores/chroma.py:119, +2 others | Agno #7030 |
| BUG-09 | MCP concurrent tool calls race on shared session | mcp/client.py:186 | Agno #6073 |
| BUG-10 | No type coercion for LLM tool args ("42" → int) | tools/base.py:326-344 | PraisonAI #410 |
| BUG-11 | Union[str, int] crashes @tool() creation | tools/decorators.py:26-31 | Agno #6720 |
| BUG-12 | Multi-interrupt generator nodes skip subsequent interrupts | orchestration/graph.py:1139-1166 | Agno #4921 |
| BUG-13 | GraphState.to_dict() doesn't serialize data dict (corrupts checkpoints) | orchestration/state.py:91,117 | Agno #7365 |
| BUG-14 | No session namespace isolation (shared session_id collision) | sessions.py | Agno #6275 |
| BUG-15 | Unbounded summary growth (context budget overflow) | agent/_memory_manager.py:99-100 | Agno #5011 |
LOW-MEDIUM severity (7)¶
| ID | Bug | File | Competitor |
|---|---|---|---|
| BUG-16 | _build_cancelled_result missing entity/KG extraction | agent/core.py:540-562 | CLAUDE.md #23 |
| BUG-17 | AgentTrace.add() not thread-safe in parallel branches | trace.py:118 | Agno #5847 |
| BUG-18 | Async observer exceptions silently lost | agent/_lifecycle.py:48 | Agno #6236 |
| BUG-19 | _clone_for_isolation shallow-copies, sharing observer state | agent/core.py:1124 | PraisonAI #1260 |
| BUG-20 | OTel/Langfuse observer dicts mutated without locks | observe/otel.py:46-48, observe/langfuse.py:55-57 | PraisonAI #1260 |
| BUG-21 | No vector store search result deduplication | All 4 store search() methods | Agno #7047 |
| BUG-22 | Optional[T] without default treated as required | tools/decorators.py:98 | Agno #7066 |
File Structure¶
Modified source files (by task):

- src/selectools/agent/_provider_caller.py (Task 1 / BUG-01)
- src/selectools/tools/decorators.py (Tasks 2, 11, 22)
- src/selectools/tools/base.py (Tasks 2, 10)
- src/selectools/orchestration/graph.py (Tasks 3, 4, 5, 12)
- src/selectools/orchestration/supervisor.py (Task 3)
- src/selectools/patterns/{team_lead,debate,reflective,plan_and_execute}.py (Task 3)
- src/selectools/pipeline.py (Task 3)
- src/selectools/memory.py (Task 6)
- src/selectools/providers/anthropic_provider.py (Task 7)
- src/selectools/rag/stores/chroma.py (Task 8)
- src/selectools/rag/stores/pinecone.py (Task 8)
- src/selectools/rag/stores/qdrant.py (Task 8)
- src/selectools/mcp/client.py (Task 9)
- src/selectools/orchestration/state.py (Task 13)
- src/selectools/sessions.py (Task 14)
- src/selectools/agent/_memory_manager.py (Task 15)
- src/selectools/agent/core.py (Tasks 16, 19)
- src/selectools/trace.py (Task 17)
- src/selectools/agent/_lifecycle.py (Task 18)
- src/selectools/observe/otel.py (Task 20)
- src/selectools/observe/langfuse.py (Task 20)
- src/selectools/rag/stores/{memory,sqlite,faiss}.py (Task 21)
New helper module:

- src/selectools/_async_utils.py — Safe run_sync() helper for BUG-03
New regression tests (one per bug):
Per tests/CLAUDE.md, all regression tests are appended to the canonical file tests/agent/test_regression.py as new top-level test functions named test_bug{NN}_*. No new files or subdirectories are created — the tests/regressions/ layout was rejected in code review (I1). Each bug adds:
- test_bug01_* — streaming tool calls (Task 1)
- test_bug02_* — literal types (Task 2)
- test_bug03_* — asyncio reentry (Task 3)
- test_bug04_* — parallel HITL (Task 4)
- test_bug05_* — subgraph HITL (Task 5)
- test_bug06_* — memory thread safety (Task 6)
- test_bug07_* — think tag stripping (Task 7)
- test_bug08_* — RAG batch limits (Task 8)
- test_bug09_* — MCP concurrent (Task 9)
- test_bug10_* — tool arg coercion (Task 10)
- test_bug11_* — union types (Task 11)
- test_bug12_* — multi-interrupt (Task 12)
- test_bug13_* — GraphState serialization (Task 13)
- test_bug14_* — session namespace (Task 14)
- test_bug15_* — summary cap (Task 15)
- test_bug16_* — cancelled extraction (Task 16)
- test_bug17_* — trace thread safety (Task 17)
- test_bug18_* — observer exceptions (Task 18)
- test_bug19_* — clone isolation (Task 19)
- test_bug20_* — observer thread safety (Task 20)
- test_bug21_* — vector dedup (Task 21)
- test_bug22_* — optional-not-required (Task 22)
Helper classes for each bug should be prefixed with _Bug{NN} to stay out of pytest collection and to avoid colliding with helpers from other bugs.
HIGH SEVERITY BUGS (Tasks 1-6)¶
Task 1: BUG-01 — Streaming drops ToolCall objects¶
Files:

- Modify: tests/agent/test_regression.py — add test function test_bug01_streaming_preserves_tool_calls (and async/sync-fallback siblings)
- Modify: src/selectools/agent/_provider_caller.py:217-236 (sync _streaming_call)
- Modify: src/selectools/agent/_provider_caller.py:472-509 (async _astreaming_call)
- Step 1: Write the failing regression test
# Append to tests/agent/test_regression.py (BUG-01)
"""BUG-01: Streaming run()/arun() silently drops ToolCall objects.
Source: Agno #6757 pattern — competitor bug where tool function names
become empty strings in streaming responses.
Selectools variant: _streaming_call and _astreaming_call filter chunks
with `isinstance(chunk, str)` which drops ToolCall objects entirely.
Tools are never executed when AgentConfig(stream=True).
"""
from __future__ import annotations
from typing import Any, Iterator
import pytest
from selectools import Agent, AgentConfig, Tool, ToolParameter
from selectools.providers.stubs import LocalProvider
from selectools.types import Message, Role, ToolCall
class _Bug01StreamingToolProvider(LocalProvider):
    """Provider that yields a ToolCall during streaming."""

    name = "streaming_tool_stub"
    supports_streaming = True
    supports_async = False

    def __init__(self) -> None:
        super().__init__()
        self.call_count = 0

    def stream(self, **kwargs: Any) -> Iterator[Any]:
        self.call_count += 1
        if self.call_count == 1:
            yield "I will call a tool. "
            yield ToolCall(tool_name="echo", parameters={"text": "hello"})
        else:
            yield "Done. Got: hello"


def _bug01_echo(text: str) -> str:
    return text


def test_bug01_streaming_preserves_tool_calls():
    """When stream=True, ToolCall objects from the provider must be executed."""
    echo_tool = Tool(
        name="echo",
        description="Echo text",
        parameters=[ToolParameter(name="text", param_type=str, description="t", required=True)],
        function=_bug01_echo,
    )
    provider = _Bug01StreamingToolProvider()
    agent = Agent(
        tools=[echo_tool],
        provider=provider,
        config=AgentConfig(stream=True, max_iterations=3),
    )
    result = agent.run([Message(role=Role.USER, content="echo hello")])
    assert "Done" in result.content, f"Expected tool to execute; got: {result.content!r}"
    assert provider.call_count >= 2, "Agent should have looped after tool execution"
- Step 2: Run the test to confirm it fails
- Step 3: Fix sync _streaming_call
Replace src/selectools/agent/_provider_caller.py lines 217-236:
def _streaming_call(
    self, stream_handler: Optional[Callable[[str], None]] = None
) -> Tuple[str, List["ToolCall"]]:
    if not getattr(self.provider, "supports_streaming", False):
        raise ProviderError(f"Provider {self.provider.name} does not support streaming.")
    aggregated: List[str] = []
    tool_calls: List["ToolCall"] = []
    for chunk in self.provider.stream(
        model=self._effective_model,
        system_prompt=self._system_prompt,
        messages=self._history,
        tools=self.tools,
        temperature=self.config.temperature,
        max_tokens=self.config.max_tokens,
        timeout=self.config.request_timeout,
    ):
        if isinstance(chunk, str):
            if chunk:
                aggregated.append(chunk)
                if stream_handler:
                    stream_handler(chunk)
        elif isinstance(chunk, ToolCall):
            tool_calls.append(chunk)
    return "".join(aggregated), tool_calls
Add Tuple and ToolCall to the imports at the top of the file.
- Step 4: Fix async _astreaming_call
Apply the same change to _astreaming_call at lines 472-509. Both the astream branch (lines 489-493) and the sync fallback branch (lines 504-507) must collect ToolCalls into the same tool_calls list.
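The chunk-partitioning logic both branches need can be sketched standalone. This is an illustrative sketch, not selectools code: the ToolCall stand-in, collect_stream, and fake_stream are all hypothetical names, but the isinstance-based partitioning mirrors the sync fix above.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, List, Tuple

@dataclass
class ToolCall:
    """Stand-in for selectools.types.ToolCall (illustrative only)."""
    tool_name: str
    parameters: dict = field(default_factory=dict)

async def collect_stream(chunks: AsyncIterator[Any]) -> Tuple[str, List[ToolCall]]:
    """Partition a mixed stream of text deltas and ToolCall objects,
    instead of dropping everything that is not a str."""
    aggregated: List[str] = []
    tool_calls: List[ToolCall] = []
    async for chunk in chunks:
        if isinstance(chunk, str):
            if chunk:
                aggregated.append(chunk)
        elif isinstance(chunk, ToolCall):
            tool_calls.append(chunk)
    return "".join(aggregated), tool_calls

async def _demo() -> Tuple[str, List[ToolCall]]:
    async def fake_stream():
        # Mixed stream: a text delta followed by a native tool call
        yield "I will call a tool. "
        yield ToolCall(tool_name="echo", parameters={"text": "hello"})
    return await collect_stream(fake_stream())

text, calls = asyncio.run(_demo())
```

The same loop body works in both the astream branch and the sync-fallback branch, as long as both append into the same tool_calls list.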
- Step 5: Update callers of _streaming_call / _astreaming_call
Find the _call_provider / _acall_provider methods that call _streaming_call. Find the line that constructs Message(role=Role.ASSISTANT, content=response_text). Pass tool_calls into the Message:
response_text, streamed_tool_calls = self._streaming_call(stream_handler)
return Message(
    role=Role.ASSISTANT,
    content=response_text,
    tool_calls=streamed_tool_calls or None,
)
Use Grep to find both call sites.
- Step 6: Run the regression test to verify fix
- Step 7: Run the full test suite to check no regressions
- Step 8: Commit
git add tests/agent/test_regression.py src/selectools/agent/_provider_caller.py
git commit -m "fix(streaming): collect ToolCall objects during streaming
BUG-01: _streaming_call and _astreaming_call filtered chunks with
isinstance(chunk, str), silently dropping ToolCall objects yielded
by providers. Any user with AgentConfig(stream=True) calling run()
would find native provider tool calls were never executed.
Now both methods return (text, tool_calls) tuple. Caller propagates
tool_calls into the returned Message.
Cross-referenced from Agno #6757."
Task 2: BUG-02 — typing.Literal crashes @tool() creation¶
Files:

- Modify: tests/agent/test_regression.py — add test function test_bug02_literal_types
- Modify: src/selectools/tools/decorators.py:10,16-46,98-111
- Step 1: Write the failing test
# Append to tests/agent/test_regression.py (BUG-02)
"""BUG-02: typing.Literal crashes @tool() creation.
Source: Agno #6720 — get_json_schema_for_arg() does not handle
typing.Literal, producing {"type": "object"} instead of
{"type": "string", "enum": [...]}.
Selectools variant: _unwrap_type() returns Literal unchanged, then
_validate_tool_definition() rejects it as an unsupported type.
"""
from __future__ import annotations
from typing import Literal, Optional
from selectools.tools import tool
def test_bug02_literal_str_produces_enum():
    @tool()
    def set_mode(mode: Literal["fast", "slow", "auto"]) -> str:
        return f"mode={mode}"

    assert set_mode.name == "set_mode"
    params = {p.name: p for p in set_mode.parameters}
    assert "mode" in params
    assert params["mode"].enum == ["fast", "slow", "auto"]
    assert params["mode"].param_type is str


def test_bug02_literal_int_produces_enum():
    @tool()
    def set_level(level: Literal[1, 2, 3]) -> str:
        return f"level={level}"

    params = {p.name: p for p in set_level.parameters}
    assert params["level"].enum == [1, 2, 3]
    assert params["level"].param_type is int


def test_bug02_optional_literal_works():
    @tool()
    def filter_by(tag: Optional[Literal["red", "blue"]] = None) -> str:
        return f"tag={tag}"

    params = {p.name: p for p in filter_by.parameters}
    assert params["tag"].enum == ["red", "blue"]
    assert params["tag"].required is False
- Step 2: Run the test to confirm it fails; the crash occurs at @tool() application
- Step 3: Add Literal handling to decorators.py
Update the import line 10 to include Literal and Tuple:
from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Union, get_args, get_origin, get_type_hints
Add a helper function before _unwrap_type (around line 15):
def _literal_info(type_hint: Any) -> Optional[Tuple[Any, List[Any]]]:
    """Return (base_type, enum_values) for Literal[...] hints, else None.

    Unwraps Optional[Literal[...]] as well. Base type is inferred from the
    first literal value (e.g. Literal["a", "b"] → str).
    """
    origin = get_origin(type_hint)
    if origin is Union:
        args = get_args(type_hint)
        non_none = [a for a in args if a is not type(None)]
        if len(non_none) == 1:
            return _literal_info(non_none[0])
    import types as _types  # noqa: PLC0415

    # PEP 604 unions (X | Y) exist only on Python 3.10+; getattr avoids
    # needing a sys.version_info check (and the unimported sys module).
    _union_type = getattr(_types, "UnionType", None)
    if _union_type is not None and isinstance(type_hint, _union_type):
        args = get_args(type_hint)
        non_none = [a for a in args if a is not type(None)]
        if len(non_none) == 1:
            return _literal_info(non_none[0])
    if origin is Literal:
        values = list(get_args(type_hint))
        if not values:
            return None
        base_type = type(values[0])
        return base_type, values
    return None
- Step 4: Use _literal_info in _infer_parameters_from_callable
Modify _infer_parameters_from_callable around line 90-111:
meta = param_metadata.get(name, {})
description = meta.get("description", f"Parameter {name}")
enum_values: Optional[List[Any]] = meta.get("enum")
raw_type = type_hints.get(name, str)
lit = _literal_info(raw_type)
if lit is not None:
    param_type, literal_values = lit
    if enum_values is None:
        enum_values = literal_values
else:
    param_type = _unwrap_type(raw_type)
is_optional = param.default != inspect.Parameter.empty
- Step 5: Run the regression test
- Step 6: Run full tool tests to check no regressions
- Step 7: Commit
git add tests/agent/test_regression.py src/selectools/tools/decorators.py
git commit -m "fix(tools): support typing.Literal in @tool() parameters
BUG-02: @tool() crashed on Literal[...] parameters because
_unwrap_type() returned Literal unchanged, and then
_validate_tool_definition() rejected it as an unsupported type.
Now detects Literal (and Optional[Literal]), extracts enum values,
infers base type from the first value, and auto-populates
ToolParameter.enum. Supports str, int, float, and bool literals.
Cross-referenced from Agno #6720."
Task 3: BUG-03 — asyncio.run() crashes in existing event loops¶
Files:

- Create: src/selectools/_async_utils.py
- Modify: tests/agent/test_regression.py — add test function test_bug03_asyncio_reentry
- Modify: src/selectools/orchestration/graph.py:479 (AgentGraph.run)
- Modify: src/selectools/orchestration/graph.py:1059 (AgentGraph.resume)
- Modify: src/selectools/orchestration/supervisor.py:240 (SupervisorAgent.run)
- Modify: src/selectools/patterns/team_lead.py:126
- Modify: src/selectools/patterns/debate.py:80
- Modify: src/selectools/patterns/reflective.py:82
- Modify: src/selectools/patterns/plan_and_execute.py:110
- Modify: src/selectools/pipeline.py:486
- Step 1: Write the failing test
# Append to tests/agent/test_regression.py (BUG-03)
"""BUG-03: asyncio.run() in sync wrappers crashes inside running event loops.
Source: PraisonAI #1165 — asyncio.run() called from sync context reachable
by async callers crashes with "cannot call asyncio.run() while an event
loop is running". Reachable from Jupyter, FastAPI handlers, async tests.
"""
from __future__ import annotations
import asyncio
import pytest
from selectools._async_utils import run_sync
def test_bug03_run_sync_outside_event_loop():
    async def coro():
        return 42

    assert run_sync(coro()) == 42


def test_bug03_run_sync_inside_running_loop():
    """Key test — calling run_sync from within an async function."""
    async def outer():
        async def inner():
            return "hello"

        return run_sync(inner())

    result = asyncio.run(outer())
    assert result == "hello"
- Step 2: Run the test to confirm it fails with ModuleNotFoundError for _async_utils
- Step 3: Create the safe run_sync helper
# src/selectools/_async_utils.py
"""Safe synchronous-wrapper utilities for async code.
Calling asyncio.run() from a sync function that is itself reachable
from an async caller raises RuntimeError: asyncio.run() cannot be called
when another event loop is running. This module provides a helper that
detects the surrounding event loop and executes the coroutine on a fresh
loop in a dedicated thread when one is already running.
"""
from __future__ import annotations
import asyncio
import concurrent.futures
from typing import Any, Awaitable, TypeVar
T = TypeVar("T")
def run_sync(coro: Awaitable[T]) -> T:
    """Run a coroutine to completion from sync code.

    If no event loop is running in the current thread, uses asyncio.run.
    If one is running, spawns a worker thread, creates a fresh event loop
    there, and waits for the result. Safe to call from Jupyter notebooks,
    FastAPI handlers, async tests, and nested orchestration.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(coro)  # type: ignore[arg-type]

    def _runner() -> T:
        return asyncio.run(coro)  # type: ignore[arg-type]

    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(_runner)
        return future.result()
- Step 4: Run the two unit tests for run_sync
- Step 5: Replace asyncio.run(...) in AgentGraph.run
In src/selectools/orchestration/graph.py around line 479, change:
return asyncio.run(
    self.arun(
        prompt_or_state,
        checkpoint_store=checkpoint_store,
        checkpoint_id=checkpoint_id,
    )
)
to:
from .._async_utils import run_sync
return run_sync(
    self.arun(
        prompt_or_state,
        checkpoint_store=checkpoint_store,
        checkpoint_id=checkpoint_id,
    )
)
- Step 6: Replace asyncio.run(...) in AgentGraph.resume
Same pattern at line 1059. Use Grep to find both call sites.
- Step 7: Replace in SupervisorAgent.run
Modify src/selectools/orchestration/supervisor.py line 240.
- Step 8: Replace in 4 pattern agents
Update each of:

- src/selectools/patterns/team_lead.py:126
- src/selectools/patterns/debate.py:80
- src/selectools/patterns/reflective.py:82
- src/selectools/patterns/plan_and_execute.py:110
Each imports from ..._async_utils import run_sync and replaces asyncio.run(self.arun(...)) with run_sync(self.arun(...)).
- Step 9: Replace in pipeline.py
Modify src/selectools/pipeline.py:486. Relative import: from ._async_utils import run_sync.
- Step 10: Run the regression test
- Step 11: Run full suite
- Step 12: Commit
git add src/selectools/_async_utils.py tests/agent/test_regression.py \
src/selectools/orchestration/graph.py src/selectools/orchestration/supervisor.py \
src/selectools/patterns/ src/selectools/pipeline.py
git commit -m "fix(async): safe run_sync helper for 8 sync wrappers
BUG-03: Bare asyncio.run() in 8 sync wrappers crashed with
'cannot call asyncio.run() while another event loop is running'
when called from Jupyter, FastAPI handlers, or nested async code.
New _async_utils.run_sync() helper detects a running loop and
offloads to a worker thread when needed. Applied to:
- AgentGraph.run / AgentGraph.resume
- SupervisorAgent.run
- PlanAndExecuteAgent / ReflectiveAgent / DebateAgent / TeamLeadAgent
- Pipeline._execute_step
Cross-referenced from PraisonAI #1165."
Task 4: BUG-04 — HITL lost in parallel groups¶
Files:

- Modify: tests/agent/test_regression.py — add test function test_bug04_parallel_hitl
- Modify: src/selectools/orchestration/graph.py:1237-1288 (_aexecute_parallel)
- Step 1: Write the failing test
# Append to tests/agent/test_regression.py (BUG-04)
"""BUG-04: InterruptRequest from a child node in a parallel group is silently
dropped. The parent graph treats the child as completed.
Source: Agno #4921 — NoneType does not have run_id error when running HITL
within agent tools which are part of team.
"""
from __future__ import annotations
from selectools import AgentGraph
from selectools.orchestration import (
GraphNode,
GraphState,
InMemoryCheckpointStore,
InterruptRequest,
ParallelGroupNode,
)
from selectools.types import AgentResult, Message, Role
def _normal_child(state: GraphState):
    state.data["normal"] = "done"
    return AgentResult(message=Message(role=Role.ASSISTANT, content="normal")), state


def _hitl_child_generator(state: GraphState):
    response = yield InterruptRequest(key="approval", prompt="approve?")
    state.data["approval"] = response
    yield AgentResult(message=Message(role=Role.ASSISTANT, content="hitl")), state


def test_bug04_parallel_group_propagates_hitl():
    """When a child in a parallel group interrupts, the parent graph must pause."""
    graph = AgentGraph(name="test_parallel_hitl")
    normal_node = GraphNode(name="normal", agent=None, callable_fn=_normal_child)
    hitl_node = GraphNode(name="hitl", agent=None, generator_fn=_hitl_child_generator)
    parallel = ParallelGroupNode(name="group", child_node_names=["normal", "hitl"])
    graph.add_node(normal_node)
    graph.add_node(hitl_node)
    graph.add_node(parallel)
    graph.set_entry("group")
    graph.add_edge("group", "__end__")
    store = InMemoryCheckpointStore()
    result = graph.run("start", checkpoint_store=store)
    assert result.interrupted, f"Expected graph to pause; got: {result}"
    assert result.interrupt_key == "approval"
- Step 2: Run the test to confirm it fails: result.interrupted is False
- Step 3: Fix run_child to return the interrupted flag
In _aexecute_parallel around line 1237, change run_child's return type to include the interrupted flag:
async def run_child(
    child_name: str, branch_state: GraphState
) -> Tuple[str, AgentResult, GraphState, bool]:
    child_node = self._nodes.get(child_name)
    if child_node is None:
        raise GraphExecutionError(
            self.name, child_name, KeyError(f"Child node {child_name!r} not found"), 0
        )
    if isinstance(child_node, GraphNode):
        result, new_state, interrupted = await self._aexecute_node(
            child_node, branch_state, trace, run_id
        )
    else:
        result = _make_synthetic_result(branch_state)
        new_state = branch_state
        interrupted = False
    return child_name, result, new_state, interrupted
- Step 4: Update the result collection loop
In the same method around line 1262, unpack the 4-tuple and track interrupts:
child_results: Dict[str, List[AgentResult]] = {}
branch_final_states: List[GraphState] = []
interrupted_child: Optional[str] = None
for i, output in enumerate(child_outputs):
    if isinstance(output, BaseException):
        child_name = node.child_node_names[i]
        state.errors.append(
            {"node": child_name, "error": str(output), "type": type(output).__name__}
        )
        if self.error_policy == ErrorPolicy.ABORT:
            exc = output if isinstance(output, Exception) else Exception(str(output))
            raise GraphExecutionError(self.name, child_name, exc, 0) from output
        continue
    child_name, result, new_state, child_interrupted = output
    child_results.setdefault(child_name, []).append(result)
    branch_final_states.append(new_state)
    if child_interrupted and interrupted_child is None:
        interrupted_child = child_name

# Propagate interrupt metadata to parent state
if interrupted_child is not None:
    merged_interrupt_marker = {"__parallel_interrupt__": interrupted_child}
else:
    merged_interrupt_marker = {}
Then, after the existing merge logic computes the merged branch state, fold merged_interrupt_marker into state.data so the dispatch site can detect the interrupt.
- Step 5: Propagate the interrupt in _aexecute_node
Find where _aexecute_parallel is called within _aexecute_node and check for __parallel_interrupt__ in the merged state. If present, return interrupted=True from _aexecute_node.
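The check at the dispatch site can be sketched in isolation. Only the __parallel_interrupt__ key comes from the plan; the function and argument names below are hypothetical:

```python
from typing import Any, Dict, Optional, Tuple

def resolve_parallel_interrupt(merged_data: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
    """Pop the marker injected by the parallel group and convert it into
    the interrupted flag plus the name of the interrupting child."""
    child = merged_data.pop("__parallel_interrupt__", None)
    return child is not None, child

# A state whose merge included an interrupting child pauses the graph;
# a state without the marker continues normally.
interrupted, child = resolve_parallel_interrupt({"x": 1, "__parallel_interrupt__": "hitl"})
no_interrupt, _ = resolve_parallel_interrupt({"x": 1})
```

Popping (rather than reading) the marker keeps the sentinel key out of checkpointed state once it has been converted into the flag.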
- Step 6: Run the regression test
- Step 7: Run full orchestration suite
- Step 8: Commit
git add tests/agent/test_regression.py src/selectools/orchestration/graph.py
git commit -m "fix(orchestration): propagate HITL interrupts from parallel groups
BUG-04: run_child in _aexecute_parallel discarded the interrupted
boolean from _aexecute_node. If a child yielded InterruptRequest,
the signal was lost and the graph continued as if the child
completed normally.
Now run_child returns a 4-tuple including the interrupted flag,
and the first interrupting child surfaces the interrupt to the
graph's outer loop for proper checkpointing.
Cross-referenced from Agno #4921."
Task 5: BUG-05 — HITL lost in subgraphs¶
Files:

- Modify: tests/agent/test_regression.py — add test function test_bug05_subgraph_hitl
- Modify: src/selectools/orchestration/graph.py:1295-1332 (_aexecute_subgraph)
- Step 1: Write the failing test
# Append to tests/agent/test_regression.py (BUG-05)
"""BUG-05: InterruptRequest raised inside a subgraph is silently dropped
by the parent graph. The subgraph's pause state is lost.
Source: Agno #4921 — HITL inside nested contexts fails.
"""
from __future__ import annotations
from selectools import AgentGraph
from selectools.orchestration import (
GraphNode,
GraphState,
InMemoryCheckpointStore,
InterruptRequest,
SubgraphNode,
)
from selectools.types import AgentResult, Message, Role
def _hitl_generator(state: GraphState):
    response = yield InterruptRequest(key="approval", prompt="ok?")
    state.data["approval"] = response
    yield AgentResult(message=Message(role=Role.ASSISTANT, content="done")), state


def test_bug05_subgraph_propagates_hitl_interrupt():
    inner = AgentGraph(name="inner")
    inner_node = GraphNode(name="gate", agent=None, generator_fn=_hitl_generator)
    inner.add_node(inner_node)
    inner.set_entry("gate")
    inner.add_edge("gate", "__end__")
    outer = AgentGraph(name="outer")
    sub = SubgraphNode(name="nested", graph=inner)
    outer.add_node(sub)
    outer.set_entry("nested")
    outer.add_edge("nested", "__end__")
    store = InMemoryCheckpointStore()
    result = outer.run("start", checkpoint_store=store)
    assert result.interrupted, "Subgraph interrupt must propagate to parent"
    assert result.interrupt_key == "approval"
- Step 2: Run the test to confirm it fails: result.interrupted is False
- Step 3: Update the _aexecute_subgraph signature and check sub_result.interrupted
Modify around line 1295-1332. Change return type to include interrupted flag:
async def _aexecute_subgraph(
    self,
    node: SubgraphNode,
    state: GraphState,
    trace: AgentTrace,
    run_id: str,
) -> Tuple[AgentResult, GraphState, bool]:
    """Execute a nested AgentGraph as a node."""
    sub_state = GraphState.from_prompt(
        state.data.get(STATE_KEY_LAST_OUTPUT, "")
        or (state.messages[-1].content if state.messages else "")
    )
    for parent_key, sub_key in node.input_map.items():
        if parent_key in state.data:
            sub_state.data[sub_key] = state.data[parent_key]
    sub_result = await node.graph.arun(sub_state, _interrupt_response=None)
    if sub_result.interrupted:
        state.data["__subgraph_interrupt__"] = {
            "key": sub_result.interrupt_key,
            "prompt": sub_result.interrupt_prompt,
            "subgraph": node.name,
        }
        synthetic = AgentResult(
            message=Message(role=Role.ASSISTANT, content=""),
            iterations=sub_result.steps,
            usage=sub_result.total_usage,
        )
        return synthetic, state, True
    for sub_key, parent_key in node.output_map.items():
        if sub_key in sub_result.state.data:
            state.data[parent_key] = sub_result.state.data[sub_key]
    state.data[STATE_KEY_LAST_OUTPUT] = sub_result.content
    state.messages.extend(sub_result.state.messages[-2:])
    state.history.extend(sub_result.state.history)
    synthetic = AgentResult(
        message=Message(role=Role.ASSISTANT, content=sub_result.content),
        iterations=sub_result.steps,
        usage=sub_result.total_usage,
    )
    return synthetic, state, False
- Step 4: Update the caller of _aexecute_subgraph
Find the call site of _aexecute_subgraph, unpack the 3-tuple (result, state, interrupted), and propagate the interrupted flag up instead of treating the subgraph as completed.
- Step 5: Run the regression test
- Step 6: Run full orchestration suite
- Step 7: Commit
git add tests/agent/test_regression.py src/selectools/orchestration/graph.py
git commit -m "fix(orchestration): propagate HITL interrupts from subgraphs
BUG-05: _aexecute_subgraph never checked sub_result.interrupted.
If a subgraph paused for HITL, the parent treated it as completed
and continued executing, losing the pause state.
Now _aexecute_subgraph returns (result, state, interrupted) and
propagates interrupt metadata to the parent graph for proper
checkpointing and resumption.
Cross-referenced from Agno #4921."
Task 6: BUG-06 — ConversationMemory missing threading.Lock¶
Files:

- Modify: tests/agent/test_regression.py — add test function test_bug06_memory_thread_safety
- Modify: src/selectools/memory.py
- Step 1: Write the failing test
# Append to tests/agent/test_regression.py (BUG-06)
"""BUG-06: ConversationMemory has no threading.Lock. Concurrent mutation
from multiple threads races on the _messages list.
Source: PraisonAI #1164, #1260 — thread-unsafe shared mutable state.
"""
from __future__ import annotations
import threading
from selectools.memory import ConversationMemory
from selectools.types import Message, Role
def test_bug06_concurrent_add_preserves_all_messages():
    """10 threads × 100 adds = 1000 messages should all be preserved."""
    memory = ConversationMemory(max_messages=10000)
    n_threads = 10
    n_adds = 100
    errors = []

    def worker(thread_id: int):
        try:
            for i in range(n_adds):
                memory.add(Message(role=Role.USER, content=f"t{thread_id}-m{i}"))
        except Exception as e:
            errors.append(e)

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    assert not errors, f"Worker errors: {errors}"
    history = memory.get_history()
    assert len(history) == n_threads * n_adds, (
        f"Expected {n_threads * n_adds} messages, got {len(history)}"
    )


def test_bug06_concurrent_add_with_trim_no_crash():
    """Low max_messages triggers _enforce_limits concurrently — must not crash."""
    memory = ConversationMemory(max_messages=50)
    errors = []

    def worker(thread_id: int):
        try:
            for i in range(200):
                memory.add(Message(role=Role.USER, content=f"t{thread_id}-m{i}"))
        except Exception as e:
            errors.append(e)

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    assert not errors, f"Worker errors: {errors}"
    assert len(memory.get_history()) <= 50
- Step 2: Run the test to confirm it fails
- Step 3: Add threading.RLock to ConversationMemory
In src/selectools/memory.py, add import threading to the imports.
In __init__, add the lock:
self.max_messages = max_messages
self.max_tokens = max_tokens
self._messages: List[Message] = []
self._summary: Optional[str] = None
self._last_trimmed: List[Message] = []
self._lock = threading.RLock()
Use RLock (re-entrant) because add calls _enforce_limits which may call other locked methods.
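The re-entrance requirement can be demonstrated standalone: with a plain threading.Lock, the nested acquisition below would deadlock on the second acquire from the same thread. The class and method names here are illustrative, not the ConversationMemory API:

```python
import threading

class _Demo:
    """Minimal sketch: a public method that calls a locked private helper."""

    def __init__(self) -> None:
        self._lock = threading.RLock()  # a plain Lock would deadlock below
        self.items: list = []

    def add(self, item) -> None:
        with self._lock:
            self.items.append(item)
            self._enforce_limits()

    def _enforce_limits(self) -> None:
        with self._lock:  # re-entrant acquisition from the same thread
            del self.items[:-2]  # keep only the last two items

d = _Demo()
for i in range(5):
    d.add(i)
```

RLock counts acquisitions per thread and only releases the lock when the count drops to zero, so the add → _enforce_limits call chain is safe.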
- Step 4: Wrap all mutation and read methods in with self._lock:
Use Grep to find all the methods to update.
Methods to protect: add, add_many, get_history, get_recent, clear, _enforce_limits, to_dict, from_dict, branch, get_summary, set_summary, and any other state-reading or state-mutating method.
Example for add:
def add(self, message: Message) -> None:
    with self._lock:
        self._messages.append(message)
        self._enforce_limits()
- Step 5: Preserve state-restoration compatibility
threading.RLock is not serializable for disk storage. Override __getstate__ and __setstate__ to exclude the lock from serialization and recreate it on restore:
def __getstate__(self) -> Dict[str, Any]:
    state = self.__dict__.copy()
    state.pop("_lock", None)
    return state

def __setstate__(self, state: Dict[str, Any]) -> None:
    self.__dict__.update(state)
    self._lock = threading.RLock()
- Step 6: Run the regression test
- Step 7: Run existing memory tests
- Step 8: Commit
git add tests/agent/test_regression.py src/selectools/memory.py
git commit -m "fix(memory): add threading.RLock to ConversationMemory
BUG-06: ConversationMemory was the only shared-state class in
selectools without a lock. Concurrent add()/add_many()/get_history()
from multiple threads raced on self._messages, potentially losing
messages or corrupting the list.
All mutation and read methods now acquire self._lock (RLock for
re-entrance). __getstate__/__setstate__ preserve serialization
compat by recreating the lock on restore.
Cross-referenced from PraisonAI #1164 / #1260."
MEDIUM SEVERITY BUGS (Tasks 7-15)¶
Task 7: BUG-07 — <think> tag content leaks¶
Files:
- Modify: tests/agent/test_regression.py — add test function test_bug07_think_tag_stripping
- Modify: src/selectools/providers/anthropic_provider.py
- Step 1: Write the failing test
# Append to tests/agent/test_regression.py (BUG-07)
"""BUG-07: <think>...</think> content leaks into conversation history.
Source: Agno #6878.
"""
from selectools.providers.anthropic_provider import _strip_reasoning_tags
def test_bug07_strip_simple_think_tags():
    text = "<think>This is my reasoning.</think>The answer is 42."
    assert _strip_reasoning_tags(text) == "The answer is 42."

def test_bug07_strip_multiline_think_tags():
    text = "<think>\nLine 1\nLine 2\n</think>\nFinal answer."
    assert _strip_reasoning_tags(text).strip() == "Final answer."

def test_bug07_strip_multiple_think_blocks():
    text = "<think>first</think>Hello<think>second</think> world"
    assert _strip_reasoning_tags(text) == "Hello world"

def test_bug07_no_think_tags_unchanged():
    text = "Plain text with no tags"
    assert _strip_reasoning_tags(text) == text
- Step 2: Confirm failure
- Step 3: Add stripping helper
At the top of anthropic_provider.py below imports:
import re as _re
_THINK_TAG_RE = _re.compile(r"<think>.*?</think>", _re.DOTALL)
def _strip_reasoning_tags(text: str) -> str:
"""Remove <think>...</think> blocks from model output."""
if not text or "<think>" not in text:
return text
return _THINK_TAG_RE.sub("", text)
- Step 4: Apply in all text accumulation paths
Apply `_strip_reasoning_tags` at the point where accumulated text is finalized in `complete` and `acomplete`, and to each delta in `stream` and `astream`.
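One caveat for the streaming paths: applying the regex to each delta independently can miss a tag split across chunk boundaries (e.g. `<thi` in one delta, `nk>` in the next). A small stateful stripper handles the split case. The class below is an illustrative sketch, not current selectools API:

```python
class ThinkTagStripper:
    """Incrementally strip <think>...</think> blocks from streamed deltas."""

    OPEN, CLOSE = "<think>", "</think>"

    def __init__(self) -> None:
        self._buf = ""          # unemitted text carried across deltas
        self._in_think = False  # True while inside a <think> block

    def feed(self, delta: str) -> str:
        """Consume one delta; return only text known to be outside a block."""
        self._buf += delta
        out = []
        while self._buf:
            if self._in_think:
                end = self._buf.find(self.CLOSE)
                if end == -1:
                    # discard think-content, keep a possible partial "</think>"
                    self._buf = self._buf[-(len(self.CLOSE) - 1):]
                    break
                self._buf = self._buf[end + len(self.CLOSE):]
                self._in_think = False
            else:
                start = self._buf.find(self.OPEN)
                if start == -1:
                    # emit everything except a possible partial "<think>" suffix
                    keep = self._partial_suffix(self._buf, self.OPEN)
                    out.append(self._buf[:len(self._buf) - keep])
                    self._buf = self._buf[len(self._buf) - keep:]
                    break
                out.append(self._buf[:start])
                self._buf = self._buf[start + len(self.OPEN):]
                self._in_think = True
        return "".join(out)

    def flush(self) -> str:
        """Emit any trailing buffered text once the stream ends."""
        out = "" if self._in_think else self._buf
        self._buf = ""
        return out

    @staticmethod
    def _partial_suffix(text: str, tag: str) -> int:
        # length of the longest proper tag prefix dangling at end of text
        for n in range(min(len(tag) - 1, len(text)), 0, -1):
            if text.endswith(tag[:n]):
                return n
        return 0
```

`feed()` buffers at most a partial tag, so latency cost is a few characters; `flush()` must be called once at end-of-stream so a dangling partial tag is not lost.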
- Step 5: Run test, commit
pytest tests/agent/test_regression.py -v -k "bug07"
git add tests/agent/test_regression.py src/selectools/providers/anthropic_provider.py
git commit -m "fix(anthropic): strip <think> reasoning tags from output (BUG-07)"
Task 8: BUG-08 — RAG store batch size limits¶
Files:
- Modify: tests/agent/test_regression.py — add test function test_bug08_rag_batch_limits
- Modify: src/selectools/rag/stores/{chroma,pinecone,qdrant}.py
- Step 1: Write failing test
# Append to tests/agent/test_regression.py (BUG-08)
from unittest.mock import MagicMock
def test_bug08_chroma_batches_large_upsert():
from selectools.rag.stores.chroma import ChromaVectorStore
from selectools.rag.types import Document
store = ChromaVectorStore.__new__(ChromaVectorStore)
store.collection = MagicMock()
store._batch_size = 100
store.embedder = MagicMock()
store.embedder.embed_batch.return_value = [[0.1] * 16 for _ in range(250)]
docs = [Document(text=f"doc {i}", metadata={}) for i in range(250)]
store.add_documents(docs)
assert store.collection.upsert.call_count == 3
- Step 2: Add a `_batch_size` attribute and chunking to each store
For chroma.py:
self._batch_size = 5000
for start in range(0, len(ids), self._batch_size):
end = start + self._batch_size
self.collection.upsert(
ids=ids[start:end],
embeddings=embeddings[start:end],
documents=texts[start:end],
metadatas=metadatas[start:end],
)
Apply the same pattern to pinecone.py (batch_size=100) and qdrant.py (batch_size=1000).
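Since the same chunking logic recurs in all three stores, a small shared helper could keep the parallel columns aligned. This is a sketch with an illustrative name (`iter_batches` is not an existing selectools helper):

```python
from typing import Iterator, Sequence, Tuple

def iter_batches(batch_size: int, *columns: Sequence) -> Iterator[Tuple[Sequence, ...]]:
    """Yield aligned slices of the parallel columns, batch_size rows at a time."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    if len({len(col) for col in columns}) > 1:
        raise ValueError("all columns must have the same length")
    for start in range(0, len(columns[0]), batch_size):
        yield tuple(col[start:start + batch_size] for col in columns)
```

Each store would then loop `for ids_b, emb_b, txt_b, meta_b in iter_batches(self._batch_size, ids, embeddings, texts, metadatas)` and upsert one batch per iteration, instead of repeating the slicing arithmetic three times.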
- Step 3: Run test, commit
pytest tests/agent/test_regression.py -v -k "bug08"
git add tests/agent/test_regression.py src/selectools/rag/stores/chroma.py src/selectools/rag/stores/pinecone.py src/selectools/rag/stores/qdrant.py
git commit -m "fix(rag): chunk large upserts in Chroma/Pinecone/Qdrant (BUG-08)"
Task 9: BUG-09 — MCP concurrent tool call lock¶
Files:
- Modify: tests/agent/test_regression.py — add test function test_bug09_mcp_concurrent
- Modify: src/selectools/mcp/client.py
- Step 1: Write failing test (see original spec for the async test with mocked session)
- Step 2: Add `self._tool_lock = asyncio.Lock()` to the `MCPClient` init
- Step 3: Wrap the `_call_tool` body in `async with self._tool_lock:`
- Step 4: Run test, commit
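The serialization pattern can be sketched standalone; `FakeSession` and `Client` below are illustrative stand-ins for the shared MCP session and `MCPClient`, not the real classes:

```python
import asyncio

class FakeSession:
    """Stands in for the shared MCP session, which is not safe for overlap."""
    def __init__(self) -> None:
        self.active = 0
        self.max_concurrent = 0

    async def call_tool(self, name: str) -> str:
        self.active += 1
        self.max_concurrent = max(self.max_concurrent, self.active)
        await asyncio.sleep(0.01)  # simulate a network round-trip
        self.active -= 1
        return f"result:{name}"

class Client:
    def __init__(self, session: FakeSession) -> None:
        self._session = session
        self._tool_lock = asyncio.Lock()  # the BUG-09 fix

    async def call_tool(self, name: str) -> str:
        async with self._tool_lock:  # serialize access to the shared session
            return await self._session.call_tool(name)

async def main() -> int:
    session = FakeSession()
    client = Client(session)
    await asyncio.gather(*(client.call_tool(f"t{i}") for i in range(5)))
    return session.max_concurrent

# asyncio.run(main()) -> 1: calls never overlap on the session
```

Removing the `async with` line lets `max_concurrent` climb above 1, which is exactly the race the regression test should catch.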
Task 10: BUG-10 — Tool argument type coercion¶
Files:
- Modify: tests/agent/test_regression.py — add test function test_bug10_tool_arg_coercion
- Modify: src/selectools/tools/base.py:326-344
- Step 1: Write failing test (str→int, str→float, str→bool coercion)
- Step 2: In `_validate_single`, attempt coercion before rejecting:
if isinstance(value, param_type):
return value
if isinstance(value, str) and param_type in (int, float, bool):
try:
if param_type is bool:
lowered = value.strip().lower()
if lowered in ("true", "1", "yes", "on"):
return True
if lowered in ("false", "0", "no", "off"):
return False
raise ValueError(f"Cannot coerce {value!r} to bool")
return param_type(value)
except (ValueError, TypeError) as exc:
raise ToolValidationError(
f"Invalid {name!r}: cannot coerce {value!r} to {param_type.__name__}: {exc}"
)
raise ToolValidationError(
f"Invalid {name!r}: expected {param_type.__name__}, got {type(value).__name__}"
)
- Step 3: Run test, commit
Task 11: BUG-11 — Union[str, int] fallback¶
Files:
- Modify: tests/agent/test_regression.py — add test function test_bug11_union_types
- Modify: src/selectools/tools/decorators.py:26-31
- Step 1: Write failing test
- Step 2: In `_unwrap_type`, return `str` for multi-type unions:
if origin is Union:
args = get_args(type_hint)
non_none_args = [a for a in args if a is not type(None)]
if len(non_none_args) == 1:
return _unwrap_type(non_none_args[0])
if len(non_none_args) > 1:
return str
Apply the same fallback in the `types.UnionType` branch.
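The combined logic can be sketched as a standalone function mirroring what `_unwrap_type` should do (illustrative, with a `hasattr` guard so the `X | Y` branch is a no-op on Python 3.9):

```python
import types
from typing import Optional, Union, get_args, get_origin

def unwrap_type(hint):
    """Reduce a type hint to a single concrete type for schema generation."""
    origin = get_origin(hint)
    is_union = origin is Union or (
        hasattr(types, "UnionType") and origin is types.UnionType  # X | Y, 3.10+
    )
    if is_union:
        non_none = [a for a in get_args(hint) if a is not type(None)]
        if len(non_none) == 1:
            return unwrap_type(non_none[0])  # Optional[T] -> T
        return str  # multi-type unions fall back to str (BUG-11)
    return hint
```

The `str` fallback is deliberately lossy: the LLM sends strings anyway, and BUG-10's coercion layer can narrow them afterwards.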
- Step 3: Run test, commit
Task 12: BUG-12 — Multi-interrupt generator nodes¶
Files:
- Modify: tests/agent/test_regression.py — add test function test_bug12_multi_interrupt
- Modify: src/selectools/orchestration/graph.py:1139-1166
- Step 1: Write failing test — two-gate generator, both interrupts must fire
- Step 2: Read the current `_aexecute_generator_node`
- Step 3: Fix the iteration — the `asend` return value must be processed
Core fix: after `gen.asend(response)` returns a yielded value, that value must be checked for InterruptRequest before advancing with `__anext__`. Process the `asend` return in the same code path as items from the subsequent `async for` loop.
- Step 4: Fix the `interrupt_index` counter to persist across calls
- Step 5: Run test, commit
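The corrected drive loop can be sketched in isolation. `Interrupt` and `drive` below are illustrative stand-ins, assuming the node generator yields both gate objects and plain items:

```python
import asyncio
from typing import Any, AsyncGenerator, Callable, List

class Interrupt:
    def __init__(self, prompt: str) -> None:
        self.prompt = prompt

async def drive(
    gen: AsyncGenerator[Any, Any],
    respond: Callable[[Interrupt], Any],
) -> List[Any]:
    """Advance gen, answering every Interrupt it yields (not just the first)."""
    items: List[Any] = []
    try:
        value = await gen.__anext__()
        while True:
            if isinstance(value, Interrupt):
                # asend()'s return value is the NEXT yield: it must flow back
                # through this same check, or a second gate is dropped (BUG-12)
                value = await gen.asend(respond(value))
            else:
                items.append(value)
                value = await gen.__anext__()
    except StopAsyncIteration:
        return items

async def two_gate():
    a = yield Interrupt("first?")
    yield f"got {a}"
    b = yield Interrupt("second?")
    yield f"got {b}"

# asyncio.run(drive(two_gate(), lambda i: i.prompt.upper()))
# -> ['got FIRST?', 'got SECOND?']
```

The buggy shape uses `asend` once and then a fresh `async for`, so the value returned by `asend` bypasses the Interrupt check — exactly the two-gate failure the regression test encodes.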
Task 13: BUG-13 — GraphState.to_dict() JSON validation¶
Files:
- Modify: tests/agent/test_regression.py — add test function test_bug13_graphstate_serialization
- Modify: src/selectools/orchestration/state.py:91,117
- Step 1: Write failing test — non-serializable object should raise clearly
- Step 2: Validate `data` in `to_dict` via a JSON round-trip:
def to_dict(self) -> Dict[str, Any]:
import json
try:
serialized_data = json.loads(json.dumps(self.data))
except (TypeError, ValueError) as exc:
raise ValueError(
f"GraphState.data contains non-serializable values: {exc}. "
f"All values in state.data must be JSON-compatible for checkpointing."
)
return {
"messages": [m.to_dict() for m in self.messages],
"history": list(self.history),
"data": serialized_data,
"errors": list(self.errors),
"turn_count": self.turn_count,
}
- Step 3: Run test, commit
Task 14: BUG-14 — Session namespace isolation¶
Files:
- Modify: tests/agent/test_regression.py — add test function test_bug14_session_namespace
- Modify: src/selectools/sessions.py (all 3 stores)
- Step 1: Write failing test
def test_different_namespaces_isolated():
with tempfile.TemporaryDirectory() as tmpdir:
store = JsonFileSessionStore(directory=tmpdir)
mem_a = ConversationMemory()
mem_a.add(Message(role=Role.USER, content="A"))
store.save("shared_id", mem_a, namespace="agent_a")
mem_b = ConversationMemory()
mem_b.add(Message(role=Role.USER, content="B"))
store.save("shared_id", mem_b, namespace="agent_b")
assert store.load("shared_id", namespace="agent_a").get_history()[0].content == "A"
assert store.load("shared_id", namespace="agent_b").get_history()[0].content == "B"
- Step 2: Add a `namespace: Optional[str] = None` parameter to `save`/`load`/`delete` in the protocol and all 3 concrete stores
- Step 3: Derive the storage key from `{namespace}:{session_id}` when a namespace is set, else use the bare `session_id`
- Step 4: Run test, commit
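The key derivation in Step 3 can be sketched as a small helper (hypothetical name, not current selectools API):

```python
from typing import Optional

def storage_key(session_id: str, namespace: Optional[str] = None) -> str:
    """Return '{namespace}:{session_id}' when namespaced, else the bare id."""
    if namespace:
        if ":" in namespace:
            # keep keys unambiguous: the separator must not appear in the namespace
            raise ValueError("namespace must not contain ':'")
        return f"{namespace}:{session_id}"
    return session_id
```

For a file-backed store, note that `:` is not legal in Windows filenames, so `JsonFileSessionStore` may want a filename-safe separator (or a hashed key) rather than this exact format.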
Task 15: BUG-15 — Summary growth cap¶
Files:
- Modify: tests/agent/test_regression.py — add test function test_bug15_summary_cap
- Modify: src/selectools/agent/_memory_manager.py:99-100
- Step 1: Write failing test
- Step 2: Add a cap constant and helper:
_MAX_SUMMARY_CHARS = 4000
def _append_summary(existing: Optional[str], new_chunk: str) -> str:
if not existing:
combined = new_chunk
else:
combined = f"{existing} {new_chunk}"
if len(combined) > _MAX_SUMMARY_CHARS:
combined = combined[-_MAX_SUMMARY_CHARS:]
return combined
- Step 3: Replace the concatenation in `_maybe_summarize_trim` with an `_append_summary` call
- Step 4: Run test, commit
LOW-MEDIUM SEVERITY BUGS (Tasks 16-22)¶
These bugs are isolated and smaller. Each task follows the same pattern: write test, make minimal change, run test, commit.
Task 16: BUG-16 — Cancelled result missing extraction¶
File: src/selectools/agent/core.py:540-562
- Write test: assert entities extracted during run are persisted after cancellation
- Add `_extract_entities()` and `_extract_kg_triples()` calls to `_build_cancelled_result`
- Run test, commit
Task 17: BUG-17 — AgentTrace.add() thread safety¶
File: src/selectools/trace.py:118
- Write test: 10 threads × 100 adds, verify final count
- Add `self._lock = threading.Lock()` to `AgentTrace.__init__`, wrap the `add()` body
- Run test, commit
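The pattern and its stress test can be sketched with an illustrative stand-in for `AgentTrace`:

```python
import threading
from typing import Any, List

class Trace:
    """Illustrative stand-in for AgentTrace: a locked append-only log."""
    def __init__(self) -> None:
        self._entries: List[Any] = []
        self._lock = threading.Lock()

    def add(self, entry: Any) -> None:
        # single appends are GIL-atomic in CPython, but the lock also covers
        # any future compound updates (counters, timestamps) added to add()
        with self._lock:
            self._entries.append(entry)

    def __len__(self) -> int:
        with self._lock:
            return len(self._entries)

def hammer(trace: Trace, n_threads: int = 10, per_thread: int = 100) -> int:
    """Run n_threads writers of per_thread adds each; return the final count."""
    threads = [
        threading.Thread(target=lambda: [trace.add(i) for i in range(per_thread)])
        for _ in range(n_threads)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return len(trace)
```

The regression test asserts `hammer(...)` returns exactly `n_threads * per_thread`, which a lost-update race would violate.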
Task 18: BUG-18 — Async observer exception logging¶
File: src/selectools/agent/_lifecycle.py:48
- Write test: observer that raises should log via `logger.warning`
- Add `add_done_callback` with logging:
task = asyncio.ensure_future(handler(*args))
def _log_task_exception(t: "asyncio.Task[Any]") -> None:
if t.cancelled():
return
exc = t.exception()
if exc is not None:
logger.warning("Async observer raised: %s", exc, exc_info=exc)
task.add_done_callback(_log_task_exception)
- Run test, commit
Task 19: BUG-19 — Clone isolation¶
File: src/selectools/agent/core.py:1124
- Write test: two batch clones with observers don't share state
- Replace `copy.copy(self)` with an explicit deep copy of the observers list and other mutable config groups
- Run test, commit
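The hazard is easy to demonstrate with a minimal stand-in (`Agent` below is illustrative, not the real class): `copy.copy` shares the observers list between clones, while an explicit per-clone copy isolates it.

```python
import copy
from typing import Any, List

class Agent:
    def __init__(self) -> None:
        self.observers: List[Any] = []

    def clone_shallow(self) -> "Agent":
        return copy.copy(self)  # BUG-19: clones share the observers list

    def clone_isolated(self) -> "Agent":
        dup = copy.copy(self)
        dup.observers = list(self.observers)  # per-clone mutable state
        return dup

base = Agent()
shared = base.clone_shallow()
shared.observers.append("obs")      # leaks into base via the shared list
isolated = base.clone_isolated()
isolated.observers.append("other")  # stays local to the clone
```

A plain `list(...)` copy suffices when observers themselves are shared intentionally; only the containers holding per-clone mutable state need the copy.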
Task 20: BUG-20 — OTel/Langfuse observer locks¶
Files: src/selectools/observe/otel.py:46-48, src/selectools/observe/langfuse.py:55-57
- Write test: concurrent on_llm_start from 10 threads, verify counter
- Add `self._lock = threading.Lock()` to each observer's init, wrap all `_spans`/`_llm_counter`/`_traces` mutations
- Run test, commit
Task 21: BUG-21 — Vector store search dedup¶
Files: src/selectools/rag/stores/{chroma,memory,sqlite,faiss}.py (all search() methods)
- Write test: insert same doc twice, search, assert single result
- Add a `dedup: bool = True` parameter to the `VectorStore.search` protocol; implement post-search text-hash dedup in each store
- Run test, commit
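The post-search dedup can be sketched as a standalone helper (illustrative name; the plan wires the equivalent into each store's `search()`):

```python
import hashlib
from typing import Dict, List, Tuple

def dedup_results(results: List[Tuple[str, float]]) -> List[Tuple[str, float]]:
    """Keep the best-scoring hit per unique text, preserving score order."""
    best: Dict[str, Tuple[str, float]] = {}
    for text, score in sorted(results, key=lambda r: r[1], reverse=True):
        key = hashlib.sha256(text.encode("utf-8")).hexdigest()
        if key not in best:  # first hit per hash is the highest-scoring one
            best[key] = (text, score)
    return sorted(best.values(), key=lambda r: r[1], reverse=True)
```

Hashing rather than comparing raw texts keeps the seen-set small for large documents; note that dedup can shrink the result list below `k`, so stores may want to over-fetch before deduplicating.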
Task 22: BUG-22 — Optional[T] without default¶
File: src/selectools/tools/decorators.py:98
- Write test: `@tool()` on `def f(x: Optional[str]): ...` → `x.required is False`
- In `_infer_parameters_from_callable`, detect Optional via a Union-with-None check, set `is_optional=True` without requiring a default
- Run test, commit
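The Optional-detection rule can be sketched standalone (illustrative helpers, not the real `_infer_parameters_from_callable`): a parameter is non-required if it has a default or its annotation is a Union containing `None`.

```python
import inspect
from typing import Dict, Optional, Union, get_args, get_origin, get_type_hints

def is_optional_hint(hint) -> bool:
    """True for Optional[T] / Union[..., None] annotations."""
    return get_origin(hint) is Union and type(None) in get_args(hint)

def required_params(fn) -> Dict[str, bool]:
    """Map each parameter name to whether the caller must supply it."""
    hints = get_type_hints(fn)
    out: Dict[str, bool] = {}
    for name, param in inspect.signature(fn).parameters.items():
        has_default = param.default is not inspect.Parameter.empty
        optional = has_default or is_optional_hint(hints.get(name))
        out[name] = not optional
    return out

def f(x: Optional[str], y: int, z: str = "hi") -> None: ...
# required_params(f) -> {'x': False, 'y': True, 'z': False}
```

On Python 3.10+, `str | None` annotations normalize to the same Union form under `get_type_hints`, so the check covers both spellings.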
Final Verification (Task 23)¶
- Step 1: Run the complete test suite
- Step 2: Run mypy
- Step 3: Run linters
black src/ tests/ --line-length=100 --check
isort src/ tests/ --profile=black --line-length=100 --check
flake8 src/
bandit -r src/ -ll -q -c pyproject.toml
- Step 4: Update CHANGELOG
Add a ## [Unreleased] section in CHANGELOG.md documenting all 22 fixes with their competitor source references.
- Step 5: Commit the changelog update
- Step 6: Write final summary document
Write docs/superpowers/plans/2026-04-10-bug-fix-summary.md with:
- Count of bugs fixed per severity
- Total new regression tests added
- Any bugs downgraded or deferred with rationale
- Follow-up items for v0.22.1
Self-Review Checklist¶
- Spec coverage: All 22 bugs from the cross-reference report have a corresponding task (Tasks 1-22) + final verification (Task 23). ✓
- No placeholders: Every task has exact file paths, exact line numbers, complete code snippets for the fix, and explicit bash commands. Tasks 16-22 are lighter because those bugs are small and mechanical — each still specifies the file, the test, the fix, and the commit. ✓
- Type consistency: `run_sync` has a single signature across all 8 sync wrapper replacements. The new `_literal_info` helper is consistent with `_unwrap_type`. The 4-tuple return from `run_child` and 3-tuple from `_aexecute_subgraph` are consistent with their callers. ✓
- Test isolation: All 22 regression tests live in `tests/agent/test_regression.py` as `test_bug{NN}_*` functions and are independently runnable — no inter-test dependencies. ✓