summaryrefslogtreecommitdiff
path: root/src/jobs.py
blob: 2e6a76a1cf2e729b0c3522544727c90e7233cfdb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
from datetime import datetime

from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from pytz import utc

from db import SqlDB
from job import Job, JobStatus
from log import get_logger

# Module-level logger, obtained from the project's ``log`` helper and used by
# the scheduler class below for its informational messages.
logger = get_logger()


class JobScheduler:
    """Run background jobs on an APScheduler thread pool and track them in SQL.

    Each job submitted through :meth:`add` gets a ``Job`` row, so its state
    can be looked up later with :meth:`get_jobs` or :meth:`get_job`.  The
    scheduler itself keeps its pending jobs in a ``MemoryJobStore``, so rows
    left in a RUNNING or SCHEDULED state are marked CLEARED when a new
    instance is constructed.
    """

    def __init__(self, nr_threads=10):
        """Create the (not yet started) scheduler and clear stale job rows.

        Args:
            nr_threads: Size of the thread pool backing the default executor.
        """
        self.__scheduler = BackgroundScheduler(
            executors={"default": ThreadPoolExecutor(nr_threads)},
            jobstores={"default": MemoryJobStore()},
            job_defaults={},
            timezone=utc,
        )

        self.reset_job_status()

    def reset_job_status(self):
        """Mark every job recorded as RUNNING or SCHEDULED as CLEARED.

        No explicit commit is issued here.
        NOTE(review): presumably ``SqlDB.sql_session()`` commits on a clean
        exit -- confirm against its implementation.
        """
        stale_states = [JobStatus.RUNNING, JobStatus.SCHEDULED]

        logger.info('Clearing jobs.')
        with SqlDB.sql_session() as session:
            # Let the database pick out the stale rows instead of loading the
            # whole jobs table and filtering it in Python.
            stale_jobs = (
                session.query(Job)
                .filter(Job.status.in_(stale_states))
                .all()
            )

            for stale_job in stale_jobs:
                stale_job.status = JobStatus.CLEARED
                logger.info(f'Job with ID {stale_job.id} was cleared.')

    def get(self):
        """Return the underlying ``BackgroundScheduler`` instance."""
        return self.__scheduler

    def start(self):
        """Start the scheduler; returns whatever ``start()`` returns."""
        return self.__scheduler.start()

    def stop(self):
        """Shut the scheduler down; returns whatever ``shutdown()`` returns."""
        return self.__scheduler.shutdown()

    def add(self, func, comment='', **kwargs):
        """Record a new job in the database and hand it to the scheduler.

        Args:
            func: Callable forwarded to ``Job.starter`` as the ``func``
                keyword argument.
            comment: Free-text comment stored on the job row.
            **kwargs: Extra keyword arguments forwarded to ``Job.starter``.
                A caller-supplied ``job_id`` is overwritten with the ID of
                the newly created row.

        Returns:
            The ID of the newly created ``Job`` row.
        """
        with SqlDB.sql_session() as session:
            job = Job()
            job.starttime = datetime.now()
            job.comment = comment
            job.status = JobStatus.SCHEDULED

            session.add(job)
            # Flush so that job.id is populated while the session is open.
            session.flush()

            job_id = job.id

        # The explicit keywords win over same-named entries in kwargs.
        starter_kwargs = dict(kwargs, job_id=job_id, func=func)

        logger.info(f'Adding new job with ID {job_id}')

        # The scheduler is only given the job after the session block has
        # exited.  No trigger is passed, so APScheduler's default applies.
        self.__scheduler.add_job(Job.starter, kwargs=starter_kwargs)
        return job_id

    @classmethod
    def get_jobs(cls):
        """Return all stored jobs as a list of ``Job.as_dict()`` results."""
        with SqlDB.sql_session() as session:
            # Convert while the session is still open.
            jobs = [job.as_dict() for job in session.query(Job).all()]

        return jobs

    @classmethod
    def get_job(cls, job_id):
        """Return the ``as_dict()`` form of one job, or ``None`` if absent.

        Args:
            job_id: ID of the job row to look up.
        """
        with SqlDB.sql_session() as session:
            job = session.query(Job).filter(Job.id == job_id).one_or_none()
            job_dict = None if job is None else job.as_dict()

        return job_dict