#!/usr/bin/env python3
"""zerokit-mcp -- stdio MCP server for the ZeroKit.dev public API.

Exposes seven AI-readiness tools to any MCP client (Claude Code, Cursor,
or any other client that speaks MCP JSON-RPC over stdio):

  * ai_readiness_check   -- Score a site's AI readiness (0-100 + grade)
  * ai_readiness_extended -- Same plus Wayback + knowability + cloaking
  * ai_visibility_check  -- Does ChatGPT know your site? (Wikipedia, CC,
                            DDG, Wayback signals)
  * ai_readiness_compare -- Head-to-head scan of two sites
  * ai_readiness_leaderboard -- Top 100 reference dataset
  * llms_txt_generate    -- Generate a starter llms.txt for a URL
  * schema_inspect       -- Parse JSON-LD + AI-citation coverage score

All tools delegate to the public https://zerokit.dev/api/* endpoints.
No authentication, no API key, 30 req/min per IP. SSRF-hardened
upstream. Free for any use within rate limits.

This file wraps seven of the ~27 public endpoints. For the newer
batch scanner, llms.txt/robots.txt validators, live leaderboard,
host rank/history/movers, and public stats endpoints, hit
https://zerokit.dev/api-docs.html for curl examples you can call
directly from any MCP client or shell.

This is a single-file Python stdlib script; no pip dependencies. Save
it anywhere and add to your Claude Code MCP config:

  {
    "mcpServers": {
      "zerokit": {
        "command": "python3",
        "args": ["/absolute/path/to/zerokit-mcp.py"]
      }
    }
  }

Transport: stdio. Protocol: MCP JSON-RPC 2.0. Logs go to stderr so
they do not pollute the JSON-RPC stream on stdout.

https://zerokit.dev/api-docs.html
"""

from __future__ import annotations

import json
import sys
import urllib.error
import urllib.parse
import urllib.request
from typing import Any

# Base URL of the ZeroKit.dev service; endpoint paths are appended to it.
API_BASE = "https://zerokit.dev"
# Sent as the User-Agent header on every upstream request.
USER_AGENT = "zerokit-mcp/1.0 (+https://zerokit.dev/api-docs.html)"
TIMEOUT = 60  # seconds -- some scans (extended) take ~20s

# Server identity and protocol revision reported in the initialize response.
SERVER_NAME = "zerokit"
SERVER_VERSION = "1.0.0"
PROTOCOL_VERSION = "2024-11-05"


# ---------------------------------------------------------------------------
# Logging helper -- MUST go to stderr. stdout is reserved for JSON-RPC.
# ---------------------------------------------------------------------------

def log(msg: str) -> None:
    """Emit one ``[zerokit-mcp]``-prefixed line on stderr and flush it.

    stdout carries the JSON-RPC stream, so diagnostics must never go there.
    """
    line = f"[zerokit-mcp] {msg}\n"
    stream = sys.stderr
    stream.write(line)
    stream.flush()


# ---------------------------------------------------------------------------
# Thin HTTP client over the ZeroKit.dev public API.
# ---------------------------------------------------------------------------

def _api_get(path: str, params: dict) -> Any:
    """GET ``{API_BASE}{path}`` with *params* as the query string; return parsed JSON.

    Args:
        path: Endpoint path appended to ``API_BASE`` (e.g. ``/api/ai-readiness``).
        params: Query parameters. Entries whose value is ``None`` are omitted;
            an empty (or ``None``) mapping sends no query string at all.

    Returns:
        Whatever ``json.loads`` produces from the response body.

    Raises:
        RuntimeError: On an HTTP error status (the message uses the ``error``
            field of a JSON error body when there is one, else ``HTTP <code>``),
            on any other request failure, or when the body is not valid JSON.
            The message always ends with the request URL, and the underlying
            exception is chained as ``__cause__``.
    """
    # Filter None values *before* deciding whether a query string is needed,
    # so a dict holding only None values does not produce a dangling "?".
    query = {k: v for k, v in (params or {}).items() if v is not None}
    url = f"{API_BASE}{path}"
    if query:
        url = f"{url}?{urllib.parse.urlencode(query)}"

    req = urllib.request.Request(
        url,
        headers={
            "User-Agent": USER_AGENT,
            "Accept": "application/json",
        },
    )
    try:
        with urllib.request.urlopen(req, timeout=TIMEOUT) as resp:
            body = resp.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as e:
        err_msg = f"HTTP {e.code}"
        try:
            data = json.loads(e.read().decode("utf-8", errors="replace"))
            err_msg = data.get("error") or err_msg
        except Exception:
            # Best effort only: an unreadable or non-JSON error body simply
            # leaves the generic "HTTP <code>" message in place.
            pass
        raise RuntimeError(f"{err_msg} ({url})") from e
    except Exception as e:
        # Timeouts, DNS failures, TLS errors, ... -- surface the type name so
        # the MCP client sees what kind of failure occurred.
        raise RuntimeError(f"{type(e).__name__}: {e} ({url})") from e

    try:
        return json.loads(body)
    except json.JSONDecodeError as e:
        raise RuntimeError(f"Upstream returned non-JSON ({url})") from e


# ---------------------------------------------------------------------------
# Tool implementations -- each returns a JSON-formatted string that
# handle_tools_call wraps in a text block of the MCP "content" array.
# ---------------------------------------------------------------------------

def tool_ai_readiness_check(args: dict) -> str:
    """Run the basic AI-readiness scan for ``args["url"]`` and summarize it.

    Returns:
        A JSON string with the top-level score fields, the score/max pair of
        every category, and at most the first six recommendations.

    Raises:
        KeyError: If ``url`` is missing from *args*.
        RuntimeError: Propagated from ``_api_get`` when the request fails.
    """
    report = _api_get("/api/ai-readiness", {"url": args["url"]})

    top_level = (
        "url",
        "hostname",
        "score",
        "grade",
        "total_points",
        "max_points",
    )
    result = {field: report.get(field) for field in top_level}

    categories = report.get("categories") or {}
    result["categories"] = {
        cat_name: {"score": cat.get("score"), "max": cat.get("max")}
        for cat_name, cat in categories.items()
    }

    # Only the first six recommendations are kept in the summary.
    recommendations = (report.get("recommendations") or [])[:6]
    result["recommendations"] = [
        {"priority": rec.get("priority"), "text": rec.get("text")}
        for rec in recommendations
    ]
    return json.dumps(result, indent=2)


def tool_ai_readiness_extended(args: dict) -> str:
    """Run the extended scan for ``args["url"]`` and return a JSON summary.

    The request adds ``extended=1``; the summary keeps the core score fields
    plus selected values from the ``wayback``, ``knowability`` and
    ``cloaking`` sections of the response. A missing or null section is
    treated as empty, so its values come back as ``null``.

    Raises:
        KeyError: If ``url`` is missing from *args*.
        RuntimeError: Propagated from ``_api_get`` when the request fails.
    """
    target = args["url"]
    report = _api_get("/api/ai-readiness", {"url": target, "extended": "1"})

    wayback = report.get("wayback") or {}
    knowability = report.get("knowability") or {}
    cloaking = report.get("cloaking") or {}

    result = {
        field: report.get(field)
        for field in ("url", "hostname", "score", "grade", "extended_score")
    }
    result["wayback"] = {
        field: wayback.get(field)
        for field in ("training_data_likelihood", "stability_score", "years_tracked")
    }
    result["knowability"] = {
        field: knowability.get(field)
        for field in (
            "knowability_level",
            "knowability_score",
            "wikipedia_mentioned",
            "commoncrawl_indexed",
        )
    }
    result["cloaking_severity"] = cloaking.get("cloaking_severity")
    return json.dumps(result, indent=2)


def tool_ai_visibility_check(args: dict) -> str:
    """Visibility view of the extended scan: only wayback + knowability.

    The visibility score is the integer mean of the knowability score and
    the Wayback stability score (the latter multiplied by 10 first). The
    verdict is the label of the highest band whose floor the score reaches.

    Raises:
        KeyError: If ``url`` is missing from *args*.
        RuntimeError: Propagated from ``_api_get`` when the request fails.
    """
    report = _api_get("/api/ai-readiness", {"url": args["url"], "extended": "1"})
    wayback = report.get("wayback") or {}
    knowability = report.get("knowability") or {}

    knowability_pct = int(knowability.get("knowability_score") or 0)
    # API returns stability on a 0-10 scale; bring it up to 0-100.
    stability_pct = 10 * int(wayback.get("stability_score") or 0)
    visibility = (knowability_pct + stability_pct) // 2

    # (floor, label) pairs, highest floor first; the first match wins.
    bands = (
        (81, "very high visibility"),
        (61, "high visibility"),
        (41, "moderate visibility"),
        (21, "low visibility"),
    )
    verdict = "invisible to LLMs"
    for floor, label in bands:
        if visibility >= floor:
            verdict = label
            break

    result = {
        "url": report.get("url"),
        "hostname": report.get("hostname"),
        "visibility_score": visibility,
        "verdict": verdict,
    }
    result["wikipedia_mentioned"] = knowability.get("wikipedia_mentioned")
    # Keep at most three Wikipedia matches.
    result["wikipedia_matches"] = (knowability.get("wikipedia_matches") or [])[:3]
    result["commoncrawl_indexed"] = knowability.get("commoncrawl_indexed")
    result["commoncrawl_result_count"] = knowability.get("commoncrawl_result_count")
    result["ddg_has_abstract"] = knowability.get("ddg_has_abstract")
    result["wayback_first_snapshot"] = wayback.get("first_snapshot_date")
    result["wayback_years_tracked"] = wayback.get("years_tracked")
    result["wayback_training_likelihood"] = wayback.get("training_data_likelihood")
    return json.dumps(result, indent=2)


def tool_ai_readiness_compare(args: dict) -> str:
    """Scan two sites concurrently and return a head-to-head comparison as JSON.

    Args:
        args: Tool arguments; must contain ``url_a`` and ``url_b``.

    Returns:
        A JSON string with hostname/score/grade per site under ``sites``,
        the ``winner`` and ``loser`` (hostname, falling back to the URL that
        was passed in; both ``null`` on a tie), the absolute score ``delta``
        and a per-category ``category_diff``. Missing scores count as 0 when
        deciding winners but are reported as ``null``.

    Raises:
        KeyError: If ``url_a`` or ``url_b`` is missing from *args*.
        RuntimeError: Propagated from ``_api_get`` when either scan fails.
    """
    # Function-scope import: only this tool needs a thread pool.
    from concurrent.futures import ThreadPoolExecutor

    a = args["url_a"]
    b = args["url_b"]

    # The two scans are independent network calls, so run them side by side
    # (as the tool description promises) rather than paying both latencies
    # back to back. map() yields results in input order and re-raises the
    # first failure when its result is consumed.
    with ThreadPoolExecutor(max_workers=2) as pool:
        ra, rb = pool.map(
            lambda target: _api_get("/api/ai-readiness", {"url": target}),
            (a, b),
        )

    delta = (ra.get("score") or 0) - (rb.get("score") or 0)
    if delta > 0:
        winner = ra.get("hostname") or a
        loser = rb.get("hostname") or b
    elif delta < 0:
        winner = rb.get("hostname") or b
        loser = ra.get("hostname") or a
        delta = -delta  # report the margin as a positive number
    else:
        winner = loser = None

    out = {
        "sites": {
            "a": {
                "hostname": ra.get("hostname"),
                "score": ra.get("score"),
                "grade": ra.get("grade"),
            },
            "b": {
                "hostname": rb.get("hostname"),
                "score": rb.get("score"),
                "grade": rb.get("grade"),
            },
        },
        "winner": winner,
        "loser": loser,
        "delta": delta,
        "category_diff": {},
    }

    # Look the category maps up once instead of once per category.
    cats_a = ra.get("categories") or {}
    cats_b = rb.get("categories") or {}
    for key in ("robots_txt", "llms_txt", "structured_data", "content_citability",
                "ai_meta_directives"):
        av = (cats_a.get(key) or {}).get("score")
        bv = (cats_b.get(key) or {}).get("score")
        a_pts = av or 0
        b_pts = bv or 0
        if a_pts > b_pts:
            cat_winner = "a"
        elif b_pts > a_pts:
            cat_winner = "b"
        else:
            cat_winner = "tie"
        out["category_diff"][key] = {"a": av, "b": bv, "winner": cat_winner}
    return json.dumps(out, indent=2)


def tool_ai_readiness_leaderboard(args: dict) -> str:
    """Fetch the leaderboard and return its top ``limit`` rows as JSON.

    ``limit`` defaults to 10 (also when it is passed as a falsy value), is
    converted with ``int()`` and clamped to the range 1-100. The result list
    is additionally sliced to ``limit`` entries on this side.

    Raises:
        ValueError / TypeError: If ``limit`` cannot be converted by ``int()``.
        RuntimeError: Propagated from ``_api_get`` when the request fails.
    """
    requested = args.get("limit", 10) or 10
    limit = int(requested)
    if limit < 1:
        limit = 1
    elif limit > 100:
        limit = 100

    board = _api_get("/api/leaderboard", {"limit": limit})

    row_fields = ("rank", "domain", "score", "grade", "llms_txt")
    rows = [
        {field: entry.get(field) for field in row_fields}
        for entry in (board.get("results") or [])[:limit]
    ]
    summary = {
        "source": board.get("source"),
        "scanned_at": board.get("scanned_at"),
        "stats": board.get("stats"),
        "results": rows,
    }
    return json.dumps(summary, indent=2)


def tool_llms_txt_generate(args: dict) -> str:
    """Request a starter llms.txt for ``args["url"]`` and return it as JSON.

    When the response carries a truthy ``error`` field the response is
    returned unchanged; otherwise only a fixed set of fields is kept.

    Raises:
        KeyError: If ``url`` is missing from *args*.
        RuntimeError: Propagated from ``_api_get`` when the request fails.
    """
    payload = _api_get("/api/llms-txt", {"url": args["url"]})
    if not payload.get("error"):
        kept = (
            "url",
            "host",
            "name",
            "description",
            "internal_link_count",
            "external_link_count",
            "bytes",
            "llms_txt",
        )
        payload = {field: payload.get(field) for field in kept}
    return json.dumps(payload, indent=2)


def tool_schema_inspect(args: dict) -> str:
    """Inspect the structured data of ``args["url"]`` and return a JSON report.

    When the response carries a truthy ``error`` field the response is
    returned unchanged; otherwise only a fixed set of fields is kept.

    Raises:
        KeyError: If ``url`` is missing from *args*.
        RuntimeError: Propagated from ``_api_get`` when the request fails.
    """
    payload = _api_get("/api/schema-inspect", {"url": args["url"]})
    if not payload.get("error"):
        kept = (
            "url",
            "hostname",
            "total_blocks",
            "valid_blocks",
            "invalid_blocks",
            "ai_citation_coverage",
            "schemas_found",
            "ai_schemas_missing",
            "recommendations",
            "errors",
        )
        payload = {field: payload.get(field) for field in kept}
    return json.dumps(payload, indent=2)


# ---------------------------------------------------------------------------
# Tool registry -- one dict per tool: MCP name, description, input schema, handler.
# ---------------------------------------------------------------------------

def _url_schema(desc: str) -> dict:
    """Build the input schema for a tool whose only argument is a required ``url``.

    Args:
        desc: Human-readable description attached to the ``url`` property.

    Returns:
        A freshly built JSON-Schema object describing ``{"url": <string>}``.
    """
    url_property = {
        "type": "string",
        "description": desc,
        "examples": ["https://example.com"],
    }
    schema = {"type": "object"}
    schema["properties"] = {"url": url_property}
    schema["required"] = ["url"]
    return schema


# Every entry holds the three fields advertised through tools/list ("name",
# "description", "inputSchema") plus a private "handler" callable that
# tools/call dispatches to. Handlers take the tool's argument dict and
# return the text placed in the result's content block.
TOOLS: list[dict] = [
    {
        "name": "ai_readiness_check",
        "description": (
            "Score a website's AI readiness on a 0-100 scale. Returns grade "
            "A+ through F, category breakdowns (robots.txt, llms.txt, "
            "structured data, content citability, meta directives) and "
            "actionable recommendations. Use this for quick scans."
        ),
        "inputSchema": _url_schema("URL of the site to scan (any scheme)"),
        "handler": tool_ai_readiness_check,
    },
    {
        "name": "ai_readiness_extended",
        "description": (
            "Full AI readiness scan including Wayback Machine history, "
            "LLM knowability signals (Wikipedia, Common Crawl, DuckDuckGo), "
            "and bot-cloaking detection. Takes 15-30 seconds. Use when you "
            "want the complete picture."
        ),
        "inputSchema": _url_schema("URL of the site to scan"),
        "handler": tool_ai_readiness_extended,
    },
    {
        "name": "ai_visibility_check",
        "description": (
            "Answers 'does ChatGPT know about this site?' by checking "
            "Wikipedia mentions, Common Crawl indexing, DuckDuckGo abstracts, "
            "and Wayback Machine history. Returns a 0-100 visibility score "
            "and a plain-language verdict."
        ),
        "inputSchema": _url_schema("URL of the site to check"),
        "handler": tool_ai_visibility_check,
    },
    {
        "name": "ai_readiness_compare",
        "description": (
            "Scan two websites in parallel and return a head-to-head "
            "comparison: scores, grades, category-by-category diffs and "
            "a winner with point delta."
        ),
        # The only tool taking two URLs, hence a hand-written schema.
        "inputSchema": {
            "type": "object",
            "properties": {
                "url_a": {"type": "string", "description": "First URL"},
                "url_b": {"type": "string", "description": "Second URL"},
            },
            "required": ["url_a", "url_b"],
        },
        "handler": tool_ai_readiness_compare,
    },
    {
        "name": "ai_readiness_leaderboard",
        "description": (
            "Return the Top 100 AI readiness leaderboard (State of AI "
            "Crawlers 2026 dataset) as JSON with rank, domain, score, grade "
            "and llms.txt flag. Optional limit parameter (1-100, default 10)."
        ),
        # No "required" list: "limit" is optional.
        "inputSchema": {
            "type": "object",
            "properties": {
                "limit": {
                    "type": "integer",
                    "description": "How many results to return (1-100)",
                    "minimum": 1,
                    "maximum": 100,
                    "default": 10,
                }
            },
        },
        "handler": tool_ai_readiness_leaderboard,
    },
    {
        "name": "llms_txt_generate",
        "description": (
            "Fetch a website's homepage and generate a starter llms.txt in "
            "the llmstxt.org format. Returns the full text plus site name, "
            "description, and extracted link counts. Intended as a starting "
            "point the site owner edits before publishing."
        ),
        "inputSchema": _url_schema("Site to generate llms.txt for"),
        "handler": tool_llms_txt_generate,
    },
    {
        "name": "schema_inspect",
        "description": (
            "Parse every JSON-LD structured data block on a page and return: "
            "the schemas found (with counts), an AI citation coverage score "
            "(0-100), and priority-sorted recommendations for missing types."
        ),
        "inputSchema": _url_schema("URL of the page to inspect"),
        "handler": tool_schema_inspect,
    },
]


def _tool_schema_for_list() -> list[dict]:
    """Return the registry in tools/list shape: name, description, inputSchema.

    The ``handler`` callable of each registry entry is left out, since it is
    not JSON-serializable.
    """
    public_fields = ("name", "description", "inputSchema")
    listing = []
    for tool in TOOLS:
        listing.append({field: tool[field] for field in public_fields})
    return listing


def _find_handler(name: str):
    """Return the handler of the registered tool called *name*, or ``None``."""
    matches = (tool["handler"] for tool in TOOLS if tool["name"] == name)
    return next(matches, None)


# ---------------------------------------------------------------------------
# JSON-RPC 2.0 handlers -- MCP speaks a small subset of the spec.
# ---------------------------------------------------------------------------

def handle_initialize(params: dict) -> dict:
    """Build the ``initialize`` result; *params* is accepted but not read.

    Reports the protocol version, a tools capability with ``listChanged``
    set to false, and the server name and version.
    """
    capabilities = {"tools": {"listChanged": False}}
    server_info = {"name": SERVER_NAME, "version": SERVER_VERSION}
    result = {"protocolVersion": PROTOCOL_VERSION}
    result["capabilities"] = capabilities
    result["serverInfo"] = server_info
    return result


def handle_tools_list(_params: dict) -> dict:
    """Build the ``tools/list`` result; the request params are ignored."""
    advertised = _tool_schema_for_list()
    return {"tools": advertised}


def handle_tools_call(params: dict) -> dict:
    """Dispatch a ``tools/call`` request to the named tool.

    Failures are reported inside the result (``isError: true`` plus a text
    block) rather than raised: an unknown tool name, a ``KeyError`` raised
    by the handler (reported as a missing required argument), or any other
    exception (logged, then reported with its message).

    Returns:
        An MCP tool result whose single text block holds either the
        handler's output or the error message.
    """

    def failure(text: str) -> dict:
        return {"isError": True, "content": [{"type": "text", "text": text}]}

    name = params.get("name", "")
    arguments = params.get("arguments") or {}

    handler = _find_handler(name)
    if handler is None:
        return failure(f"Unknown tool: {name}")

    try:
        output = handler(arguments)
    except KeyError as missing:
        return failure(f"Missing required argument: {missing.args[0]}")
    except Exception as exc:
        log(f"tool {name} failed: {exc}")
        return failure(str(exc))

    return {"content": [{"type": "text", "text": output}]}


def handle_ping(_params: dict) -> dict:
    """Answer ``ping`` with an empty result object; the params are ignored."""
    empty_result: dict = {}
    return empty_result


# JSON-RPC method name -> handler. Each handler receives the request's
# params dict and returns the value sent back as the response's "result".
# Methods absent from this table are answered with a -32601 error by main().
METHODS = {
    "initialize": handle_initialize,
    "tools/list": handle_tools_list,
    "tools/call": handle_tools_call,
    "ping": handle_ping,
}


# ---------------------------------------------------------------------------
# Main stdio loop -- newline-delimited JSON-RPC 2.0.
# ---------------------------------------------------------------------------

def _send(obj: dict) -> None:
    """Serialize *obj* as one JSON line on stdout and flush immediately."""
    payload = json.dumps(obj)
    stream = sys.stdout
    stream.write(f"{payload}\n")
    stream.flush()


def _error_response(req_id, code: int, message: str) -> dict:
    """Build a JSON-RPC 2.0 error response for request *req_id*.

    Args:
        req_id: Id to echo back in the response (may be ``None``).
        code: Numeric JSON-RPC error code.
        message: Short error description.
    """
    error = {"code": code, "message": message}
    response = {"jsonrpc": "2.0", "id": req_id}
    response["error"] = error
    return response


def main() -> int:
    """Serve newline-delimited JSON-RPC 2.0 messages from stdin until EOF.

    Each non-blank input line is parsed as one message. Messages without an
    id are treated as notifications and only logged. Requests are dispatched
    through ``METHODS`` and answered on stdout with either a result or a
    JSON-RPC error (-32700 parse error, -32600 invalid request, -32601
    method not found, -32603 handler failure).

    Returns:
        Process exit status: 0 after stdin reaches EOF, 130 when interrupted
        with Ctrl-C, 1 after an unexpected failure of the loop itself.
    """
    log(f"{SERVER_NAME} v{SERVER_VERSION} ready -- protocol {PROTOCOL_VERSION}")
    try:
        for raw in sys.stdin:
            raw = raw.strip()
            if not raw:
                continue
            try:
                msg = json.loads(raw)
            except json.JSONDecodeError as e:
                log(f"parse error: {e}")
                _send(_error_response(None, -32700, "Parse error"))
                continue

            # A line can be valid JSON without being an object (an array, a
            # number, a string, ...). Reject it here; otherwise msg.get()
            # below raises AttributeError and takes the whole server down.
            if not isinstance(msg, dict):
                log(f"invalid request: expected object, got {type(msg).__name__}")
                _send(_error_response(None, -32600, "Invalid Request"))
                continue

            method = msg.get("method", "")
            msg_id = msg.get("id")
            params = msg.get("params") or {}

            # Notification (no id) -- silent ack.
            if msg_id is None:
                if method == "notifications/initialized":
                    log("client initialized")
                else:
                    log(f"notification: {method}")
                continue

            # A non-string method (e.g. a list or object) is unhashable and
            # would make the METHODS lookup raise outside the handler guard.
            if not isinstance(method, str):
                log(f"invalid request: method is {type(method).__name__}")
                _send(_error_response(msg_id, -32600, "Invalid Request"))
                continue

            handler = METHODS.get(method)
            if handler is None:
                _send(_error_response(msg_id, -32601, f"Method not found: {method}"))
                continue

            try:
                result = handler(params)
            except Exception as e:
                # Boundary guard: one failing request must not end the session.
                log(f"handler {method} crashed: {e}")
                _send(_error_response(msg_id, -32603, str(e)))
                continue

            _send({"jsonrpc": "2.0", "id": msg_id, "result": result})
    except KeyboardInterrupt:
        log("interrupted")
        return 130
    except Exception as e:
        log(f"fatal: {e}")
        return 1
    return 0


# Script entry point: run the server and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())
