"""HTTP front end for the SMQJH agent runtime.

Serves the static web UI from ``PUBLIC_ROOT``, a small JSON API
(``/api/health``, ``/api/tools``, ``/api/chat``) and a newline-delimited JSON
streaming endpoint (``/api/chat/stream``) on top of ``http.server``.
"""

from __future__ import annotations

import json
import mimetypes
import os
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
from urllib.parse import unquote, urlparse

from .agent_graph import SmqjhAgentGraph
from .config import load_runtime_config

RUNTIME_ROOT = Path(__file__).resolve().parents[1]
REPO_ROOT = RUNTIME_ROOT.parent
PUBLIC_ROOT = RUNTIME_ROOT / "public"
LOGO_PATH = REPO_ROOT / "smqjh-admin-agent" / "build" / "logo.png"
MAX_BODY_BYTES = 1024 * 1024

CONFIG = load_runtime_config()
AGENT = SmqjhAgentGraph(CONFIG)

# Raised by socket writes once the peer has gone away; no reply can be sent.
_CLIENT_GONE = (BrokenPipeError, ConnectionResetError)


def _error_message(error: BaseException) -> str:
    """Return the exception text, falling back to its class name when empty."""
    return str(error) or error.__class__.__name__


class AgentRequestHandler(BaseHTTPRequestHandler):
    """Request handler exposing the agent API and the static web UI."""

    server_version = "SMQJHAgentPython/0.1"

    def do_OPTIONS(self) -> None:
        """Answer OPTIONS requests with an empty 204 carrying the CORS headers."""
        self._send_text(204, "")

    def do_GET(self) -> None:
        """Route GET requests to the API endpoints, the logo, or static files."""
        try:
            path = urlparse(self.path).path
            if path == "/api/health":
                self._handle_health()
            elif path == "/api/tools":
                self._send_json(200, {"ok": True, "tools": AGENT.tools()})
            elif path == "/assets/logo.png":
                self._serve_file(LOGO_PATH, "image/png")
            else:
                self._serve_static(path)
        except Exception as error:
            self._send_server_error(error)

    def do_POST(self) -> None:
        """Handle ``/api/chat`` (single JSON reply) and ``/api/chat/stream``."""
        try:
            path = urlparse(self.path).path
            if path == "/api/chat/stream":
                self._handle_chat_stream()
                return
            if path != "/api/chat":
                self._send_json(404, {"ok": False, "message": "Not found"})
                return

            request = self._read_chat_request()
            if request is None:
                # A 400 response has already been written.
                return
            message, history = request
            result = AGENT.run(message, history)
            self._send_json(200, {"ok": True, "result": result})
        except Exception as error:
            self._send_server_error(error)

    def _read_chat_request(self) -> tuple[str, list[Any]] | None:
        """Parse the chat request body shared by both chat endpoints.

        Returns:
            ``(message, history)`` on success, or ``None`` after a 400 response
            has been sent because the body was malformed or had no message.
        """
        try:
            payload = self._read_json_body()
        except ValueError as error:
            # Bad Content-Length, oversized body, invalid UTF-8 and invalid
            # JSON are all ValueErrors and are the client's fault, not ours.
            self._send_json(400, {"ok": False, "message": _error_message(error)})
            return None

        if not isinstance(payload, dict):
            # Valid JSON that is not an object carries no usable fields.
            payload = {}

        message = str(payload.get("message") or "").strip()
        if not message:
            self._send_json(400, {"ok": False, "message": "message is required"})
            return None

        history = payload.get("history")
        if not isinstance(history, list):
            history = []
        return message, history

    def _handle_chat_stream(self) -> None:
        """Run the agent and stream progress as newline-delimited JSON events.

        Each line is ``{"type": ..., "payload": ...}``; the types emitted here
        are ``status``, ``trace``, ``result``, ``done`` and ``error``.
        """
        request = self._read_chat_request()
        if request is None:
            return
        message, history = request

        self.send_response(200)
        self._cors_headers()
        self.send_header("Content-Type", "application/x-ndjson; charset=utf-8")
        self.send_header("Cache-Control", "no-cache")
        self.end_headers()

        def send_event(kind: str, event_payload: Any) -> None:
            """Write one event line and flush so the client sees it at once."""
            body = json.dumps(
                {"type": kind, "payload": event_payload},
                ensure_ascii=False,
                default=str,
            ).encode("utf-8")
            self.wfile.write(body + b"\n")
            self.wfile.flush()

        try:
            send_event("status", {"title": "开始处理", "detail": "正在进入 LangGraph 编排"})
            result = AGENT.run(message, history, emit=lambda event: send_event("trace", event))
            send_event("result", result)
            send_event("done", {})
        except _CLIENT_GONE:
            # NOTE(review): the same exception types raised from inside
            # AGENT.run are also treated as a client disconnect here.
            return
        except Exception as error:
            # Headers are already sent, so the failure must be reported inside
            # the stream; letting it escape would make do_POST write a second
            # response on the same connection.
            try:
                send_event("error", {"message": _error_message(error)})
            except _CLIENT_GONE:
                return

    def log_message(self, format: str, *args: Any) -> None:
        """Emit access logs only when ``WEB_AGENT_ACCESS_LOG`` is 1/true/yes."""
        if os.getenv("WEB_AGENT_ACCESS_LOG", "").lower() in {"1", "true", "yes"}:
            super().log_message(format, *args)

    def _handle_health(self) -> None:
        """Report runtime metadata and the result of ``AGENT.health()``.

        Always responds with HTTP 200; a failure is signalled through
        ``"ok": False`` in the body.
        """
        runtime: dict[str, Any] = {
            "name": "smqjh-agent-runtime",
            "mode": "web",
            "framework": "Python + LangGraph",
        }
        try:
            mcp = AGENT.health()
            payload: dict[str, Any] = {
                "ok": True,
                "runtime": {
                    **runtime,
                    "environmentName": CONFIG.environment_name,
                    "baseUrl": CONFIG.base_url,
                },
                "mcp": mcp,
            }
        except Exception as error:
            payload = {
                "ok": False,
                "runtime": runtime,
                "mcp": {"ok": False, "message": _error_message(error)},
            }
        # Sent outside the try so a write failure cannot trigger a second
        # response attempt.
        self._send_json(200, payload)

    def _serve_static(self, request_path: str) -> None:
        """Serve a file below ``PUBLIC_ROOT``; ``/`` maps to ``index.html``.

        Responds 403 when the decoded path resolves outside ``PUBLIC_ROOT``.
        """
        clean_path = "/index.html" if request_path == "/" else request_path
        relative = unquote(clean_path).lstrip("/")
        try:
            # resolve() may itself raise ValueError for malformed paths (for
            # example an embedded NUL byte), so it is guarded together with
            # the containment check.
            file_path = (PUBLIC_ROOT / relative).resolve()
            file_path.relative_to(PUBLIC_ROOT.resolve())
        except ValueError:
            self._send_text(403, "Forbidden")
            return
        self._serve_file(file_path)

    def _serve_file(self, file_path: Path, content_type: str | None = None) -> None:
        """Send ``file_path`` as the response body, or 404 if it is not a file.

        Args:
            file_path: File to send.
            content_type: MIME type to use; guessed from the name when omitted.
        """
        # is_file() is already False for paths that do not exist.
        if not file_path.is_file():
            self._send_text(404, "Not found")
            return

        mime = content_type or mimetypes.guess_type(str(file_path))[0] or "application/octet-stream"
        data = file_path.read_bytes()
        cache_control = "no-cache" if file_path.suffix == ".html" else "public, max-age=300"

        self.send_response(200)
        self._cors_headers()
        self.send_header("Content-Type", _with_charset(mime))
        self.send_header("Cache-Control", cache_control)
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def _read_json_body(self) -> dict[str, Any]:
        """Read and decode the JSON request body.

        Returns:
            The decoded JSON value, or ``{}`` for an empty or blank body. The
            value is whatever the client sent, so callers must still check
            that it is a dict.

        Raises:
            ValueError: If ``Content-Length`` is not a non-negative integer,
                exceeds ``MAX_BODY_BYTES``, or the body is not valid UTF-8
                JSON (``UnicodeDecodeError`` and ``json.JSONDecodeError`` are
                both ``ValueError`` subclasses).
        """
        header = self.headers.get("Content-Length") or "0"
        try:
            length = int(header)
        except ValueError:
            raise ValueError("Invalid Content-Length header") from None
        if length < 0:
            # read(-1) would read until EOF, bypassing the size limit and
            # potentially blocking this handler thread.
            raise ValueError("Invalid Content-Length header")
        if length > MAX_BODY_BYTES:
            raise ValueError("Request body is too large")
        if not length:
            return {}

        raw = self.rfile.read(length).decode("utf-8")
        return json.loads(raw) if raw.strip() else {}

    def _send_server_error(self, error: Exception) -> None:
        """Best-effort 500 JSON response describing ``error``."""
        try:
            self._send_json(500, {"ok": False, "message": _error_message(error)})
        except _CLIENT_GONE:
            # The client disconnected; there is nobody left to tell.
            return

    def _send_json(self, status: int, payload: Any) -> None:
        """Send ``payload`` as a UTF-8 JSON response with the given status."""
        body = json.dumps(payload, ensure_ascii=False, default=str).encode("utf-8")
        self.send_response(status)
        self._cors_headers()
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _send_text(self, status: int, text: str) -> None:
        """Send ``text`` as a UTF-8 plain-text response with the given status."""
        body = text.encode("utf-8")
        self.send_response(status)
        self._cors_headers()
        self.send_header("Content-Type", "text/plain; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _cors_headers(self) -> None:
        """Add the CORS headers attached to every response."""
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET,POST,OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type,Authorization,X-MCP-Token")


def _with_charset(mime: str) -> str:
    """Append ``; charset=utf-8`` to text, JavaScript and JSON MIME types."""
    if mime.startswith("text/") or mime in {"application/javascript", "application/json"}:
        return f"{mime}; charset=utf-8"
    return mime


def main() -> None:
    """Start the threaded HTTP server and serve until interrupted."""
    server = ThreadingHTTPServer((CONFIG.host, CONFIG.port), AgentRequestHandler)
    print(f"SMQJH Python Web Agent started: http://127.0.0.1:{CONFIG.port}", flush=True)
    print("Runtime framework: Python + LangGraph", flush=True)
    print(f"MCP endpoint: {CONFIG.mcp.url}", flush=True)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop the server; exit quietly.
        pass
    finally:
        server.server_close()


if __name__ == "__main__":
    main()