diff --git a/.envrc b/.envrc new file mode 100644 index 0000000..3550a30 --- /dev/null +++ b/.envrc @@ -0,0 +1 @@ +use flake diff --git a/.gitignore b/.gitignore index 24f79d6..04ec129 100644 --- a/.gitignore +++ b/.gitignore @@ -26,4 +26,7 @@ go.work.sum .env /tmp -/bin \ No newline at end of file +/bin + +# nix +.direnv/ \ No newline at end of file diff --git a/README.md b/README.md index f7de1bd..921ccdf 100644 --- a/README.md +++ b/README.md @@ -2,32 +2,79 @@ [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) ![Test](https://github.com/bit8bytes/beago/actions/workflows/tests.yml/badge.svg) ![Sec Scan](https://github.com/bit8bytes/beago/actions/workflows/sec_scan.yml/badge.svg) -beago provides composable building blocks for LLM-powered Go applications — pipes for structured output, agents for tool-using reasoning loops, and stores for conversation history. The core library has no external dependencies. +beago brings the Unix philosophy to LLM applications: small, focused handlers connected by pipes. Each handler reads from an `io.Reader`, transforms the stream, and writes to an `io.Writer` — exactly like Unix programs connected with `|`. The core library has no external dependencies. + +## The Unix Pipe Model + +Unix pipes let you compose small programs into powerful workflows: + +``` +echo "text" | translate | summarise | fmt +``` + +beago works the same way, but for LLM pipelines: + +```go +pipe.Execute(ctx, os.Stdin, os.Stdout, + llm.Prompt("Translate to French."), + llm.Generate(model), // stdin | translate + llm.Prompt("Summarise in one sentence."), + llm.Generate(model), // | summarise +) +``` + +Each `pipe.Handler` is a composable unit. Handlers are chained with `pipe.Execute`, looped with `pipe.Loop`, and debugged with `pipe.Tee` — mirroring Unix's `tee(1)`. ## Core Concepts -- **Pipes** — simple `Input → LLM → Output` pipelines with typed, structured responses +- **Pipe** — the core primitive: a `Handler` that reads `io.Reader` → transforms → writes `io.Writer` +- **Execute** — chains handlers sequentially, connecting each output to the next input via `io.Pipe` +- **Loop** — runs a handler chain repeatedly, feeding each iteration's output as the next input; stops on `ErrDone` or a max iteration count +- **Tee** — splits the stream like Unix `tee(1)`: passes data through while copying to a second writer for debugging - **Agents** — ReAct (Reasoning + Acting) loops that interleave LLM reasoning with tool execution -- **Stores** — tamper-evident message history with a SHA-256 hash chain, keeping LLMs stateful across turns - **Tools** — implement the `Tool` interface to give agents new capabilities ## Quick Start ```go -// Pipe: send messages and get structured output -pipe := pipes.New(messages, model, parser) -result, _ := pipe.Invoke(ctx) +// Single handler: pipe stdin through an LLM to stdout +// echo "What is 2+2?" | go run . +pipe.Execute(ctx, os.Stdin, os.Stdout, + llm.Generate(model), +) ``` -See [Pipe](/examples/pipes/json/main.go) for full working `pipe` example. ```go -// Agent: reason and act with tools -agent, _ := agents.NewReAct(ctx, model, tools, storage) -agent.Task(ctx, "Use the helloWorld tool with name Beago") -res, _ := runner.New(agent).Run(ctx) +// Chain handlers: translate then summarise +pipe.Execute(ctx, os.Stdin, os.Stdout, + llm.Prompt("Translate to French."), + llm.Generate(model), + llm.Prompt("Summarise in one sentence."), + llm.Generate(model), +) ``` -See [Agent](/examples/agents/hello/main.go) for full working `agent` example. +```go +// Loop until the LLM outputs "DONE" +pipe.Execute(ctx, os.Stdin, os.Stdout, + pipe.Loop(10, + llm.Generate(model), + pipe.Exit(func(b []byte) bool { + return bytes.Contains(b, []byte("DONE")) + }), + ), +) +``` + +## Examples + +| Example | Description | +|---|---| +| [pipe](/examples/pipe/main.go) | Single LLM call — the simplest pipe | +| [pipe/tee](/examples/pipe/tee/main.go) | Split the stream with `Tee` to inspect output | +| [pipe/chain](/examples/pipe/chain/main.go) | Chain two LLM calls: translate → summarise | +| [pipe/loop](/examples/pipe/loop/main.go) | Loop until a stop condition is met | +| [agents](/examples/agents/main.go) | ReAct agent with tools | ## Contributions diff --git a/agents/agents.go b/agents/agents.go deleted file mode 100644 index d2156d2..0000000 --- a/agents/agents.go +++ /dev/null @@ -1,160 +0,0 @@ -// Package agents provides a ReAct loop for LLM-powered agents. -// -// LLMs alone can't take actions or recover from mistakes mid-task. The ReAct -// pattern (Reasoning + Acting) solves this by interleaving LLM reasoning steps -// with tool executions, so the model can observe results and adapt before -// committing to a final answer. This package wires that loop together. -package agents - -import ( - "context" - "encoding/json" - "fmt" - - "github.com/bit8bytes/beago/inputs/roles" - "github.com/bit8bytes/beago/llms" - "github.com/bit8bytes/beago/tools" -) - -// response is the wire format the LLM produces each iteration. -// It maps directly to the JSON schema described in the system prompt. -// FinalAnswer being non-empty signals the agent is done; otherwise Action and -// ActionInput describe the next tool to call. -type response struct { - Thought string `json:"thought"` - Action string `json:"action"` - ActionInput json.RawMessage `json:"action_input"` - FinalAnswer string `json:"final_answer"` -} - -// Action is the domain representation of a tool call, extracted from a -// response. Name matches the tool's Name() and Input is passed to Tool.Execute. -type Action struct { - Name string - Input json.RawMessage -} - -type llm interface { - Generate(ctx context.Context, messages []llms.Message) (string, error) -} - -type store interface { - Add(ctx context.Context, msgs ...llms.Message) error - List(ctx context.Context) ([]llms.Message, error) - Clear(ctx context.Context) error -} - -type parser interface { - Parse(text string) (response, error) - Instructions() string -} - -// Agent executes tasks using the ReAct pattern (reasoning + acting). -// Call Plan to generate the next action, then Act to execute it. -// Repeat until Plan returns Finish=true, then retrieve the result with Answer. -type Agent struct { - model llm - tools map[string]tools.Tool - history store - actions []Action - parser parser - answer string -} - -// New creates an agent with the given model, tools, storage, and parser. -// For the ReAct pattern, prefer NewReAct. -func New(model llm, tools []tools.Tool, storage store, p parser) *Agent { - return &Agent{ - model: model, - tools: toolNames(tools), - history: storage, - parser: p, - } -} - -// Task sets the user's question or task for the agent to solve. -// Call this before starting the Plan-Act loop. -func (a *Agent) Task(ctx context.Context, prompt string) error { - return a.history.Add(ctx, llms.Message{ - Role: roles.User, - Content: "Question: " + prompt, - }) -} - -// Plan calls the LLM to decide the next action or provide a final answer. -// Returns Response.Finish=true when the task is complete. -func (a *Agent) Plan(ctx context.Context) error { - history, err := a.history.List(ctx) - if err != nil { - return err - } - - generated, err := a.model.Generate(ctx, history) - if err != nil { - return err - } - - parsed, err := a.parser.Parse(generated) - if err != nil { - return fmt.Errorf("failed to parse agent response: %w", err) - } - - if err := a.addAssistantMessage(ctx, generated); err != nil { - return fmt.Errorf("failed to store assistant message: %w", err) - } - - if parsed.FinalAnswer != "" { - a.answer = parsed.FinalAnswer - return nil - } - - action := Action{ - Name: parsed.Action, - Input: parsed.ActionInput, - } - a.actions = []Action{action} - - return nil -} - -func (a *Agent) Answer(ctx context.Context) (string, bool) { - return a.answer, a.answer != "" -} - -// Act executes the tool chosen by Plan and adds the result as an observation. -// Always call this after Plan (unless Plan returned Finish=true). -func (a *Agent) Act(ctx context.Context) error { - for _, action := range a.actions { - if err := a.handleAction(ctx, action); err != nil { - return err - } - } - a.clearActions() - return nil -} - -func (a *Agent) handleAction(ctx context.Context, action Action) error { - t, exists := a.tools[action.Name] - if !exists { - return a.addObservationMessage(ctx, "The action "+action.Name+" doesn't exist.") - } - - observation, err := t.Execute(ctx, action.Input) - if err != nil { - return a.addObservationMessage(ctx, "Error: "+err.Error()) - } - - return a.addObservationMessage(ctx, observation) -} - -func (a *Agent) clearActions() { - a.actions = nil -} - -func toolNames(tls []tools.Tool) map[string]tools.Tool { - t := make(map[string]tools.Tool, len(tls)) - for _, tool := range tls { - t[tool.Name()] = tool - } - return t -} diff --git a/agents/messages.go b/agents/messages.go deleted file mode 100644 index 2c4c75d..0000000 --- a/agents/messages.go +++ /dev/null @@ -1,22 +0,0 @@ -package agents - -import ( - "context" - - "github.com/bit8bytes/beago/inputs/roles" - "github.com/bit8bytes/beago/llms" -) - -func (a *Agent) addAssistantMessage(ctx context.Context, content string) error { - return a.history.Add(ctx, llms.Message{ - Role: roles.Assistant, - Content: content, - }) -} - -func (a *Agent) addObservationMessage(ctx context.Context, observation string) error { - return a.history.Add(ctx, llms.Message{ - Role: roles.System, - Content: "Observation: " + observation, - }) -} diff --git a/agents/react.go b/agents/react.go index 3e252d1..a3ccbc8 100644 --- a/agents/react.go +++ b/agents/react.go @@ -1,68 +1,80 @@ -package agents +package react import ( "context" + "encoding/json" "fmt" - "strings" + "io" - "github.com/bit8bytes/beago/inputs/roles" - "github.com/bit8bytes/beago/llms" - "github.com/bit8bytes/beago/outputs/json" + "github.com/bit8bytes/beago/pipe" "github.com/bit8bytes/beago/tools" ) -// NewReAct creates an agent pre-configured for the ReAct pattern. -// It seeds the ReAct system prompt into storage. -func NewReAct(ctx context.Context, model llm, tls []tools.Tool, storage store) (*Agent, error) { - p := json.NewParser[response]() - t := toolNames(tls) - - msgs := buildReActPrompt(t, p.Instructions()) - if err := storage.Add(ctx, msgs...); err != nil { - return nil, err - } - - return &Agent{ - model: model, - tools: t, - history: storage, - parser: p, - }, nil +type response struct { + Thought string `json:"thought"` + Action string `json:"action"` + ActionInput json.RawMessage `json:"action_input"` + FinalAnswer string `json:"final_answer"` } -func buildReActPrompt(tls map[string]tools.Tool, jsonInstructions string) []llms.Message { - var toolDescriptions strings.Builder - for _, t := range tls { - fmt.Fprintf(&toolDescriptions, "- %s: %s\n", t.Name(), t.Description()) - for _, p := range t.Parameters() { - req := "optional" - if p.Required { - req = "required" +// Instructions injects the ReAct system prompt and tool descriptions into the stream. +func Instructions(ts ...tools.Tool) pipe.Handler { + return pipe.HandlerFunc(func(ctx context.Context, r io.Reader, w io.Writer) error { + fmt.Fprintln(w, "You are a ReAct agent. Solve tasks step by step using the available tools.") + fmt.Fprintln(w, "Do not estimate or predict values. Use only values returned by tools.") + fmt.Fprintln(w, "\nAvailable tools:") + for _, t := range ts { + fmt.Fprintf(w, "\n- %s: %s\n", t.Name, t.Description) + for _, p := range t.Params { + req := "optional" + if p.Required { + req = "required" + } + fmt.Fprintf(w, " - %s (%s): %s\n", p.Name, req, p.Description) } - fmt.Fprintf(&toolDescriptions, " - %s (%s): %s\n", p.Name, req, p.Description) } - } - - return []llms.Message{ - { - Role: roles.System, - Content: fmt.Sprintf(` -You are an helpful agent. Answer questions using the available tools. -Do not estimate or predict values. Use only values returned by tools. - -Available tools: -%s -%s + fmt.Fprintln(w, ` +STRICT OUTPUT RULES — you MUST follow these on every single turn: +- Output ONLY a raw JSON object. No markdown, no code fences, no prose before or after. +- Every response must be valid JSON with exactly these fields: + {"thought": "...", "action": "...", "action_input": {...}, "final_answer": "..."} +- To call a tool: set "action" to the tool name, "action_input" to its params, "final_answer" to "". +- To give the final answer: set "action" to "", "action_input" to {}, "final_answer" to your answer. +- Never output plain text. Never wrap JSON in backticks or markdown code blocks. +- Always read the file first, then write to it. +- NEVER use write_file to fix errors. write_file is ONLY for creating new files. +- NEVER guess or read the file to find errors. ALWAYS run go build first to get the exact line number. +- To fix a compiler error: (1) run go build → get line N, (2) run cat to read the file and see what line N contains, (3) determine the correct content for that line, (4) run: sed -i '' 'Ns/.*/REPLACEMENT/' file.go + Example: go build says main.go:6 error, cat shows line 6 is '"strconv' with missing closing quote → sed -i '' '6s/.*/"strconv"/' main.go + Use .* to replace the whole line — never try to match the broken content. -Respond with a JSON object on each turn with these fields: -- "thought": your reasoning about what to do next -- "action": the exact tool name to call (empty string when giving final answer) -- "action_input": a JSON object whose keys are the tool's parameter names (empty object {} when giving final answer) -- "final_answer": your final answer to the user — MUST be non-empty when you are done; empty string ONLY when calling a tool +Think step by step. Do not hallucinate.`) + _, err := io.Copy(w, r) + return err + }) +} -When you have enough information to answer, set "action" to "" and "action_input" to {} and put a detailed answer based on your observations — MUST be non-empty when done; be thorough and include all relevant findings. +// Done returns a Handler that signals loop termination when the stream contains a non-empty final_answer. +func Done() pipe.Handler { + return pipe.Exit(func(b []byte) bool { + var resp response + json.Unmarshal(b, &resp) + return resp.FinalAnswer != "" + }) +} -Think step by step. Do not hallucinate.`, toolDescriptions.String(), jsonInstructions), - }, - } +// ParseAction validates that the stream contains a well-formed ReAct response and passes it through. +func ParseAction() pipe.Handler { + return pipe.HandlerFunc(func(ctx context.Context, r io.Reader, w io.Writer) error { + var resp response + if err := json.NewDecoder(r).Decode(&resp); err != nil { + fmt.Fprintf(w, "\nObservation: invalid ReAct response: %v\n", err) + return nil + } + if resp.Action == "" && resp.FinalAnswer == "" { + fmt.Fprintf(w, "\nObservation: response must set either action or final_answer\n") + return nil + } + return json.NewEncoder(w).Encode(resp) + }) } diff --git a/embedder/embedder.go b/embedder/embedder.go deleted file mode 100644 index d3762e2..0000000 --- a/embedder/embedder.go +++ /dev/null @@ -1,33 +0,0 @@ -// Package embedder provides a thin wrapper around an LLM's embedding capability. -// Keeping embedding behind its own interface isolates callers from the underlying -// model provider and makes it straightforward to swap or mock the backend in tests. -package embedder - -import ( - "context" -) - -// llm is the subset of an LLM client that this package requires. -// Using a narrow interface instead of a concrete type keeps the package decoupled -// from any specific provider implementation. -type llm interface { - GenerateEmbedding(ctx context.Context, prompt string) ([]float32, error) -} - -// embedder wraps an LLM to expose only the embedding operation. -type embedder struct { - llm llm -} - -// New creates an embedder backed by the given LLM. -func New(llm llm) *embedder { - return &embedder{ - llm: llm, - } -} - -// Embed converts query into a vector representation using the underlying LLM. -// The returned slice can be used for semantic similarity comparisons, retrieval, or storage in a vector store. -func (e *embedder) Embed(ctx context.Context, query string) ([]float32, error) { - return e.llm.GenerateEmbedding(ctx, query) -} diff --git a/embedder/embedder_test.go b/embedder/embedder_test.go deleted file mode 100644 index c991b5c..0000000 --- a/embedder/embedder_test.go +++ /dev/null @@ -1,43 +0,0 @@ -package embedder - -import ( - "context" - "errors" - "testing" -) - -type mockLLM struct { - embedding []float32 - err error -} - -func (m *mockLLM) GenerateEmbedding(_ context.Context, _ string) ([]float32, error) { - return m.embedding, m.err -} - -func TestEmbed(t *testing.T) { - want := []float32{0.1, 0.2, 0.3} - e := New(&mockLLM{embedding: want}) - - got, err := e.Embed(context.Background(), "hello") - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if len(got) != len(want) { - t.Fatalf("got %d values, want %d", len(got), len(want)) - } - for i := range want { - if got[i] != want[i] { - t.Errorf("got[%d] = %f, want %f", i, got[i], want[i]) - } - } -} - -func TestEmbed_Error(t *testing.T) { - e := New(&mockLLM{err: errors.New("llm failure")}) - - _, err := e.Embed(context.Background(), "hello") - if err == nil { - t.Fatal("expected error, got nil") - } -} diff --git a/examples/agents/README.md b/examples/agents/README.md new file mode 100644 index 0000000..df91c59 --- /dev/null +++ b/examples/agents/README.md @@ -0,0 +1,49 @@ +# Pipe Package README + +The `pipe` package is the central orchestration layer within the beago framework, designed for building complex, stateful, and streaming agent pipelines. It allows developers to connect various components (like LLM generation, history tracking, tool execution, and reactive logic) into a cohesive, step-by-step workflow. + +## Purpose + +Instead of writing traditional procedural code, the `pipe` package enables defining a workflow as a sequence of pipes. This structure ensures that the output of one component (the stream) becomes the input for the next, managing complex data flow and state transitions automatically. + +## Key Concepts + +1. **Streaming Execution:** Pipes operate on continuous streams of data, making them efficient for real-time or iterative processes. +2. **State Management:** The package handles the accumulation and passing of history and context across multiple steps (e.g., using `history.Accumulate()`). +3. **Component Integration:** It provides mechanisms to seamlessly integrate external services (LLMs via `llm.Generate()`) and predefined logic components (`react.Instructions()`, `jsonpkg.Extract()`). +4. **Looping and Iteration:** Complex agents can be built using `pipe.Loop()`, allowing the workflow to repeat and refine results until a defined stopping condition is met. + +## Usage Example (Agent Pipeline) + +As demonstrated in the main usage, setting up a pipeline involves defining the context and linking the components: + +```go +// The main entry point for the agent workflow +err := pipe.Execute(ctx, os.Stdin, os.Stdout, + react.Instructions(shell, write), + pipe.Loop(10, + hist.Accumulate(), + llm.Generate(model), + jsonpkg.Extract(), + pipe.Tee(os.Stderr), + react.ParseAction(), + tools.Execute(shell, write), + react.Done(), + ), +) +``` + +**Explanation of the Pipeline:** + +* **`pipe.Execute()`:** Initiates the entire pipeline, managing input (`os.Stdin`) and output (`os.Stdout`). +* **`pipe.Loop()`:** Defines the iterative structure, allowing the agent to run up to 10 times. +* **`hist.Accumulate()`:** Ensures that the full conversational history is maintained and passed to the LLM. +* **`llm.Generate(model)`:** Core intelligence component: generates the next thought/action using the specified LLM. +* **`jsonpkg.Extract()`:** Parses the raw LLM output, specifically looking for structured JSON actions. +* **`react.ParseAction()`:** Interprets the structured action and links it to available tools. +* **`tools.Execute()`:** Executes the action (e.g., running shell commands or writing files). +* **`react.Done()`:** The termination condition, signaling that the process should stop. + +## Conclusion + +The `pipe` package abstracts away the complexities of state machine management and streaming data processing, allowing developers to focus purely on defining the sequence of actions and intelligence required for their agent. \ No newline at end of file diff --git a/examples/agents/hello/main.go b/examples/agents/hello/main.go deleted file mode 100644 index dbdd544..0000000 --- a/examples/agents/hello/main.go +++ /dev/null @@ -1,56 +0,0 @@ -package main - -import ( - "context" - "errors" - "fmt" - "time" - - "github.com/bit8bytes/beago/agents" - "github.com/bit8bytes/beago/llms/ollama" - "github.com/bit8bytes/beago/runner" - "github.com/bit8bytes/beago/stores/memory" - "github.com/bit8bytes/beago/tools" -) - -func main() { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*120) - defer cancel() - - storage := memory.New() - defer storage.Close() - - model := ollama.New(ollama.Model{ - Model: "gemma3:12b", - Options: ollama.Options{NumCtx: 4096}, - Stream: false, - Format: ollama.JSON, - }) - - tools := []tools.Tool{&tools.HelloWorldTool{}} - - agent, err := agents.NewReAct(ctx, model, tools, storage) - if err != nil { - panic(err) - } - - task := `Use the tool helloWorld with the name Beago as input` - if err := agent.Task(ctx, task); err != nil { - panic(err) - } - - r := runner.New(agent) - - res, err := r.Run(ctx) - if err != nil { - switch { - case errors.Is(err, runner.ErrNoFinalAnswer): - fmt.Println("No final answer found") - default: - panic(err) - } - return - } - - fmt.Println(res) -} diff --git a/examples/agents/main.go b/examples/agents/main.go new file mode 100644 index 0000000..94ecade --- /dev/null +++ b/examples/agents/main.go @@ -0,0 +1,45 @@ +package main + +import ( + "context" + "log" + "os" + "time" + + react "github.com/bit8bytes/beago/agents" + "github.com/bit8bytes/beago/history" + jsonpkg "github.com/bit8bytes/beago/json" + "github.com/bit8bytes/beago/llm" + "github.com/bit8bytes/beago/llm/ollama" + "github.com/bit8bytes/beago/pipe" + "github.com/bit8bytes/beago/tools" +) + +// Usage: echo "Write a README for the pipe package based on its source files" | go run . +func main() { + model := ollama.New("gemma4:e4b", "") + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute) + defer cancel() + + shell := tools.Exec("man", "ls", "grep", "cat", "find", "sed", "go") + write := tools.WriteFile() + + hist := history.New() + + err := pipe.Execute(ctx, os.Stdin, os.Stdout, + react.Instructions(shell, write), + pipe.Loop(20, + hist.Accumulate(), + llm.Generate(model), + jsonpkg.Extract(), + pipe.Tee(os.Stderr), + react.ParseAction(), + tools.Execute(shell, write), + react.Done(), + ), + ) + if err != nil { + log.Fatal(err) + } +} diff --git a/examples/agents/search/main.go b/examples/agents/search/main.go deleted file mode 100644 index c710530..0000000 --- a/examples/agents/search/main.go +++ /dev/null @@ -1,58 +0,0 @@ -package main - -import ( - "context" - "errors" - "fmt" - "time" - - "github.com/bit8bytes/beago/agents" - "github.com/bit8bytes/beago/llms/ollama" - "github.com/bit8bytes/beago/runner" - "github.com/bit8bytes/beago/stores/memory" - "github.com/bit8bytes/beago/tools" -) - -func main() { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*120) - defer cancel() - - storage := memory.New() - defer storage.Close() - - model := ollama.New(ollama.Model{ - Model: "gemma3:12b", - Options: ollama.Options{NumCtx: 4096}, - Stream: false, - Format: ollama.JSON, - }) - - tools := []tools.Tool{&tools.Search{}} - - agent, err := agents.NewReAct(ctx, model, tools, storage) - if err != nil { - panic(err) - } - - // The llm will use the search tool to fetch the content of the page and extract the arguments if any. - // The search tool is currently just for testing and needs to be remodeled to be production ready. - task := "Fetch https://httpbin.org/get?q=lorem+ipsum and tell me what arguments the response contains." - if err := agent.Task(ctx, task); err != nil { - panic(err) - } - - r := runner.New(agent) - - res, err := r.Run(ctx) - if err != nil { - switch { - case errors.Is(err, runner.ErrNoFinalAnswer): - fmt.Println("No final answer found") - default: - panic(err) - } - return - } - - fmt.Println(res) -} diff --git a/examples/inputs/chats/main.go b/examples/inputs/chats/main.go deleted file mode 100644 index f822e92..0000000 --- a/examples/inputs/chats/main.go +++ /dev/null @@ -1,43 +0,0 @@ -package main - -import ( - "fmt" - - "github.com/bit8bytes/beago/inputs/chats" - "github.com/bit8bytes/beago/inputs/roles" - "github.com/bit8bytes/beago/llms" -) - -func main() { - messages := []llms.Message{ - { - Role: roles.System, - Content: "Translate {{.InputLanguage}} to {{.OutputLanguage}}.", - }, - { - Role: roles.User, - Content: "{{.Text}}", - }, - } - - type chatData struct { - InputLanguage string - OutputLanguage string - Text string - } - - data := chatData{ - InputLanguage: "English", - OutputLanguage: "French", - Text: "I love programming.", - } - - formattedMessages, err := chats.New(messages).Execute(data) - if err != nil { - panic(err) - } - - for _, message := range formattedMessages { - fmt.Printf("[%s] %s\n", message.Role, message.Content) - } -} diff --git a/examples/inputs/prompts/main.go b/examples/inputs/prompts/main.go deleted file mode 100644 index 9cf0c9d..0000000 --- a/examples/inputs/prompts/main.go +++ /dev/null @@ -1,28 +0,0 @@ -package main - -import ( - "fmt" - - "github.com/bit8bytes/beago/inputs/prompts" -) - -func main() { - type productData struct { - Product string - } - - companyNamePrompt := "What is a good name for a company that makes {{.Product}}?" - data := productData{Product: "coloful socks"} - companyNameFormattedPrompt, _ := prompts.New(companyNamePrompt).Execute(data) - fmt.Println("Example 1: " + companyNameFormattedPrompt) - - type buildProductData struct { - Name string - Company string - } - - buildProductPrompt := "{{.Name}} want's to build {{.Company}}." - buildProduct := buildProductData{Name: "Alex", Company: "coloful socks"} - twoVariablesFormattedPrompt, _ := prompts.New(buildProductPrompt).Execute(buildProduct) - fmt.Println("Example 2: " + twoVariablesFormattedPrompt) -} diff --git a/examples/llms/ollama/embeddings/main.go b/examples/llms/ollama/embeddings/main.go deleted file mode 100644 index 0e3288a..0000000 --- a/examples/llms/ollama/embeddings/main.go +++ /dev/null @@ -1,28 +0,0 @@ -package main - -import ( - "context" - "fmt" - - "github.com/bit8bytes/beago/inputs/prompts" - "github.com/bit8bytes/beago/llms/ollama" -) - -func main() { - - companyNamePrompt := prompts.New("What is a good name for a company that makes {{.product}}?") - - data := map[string]any{"product": "coloful socks"} - companyNameFormattedPrompt, _ := companyNamePrompt.Execute(data) - - llama3_8b_model := ollama.Model{ - Model: "llama3:8b", - Options: ollama.Options{NumCtx: 4096}, - Stream: false, - } - - ollamaClient := ollama.New(llama3_8b_model) - embedding, _ := ollamaClient.GenerateEmbedding(context.Background(), companyNameFormattedPrompt) - fmt.Println(len(embedding)) - fmt.Println(embedding[:3]) // First 3 bytes... -} diff --git a/examples/llms/ollama/main.go b/examples/llms/ollama/main.go deleted file mode 100644 index 36baf9f..0000000 --- a/examples/llms/ollama/main.go +++ /dev/null @@ -1,53 +0,0 @@ -package main - -import ( - "context" - "fmt" - - "github.com/bit8bytes/beago/inputs/chats" - "github.com/bit8bytes/beago/inputs/roles" - "github.com/bit8bytes/beago/llms" - "github.com/bit8bytes/beago/llms/ollama" -) - -func main() { - messages := []llms.Message{ - { - Role: roles.System, - Content: "Translate {{.InputLanguage}} to {{.OutputLanguage}}.", - }, - { - Role: roles.System, - Content: "Return only the concrete translation.", - }, - { - Role: roles.User, - Content: "{{.Text}}", - }, - } - - type chatData struct { - InputLanguage string - OutputLanguage string - Text string - } - - data := chatData{ - InputLanguage: "English", - OutputLanguage: "French", - Text: "I love programming.", - } - - formattedMessages, err := chats.New(messages).Execute(data) - if err != nil { - panic(err) - } - - client := ollama.New(ollama.Model{ - Model: "gemma3n:e2b", - Options: ollama.Options{NumCtx: 4096}, - Stream: false, - }) - out, _ := client.Generate(context.Background(), formattedMessages) - fmt.Println(out) -} diff --git a/examples/llms/ollama/stream/main.go b/examples/llms/ollama/stream/main.go deleted file mode 100644 index 63b7d63..0000000 --- a/examples/llms/ollama/stream/main.go +++ /dev/null @@ -1,41 +0,0 @@ -package main - -import ( - "context" - "fmt" - - "github.com/bit8bytes/beago/inputs/chats" - "github.com/bit8bytes/beago/llms" - "github.com/bit8bytes/beago/llms/ollama" -) - -func main() { - // We use a chat prompt from the core/input - prompt := chats.New([]llms.Message{ - {Role: "system", Content: "You are a helpful assistant that translates {{.inputLanguage}} to {{.outputLanguage}}."}, - {Role: "user", Content: "{{.text}}"}, - }) - - // Setup values for variables - data := map[string]string{ - "inputLanguage": "English", - "outputLanguage": "French", - "text": "I love programming.", - } - - // Prepare the chat prompt for the model - messages, _ := prompt.Execute(data) - - // Setup model with the wanted options. - model := ollama.Model{ - Model: "llama3:8b", - Options: ollama.Options{NumCtx: 4096}, - } - - client := ollama.New(model) - client.StreamContent(context.Background(), messages, func(content string, done bool) error { - // Do something with the generated content... - fmt.Print(content) - return nil - }) -} diff --git a/examples/outputs/json/main.go b/examples/outputs/json/main.go deleted file mode 100644 index 41a5c89..0000000 --- a/examples/outputs/json/main.go +++ /dev/null @@ -1,30 +0,0 @@ -package main - -import ( - "fmt" - - "github.com/bit8bytes/beago/outputs/json" -) - -type joke struct { - Setup string `json:"setup"` - Punchline string `json:"punchline"` -} - -func main() { - parser := json.NewParser[joke]() - joke, err := parser.Parse(` - { - "setup": "Why don't scientists trust atoms?", - "punchline": "Because they make up everything!" - }`) - if err != nil { - panic(err) - } - - fmt.Println("Setup: " + joke.Setup) - fmt.Println("Punchline: " + joke.Punchline) - - instructions := parser.Instructions() - fmt.Println("LLM instructions: " + instructions) -} diff --git a/examples/pipe/chain/main.go b/examples/pipe/chain/main.go new file mode 100644 index 0000000..c6b1980 --- /dev/null +++ b/examples/pipe/chain/main.go @@ -0,0 +1,34 @@ +package main + +import ( + "context" + "log" + "os" + + "github.com/bit8bytes/beago/llm" + "github.com/bit8bytes/beago/llm/ollama" + "github.com/bit8bytes/beago/pipe" +) + +// Usage: echo "The quick brown fox jumps over the lazy dog." | go run . +// +// Two LLM calls are chained like Unix pipes: the first translates to French, +// the second summarises the French text into one sentence. +// Each handler reads from the previous handler's output — just like: +// +// echo "..." | translate | summarise +func main() { + model := ollama.New("gemma4:e4b", "") + + ctx := context.Background() + + err := pipe.Execute(ctx, os.Stdin, os.Stdout, + llm.Prompt("Translate the following text to French. Output only the translation."), + llm.Generate(model), + llm.Prompt("Summarise the following French text in two sentences."), + llm.Generate(model), + ) + if err != nil { + log.Fatal(err) + } +} diff --git a/examples/pipe/loop/main.go b/examples/pipe/loop/main.go new file mode 100644 index 0000000..32fd31b --- /dev/null +++ b/examples/pipe/loop/main.go @@ -0,0 +1,33 @@ +package main + +import ( + "bytes" + "context" + "log" + "os" + "time" + + llm "github.com/bit8bytes/beago/llm" + "github.com/bit8bytes/beago/llm/ollama" + "github.com/bit8bytes/beago/pipe" +) + +// Usage: echo "Count down from 3 to 1, one number per line. When done write DONE." | go run . +func main() { + model := ollama.New("gemma4:e4b", "") + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + err := pipe.Execute(ctx, os.Stdin, os.Stdout, + pipe.Loop(10, + llm.Generate(model), + pipe.Exit(func(b []byte) bool { + return bytes.Contains(b, []byte("DONE")) + }), + ), + ) + if err != nil { + log.Fatal(err) + } +} diff --git a/examples/pipe/main.go b/examples/pipe/main.go new file mode 100644 index 0000000..0037b9f --- /dev/null +++ b/examples/pipe/main.go @@ -0,0 +1,25 @@ +package main + +import ( + "context" + "log" + "os" + + llm "github.com/bit8bytes/beago/llm" + "github.com/bit8bytes/beago/llm/ollama" + "github.com/bit8bytes/beago/pipe" +) + +// Usage: echo "What is 2+2?" | go run . +func main() { + model := ollama.New("gemma4:e4b", "") + + ctx := context.Background() + + err := pipe.Execute(ctx, os.Stdin, os.Stdout, + llm.Generate(model), + ) + if err != nil { + log.Fatal(err) + } +} diff --git a/examples/pipe/tee/main.go b/examples/pipe/tee/main.go new file mode 100644 index 0000000..da37562 --- /dev/null +++ b/examples/pipe/tee/main.go @@ -0,0 +1,31 @@ +package main + +import ( + "context" + "log" + "os" + + "github.com/bit8bytes/beago/llm" + "github.com/bit8bytes/beago/llm/ollama" + "github.com/bit8bytes/beago/pipe" +) + +// Usage: echo "What is the capital of France?" | go run . +// +// Tee splits the stream like a Unix tee(1): the LLM response flows to stdout +// while a copy is written to stderr for inspection. +// This is especially useful for debugging streaming handlers, +// as it allows you to see the output without interrupting the flow of the pipeline. +func main() { + model := ollama.New("gemma4:e4b", "") + + ctx := context.Background() + + err := pipe.Execute(ctx, os.Stdin, os.Stdout, + llm.Generate(model), + pipe.Tee(os.Stderr), + ) + if err != nil { + log.Fatal(err) + } +} diff --git a/examples/pipes/json/main.go b/examples/pipes/json/main.go deleted file mode 100644 index a9b342e..0000000 --- a/examples/pipes/json/main.go +++ /dev/null @@ -1,65 +0,0 @@ -package main - -import ( - "context" - "fmt" - - "github.com/bit8bytes/beago/inputs/chats" - "github.com/bit8bytes/beago/inputs/roles" - "github.com/bit8bytes/beago/llms" - "github.com/bit8bytes/beago/llms/ollama" - "github.com/bit8bytes/beago/outputs/json" - "github.com/bit8bytes/beago/pipes" -) - -func main() { - chat := chats.New([]llms.Message{ - { - Role: roles.System, - Content: ` -You are a helpful assistant that translates {{.InputLanguage}} to {{.OutputLanguage}}. -Return only the result. -`, - }, - { - Role: roles.User, - Content: "{{.Text}}", - }, - }) - - // `json:"xxx"` is required for [jsonout] parser - type translation struct { - Text string `json:"text"` - InputLanguage string `json:"inputLanguage"` - OutputLanguage string `json:"outputLanguage"` - } - - data := translation{ - Text: "I love programming.", - InputLanguage: "en", - OutputLanguage: "es", - } - - messages, err := chat.Execute(data) - if err != nil { - panic(err) - } - - model := ollama.Model{ - Model: "gemma3n:e2b", - Format: "json", - Options: ollama.Options{NumCtx: 4096}, - } - - client := ollama.New(model) - - parser := json.NewParser[translation]() - - pipe := pipes.New(messages, client, parser) - // Invoke is going to add the the parser instructions to the prompt. - // The model generates the content and the parser parses the output then. - // The result will be of type [translation]. - result, _ := pipe.Invoke(context.Background()) - fmt.Println("Translate from", result.InputLanguage, "to", result.OutputLanguage) - fmt.Println("Result:", result.Text) -} diff --git a/flake.lock b/flake.lock new file mode 100644 index 0000000..179aa07 --- /dev/null +++ b/flake.lock @@ -0,0 +1,27 @@ +{ + "nodes": { + "nixpkgs": { + "locked": { + "lastModified": 1776734388, + "narHash": "sha256-vl3dkhlE5gzsItuHoEMVe+DlonsK+0836LIRDnm6MXQ=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "10e7ad5bbcb421fe07e3a4ad53a634b0cd57ffac", + "type": "github" + }, + "original": { + "owner": "NixOS", + "ref": "nixos-25.11", + "repo": "nixpkgs", + "type": "github" + } + }, + "root": { + "inputs": { + "nixpkgs": "nixpkgs" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/flake.nix b/flake.nix new file mode 100644 index 0000000..5d55e29 --- /dev/null +++ b/flake.nix @@ -0,0 +1,28 @@ +{ + inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-25.11"; + outputs = { + self, + nixpkgs, + }: let + # Production server and Apple Silicon dev machines (aarch64-darwin). + supportedSystems = [ + "x86_64-linux" + "aarch64-darwin" + ]; + forAllSystems = nixpkgs.lib.genAttrs supportedSystems; + in { + # Local development shell with all required tools. + # Use `nix develop` to open the dev shell. + devShells = forAllSystems (system: let + pkgs = nixpkgs.legacyPackages.${system}; + in { + default = pkgs.mkShell { + packages = with pkgs; [ + git + go + go-task + ]; + }; + }); + }; +} diff --git a/history/history.go b/history/history.go new file mode 100644 index 0000000..388a465 --- /dev/null +++ b/history/history.go @@ -0,0 +1,30 @@ +package history + +import ( + "bytes" + "context" + "io" + + "github.com/bit8bytes/beago/pipe" +) + +// History accumulates stream content across loop iterations. +type History struct { + buf bytes.Buffer +} + +func New() *History { + return &History{} +} + +// Accumulate returns a Handler that appends the current input to the history +// and writes the full accumulated context to the next stage. +func (h *History) Accumulate() pipe.Handler { + return pipe.HandlerFunc(func(ctx context.Context, r io.Reader, w io.Writer) error { + if _, err := io.Copy(&h.buf, r); err != nil { + return err + } + _, err := w.Write(h.buf.Bytes()) + return err + }) +} diff --git a/inputs/chats/chats_templates.go b/inputs/chats/chats_templates.go deleted file mode 100644 index 81d6e95..0000000 --- a/inputs/chats/chats_templates.go +++ /dev/null @@ -1,56 +0,0 @@ -// Package chats provides templated chat message handling for LLM interactions. -package chats - -import ( - "bytes" - "text/template" - - "github.com/bit8bytes/beago/llms" -) - -type chats struct { - messages []llms.Message - templates []*template.Template -} - -// New creates a new chats instance by parsing the content of each message as a -// Go text/template. It panics if any message content fails to parse. -func New(messages []llms.Message) *chats { - templates := make([]*template.Template, 0, len(messages)) - for _, message := range messages { - tmpl, err := template.New("prompts").Parse(message.Content) - if err != nil { - panic(err) - } - - templates = append(templates, tmpl) - } - - return &chats{ - messages: messages, - templates: templates, - } -} - -// Execute applies the given data to each template and returns the resulting -// messages with their content replaced by the executed template output. -func (chat *chats) Execute(data any) ([]llms.Message, error) { - for i, template := range chat.templates { - buffer := new(bytes.Buffer) - if err := template.Execute(buffer, data); err != nil { - return nil, err - } - - chat.messages[i] = llms.Message{ - Role: chat.messages[i].Role, - Content: buffer.String(), - } - } - - return chat.messages, nil -} - -// Messages returns the current list of messages. -func (chat *chats) Messages() []llms.Message { - return chat.messages -} diff --git a/inputs/chats/chats_templates_test.go b/inputs/chats/chats_templates_test.go deleted file mode 100644 index e3d33ae..0000000 --- a/inputs/chats/chats_templates_test.go +++ /dev/null @@ -1,80 +0,0 @@ -package chats - -import ( - "testing" - - "github.com/bit8bytes/beago/inputs/roles" - "github.com/bit8bytes/beago/llms" -) - -func TestNewChatPromptTemplate(t *testing.T) { - messages := []llms.Message{ - { - Role: "system", - Content: "Translate {{.inputLanguage}} to {{.outputLanguage}}.", - }, - { - Role: "user", - Content: "{{.text}}", - }, - } - - chatPrompt := New(messages) - - if chatPrompt == nil { - t.Errorf("expected chat prompt to be initialized, got nil") - } - if len(chatPrompt.messages) != 2 { - t.Errorf("expected 2 messages, got %d", len(chatPrompt.messages)) - } -} - -func TestChatPromptTemplateFormatMessages(t *testing.T) { - messages := []llms.Message{ - { - Role: roles.System, - Content: "Translate {{.InputLanguage}} to {{.OutputLanguage}}.", - }, - { - Role: roles.User, - Content: "{{.Text}}", - }, - } - - chatPrompt := New(messages) - - type chatData struct { - InputLanguage string - OutputLanguage string - Text string - } - - data := chatData{ - InputLanguage: "English", - OutputLanguage: "French", - Text: "I love programming.", - } - - formattedMessages, err := chatPrompt.Execute(data) - if err != nil { - t.Fatalf("unexpected error formatting chat messages: %v", err) - } - - expectedMessages := []llms.Message{ - {Role: "system", Content: "Translate English to French."}, - {Role: "user", Content: "I love programming."}, - } - - if len(formattedMessages) != len(expectedMessages) { - t.Fatalf("expected %d formatted messages, got %d", len(expectedMessages), len(formattedMessages)) - } - - for i, msg := range formattedMessages { - if msg.Role != expectedMessages[i].Role { - t.Errorf("expected role %q, got %q", expectedMessages[i].Role, msg.Role) - } - if msg.Content != expectedMessages[i].Content { - t.Errorf("expected content %q, got %q", expectedMessages[i].Content, msg.Content) - } - } -} diff --git a/inputs/prompts/prompts_templates.go b/inputs/prompts/prompts_templates.go deleted file mode 100644 index f4fae10..0000000 --- a/inputs/prompts/prompts_templates.go +++ /dev/null @@ -1,31 +0,0 @@ -// Package prompts provides templated prompt handling for LLM interactions. -package prompts - -import ( - "bytes" - "text/template" -) - -type prompts struct { - template *template.Template -} - -// New creates a new prompts instance by parsing the given string as a -// Go text/template. It panics if the string fails to parse. -func New(data string) *prompts { - tmpl, err := template.New("prompt").Parse(data) - if err != nil { - panic(err) - } - return &prompts{template: tmpl} -} - -// Execute applies the given data to the template and returns the resulting string. -func (pt *prompts) Execute(data any) (string, error) { - var promptBuffer bytes.Buffer - err := pt.template.Execute(&promptBuffer, data) - if err != nil { - return "", err - } - return promptBuffer.String(), nil -} diff --git a/inputs/prompts/prompts_templates_test.go b/inputs/prompts/prompts_templates_test.go deleted file mode 100644 index fb8201d..0000000 --- a/inputs/prompts/prompts_templates_test.go +++ /dev/null @@ -1,71 +0,0 @@ -package prompts - -import ( - "testing" -) - -func TestNewPromptTemplate(t *testing.T) { - validTemplateString := "Hello, {{.name}}!" - - tests := []struct { - name string - templateStr string - }{ - {"Valid Template", validTemplateString}, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - pt := New(tt.templateStr) - if pt.template == nil { - t.Errorf("expected template to be initialized, got nil") - } - }) - } -} - -func TestNewPromptTemplateInvalidPanics(t *testing.T) { - invalidTemplateString := "Hello, {{.name}" - - defer func() { - if r := recover(); r == nil { - t.Errorf("expected panic for invalid template, but did not panic") - } - }() - - New(invalidTemplateString) -} - -func TestFormat(t *testing.T) { - tmplStr := "Hello, {{.Name}}!" - pt := New(tmplStr) - - tests := []struct { - name string - data any - want string - shouldFail bool - }{ - {"Valid Data", map[string]string{"Name": "World"}, "Hello, World!", false}, - {"Missing Field", map[string]string{"WrongField": "World"}, "Hello, !", false}, - {"Nil Data", nil, "Hello, !", false}, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := pt.Execute(tt.data) - if tt.shouldFail { - if err == nil { - t.Errorf("expected error, got nil") - } - return - } - if err != nil { - t.Errorf("did not expect error, got %v", err) - } - if got != tt.want { - t.Errorf("expected %q, got %q", tt.want, got) - } - }) - } -} diff --git a/inputs/roles/roles.go b/inputs/roles/roles.go deleted file mode 100644 index 1e85e41..0000000 --- a/inputs/roles/roles.go +++ /dev/null @@ -1,21 +0,0 @@ -// Package roles defines the standard message roles used in large language model (LLM) conversations. -package roles - -// Role represents a participant role in an LLM conversation. -// Common roles include System, User, and Assistant, which correspond -// to the standard roles supported by most LLM APIs. -type Role string - -// Common LLM conversation roles. -const ( - // System defines instructions or context that guide the assistant's behavior. - System Role = "system" - // User represents a message from the human participant in the conversation. - User Role = "user" - // Assistant represents a message from the LLM in the conversation. - Assistant Role = "assistant" -) - -func (r Role) String() string { - return string(r) -} diff --git a/json/json.go b/json/json.go new file mode 100644 index 0000000..0e337e3 --- /dev/null +++ b/json/json.go @@ -0,0 +1,47 @@ +// Package json provides a pipe.Handler for extracting JSON from LLM output. +package json + +import ( + "context" + "fmt" + "io" + + "github.com/bit8bytes/beago/pipe" +) + +// Extract strips prose from the stream and emits the first JSON blob. +func Extract() pipe.Handler { + return pipe.HandlerFunc(func(ctx context.Context, r io.Reader, w io.Writer) error { + data, err := io.ReadAll(r) + if err != nil { + return err + } + blob := extractJSON(data) + if blob == nil { + fmt.Fprintf(w, "Generated JSON is malformed, try again.\n") + return nil + } + _, err = w.Write(blob) + return err + }) +} + +func extractJSON(data []byte) []byte { + start := -1 + depth := 0 + for i, b := range data { + switch b { + case '{': + if start == -1 { + start = i + } + depth++ + case '}': + depth-- + if depth == 0 && start != -1 { + return data[start : i+1] + } + } + } + return nil +} diff --git a/llm/llms.go b/llm/llms.go new file mode 100644 index 0000000..1456c14 --- /dev/null +++ b/llm/llms.go @@ -0,0 +1,44 @@ +// Package llms provides pipe.Handler adapters for integrating LLMs into a pipeline. +package llm + +import ( + "context" + "encoding/json" + "io" + "strings" + + "github.com/bit8bytes/beago/pipe" +) + +// Message represents a chat message. +type Message struct { + Role string `json:"role"` + Content string `json:"content"` +} + +type llm interface { + Generate(ctx context.Context, r io.Reader, w io.Writer) error +} + +// Generate wraps an LLM implementation as a pipe.Handler. +func Generate(l llm) pipe.Handler { + return pipe.HandlerFunc(func(ctx context.Context, r io.Reader, w io.Writer) error { + return l.Generate(ctx, r, w) + }) +} + +// Prompt prepends a system message with the given instruction to the pipeline, +// then forwards the remaining input as a user message. +func Prompt(instruction string) pipe.Handler { + return pipe.HandlerFunc(func(ctx context.Context, r io.Reader, w io.Writer) error { + input, err := io.ReadAll(r) + if err != nil { + return err + } + enc := json.NewEncoder(w) + if err := enc.Encode(Message{Role: "system", Content: instruction}); err != nil { + return err + } + return enc.Encode(Message{Role: "user", Content: strings.TrimSpace(string(input))}) + }) +} diff --git a/llm/ollama/client.go b/llm/ollama/client.go new file mode 100644 index 0000000..8e79cf2 --- /dev/null +++ b/llm/ollama/client.go @@ -0,0 +1,100 @@ +package ollama + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + + "github.com/bit8bytes/beago/llm" +) + +// Client represents an Ollama API client designed for streaming pipelines. +type Client struct { + Model string + Endpoint string +} + +// New creates a new Ollama client. +func New(model string, endpoint string) *Client { + if endpoint == "" { + endpoint = "http://localhost:11434/api/chat" + } + return &Client{ + Model: model, + Endpoint: endpoint, + } +} + +// Handle implements the pipe.Handler interface. +// It reads a prompt from r, sends it to Ollama, and streams the text response to w. +func (oc *Client) Generate(ctx context.Context, r io.Reader, w io.Writer) error { + raw, err := io.ReadAll(r) + if err != nil { + return fmt.Errorf("failed to read prompt from pipe: %w", err) + } + + var messages []llm.Message + dec := json.NewDecoder(bytes.NewReader(raw)) + for { + var msg llm.Message + if err := dec.Decode(&msg); err != nil { + break + } + messages = append(messages, msg) + } + if len(messages) == 0 { + messages = []llm.Message{{Role: "user", Content: strings.TrimSpace(string(raw))}} + } + + request := ChatRequest{ + Model: oc.Model, + Messages: messages, + Stream: true, + } + + body, err := json.Marshal(request) + if err != nil { + return fmt.Errorf("failed to marshal request: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, "POST", oc.Endpoint, strings.NewReader(string(body))) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("ollama request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("ollama returned error status: %d", resp.StatusCode) + } + + decoder := json.NewDecoder(resp.Body) + for { + var chatResp ChatResponse + if err := decoder.Decode(&chatResp); err != nil { + if err == io.EOF { + break + } + return fmt.Errorf("error decoding stream: %w", err) + } + + if _, err := io.WriteString(w, chatResp.Message.Content); err != nil { + return err + } + + if chatResp.Done { + break + } + } + + return nil +} diff --git a/llm/ollama/types.go b/llm/ollama/types.go new file mode 100644 index 0000000..f6c4009 --- /dev/null +++ b/llm/ollama/types.go @@ -0,0 +1,15 @@ +package ollama + +import "github.com/bit8bytes/beago/llm" + +type ChatRequest struct { + Model string `json:"model"` + Messages []llm.Message `json:"messages"` + Stream bool `json:"stream"` +} + +type ChatResponse struct { + Model string `json:"model"` + Message llm.Message `json:"message"` + Done bool `json:"done"` +} diff --git a/llms/llms.go b/llms/llms.go deleted file mode 100644 index c79033a..0000000 --- a/llms/llms.go +++ /dev/null @@ -1,30 +0,0 @@ -// Package llms defines the core types used to communicate with language model backends. -// It provides a provider-agnostic interface so higher-level packages (agents, pipes, runner) -// can work with any LLM without being coupled to a specific API. -package llms - -import ( - "time" - - "github.com/bit8bytes/beago/inputs/roles" -) - -// Message represents a single turn in a conversation with an LLM. -// Role identifies who produced the content (e.g. system, user, assistant), -// which lets the model understand conversational context and respond appropriately. -// -// Timestamp records when the message was added to the store. -// Hash is a SHA-256 chain digest: SHA256(prev_hash + role + timestamp + content). -// Each store implementation computes these in Add() so the chain is tamper-evident — -// modifying, inserting, or deleting any message breaks every subsequent hash. -type Message struct { - Role roles.Role `json:"role"` - Content string `json:"content"` - Timestamp time.Time `json:"timestamp"` - Hash string `json:"hash"` -} - -// StreamHandler is a callback invoked incrementally as the model generates a response. -// content holds the latest token(s) and done signals that the stream has ended. -// Returning a non-nil error cancels the stream. -type StreamHandler func(content string, done bool) error diff --git a/llms/ollama/ollama.go b/llms/ollama/ollama.go deleted file mode 100644 index c97f057..0000000 --- a/llms/ollama/ollama.go +++ /dev/null @@ -1,220 +0,0 @@ -package ollama - -import ( - "bytes" - "context" - "encoding/json" - "errors" - "fmt" - "io" - "net/http" - "strings" - "time" - - "github.com/bit8bytes/beago/llms" -) - -type client struct { - Model Model -} - -func New(model Model) *client { - return &client{Model: model} -} - -func (oc *client) Generate(ctx context.Context, messages []llms.Message) (string, error) { - client := &http.Client{ - Timeout: 240 * time.Second, - } - - var endpoint = oc.Model.Endpoint - if oc.Model.Endpoint == "" { - endpoint = "http://localhost:11434/api/chat" - } - - request := Chat{ - Model: oc.Model.Model, - Messages: messages, - Options: oc.Model.Options, - Stream: oc.Model.Stream, - Format: oc.Model.Format, - KeepAlive: oc.Model.KeepAlive, - } - - requestBody, err := json.Marshal(request) - if err != nil { - return "", errors.New("error marshaling request") - } - - req, err := http.NewRequest("POST", endpoint, bytes.NewReader(requestBody)) - if err != nil { - return "", errors.New("create request failed") - } - req.Header.Set("Content-Type", "application/json") - - resp, err := client.Do(req) - if err != nil { - return "", errors.New("HTTP request failed") - } - defer resp.Body.Close() - - decoder := json.NewDecoder(resp.Body) - var chatResponse ChatResponse - var finalResponse ChatResponse - for { - if err := decoder.Decode(&chatResponse); err != nil { - if errors.Is(err, io.EOF) { - break - } - return "", errors.New("error decoding response") - } - - finalResponse.Message.Content += chatResponse.Message.Content - for _, stopSeq := range oc.Model.Stop { - if strings.Contains(finalResponse.Message.Content, stopSeq) { - finalResponse.Message.Content = strings.Split(finalResponse.Message.Content, stopSeq)[0] - finalResponse.Done = true - return finalResponse.Message.Content, nil - } - } - } - - if chatResponse.Done { - return finalResponse.Message.Content, nil - } - - return finalResponse.Message.Content, nil -} - -func (oc *client) StreamContent(ctx context.Context, messages []llms.Message, handler llms.StreamHandler) error { - endpoint := oc.Model.Endpoint - if endpoint == "" { - endpoint = "http://localhost:11434/api/chat" - } - - request := Chat{ - Model: oc.Model.Model, - Messages: messages, - Options: oc.Model.Options, - Stream: true, - Format: oc.Model.Format, - KeepAlive: oc.Model.KeepAlive, - } - - requestBody, err := json.Marshal(request) - if err != nil { - return fmt.Errorf("error marshaling request: %w", err) - } - - req, err := http.NewRequestWithContext(ctx, "POST", endpoint, strings.NewReader(string(requestBody))) - if err != nil { - return fmt.Errorf("error creating request: %w", err) - } - req.Header.Set("Content-Type", "application/json") - - client := &http.Client{ - Timeout: 240 * time.Second, - } - resp, err := client.Do(req) - if err != nil { - return fmt.Errorf("error sending request: %w", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("API returned error status %d: %s", resp.StatusCode, string(body)) - } - - decoder := json.NewDecoder(resp.Body) - var fullContent string - - for { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - var chatResponse ChatResponse - if err := decoder.Decode(&chatResponse); err != nil { - if err == io.EOF { - return nil - } - return fmt.Errorf("error decoding response: %w", err) - } - - fullContent += chatResponse.Message.Content - - shouldStop := false - for _, stopSeq := range oc.Model.Stop { - if strings.Contains(fullContent, stopSeq) { - parts := strings.Split(fullContent, stopSeq) - chatResponse.Message.Content = parts[0] - shouldStop = true - break - } - } - - if err := handler(chatResponse.Message.Content, chatResponse.Done || shouldStop); err != nil { - return fmt.Errorf("handler error: %w", err) - } - - if chatResponse.Done || shouldStop { - return nil - } - } -} - -func (oc *client) GenerateEmbedding(ctx context.Context, prompt string) ([]float32, error) { - client := &http.Client{ - Timeout: 240 * time.Second, - } - - var endpoint = oc.Model.Endpoint - if oc.Model.Endpoint == "" { - endpoint = "http://localhost:11434/api/embeddings" - } - - request := Prompt{ - Model: oc.Model.Model, - Prompt: prompt, - Options: oc.Model.Options, - Format: oc.Model.Format, - KeepAlive: oc.Model.KeepAlive, - } - - requestBody, err := json.Marshal(request) - if err != nil { - return nil, errors.New("error marshaling request") - } - - req, err := http.NewRequest("POST", endpoint, bytes.NewReader(requestBody)) - if err != nil { - return nil, errors.New("create request failed") - } - req.Header.Set("Content-Type", "application/json") - - resp, err := client.Do(req) - if err != nil { - return nil, errors.New("http request failed") - } - defer resp.Body.Close() - - bodyBytes, err := io.ReadAll(resp.Body) - if err != nil { - return nil, errors.New("error reading response body") - } - - resp.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) - - var embeddingResponse struct { - Embedding []float32 `json:"embedding"` - } - err = json.NewDecoder(resp.Body).Decode(&embeddingResponse) - if err != nil { - return nil, errors.New("error decoding response") - } - - return embeddingResponse.Embedding, nil -} diff --git a/llms/ollama/ollama_types.go b/llms/ollama/ollama_types.go deleted file mode 100644 index e5402c5..0000000 --- a/llms/ollama/ollama_types.go +++ /dev/null @@ -1,54 +0,0 @@ -package ollama - -import ( - "github.com/bit8bytes/beago/llms" -) - -const ( - JSON = "json" -) - -type Model struct { - Model string `json:"model"` - Endpoint string `json:"endpoint"` - Options Options `json:"options"` - Stream bool `json:"stream"` - Format string `json:"format,omitempty"` - KeepAlive int64 `json:"keepalive,omitempty"` - Stop []string `json:"stop"` -} - -type Options struct { - NumCtx int `json:"num_ctx"` - Temperature float64 `json:"temperature"` -} - -type Prompt struct { - Model string `json:"model"` - Prompt string `json:"prompt"` - Options Options - Format string `json:"format,omitempty"` - KeepAlive int64 `json:"keepalive,omitempty"` -} - -type ChatResponse struct { - Model string `json:"model"` - CreatedAt string `json:"created_at"` - Message llms.Message - Done bool `json:"done"` - TotalDuration int64 `json:"total_duration"` - LoadDuration int64 `json:"load_duration"` - PromptEvalCount int `json:"prompt_eval_count"` - PromptEvalDuration int64 `json:"prompt_eval_duration"` - EvalCount int `json:"eval_count"` - EvalDuration int64 `json:"eval_duration"` -} - -type Chat struct { - Model string `json:"model"` - Messages []llms.Message `json:"messages"` - Options Options - Stream bool `json:"stream"` - Format string `json:"format,omitempty"` - KeepAlive int64 `json:"keepalive,omitempty"` -} diff --git a/outputs/json/json.go b/outputs/json/json.go deleted file mode 100644 index 2d9ae57..0000000 --- a/outputs/json/json.go +++ /dev/null @@ -1,52 +0,0 @@ -// Package json provides a generic JSON output parser for LLM responses. -// -// It unmarshals raw JSON strings into typed Go structs and generates -// instruction prompts that guide an LLM to produce valid JSON output. -package json - -import ( - "encoding/json" - "reflect" - "strings" -) - -// parser is a generic JSON parser that deserializes LLM output into a Go type T. -type parser[T any] struct{} - -// NewParser creates a new [parser] for the given type T. -func NewParser[T any]() *parser[T] { - return &parser[T]{} -} - -// Parse deserializes the given JSON string into a value of type T. -// When T is a slice type and the LLM returns a single object, Parse wraps it -// in an array before unmarshaling. -func (p *parser[T]) Parse(output string) (T, error) { - var result T - data := strings.TrimSpace(output) - if reflect.TypeFor[T]().Kind() == reflect.Slice && - strings.HasPrefix(data, "{") && - strings.HasSuffix(data, "}") { - data = "[" + data + "]" - } - err := json.Unmarshal([]byte(data), &result) - return result, err -} - -// Instructions returns a prompt string that instructs an LLM to produce JSON -// matching the zero-value schema of type T. -func (p *parser[T]) Instructions() string { - var zero T - - v := reflect.ValueOf(&zero).Elem() - if v.Kind() == reflect.Slice { - v.Set(reflect.MakeSlice(v.Type(), 1, 1)) - } - - jsonBytes, err := json.Marshal(zero) - if err != nil { - return "" - } - - return "Output the following JSON schema: " + string(jsonBytes) -} diff --git a/outputs/json/json_test.go b/outputs/json/json_test.go deleted file mode 100644 index e614afb..0000000 --- a/outputs/json/json_test.go +++ /dev/null @@ -1,77 +0,0 @@ -package json - -import ( - "testing" -) - -func TestJsonout(t *testing.T) { - type jokeData struct { - Setup string `json:"setup"` - Punchline string `json:"punchline"` - } - - parser := NewParser[jokeData]() - joke, err := parser.Parse(` - { - "setup": "Why don't scientists trust atoms?", - "punchline": "Because they make up everything!" - }`) - if err != nil { - panic(err) - } - - if joke.Setup != "Why don't scientists trust atoms?" { - t.Errorf("expected joke setup to be: %s", "Why don't scientists trust atoms?") - } - - if joke.Punchline != "Because they make up everything!" { - t.Errorf("expetec joke punchline to be: %s", "Because they make up everything!") - } -} - -func TestJsonoutArray(t *testing.T) { - type jokeData struct { - Setup string `json:"setup"` - Punchline string `json:"punchline"` - } - - parser := NewParser[[]jokeData]() - jokes, err := parser.Parse(` - [ - { - "setup": "Why don't scientists trust atoms?", - "punchline": "Because they make up everything!" - }, - { - "setup": "What do you call a fake noodle?", - "punchline": "An impasta!" - } - ]`) - if err != nil { - t.Fatal(err) - } - - if len(jokes) != 2 { - t.Fatalf("expected 2 jokes, got %d", len(jokes)) - } - - if jokes[0].Setup != "Why don't scientists trust atoms?" { - t.Errorf("expected first joke setup to be: %s, got: %s", - "Why don't scientists trust atoms?", jokes[0].Setup) - } - - if jokes[0].Punchline != "Because they make up everything!" { - t.Errorf("expected first joke punchline to be: %s, got: %s", - "Because they make up everything!", jokes[0].Punchline) - } - - if jokes[1].Setup != "What do you call a fake noodle?" { - t.Errorf("expected second joke setup to be: %s, got: %s", - "What do you call a fake noodle?", jokes[1].Setup) - } - - if jokes[1].Punchline != "An impasta!" { - t.Errorf("expected second joke punchline to be: %s, got: %s", - "An impasta!", jokes[1].Punchline) - } -} diff --git a/pipe/handler.go b/pipe/handler.go new file mode 100644 index 0000000..b2d12e7 --- /dev/null +++ b/pipe/handler.go @@ -0,0 +1,30 @@ +// Package pipe provides primitives for building streaming data pipelines. +package pipe + +import ( + "context" + "errors" + "io" +) + +// ErrDone is returned by a Handler to signal the enclosing Loop to stop. +// It is not propagated as an error to the caller. +var ErrDone = errors.New("pipe: done") + +// A Handler responds to a text stream. +// +// Handle should read data from r, transform it, and write the result to w. +// It is the core primitive for building "Unix-style" AI agent pipelines. +type Handler interface { + Handle(ctx context.Context, r io.Reader, w io.Writer) error +} + +// The HandlerFunc type is an adapter to allow the use of ordinary functions +// as stream handlers. If f is a function with the appropriate signature, +// HandlerFunc(f) is a Handler that calls f. +type HandlerFunc func(ctx context.Context, r io.Reader, w io.Writer) error + +// Handle calls f(ctx, r, w). +func (f HandlerFunc) Handle(ctx context.Context, r io.Reader, w io.Writer) error { + return f(ctx, r, w) +} diff --git a/pipe/pipe.go b/pipe/pipe.go new file mode 100644 index 0000000..d83a71e --- /dev/null +++ b/pipe/pipe.go @@ -0,0 +1,49 @@ +package pipe + +import ( + "bytes" + "context" + "errors" + "io" +) + +// Loop runs handlers repeatedly, feeding each iteration's output as the next +// iteration's input. It stops when a handler returns ErrDone or maxIter is +// reached. Only the final output is written to w. +func Loop(maxIter int, handlers ...Handler) Handler { + return HandlerFunc(func(ctx context.Context, r io.Reader, w io.Writer) error { + for range maxIter { + var buf bytes.Buffer + err := Execute(ctx, r, &buf, handlers...) + if errors.Is(err, ErrDone) { + _, copyErr := io.Copy(w, &buf) + return copyErr + } + if err != nil { + return err + } + r = &buf + } + return nil + }) +} + +func Execute(ctx context.Context, r io.Reader, w io.Writer, handlers ...Handler) error { + for i, h := range handlers { + // The last handler will write diretcly to our final destination w. + // All other handlers will write to the [io.Pipe] + if i == len(handlers)-1 { + return h.Handle(ctx, r, w) + } + + pr, pw := io.Pipe() // Create a new pipe for each N-1 handler. + + go func(handler Handler, in io.Reader, pw *io.PipeWriter) error { + err := handler.Handle(ctx, in, pw) + return pw.CloseWithError(err) + }(h, r, pw) + + r = pr // The next reader will read from the previous pipe's reader + } + return nil +} diff --git a/pipe/std.go b/pipe/std.go new file mode 100644 index 0000000..2b2c087 --- /dev/null +++ b/pipe/std.go @@ -0,0 +1,37 @@ +package pipe + +import ( + "context" + "io" +) + +func Tee(debug io.Writer) HandlerFunc { + return HandlerFunc(func(ctx context.Context, r io.Reader, w io.Writer) error { + // MultiWriter creates a single writer for multiple writers. + // Here, it is a Y-splitter that writes to w and debug. + mw := io.MultiWriter(w, debug) + + // Here, we pipe everything into our Y-splitter. + _, err := io.Copy(mw, r) + return err + }) +} + +// Exit returns a Handler that calls f with the full stream content. +// If f returns true, the handler passes the content through and returns ErrDone, +// signalling the enclosing Loop to stop. +func Exit(f func([]byte) bool) Handler { + return HandlerFunc(func(ctx context.Context, r io.Reader, w io.Writer) error { + data, err := io.ReadAll(r) + if err != nil { + return err + } + if _, err := w.Write(data); err != nil { + return err + } + if f(data) { + return ErrDone + } + return nil + }) +} diff --git a/pipes/pipes.go b/pipes/pipes.go deleted file mode 100644 index b0474b1..0000000 --- a/pipes/pipes.go +++ /dev/null @@ -1,67 +0,0 @@ -// Package pipes provides a generic pipeline for generating structured output -// from a language model. -package pipes - -import ( - "context" - "errors" - - "github.com/bit8bytes/beago/llms" -) - -type parser[T any] interface { - Parse(text string) (T, error) - // Instructions returns formatting instructions to append to the prompt. - Instructions() string -} - -type llm interface { - Generate(ctx context.Context, messages []llms.Message) (string, error) -} - -// Pipe is a generic pipeline that sends messages to a language model and parses -// the response into a structured type T. -type Pipe[T any] struct { - messages []llms.Message - model llm - parser parser[T] -} - -// New creates a new Pipe with the given messages, model, and parser. -func New[T any](messages []llms.Message, model llm, parser parser[T]) *Pipe[T] { - return &Pipe[T]{ - messages: messages, - model: model, - parser: parser, - } -} - -// Invoke executes the pipeline by appending parser instructions to the first -// message, generating a response from the model, and parsing the result into -// a value of type T. -func (pipe *Pipe[T]) Invoke(ctx context.Context) (*T, error) { - if len(pipe.messages) == 0 { - return nil, errors.New("pipe has no messages") - } - - instructions := pipe.parser.Instructions() - msgs := pipe.messages - if instructions != "" { - // Copy first message so instructions aren't appended on repeated Invoke calls. - first := pipe.messages[0] - first.Content += " " + instructions - msgs = append([]llms.Message{first}, pipe.messages[1:]...) - } - - output, err := pipe.model.Generate(ctx, msgs) - if err != nil { - return nil, err - } - - parsed, err := pipe.parser.Parse(output) - if err != nil { - return nil, err - } - - return &parsed, nil -} diff --git a/pipes/pipes_test.go b/pipes/pipes_test.go deleted file mode 100644 index 9a98192..0000000 --- a/pipes/pipes_test.go +++ /dev/null @@ -1,121 +0,0 @@ -package pipes - -import ( - "context" - "errors" - "testing" - - "github.com/bit8bytes/beago/llms" -) - -type mockLLM struct { - result string - err error -} - -func (m *mockLLM) Generate(_ context.Context, _ []llms.Message) (string, error) { - if m.err != nil { - return "", m.err - } - return m.result, nil -} - -type mockParser struct { - instructions string - result string - err error -} - -func (m *mockParser) Parse(_ string) (string, error) { - if m.err != nil { - return "", m.err - } - return m.result, nil -} - -func (m *mockParser) Instructions() string { - return m.instructions -} - -func TestInvoke(t *testing.T) { - t.Run("returns parsed result", func(t *testing.T) { - pipe := New( - []llms.Message{{Content: "hello"}}, - &mockLLM{result: "raw"}, - &mockParser{result: "parsed"}, - ) - got, err := pipe.Invoke(context.Background()) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if *got != "parsed" { - t.Errorf("got %q, want %q", *got, "parsed") - } - }) - - t.Run("errors when no messages", func(t *testing.T) { - pipe := New([]llms.Message{}, &mockLLM{}, &mockParser{}) - _, err := pipe.Invoke(context.Background()) - if err == nil { - t.Fatal("expected error, got nil") - } - }) - - t.Run("errors when llm fails", func(t *testing.T) { - pipe := New( - []llms.Message{{Content: "hello"}}, - &mockLLM{err: errors.New("llm error")}, - &mockParser{}, - ) - _, err := pipe.Invoke(context.Background()) - if err == nil { - t.Fatal("expected error, got nil") - } - }) - - t.Run("errors when parser fails", func(t *testing.T) { - pipe := New( - []llms.Message{{Content: "hello"}}, - &mockLLM{result: "raw"}, - &mockParser{err: errors.New("parse error")}, - ) - _, err := pipe.Invoke(context.Background()) - if err == nil { - t.Fatal("expected error, got nil") - } - }) - - t.Run("instructions not duplicated on repeated Invoke", func(t *testing.T) { - llm := &mockLLM{result: "raw"} - var capturedContent string - // Use a custom llm to capture the message content - capturingLLM := &captureLLM{result: "raw", captured: &capturedContent} - pipe := New( - []llms.Message{{Content: "hello"}}, - capturingLLM, - &mockParser{instructions: "format as JSON", result: "parsed"}, - ) - - pipe.Invoke(context.Background()) - first := capturedContent - pipe.Invoke(context.Background()) - second := capturedContent - - if first != second { - t.Errorf("instructions duplicated on second call:\n first: %q\nsecond: %q", first, second) - } - _ = llm - }) -} - -type captureLLM struct { - result string - captured *string -} - -func (c *captureLLM) Generate(_ context.Context, msgs []llms.Message) (string, error) { - if len(msgs) > 0 { - *c.captured = msgs[0].Content - } - return c.result, nil -} diff --git a/runner/runner.go b/runner/runner.go deleted file mode 100644 index 25e0763..0000000 --- a/runner/runner.go +++ /dev/null @@ -1,66 +0,0 @@ -// Package runner separates the loop control logic from agent logic so that -// different agent implementations can be driven by the same execution strategy -// without duplicating iteration, cancellation, or error-handling concerns. -package runner - -import ( - "context" - "errors" - "fmt" -) - -var ( - // ErrNoFinalAnswer is returned by Run when the context is cancelled before - // the agent produces a final answer. Callers can use errors.Is to - // distinguish this from real planning or action failures, similar to how - // http.ErrServerClosed signals a clean shutdown versus an unexpected error. - ErrNoFinalAnswer = errors.New("no final answer") -) - -// Agent is the contract the runner relies on to stay decoupled from any specific -// LLM backend or tool set. Keeping Plan, Act, and Answer as separate steps lets -// the runner interleave cancellation checks between them and makes each stage -// independently testable. -type Agent interface { - Act(ctx context.Context) error - Plan(ctx context.Context) error - Task(ctx context.Context, prompt string) error - Answer(ctx context.Context) (string, bool) -} - -// Runner owns the loop so that agents don't need to implement their own -// iteration or cancellation logic. -type Runner struct { - agent Agent -} - -// New wires an agent to a runner without immediately starting work, allowing -// callers to configure context (deadlines, cancellation) before committing to a run. -func New(agent Agent) *Runner { - return &Runner{agent: agent} -} - -// Run drives the Plan→Answer?→Act cycle, checking context cancellation on every -// iteration so long-running tool calls don't silently outlive a deadline. -func (r *Runner) Run(ctx context.Context) (string, error) { -RUN: - for { - select { - case <-ctx.Done(): - break RUN - default: - if err := r.agent.Plan(ctx); err != nil { - return "", fmt.Errorf("planning failed: %w", err) - } - - if answer, ok := r.agent.Answer(ctx); ok { - return answer, nil - } - - if err := r.agent.Act(ctx); err != nil { - return "", fmt.Errorf("action failed: %w", err) - } - } - } - return "", fmt.Errorf("run cancelled: %w", ErrNoFinalAnswer) -} diff --git a/runner/runner_test.go b/runner/runner_test.go deleted file mode 100644 index dc63991..0000000 --- a/runner/runner_test.go +++ /dev/null @@ -1,92 +0,0 @@ -package runner - -import ( - "context" - "errors" - "testing" -) - -type mockAgent struct { - planErr error - actErr error - answer string - planCalls int - actCalls int - answerAfter int // how many Plan calls before Answer returns true -} - -func (m *mockAgent) Plan(_ context.Context) error { - m.planCalls++ - return m.planErr -} - -func (m *mockAgent) Act(_ context.Context) error { - m.actCalls++ - return m.actErr -} - -func (m *mockAgent) Task(_ context.Context, _ string) error { return nil } - -func (m *mockAgent) Answer(_ context.Context) (string, bool) { - if m.answer != "" && m.planCalls >= m.answerAfter { - return m.answer, true - } - return "", false -} - -func TestRun_ReturnsAnswerImmediately(t *testing.T) { - agent := &mockAgent{answer: "done", answerAfter: 1} - got, err := New(agent).Run(context.Background()) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if got != "done" { - t.Errorf("got %q, want %q", got, "done") - } - if agent.actCalls != 0 { - t.Errorf("Act should not be called when Answer is ready, got %d calls", agent.actCalls) - } -} - -func TestRun_ActsUntilAnswerReady(t *testing.T) { - agent := &mockAgent{answer: "ready", answerAfter: 3} - got, err := New(agent).Run(context.Background()) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if got != "ready" { - t.Errorf("got %q, want %q", got, "ready") - } - if agent.planCalls != 3 { - t.Errorf("expected 3 Plan calls, got %d", agent.planCalls) - } - if agent.actCalls != 2 { - t.Errorf("expected 2 Act calls, got %d", agent.actCalls) - } -} - -func TestRun_PlanError(t *testing.T) { - planErr := errors.New("plan boom") - _, err := New(&mockAgent{planErr: planErr}).Run(context.Background()) - if !errors.Is(err, planErr) { - t.Errorf("expected planErr in chain, got: %v", err) - } -} - -func TestRun_ActError(t *testing.T) { - actErr := errors.New("act boom") - _, err := New(&mockAgent{actErr: actErr, answerAfter: 999}).Run(context.Background()) - if !errors.Is(err, actErr) { - t.Errorf("expected actErr in chain, got: %v", err) - } -} - -func TestRun_ContextCancelled(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - cancel() - - _, err := New(&mockAgent{answerAfter: 999}).Run(ctx) - if !errors.Is(err, ErrNoFinalAnswer) { - t.Errorf("expected ErrNoFinalAnswer, got: %v", err) - } -} diff --git a/stores/chain.go b/stores/chain.go deleted file mode 100644 index 8d958c1..0000000 --- a/stores/chain.go +++ /dev/null @@ -1,44 +0,0 @@ -package stores - -import ( - "crypto/sha256" - "encoding/hex" - "time" - - "github.com/bit8bytes/beago/llms" -) - -// Stamp sets Timestamp and Hash on msg, chaining from prevHash. -// Hash = SHA256(prevHash + role + timestamp + content). -// Call this inside Add() before persisting, passing the hash of the last -// stored message as prevHash (empty string for the first message). -func Stamp(msg *llms.Message, prevHash string) { - msg.Timestamp = time.Now().UTC() - - h := sha256.New() - h.Write([]byte(prevHash)) - h.Write([]byte(msg.Role)) - h.Write([]byte(msg.Timestamp.Format(time.RFC3339Nano))) - h.Write([]byte(msg.Content)) - msg.Hash = hex.EncodeToString(h.Sum(nil)) -} - -// Verify replays the hash chain over msgs and returns the index of the first -// message whose hash does not match, or -1 if the chain is intact. -// Pass the hash that preceded the first message as prevHash -// (empty string if msgs starts from the beginning of the store). -func Verify(msgs []llms.Message, prevHash string) int { - for i, msg := range msgs { - h := sha256.New() - h.Write([]byte(prevHash)) - h.Write([]byte(msg.Role)) - h.Write([]byte(msg.Timestamp.Format(time.RFC3339Nano))) - h.Write([]byte(msg.Content)) - want := hex.EncodeToString(h.Sum(nil)) - if msg.Hash != want { - return i - } - prevHash = msg.Hash - } - return -1 -} diff --git a/stores/memory/memory.go b/stores/memory/memory.go deleted file mode 100644 index 7d28ddc..0000000 --- a/stores/memory/memory.go +++ /dev/null @@ -1,58 +0,0 @@ -// Package memory provides an in-memory Store for use in tests and local -// development. It is not intended for production — all messages are lost when -// the process exits and there is no size bound on the message history. -package memory - -import ( - "context" - "sync" - - "github.com/bit8bytes/beago/llms" - "github.com/bit8bytes/beago/stores" -) - -type store struct { - mu sync.Mutex - messages []llms.Message - lastHash string -} - -func New() stores.Store { - return &store{} -} - -func (s *store) Add(_ context.Context, msgs ...llms.Message) error { - s.mu.Lock() - defer s.mu.Unlock() - - for i := range msgs { - stores.Stamp(&msgs[i], s.lastHash) - s.lastHash = msgs[i].Hash - s.messages = append(s.messages, msgs[i]) - } - return nil -} - -func (s *store) Clear(_ context.Context) error { - s.mu.Lock() - defer s.mu.Unlock() - - // Retain the underlying array to avoid an allocation on the next Add. - s.messages = s.messages[:0] - s.lastHash = "" - return nil -} - -func (s *store) List(_ context.Context) ([]llms.Message, error) { - s.mu.Lock() - defer s.mu.Unlock() - - // Copy so the caller cannot mutate the store's internal slice after the - // lock is released. - out := make([]llms.Message, len(s.messages)) - copy(out, s.messages) - - return out, nil -} - -func (s *store) Close() error { return nil } diff --git a/stores/memory/memory_test.go b/stores/memory/memory_test.go deleted file mode 100644 index 4347c7d..0000000 --- a/stores/memory/memory_test.go +++ /dev/null @@ -1,88 +0,0 @@ -package memory - -import ( - "context" - "sync" - "testing" - - "github.com/bit8bytes/beago/inputs/roles" - "github.com/bit8bytes/beago/llms" -) - -var ctx = context.Background() - -func msg(role roles.Role, content string) llms.Message { - return llms.Message{Role: role, Content: content} -} - -func TestAddAndList(t *testing.T) { - s := New() - msgs := []llms.Message{ - msg(roles.User, "hello"), - msg(roles.Assistant, "world"), - } - if err := s.Add(ctx, msgs...); err != nil { - t.Fatal(err) - } - got, err := s.List(ctx) - if err != nil { - t.Fatal(err) - } - if len(got) != 2 || got[0] != msgs[0] || got[1] != msgs[1] { - t.Fatalf("unexpected messages: %v", got) - } -} - -func TestListReturnsCopy(t *testing.T) { - s := New() - _ = s.Add(ctx, msg(roles.User, "original")) - got, _ := s.List(ctx) - got[0].Content = "mutated" - - got2, _ := s.List(ctx) - if got2[0].Content != "original" { - t.Fatal("List returned a reference to internal slice") - } -} - -func TestListEmptyStore(t *testing.T) { - s := New() - got, err := s.List(ctx) - if err != nil || len(got) != 0 { - t.Fatalf("expected empty list, got %v %v", got, err) - } -} - -func TestClear(t *testing.T) { - s := New() - _ = s.Add(ctx, msg(roles.User, "hello")) - if err := s.Clear(ctx); err != nil { - t.Fatal(err) - } - got, _ := s.List(ctx) - if len(got) != 0 { - t.Fatalf("expected empty after Clear, got %v", got) - } -} - -func TestClearThenAdd(t *testing.T) { - s := New() - _ = s.Add(ctx, msg(roles.User, "first")) - _ = s.Clear(ctx) - _ = s.Add(ctx, msg(roles.User, "second")) - got, _ := s.List(ctx) - if len(got) != 1 || got[0].Content != "second" { - t.Fatalf("unexpected messages after Clear+Add: %v", got) - } -} - -func TestConcurrentAddList(t *testing.T) { - s := New() - var wg sync.WaitGroup - for i := 0; i < 50; i++ { - wg.Add(2) - go func() { defer wg.Done(); _ = s.Add(ctx, msg(roles.User, "x")) }() - go func() { defer wg.Done(); _, _ = s.List(ctx) }() - } - wg.Wait() -} diff --git a/stores/stores.go b/stores/stores.go deleted file mode 100644 index 1e6ff66..0000000 --- a/stores/stores.go +++ /dev/null @@ -1,62 +0,0 @@ -// Package stores defines the Store interface for persisting conversation -// history between agent turns. -// -// LLMs are stateless — they have no memory of prior interactions unless the -// full message history is included in every request. A Store gives the agent -// that memory by accumulating messages as the conversation progresses and -// replaying them on each LLM call. -// -// The package ships with an in-memory implementation (see package -// stores/memory) suitable for testing and development. Production use cases -// that require durability across restarts should use an external -// implementation backed by a database. -package stores - -import ( - "context" - - "github.com/bit8bytes/beago/llms" -) - -// Store is the interface that wraps the basic message persistence methods. -// -// Implementations must be safe for concurrent use by multiple goroutines. -// -// # Tamper-evident chain -// -// Every implementation must call [Stamp] on each message inside Add, passing -// the hash of the last stored message as prevHash (empty string for the first -// message). Stamp sets Message.Timestamp and computes Message.Hash as: -// -// SHA256(prevHash + role + timestamp + content) -// -// This chains every message to its predecessor. Any modification, insertion, -// or deletion of a message breaks every subsequent hash, making the history -// tamper-evident. Callers can verify integrity by replaying the chain with -// [Verify]. -// -// # Minimal implementation checklist -// -// - Add: call Stamp on each message before persisting; track lastHash -// - List: return messages in insertion order; never mutate the stored slice -// - Clear: reset lastHash to "" alongside clearing messages -// - Close: release database connections or file handles; no-op is valid -type Store interface { - // Add appends one or more messages to the store. The agent calls this after - // every LLM turn so the full conversation history is available on the next - // Plan call. Implementations must stamp each message via [Stamp] before - // persisting to maintain the hash chain. - Add(ctx context.Context, msgs ...llms.Message) error - - // List returns all messages in insertion order. The agent passes the full - // history to the LLM on every turn so it has context from prior steps. - List(ctx context.Context) ([]llms.Message, error) - - // Clear removes all messages and resets the hash chain, allowing the store - // to be reused across independent agent runs. - Clear(ctx context.Context) error - - // Close releases any resources held by the store (e.g. database connections). - // Call this when the agent is done to avoid resource leaks. - Close() error -} diff --git a/tools/dispatch.go b/tools/dispatch.go new file mode 100644 index 0000000..9e4304c --- /dev/null +++ b/tools/dispatch.go @@ -0,0 +1,78 @@ +package tools + +import ( + "context" + "encoding/json" + "fmt" + "io" + "strings" + + "github.com/bit8bytes/beago/pipe" +) + +type action struct { + Action string `json:"action"` + ActionInput map[string]json.RawMessage `json:"action_input"` + FinalAnswer string `json:"final_answer"` +} + +func coerceArgs(input map[string]json.RawMessage) map[string]string { + args := make(map[string]string, len(input)) + for k, v := range input { + var s string + if err := json.Unmarshal(v, &s); err == nil { + args[k] = s + continue + } + var ss []string + if err := json.Unmarshal(v, &ss); err == nil { + args[k] = strings.Join(ss, " ") + continue + } + args[k] = strings.Trim(string(v), `"[] `) + } + return args +} + +// Execute reads an action JSON blob, runs the matched tool, and writes the observation. +// A final_answer passes through unchanged for pipe.Exit to catch. +func Execute(tools ...Tool) pipe.Handler { + registry := make(map[string]Tool, len(tools)) + for _, t := range tools { + registry[t.Name] = t + } + return pipe.HandlerFunc(func(ctx context.Context, r io.Reader, w io.Writer) error { + data, err := io.ReadAll(r) + if err != nil { + return err + } + var a action + if err := json.Unmarshal(data, &a); err != nil { + fmt.Fprintf(w, "\nObservation: invalid action JSON: %v\n", err) + return nil + } + if a.FinalAnswer != "" { + _, err = w.Write(data) + return err + } + if _, err := w.Write(data); err != nil { + return err + } + t, ok := registry[a.Action] + if !ok { + fmt.Fprintf(w, "\nObservation: unknown tool %q\n", a.Action) + return nil + } + result, err := t.Run(ctx, coerceArgs(a.ActionInput)) + if err != nil { + if result != "" { + fmt.Fprintf(w, "\nObservation: %s\nerror: %v\n", result, err) + } else { + fmt.Fprintf(w, "\nObservation: %v\n", err) + } + return nil + } + fmt.Fprintf(w, "\nObservation: %s\n", result) + return nil + }) +} diff --git a/tools/exec.go b/tools/exec.go new file mode 100644 index 0000000..5d60bca --- /dev/null +++ b/tools/exec.go @@ -0,0 +1,34 @@ +package tools + +import ( + "context" + "fmt" + "os/exec" + "strings" +) + +// Exec creates a Tool that runs shell commands restricted to the given allowlist. +func Exec(allowed ...string) Tool { + set := make(map[string]bool, len(allowed)) + for _, cmd := range allowed { + set[cmd] = true + } + return Tool{ + Name: "shell", + Description: "Run a shell command", + Params: []Param{ + {Name: "cmd", Description: "command to run, e.g. ls -la /tmp", Required: true}, + }, + Run: func(ctx context.Context, args map[string]string) (string, error) { + parts := strings.Fields(args["cmd"]) + if len(parts) == 0 { + return "", nil + } + if len(set) > 0 && !set[parts[0]] { + return "", fmt.Errorf("command %q is not allowed", parts[0]) + } + out, err := exec.CommandContext(ctx, parts[0], parts[1:]...).CombinedOutput() + return string(out), err + }, + } +} diff --git a/tools/helloworld.go b/tools/helloworld.go deleted file mode 100644 index 718dc88..0000000 --- a/tools/helloworld.go +++ /dev/null @@ -1,37 +0,0 @@ -package tools - -import ( - "context" - "encoding/json" - "fmt" -) - -type HelloWorldTool struct{} - -func (t *HelloWorldTool) Name() string { - return "helloWorld" -} - -func (t *HelloWorldTool) Description() string { - return "Returns a hello greeting for the given name." -} - -func (t *HelloWorldTool) Parameters() []Parameter { - return []Parameter{ - { - Name: "name", - Description: "The name to greet", - Required: true, - }, - } -} - -func (t *HelloWorldTool) Execute(_ context.Context, params json.RawMessage) (string, error) { - var input struct { - Name string `json:"name"` - } - if err := json.Unmarshal(params, &input); err != nil { - return "", fmt.Errorf("helloWorld: invalid params: %w", err) - } - return "Hello, " + input.Name + "!", nil -} diff --git a/tools/search.go b/tools/search.go deleted file mode 100644 index 2cf8f54..0000000 --- a/tools/search.go +++ /dev/null @@ -1,78 +0,0 @@ -package tools - -import ( - "context" - "encoding/json" - "fmt" - "html" - "io" - "net/http" - "regexp" - "strings" -) - -var ( - scriptRx = regexp.MustCompile(`(?is)]*>.*?`) - styleRx = regexp.MustCompile(`(?is)]*>.*?`) - tagsRx = regexp.MustCompile(`<[^>]+>`) - spaceRx = regexp.MustCompile(`\s+`) -) - -// Search is a tool for performing simple web searches. -// Its currently not production ready and is only used for testing. -type Search struct{} - -func (s *Search) Name() string { - return "search" -} - -func (s *Search) Description() string { - return "A tool for performing web searches. Use this tool to crawl and extract information from web pages." -} - -func (s *Search) Parameters() []Parameter { - return []Parameter{ - { - Name: "url", - Description: "URL to search. This should be a valid URL string.", - Required: true, - }, - } -} - -func (s *Search) Execute(ctx context.Context, params json.RawMessage) (string, error) { - var input struct { - URL string `json:"url"` - } - if err := json.Unmarshal(params, &input); err != nil { - return "", fmt.Errorf("search invalid params: %w", err) - } - - req, err := http.NewRequestWithContext(ctx, http.MethodGet, input.URL, nil) - if err != nil { - return "", fmt.Errorf("failed to create request: %w", err) - } - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return "", fmt.Errorf("failed with error: %w", err) - } - defer resp.Body.Close() - - b, err := io.ReadAll(resp.Body) - if err != nil { - return "", fmt.Errorf("failed to read response body: %w", err) - } - - // Pragmatically extract text from HTML responses. - // This is not a robust solution and requires more work to be production ready. - text := string(b) - text = scriptRx.ReplaceAllString(text, " ") - text = styleRx.ReplaceAllString(text, " ") - text = tagsRx.ReplaceAllString(text, " ") - text = html.UnescapeString(text) - text = spaceRx.ReplaceAllString(text, " ") - text = strings.TrimSpace(text) - - return text, nil -} diff --git a/tools/shell.go b/tools/shell.go new file mode 100644 index 0000000..81b7330 --- /dev/null +++ b/tools/shell.go @@ -0,0 +1,28 @@ +package tools + +import ( + "context" + "os/exec" + "strings" +) + +// Shell creates a Tool that runs a system binary. +// Args are built from params in definition order, splitting each value on spaces. +// e.g. params [{flags}, {path}] with args {"flags":"-la","path":"/tmp"} runs: name -la /tmp +func Shell(name, description string, params []Param) Tool { + return Tool{ + Name: name, + Description: description, + Params: params, + Run: func(ctx context.Context, args map[string]string) (string, error) { + var cmdArgs []string + for _, p := range params { + if v := args[p.Name]; v != "" { + cmdArgs = append(cmdArgs, strings.Fields(v)...) + } + } + out, err := exec.CommandContext(ctx, name, cmdArgs...).CombinedOutput() + return string(out), err + }, + } +} diff --git a/tools/tools.go b/tools/tools.go index 9f99641..70ae5b1 100644 --- a/tools/tools.go +++ b/tools/tools.go @@ -1,24 +1,18 @@ -// Package tools defines the Tool interface and ships built-in tool -// implementations that agents can use to interact with the outside world. package tools -import ( - "context" - "encoding/json" -) +import "context" -// Parameter describes a single input field a tool accepts. -type Parameter struct { +// Param describes a single input a tool accepts. +type Param struct { Name string Description string Required bool } -// Tool represents an action the agent can perform. -// Each tool must provide a name, description, parameter schema, and execution logic. -type Tool interface { - Name() string - Description() string - Parameters() []Parameter - Execute(ctx context.Context, params json.RawMessage) (string, error) +// Tool defines an action the agent can perform. +type Tool struct { + Name string + Description string + Params []Param + Run func(ctx context.Context, args map[string]string) (string, error) } diff --git a/tools/write.go b/tools/write.go new file mode 100644 index 0000000..d78eeb5 --- /dev/null +++ b/tools/write.go @@ -0,0 +1,24 @@ +package tools + +import ( + "context" + "os" +) + +// WriteFile creates a Tool that writes content to a file. +func WriteFile() Tool { + return Tool{ + Name: "write_file", + Description: "Write text content to a file", + Params: []Param{ + {Name: "path", Description: "file path to write to", Required: true}, + {Name: "content", Description: "content to write", Required: true}, + }, + Run: func(_ context.Context, args map[string]string) (string, error) { + if err := os.WriteFile(args["path"], []byte(args["content"]), 0644); err != nil { + return "", err + } + return "written to " + args["path"], nil + }, + } +}