"""CLI entrypoints for python-rasenmaeher-api"""
from typing import Dict, Any
import logging
import json
import asyncio
import pprint
import uuid
from pathlib import Path
import aiohttp
import click
from libadvian.logging import init_logging
from multikeyjwt import Issuer
from rasenmaeher_api import __version__
from rasenmaeher_api.jwtinit import jwt_init
from rasenmaeher_api.testhelpers import create_test_users
from rasenmaeher_api.db import LoginCode
from rasenmaeher_api.db import base as dbbase
from rasenmaeher_api.db.config import DBConfig
from rasenmaeher_api.db.middleware import DBWrapper
from rasenmaeher_api.web.application import get_app_no_init
from rasenmaeher_api.db.base import init_db, bind_config
from rasenmaeher_api.db import Person
from rasenmaeher_api.db.errors import NotFound
[docs]
LOGGER = logging.getLogger(__name__)
@click.group()
@click.version_option(version=__version__)
@click.pass_context
@click.option("-l", "--loglevel", help="Python log level, 10=DEBUG, 20=INFO, 30=WARNING, 40=CRITICAL", default=30)
@click.option("-v", "--verbose", count=True, help="Shorthand for info/debug loglevel (-v/-vv)")
[docs]
def cli_group(ctx: click.Context, loglevel: int, verbose: int) -> None:
"""CLI helpers for RASENMAEHER developers"""
if verbose == 1:
loglevel = 20
if verbose >= 2:
loglevel = 10
init_logging(loglevel)
LOGGER.setLevel(loglevel)
ctx.ensure_object(dict)
ctx.obj["loop"] = asyncio.get_event_loop()
ctx.obj["dbwrapper"] = DBWrapper(gino=dbbase.db, config=DBConfig.singleton())
@cli_group.command(name="openapi")
@click.pass_context
[docs]
def dump_openapi(ctx: click.Context) -> None:
"""
Dump autogenerate openapi spec as JSON
"""
app = get_app_no_init()
click.echo(json.dumps(app.openapi()))
ctx.exit(0)
@cli_group.command(name="healthcheck")
@click.option("--host", default="localhost", help="The host to connect to")
@click.option("--port", default=8000, help="The port to connect to")
@click.option("--timeout", default=2.0, help="The timeout in seconds")
@click.option("--services", default=False, help="Check services status too", is_flag=True)
@click.pass_context
[docs]
def do_http_healthcheck(ctx: click.Context, host: str, port: int, timeout: float, services: bool) -> None:
"""
Do a GET request to the healthcheck api and dump results to stdout
"""
async def doit() -> int:
"""The actual work"""
nonlocal host, port, timeout, services
if "://" not in host:
host = f"http://{host}"
suffix = ""
if services:
suffix = "/services"
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session:
async with session.get(f"{host}:{port}/api/v1/healthcheck{suffix}") as resp:
if resp.status != 200:
return int(resp.status)
payload = await resp.json()
click.echo(json.dumps(payload))
if services:
if not payload["all_ok"]:
return 1
if payload["healthcheck"] != "success":
return 1
return 0
ctx.exit(ctx.obj["loop"].run_until_complete(doit()))
@cli_group.command(name="addcode")
@click.pass_context
@click.argument("claims_json", required=False, default="""{"anon_admin_session": true}""", type=str)
[docs]
def add_code(ctx: click.Context, claims_json: str) -> None:
"""
Add new single-use login code
"""
claims = json.loads(claims_json)
LOGGER.debug("Parsed claims={}".format(claims))
if not claims:
click.echo("Must specify claims", err=True)
ctx.exit(1)
async def call_backend(claims: Dict[str, Any]) -> int:
"""Call the backend"""
nonlocal ctx
await ctx.obj["dbwrapper"].app_startup_event()
code = await LoginCode.create_for_claims(claims)
await ctx.obj["dbwrapper"].app_startup_event()
click.echo(code)
return 0
ctx.exit(ctx.obj["loop"].run_until_complete(call_backend(claims)))
@cli_group.command(name="getpfx")
@click.pass_context
@click.option("--admin", is_flag=True, help="If a new user, make admin")
@click.argument("callsign", required=True, type=str)
[docs]
def get_pfx(ctx: click.Context, callsign: str, admin: bool) -> None:
"""Get PFX for cert+key for the given user, will create the user if needed"""
async def do_the_needful() -> int:
"""Do what is needed"""
nonlocal callsign, admin
await bind_config()
await init_db()
try:
person = await Person.by_callsign(callsign)
except NotFound:
person = await Person.create_with_cert(callsign)
if admin:
await person.assign_role("admin")
tgtfile = Path(f"{callsign}.pfx")
tgtfile.write_bytes((await person.create_pfx()).read_bytes())
click.echo(f"Wrote {tgtfile}")
return 0
ctx.exit(ctx.obj["loop"].run_until_complete(do_the_needful()))
@cli_group.command(name="revokeuser")
@click.pass_context
@click.option("--reason", type=str, help="Reason", default="unspecified")
@click.argument("callsign", required=True, type=str)
[docs]
def revoke_user(ctx: click.Context, callsign: str, reason: str) -> None:
"""Revoke user by callsign"""
async def do_the_needful() -> int:
"""Do what is needed"""
nonlocal callsign, reason
await bind_config()
await init_db()
person = await Person.by_callsign(callsign)
await person.revoke(reason)
click.echo(f"{callsign} revoked")
return 0
ctx.exit(ctx.obj["loop"].run_until_complete(do_the_needful()))
@cli_group.command(name="getjwt")
@click.pass_context
@click.option("--nonce", is_flag=True, help="Add nonce field with UUID as value")
@click.argument("claims_json", required=False, default="""{"anon_admin_session": true}""", type=str)
[docs]
def get_jwt(ctx: click.Context, claims_json: str, nonce: bool) -> None:
"""
Get RASENMAEHER signed JWT
"""
claims = json.loads(claims_json)
if nonce:
claims["nonce"] = str(uuid.uuid4())
LOGGER.debug("Parsed claims={}".format(claims))
if not claims:
click.echo("Must specify claims", err=True)
ctx.exit(1)
async def call_backend(claims: Dict[str, Any]) -> int:
"""Call the backend"""
await jwt_init()
token = Issuer.singleton().issue(claims)
click.echo(token)
return 0
ctx.exit(ctx.obj["loop"].run_until_complete(call_backend(claims)))
@cli_group.command(name="getadminjwt")
@click.pass_context
@click.argument("claims_json", required=False, default="""{"sub": "pyteststuff"}""", type=str)
[docs]
def get_adminjwt(ctx: click.Context, claims_json: str) -> None:
"""
Get RASENMAEHER signed admin user JWT
"""
claims = json.loads(claims_json)
LOGGER.debug("Parsed claims={}".format(claims))
if not claims:
click.echo("Must specify claims", err=True)
ctx.exit(1)
async def call_backend(claims: Dict[str, Any]) -> int:
"""Call the backend"""
await jwt_init()
token = Issuer.singleton().issue(claims)
click.echo(token)
return 0
ctx.exit(ctx.obj["loop"].run_until_complete(call_backend(claims)))
@cli_group.command(name="addtestusers")
@click.pass_context
[docs]
def add_test_users(ctx: click.Context) -> None:
"""
Create the test users defined in testhelpers.create_test_users
"""
async def call_testusers() -> int:
"""Start db connection, call the helper"""
await bind_config()
await init_db()
ret = await create_test_users()
click.echo(pprint.pformat(ret))
return 0
ctx.exit(ctx.obj["loop"].run_until_complete(call_testusers()))
[docs]
def rasenmaeher_api_cli() -> None:
"""python-rasenmaeher-api"""
init_logging(logging.WARNING)
cli_group() # pylint: disable=no-value-for-parameter