Ryanhub - file viewer
filename: agent-sandbox/sandbox.py
branch: main
back to repo
# sandbox.py

import json
import re
import subprocess
import sys
from datetime import datetime
from pathlib import Path

import requests

ROOT = Path(__file__).resolve().parent
WORKSPACE = ROOT / "sandbox"
LOGS_DIR = ROOT / "logs"
CONFIG_PATH = ROOT / "config.json"
PROMPTS_DIR = ROOT / "prompts"
MODEL_TIMEOUT = 150 # seconds
BASH_TIMEOUT = 60 # seconds

def load_config():
    with open(CONFIG_PATH, encoding="utf-8") as f:
        return json.load(f)


def setup_dirs(config):
    WORKSPACE.mkdir(parents=True, exist_ok=True)
    LOGS_DIR.mkdir(parents=True, exist_ok=True)
    return WORKSPACE, LOGS_DIR


def load_prompt(name: str, **kwargs) -> str:
    path = PROMPTS_DIR / f"{name}.md"
    text = path.read_text(encoding="utf-8").strip()
    if kwargs:
        text = text.format(**kwargs)
    return text


def build_system_prompt(config):
    return load_prompt("system", nickname=config["model"]["nickname"])


def get_notes_summary(workspace):
    notes_path = workspace / "notes.txt"
    if not notes_path.exists():
        return ""
    content = notes_path.read_text(encoding="utf-8", errors="replace").strip()
    if not content:
        return ""
    return "[your notes]:\n" + content


def parse_response(text):
    result = {"thinking": "", "notes": "", "bash": ""}

    # <thinking>...</thinking>
    m = re.search(r"<thinking>(.*?)</thinking>", text, re.DOTALL | re.IGNORECASE)
    if m:
        result["thinking"] = m.group(1).strip()

    # <notes>...</notes> OR <note>...</note>
    m = re.search(r"<notes>(.*?)</notes>|<note>(.*?)</note>", text, re.DOTALL | re.IGNORECASE)
    if m:
        result["notes"] = m.group(1).strip() or m.group(2).strip()

    # ```bash ... ```
    m = re.search(r"```(?:bash)?\s*\n(.*?)```", text, re.DOTALL)
    if m:
        result["bash"] = m.group(1).strip()

    return result


def looks_like_markdown_instead_of_bash(cmd: str) -> bool:
    """Reject content that is clearly markdown/text, not a shell command."""
    if not cmd:
        return True
    first_line = cmd.split("\n")[0].strip()
    # Markdown patterns - if first line looks like markdown, reject
    markdown_starts = ("#", "###", "##", "- ", "* ", "|", "```", "**", "[", "✅", "📊", "🎨", "📋")
    return first_line.startswith(markdown_starts)


def append_notes(workspace, notes_text):
    if not notes_text:
        return
    path = workspace / "notes.txt"
    with open(path, "a", encoding="utf-8") as f:
        f.write(notes_text.strip() + "\n\n")


def is_command_safe(cmd: str, workspace: Path) -> tuple[bool, str]:
    cmd = cmd.strip()
    workspace_str = str(workspace.resolve())
    # block path traversal
    if ".." in cmd:
        return False, "command not allowed (path traversal)"
    # block absolute paths that escape workspace
    if cmd.startswith("/") or cmd.startswith("~") and not cmd.startswith(workspace_str):
        return False, "command not allowed (using root path)"
    return True, ""


def run_bash(workspace, cmd):
    safe, err = is_command_safe(cmd, workspace)
    if not safe:
        return "", err, -1
    try:
        r = subprocess.run(
            ["bash", "-c", cmd],
            cwd=str(workspace),
            capture_output=True,
            text=True,
            timeout=BASH_TIMEOUT,
        )
        return r.stdout, r.stderr, r.returncode
    except subprocess.TimeoutExpired:
        return "", f"command timed out ({BASH_TIMEOUT}s)", -1
    except Exception as e:
        return "", str(e), -1


def call_model(url, model_name, messages, temperature):
    payload = {
        "model": model_name,
        "messages": messages,
        "stream": False,
        "options": {"temperature": temperature},
    }
    try:
        r = requests.post(url, json=payload, timeout=MODEL_TIMEOUT)
        r.raise_for_status()
        data = r.json()
        return data.get("message", {}).get("content", "")
    except requests.Timeout:
        return load_prompt("timeout", timeout=MODEL_TIMEOUT)


def log_turn(log_dir, nickname, turn_data):
    log_file = log_dir / f"{datetime.now().strftime('%Y-%m-%d')}.jsonl"
    line = json.dumps(turn_data, ensure_ascii=False) + "\n"
    with open(log_file, "a", encoding="utf-8") as f:
        f.write(line)


def main():
    config = load_config()
    m = config["model"]
    workspace, log_dir = setup_dirs(config)

    system_prompt = build_system_prompt(config)
    messages = [{"role": "system", "content": system_prompt}]

    instructions = ""
    if sys.stdin.isatty():
        try:
            instructions = input("instructions: ").strip()
        except EOFError:
            pass
    initial = load_prompt("initial")
    if instructions:
        initial = f"[User]: {instructions}\n\n{initial}"
    notes_summary = get_notes_summary(workspace)
    if notes_summary:
        initial += notes_summary
    messages.append({"role": "user", "content": initial})

    turn = 0
    recent_turns: list[dict] = []  # for stuck-loop detection
    print(f"sandbox started, working from {workspace}\n")

    while True:
        turn += 1
        try:
            bash_out = ""
            # call model
            response_text = call_model(
                m["url"], m["name"], messages, m.get("temperature", 0.5)
            )
            parsed = parse_response(response_text)
            is_timeout_response = "<error>" in response_text and "timed out" in response_text

            # model output
            print(f"thought: {parsed.get('thinking', '')}")
            print(f"note: {parsed.get('notes', '')}")
            print(f"bash: {parsed.get('bash', '')}")

            # append notes
            if parsed["notes"]:
                append_notes(workspace, parsed["notes"])

            # run bash
            bash_out = ""
            messages.append({"role": "assistant", "content": response_text})
            if parsed["bash"]:
                stdout, stderr, code = run_bash(workspace, parsed["bash"])
                bash_out = f"$ {parsed['bash']}\n"
                if stdout:
                    bash_out += stdout
                if stderr:
                    bash_out += f"stderr: {stderr}\n"
                bash_out += f"(exit {code})"
                next_prompt = load_prompt("bash_continue", bash_out=bash_out)
                print(f"Turn {turn}: ran bash")
            else:
                if is_timeout_response:
                    next_prompt = load_prompt("timeout_continue")
                else:
                    next_prompt = load_prompt("next_turn")
                print(f"Turn {turn}: no bash")

            # stuck detection
            cmd = parsed.get("bash") or ""
            recent_turns.append({
                "had_bash": bool(cmd),
                "bash_first": cmd.split()[0] if cmd else None,
                "has_redirect": ">" in cmd or "<<" in cmd or "tee" in cmd,
            })
            if len(recent_turns) > 6:
                recent_turns.pop(0)
            # nudge
            exploratory = ("ls", "cat", "head", "tail")
            no_bash_count = sum(1 for t in recent_turns[-4:] if not t["had_bash"])
            exploratory_count = sum(
                1 for t in recent_turns[-6:]
                if t["had_bash"]
                and t["bash_first"] in exploratory
                and not t["has_redirect"]
            )
            if no_bash_count >= 3:
                next_prompt += "\n\n[You've had several turns without a bash command. If you have work to do, include a bash block with the next command.]"
            elif exploratory_count >= 5:
                next_prompt += "\n\n[You've inspected files multiple times. If your task is to create a file, output a bash command to create it (e.g. cat > file.html << 'EOF').]"

            # bash output
            bash_lines = bash_out.splitlines()
            bash_preview = "\n".join(bash_lines[:10])

            print(f"result: {bash_preview}\n\n")

            notes_summary = get_notes_summary(workspace)
            if notes_summary:
                next_prompt += notes_summary
            messages.append({"role": "user", "content": next_prompt})

            # log
            log_turn(
                log_dir,
                m["nickname"],
                {
                    "turn": turn,
                    "ts": datetime.now().isoformat(),
                    "parsed": parsed,
                    "response_preview": response_text[:500],
                    "bash_out_preview": bash_out[:500] if bash_out else None,
                },
            )

        except KeyboardInterrupt:
            print("\nstopped.")
            sys.exit(0)
        except requests.RequestException as e:
            print(f"API error: {e}")
            sys.exit(1)


if __name__ == "__main__":
    main()