Source code for ocsprest.console.ocsprest

"""CLI commands for the OCSP rest wrapper"""
import asyncio
import logging
import json

import click
from libadvian.logging import init_logging
import aiohttp

from ocsprest import __version__
from ocsprest.config import RESTConfig
from ocsprest.routes import refresher
from ocsprest.helpers import dump_crl, refresh_oscp

[docs] LOGGER = logging.getLogger(__name__)
@click.group() @click.version_option(version=__version__) @click.option("-l", "--loglevel", help="Python log level, 10=DEBUG, 20=INFO, 30=WARNING, 40=CRITICAL", default=30) @click.option("-v", "--verbose", count=True, help="Shorthand for info/debug loglevel (-v/-vv)")
[docs] def cligrp(loglevel: int, verbose: int) -> None: """REST wrapper for CFSSL CLI functionality not present in it's own REST API""" if verbose == 1: loglevel = 20 if verbose >= 2: loglevel = 10 init_logging(loglevel) LOGGER.setLevel(loglevel)
@cligrp.command(name="config")
[docs] def dump_config() -> None: """Show the resolved config as JSON""" click.echo(RESTConfig.singleton().model_dump_json())
@cligrp.command(name="crl")
[docs] def crl() -> None: """dump CRL files""" asyncio.get_event_loop().run_until_complete(dump_crl())
@cligrp.command(name="ocsp")
[docs] def ocsp() -> None: """Refresh OCSP signatures""" asyncio.get_event_loop().run_until_complete(refresh_oscp())
@cligrp.command(name="refresher")
[docs] def start_refresher() -> None: """Start the refresher loop and run forever""" asyncio.get_event_loop().run_until_complete(refresher())
@cligrp.command(name="healthcheck") @click.option("--host", default="localhost", help="The host to connect to") @click.option("--port", default=8887, help="The port to connect to") @click.option("--timeout", default=2.0, help="The timeout in seconds") @click.pass_context
[docs] def do_http_healthcheck(ctx: click.Context, host: str, port: int, timeout: float) -> None: """ Do a GET request to the healthcheck api and dump results to stdout """ async def doit() -> int: """The actual work""" nonlocal host, port, timeout if "://" not in host: host = f"http://{host}" async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session: url = f"{host}:{port}/api/v1/healthcheck" LOGGER.debug("Calling {}".format(url)) async with session.get(url) as resp: if resp.status != 200: LOGGER.warning("{} returned {}".format(url, resp)) return resp.status payload = await resp.json() click.echo(json.dumps(payload)) if payload["healthcheck"] != "success": return 1 return 0 ctx.exit(asyncio.get_event_loop().run_until_complete(doit()))
[docs] def ocsprest_cli() -> None: """cli entrypoint""" init_logging(logging.WARNING) cligrp() # pylint: disable=no-value-for-parameter