"""JWT wrappers"""
from typing import Tuple
import asyncio
import logging
from pathlib import Path
import stat
from multikeyjwt.keygen import generate_keypair
from multikeyjwt import Issuer, Verifier
from .config import MWConfig
[docs]
LOGGER = logging.getLogger(__name__)
[docs]
PUBDIR_MODE = stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH | stat.S_IXGRP | stat.S_IXOTH
[docs]
PRIVDIR_MODE = stat.S_IRWXU
[docs]
async def check_create_keypair() -> Tuple[Path, Path]:
"""Check if we have keypair, if not create it, returns the file paths"""
config = MWConfig.singleton()
privkeypath = config.data_path / "private" / "jwt.key"
privdir = privkeypath.parent
privdir.mkdir(parents=True, exist_ok=True)
privdir.chmod(PRIVDIR_MODE)
pubkeypath = config.data_path / "publickeys" / "kraftwerk.pub"
pubdir = pubkeypath.parent
pubdir.mkdir(parents=True, exist_ok=True)
pubdir.chmod(PUBDIR_MODE)
if privkeypath.exists() and pubkeypath.exists():
return privkeypath, pubkeypath
LOGGER.info("Generating keypair, this will take a moment")
_, cpk = await asyncio.get_event_loop().run_in_executor(None, generate_keypair, privkeypath, None)
pubkeypath.write_bytes(cpk.read_bytes())
LOGGER.info("Wrote {}".format(pubkeypath))
return privkeypath, pubkeypath
[docs]
async def get_issuer() -> Issuer:
"""Get JWT issuer, init keys if needed"""
privkeypath, _ = await check_create_keypair()
return Issuer(privkeypath=privkeypath, keypasswd=None)
[docs]
async def get_verifier() -> Verifier:
"""Get JWT verifier, init keys if needed"""
_, pubkeypath = await check_create_keypair()
return Verifier(pubkeypath=pubkeypath.parent)