Ryanhub - file viewer
filename: scripts/validate.py
branch: main
back to repo
#!/usr/bin/env python3
"""Validate repo consistency."""

from __future__ import annotations

import csv
import sys
from pathlib import Path

# Prepend the directory containing this file to sys.path before the `common`
# import below; that import is therefore not at the top of the file, hence
# its E402 suppression.
# NOTE(review): presumably `common` sits next to this script — confirm.
sys.path.insert(0, str(Path(__file__).resolve().parent))

from common import (  # noqa: E402
    COMPANIES_DIR,
    CONTACTS_DIR,
    DATA_DIR,
    LEADS_DIR,
    LEAD_STATUSES,
    CONTACT_STATUSES,
    COMPANY_STATUSES,
    MATCH_VALUES,
    CONFIDENCE_VALUES,
    LEAD_CSV_FIELDS,
    CONTACT_CSV_FIELDS,
    COMPANY_CSV_FIELDS,
    id_from_filename,
    lead_has_output_proof,
    lead_has_interaction_proof,
    list_entity_files,
    load_interactions,
    normalize_name,
    parse_frontmatter,
    REPO_ROOT,
)


class Result:
    """Accumulates validation findings as two ordered lists of messages."""

    # Messages recorded via error(), in the order they were reported.
    errors: list[str]
    # Messages recorded via warn(), in the order they were reported.
    warnings: list[str]

    def __init__(self) -> None:
        """Start with no errors and no warnings; lists are per-instance."""
        self.warnings = []
        self.errors = []

    def error(self, message: str) -> None:
        """Record *message* as an error."""
        self.errors.append(message)

    def warn(self, message: str) -> None:
        """Record *message* as a warning."""
        self.warnings.append(message)


def valid_priority(value) -> bool:
    """Return whether *value* is an acceptable priority.

    A blank priority (``""`` or ``None``) is accepted. Anything else is
    coerced with ``int()`` and must land in the inclusive range 1..5, so
    numeric strings such as ``"3"`` are accepted as well.

    Values that ``int()`` cannot convert are reported as invalid rather than
    raising. That includes infinite floats, for which ``int()`` raises
    ``OverflowError`` rather than ``ValueError``.
    """
    if value in ("", None):
        return True
    try:
        return 1 <= int(value) <= 5
    except (TypeError, ValueError, OverflowError):
        return False


def read_csv_rows(path: Path) -> list[dict[str, str]]:
    """Load every data row of the CSV at *path* as a list of dicts.

    The file is read as UTF-8 through ``csv.DictReader``. If *path* does
    not exist, an empty list is returned instead of raising.
    """
    loaded: list[dict[str, str]] = []
    if path.exists():
        # newline="" lets the csv module handle line endings itself.
        with path.open(encoding="utf-8", newline="") as handle:
            loaded.extend(csv.DictReader(handle))
    return loaded


def rows_match_csv(rows: list[dict], csv_path: Path, fields: list[str]) -> bool:
    """Report whether *rows* agree with the contents of the CSV at *csv_path*.

    *rows* are ordered by their ``"id"`` value and compared position by
    position with the CSV rows in file order. Only the columns named in
    *fields* are compared. On the in-memory side ``None`` and ``""`` both
    count as the empty string and everything else is stringified. The
    ``"remote"`` column is compared case-insensitively.

    Returns ``False`` as soon as the row counts differ or any compared
    field differs, otherwise ``True``.
    """
    on_disk = read_csv_rows(csv_path)
    if len(on_disk) != len(rows):
        return False

    ordered = sorted(rows, key=lambda r: r["id"])
    for expected, actual in zip(ordered, on_disk):
        for column in fields:
            raw = expected.get(column)
            want = "" if raw in (None, "") else str(raw)
            got = actual.get(column, "")
            if column == "remote":
                want, got = want.lower(), str(got).lower()
            if want != got:
                return False
    return True


def _meta_text(meta: dict, key: str) -> str:
    """Return ``meta[key]`` as text, mapping a missing or ``None`` value to ``""``.

    ``str(meta.get(key, ""))`` would turn an explicit null into the literal
    string ``"None"``, because the ``get`` default only applies to missing
    keys. This helper avoids that.
    """
    value = meta.get(key)
    return "" if value is None else str(value)


def validate_markdown(result: Result) -> tuple[dict, dict, dict]:
    """Validate the frontmatter of every lead, contact and company file.

    Errors are recorded on *result* for:

    * frontmatter that ``parse_frontmatter`` rejects with ``ValueError``
      (the file is then skipped);
    * a missing or null ``id`` (the file is then skipped);
    * an ``id`` that differs from the id derived from the filename;
    * an ``id`` already seen in the same entity kind;
    * a ``status`` outside the allowed set for that kind;
    * leads: invalid ``match``, invalid ``priority``, duplicate ``url``;
    * contacts: invalid ``confidence``, duplicate normalized name+company;
    * companies: invalid ``priority``.

    Null ``url``, ``name`` and ``company`` values count as absent, so they
    are never reported as duplicates of each other.

    Returns:
        ``(leads, contacts, companies)``, each mapping entity id to
        ``(path, meta)``. When an id is duplicated, the later file wins.
    """
    leads: dict[str, tuple[Path, dict]] = {}
    contacts: dict[str, tuple[Path, dict]] = {}
    companies: dict[str, tuple[Path, dict]] = {}

    seen_lead_urls: dict[str, str] = {}
    seen_contact_pairs: dict[tuple[str, str], str] = {}

    for directory, bucket, statuses, match_or_conf in (
        (LEADS_DIR, leads, LEAD_STATUSES, "match"),
        (CONTACTS_DIR, contacts, CONTACT_STATUSES, "confidence"),
        (COMPANIES_DIR, companies, COMPANY_STATUSES, None),
    ):
        for path in list_entity_files(directory):
            try:
                meta, _ = parse_frontmatter(path)
            except ValueError as exc:
                result.error(str(exc))
                continue

            entity_id = _meta_text(meta, "id").strip()
            if not entity_id:
                result.error(f"{path}: missing id in frontmatter")
                continue

            filename_id = id_from_filename(path)
            if filename_id and filename_id != entity_id:
                result.error(f"{path}: id {entity_id} does not match filename prefix {filename_id}")

            if entity_id in bucket:
                result.error(f"Duplicate id {entity_id} in {path}")
            # Stored even when duplicated so later reference checks still
            # see the id; the most recent file replaces the earlier one.
            bucket[entity_id] = (path, meta)

            status = meta.get("status")
            if status not in statuses:
                result.error(f"{path}: invalid status {status!r}")

            if match_or_conf == "match":
                if meta.get("match") not in MATCH_VALUES:
                    result.error(f"{path}: invalid match {meta.get('match')!r}")
                if not valid_priority(meta.get("priority")):
                    result.error(f"{path}: invalid priority {meta.get('priority')!r}")
                url = _meta_text(meta, "url").strip()
                if url:
                    if url in seen_lead_urls:
                        result.error(f"Duplicate lead URL {url} ({entity_id} and {seen_lead_urls[url]})")
                    seen_lead_urls[url] = entity_id
            elif match_or_conf == "confidence":
                if meta.get("confidence") not in CONFIDENCE_VALUES:
                    result.error(f"{path}: invalid confidence {meta.get('confidence')!r}")
                pair = (
                    normalize_name(_meta_text(meta, "name")),
                    normalize_name(_meta_text(meta, "company")),
                )
                # Only a fully populated pair can count as a duplicate.
                if pair[0] and pair[1]:
                    if pair in seen_contact_pairs:
                        result.error(
                            f"Duplicate contact name+company {pair} "
                            f"({entity_id} and {seen_contact_pairs[pair]})"
                        )
                    seen_contact_pairs[pair] = entity_id
            else:
                if not valid_priority(meta.get("priority")):
                    result.error(f"{path}: invalid priority {meta.get('priority')!r}")

    return leads, contacts, companies


def _optional_id(meta: dict, key: str) -> str:
    """Return ``meta[key]`` as a stripped string; missing or ``None`` gives ``""``.

    ``str(meta.get(key, ""))`` would turn an explicit null into the literal
    id ``"None"``, which would then be reported as an unknown reference.
    """
    value = meta.get(key)
    return "" if value is None else str(value).strip()


def validate_references(
    result: Result,
    leads: dict[str, tuple[Path, dict]],
    contacts: dict[str, tuple[Path, dict]],
    companies: dict[str, tuple[Path, dict]],
) -> None:
    """Check that cross-references between entities point at known ids.

    Args:
        result: Collector that receives one error per broken reference.
        leads: Lead id mapped to ``(path, meta)``.
        contacts: Contact id mapped to ``(path, meta)``.
        companies: Company id mapped to ``(path, meta)``.

    Checks performed:

    * leads: ``company_id``, every id in ``contacts``, and every non-empty
      path in the ``outputs`` mapping, which must exist under ``REPO_ROOT``;
    * contacts: ``company_id`` and every id in ``related_leads``;
    * companies: every id in ``contacts`` and in ``leads``.

    A missing, null or blank ``company_id`` means "no company" and is not
    an error.
    """
    for path, meta in leads.values():
        company_id = _optional_id(meta, "company_id")
        if company_id and company_id not in companies:
            result.error(f"{path}: unknown company_id {company_id}")
        for contact_id in meta.get("contacts") or []:
            if str(contact_id) not in contacts:
                result.error(f"{path}: unknown contact id {contact_id}")

        outputs = meta.get("outputs") or {}
        # Anything other than a mapping is skipped rather than reported.
        if isinstance(outputs, dict):
            for key, output_path in outputs.items():
                if not output_path:
                    continue
                resolved = REPO_ROOT / str(output_path)
                if not resolved.exists():
                    result.error(f"{path}: missing output file for {key}: {output_path}")

    for path, meta in contacts.values():
        company_id = _optional_id(meta, "company_id")
        if company_id and company_id not in companies:
            result.error(f"{path}: unknown company_id {company_id}")
        for lead_id in meta.get("related_leads") or []:
            if str(lead_id) not in leads:
                result.error(f"{path}: unknown related lead {lead_id}")

    for path, meta in companies.values():
        for contact_id in meta.get("contacts") or []:
            if str(contact_id) not in contacts:
                result.error(f"{path}: unknown contact id {contact_id}")
        for lead_id in meta.get("leads") or []:
            if str(lead_id) not in leads:
                result.error(f"{path}: unknown lead id {lead_id}")


def validate_status_proof(result: Result, leads: dict[str, tuple[Path, dict]]) -> None:
    """Warn about leads whose status is not backed by recorded proof.

    * ``drafted`` needs output proof or a ``draft`` interaction.
    * ``messaged`` needs output proof or a ``message``/``email`` interaction.
    * ``applied`` needs an ``application`` interaction.

    Other statuses are not checked. Findings are recorded as warnings on
    *result*, never as errors.
    """
    for entity_id, (path, meta) in leads.items():
        status = meta.get("status")

        if status == "drafted":
            # Output proof is consulted first; interactions only if it is absent.
            proven = lead_has_output_proof(meta) or lead_has_interaction_proof(entity_id, {"draft"})
            if not proven:
                result.warn(f"{path}: status drafted lacks output or interaction proof")
            continue

        if status == "messaged":
            proven = lead_has_output_proof(meta) or lead_has_interaction_proof(
                entity_id, {"message", "email"}
            )
            if not proven:
                result.warn(f"{path}: status messaged lacks output or interaction proof")
            continue

        if status == "applied":
            proven = lead_has_interaction_proof(entity_id, {"application"})
            if not proven:
                result.warn(f"{path}: status applied lacks application interaction proof")


def validate_csv_sync(result: Result) -> None:
    """Check that each generated CSV index matches the markdown entities.

    For leads, contacts and companies in turn, every entity file is parsed
    and converted with the matching ``sync_indexes`` row builder. The rows
    are then compared with the CSV under ``DATA_DIR``. A mismatch is
    recorded as an error on *result*.

    A file whose frontmatter cannot be parsed no longer aborts the run. Its
    parse error is recorded unless the identical message is already on
    *result*. The CSV comparison for that entity kind is skipped, because
    the expected rows cannot be built and a sync verdict would be
    meaningless.
    """
    # Imported here, as in the original code, rather than at module level.
    from sync_indexes import company_row, contact_row, lead_row

    targets = (
        (LEADS_DIR, lead_row, "leads.csv", LEAD_CSV_FIELDS,
         "data/leads.csv is out of sync with markdown"),
        (CONTACTS_DIR, contact_row, "contacts.csv", CONTACT_CSV_FIELDS,
         "data/contacts.csv is out of sync with markdown"),
        (COMPANIES_DIR, company_row, "companies.csv", COMPANY_CSV_FIELDS,
         "data/companies.csv is out of sync with markdown"),
    )

    for directory, build_row, csv_name, fields, out_of_sync in targets:
        rows = []
        parse_failed = False
        for path in list_entity_files(directory):
            try:
                meta, _ = parse_frontmatter(path)
            except ValueError as exc:
                parse_failed = True
                message = str(exc)
                # Avoid a duplicate line when an earlier pass over the same
                # files has already reported this exact failure.
                if message not in result.errors:
                    result.error(message)
                continue
            rows.append(build_row(meta))

        if parse_failed:
            continue
        if not rows_match_csv(rows, DATA_DIR / csv_name, fields):
            result.error(out_of_sync)


def validate_interactions(
    result: Result,
    leads: dict[str, tuple[Path, dict]],
    contacts: dict[str, tuple[Path, dict]],
    companies: dict[str, tuple[Path, dict]],
) -> None:
    """Check that every interaction row refers only to known entities.

    For each row from ``load_interactions()``, a non-empty ``contact_id``,
    ``lead_id`` or ``company_id`` must be a known id of that kind. When
    ``entity_type`` is ``lead``, ``contact`` or ``company``, a non-empty
    ``entity_id`` must be a known id of that type. Each violation is
    recorded as an error on *result*.
    """
    known_by_type = {"lead": leads, "contact": contacts, "company": companies}

    # Numbering starts at 2.
    # NOTE(review): presumably line 1 of interactions.csv is the header — confirm.
    for index, row in enumerate(load_interactions(), start=2):
        contact_ref = row.get("contact_id")
        if contact_ref and contact_ref not in contacts:
            result.error(f"interactions.csv line {index}: unknown contact_id {row['contact_id']}")

        lead_ref = row.get("lead_id")
        if lead_ref and lead_ref not in leads:
            result.error(f"interactions.csv line {index}: unknown lead_id {row['lead_id']}")

        company_ref = row.get("company_id")
        if company_ref and company_ref not in companies:
            result.error(f"interactions.csv line {index}: unknown company_id {row['company_id']}")

        entity_id = str(row.get("entity_id", "")).strip()
        entity_type = str(row.get("entity_type", "")).strip()
        # Unrecognised entity types are not checked.
        known_ids = known_by_type.get(entity_type)
        if entity_id and known_ids is not None and entity_id not in known_ids:
            result.error(f"interactions.csv line {index}: unknown entity_id {entity_id}")


def main() -> None:
    """Run every validation pass, print the findings and exit.

    Individual errors and warnings go to stderr; the summary line goes to
    stdout. The process always ends via ``SystemExit`` with code 2 if any
    error was recorded, 1 if there were only warnings, and 0 otherwise.
    """
    result = Result()

    leads, contacts, companies = validate_markdown(result)
    validate_references(result, leads, contacts, companies)
    validate_status_proof(result, leads)
    validate_interactions(result, leads, contacts, companies)
    validate_csv_sync(result)

    for message in result.errors:
        print(f"ERROR: {message}", file=sys.stderr)
    for message in result.warnings:
        print(f"WARNING: {message}", file=sys.stderr)

    if result.errors:
        print(f"\nValidation failed: {len(result.errors)} error(s), {len(result.warnings)} warning(s)")
        exit_code = 2
    elif result.warnings:
        print(f"\nValidation passed with {len(result.warnings)} warning(s)")
        exit_code = 1
    else:
        print("Validation passed")
        exit_code = 0
    raise SystemExit(exit_code)


if __name__ == "__main__":
    main()