Source code for rasenmaeher_api.cfssl.public

"""Public things, CA cert, CRL etc"""
from typing import Dict, Any
import logging
import base64

import aiohttp

from .base import base_url, anon_session, get_result, get_result_cert, CFSSLError, get_result_bundle, ocsprest_base
from .private import refresh_ocsp
from ..rmsettings import RMSettings


[docs] LOGGER = logging.getLogger(__name__)
[docs] CRL_LIFETIME = "1800s" # seconds
[docs] async def get_ca() -> str: """ Quick and dirty method to get CA from CFSSL returns: CA certificate """ async with (await anon_session()) as session: url = f"{base_url()}/api/v1/cfssl/info" payload: Dict[str, Any] = {} # PONDER: Why does this need to be a POST ?? try: async with session.post(url, json=payload, timeout=RMSettings.singleton().cfssl_timeout) as response: return await get_result_cert(response) except aiohttp.ClientError as exc: raise CFSSLError(str(exc)) from exc
[docs] async def get_ocsprest_crl(suffix: str) -> bytes: """Fetch CRL from OCSPREST""" async with (await anon_session()) as session: url = f"{ocsprest_base()}/api/v1/crl/{suffix}" try: async with session.get(url) as response: data = await response.read() LOGGER.debug("{} returned {}".format(url, repr(data))) return data except aiohttp.ClientError as exc: raise CFSSLError(str(exc)) from exc
[docs] async def get_crl() -> bytes: """ Quick and dirty method to get CRL from CFSSL, should not be used. returns: DER binary encoded Certificate Revocation List """ async with (await anon_session()) as session: url = f"{base_url()}/api/v1/cfssl/crl" try: async with session.get( url, params={"expiry": CRL_LIFETIME}, timeout=RMSettings.singleton().cfssl_timeout ) as response: crl_b64 = await get_result(response) data = base64.b64decode(crl_b64) return data except aiohttp.ClientError as exc: raise CFSSLError(str(exc)) from exc
[docs] async def get_bundle(cert: str) -> str: """ Get the optimal cert bundle for given cert """ # FIXME: This is not a good way but I don't have a better one right now either # Force OCSP refresh before getting the bundle so we hopefully get all we need await refresh_ocsp() async with (await anon_session()) as session: url = f"{base_url()}/api/v1/cfssl/bundle" payload: Dict[str, Any] = {"certificate": cert, "flavor": "optimal"} try: async with session.post(url, json=payload, timeout=RMSettings.singleton().cfssl_timeout) as response: return await get_result_bundle(response) except aiohttp.ClientError as exc: raise CFSSLError(str(exc)) from exc