blob: 2e6a76a1cf2e729b0c3522544727c90e7233cfdb (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
from datetime import datetime
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from pytz import utc
from db import SqlDB
from job import Job, JobStatus
from log import get_logger
logger = get_logger()
class JobScheduler(object):
def __init__(self, nr_threads=10):
self.__scheduler = BackgroundScheduler(
executors={"default": ThreadPoolExecutor(nr_threads)},
jobstores={"default": MemoryJobStore()},
job_defaults={},
timezone=utc,
)
self.reset_job_status()
def reset_job_status(self):
logger.info('Clearing jobs.')
with SqlDB.sql_session() as session:
query = session.query(Job).all()
for instance in query:
if instance.status == JobStatus.RUNNING or \
instance.status == JobStatus.SCHEDULED:
instance.status = JobStatus.CLEARED
logger.info(f'Job with ID {instance.id} was cleared.')
def get(self):
return self.__scheduler
def start(self):
return self.__scheduler.start()
def stop(self):
return self.__scheduler.shutdown()
def add(self, func, comment='', **kwargs):
with SqlDB.sql_session() as session:
job = Job()
job.starttime = datetime.now()
job.comment = comment
job.status = JobStatus.SCHEDULED
session.add(job)
session.flush()
job_id = job.id
kwargs['job_id'] = job_id
kwargs['func'] = func
logger.info(f'Adding new job with ID {job_id}')
self.__scheduler.add_job(Job.starter, kwargs=kwargs)
return job_id
@classmethod
def get_jobs(cls):
jobs = list()
with SqlDB.sql_session() as session:
query = session.query(Job).all()
for instance in query:
job_dict = instance.as_dict()
jobs.append(job_dict)
return jobs
@classmethod
def get_job(cls, job_id):
with SqlDB.sql_session() as session:
job = session.query(Job).filter(Job.id == job_id).one_or_none()
if job is None:
return job
job_dict = job.as_dict()
return job_dict
|