import enum from datetime import datetime from sqlalchemy import (JSON, Boolean, Column, DateTime, Integer, Unicode, UniqueConstraint, create_engine) from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.declarative import declarative_base from db.db import SqlDB, get_conn_str Base = declarative_base() engine = create_engine(get_conn_str()) class Scanner(Base): __tablename__ = 'scanners' __table_args__ = ( None, UniqueConstraint('id'), UniqueConstraint('uuid'), ) id = Column(Integer, autoincrement=True, primary_key=True) uuid = Column(Unicode(37), nullable=False) enabled = Column(Boolean, default=False, nullable=False) first_seen = Column(DateTime, default=datetime.utcnow, nullable=False) last_seen = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False) comment = Column(Unicode(255), default="", nullable=True) scanners = Column(JSON, nullable=True) def as_dict(self): """Return JSON serializable dict.""" d = {} for col in self.__table__.columns: value = getattr(self, col.name) if issubclass(value.__class__, enum.Enum): value = value.value elif issubclass(value.__class__, Base): continue elif issubclass(value.__class__, datetime): value = str(value) d[col.name] = value return d @classmethod def comment(cls, uuid, comment): with SqlDB.sql_session() as session: scanner: Scanner = session.query(Scanner).filter( Scanner.uuid == uuid).one_or_none() if scanner: scanner.comment = comment else: return None return None @classmethod def enable(cls, uuid): with SqlDB.sql_session() as session: scanner: Scanner = session.query(Scanner).filter( Scanner.uuid == uuid).one_or_none() if scanner: scanner.enabled = True else: return None return None @classmethod def disable(cls, uuid): with SqlDB.sql_session() as session: scanner: Scanner = session.query(Scanner).filter( Scanner.uuid == uuid).one_or_none() if scanner: scanner.enabled = False else: return None return None @classmethod def get(cls, scanner_id=None, uuid=None): if scanner_id is None and uuid is None: raise ValueError('Either scanner_id or uuid must be present.') with SqlDB.sql_session() as session: if scanner_id: scanner: Scanner = session.query(Scanner).filter( Scanner.id == scanner_id).one_or_none() elif uuid: scanner: Scanner = session.query(Scanner).filter( Scanner.uuid == uuid).one_or_none() else: return None if scanner is None: return None return scanner.as_dict() return None @classmethod def add(cls, uuid): try: with SqlDB.sql_session() as session: scanner = Scanner() scanner.uuid = uuid scanner.enabled = False scanner.scanners = {} session.add(scanner) session.flush() return scanner.id except IntegrityError: return None @classmethod def update(cls, uuid, json_data): with SqlDB.sql_session() as session: scanner: Scanner = session.query(Scanner).filter( Scanner.uuid == uuid).one_or_none() if not scanner: return False scanner.scanners = json_data return True @classmethod def is_enabled(cls, uuid): with SqlDB.sql_session() as session: scanner: Scanner = session.query(Scanner).filter( Scanner.uuid == uuid).one_or_none() if scanner is None: return None enabled = scanner.enabled return enabled Base.metadata.create_all(engine)