"""FastAPI routes"""
from typing import List, Dict, Any
import asyncio
import json
import logging
import shlex
import tempfile
import time
import uuid
from pathlib import Path

from libadvian.tasks import TaskMaster
from libadvian.logging import init_logging
from fastapi import FastAPI, APIRouter, Request, HTTPException
from fastapi.responses import FileResponse

from ocsprest import __version__
from .config import RESTConfig
from .helpers import call_cmd, cfssl_loglevel, dump_crl, refresh_oscp
# Module-level logger, named after this module
LOGGER = logging.getLogger(__name__)
# Router that every endpoint below registers on; get_app() mounts it under /api/v1
ROUTER = APIRouter()
# FIXME: Use proper response schemas etc
@ROUTER.post("/refresh")
async def refresh_all(request: Request) -> Dict[str, Any]:
    """Schedule a cfssl ocsprefresh run in the background.

    The refresh coroutine is handed to the TaskMaster singleton and is not
    awaited, so the reply does not reflect the outcome of the refresh itself.
    """
    _ = request  # the request carries nothing we need
    background = TaskMaster.singleton()
    # Fire and forget: nobody waits for the result
    background.create_task(refresh_oscp())
    return dict(success=True)
@ROUTER.post("/csr/sign")
async def csr_sign(request: Request) -> Dict[str, Any]:
    """Sign a CSR with the CFSSL CLI and return the certificate, bundled by default.

    We have to do it via CLI if we want CFSSL to add the Authority Information Access properties.

    Keys read from the JSON request body:

    - ``certificate_request``: CSR text, written to a temporary file for the CLI
    - ``profile``: value given to ``cfssl sign -profile``
    - ``bundle`` (optional, defaults to True): when falsy the signed certificate is
      returned without running ``cfssl bundle``

    Returns:
        ``{"result": {"certificate": <text>}}`` where every newline in the certificate
        text has been replaced with a literal backslash-n sequence. If the bundle call
        fails, the error is logged and the plain signed certificate is returned instead.

    Raises:
        HTTPException: status 500 when the ``cfssl sign`` call exits non-zero.
        KeyError: when ``certificate_request`` or ``profile`` is missing from the body.
    """
    # Same accessor as every other endpoint in this module (no throwaway instance)
    cnf = RESTConfig.singleton()
    data = await request.json()
    csrtmp = Path(tempfile.gettempdir()) / f"{uuid.uuid4()}.csr"
    certtmp = csrtmp.with_suffix(".crt")
    try:
        csrtmp.write_text(data["certificate_request"])
        # "profile" comes straight from the request body: quote it so it can only ever be
        # one argument of the command line. Plain profile names pass through unchanged.
        # NOTE(review): assumes call_cmd parses the command string shell-style - confirm in helpers.
        profile = shlex.quote(str(data["profile"]))
        args_sign: List[str] = [
            str(cnf.cfssl),
            "sign",
            f"-config {cnf.conf}",
            f"-db-config {cnf.dbconf}",
            f"-ca {cnf.cacrt}",
            f"-ca-key {cnf.cakey}",
            f"-profile {profile}",
            f"-csr {csrtmp}",
            f"-loglevel {cfssl_loglevel()}",
        ]
        cmd_sign = " ".join(args_sign)
        ret_sign, out_sign, _ = await call_cmd(cmd_sign)
        if ret_sign != 0:
            raise HTTPException(
                status_code=500,
                detail={"success": False, "error": f"CFSSL CLI call to sign failed, code {ret_sign}. See server logs"},
            )

        resp_sign = json.loads(out_sign)
        if not data.get("bundle", True):
            return {"result": {"certificate": resp_sign["cert"].replace("\n", "\\n")}}

        # Create the bundle
        certtmp.write_text(resp_sign["cert"])
        args_bundle: List[str] = [
            str(cnf.cfssl),
            "bundle",
            f"-ca-bundle {cnf.rootcacrt}",
            f"-int-bundle {cnf.cacrt}",
            "-flavor optimal",
            f"-cert {certtmp}",
            f"-loglevel {cfssl_loglevel()}",
        ]
        cmd_bundle = " ".join(args_bundle)
        ret_bundle, out_bundle, _ = await call_cmd(cmd_bundle)
        if ret_bundle != 0:
            LOGGER.error("CFSSL CLI call to bundle failed, returning the signed cert anyway")
            return {"result": {"certificate": resp_sign["cert"].replace("\n", "\\n")}}

        resp_bundle = json.loads(out_bundle)
        return {"result": {"certificate": resp_bundle["bundle"].replace("\n", "\\n")}}
    finally:
        # certtmp only exists once the bundle step has started (and csrtmp only once the
        # CSR was written); a missing file must not replace the real response/exception.
        for tmpfile in (csrtmp, certtmp):
            try:
                tmpfile.unlink()
            except FileNotFoundError:
                pass
@ROUTER.post("/ocsp/sign")
async def ocsp_sign_one(request: Request) -> Dict[str, Any]:
    """Call cfssl ocspsign for a single certificate.

    Keys read from the JSON request body:

    - ``cert``: certificate text, written to a temporary file for the CLI
    - ``status`` (optional, defaults to ``"good"``): value given to ``-status=``

    Returns:
        ``{"success": True}`` once the CLI call exited with code 0 and its output
        parsed as JSON. The signed response itself is not returned.

    Raises:
        HTTPException: status 500 when the ``cfssl ocspsign`` call exits non-zero.
        KeyError: when ``cert`` is missing from the body.
    """
    data = await request.json()
    certtmp = Path(tempfile.gettempdir()) / f"{uuid.uuid4()}.pem"
    try:
        certtmp.write_text(data["cert"])
        # "status" comes straight from the request body: quote it so it can only ever be
        # one argument of the command line. Plain values like "good" pass through unchanged.
        # NOTE(review): assumes call_cmd parses the command string shell-style - confirm in helpers.
        status = shlex.quote(str(data.get("status", "good")))
        cnf = RESTConfig.singleton()
        args: List[str] = [
            str(cnf.cfssl),
            "ocspsign",
            f"-cert {certtmp}",
            f"-ca {cnf.cacrt}",
            f"-status={status}",
            f"-responder {cnf.respcrt}",
            f"-responder-key {cnf.respkey}",
            f"-loglevel {cfssl_loglevel()}",
        ]
        cmd = " ".join(args)
        LOGGER.info("Running ocspsign")
        ret, out, _ = await call_cmd(cmd)
        # Check the exit code before touching the output: a failed call should surface
        # as the HTTPException below, not as a JSON decoding error.
        if ret != 0:
            raise HTTPException(
                status_code=500,
                detail={"success": False, "error": f"CFSSL CLI call to ocspsign failed, code {ret}. See server logs"},
            )
        # TODO: How to inject the response to the database ??
        _resp = json.loads(out)
        # TODO: Should we return the signed OCSP response ? can rmapi do something with it ??
        return {"success": True}
    finally:
        # The file is absent if "cert" was missing from the body; do not let the
        # cleanup replace the original exception.
        try:
            certtmp.unlink()
        except FileNotFoundError:
            pass
@ROUTER.post("/dump_crl")
async def call_dump_crl(request: Request) -> Dict[str, Any]:
    """Schedule a CRL dump in the background.

    Per the original note the dump goes to a shared directory; getting whoever
    uses the CRL to reload it is out of scope here. The dump coroutine is handed
    to the TaskMaster singleton and not awaited, so the reply does not reflect
    its outcome.
    """
    _ = request  # the request carries nothing we need
    background = TaskMaster.singleton()
    # Fire and forget: nobody waits for the result
    background.create_task(dump_crl())
    return dict(success=True)
@ROUTER.get("/crl/crl.pem")
async def get_crl_pem(request: Request) -> FileResponse:
    """Dump the CRL, then serve its PEM version.

    The PEM file is looked up next to the configured CRL path, using the same
    file stem with a ``.pem`` extension. A non-zero result from the dump is
    reported as HTTP 500.
    """
    _ = request
    exit_code = await dump_crl()
    if exit_code == 0:
        cnf = RESTConfig.singleton()
        pem_name = f"{cnf.crl.stem}.pem"
        return FileResponse(cnf.crl.parent / pem_name, media_type="application/x-pem-file")
    failure = {"success": False, "error": f"CFSSL CLI call to dump_crl failed, code {exit_code}. See server logs"}
    raise HTTPException(status_code=500, detail=failure)
@ROUTER.get("/crl/crl.der")
async def get_crl_der(request: Request) -> FileResponse:
    """Dump the CRL, then serve the file at the configured CRL path.

    The file is sent with the ``application/pkix-crl`` media type. A non-zero
    result from the dump is reported as HTTP 500.
    """
    _ = request
    exit_code = await dump_crl()
    if exit_code == 0:
        return FileResponse(RESTConfig.singleton().crl, media_type="application/pkix-crl")
    failure = {"success": False, "error": f"CFSSL CLI call to dump_crl failed, code {exit_code}. See server logs"}
    raise HTTPException(status_code=500, detail=failure)
@ROUTER.get("/healthcheck")
async def healthcheck(request: Request) -> Dict[str, Any]:
    """Report "crlfail" when the CRL file has not been modified recently enough.

    The file's age (now minus its mtime) is compared with the configured
    ``crl_refresh`` plus a 15 second grace period; anything within that limit
    is reported as "success".
    """
    _ = request
    grace = 15
    cnf = RESTConfig.singleton()
    age = time.time() - cnf.crl.stat().st_mtime
    LOGGER.debug("{} modified {} seconds ago".format(cnf.crl, age))
    if age > (cnf.crl_refresh + grace):
        LOGGER.warning("{} modified too long ago ({}s)".format(cnf.crl, age))
        return {"healthcheck": "crlfail"}
    return {"healthcheck": "success"}
def get_app() -> FastAPI:
    """Build the FastAPI application with the module router mounted under /api/v1."""
    settings = {
        "docs_url": "/api/docs",
        "openapi_url": "/api/openapi.json",
        "version": __version__,
    }
    application = FastAPI(**settings)
    application.include_router(router=ROUTER, prefix="/api/v1")
    LOGGER.debug("Returning {}".format(application))
    return application
def app_w_logging() -> FastAPI:
    """Initialise logging first, then build and return the app."""
    init_logging()
    return get_app()
async def refresher() -> None:
    """Periodically dump the CRL and refresh OCSP, until cancelled.

    Each round runs both jobs concurrently, then sleeps for the configured
    ``crl_refresh`` interval. A timeout in a round is logged and ignored;
    cancellation ends the loop and is only logged.
    """
    try:
        while True:
            try:
                await asyncio.gather(dump_crl(), refresh_oscp())
            except asyncio.TimeoutError as err:
                LOGGER.warning("Ignoring timeout: {}".format(err))
            # Read the interval on every round, right before sleeping
            pause = RESTConfig.singleton().crl_refresh
            await asyncio.sleep(pause)
    except asyncio.CancelledError:
        LOGGER.debug("Cancelled")