Module rabbit.tlog.event

View Source
import os

from datetime import datetime

from typing import Optional

import attr

from sqlalchemy import (

    Boolean,

    Column,

    DateTime,

    Integer,

    LargeBinary,

    MetaData,

    String,

    Table

)

from sqlalchemy.orm import mapper

from sqlalchemy.schema import Sequence

# Name of the database schema that holds the event table and its id sequence.
# Resolved once, at import time, from the EVENT_SCHEMA environment variable
# (default 'my_schema') so that the metadata, the sequence and the table are
# guaranteed to use the same value.
_EVENT_SCHEMA = os.getenv('EVENT_SCHEMA', 'my_schema')

# Metadata collection that the event table is registered with.
metadata = MetaData(schema=_EVENT_SCHEMA)

# Definition of the 'event' table.
events = Table(
    'event',
    metadata,
    # Binary payload; the only column declared NOT NULL.
    Column('body', LargeBinary, nullable=False),
    # Integer primary key with the 'id_seq' sequence attached.
    Column(
        'id',
        Integer,
        Sequence(name='id_seq', schema=_EVENT_SCHEMA),
        primary_key=True,
    ),
    Column('created_at', DateTime),
    Column('created_by', String(100)),
    Column('status', Boolean),
    schema=_EVENT_SCHEMA,
)

@attr.s
class Event:
    """Event record whose fields are declared and validated through attrs.

    Only ``body`` is required; every other field has a default.  Each field
    carries an ``instance_of`` validator, and ``id`` and ``created_at``
    additionally accept ``None``.
    """

    # Required payload; must be ``bytes``.
    body = attr.ib(type=bytes, validator=attr.validators.instance_of(bytes))

    # Integer identifier, or None (the default).
    id = attr.ib(
        default=None,
        type=Optional[int],
        validator=attr.validators.optional(attr.validators.instance_of(int)),
    )

    # Creation timestamp, or None (the default).
    created_at = attr.ib(
        default=None,
        type=Optional[datetime],
        validator=attr.validators.optional(
            attr.validators.instance_of(datetime)
        ),
    )

    # Creator name; defaults to the empty string.
    created_by = attr.ib(
        default='',
        type=str,
        validator=attr.validators.instance_of(str),
    )

    # Boolean status flag; defaults to False.
    status = attr.ib(
        default=False,
        type=bool,
        validator=attr.validators.instance_of(bool),
    )

# Associate the Event class with the `events` table via SQLAlchemy's
# classical mapping call.
# NOTE(review): the standalone mapper() function appears to be deprecated in
# newer SQLAlchemy releases in favor of registry.map_imperatively() — confirm
# against the SQLAlchemy version this project pins.
mapper(Event, events)

Variables

events
metadata

Classes

Event

class Event(
    body,
    id=None,
    created_at=None,
    created_by='',
    status=False
)
View Source
class Event:
    """Event record whose fields are declared through ``attr.ib``.

    Only ``body`` has no default.  Each field carries an ``instance_of``
    validator, and ``id`` and ``created_at`` additionally accept ``None``.
    """

    # Required payload; must be ``bytes``.
    body = attr.ib(type=bytes, validator=attr.validators.instance_of(bytes))

    # Integer identifier, or None (the default).
    id = attr.ib(
        default=None,
        type=Optional[int],
        validator=attr.validators.optional(attr.validators.instance_of(int)),
    )

    # Creation timestamp, or None (the default).
    created_at = attr.ib(
        default=None,
        type=Optional[datetime],
        validator=attr.validators.optional(
            attr.validators.instance_of(datetime)
        ),
    )

    # Creator name; defaults to the empty string.
    created_by = attr.ib(
        default='',
        type=str,
        validator=attr.validators.instance_of(str),
    )

    # Boolean status flag; defaults to False.
    status = attr.ib(
        default=False,
        type=bool,
        validator=attr.validators.instance_of(bool),
    )

Class variables

body
created_at
created_by
id
status