"""fixtures"""
from typing import AsyncGenerator, cast
import logging
from pathlib import Path
import uuid
import pytest
import pytest_asyncio
from async_asgi_testclient import TestClient # pylint: disable=import-error
from multikeyjwt import Issuer
from rasenmaeher_api.web.application import get_app_no_init
[docs]
LOGGER = logging.getLogger(__name__)
# pylint: disable=W0621
@pytest.fixture(scope="session")
[docs]
def datadir(nice_tmpdir_mod: str) -> Path:
"""Make sure we have a well known directory structure"""
datadir = Path(nice_tmpdir_mod) / "persistent"
datadir.mkdir(parents=True)
privdir = datadir / "private"
pubdir = datadir / "public"
privdir.mkdir()
pubdir.mkdir()
return datadir
@pytest_asyncio.fixture(scope="session")
[docs]
async def admin_mtls_client(issuer_cl: Issuer) -> AsyncGenerator[TestClient, None]:
"""(fake) mTLS client for admin"""
async with TestClient(get_app_no_init()) as instance:
token = issuer_cl.issue(
{
"sub": "tpadminsession",
"anon_admin_session": True,
"nonce": str(uuid.uuid4()),
}
)
instance.headers.update({"Authorization": f"Bearer {token}"})
url = "/api/v1/firstuser/add-admin"
data = {
"callsign": "FIRSTADMINa",
}
LOGGER.debug("Fetching {}".format(url))
LOGGER.debug("Data: {}".format(data))
response = await instance.post(url, json=data)
response.raise_for_status()
payload = response.json()
assert "jwt_exchange_code" in payload
del instance.headers["Authorization"]
instance.headers.update({"X-ClientCert-DN": "CN=FIRSTADMINa,O=N/A"})
yield instance
[docs]
async def enroll_user(poolcode: str, callsign: str, admin: TestClient) -> bytes:
"""Enrolls a user, returns the p12 cert pkg"""
async with TestClient(get_app_no_init()) as instance:
# start enroll
url = "/api/v1/enrollment/invitecode/enroll"
data = {
"invite_code": poolcode,
"callsign": callsign,
}
LOGGER.debug("Fetching {}".format(url))
LOGGER.debug("Data: {}".format(data))
response = await instance.post(url, json=data)
response.raise_for_status()
enroll_payload = response.json()
instance.headers.update({"Authorization": f"Bearer {enroll_payload['jwt']}"})
# approve enrollment
url = "/api/v1/enrollment/accept"
data = {
"callsign": callsign,
"approvecode": enroll_payload["approvecode"],
}
LOGGER.debug("Fetching {}".format(url))
LOGGER.debug("Data: {}".format(data))
response = await admin.post(url, json=data)
response.raise_for_status()
# Fetch the p12 file
p12url = f"/api/v1/enduserpfx/{callsign}.pfx"
p12resp = await instance.get(p12url)
p12resp.raise_for_status()
return cast(bytes, p12resp.content)
@pytest_asyncio.fixture(scope="session")
[docs]
async def enroll_poolcode(admin_mtls_client: TestClient) -> AsyncGenerator[str, None]:
"""Create enrollment pool"""
admin = admin_mtls_client
url = "/api/v1/enrollment/invitecode/create"
LOGGER.debug("Fetching {}".format(url))
response = await admin.post(url, json=None)
response.raise_for_status()
payload = response.json()
poolcode = payload["invite_code"]
yield poolcode
# PONDER: Clean up ??
@pytest_asyncio.fixture(scope="session")
[docs]
async def user_mtls_client(admin_mtls_client: TestClient, enroll_poolcode: str) -> AsyncGenerator[TestClient, None]:
"""mTLS client for user"""
await enroll_user(enroll_poolcode, "ENROLLUSERa", admin_mtls_client)
async with TestClient(get_app_no_init()) as instance:
instance.headers.update({"X-ClientCert-DN": "CN=ENROLLUSERa,O=N/A"})
yield instance
@pytest_asyncio.fixture(scope="session")
[docs]
async def product_mtls_client(admin_mtls_client: TestClient) -> AsyncGenerator[TestClient, None]:
"""Client with mocked NGinx mTLS headers"""
_ = admin_mtls_client # Just make sure the db inits are done
async with TestClient(get_app_no_init()) as instance:
instance.headers.update({"X-ClientCert-DN": "CN=fake.localmaeher.pvarki.fi,O=N/A"})
yield instance