Initial commit

This commit is contained in:
Sergey Filkin
2026-04-17 23:39:57 +03:00
commit 0fe596383e
13 changed files with 943 additions and 0 deletions
+1
View File
@@ -0,0 +1 @@
"""Application package for the md-to-html service."""
+183
View File
@@ -0,0 +1,183 @@
import os
from typing import Any
from urllib.error import URLError
from urllib.request import Request, urlopen
from fastapi import FastAPI, HTTPException, Request as FastAPIRequest, Response
from fastapi.exceptions import RequestValidationError
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, ConfigDict, field_validator
from app.converter import convert, load_template_text
DEFAULT_MAX_MARKDOWN_BYTES = 1_048_576
DEFAULT_MAX_REQUEST_BYTES = 1_200_000
def get_int_env(name: str, default: int) -> int:
    """Read a strictly positive integer setting from the environment.

    Args:
        name: Environment variable to look up.
        default: Value returned, unvalidated, when the variable is not set.

    Returns:
        The parsed integer, or ``default`` when the variable is unset.

    Raises:
        RuntimeError: If the variable is set but is not an integer, or if
            it parses to zero or a negative number.
    """
    configured = os.environ.get(name)
    if configured is None:
        return default

    try:
        parsed = int(configured)
    except ValueError as exc:
        raise RuntimeError(f"{name} must be an integer.") from exc

    if parsed > 0:
        return parsed
    raise RuntimeError(f"{name} must be positive.")
def get_bool_env(name: str, default: bool = False) -> bool:
    """Interpret an environment variable as a boolean flag.

    Args:
        name: Environment variable to look up.
        default: Value returned when the variable is not set.

    Returns:
        ``default`` when the variable is unset; otherwise ``True`` only if
        the value, trimmed and lower-cased, is one of ``1``, ``true``,
        ``yes`` or ``on``. Any other set value yields ``False``.
    """
    configured = os.environ.get(name)
    if configured is None:
        return default
    normalized = configured.strip().lower()
    return normalized in ("1", "true", "yes", "on")
class ConvertRequest(BaseModel):
    """JSON body accepted by the conversion endpoint.

    Unknown fields are rejected (``extra="forbid"``).
    """

    model_config = ConfigDict(extra="forbid")

    # Markdown source to convert; its UTF-8 size is capped by the validator.
    markdown: str
    # Optional title supplied by the client.
    title: str | None = None

    @field_validator("markdown")
    @classmethod
    def validate_markdown_size(cls, value: str) -> str:
        """Reject markdown whose UTF-8 encoding exceeds the configured limit.

        The limit is read from ``MAX_MARKDOWN_BYTES`` on every validation,
        falling back to ``DEFAULT_MAX_MARKDOWN_BYTES``.

        Raises:
            HTTPException: With status 413 when the payload is too large.
        """
        limit = get_int_env("MAX_MARKDOWN_BYTES", DEFAULT_MAX_MARKDOWN_BYTES)
        encoded_size = len(value.encode("utf-8"))
        if encoded_size <= limit:
            return value
        raise HTTPException(
            status_code=413,
            detail=f"markdown exceeds {limit} bytes",
        )
class MaxRequestSizeMiddleware:
    """ASGI middleware that rejects HTTP requests whose body is too large.

    A request is refused with a 413 JSON response either up front, when its
    ``Content-Length`` header declares more than ``max_request_bytes``, or
    while the body is being read, as soon as the accumulated size passes the
    limit. Accepted bodies are buffered in full and replayed to the wrapped
    application as a single ``http.request`` message.
    """

    def __init__(self, app: Any, max_request_bytes: int) -> None:
        """Store the wrapped ASGI app and the body size limit in bytes."""
        self.app = app
        self.max_request_bytes = max_request_bytes

    async def __call__(self, scope, receive, send) -> None:
        """Handle one ASGI connection; non-HTTP scopes pass straight through."""
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        headers = {
            key.decode("latin1").lower(): value.decode("latin1")
            for key, value in scope.get("headers", [])
        }
        content_length = headers.get("content-length")
        if content_length:
            # Only the header parsing is guarded: a malformed value is
            # ignored and the limit is enforced by counting bytes below.
            try:
                declared_length = int(content_length)
            except ValueError:
                declared_length = None
            if (
                declared_length is not None
                and declared_length > self.max_request_bytes
            ):
                await self._send_413(scope, receive, send)
                return

        body = bytearray()
        while True:
            message = await receive()
            if message["type"] != "http.request":
                if message["type"] == "http.disconnect":
                    # Client went away before the body was complete.
                    return
                continue
            body.extend(message.get("body", b""))
            if len(body) > self.max_request_bytes:
                await self._send_413(scope, receive, send)
                return
            if not message.get("more_body", False):
                break

        body_bytes = bytes(body)
        body_sent = False

        async def replay_receive():
            nonlocal body_sent
            if not body_sent:
                body_sent = True
                return {"type": "http.request", "body": body_bytes, "more_body": False}
            # The buffered body has been delivered. Defer to the server's
            # receive channel so the application observes the real
            # "http.disconnect" instead of an endless stream of empty body
            # messages (which would make a disconnect-polling loop spin).
            return await receive()

        await self.app(scope, replay_receive, send)

    async def _send_413(self, scope, receive, send) -> None:
        """Send a JSON 413 response naming the configured limit."""
        response = JSONResponse(
            status_code=413,
            content={"detail": f"request exceeds {self.max_request_bytes} bytes"},
        )
        await response(scope, receive, send)
# Application instance; the route handlers below register themselves on it.
app = FastAPI(title="md-to-html")
# CORS: any origin may call the service, limited to GET/POST and the
# Content-Type request header.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["POST", "GET"],
    allow_headers=["content-type"],
)
# Body size guard. The limit is resolved once, at import time, from
# MAX_REQUEST_BYTES (falling back to DEFAULT_MAX_REQUEST_BYTES).
# NOTE(review): this middleware is added after CORSMiddleware, which
# presumably makes it the outer layer, so its 413 responses may not carry
# CORS headers - confirm against the framework's middleware ordering.
app.add_middleware(
    MaxRequestSizeMiddleware,
    max_request_bytes=get_int_env("MAX_REQUEST_BYTES", DEFAULT_MAX_REQUEST_BYTES),
)
@app.exception_handler(RequestValidationError)
async def request_validation_exception_handler(
    request: FastAPIRequest, exc: RequestValidationError
) -> JSONResponse:
    """Report request validation failures as HTTP 400.

    Args:
        request: The request that failed validation (unused).
        exc: The validation error raised while parsing the request.

    Returns:
        A 400 JSON response whose ``detail`` lists the validation errors.
    """
    # Local import keeps this block self-contained; it comes from the same
    # framework package the module already depends on.
    from fastapi.encoders import jsonable_encoder

    # Error entries can carry values that the JSON encoder cannot serialize
    # directly (for example exception objects or bytes input). Encoding them
    # first prevents the error handler itself from failing with a 500.
    return JSONResponse(
        status_code=400,
        content={"detail": jsonable_encoder(exc.errors())},
    )
@app.post("/convert")
async def convert_markdown(payload: ConvertRequest) -> Response:
    """Convert the posted markdown into a complete HTML document.

    Args:
        payload: Validated request body with the markdown and optional title.

    Returns:
        The rendered document as ``text/html; charset=utf-8``.

    Raises:
        HTTPException: 400 when the markdown is empty or whitespace only;
            502 when the conversion reports a ``RuntimeError``.
    """
    import asyncio  # local import keeps this block self-contained

    if not payload.markdown.strip():
        raise HTTPException(status_code=400, detail="markdown must not be empty")

    fallback_title = payload.title or "Document"
    try:
        # convert() performs blocking I/O; running it in a worker thread
        # keeps the event loop free to serve other requests meanwhile.
        html_result = await asyncio.to_thread(
            convert, payload.markdown, fallback_title=fallback_title
        )
    except RuntimeError as exc:
        raise HTTPException(status_code=502, detail=str(exc)) from exc
    return Response(content=html_result, media_type="text/html; charset=utf-8")
@app.get("/health")
async def health() -> dict[str, str]:
    """Liveness probe: always answers with a static OK payload."""
    liveness = {"status": "ok"}
    return liveness
def _probe_github(timeout: float = 5) -> int:
    """Send a HEAD request to the GitHub API root and return the HTTP status."""
    request = Request(
        "https://api.github.com",
        headers={"User-Agent": "md-to-html-service-readiness"},
        method="HEAD",
    )
    with urlopen(request, timeout=timeout) as response:
        return response.status


@app.get("/ready")
async def ready() -> dict[str, Any]:
    """Readiness probe: checks the template and, optionally, GitHub.

    The template is always loaded. GitHub is contacted only when the
    ``READY_CHECK_GITHUB`` environment flag is enabled; otherwise the
    ``github_status`` field is reported as ``"skipped"``.

    Returns:
        A dict with ``status``, ``template_loaded`` and ``github_status``.

    Raises:
        HTTPException: 503 when the template cannot be loaded or the GitHub
            check fails.
    """
    import asyncio  # local import keeps this block self-contained

    details: dict[str, Any] = {"status": "ok", "template_loaded": True}
    try:
        load_template_text()
    except Exception as exc:
        raise HTTPException(status_code=503, detail=f"Template load failed: {exc}") from exc

    if not get_bool_env("READY_CHECK_GITHUB", default=False):
        details["github_status"] = "skipped"
        return details

    try:
        # The probe blocks on network I/O, so run it off the event loop.
        details["github_status"] = await asyncio.to_thread(_probe_github)
    except OSError as exc:
        # URLError is an OSError; catching the base class also covers raw
        # socket timeouts and resets, which carry no ``reason`` attribute.
        reason = getattr(exc, "reason", exc)
        raise HTTPException(
            status_code=503,
            detail=f"GitHub readiness check failed: {reason}",
        ) from exc
    return details
+103
View File
@@ -0,0 +1,103 @@
import json
import os
import re
from functools import lru_cache
from html.parser import HTMLParser
from pathlib import Path
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
API_URL = "https://api.github.com/markdown"
API_VERSION = "2022-11-28"
TEMPLATE_PATH = Path(__file__).resolve().parent.parent / "template.html"
class FirstHeadingParser(HTMLParser):
    """Collect the text content of the first heading (h1-h6) in a document.

    Text is gathered from the first heading start tag until the next heading
    end tag; everything after that is ignored.
    """

    _HEADING_TAGS = frozenset({"h1", "h2", "h3", "h4", "h5", "h6"})

    def __init__(self) -> None:
        super().__init__()
        self._capture = False  # currently inside the first heading
        self._done = False  # first heading already closed
        self._parts: list[str] = []

    def handle_starttag(self, tag: str, attrs) -> None:
        """Start capturing on a heading tag unless one was already consumed."""
        if not self._done and tag in self._HEADING_TAGS:
            self._capture = True

    def handle_endtag(self, tag: str) -> None:
        """Stop capturing for good once a heading end tag closes the capture."""
        if not self._capture or tag not in self._HEADING_TAGS:
            return
        self._capture = False
        self._done = True

    def handle_data(self, data: str) -> None:
        """Accumulate text while inside the first heading."""
        if self._done or not self._capture:
            return
        self._parts.append(data)

    def title(self) -> str:
        """Return the captured heading text, stripped; empty if none found."""
        collected = "".join(self._parts)
        return collected.strip()
def render_markdown(markdown_text: str) -> str:
    """Render markdown to an HTML fragment through the GitHub markdown API.

    The text is posted as JSON to ``API_URL``. When the ``GITHUB_TOKEN``
    environment variable is set, it is sent as a bearer token.

    Args:
        markdown_text: Markdown source to render.

    Returns:
        The response body decoded as UTF-8.

    Raises:
        RuntimeError: If the API answers with an HTTP error, or if it cannot
            be reached or the connection fails while reading the response.
    """
    payload = json.dumps({"text": markdown_text}).encode("utf-8")
    headers = {
        "Accept": "text/html",
        "Content-Type": "application/json",
        "User-Agent": "md-to-html-service",
        "X-GitHub-Api-Version": API_VERSION,
    }
    github_token = os.getenv("GITHUB_TOKEN")
    if github_token:
        headers["Authorization"] = f"Bearer {github_token}"
    request = Request(API_URL, data=payload, headers=headers, method="POST")
    try:
        with urlopen(request, timeout=30) as response:
            return response.read().decode("utf-8")
    except HTTPError as exc:
        error_body = exc.read().decode("utf-8", errors="replace")
        raise RuntimeError(
            f"GitHub API error: {exc.code} {exc.reason}\n{error_body}"
        ) from exc
    except URLError as exc:
        raise RuntimeError(f"Failed to reach GitHub API: {exc.reason}") from exc
    except OSError as exc:
        # Timeouts or resets while reading the response surface as plain
        # OSError subclasses rather than URLError; map them to the same
        # RuntimeError that callers already handle.
        raise RuntimeError(f"Failed to reach GitHub API: {exc}") from exc
def extract_title(html_text: str, fallback: str) -> str:
    """Return the text of the first heading in ``html_text``.

    Args:
        html_text: HTML to scan.
        fallback: Value returned when no non-empty heading text is found.
    """
    heading_parser = FirstHeadingParser()
    heading_parser.feed(html_text)
    heading = heading_parser.title()
    return heading if heading else fallback
def apply_template(template_text: str, html_text: str, title: str) -> str:
    """Insert rendered HTML and a title into the page template.

    Every ``<title>...</title>`` element in the template is replaced with
    the given title, and the first template line containing ``Markdown -->``
    is replaced by the rendered HTML lines.

    Args:
        template_text: Full text of the HTML template.
        html_text: Rendered HTML fragment to insert at the placeholder.
        title: Plain-text document title; it is HTML-escaped here.

    Returns:
        The assembled document, terminated by a single newline.

    Raises:
        RuntimeError: If the template has no placeholder line.
    """
    from html import escape  # local import keeps this block self-contained

    # The title is plain text (heading text or a user-supplied fallback), so
    # escape it before embedding it in markup.
    title_element = f"<title>{escape(title, quote=False)}</title>"
    # A callable replacement is inserted verbatim. A string replacement would
    # interpret backslashes in the title as escapes or group references.
    updated = re.sub(
        r"<title>.*?</title>",
        lambda _match: title_element,
        template_text,
        flags=re.DOTALL,
    )

    output_lines = []
    inserted = False
    html_lines = [f" {line}" if line else "" for line in html_text.splitlines()]
    for line in updated.splitlines():
        if not inserted and "Markdown -->" in line:
            output_lines.extend(html_lines)
            inserted = True
            continue
        output_lines.append(line)

    if not inserted:
        raise RuntimeError("Template placeholder not found.")
    return "\n".join(output_lines) + "\n"
@lru_cache(maxsize=1)
def load_template_text() -> str:
    """Return the HTML template text, read as UTF-8 from ``TEMPLATE_PATH``.

    The result is memoized by ``lru_cache``, so the file is read once per
    process unless the cache is cleared.
    """
    with TEMPLATE_PATH.open(encoding="utf-8") as template_file:
        return template_file.read()
def convert(markdown_text: str, fallback_title: str = "Document") -> str:
    """Turn markdown into a complete HTML document.

    Renders the markdown, derives the title from the first heading of the
    rendered HTML (or ``fallback_title`` when there is none), and inserts
    both into the page template.

    Args:
        markdown_text: Markdown source.
        fallback_title: Title used when the rendered HTML has no heading.

    Returns:
        The full HTML document as a string.
    """
    rendered = render_markdown(markdown_text)
    document_title = extract_title(rendered, fallback_title)
    return apply_template(load_template_text(), rendered, document_title)
+240
View File
@@ -0,0 +1,240 @@
import threading
import uuid
from collections import OrderedDict
from html.parser import HTMLParser
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
import sys
import streamlit as st
try:
from app.converter import convert
except ModuleNotFoundError:
sys.path.append(str(Path(__file__).resolve().parent.parent))
from app.converter import convert
MAX_PREVIEW_STORE_ITEMS = 20
class BodyInnerHTMLParser(HTMLParser):
    """Re-serialize everything found between ``<body>`` and ``</body>``.

    Character references are kept in their source form because the parser
    is created with ``convert_charrefs=False``.
    """

    def __init__(self) -> None:
        super().__init__(convert_charrefs=False)
        self._inside_body = False
        self._depth = 0  # count of open elements seen inside the body
        self._parts: list[str] = []

    def _emit(self, text: str) -> None:
        """Record ``text`` only while the parser is inside the body."""
        if self._inside_body:
            self._parts.append(text)

    def handle_starttag(self, tag: str, attrs) -> None:
        """Enter the body on ``<body>``; otherwise copy the raw start tag."""
        if tag == "body":
            self._inside_body = True
            self._depth = 0
            return
        raw_tag = self.get_starttag_text()
        if not self._inside_body or raw_tag is None:
            return
        self._parts.append(raw_tag)
        self._depth += 1

    def handle_endtag(self, tag: str) -> None:
        """Leave the body on ``</body>``; otherwise emit the closing tag."""
        if not self._inside_body:
            return
        if tag == "body":
            self._inside_body = False
            self._depth = 0
            return
        self._parts.append(f"</{tag}>")
        self._depth = max(self._depth - 1, 0)

    def handle_startendtag(self, tag: str, attrs) -> None:
        """Copy a self-closing tag verbatim without touching the depth."""
        if not self._inside_body:
            return
        raw_tag = self.get_starttag_text()
        if raw_tag is not None:
            self._parts.append(raw_tag)

    def handle_data(self, data: str) -> None:
        self._emit(data)

    def handle_entityref(self, name: str) -> None:
        self._emit(f"&{name};")

    def handle_charref(self, name: str) -> None:
        self._emit(f"&#{name};")

    def handle_comment(self, data: str) -> None:
        self._emit(f"<!--{data}-->")

    def body_html(self) -> str:
        """Return the collected body markup with outer whitespace stripped."""
        collected = "".join(self._parts)
        return collected.strip()
def extract_body_html(document_html: str) -> str:
    """Return the inner markup of the ``<body>`` element of a document.

    Args:
        document_html: Full HTML document text.

    Returns:
        The re-serialized body content, stripped of surrounding whitespace.
    """
    body_parser = BodyInnerHTMLParser()
    body_parser.feed(document_html)
    # close() flushes any input the parser is still buffering.
    body_parser.close()
    inner_markup = body_parser.body_html()
    return inner_markup
@st.cache_resource
def get_preview_runtime() -> dict[str, object]:
    """Start the local preview HTTP server and return its shared state.

    The server binds to ``127.0.0.1`` on an OS-assigned port and runs in a
    daemon thread. It answers ``GET /preview/<id>`` with the stored document
    for that id, and 404 for anything else.

    Returns:
        A dict with ``base_url`` (server origin), ``store`` (ordered mapping
        of preview id to HTML) and ``lock`` (guards access to the store).
    """
    previews: OrderedDict[str, str] = OrderedDict()
    previews_guard = threading.Lock()
    route_prefix = "/preview/"

    class PreviewHandler(BaseHTTPRequestHandler):
        """Serve stored preview documents by id."""

        def do_GET(self) -> None:
            if not self.path.startswith(route_prefix):
                self.send_error(404)
                return

            # Drop any query string before looking up the id.
            preview_id, _, _ = self.path[len(route_prefix) :].partition("?")
            with previews_guard:
                document_html = previews.get(preview_id)
            if document_html is None:
                self.send_error(404)
                return

            body = document_html.encode("utf-8")
            self.send_response(200)
            response_headers = (
                ("Content-Type", "text/html; charset=utf-8"),
                ("Content-Length", str(len(body))),
                ("Cache-Control", "no-store"),
            )
            for header_name, header_value in response_headers:
                self.send_header(header_name, header_value)
            self.end_headers()
            self.wfile.write(body)

        def log_message(self, format: str, *args) -> None:
            # Suppress the handler's per-request logging.
            return None

    httpd = ThreadingHTTPServer(("127.0.0.1", 0), PreviewHandler)
    threading.Thread(target=httpd.serve_forever, daemon=True).start()

    return {
        "base_url": f"http://127.0.0.1:{httpd.server_port}",
        "store": previews,
        "lock": previews_guard,
    }
def register_preview(document_html: str) -> str:
    """Store a document for preview and return the URL that serves it.

    The document is kept under a fresh random id. Once the store holds more
    than ``MAX_PREVIEW_STORE_ITEMS`` entries, the oldest ones are dropped.

    Args:
        document_html: Full HTML document to serve.

    Returns:
        Absolute URL of the preview on the local preview server.
    """
    runtime = get_preview_runtime()
    token = uuid.uuid4().hex
    previews, previews_guard = runtime["store"], runtime["lock"]

    with previews_guard:
        previews[token] = document_html
        overflow = len(previews) - MAX_PREVIEW_STORE_ITEMS
        for _ in range(overflow):
            # last=False pops in insertion order, i.e. the oldest entry.
            previews.popitem(last=False)

    return f"{runtime['base_url']}/preview/{token}"
# Page chrome.
st.set_page_config(
    layout="centered",
    page_icon=":material/description:",
    page_title="Markdown to HTML",
)

# Seed the session keys used further down, keeping any values that are
# already present in the session state.
for state_key, initial_value in (
    ("html_result", None),
    ("output_name", "document.html"),
    ("preview_url", None),
):
    if state_key not in st.session_state:
        st.session_state[state_key] = initial_value

st.title("Markdown → HTML")
st.caption("Загрузите markdown-файл, проверьте превью и скачайте готовый HTML.")

# Markdown source upload; only .md / .markdown files are accepted.
uploaded_file = st.file_uploader("Загрузите .md файл", type=["md", "markdown"])
# Snapshot of the last conversion result; it decides which controls are live.
html_result = st.session_state["html_result"]
# Bordered panel with three columns: convert, open preview, download.
with st.container(border=True):
    action_col, preview_col, download_col = st.columns(
        [1.1, 1, 1],
        vertical_alignment="center",
    )
    with action_col:
        # The convert button stays disabled until a file has been uploaded.
        convert_clicked = st.button(
            "Конвертировать",
            disabled=uploaded_file is None,
            type="primary",
            icon=":material/auto_awesome:",
            use_container_width=True,
        )
    with preview_col:
        # Link to the preview URL when a result and URL exist; otherwise
        # show a disabled button with the same label in its place.
        if html_result and st.session_state["preview_url"] is not None:
            st.link_button(
                "Открыть превью",
                url=st.session_state["preview_url"],
                icon=":material/open_in_new:",
                use_container_width=True,
            )
        else:
            st.button(
                "Открыть превью",
                disabled=True,
                icon=":material/open_in_new:",
                use_container_width=True,
            )
    with download_col:
        # Offer the generated HTML under the stored output file name, or a
        # disabled button when there is nothing to download yet.
        if html_result:
            st.download_button(
                "Скачать HTML",
                data=html_result,
                file_name=st.session_state["output_name"],
                mime="text/html",
                icon=":material/download:",
                use_container_width=True,
            )
        else:
            st.button(
                "Скачать HTML",
                disabled=True,
                icon=":material/download:",
                use_container_width=True,
            )
    # Status line under the buttons.
    if html_result:
        st.caption(":green-badge[Результат готов]")
    else:
        st.caption("После конвертации здесь появятся действия с готовым файлом.")
# Run the conversion when the button was pressed for an uploaded file.
if convert_clicked and uploaded_file is not None:
    source_stem = Path(uploaded_file.name).stem
    output_name = f"{source_stem}.html"
    try:
        # Decode inside the guarded block so a badly encoded upload produces
        # a readable error instead of an unhandled exception. "utf-8-sig"
        # decodes plain UTF-8 as well and drops a leading byte-order mark.
        markdown_text = uploaded_file.getvalue().decode("utf-8-sig")
        st.session_state["html_result"] = convert(
            markdown_text,
            fallback_title=source_stem or "Document",
        )
        st.session_state["output_name"] = output_name
        st.session_state["preview_url"] = register_preview(st.session_state["html_result"])
        # Rerun so the controls above pick up the fresh result.
        st.rerun()
    except UnicodeDecodeError:
        st.session_state["html_result"] = None
        st.session_state["preview_url"] = None
        st.error("Не удалось прочитать файл: ожидается кодировка UTF-8.")
    except RuntimeError as exc:
        st.session_state["html_result"] = None
        st.session_state["preview_url"] = None
        st.error(str(exc))
# Read the result again: the conversion step above may have cleared it.
html_result = st.session_state["html_result"]
if html_result:
    inline_preview = extract_body_html(html_result)

    # Unstyled inline rendering of the document body.
    with st.container(border=True):
        st.caption(
            "Inline-превью без стилей. Для точного вида — «Открыть превью» или скачайте файл."
        )
        st.markdown(inline_preview, unsafe_allow_html=True)

    # Full generated document source, collapsed by default.
    with st.expander("Показать исходный HTML", icon=":material/code:"):
        st.code(html_result, language="html")