"""DB specific tests"""
import asyncio
import logging
import uuid
from pathlib import Path
import pytest
import pytest_asyncio
from libadvian.binpackers import uuid_to_b64
from multikeyjwt import Verifier
import cryptography.x509
import cryptography.hazmat.primitives.serialization.pkcs12
from rasenmaeher_api.db import DBConfig, Person, Enrollment, EnrollmentState, EnrollmentPool, SeenToken, LoginCode
from rasenmaeher_api.db.errors import (
NotFound,
Deleted,
CallsignReserved,
ForbiddenOperation,
PoolInactive,
TokenReuse,
BackendError,
)
from rasenmaeher_api.jwtinit import jwt_init
from rasenmaeher_api.mtlsinit import mtls_init
from rasenmaeher_api.rmsettings import switchme_to_singleton_call, RMSettings
from rasenmaeher_api.cfssl.public import get_crl
from rasenmaeher_api.db.base import init_db, bind_config
# Module-level logger named after this module, used for debug output in the tests below.
LOGGER = logging.getLogger(__name__)
# W0621 is pylint's "redefined-outer-name": tests take the ``ginosession`` fixture as an
# argument with the same name as the fixture function, which is intentional with pytest.
# pylint: disable=W0621
@pytest_asyncio.fixture(scope="session")
async def ginosession() -> None:
    """Session scoped fixture: bind the DB config and then initialize the database"""
    # Order matters: the configuration is bound before init_db() is awaited.
    await bind_config()
    await init_db()
def test_dbconfig_env(monkeypatch: pytest.MonkeyPatch) -> None:
    """Check that DBConfig picks up host, user and password from RM_DATABASE_* env variables"""
    # Random values so a match can only come from the environment set below.
    overrides = {
        "RM_DATABASE_HOST": str(uuid.uuid4()),
        "RM_DATABASE_USER": str(uuid.uuid4()),
        "RM_DATABASE_PASSWORD": str(uuid.uuid4()),
    }
    with monkeypatch.context() as mpatch:
        for envname, envvalue in overrides.items():
            mpatch.setenv(envname, envvalue)
        # Instantiate only after the environment has been patched
        dbconf = DBConfig()
        assert dbconf.host == overrides["RM_DATABASE_HOST"]
        assert dbconf.user == overrides["RM_DATABASE_USER"]
        assert str(dbconf.password) == overrides["RM_DATABASE_PASSWORD"]
        assert dbconf.dsn
def test_dbconfig_defaults(docker_ip: str) -> None:
    """Verify the default DBConfig uses port 5542 and the host given by the docker_ip fixture"""
    dbconf = DBConfig()
    expected_port = 5542
    assert dbconf.port == expected_port
    assert dbconf.host == docker_ip
@pytest.mark.asyncio
async def test_person_crud(ginosession: None) -> None:
    """Exercise Person create/lookup/delete/list and the role assignment helpers"""
    _ = ginosession
    callsign = "DOGGO01a"
    created = Person(callsign=callsign, certspath=str(uuid.uuid4()))
    await created.create()

    fetched = await Person.by_callsign(callsign)
    assert fetched.callsign == callsign
    assert not await fetched.has_role("admin")
    assert await fetched.assign_role("admin")
    assert not await fetched.assign_role("admin")  # already assigned, no need to create

    # The pk-or-callsign helper must accept the callsign and every pk representation
    for lookup_key in (callsign, str(created.pk), uuid_to_b64(created.pk), created.pk):
        await Person.by_pk_or_callsign(lookup_key)

    admins = [user.callsign async for user in Person.by_role("admin")]
    assert callsign in admins
    nobodies = [user.callsign async for user in Person.by_role("nosuchrole")]
    assert not nobodies

    assert await fetched.has_role("admin")
    assert await fetched.remove_role("admin")
    assert not await fetched.remove_role("admin")  # not assigned, no need to delete

    by_pk = await Person.by_pk(str(created.pk))
    assert by_pk.callsign == callsign
    await by_pk.delete()

    # Unknown callsign and deleted callsign raise different errors
    with pytest.raises(NotFound):
        await Person.by_callsign("PORA22b")
    with pytest.raises(Deleted):
        await Person.by_callsign(callsign)
    tombstone = await Person.by_callsign(callsign, allow_deleted=True)
    assert tombstone.callsign == callsign
    assert tombstone.deleted

    survivor = Person(callsign="DOGGO01b", certspath=str(uuid.uuid4()))
    await survivor.create()

    # Person.list(False) must not include the deleted person ...
    listed = [user.callsign async for user in Person.list(False)]
    assert callsign not in listed
    assert "DOGGO01b" in listed
    # ... while Person.list(True) includes it
    listed_all = [user.callsign async for user in Person.list(True)]
    assert callsign in listed_all
    assert "DOGGO01b" in listed_all
@pytest.mark.asyncio
async def test_enrollments_crud(ginosession: None) -> None:
    """Walk an Enrollment through creation, lookups, approvecode resets, rejection and approval"""
    _ = ginosession
    # Created directly to avoid the cost of the certificate workflow, never do this outside of unittests
    await Person(callsign="MEGAMAN00a", certspath=str(uuid.uuid4())).create()
    # Fetch a fresh copy from the database
    decider = await Person.by_callsign("MEGAMAN00a")

    callsign = "PORA22b"
    enrollment = await Enrollment.create_for_callsign(callsign)
    assert enrollment.approvecode
    assert enrollment.callsign == callsign
    assert enrollment.state == EnrollmentState.PENDING

    by_code = await Enrollment.by_approvecode(enrollment.approvecode)
    assert by_code.callsign == enrollment.callsign
    by_name = await Enrollment.by_callsign(enrollment.callsign)
    assert by_name.callsign == enrollment.callsign
    for lookup_key in (callsign, str(enrollment.pk), uuid_to_b64(enrollment.pk), enrollment.pk):
        await Enrollment.by_pk_or_callsign(lookup_key)

    # Each reset must produce a code different from the previous one
    first_code = str(enrollment.approvecode)
    second_code = await enrollment.reset_approvecode()
    assert first_code != second_code
    third_code = await Enrollment.reset_approvecode4callsign(callsign)
    assert third_code != second_code

    with pytest.raises(CallsignReserved):
        await Enrollment.create_for_callsign(callsign)
    with pytest.raises(ForbiddenOperation):
        await by_code.delete()

    await enrollment.reject(decider)
    rejected = await Enrollment.by_pk(uuid_to_b64(enrollment.pk))
    assert rejected.decided_on
    assert rejected.decided_by == decider.pk
    assert rejected.state == EnrollmentState.REJECTED

    pending = await Enrollment.create_for_callsign("ERAPPROVTEST01a")
    approved_person = await pending.approve(decider)
    assert approved_person.callsign == "ERAPPROVTEST01a"
@pytest.mark.asyncio
async def test_enrollmentpools_crud(ginosession: None) -> None:
    """Exercise EnrollmentPool creation, lookups, (de)activation, deletion and invitecode reset"""
    _ = ginosession
    # Created directly to avoid the cost of the certificate workflow, never do this outside of unittests
    owner = Person(callsign="POOLBOYa", certspath=str(uuid.uuid4()))
    await owner.create()
    # Low level construction on purpose, normal code should always use EnrollmentPool.create_for_owner
    pool = EnrollmentPool(owner=owner.pk, extra={"jonnet": "ei tiiƤ"}, invitecode="12313123")
    await pool.create()
    # Fetch a fresh copy from the database
    pool = await EnrollmentPool.by_pk(pool.pk)
    assert pool.active
    for lookup_key in (pool.invitecode, str(pool.pk), uuid_to_b64(pool.pk), pool.pk):
        await EnrollmentPool.by_pk_or_invitecode(lookup_key)

    # Inactive pool must refuse to create enrollments
    pool = await pool.set_active(False)
    with pytest.raises(PoolInactive):
        await pool.create_enrollment(str(uuid.uuid4()))

    pool = await pool.set_active(True)
    enrollment = await pool.create_enrollment("JONNE01a")
    assert "jonnet" in enrollment.extra
    assert enrollment.extra["jonnet"] == "ei tiiƤ"
    assert enrollment.pool == pool.pk

    await pool.delete()
    with pytest.raises(Deleted):
        await EnrollmentPool.by_pk(pool.pk)
    # Fetch the deleted pool explicitly and check it refuses new enrollments too
    pool = await EnrollmentPool.by_pk(pool.pk, allow_deleted=True)
    with pytest.raises(Deleted):
        await pool.create_enrollment(str(uuid.uuid4()))

    fresh_pool = await EnrollmentPool.create_for_owner(owner)
    assert fresh_pool.invitecode
    code_before = str(fresh_pool.invitecode)
    code_after = await fresh_pool.reset_invitecode()
    assert code_before != code_after
@pytest.mark.asyncio
async def test_enrollmentpools_list(ginosession: None) -> None:
    """Check EnrollmentPool.list with and without the by_owner filter"""
    _ = ginosession
    # Created directly to avoid the cost of the certificate workflow, never do this outside of unittests
    owner1 = Person(callsign="MASTER666a", certspath=str(uuid.uuid4()))
    await owner1.create()
    owner2 = Person(callsign="BLASTER999a", certspath=str(uuid.uuid4()))
    await owner2.create()
    # Five pools per owner, created interleaved
    for _ in range(5):
        await EnrollmentPool.create_for_owner(owner2)
        await EnrollmentPool.create_for_owner(owner1)

    all_codes = {pool.invitecode async for pool in EnrollmentPool.list()}
    owner1_codes = {pool.invitecode async for pool in EnrollmentPool.list(by_owner=owner1)}
    owner2_codes = {pool.invitecode async for pool in EnrollmentPool.list(by_owner=owner2)}

    # ">=" because the unfiltered listing may contain pools from other tests
    assert len(all_codes) >= 10
    assert len(owner1_codes) == 5
    assert len(owner2_codes) == 5
    assert owner1_codes.issubset(all_codes)
    assert owner2_codes.issubset(all_codes)
    assert not owner1_codes.intersection(owner2_codes)

    # Every listed code must resolve to a pool owned by the owner we filtered on
    for codes, expected_owner in ((owner1_codes, owner1), (owner2_codes, owner2)):
        for code in codes:
            pool = await EnrollmentPool.by_invitecode(code)
            assert pool.owner == expected_owner.pk
@pytest.mark.asyncio
async def test_enrollments_list(ginosession: None) -> None:
    """Check Enrollment.list with and without the by_pool filter"""
    _ = ginosession
    # FIXME: should use fixtures instead of trusting on side effects from previous tests
    # The owner and its pools are created in test_enrollmentpools_list
    owner = await Person.by_callsign("MASTER666a")
    active_codes = [pool.invitecode async for pool in EnrollmentPool.list(by_owner=owner) if pool.active]
    first_code, second_code = active_codes[0], active_codes[1]
    pool1 = await EnrollmentPool.by_invitecode(first_code)
    pool2 = await EnrollmentPool.by_invitecode(second_code)

    # Five rounds of: one enrollment without pool, one in pool1, one in pool2
    for _ in range(5):
        await Enrollment.create_for_callsign(str(uuid.uuid4()))
        await Enrollment.create_for_callsign(str(uuid.uuid4()), pool=pool1)
        await Enrollment.create_for_callsign(str(uuid.uuid4()), pool=pool2)

    all_codes = {enr.approvecode async for enr in Enrollment.list()}
    pool1_codes = {enr.approvecode async for enr in Enrollment.list(by_pool=pool1)}
    pool2_codes = {enr.approvecode async for enr in Enrollment.list(by_pool=pool2)}

    assert len(all_codes) >= 15
    assert len(pool1_codes) == 5
    assert len(pool2_codes) == 5
    assert pool1_codes.issubset(all_codes)
    assert pool2_codes.issubset(all_codes)
    assert not pool1_codes.intersection(pool2_codes)
@pytest.mark.asyncio
async def test_seentokens_crud(ginosession: None) -> None:
    """Exercise SeenToken: first use, reuse detection, optional audit meta and forbidden delete"""
    _ = ginosession
    first_token = str(uuid.uuid4())
    auditmeta = {"koirat": "doggoi"}

    # Not seen before use
    with pytest.raises(NotFound):
        await SeenToken.by_token(first_token)
    await SeenToken.use_token(first_token, auditmeta)
    seen = await SeenToken.by_token(first_token)
    assert "koirat" in seen.auditmeta
    assert seen.auditmeta["koirat"] == "doggoi"

    # Second use of the same token must be rejected
    with pytest.raises(TokenReuse):
        await SeenToken.use_token(first_token, auditmeta)

    # Token used without meta
    second_token = str(uuid.uuid4())
    await SeenToken.use_token(second_token)
    seen_nometa = await SeenToken.by_token(second_token)
    assert not seen_nometa.auditmeta
    with pytest.raises(ForbiddenOperation):
        await seen_nometa.delete()
@pytest.mark.asyncio
async def test_logincodes_crud(ginosession: None) -> None:
    """Test the db abstraction for login codes

    Creates a code for a claims dict, exchanges it for a JWT, verifies the claims
    survive the round trip and that the code can neither be deleted nor used twice.
    """
    _ = ginosession
    await jwt_init()
    code = await LoginCode.create_for_claims({"sub": "sotakoira"})
    obj = await LoginCode.by_code(code)
    assert not obj.used_on
    jwt = await LoginCode.use_code(code)
    # Re-fetch to see the used_on value set by use_code
    obj2 = await LoginCode.by_code(code)
    assert obj2.used_on
    claims = Verifier.singleton().decode(jwt)
    # Lazy %-style arguments: the message is only formatted if DEBUG is enabled
    LOGGER.debug("claims=%s", claims)
    assert "sub" in claims
    assert claims["sub"] == "sotakoira"
    with pytest.raises(ForbiddenOperation):
        await obj2.delete()
    with pytest.raises(TokenReuse):
        await LoginCode.use_code(code)
@pytest.mark.asyncio
async def test_person_with_cert(ginosession: None) -> None:
    """Test the cert creation with the classmethod (and revocation)

    Checks that the key and cert files exist after creation, that revoking changes
    the set of serial numbers in the CRL and that the person ends up marked deleted
    with a revoke reason.
    """
    _ = ginosession
    await mtls_init()
    person = await Person.create_with_cert("BINGO01a", {"kissa": "puuma"})
    assert person.privkeyfile.exists()
    assert person.pubkeyfile.exists()
    assert person.certfile.exists()
    # Snapshot of the revoked serials before revoking this person
    old_crl = cryptography.x509.load_der_x509_crl(await get_crl())
    old_crl_serials = {revcert.serial_number for revcert in old_crl}
    await person.revoke("key_compromise")
    new_crl = cryptography.x509.load_der_x509_crl(await get_crl())
    new_crl_serials = {revcert.serial_number for revcert in new_crl}
    # Lazy %-style arguments: the message is only formatted if DEBUG is enabled
    LOGGER.debug("old_crl=%s new_crl=%s", old_crl_serials, new_crl_serials)
    assert old_crl_serials != new_crl_serials
    refresh = await Person.by_callsign("BINGO01a", allow_deleted=True)
    assert refresh.deleted
    assert refresh.revoke_reason
@pytest.mark.xfail(reason="monkeypatching the host does not work as expected")
@pytest.mark.asyncio
async def test_person_with_cert_cfsslfail(ginosession: None, monkeypatch: pytest.MonkeyPatch) -> None:
    """Check that a CFSSL failure in create_with_cert leaves no new files and no Person behind"""
    _ = ginosession
    await mtls_init()
    people_dir = Path(switchme_to_singleton_call.persistent_data_dir) / "private" / "people"
    files_before = set(people_dir.rglob("*"))
    # Make sure the settings singleton exists before patching it
    RMSettings.singleton()
    assert RMSettings._singleton  # pylint: disable=W0212
    with monkeypatch.context() as mpatch:
        # Point CFSSL at a host that does not exist, both on the singleton and in the environment
        mpatch.setattr(RMSettings._singleton, "cfssl_host", "http://nosuchost")  # pylint: disable=W0212
        mpatch.setenv("RM_CFSSL_HOST", RMSettings._singleton.cfssl_host)  # pylint: disable=W0212
        with pytest.raises(BackendError):
            await Person.create_with_cert("BONGO01a", {"kissa": "puuma"})
        files_after = set(people_dir.rglob("*"))
        assert files_after == files_before
        with pytest.raises(NotFound):
            await Person.by_callsign("BONGO01a")
@pytest.mark.asyncio
async def test_person_with_cert_duplicatename(ginosession: None) -> None:
    """Check that create_with_cert with an already used callsign raises and touches no files"""
    _ = ginosession
    await mtls_init()
    people_dir = Path(switchme_to_singleton_call.persistent_data_dir) / "private" / "people"
    reserved = "RUOSKA23a"

    first = await Person.create_with_cert(reserved)
    for keyfile in (first.privkeyfile, first.pubkeyfile, first.certfile):
        assert keyfile.exists()

    files_before = set(people_dir.rglob("*"))
    assert files_before
    # Second creation with the same callsign must fail ...
    with pytest.raises(CallsignReserved):
        await Person.create_with_cert(reserved)
    # ... and leave the directory contents exactly as they were
    files_after = set(people_dir.rglob("*"))
    assert files_after == files_before
@pytest.mark.asyncio
async def test_pfx_parse(ginosession: None) -> None:
    """Test that the PFX file appears and parses with the callsign as the password"""
    _ = ginosession
    await mtls_init()
    person = await Person.create_with_cert("PFXMAN01a")

    async def pfx_ready() -> None:
        """Poll every half second until the PFX file exists"""
        while True:
            if person.pfxfile.exists():
                return
            await asyncio.sleep(0.5)

    # The file may not exist right after create_with_cert, give it up to five seconds
    await asyncio.wait_for(pfx_ready(), timeout=5.0)
    assert person.pfxfile.exists()
    pfx_payload = person.pfxfile.read_bytes()
    parsed = cryptography.hazmat.primitives.serialization.pkcs12.load_pkcs12(pfx_payload, b"PFXMAN01a")
    assert parsed.key
    assert parsed.cert
@pytest.mark.asyncio
async def test_productcn_forbid(ginosession: None) -> None:
    """Check that a callsign matching a product CN is refused for both Person and Enrollment"""
    _ = ginosession
    product_cn = "fake.localmaeher.pvarki.fi"
    with pytest.raises(CallsignReserved):
        await Person.create_with_cert(product_cn)
    with pytest.raises(CallsignReserved):
        await Enrollment.create_for_callsign(product_cn)