Source code for ocsprest.helpers

"""helpers"""
from typing import Tuple, List, Optional
import logging
import asyncio
import base64
import enum
from pathlib import Path

from cryptography import x509
from cryptography.hazmat.primitives import serialization

from .config import RESTConfig

[docs] LOGGER = logging.getLogger(__name__)
# FIXME: switch to the libpvarki version
[docs] async def call_cmd(cmd: str, timeout: int = 10) -> Tuple[int, str, str]: """Do the boilerplate for calling cmd and reporting output/return code""" cnf = RESTConfig.singleton() cwd_cmd = f"cd {cnf.data_path} && {cmd}" LOGGER.debug("Calling create_subprocess_shell(({})".format(cwd_cmd)) process = await asyncio.create_subprocess_shell( cwd_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) out, err = await asyncio.wait_for(process.communicate(), timeout=timeout) if err: LOGGER.warning(err) LOGGER.info(out) assert isinstance(process.returncode, int) # at this point it is, keep mypy happy if process.returncode != 0: LOGGER.error("{} returned nonzero code: {} (process: {})".format(cmd, process.returncode, process)) LOGGER.error(err) LOGGER.error(out) return process.returncode, out.decode("utf-8"), err.decode("utf-8")
[docs] def cfssl_loglevel() -> int: """Return CFSSL loglevel 0-5""" our_level = LOGGER.getEffectiveLevel() if our_level < 10: return 0 return round(our_level / 10) - 1
[docs] class CRLType(enum.IntEnum): """Root or intermediate (or why not both)"""
[docs] ROOT = 1
[docs] INTERMEDIATE = 2
[docs] MERGED = 3
[docs] def crlpaths(crltype: CRLType = CRLType.MERGED) -> Tuple[Path, Path]: """Path to CRL files by CRLType""" cnf = RESTConfig.singleton() der_path = cnf.crl.parent / f"{cnf.crl.stem}_{crltype.name.lower()}.der" pem_path = cnf.crl.parent / f"{cnf.crl.stem}_{crltype.name.lower()}.pem" return der_path, pem_path
[docs] async def merge_crl() -> int: """Merge CRL files""" cnf = RESTConfig.singleton() root_der, root_pem = crlpaths(CRLType.ROOT) intermediate_der, intermediate_pem = crlpaths(CRLType.INTERMEDIATE) retcodes = await asyncio.gather(dump_crl(CRLType.ROOT), dump_crl(CRLType.INTERMEDIATE)) for ret in retcodes: if ret != 0: return ret der_path = cnf.crl pem_path = cnf.crl.parent / f"{cnf.crl.stem}.pem" LOGGER.info("Writing {}".format(der_path)) der_path.write_bytes(root_der.read_bytes() + intermediate_der.read_bytes()) LOGGER.info("Writing {}".format(pem_path)) pem_path.write_bytes(root_pem.read_bytes() + intermediate_pem.read_bytes()) return 0
[docs] async def dump_crl(crltype: CRLType = CRLType.MERGED) -> int: """Dump CRL to shared directory, triggering reloads for everyone interested in it is beyond us though""" cnf = RESTConfig.singleton() cafile: Optional[Path] = None cakey: Optional[Path] = None # If merged is requested dump root and intermediate first, then merge them if crltype == CRLType.MERGED: return await merge_crl() if crltype == CRLType.ROOT: cafile = cnf.rootcacrt cakey = cnf.rootcakey if crltype == CRLType.INTERMEDIATE: cafile = cnf.cacrt cakey = cnf.cakey der_path, pem_path = crlpaths(crltype) if not cafile or not cakey: return -1 args: List[str] = [ str(cnf.cfssl), "crl", f"-db-config {cnf.dbconf}", f"-ca {cafile}", f"-ca-key {cakey}", f"-expiry {cnf.crl_lifetime}", f"-loglevel {cfssl_loglevel()}", ] cmd = " ".join(args) ret, der_b64, _ = await call_cmd(cmd) if ret != 0: return ret der_bytes = base64.b64decode(der_b64) LOGGER.info("Writing {}".format(der_path)) der_path.write_bytes(der_bytes) LOGGER.debug("Parsing the CRL") crl = x509.load_der_x509_crl(der_bytes) LOGGER.info("Writing {}".format(pem_path)) pem_path.write_bytes(crl.public_bytes(encoding=serialization.Encoding.PEM)) return ret
[docs] async def refresh_oscp() -> int: """Call the OCSP refresh script""" cnf = RESTConfig.singleton() args: List[str] = [ str(cnf.cfssl), "ocsprefresh", f"-db-config {cnf.dbconf}", f"-ca {cnf.cacrt}", f"-ca-key {cnf.cakey}", f"-responder {cnf.respcrt}", f"-responder-key {cnf.respkey}", f"-interval {cnf.ocsp_lifetime}", f"-loglevel {cfssl_loglevel()}", ] cmd = " ".join(args) LOGGER.info("Running ocsprefresh") ret, _, _ = await call_cmd(cmd) return ret