filename:
agent-sandbox/sandbox.py
branch:
main
back to repo
# sandbox.py
import json
import re
import subprocess
import sys
from datetime import datetime
from pathlib import Path
import requests
# Directory containing this script; every other path below is anchored to it.
ROOT = Path(__file__).resolve().parent

# Working directory handed to the agent (created by setup_dirs).
WORKSPACE = ROOT.joinpath("sandbox")
# One JSONL log file per day is written here (see log_turn).
LOGS_DIR = ROOT.joinpath("logs")
# JSON configuration read by load_config.
CONFIG_PATH = ROOT.joinpath("config.json")
# Markdown prompt templates looked up by load_prompt.
PROMPTS_DIR = ROOT.joinpath("prompts")

# Timeouts, both in seconds.
MODEL_TIMEOUT = 150  # HTTP request to the model endpoint
BASH_TIMEOUT = 60  # a single bash command
def load_config():
    """Return the parsed contents of the JSON file at ``CONFIG_PATH``.

    The file is read as UTF-8. A missing file or invalid JSON propagates
    as the underlying exception.
    """
    raw = CONFIG_PATH.read_text(encoding="utf-8")
    return json.loads(raw)
def setup_dirs(config):
    """Create the workspace and log directories if they do not exist yet.

    ``config`` is accepted for interface compatibility but is not consulted;
    the locations come from the module-level ``WORKSPACE`` and ``LOGS_DIR``.

    Returns:
        The ``(WORKSPACE, LOGS_DIR)`` pair.
    """
    for directory in (WORKSPACE, LOGS_DIR):
        directory.mkdir(parents=True, exist_ok=True)
    return WORKSPACE, LOGS_DIR
def load_prompt(name: str, **kwargs) -> str:
    """Load ``<PROMPTS_DIR>/<name>.md`` and return it with whitespace stripped.

    When keyword arguments are given, the template is passed through
    ``str.format`` with them. Without keyword arguments the text is returned
    verbatim, so literal braces in such prompts need no escaping.
    """
    template = (PROMPTS_DIR / f"{name}.md").read_text(encoding="utf-8").strip()
    return template.format(**kwargs) if kwargs else template
def build_system_prompt(config):
    """Render the ``system`` prompt with the model nickname from ``config``."""
    nickname = config["model"]["nickname"]
    return load_prompt("system", nickname=nickname)
def get_notes_summary(workspace):
    """Return the saved notes formatted for inclusion in a prompt.

    Reads ``notes.txt`` inside ``workspace`` (undecodable bytes are replaced).
    Returns an empty string when the file is missing or holds only
    whitespace; otherwise returns the stripped notes prefixed with a
    ``[your notes]:`` header line.
    """
    notes_file = workspace / "notes.txt"
    if notes_file.exists():
        body = notes_file.read_text(encoding="utf-8", errors="replace").strip()
        if body:
            return "[your notes]:\n" + body
    return ""
def parse_response(text):
    """Extract the thinking, notes and bash sections from a model reply.

    Recognised sections (first occurrence of each wins):
      * ``<thinking>...</thinking>``
      * ``<notes>...</notes>`` or ``<note>...</note>``
      * a fenced code block opened with ```` ``` ```` or ```` ```bash ````

    Tags are matched case-insensitively; the code fence is not.

    Returns:
        A dict with the keys ``"thinking"``, ``"notes"`` and ``"bash"``.
        Each value is the stripped section body, or ``""`` when the section
        is absent.
    """
    result = {"thinking": "", "notes": "", "bash": ""}
    tag_flags = re.DOTALL | re.IGNORECASE

    # <thinking>...</thinking>
    m = re.search(r"<thinking>(.*?)</thinking>", text, tag_flags)
    if m:
        result["thinking"] = m.group(1).strip()

    # <notes>...</notes> OR <note>...</note>
    m = re.search(r"<notes>(.*?)</notes>|<note>(.*?)</note>", text, tag_flags)
    if m:
        # Only one alternative takes part in a match; the other group is
        # None, so pick whichever is set before calling strip().
        result["notes"] = (m.group(1) or m.group(2) or "").strip()

    # ```bash ... ```
    m = re.search(r"```(?:bash)?\s*\n(.*?)```", text, re.DOTALL)
    if m:
        result["bash"] = m.group(1).strip()

    return result
def looks_like_markdown_instead_of_bash(cmd: str) -> bool:
    """Return True when ``cmd`` appears to be markdown/prose, not a command.

    An empty command is rejected outright. Otherwise only the first line
    (surrounding whitespace ignored) is examined: if it begins with one of
    the known markdown or emoji prefixes, the content is treated as text.
    """
    if not cmd:
        return True
    # Prefixes that signal headings, lists, tables, fences, emphasis, links
    # or emoji-led status lines.
    markdown_starts = ("#", "###", "##", "- ", "* ", "|", "```", "**", "[", "✅", "📊", "🎨", "📋")
    head, _, _ = cmd.partition("\n")
    return head.strip().startswith(markdown_starts)
def append_notes(workspace, notes_text):
    """Append ``notes_text`` to ``notes.txt`` in ``workspace``.

    Nothing is written when ``notes_text`` is empty. Otherwise the stripped
    text is appended, followed by a blank line so that entries stay visually
    separated.
    """
    if notes_text:
        entry = notes_text.strip() + "\n\n"
        with (workspace / "notes.txt").open("a", encoding="utf-8") as fh:
            fh.write(entry)
def is_command_safe(cmd: str, workspace: Path) -> tuple[bool, str]:
    """Heuristically reject commands that would leave the workspace.

    Two textual checks are applied to the stripped command:
      * any occurrence of ``..`` is treated as path traversal;
      * a command that begins with ``/`` or ``~`` is rejected unless it
        begins with the resolved workspace path.

    This is a string-level filter only. It does not parse the shell command,
    so it must not be relied on as a security boundary.

    Returns:
        ``(True, "")`` when the command passes, otherwise ``(False, reason)``.
    """
    cmd = cmd.strip()
    workspace_str = str(workspace.resolve())

    # block path traversal
    if ".." in cmd:
        return False, "command not allowed (path traversal)"

    # block absolute paths that escape workspace
    # `and` binds tighter than `or`, so the two prefix tests are grouped
    # first; otherwise every "/"-prefixed command is rejected, even one that
    # lives inside the workspace.
    starts_at_root_or_home = cmd.startswith(("/", "~"))
    if starts_at_root_or_home and not cmd.startswith(workspace_str):
        return False, "command not allowed (using root path)"

    return True, ""
def run_bash(workspace, cmd):
    """Run ``cmd`` through ``bash -c`` with ``workspace`` as the working directory.

    The command is first screened by ``is_command_safe``; a rejected command
    is never executed.

    Returns:
        A ``(stdout, stderr, returncode)`` tuple. When the command is
        rejected, times out after ``BASH_TIMEOUT`` seconds, or cannot be
        started, stdout is empty, stderr carries the reason and the return
        code is ``-1``.
    """
    safe, err = is_command_safe(cmd, workspace)
    if not safe:
        return "", err, -1
    try:
        r = subprocess.run(
            ["bash", "-c", cmd],
            cwd=str(workspace),
            capture_output=True,
            text=True,
            # Commands may emit bytes that are not valid text (e.g. `cat` on
            # a binary file); replace them instead of losing all output to a
            # UnicodeDecodeError.
            errors="replace",
            timeout=BASH_TIMEOUT,
        )
        return r.stdout, r.stderr, r.returncode
    except subprocess.TimeoutExpired:
        return "", f"command timed out ({BASH_TIMEOUT}s)", -1
    except Exception as e:
        # Deliberate best-effort: any failure to launch the command is
        # reported back as stderr text rather than aborting the caller.
        return "", str(e), -1
def call_model(url, model_name, messages, temperature):
    """POST a non-streaming chat request to ``url`` and return the reply text.

    The reply is read from ``message.content`` of the JSON response; a
    missing field yields ``""``. If the request exceeds ``MODEL_TIMEOUT``
    seconds, the rendered ``timeout`` prompt is returned in place of a model
    reply. Other ``requests`` errors (including HTTP error statuses) are
    left to propagate to the caller.
    """
    # NOTE(review): payload shape looks like an Ollama-style chat API; confirm.
    body = {
        "model": model_name,
        "messages": messages,
        "stream": False,
        "options": {"temperature": temperature},
    }
    try:
        response = requests.post(url, json=body, timeout=MODEL_TIMEOUT)
        response.raise_for_status()
        reply = response.json()
        return reply.get("message", {}).get("content", "")
    except requests.Timeout:
        return load_prompt("timeout", timeout=MODEL_TIMEOUT)
def log_turn(log_dir, nickname, turn_data):
    """Append ``turn_data`` as one JSON line to today's log file.

    The file is ``<log_dir>/<YYYY-MM-DD>.jsonl``, named after the local date.
    Non-ASCII characters are written as-is rather than escaped. ``nickname``
    is accepted but not used.
    """
    today = datetime.now().strftime("%Y-%m-%d")
    record = json.dumps(turn_data, ensure_ascii=False)
    with (log_dir / f"{today}.jsonl").open("a", encoding="utf-8") as fh:
        fh.write(record + "\n")
def _format_bash_output(cmd, stdout, stderr, code):
    """Render one command run as the transcript text fed back to the model."""
    parts = [f"$ {cmd}\n"]
    if stdout:
        # Keep "(exit N)" on its own line even when stdout has no final newline.
        parts.append(stdout if stdout.endswith("\n") else stdout + "\n")
    if stderr:
        parts.append(f"stderr: {stderr}\n")
    parts.append(f"(exit {code})")
    return "".join(parts)


def _stuck_nudge(recent_turns):
    """Return a hint to append to the next prompt when the model looks stuck, else ""."""
    exploratory = ("ls", "cat", "head", "tail")
    no_bash_count = sum(1 for t in recent_turns[-4:] if not t["had_bash"])
    exploratory_count = sum(
        1 for t in recent_turns[-6:]
        if t["had_bash"]
        and t["bash_first"] in exploratory
        and not t["has_redirect"]
    )
    if no_bash_count >= 3:
        return "\n\n[You've had several turns without a bash command. If you have work to do, include a bash block with the next command.]"
    if exploratory_count >= 5:
        return "\n\n[You've inspected files multiple times. If your task is to create a file, output a bash command to create it (e.g. cat > file.html << 'EOF').]"
    return ""


def main():
    """Run the interactive agent loop until interrupted or the API fails.

    Each turn sends the conversation to the model, parses the reply, saves
    any notes, runs the bash block (if any) in the workspace, and appends a
    follow-up user message containing the command output, an optional
    "stuck" nudge and the current notes. Every turn is logged as JSONL.

    Exits with status 0 on Ctrl-C and status 1 on a ``requests`` error.
    """
    config = load_config()
    m = config["model"]
    workspace, log_dir = setup_dirs(config)
    system_prompt = build_system_prompt(config)
    messages = [{"role": "system", "content": system_prompt}]

    # Optional free-form instructions, only asked for on an interactive terminal.
    instructions = ""
    if sys.stdin.isatty():
        try:
            instructions = input("instructions: ").strip()
        except EOFError:
            pass

    initial = load_prompt("initial")
    if instructions:
        initial = f"[User]: {instructions}\n\n{initial}"
    notes_summary = get_notes_summary(workspace)
    if notes_summary:
        # Prompts are stripped on load, so separate the notes explicitly
        # instead of gluing them onto the last sentence.
        initial += "\n\n" + notes_summary
    messages.append({"role": "user", "content": initial})

    turn = 0
    recent_turns: list[dict] = []  # for stuck-loop detection
    print(f"sandbox started, working from {workspace}\n")

    while True:
        turn += 1
        try:
            # call model
            response_text = call_model(
                m["url"], m["name"], messages, m.get("temperature", 0.5)
            )
            parsed = parse_response(response_text)
            # call_model returns the rendered timeout prompt on timeout;
            # recognise it by its markers.
            is_timeout_response = "<error>" in response_text and "timed out" in response_text

            # model output
            print(f"thought: {parsed.get('thinking', '')}")
            print(f"note: {parsed.get('notes', '')}")
            print(f"bash: {parsed.get('bash', '')}")

            # append notes
            if parsed["notes"]:
                append_notes(workspace, parsed["notes"])

            messages.append({"role": "assistant", "content": response_text})

            # run bash
            cmd = parsed.get("bash") or ""
            bash_out = ""
            if cmd:
                stdout, stderr, code = run_bash(workspace, cmd)
                bash_out = _format_bash_output(cmd, stdout, stderr, code)
                next_prompt = load_prompt("bash_continue", bash_out=bash_out)
                print(f"Turn {turn}: ran bash")
            else:
                if is_timeout_response:
                    next_prompt = load_prompt("timeout_continue")
                else:
                    next_prompt = load_prompt("next_turn")
                print(f"Turn {turn}: no bash")

            # stuck detection: remember a small summary of the last 6 turns
            recent_turns.append({
                "had_bash": bool(cmd),
                "bash_first": cmd.split()[0] if cmd else None,
                "has_redirect": ">" in cmd or "<<" in cmd or "tee" in cmd,
            })
            if len(recent_turns) > 6:
                recent_turns.pop(0)

            # nudge
            next_prompt += _stuck_nudge(recent_turns)

            # bash output (first 10 lines only, to keep the console readable)
            bash_preview = "\n".join(bash_out.splitlines()[:10])
            print(f"result: {bash_preview}\n\n")

            notes_summary = get_notes_summary(workspace)
            if notes_summary:
                next_prompt += "\n\n" + notes_summary
            messages.append({"role": "user", "content": next_prompt})

            # log
            log_turn(
                log_dir,
                m["nickname"],
                {
                    "turn": turn,
                    "ts": datetime.now().isoformat(),
                    "parsed": parsed,
                    "response_preview": response_text[:500],
                    "bash_out_preview": bash_out[:500] if bash_out else None,
                },
            )
        except KeyboardInterrupt:
            print("\nstopped.")
            sys.exit(0)
        except requests.RequestException as e:
            print(f"API error: {e}")
            sys.exit(1)
# Start the agent loop only when executed as a script, not when imported.
if "__main__" == __name__:
    main()