| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- from __future__ import annotations
- import json
- import mimetypes
- import os
- from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
- from pathlib import Path
- from typing import Any
- from urllib.parse import unquote, urlparse
- from .agent_graph import SmqjhAgentGraph
- from .config import load_runtime_config
- RUNTIME_ROOT = Path(__file__).resolve().parents[1]
- REPO_ROOT = RUNTIME_ROOT.parent
- PUBLIC_ROOT = RUNTIME_ROOT / "public"
- LOGO_PATH = REPO_ROOT / "smqjh-admin-agent" / "build" / "logo.png"
- MAX_BODY_BYTES = 1024 * 1024
- CONFIG = load_runtime_config()
- AGENT = SmqjhAgentGraph(CONFIG)
- class AgentRequestHandler(BaseHTTPRequestHandler):
- server_version = "SMQJHAgentPython/0.1"
- def do_OPTIONS(self) -> None:
- self._send_text(204, "")
- def do_GET(self) -> None:
- try:
- path = urlparse(self.path).path
- if path == "/api/health":
- self._handle_health()
- return
- if path == "/api/tools":
- self._send_json(200, {"ok": True, "tools": AGENT.tools()})
- return
- if path == "/assets/logo.png":
- self._serve_file(LOGO_PATH, "image/png")
- return
- self._serve_static(path)
- except Exception as error:
- self._send_json(500, {"ok": False, "message": str(error) or error.__class__.__name__})
- def do_POST(self) -> None:
- try:
- path = urlparse(self.path).path
- if path != "/api/chat":
- self._send_json(404, {"ok": False, "message": "Not found"})
- return
- payload = self._read_json_body()
- message = str(payload.get("message") or "").strip() if isinstance(payload, dict) else ""
- if not message:
- self._send_json(400, {"ok": False, "message": "message is required"})
- return
- history = payload.get("history") if isinstance(payload, dict) else []
- if not isinstance(history, list):
- history = []
- result = AGENT.run(message, history)
- self._send_json(200, {"ok": True, "result": result})
- except Exception as error:
- self._send_json(500, {"ok": False, "message": str(error) or error.__class__.__name__})
- def log_message(self, format: str, *args: Any) -> None:
- if os.getenv("WEB_AGENT_ACCESS_LOG", "").lower() in {"1", "true", "yes"}:
- super().log_message(format, *args)
- def _handle_health(self) -> None:
- try:
- mcp = AGENT.health()
- self._send_json(
- 200,
- {
- "ok": True,
- "runtime": {
- "name": "smqjh-agent-runtime",
- "mode": "web",
- "framework": "Python + LangGraph",
- "environmentName": CONFIG.environment_name,
- "baseUrl": CONFIG.base_url,
- },
- "mcp": mcp,
- },
- )
- except Exception as error:
- self._send_json(
- 200,
- {
- "ok": False,
- "runtime": {
- "name": "smqjh-agent-runtime",
- "mode": "web",
- "framework": "Python + LangGraph",
- },
- "mcp": {
- "ok": False,
- "message": str(error) or error.__class__.__name__,
- },
- },
- )
- def _serve_static(self, request_path: str) -> None:
- clean_path = "/index.html" if request_path == "/" else request_path
- relative = unquote(clean_path).lstrip("/")
- file_path = (PUBLIC_ROOT / relative).resolve()
- try:
- file_path.relative_to(PUBLIC_ROOT.resolve())
- except ValueError:
- self._send_text(403, "Forbidden")
- return
- self._serve_file(file_path)
- def _serve_file(self, file_path: Path, content_type: str | None = None) -> None:
- if not file_path.exists() or not file_path.is_file():
- self._send_text(404, "Not found")
- return
- mime = content_type or mimetypes.guess_type(str(file_path))[0] or "application/octet-stream"
- data = file_path.read_bytes()
- self.send_response(200)
- self._cors_headers()
- self.send_header("Content-Type", _with_charset(mime))
- self.send_header("Cache-Control", "no-cache" if file_path.suffix == ".html" else "public, max-age=300")
- self.send_header("Content-Length", str(len(data)))
- self.end_headers()
- self.wfile.write(data)
- def _read_json_body(self) -> dict[str, Any]:
- length = int(self.headers.get("Content-Length") or "0")
- if length > MAX_BODY_BYTES:
- raise ValueError("Request body is too large")
- raw = self.rfile.read(length).decode("utf-8") if length else "{}"
- return json.loads(raw) if raw.strip() else {}
- def _send_json(self, status: int, payload: Any) -> None:
- body = json.dumps(payload, ensure_ascii=False, default=str).encode("utf-8")
- self.send_response(status)
- self._cors_headers()
- self.send_header("Content-Type", "application/json; charset=utf-8")
- self.send_header("Content-Length", str(len(body)))
- self.end_headers()
- self.wfile.write(body)
- def _send_text(self, status: int, text: str) -> None:
- body = text.encode("utf-8")
- self.send_response(status)
- self._cors_headers()
- self.send_header("Content-Type", "text/plain; charset=utf-8")
- self.send_header("Content-Length", str(len(body)))
- self.end_headers()
- self.wfile.write(body)
- def _cors_headers(self) -> None:
- self.send_header("Access-Control-Allow-Origin", "*")
- self.send_header("Access-Control-Allow-Methods", "GET,POST,OPTIONS")
- self.send_header("Access-Control-Allow-Headers", "Content-Type,Authorization,X-MCP-Token")
- def _with_charset(mime: str) -> str:
- if mime.startswith("text/") or mime in {"application/javascript", "application/json"}:
- return f"{mime}; charset=utf-8"
- return mime
- def main() -> None:
- server = ThreadingHTTPServer((CONFIG.host, CONFIG.port), AgentRequestHandler)
- print(f"SMQJH Python Web Agent started: http://127.0.0.1:{CONFIG.port}", flush=True)
- print(f"Runtime framework: Python + LangGraph", flush=True)
- print(f"MCP endpoint: {CONFIG.mcp.url}", flush=True)
- try:
- server.serve_forever()
- except KeyboardInterrupt:
- pass
- finally:
- server.server_close()
- if __name__ == "__main__":
- main()
|