rasenmaeher_api.db

Gino based database abstraction

Package Contents

Classes

DBConfig

DB config dataclass; functools etc. are used to avoid import-time side effects

Person

People, pk is UUID and comes from basemodel

Role

Give a person a role

Enrollment

Enrollments, pk is UUID and comes from basemodel

EnrollmentPool

Enrollment pools aka links, pk is UUID and comes from basemodel

EnrollmentState

Enrollment states

SeenToken

Store tokens we should see used only once

LoginCode

Track the login codes that can be exchanged for session JWTs

Functions

bind_config()

Set bind from config and return

init_db()

Create schemas and tables; normally one should use a migration manager instead

Attributes

db

class DBConfig[source]

DB config dataclass; functools etc. are used to avoid import-time side effects

driver: str
host: str | None
port: int
user: str | None
password: starlette.datastructures.Secret
database: str
dsn: sqlalchemy.engine.url.URL | None
pool_min_size: int
pool_max_size: int
echo: bool
ssl: str
use_connection_for_request: bool
retry_limit: int
retry_interval: int
_singleton: ClassVar[DBConfig | None]
classmethod singleton(**kwargs)[source]

Get a singleton

Parameters:

kwargs (Any)

Return type:

DBConfig

__post_init__()[source]

Post init stuff

Return type:

None
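
The singleton() classmethod and the _singleton class variable suggest a lazily created, cached config instance. A minimal sketch of that pattern follows, assuming a simplified stand-in class: SketchConfig and its two fields are hypothetical and are not the real DBConfig.

```python
from dataclasses import dataclass
from typing import Any, ClassVar, Optional


@dataclass
class SketchConfig:
    """Hypothetical stand-in for DBConfig with only two fields."""

    host: str = "localhost"
    port: int = 5432
    # Cached instance; ClassVar keeps it out of the dataclass fields
    _singleton: ClassVar[Optional["SketchConfig"]] = None

    @classmethod
    def singleton(cls, **kwargs: Any) -> "SketchConfig":
        """Create the instance on the first call and reuse it afterwards."""
        if cls._singleton is None:
            cls._singleton = cls(**kwargs)
        return cls._singleton


first = SketchConfig.singleton(port=5433)
second = SketchConfig.singleton()  # returns the cached instance
```

In this sketch, keyword arguments passed on later calls are ignored. Whether the real DBConfig.singleton() behaves the same way is not stated here, so check the source before relying on it.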

db[source]
async bind_config()[source]

Set bind from config and return

Return type:

None

async init_db()[source]

Create schemas and tables; normally one should use a migration manager instead

Return type:

None

class Person[source]

Bases: rasenmaeher_api.db.base.ORMBaseModel

People, pk is UUID and comes from basemodel

NOTE: at some point we want to stop keeping track of people in our own db and use only Keycloak as the store for actual users. In any case we need a nice Pythonic abstraction layer, so implement any queries you need as helpers here.

property productapidata: libpvarki.schemas.product.UserCRUDRequest

Return a model that is usable with the product integration APIs

Return type:

libpvarki.schemas.product.UserCRUDRequest

property certsubject: Dict[str, str]

Return the dict that gets set to cert DN

Return type:

Dict[str, str]

property privkeyfile: pathlib.Path

Path to the private key

Return type:

pathlib.Path

property pfxfile: pathlib.Path

Return a PKCS12 PFX file

Return type:

pathlib.Path

property certfile: pathlib.Path

Path to the public cert

Return type:

pathlib.Path

property csrfile: pathlib.Path

Path to the CSR file

Return type:

pathlib.Path

property pubkeyfile: pathlib.Path

Path to the public key

Return type:

pathlib.Path

__tablename__ = 'users'
callsign
certspath
extra
revoke_reason
async classmethod by_pk_or_callsign(inval, allow_deleted=False)[source]

Get person by pk or by callsign

Parameters:
  • inval

  • allow_deleted (bool)

Return type:

Person

async classmethod create_with_cert(callsign, extra=None)[source]

Create the cert etc and save the person

Parameters:
  • callsign (str)

  • extra (Optional[Dict[str, Any]])

Return type:

Person

async create_pfx()[source]

Put cert and key to PKCS12 container

Return type:

pathlib.Path

async revoke(reason)[source]

Revokes the cert with the given reason and marks the user as deleted. See validate_reason for info on reasons.

Parameters:

reason (rasenmaeher_api.cfssl.private.ReasonTypes)

Return type:

bool

async delete()[source]

Revoke the cert on delete

Return type:

bool

async classmethod list(include_deleted=False)[source]

List people

Parameters:

include_deleted (bool)

Return type:

AsyncGenerator[Person, None]
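
Because the return type is an AsyncGenerator, results are consumed with "async for" rather than awaited as a list. The sketch below shows that consumption pattern with a stand-in generator. The list_people function, the in-memory rows and the "deleted" flag are all hypothetical, and no database is involved.

```python
import asyncio
from typing import Any, AsyncGenerator, Dict, List

# Hypothetical in-memory rows standing in for the users table
_ROWS: List[Dict[str, Any]] = [
    {"callsign": "ALPHA01", "deleted": False},
    {"callsign": "BRAVO02", "deleted": True},
]


async def list_people(
    include_deleted: bool = False,
) -> AsyncGenerator[Dict[str, Any], None]:
    """Stand-in for a list() style helper: yield rows one at a time."""
    for row in _ROWS:
        if row["deleted"] and not include_deleted:
            continue
        yield row


async def collect_callsigns(include_deleted: bool = False) -> List[str]:
    """Consume the async generator with an async comprehension."""
    return [row["callsign"] async for row in list_people(include_deleted)]


active = asyncio.run(collect_callsigns())
everyone = asyncio.run(collect_callsigns(include_deleted=True))
```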

async classmethod by_role(role)[source]

List people that have the given role; if role is None, list all people

Parameters:

role (str)

Return type:

AsyncGenerator[Person, None]

async classmethod by_callsign(callsign, allow_deleted=False)[source]

Get by callsign

Parameters:
  • callsign (str)

  • allow_deleted (bool)

Return type:

Self

async classmethod is_callsign_available(callsign)[source]

Is callsign available

Parameters:

callsign (str)

Return type:

bool

async classmethod by_mtlsjwt_payload(payload, allow_deleted=False)[source]

Get by MTLSorJWTMiddleWare payload

Parameters:
  • payload

  • allow_deleted (bool)

Return type:

Self

get_cert_pem()[source]

Read the cert from under certspath and return the PEM

Return type:

bytes

get_cert_pfx()[source]

Read the cert and private key from under certspath and return the PFX container

Return type:

bytes

async _get_role(role)[source]

Internal helper for DRY

Parameters:

role (str)

Return type:

Optional[Role]

async has_role(role)[source]

Check if this user has given role

Parameters:

role (str)

Return type:

bool

async assign_role(role)[source]

Assign a role, return true if role was created, false if it already existed

Parameters:

role (str)

Return type:

bool

async remove_role(role)[source]

Remove a role, return true if role was removed, false if it wasn’t assigned

Parameters:

role (str)

Return type:

bool

async roles_set()[source]

Shorthand: the roles of this person as a set

Return type:

Set[str]

async roles()[source]

Roles of this person

Return type:

AsyncGenerator[str, None]

class Role[source]

Bases: rasenmaeher_api.db.base.DBModel

Give a person a role

__tablename__ = 'roles'
__table_args__
pk
created
updated
user
role
_idx
class Enrollment[source]

Bases: rasenmaeher_api.db.base.ORMBaseModel

Enrollments, pk is UUID and comes from basemodel

__tablename__ = 'enrollments'
approvecode
callsign
decided_on
decided_by
person
pool
state
extra
async classmethod by_pk_or_callsign(inval)[source]

Get enrollment by pk or by callsign

Parameters:

inval (Union[str, uuid.UUID])

Return type:

Enrollment
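
One plausible way to accept either a primary key or a callsign in a single argument is to test whether the value parses as a UUID. The sketch below illustrates that dispatch only. The classify_lookup helper is hypothetical, and the actual lookup logic in the library may differ.

```python
import uuid
from typing import Tuple, Union


def classify_lookup(inval: Union[str, uuid.UUID]) -> Tuple[str, str]:
    """Return ("pk", canonical UUID string) or ("callsign", value)."""
    if isinstance(inval, uuid.UUID):
        return ("pk", str(inval))
    try:
        # uuid.UUID raises ValueError for strings that are not valid UUIDs
        return ("pk", str(uuid.UUID(inval)))
    except ValueError:
        return ("callsign", inval)


by_uuid = classify_lookup(uuid.UUID("12345678-1234-5678-1234-567812345678"))
by_uuid_str = classify_lookup("12345678-1234-5678-1234-567812345678")
by_callsign = classify_lookup("ALPHA01")
```

A caveat of this approach: a callsign that happens to be 32 hexadecimal characters would be classified as a pk.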

async approve(approver)[source]

Creates the person record, their certs etc

Parameters:

approver (rasenmaeher_api.db.people.Person)

Return type:

rasenmaeher_api.db.people.Person

async reject(decider)[source]

Reject

Parameters:

decider (rasenmaeher_api.db.people.Person)

Return type:

None

async classmethod list(by_pool=None)[source]

List enrollments, optionally by pool (enrollment deletion is not allowed, they can only be rejected)

Parameters:

by_pool (Optional[EnrollmentPool])

Return type:

AsyncGenerator[Enrollment, None]

async classmethod by_callsign(callsign)[source]

Get by callsign

Parameters:

callsign (str)

Return type:

Self

async classmethod by_approvecode(code)[source]

Get by approvecode

Parameters:

code (str)

Return type:

Self

async classmethod reset_approvecode4callsign(callsign)[source]

Reset approvecode for callsign

Parameters:

callsign (str)

Return type:

str

async reset_approvecode()[source]

Reset approvecode

Return type:

str

async classmethod _generate_unused_code()[source]

Internal helper to generate a code that is free. NOTE: this MUST ONLY be used inside a transaction; outside of one nothing is guaranteed.

Return type:

str
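
The note about transactions is easier to see with the check-then-use shape of such a helper written out. The sketch below draws random candidates until one is not in a set of used codes. The alphabet, the code length of 8 and the function names are assumptions for illustration, not the library's actual choices.

```python
import secrets
import string
from typing import Callable, Set

# Assumed alphabet and length; the real code format is not documented here
ALPHABET = string.ascii_uppercase + string.digits


def random_code(length: int = 8) -> str:
    """Build a random code from a cryptographically strong source."""
    return "".join(secrets.choice(ALPHABET) for _ in range(length))


def generate_unused_code(
    used: Set[str], make_code: Callable[[], str] = random_code
) -> str:
    """Draw candidates until one is not in 'used'."""
    while True:
        candidate = make_code()
        if candidate not in used:
            # Gap: another writer could claim 'candidate' before we store it
            return candidate


# Deterministic demonstration: the first candidate is taken, the second is free
candidates = iter(["AAAA", "BBBB"])
picked = generate_unused_code({"AAAA"}, make_code=lambda: next(candidates))
```

Between the membership check and the later insert, a concurrent writer could take the same code. Running both steps inside one database transaction is what closes that gap.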

async classmethod create_for_callsign(callsign, pool=None, extra=None)[source]

Create a new one with random code for the callsign

Parameters:
  • callsign (str)

  • pool

  • extra

Return type:

Self

async delete()[source]

Deletion of enrollments is not allowed

Return type:

bool

class EnrollmentPool[source]

Bases: rasenmaeher_api.db.base.ORMBaseModel

Enrollment pools aka links, pk is UUID and comes from basemodel

__tablename__ = 'enrollmentpools'
owner
active
extra
invitecode
async classmethod by_pk_or_invitecode(inval, allow_deleted=False)[source]

Get pool by pk or by invitecode

Parameters:
  • inval

  • allow_deleted (bool)

Return type:

EnrollmentPool

async create_enrollment(callsign)[source]

Create enrollment from this pool

Parameters:

callsign (str)

Return type:

Enrollment

async set_active(state)[source]

Set active and return refreshed object

Parameters:

state (bool)

Return type:

Self

async classmethod list(by_owner=None, include_deleted=False)[source]

List pools, optionally by owner or including deleted pools

Parameters:
  • by_owner

  • include_deleted (bool)

Return type:

AsyncGenerator[EnrollmentPool, None]

async classmethod _generate_unused_code()[source]

Internal helper to generate a code that is free. NOTE: this MUST ONLY be used inside a transaction; outside of one nothing is guaranteed.

Return type:

str

async classmethod create_for_owner(person, extra=None)[source]

Creates one for given owner

Parameters:
  • person

  • extra

Return type:

Self

async reset_invitecode()[source]

Reset invitecode

Return type:

str

async classmethod by_invitecode(invitecode, allow_deleted=False)[source]

Get by invitecode

Parameters:
  • invitecode (str)

  • allow_deleted (bool)

Return type:

Self

class EnrollmentState[source]

Bases: enum.IntEnum

Enrollment states

PENDING = 0
APPROVED = 1
REJECTED = 2
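
Since EnrollmentState is an IntEnum, its members convert to and from plain integers, which is convenient for an integer database column. The sketch below redefines the enum locally with the values listed above so that it runs on its own.

```python
from enum import IntEnum


class EnrollmentState(IntEnum):
    """Local mirror of the documented values, for illustration only."""

    PENDING = 0
    APPROVED = 1
    REJECTED = 2


stored = int(EnrollmentState.APPROVED)  # plain integer, e.g. for a column
loaded = EnrollmentState(stored)        # back to the enum member
```

Passing an integer that is not a defined value, such as EnrollmentState(3), raises ValueError.
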
class SeenToken[source]

Bases: rasenmaeher_api.db.base.ORMBaseModel

Store tokens we should see used only once

__tablename__ = 'seentokens'
token
auditmeta
async classmethod use_token(token, auditmeta=None)[source]

Use the token; if it was already used, raise an error that is also a 403

Parameters:
  • token (str)

  • auditmeta (Optional[Dict[str, Any]])

Return type:

None

async classmethod by_token(token)[source]

Get by token

Parameters:

token (str)

Return type:

Self

async delete()[source]

Deletion of seen tokens is not allowed

Return type:

bool

class LoginCode[source]

Bases: rasenmaeher_api.db.base.ORMBaseModel

Track the login codes that can be exchanged for session JWTs

__tablename__ = 'logincodes'
code
auditmeta
used_on
claims
async classmethod use_code(code, auditmeta=None)[source]

Exchange the code for a JWT and return the JWT with the claims; if the code was already used, raise an error that is also a 403

Parameters:
  • code (str)

  • auditmeta (Optional[Dict[str, Any]])

Return type:

str

async classmethod by_code(code)[source]

Get by code

Parameters:

code (str)

Return type:

Self

async classmethod create_for_claims(claims, auditmeta=None)[source]

Create a new one with random code for the given claims

Parameters:
  • claims (Dict[str, Any])

  • auditmeta (Optional[Dict[str, Any]])

Return type:

str

async delete()[source]

Deletion of login codes is not allowed

Return type:

bool