Source code for rasenmaeher_api.cfssl.base

"""Base helpers etc"""
from typing import Any, Mapping, Union, List, cast
import logging
import ssl

import aiohttp
from libpvarki.mtlshelp.context import get_ca_context

from ..rmsettings import RMSettings

[docs] LOGGER = logging.getLogger(__name__)
[docs] DEFAULT_TIMEOUT = RMSettings.singleton().cfssl_timeout
[docs] class CFSSLError(RuntimeError): """CFSSL errors"""
[docs] class NoResult(CFSSLError, ValueError): """Did not get any result"""
[docs] class ErrorResult(CFSSLError, ValueError): """Did not get any result"""
[docs] class DBLocked(CFSSLError): """Database is locked, we should probably retry"""
[docs] class NoValue(CFSSLError, ValueError): """Did not get expected values"""
[docs] async def get_result(response: aiohttp.ClientResponse) -> Any: """Get the result part""" data = cast(Mapping[str, Union[Any, Mapping[str, Any]]], await response.json(content_type=None)) LOGGER.debug("data={}".format(data)) if not data: LOGGER.error("Got empty json from response={}".format(response)) raise CFSSLError("Got empty response") if errors := data.get("errors"): errors = cast(List[Mapping[str, Any]], errors) for error in errors: if error["code"] == 11000: raise DBLocked("CFSSL returned following errors: {}".format(errors)) raise ErrorResult("CFSSL returned following errors: {}".format(errors)) result = data.get("result") if not result: raise NoResult() return result
[docs] async def get_result_cert(response: aiohttp.ClientResponse) -> str: """Shorthand for checking the response and getting the cert""" result = await get_result(response) cert = result.get("certificate") if not cert: raise NoValue("CFSSL did not return certificate") return cast(str, cert)
[docs] async def get_result_bundle(response: aiohttp.ClientResponse) -> str: """Shorthand for checking the response and getting the cert""" result = await get_result(response) cert = result.get("bundle") if not cert: raise NoValue("CFSSL did not return certificate bundle") return cast(str, cert)
[docs] def base_url() -> str: """Construct the base url""" cnf = RMSettings.singleton() return f"{cnf.cfssl_host}:{cnf.cfssl_port}"
[docs] def ocsprest_base() -> str: """Construct the base url for ocsprest""" cnf = RMSettings.singleton() return f"{cnf.ocsprest_host}:{cnf.ocsprest_port}"
[docs] async def anon_session() -> aiohttp.ClientSession: """Anonymous session with content-type set""" ctx = get_ca_context(ssl.Purpose.SERVER_AUTH) conn = aiohttp.TCPConnector(ssl=ctx) session = aiohttp.ClientSession(connector=conn) session.headers.add("Content-Type", "application/json") return session