From 571997129ba5275cc5e148a8ac1c0f64d895a9ef Mon Sep 17 00:00:00 2001 From: Kristofer Hallin Date: Wed, 5 Jan 2022 11:46:15 +0100 Subject: Added database and API endpoints for scanners. --- src/db/scanner.py | 87 +++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 85 insertions(+), 2 deletions(-) (limited to 'src/db/scanner.py') diff --git a/src/db/scanner.py b/src/db/scanner.py index 714551f..e9ac8c3 100644 --- a/src/db/scanner.py +++ b/src/db/scanner.py @@ -3,9 +3,10 @@ from datetime import datetime from sqlalchemy import (Boolean, Column, DateTime, Integer, Unicode, UniqueConstraint, create_engine) +from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.declarative import declarative_base -from db import SqlDB, get_conn_str +from db.db import SqlDB, get_conn_str Base = declarative_base() engine = create_engine(get_conn_str()) @@ -16,6 +17,7 @@ class Scanner(Base): __table_args__ = ( None, UniqueConstraint('id'), + UniqueConstraint('uuid'), ) id = Column(Integer, autoincrement=True, primary_key=True) @@ -26,7 +28,7 @@ class Scanner(Base): onupdate=datetime.utcnow, nullable=False) comment = Column(Unicode(255), nullable=True) scanners = Column(Unicode(2048), nullable=False) - target = Column(Unicode(255), nullable=True) + targets = Column(Unicode(255), nullable=True) def as_dict(self): """Return JSON serializable dict.""" @@ -42,5 +44,86 @@ class Scanner(Base): d[col.name] = value return d + @classmethod + def comment(cls, uuid, comment): + with SqlDB.sql_session() as session: + scanner: Scanner = session.query(Scanner).filter(Scanner.uuid == uuid).one_or_none() + + if scanner: + scanner.comment = comment + else: + return None + return None + + @classmethod + def enable(cls, uuid): + with SqlDB.sql_session() as session: + scanner: Scanner = session.query(Scanner).filter(Scanner.uuid == uuid).one_or_none() + + if scanner: + scanner.enabled = True + else: + return None + return None + + @classmethod + def disable(cls, uuid): + with SqlDB.sql_session() as session: + scanner: Scanner = session.query(Scanner).filter(Scanner.uuid == uuid).one_or_none() + + if scanner: + scanner.enabled = False + else: + return None + return None + + @classmethod + def get(cls, scanner_id=None, uuid=None): + if scanner_id is None and uuid is None: + raise ValueError('Either scanner_id or uuid must be present.') + + with SqlDB.sql_session() as session: + if scanner_id: + scanner: Scanner = session.query(Scanner).filter(Scanner.id == scanner_id).one_or_none() + elif uuid: + scanner: Scanner = session.query(Scanner).filter(Scanner.uuid == uuid).one_or_none() + else: + return None + + if scanner is None: + return None + + return scanner.as_dict() + + return None + + @classmethod + def add(cls, uuid): + try: + with SqlDB.sql_session() as session: + scanner = Scanner() + scanner.uuid = uuid + scanner.enabled = False + scanner.scanners = "None" + + session.add(scanner) + session.flush() + + return scanner.id + except IntegrityError: + return None + + + @classmethod + def is_enabled(cls, uuid): + with SqlDB.sql_session() as session: + scanner: Scanner = session.query(Scanner).filter(Scanner.uuid == uuid).one_or_none() + if scanner is None: + return None + + enabled = scanner.enabled + + return enabled + Base.metadata.create_all(engine) -- cgit v1.1