Source code for rasenmaeher_api.mtlsinit

"""Init mTLS client cert for RASENMAEHER itself"""
import asyncio
from pathlib import Path
import logging
import random

from libpvarki.mtlshelp.session import get_session as libsession
from libpvarki.mtlshelp.csr import async_create_client_csr, async_create_keypair, resolve_filepaths
import aiohttp
import filelock


from .cfssl.anoncsr import anon_sign_csr
from .rmsettings import switchme_to_singleton_call

[docs] LOGGER = logging.getLogger(__name__)
[docs] CERT_NAME_PREFIX = "rm_mtls_client"
[docs] def check_settings_clientpaths() -> bool: """Make sure the paths are defined, to defaults if needed, return True if setting was changed""" changed = False if not switchme_to_singleton_call.mtls_client_cert_path: switchme_to_singleton_call.mtls_client_cert_path = str( Path(switchme_to_singleton_call.persistent_data_dir) / "public" / f"{CERT_NAME_PREFIX}.pem" ) changed = True if not switchme_to_singleton_call.mtls_client_key_path: switchme_to_singleton_call.mtls_client_key_path = str( Path(switchme_to_singleton_call.persistent_data_dir) / "private" / f"{CERT_NAME_PREFIX}.key" ) changed = True return changed
[docs] def check_mtls_init() -> bool: """Check if we have the cert and key""" check_settings_clientpaths() assert switchme_to_singleton_call.mtls_client_cert_path is not None assert switchme_to_singleton_call.mtls_client_key_path is not None cert_path = Path(switchme_to_singleton_call.mtls_client_cert_path) key_path = Path(switchme_to_singleton_call.mtls_client_key_path) LOGGER.debug("cert_path={} exits={}".format(cert_path, cert_path.exists())) LOGGER.debug("key_path={} exits={}".format(key_path, key_path.exists())) if cert_path.exists() and key_path.exists(): return True return False
[docs] async def mtls_init() -> None: """If needed: Create keypair, CSR, and get it signed""" if check_mtls_init(): return privkeypath, pubkeypath, csrpath = resolve_filepaths( Path(switchme_to_singleton_call.persistent_data_dir), CERT_NAME_PREFIX ) check_settings_clientpaths() assert switchme_to_singleton_call.mtls_client_key_path is not None assert switchme_to_singleton_call.mtls_client_cert_path is not None if (pth := Path(switchme_to_singleton_call.mtls_client_key_path)) != privkeypath: privkeypath = pth certpath = pubkeypath.parent / f"{CERT_NAME_PREFIX}.pem" if (pth := Path(switchme_to_singleton_call.mtls_client_cert_path)) != certpath: certpath = pth lockpath = privkeypath.with_suffix(".lock") # Random sleep to avoid race conditions on these file accesses await asyncio.sleep(random.random() * 3.0) # nosec lock = filelock.FileLock(lockpath) try: lock.acquire(timeout=0.0) # Check the privkey again to avoid overwriting. if privkeypath.exists(): return None LOGGER.info("No mTLS client cert yet, creating it, this will take a moment") keypair = await async_create_keypair(privkeypath, pubkeypath) csrpem = await async_create_client_csr(keypair, csrpath, {"CN": switchme_to_singleton_call.mtls_client_cert_cn}) certpem = (await anon_sign_csr(csrpem)).replace("\\n", "\n") except filelock.Timeout: LOGGER.warning("Someone has already locked {}".format(lockpath)) LOGGER.debug("Sleeping for ~5s and then recursing") await asyncio.sleep(5.0 + random.random()) # nosec return await mtls_init() finally: lock.release() certpath.write_text(certpem, encoding="ascii")
[docs] async def get_session_winit() -> aiohttp.ClientSession: """wrap libpvarki get_session to init checks""" await mtls_init() check_settings_clientpaths() assert switchme_to_singleton_call.mtls_client_cert_path is not None assert switchme_to_singleton_call.mtls_client_key_path is not None cert_path = Path(switchme_to_singleton_call.mtls_client_cert_path) key_path = Path(switchme_to_singleton_call.mtls_client_key_path) return libsession((cert_path, key_path))