Module rabbit.tlog.db

View Source
import logging

import os

import time

from datetime import datetime

import attr

from sqlalchemy import create_engine

from sqlalchemy.exc import OperationalError

from sqlalchemy.orm import sessionmaker

from rabbit import OperationError

from rabbit.tlog import Event, Migration, events

# Attach a NullHandler to the logger named after this module.
logging.getLogger(__name__).addHandler(logging.NullHandler())

@attr.s(slots=True)
class DB:
    """Database gateway for the event log, built on a SQLAlchemy engine.

    Right after attrs initialisation, ``__attrs_post_init__`` calls
    :meth:`configure`, which creates the engine, a session and a connection
    from ``driver``.
    """

    # SQLAlchemy database URL. The default is read from the DATABASE_DRIVER
    # environment variable once, when the class body is executed.
    driver = attr.ib(
        type=str,
        default=os.getenv(
            "DATABASE_DRIVER", "postgresql://postgres:postgres@localhost:5432/db"
        ),
        validator=attr.validators.instance_of(str),
    )
    # Migration helper; exposes ``alembic_file`` and ``script_location``.
    _migration = attr.ib(
        type=Migration,
        factory=Migration,
        validator=attr.validators.instance_of(Migration),
    )
    # The three handles below are (re)assigned by configure().
    _engine = attr.ib(default=None)
    _connection = attr.ib(default=None)
    _session = attr.ib(default=None)

    def __attrs_post_init__(self):
        """Create the engine, session and connection once attrs has run."""
        self.configure()

    @property
    def connection(self):
        """Return the current connection object."""
        return self._connection

    @connection.setter
    def connection(self, value):
        """Invalidate the current connection (if truthy), then store ``value``."""
        if self._connection:
            self._connection.invalidate()
        self._connection = value

    @property
    def session(self):
        """Return the current session object."""
        return self._session

    @session.setter
    def session(self, value):
        """Invalidate the current session (if truthy), then store ``value``."""
        if self._session:
            self._session.invalidate()
        self._session = value

    @property
    def alembic_file(self):
        """Return ``alembic_file`` from the migration helper."""
        return self._migration.alembic_file

    @property
    def script_location(self):
        """Return ``script_location`` from the migration helper."""
        return self._migration.script_location

    def configure(self) -> None:
        """Build a fresh engine, session and connection from ``driver``.

        If opening the connection fails, :meth:`_reconnect` is called, which
        waits and then calls this method again.
        """
        self._engine = create_engine(self.driver, pool_pre_ping=True)
        session_factory = sessionmaker(bind=self._engine)
        self._session = session_factory()
        # Connect exactly once, inside the guarded block. The previous code
        # also connected before the ``try``, which bypassed the retry path on
        # failure and leaked an extra connection on success.
        try:
            self._connection = self._engine.connect()
        except Exception:
            self._reconnect()

    def _reconnect(self):
        """Log the failure, sleep 10 seconds, then call :meth:`configure`."""
        logging.error("Failed to connect to database. Trying again in 10 seconds")
        # NOTE(review): this dump includes ``driver``, whose default value
        # embeds credentials -- confirm debug logs are not exposed.
        logging.debug(f"DB({attr.asdict(self)})")
        # NOTE(review): time.sleep blocks the calling thread, including when
        # this is reached from the async methods below.
        time.sleep(10)
        self.configure()

    async def exec(self, command, params=None):
        """Execute ``command`` on the current connection.

        Args:
            command: Statement passed to ``connection.execute``.
            params: Parameter mapping for the statement; an empty mapping is
                used when omitted.

        On ``OperationalError`` the connection is rebuilt via
        :meth:`_reconnect` and the statement is attempted again.
        """
        # A fresh dict per call replaces the former shared ``{}`` default.
        bound = {} if params is None else params
        # Loop instead of recursing so repeated failures do not grow the stack.
        while True:
            try:
                self._connection.execute(command, [bound])
                return
            except OperationalError:
                self._reconnect()

    async def save(self, data: bytes, user: str = "anonymous") -> None:
        """Insert a new event row built from ``data`` and ``user``.

        Args:
            data: Stored as the event body.
            user: Stored as ``created_by``.

        Raises:
            OperationError: If an ``OperationalError`` escapes :meth:`exec`.
        """
        e = Event(body=data, created_at=datetime.utcnow(), created_by=user)
        ins = events.insert().values(
            body=e.body,
            status=e.status,
            created_at=e.created_at,
            created_by=e.created_by,
        )
        try:
            logging.debug(f"Saving event: [{e}]")
            await self.exec(ins)
            logging.info("Event saved.")
        except OperationalError as err:
            self._session.rollback()
            self._reconnect()
            # Chain the driver error so the root cause stays in the traceback.
            raise OperationError("Failed to store event.") from err

    async def get_oldest_event(self):
        """Return the first event with ``status=False`` ordered by ``created_at``.

        Returns:
            An ``Event`` built from the row, or ``None`` when the query yields
            nothing. ``None`` is also returned when an ``OperationalError``
            occurs; in that case :meth:`_reconnect` is called first.
        """
        try:
            event = (
                self._session.query(events)
                .filter_by(status=False)
                .order_by("created_at")
                .first()
            )
            if not event:
                return None
            return Event(
                event.body, event.id, event.created_at, event.created_by, event.status,
            )
        except OperationalError:
            self._reconnect()
            return None

Classes

DB

class DB(
    driver: str = 'postgresql://postgres:postgres@localhost:5432/db',
    migration: rabbit.tlog.migration.Migration = NOTHING,
    engine=None,
    connection=None,
    session=None
)
View Source
class DB:
    """Database gateway for the event log, built on a SQLAlchemy engine.

    ``__attrs_post_init__`` calls :meth:`configure`, which creates the
    engine, a session and a connection from ``driver``.
    """

    # SQLAlchemy database URL. The default is read from the DATABASE_DRIVER
    # environment variable once, when the class body is executed.
    driver = attr.ib(
        type=str,
        default=os.getenv(
            "DATABASE_DRIVER", "postgresql://postgres:postgres@localhost:5432/db"
        ),
        validator=attr.validators.instance_of(str),
    )
    # Migration helper; exposes ``alembic_file`` and ``script_location``.
    _migration = attr.ib(
        type=Migration,
        factory=Migration,
        validator=attr.validators.instance_of(Migration),
    )
    # The three handles below are (re)assigned by configure().
    _engine = attr.ib(default=None)
    _connection = attr.ib(default=None)
    _session = attr.ib(default=None)

    def __attrs_post_init__(self):
        """Create the engine, session and connection after initialisation."""
        self.configure()

    @property
    def connection(self):
        """Return the current connection object."""
        return self._connection

    @connection.setter
    def connection(self, value):
        """Invalidate the current connection (if truthy), then store ``value``."""
        if self._connection:
            self._connection.invalidate()
        self._connection = value

    @property
    def session(self):
        """Return the current session object."""
        return self._session

    @session.setter
    def session(self, value):
        """Invalidate the current session (if truthy), then store ``value``."""
        if self._session:
            self._session.invalidate()
        self._session = value

    @property
    def alembic_file(self):
        """Return ``alembic_file`` from the migration helper."""
        return self._migration.alembic_file

    @property
    def script_location(self):
        """Return ``script_location`` from the migration helper."""
        return self._migration.script_location

    def configure(self) -> None:
        """Build a fresh engine, session and connection from ``driver``.

        If opening the connection fails, :meth:`_reconnect` is called, which
        waits and then calls this method again.
        """
        self._engine = create_engine(self.driver, pool_pre_ping=True)
        session_factory = sessionmaker(bind=self._engine)
        self._session = session_factory()
        # Connect exactly once, inside the guarded block. The previous code
        # also connected before the ``try``, which bypassed the retry path on
        # failure and leaked an extra connection on success.
        try:
            self._connection = self._engine.connect()
        except Exception:
            self._reconnect()

    def _reconnect(self):
        """Log the failure, sleep 10 seconds, then call :meth:`configure`."""
        logging.error("Failed to connect to database. Trying again in 10 seconds")
        # NOTE(review): this dump includes ``driver``, whose default value
        # embeds credentials -- confirm debug logs are not exposed.
        logging.debug(f"DB({attr.asdict(self)})")
        # NOTE(review): time.sleep blocks the calling thread, including when
        # this is reached from the async methods below.
        time.sleep(10)
        self.configure()

    async def exec(self, command, params=None):
        """Execute ``command`` on the current connection.

        Args:
            command: Statement passed to ``connection.execute``.
            params: Parameter mapping for the statement; an empty mapping is
                used when omitted.

        On ``OperationalError`` the connection is rebuilt via
        :meth:`_reconnect` and the statement is attempted again.
        """
        # A fresh dict per call replaces the former shared ``{}`` default.
        bound = {} if params is None else params
        # Loop instead of recursing so repeated failures do not grow the stack.
        while True:
            try:
                self._connection.execute(command, [bound])
                return
            except OperationalError:
                self._reconnect()

    async def save(self, data: bytes, user: str = "anonymous") -> None:
        """Insert a new event row built from ``data`` and ``user``.

        Args:
            data: Stored as the event body.
            user: Stored as ``created_by``.

        Raises:
            OperationError: If an ``OperationalError`` escapes :meth:`exec`.
        """
        e = Event(body=data, created_at=datetime.utcnow(), created_by=user)
        ins = events.insert().values(
            body=e.body,
            status=e.status,
            created_at=e.created_at,
            created_by=e.created_by,
        )
        try:
            logging.debug(f"Saving event: [{e}]")
            await self.exec(ins)
            logging.info("Event saved.")
        except OperationalError as err:
            self._session.rollback()
            self._reconnect()
            # Chain the driver error so the root cause stays in the traceback.
            raise OperationError("Failed to store event.") from err

    async def get_oldest_event(self):
        """Return the first event with ``status=False`` ordered by ``created_at``.

        Returns:
            An ``Event`` built from the row, or ``None`` when the query yields
            nothing. ``None`` is also returned when an ``OperationalError``
            occurs; in that case :meth:`_reconnect` is called first.
        """
        try:
            event = (
                self._session.query(events)
                .filter_by(status=False)
                .order_by("created_at")
                .first()
            )
            if not event:
                return None
            return Event(
                event.body, event.id, event.created_at, event.created_by, event.status,
            )
        except OperationalError:
            self._reconnect()
            return None

Class variables

driver

Instance variables

alembic_file
connection
driver
script_location
session

Methods

configure
def configure(
    self
) -> None
View Source
    def configure(self) -> None:
        """Build a fresh engine, session and connection from ``self.driver``.

        The engine is created with ``pool_pre_ping=True`` and a session is
        bound to it. If opening the connection raises, ``_reconnect`` is
        called to retry.
        """
        self._engine = create_engine(self.driver, pool_pre_ping=True)
        session_factory = sessionmaker(bind=self._engine)
        self._session = session_factory()
        # Connect exactly once, inside the guarded block. The previous code
        # also connected before the ``try``, which bypassed the retry path on
        # failure and leaked an extra connection on success.
        try:
            self._connection = self._engine.connect()
        except Exception:
            self._reconnect()
exec
def exec(
    self,
    command,
    params={}
)
View Source
    async def exec(self, command, params=None):
        """Execute ``command`` on the current connection.

        Args:
            command: Statement passed to ``self._connection.execute``.
            params: Parameter mapping for the statement; an empty mapping is
                used when omitted.

        On ``OperationalError`` the connection is rebuilt via ``_reconnect``
        and the statement is attempted again.
        """
        # A fresh dict per call replaces the former shared ``{}`` default.
        bound = {} if params is None else params
        # Loop instead of recursing so repeated failures do not grow the stack.
        while True:
            try:
                self._connection.execute(command, [bound])
                return
            except OperationalError:
                self._reconnect()
get_oldest_event
def get_oldest_event(
    self
)
View Source
    async def get_oldest_event(self):
        """Fetch the first row of ``events`` with ``status=False`` by ``created_at``.

        Returns:
            An ``Event`` built from the row's body, id, created_at, created_by
            and status, or ``None`` when no row matches. When the query raises
            ``OperationalError`` the method calls ``_reconnect`` and also
            yields ``None``.
        """
        try:
            pending = self._session.query(events).filter_by(status=False)
            row = pending.order_by("created_at").first()
            if row:
                return Event(
                    row.body,
                    row.id,
                    row.created_at,
                    row.created_by,
                    row.status,
                )
            return None
        except OperationalError:
            self._reconnect()
        return None
save
def save(
    self,
    data: bytes,
    user: str = 'anonymous'
) -> None
View Source
    async def save(self, data: bytes, user: str = "anonymous") -> None:
        """Insert a new event row built from ``data`` and ``user``.

        Args:
            data: Stored as the event body.
            user: Stored as ``created_by``.

        Raises:
            OperationError: If an ``OperationalError`` escapes ``self.exec``;
                the session is rolled back and ``_reconnect`` is called first.
        """
        e = Event(body=data, created_at=datetime.utcnow(), created_by=user)
        ins = events.insert().values(
            body=e.body,
            status=e.status,
            created_at=e.created_at,
            created_by=e.created_by,
        )
        # The former bare ``ins.compile().params`` expression was removed: its
        # result was discarded, so it only added a needless compile step.
        try:
            logging.debug(f"Saving event: [{e}]")
            await self.exec(ins)
            logging.info("Event saved.")
        except OperationalError as err:
            self._session.rollback()
            self._reconnect()
            # Chain the driver error so the root cause stays in the traceback.
            raise OperationError("Failed to store event.") from err