1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
import enum
from datetime import datetime
from sqlalchemy import (JSON, Boolean, Column, DateTime, Integer, Unicode,
UniqueConstraint, create_engine)
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declarative_base
from db.db import SqlDB, get_conn_str
Base = declarative_base()
engine = create_engine(get_conn_str())
class Scanner(Base):
__tablename__ = 'scanners'
__table_args__ = (
None,
UniqueConstraint('id'),
UniqueConstraint('uuid'),
)
id = Column(Integer, autoincrement=True, primary_key=True)
uuid = Column(Unicode(37), nullable=False)
enabled = Column(Boolean, default=False, nullable=False)
first_seen = Column(DateTime, default=datetime.utcnow, nullable=False)
last_seen = Column(DateTime, default=datetime.utcnow,
onupdate=datetime.utcnow, nullable=False)
comment = Column(Unicode(255), default="", nullable=True)
scanners = Column(JSON, nullable=True)
def as_dict(self):
"""Return JSON serializable dict."""
d = {}
for col in self.__table__.columns:
value = getattr(self, col.name)
if issubclass(value.__class__, enum.Enum):
value = value.value
elif issubclass(value.__class__, Base):
continue
elif issubclass(value.__class__, datetime):
value = str(value)
d[col.name] = value
return d
@classmethod
def comment(cls, uuid, comment):
with SqlDB.sql_session() as session:
scanner: Scanner = session.query(Scanner).filter(
Scanner.uuid == uuid).one_or_none()
if scanner:
scanner.comment = comment
else:
return None
return None
@classmethod
def enable(cls, uuid):
with SqlDB.sql_session() as session:
scanner: Scanner = session.query(Scanner).filter(
Scanner.uuid == uuid).one_or_none()
if scanner:
scanner.enabled = True
else:
return None
return None
@classmethod
def disable(cls, uuid):
with SqlDB.sql_session() as session:
scanner: Scanner = session.query(Scanner).filter(
Scanner.uuid == uuid).one_or_none()
if scanner:
scanner.enabled = False
else:
return None
return None
@classmethod
def get(cls, scanner_id=None, uuid=None):
if scanner_id is None and uuid is None:
raise ValueError('Either scanner_id or uuid must be present.')
with SqlDB.sql_session() as session:
if scanner_id:
scanner: Scanner = session.query(Scanner).filter(
Scanner.id == scanner_id).one_or_none()
elif uuid:
scanner: Scanner = session.query(Scanner).filter(
Scanner.uuid == uuid).one_or_none()
else:
return None
if scanner is None:
return None
return scanner.as_dict()
return None
@classmethod
def add(cls, uuid):
try:
with SqlDB.sql_session() as session:
scanner = Scanner()
scanner.uuid = uuid
scanner.enabled = False
scanner.scanners = {}
session.add(scanner)
session.flush()
return scanner.id
except IntegrityError:
return None
@classmethod
def is_enabled(cls, uuid):
with SqlDB.sql_session() as session:
scanner: Scanner = session.query(Scanner).filter(
Scanner.uuid == uuid).one_or_none()
if scanner is None:
return None
enabled = scanner.enabled
return enabled
Base.metadata.create_all(engine)
|