"""Shared utilities for job-search CRM scripts."""
from __future__ import annotations

import csv
import re
import sys
from datetime import date, datetime
from pathlib import Path
from typing import Any, NoReturn

import yaml
# Repository root: two directory levels above this file's resolved location.
REPO_ROOT = Path(__file__).resolve().parent.parent

# Fixed subdirectories of the repository, all derived from REPO_ROOT.
LEADS_DIR = REPO_ROOT.joinpath("leads")
CONTACTS_DIR = REPO_ROOT.joinpath("contacts")
COMPANIES_DIR = REPO_ROOT.joinpath("companies")
DATA_DIR = REPO_ROOT.joinpath("data")
OUTPUTS_DIR = REPO_ROOT.joinpath("outputs")
# Status vocabularies, one set per entity kind.
# NOTE(review): presumably used by the calling scripts to validate the
# ``status`` frontmatter field -- confirm against the callers.
LEAD_STATUSES = {
    "found", "researched", "contact_needed", "draft_needed", "drafted",
    "messaged", "applied", "followed_up", "interviewing", "closed",
    "archived",
}
CONTACT_STATUSES = {
    "found", "researched", "drafted", "messaged",
    "replied", "followed_up", "dead", "archived",
}
COMPANY_STATUSES = {
    "target", "watching", "applied",
    "in_conversation", "dead", "archived",
}
# Closed vocabularies for the match rating, confidence rating and
# interaction "type" values.
# NOTE(review): which records carry which field is not shown here -- confirm
# against the calling scripts.
MATCH_VALUES = {"bad", "weak", "moderate", "strong"}
CONFIDENCE_VALUES = {"unknown", "low", "medium", "high"}
INTERACTION_TYPES = {
    "note", "draft", "message", "email", "application",
    "followup", "reply", "interview", "rejection", "offer",
}
# Ordered column names for the per-entity CSV indexes. Order is significant.
# NOTE(review): presumably passed as ``fieldnames`` to write_csv by the
# index-sync script -- confirm against the caller.
LEAD_CSV_FIELDS = [
    "id", "title", "company", "company_id", "url",
    "source", "location", "remote", "employment_type", "status",
    "match", "priority", "date_found", "date_updated", "apply_by",
]
CONTACT_CSV_FIELDS = [
    "id", "name", "company", "company_id", "role",
    "linkedin_url", "source", "confidence", "status", "date_found",
    "date_updated", "last_contacted", "next_followup",
]
COMPANY_CSV_FIELDS = [
    "id", "name", "website", "careers_url", "domain",
    "status", "priority", "date_added", "date_updated",
]
# Ordered column names of data/interactions.csv; used as the DictWriter
# ``fieldnames`` when the log is created and appended to.
INTERACTION_CSV_FIELDS = [
    "date", "type", "entity_type", "entity_id", "contact_id",
    "lead_id", "company_id", "summary", "next_followup",
]

# File names that list_entity_files never returns as entity records.
SKIP_FILENAMES = {"_template.md"}
def today_str() -> str:
    """Return the current local date formatted as ISO 8601 (``YYYY-MM-DD``)."""
    current_day = date.today()
    return current_day.isoformat()
def slugify(text: str) -> str:
    """Turn *text* into a lowercase, hyphen-separated slug.

    After lowercasing, every run of characters outside ``a-z`` / ``0-9`` acts
    as a separator; the remaining alphanumeric runs are joined with single
    hyphens. Returns ``"unknown"`` when no alphanumeric character is left.
    """
    tokens = re.findall(r"[a-z0-9]+", text.lower())
    if not tokens:
        return "unknown"
    return "-".join(tokens)
def parse_frontmatter(path: Path) -> tuple[dict[str, Any], str]:
    """Read a Markdown file and split it into YAML frontmatter and body.

    The file must open with a ``---`` line, followed by the YAML metadata and
    a closing ``---`` line. A delimiter is only recognised when it occupies a
    whole line (trailing whitespace allowed), so a ``---`` that appears inside
    a YAML value or in the body text does not end the metadata block early.

    Args:
        path: Markdown file to read (decoded as UTF-8).

    Returns:
        ``(meta, body)``: the parsed frontmatter mapping (``{}`` when the
        block is empty) and the remaining text with leading newlines removed.

    Raises:
        ValueError: If the opening or closing delimiter is missing, or the
            frontmatter does not parse to a mapping.
    """
    text = path.read_text(encoding="utf-8")
    # Some editors prepend a byte-order mark; it must not hide the delimiter.
    if text.startswith("\ufeff"):
        text = text[1:]
    lines = text.split("\n")
    if lines[0].rstrip() != "---":
        raise ValueError(f"{path}: missing YAML frontmatter")
    # First later line consisting solely of the delimiter closes the block.
    closing = next(
        (
            index
            for index, line in enumerate(lines[1:], start=1)
            if line.rstrip() == "---"
        ),
        None,
    )
    if closing is None:
        raise ValueError(f"{path}: malformed frontmatter")
    meta = yaml.safe_load("\n".join(lines[1:closing])) or {}
    if not isinstance(meta, dict):
        raise ValueError(f"{path}: frontmatter must be a mapping")
    body = "\n".join(lines[closing + 1:]).lstrip("\n")
    return meta, body
def normalize_meta_value(value: Any) -> Any:
    """Recursively replace ``None`` with ``""`` inside *value*.

    Lists and dicts are rebuilt with every element/value normalized (dict
    keys are kept as they are); any other value is returned unchanged.
    """
    if isinstance(value, list):
        return [normalize_meta_value(element) for element in value]
    if isinstance(value, dict):
        cleaned = {}
        for name, element in value.items():
            cleaned[name] = normalize_meta_value(element)
        return cleaned
    return "" if value is None else value
def normalize_meta(meta: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *meta* with each value run through normalize_meta_value.

    Keys and their order are preserved; *meta* itself is not modified.
    """
    cleaned: dict[str, Any] = {}
    for field, raw in meta.items():
        cleaned[field] = normalize_meta_value(raw)
    return cleaned
def write_markdown(path: Path, meta: dict[str, Any], body: str) -> None:
    """Write *meta* and *body* to *path* as Markdown with YAML frontmatter.

    Parent directories are created as needed. The metadata is passed through
    normalize_meta and dumped in block style with key order preserved;
    trailing whitespace is trimmed from both the YAML text and the body, and
    the file always ends with a single newline.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    dump_options = {
        "default_flow_style": False,
        "sort_keys": False,
        "allow_unicode": True,
    }
    front = yaml.safe_dump(normalize_meta(meta), **dump_options).rstrip()
    document = "---\n" + front + "\n---\n\n" + body.rstrip() + "\n"
    path.write_text(document, encoding="utf-8")
def list_entity_files(directory: Path) -> list[Path]:
    """Return the sorted ``*.md`` files directly inside *directory*.

    Files named in SKIP_FILENAMES and files whose name starts with ``_`` are
    left out. A directory that does not exist yields an empty list.
    """
    if not directory.exists():
        return []
    found: list[Path] = []
    for candidate in directory.glob("*.md"):
        filename = candidate.name
        if filename in SKIP_FILENAMES or filename.startswith("_"):
            continue
        found.append(candidate)
    found.sort()
    return found
def load_entities(directory: Path) -> dict[str, tuple[Path, dict[str, Any], str]]:
    """Parse every entity file in *directory*, keyed by its frontmatter ``id``.

    Each value is ``(path, meta, body)``. Files whose ``id`` is missing or
    falsy are skipped. When two files share an id, the one that sorts later
    replaces the earlier one.
    """
    loaded: dict[str, tuple[Path, dict[str, Any], str]] = {}
    for source in list_entity_files(directory):
        meta, body = parse_frontmatter(source)
        raw_id = meta.get("id")
        if not raw_id:
            continue
        loaded[str(raw_id)] = (source, meta, body)
    return loaded
def id_from_filename(path: Path) -> str | None:
match = re.match(r"^((?:CO|L|C)\d{4})-", path.name)
if match:
return match.group(1)
return None
def next_id(prefix: str, width: int, entities: dict[str, Any]) -> str:
    """Return the next unused id for *prefix*, zero-padded to *width* digits.

    The highest number already in use is taken from two sources: the keys of
    *entities*, and the ``*.md`` file names in the leads, contacts and
    companies directories (the part of the name before the first hyphen).
    Only values of the exact form ``<prefix><digits>`` count. The result is
    that maximum plus one, or 1 when nothing matches.
    """
    id_pattern = re.compile(rf"^{re.escape(prefix)}(\d+)$")
    # Seed with 0 so the first id issued for a prefix is number 1.
    seen_numbers = [0]
    for entity_id in entities:
        hit = id_pattern.match(entity_id)
        if hit:
            seen_numbers.append(int(hit.group(1)))
    # All three directories are scanned regardless of prefix; the anchored
    # pattern filters out ids that belong to other prefixes.
    for directory in (LEADS_DIR, CONTACTS_DIR, COMPANIES_DIR):
        if directory.exists():
            for path in directory.glob("*.md"):
                leading_token = path.name.partition("-")[0]
                hit = id_pattern.match(leading_token)
                if hit:
                    seen_numbers.append(int(hit.group(1)))
    following = max(seen_numbers) + 1
    return f"{prefix}{following:0{width}d}"
def normalize_name(name: str) -> str:
    """Return *name* lowercased, trimmed, with whitespace runs collapsed.

    Every run of whitespace becomes a single space, so names that differ
    only in case or spacing compare equal.
    """
    words = name.lower().split()
    return " ".join(words)
def find_company_by_name(name: str) -> tuple[Path, dict[str, Any], str] | None:
    """Find the first company file whose ``name`` matches *name*.

    Both names are compared after normalize_name. Files are visited in the
    order list_entity_files returns them. Returns ``(path, meta, body)`` for
    the first match, or ``None`` when no company matches.
    """
    wanted = normalize_name(name)
    for company_file in list_entity_files(COMPANIES_DIR):
        meta, body = parse_frontmatter(company_file)
        candidate = normalize_name(str(meta.get("name", "")))
        if candidate != wanted:
            continue
        return company_file, meta, body
    return None
def ensure_interactions_csv() -> Path:
    """Make sure ``interactions.csv`` exists in DATA_DIR and return its path.

    DATA_DIR is created if needed. When the file is missing or empty it is
    (re)written with only the INTERACTION_CSV_FIELDS header row; a file that
    already has content is left untouched.
    """
    csv_path = DATA_DIR / "interactions.csv"
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    needs_header = not csv_path.exists() or csv_path.stat().st_size == 0
    if needs_header:
        with csv_path.open("w", encoding="utf-8", newline="") as handle:
            csv.DictWriter(handle, fieldnames=INTERACTION_CSV_FIELDS).writeheader()
    return csv_path
def load_interactions() -> list[dict[str, str]]:
    """Return every row of the interactions CSV as a dict keyed by column.

    The file is created with a header first if it does not exist yet, so the
    result is an empty list when nothing has been logged.
    """
    csv_path = ensure_interactions_csv()
    with csv_path.open(encoding="utf-8", newline="") as handle:
        records = list(csv.DictReader(handle))
    return records
def append_interaction(row: dict[str, str]) -> None:
    """Append one interaction record to the interactions CSV.

    Only the INTERACTION_CSV_FIELDS columns are written: keys missing from
    *row* become empty strings, and extra keys are dropped. The file and its
    header are created first if necessary.
    """
    csv_path = ensure_interactions_csv()
    with csv_path.open("a", encoding="utf-8", newline="") as handle:
        record = {}
        for column in INTERACTION_CSV_FIELDS:
            record[column] = row.get(column, "")
        csv.DictWriter(handle, fieldnames=INTERACTION_CSV_FIELDS).writerow(record)
def csv_value(value: Any) -> str:
    """Render a metadata value as the text stored in a CSV cell.

    ``None`` becomes ``""``, booleans become ``"true"`` / ``"false"``, lists
    are joined with ``|`` (each item passed through ``str``), and anything
    else is converted with ``str``.
    """
    if value is None:
        return ""
    # bool is tested before the generic fallback so True is written as
    # "true" rather than "True".
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, list):
        return "|".join(map(str, value))
    return str(value)
def write_csv(path: Path, fieldnames: list[str], rows: list[dict[str, Any]]) -> None:
    """Write *rows* to *path* as a CSV file sorted by each row's ``id``.

    Args:
        path: Destination file; it is overwritten if it already exists.
        fieldnames: Column names, in output order. Keys of a row that are
            not listed here are dropped.
        rows: Records to write. Missing fields become empty cells and every
            value is rendered with csv_value.

    Rows are ordered by the string form of their ``id`` (rows without one
    sort first), so the output is stable between runs.
    """
    # Create the directory the file actually lives in, not a fixed one, so
    # that a destination outside the data directory works as well.
    path.parent.mkdir(parents=True, exist_ok=True)
    ordered = sorted(rows, key=lambda record: str(record.get("id", "")))
    with path.open("w", encoding="utf-8", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(
            {field: csv_value(record.get(field, "")) for field in fieldnames}
            for record in ordered
        )
def lead_has_output_proof(meta: dict[str, Any]) -> bool:
    """Report whether *meta* lists at least one non-empty output.

    True only when ``meta["outputs"]`` is a dict with at least one truthy
    value; a missing, empty or non-dict ``outputs`` entry gives False.
    """
    recorded = meta.get("outputs") or {}
    return isinstance(recorded, dict) and any(recorded.values())
def lead_has_interaction_proof(lead_id: str, interaction_types: set[str]) -> bool:
    """Report whether the interaction log has a matching row for the lead.

    A row matches when its ``lead_id`` equals *lead_id* and its ``type`` is
    one of *interaction_types*. The log is re-read on every call.
    """
    return any(
        entry.get("lead_id") == lead_id and entry.get("type") in interaction_types
        for entry in load_interactions()
    )
def contact_has_interaction_proof(contact_id: str, interaction_types: set[str]) -> bool:
    """Report whether the interaction log has a matching row for the contact.

    A row matches when its ``contact_id`` equals *contact_id* and its
    ``type`` is one of *interaction_types*. The log is re-read on every call.
    """
    return any(
        entry.get("contact_id") == contact_id
        and entry.get("type") in interaction_types
        for entry in load_interactions()
    )
def positive_reply(summary: str) -> bool:
    """Heuristically decide whether a reply summary sounds positive.

    Returns True when the lowercased *summary* contains any of a fixed list
    of phrases. Matching is by plain substring, so a phrase embedded in a
    longer word (``"call"`` inside ``"recall"``) also counts.
    """
    text = summary.lower()
    for phrase in (
        "interview",
        "schedule",
        "chat",
        "call",
        "move forward",
        "next round",
        "speak with",
        "talk with",
        "phone screen",
        "would like to",
        "happy to",
        "let's connect",
        "lets connect",
    ):
        if phrase in text:
            return True
    return False
def run_sync_indexes() -> None:
    """Run the ``main`` entry point of the ``sync_indexes`` module.

    The import happens at call time rather than at module load.
    NOTE(review): presumably this avoids a circular import, since
    sync_indexes likely imports this module -- confirm.
    """
    from sync_indexes import main as rebuild_indexes

    rebuild_indexes()
def die(message: str, code: int = 1) -> NoReturn:
    """Print *message* to stderr and abort by raising ``SystemExit``.

    Args:
        message: Text written to standard error.
        code: Exit status carried by the exception (defaults to 1).

    Raises:
        SystemExit: Always; this function never returns, which the
            ``NoReturn`` annotation makes visible to type checkers.
    """
    print(message, file=sys.stderr)
    raise SystemExit(code)