summaryrefslogtreecommitdiff
path: root/src/db/scanner.py
blob: 625fd8e4b3abe24c86fde2edaea4b5b0a4db0247 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import enum
from datetime import datetime

from sqlalchemy import (JSON, Boolean, Column, DateTime, Integer, Unicode,
                        UniqueConstraint, create_engine)
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declarative_base

from db.db import SqlDB, get_conn_str

# Declarative base class that the ORM models in this module inherit from.
Base = declarative_base()
# Module-level engine, built at import time from the connection string
# returned by get_conn_str(); it is passed to create_all() at the bottom of
# this module.
engine = create_engine(get_conn_str())


class Scanner(Base):
    """ORM model for a row of the ``scanners`` table, plus helper queries.

    All classmethods open their own session through ``SqlDB.sql_session()``
    and identify a scanner by its ``uuid`` (or, for ``get``, optionally by
    its numeric ``id``). Changes made inside the session block are
    presumably committed when the block exits -- confirm against
    ``SqlDB.sql_session``.
    """

    __tablename__ = 'scanners'
    __table_args__ = (
        UniqueConstraint('id'),
        UniqueConstraint('uuid'),
    )

    id = Column(Integer, autoincrement=True, primary_key=True)
    uuid = Column(Unicode(37), nullable=False)
    enabled = Column(Boolean, default=False, nullable=False)
    first_seen = Column(DateTime, default=datetime.utcnow, nullable=False)
    last_seen = Column(DateTime, default=datetime.utcnow,
                       onupdate=datetime.utcnow, nullable=False)
    # The database column is called "comment", but it is mapped under the
    # attribute name ``comment_text``: the ``comment`` classmethod below
    # would otherwise replace the Column in the class namespace, leaving the
    # table without this column and making ``Scanner.comment()`` a no-op.
    # NOTE: databases created before this fix lack the column and need it
    # added by a migration; create_all() does not alter existing tables.
    comment_text = Column('comment', Unicode(255), default="", nullable=True)
    scanners = Column(JSON, nullable=True)

    def as_dict(self):
        """Return a JSON serializable dict keyed by column name.

        Enum values are replaced by their ``.value``, datetimes are
        converted with ``str()``, and values that are themselves model
        instances are skipped.
        """
        mapper = self.__mapper__
        result = {}
        for col in self.__table__.columns:
            # Look the value up by mapped attribute key, which may differ
            # from the column name (see ``comment_text``).
            attr_key = mapper.get_property_by_column(col).key
            value = getattr(self, attr_key)
            if isinstance(value, enum.Enum):
                value = value.value
            elif isinstance(value, Base):
                continue
            elif isinstance(value, datetime):
                value = str(value)
            result[col.name] = value
        return result

    @staticmethod
    def _find_by_uuid(session, uuid):
        """Return the Scanner with the given uuid in *session*, or None."""
        return session.query(Scanner).filter(
            Scanner.uuid == uuid).one_or_none()

    @classmethod
    def _set_enabled(cls, uuid, enabled):
        """Set the ``enabled`` flag of the scanner with *uuid*, if any."""
        with SqlDB.sql_session() as session:
            scanner = cls._find_by_uuid(session, uuid)
            if scanner is not None:
                scanner.enabled = enabled

    @classmethod
    def comment(cls, uuid, comment):
        """Store *comment* on the scanner identified by *uuid*.

        Does nothing when no such scanner exists. Always returns None.
        """
        with SqlDB.sql_session() as session:
            scanner = cls._find_by_uuid(session, uuid)
            if scanner is not None:
                scanner.comment_text = comment
        return None

    @classmethod
    def enable(cls, uuid):
        """Mark the scanner identified by *uuid* as enabled.

        Does nothing when no such scanner exists. Always returns None.
        """
        cls._set_enabled(uuid, True)
        return None

    @classmethod
    def disable(cls, uuid):
        """Mark the scanner identified by *uuid* as disabled.

        Does nothing when no such scanner exists. Always returns None.
        """
        cls._set_enabled(uuid, False)
        return None

    @classmethod
    def get(cls, scanner_id=None, uuid=None):
        """Return the matching scanner as a dict, or None if not found.

        ``scanner_id`` takes precedence over ``uuid`` when it is truthy.

        Raises:
            ValueError: if both ``scanner_id`` and ``uuid`` are None.
        """
        if scanner_id is None and uuid is None:
            raise ValueError('Either scanner_id or uuid must be present.')

        with SqlDB.sql_session() as session:
            if scanner_id:
                scanner = session.query(Scanner).filter(
                    Scanner.id == scanner_id).one_or_none()
            elif uuid:
                scanner = cls._find_by_uuid(session, uuid)
            else:
                # Falsy but non-None identifiers (0, '') match nothing.
                return None

            if scanner is None:
                return None

            # Serialize while the session is still open.
            return scanner.as_dict()

        # Only reachable if the session context manager suppresses an
        # exception raised inside the block.
        return None

    @classmethod
    def add(cls, uuid):
        """Insert a new, disabled scanner and return its generated id.

        Returns None when the insert raises ``IntegrityError`` (for
        example because the uuid violates the unique constraint).
        """
        try:
            with SqlDB.sql_session() as session:
                scanner = Scanner(uuid=uuid, enabled=False, scanners={})

                session.add(scanner)
                # Flush so the autoincrement id is assigned before the
                # value is read.
                session.flush()

                return scanner.id
        except IntegrityError:
            return None

    @classmethod
    def is_enabled(cls, uuid):
        """Return the scanner's ``enabled`` flag, or None if not found."""
        with SqlDB.sql_session() as session:
            scanner = cls._find_by_uuid(session, uuid)
            if scanner is None:
                return None

            # Read the flag inside the session block, before it exits.
            enabled = scanner.enabled

        return enabled


# Issue CREATE TABLE for the tables registered on Base.metadata using the
# module-level engine. This runs as an import-time side effect of this
# module.
Base.metadata.create_all(engine)