Source code for rasenmaeher_api.prodcutapihelpers

"""Product integration API helpers"""
from typing import Dict, Optional, Type, Any, Mapping, Tuple
import asyncio
import logging

import aiohttp
import pydantic
from libpvarki.schemas.generic import OperationResultResponse
from libadvian.tasks import TaskMaster


from .rmsettings import RMSettings
from .mtlsinit import get_session_winit
from .cfssl.private import refresh_ocsp

[docs] LOGGER = logging.getLogger(__name__)
[docs] def check_kraftwerk_manifest() -> bool: """Check that settings has manifest""" RMSettings.singleton().load_manifest() return RMSettings.singleton().kraftwerk_manifest_bool
[docs] async def post_to_all_products( url_suffix: str, data: Mapping[str, Any], respose_schema: Type[pydantic.BaseModel], collect_responses: bool = True ) -> Optional[Dict[str, Optional[pydantic.BaseModel]]]: """Call given POST endpoint on call products in the manifest""" return await _method_to_all_products("post", url_suffix, data, respose_schema, collect_responses)
[docs] async def put_to_all_products( url_suffix: str, data: Mapping[str, Any], respose_schema: Type[pydantic.BaseModel], collect_responses: bool = True ) -> Optional[Dict[str, Optional[pydantic.BaseModel]]]: """Call given PUT endpoint on call products in the manifest""" return await _method_to_all_products("put", url_suffix, data, respose_schema, collect_responses)
[docs] async def get_from_all_products( url_suffix: str, respose_schema: Type[pydantic.BaseModel], collect_responses: bool = True ) -> Optional[Dict[str, Optional[pydantic.BaseModel]]]: """Call given GET endpoint on call products in the manifest""" return await _method_to_all_products("get", url_suffix, None, respose_schema, collect_responses)
[docs] async def _method_to_all_products( methodname: str, url_suffix: str, data: Optional[Mapping[str, Any]], respose_schema: Type[pydantic.BaseModel], collect_responses: bool = True, ) -> Optional[Dict[str, Optional[pydantic.BaseModel]]]: """Call given POST endpoint on call products in the manifest""" if not check_kraftwerk_manifest(): return None manifest = RMSettings.singleton().kraftwerk_manifest_dict if "products" not in manifest: LOGGER.error("Manifest does not have products key") return None rmconf = RMSettings.singleton() await refresh_ocsp() LOGGER.debug("data={}".format(data)) async def handle_one(name: str, conf: Mapping[str, Any]) -> Tuple[str, Optional[pydantic.BaseModel]]: """Do one call""" nonlocal url_suffix, rmconf session = await get_session_winit() async with session as client: try: url = f"{conf['api']}{url_suffix}" LOGGER.debug("calling {}({})".format(methodname, url)) if data is None: resp = await getattr(client, methodname)(url, timeout=rmconf.integration_api_timeout) else: resp = await getattr(client, methodname)(url, json=data, timeout=rmconf.integration_api_timeout) resp.raise_for_status() payload = await resp.json() LOGGER.debug("{}({}) payload={}".format(methodname, url, payload)) retval = respose_schema.parse_obj(payload) # Log a common error case here for DRY if isinstance(retval, OperationResultResponse): if not retval.success: LOGGER.error("Failure at {}, response: {}".format(url, retval)) return name, retval except (aiohttp.ClientError, TimeoutError, asyncio.TimeoutError) as exc: LOGGER.error("Failure to call {}: {}".format(url, repr(exc))) return name, None except pydantic.ValidationError as exc: LOGGER.error("Invalid response from {}: {}".format(url, repr(exc))) return name, None except Exception: # pylint: disable=W0718 LOGGER.exception("Something went seriously wrong calling {}".format(url)) return name, None if not collect_responses: tma = TaskMaster.singleton() for name, conf in manifest["products"].items(): tma.create_task(handle_one(name, conf)) return None coros = [] for name, conf in manifest["products"].items(): coros.append(handle_one(name, conf)) return dict(await asyncio.gather(*coros))