from datetime import datetime

from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from pytz import utc

from db import SqlDB
from job import Job, JobStatus
from log import get_logger

logger = get_logger()


class JobScheduler:
    """Wrap APScheduler's BackgroundScheduler and mirror job lifecycle
    state into the application database (``Job`` rows via ``SqlDB``).

    The scheduler uses an in-memory job store, so scheduled jobs do not
    survive a process restart; ``reset_job_status`` reconciles the
    database with that fact at startup.
    """

    def __init__(self, nr_threads=10):
        """Build a UTC background scheduler backed by a thread pool.

        :param nr_threads: size of the default ThreadPoolExecutor.
        """
        self.__scheduler = BackgroundScheduler(
            executors={"default": ThreadPoolExecutor(nr_threads)},
            jobstores={"default": MemoryJobStore()},
            job_defaults={},
            timezone=utc,
        )
        # Any job the DB still marks active belongs to a dead process.
        self.reset_job_status()

    def reset_job_status(self):
        """Mark every RUNNING or SCHEDULED job in the database as CLEARED.

        The job store is in-memory, so after a restart no previously
        active job can still exist in the scheduler.
        """
        logger.info('Clearing jobs.')
        with SqlDB.sql_session() as session:
            for instance in session.query(Job).all():
                # Membership test replaces the original chained `or`.
                if instance.status in (JobStatus.RUNNING, JobStatus.SCHEDULED):
                    instance.status = JobStatus.CLEARED
                    logger.info(f'Job with ID {instance.id} was cleared.')

    def get(self):
        """Return the underlying BackgroundScheduler instance."""
        return self.__scheduler

    def start(self):
        """Start the background scheduler."""
        return self.__scheduler.start()

    def stop(self):
        """Shut down the background scheduler."""
        return self.__scheduler.shutdown()

    def add(self, func, comment='', **kwargs):
        """Persist a new Job row and schedule ``Job.starter`` to run it.

        :param func: callable forwarded to ``Job.starter`` via kwargs.
        :param comment: free-text comment stored on the Job row.
        :param kwargs: extra keyword arguments passed through to
            ``Job.starter``; ``job_id`` and ``func`` are injected here.
        :return: the database ID of the newly created job.
        """
        with SqlDB.sql_session() as session:
            job = Job()
            # NOTE(review): naive local time, while the scheduler runs in
            # UTC — confirm the intended timezone for Job.starttime.
            job.starttime = datetime.now()
            job.comment = comment
            job.status = JobStatus.SCHEDULED
            session.add(job)
            # Flush so the autogenerated primary key is populated before
            # we hand the ID to the scheduler.
            session.flush()
            job_id = job.id
            kwargs['job_id'] = job_id
            kwargs['func'] = func
            logger.info(f'Adding new job with ID {job_id}')
            self.__scheduler.add_job(Job.starter, kwargs=kwargs)
            return job_id

    @classmethod
    def get_jobs(cls):
        """Return every job in the database as a list of dicts."""
        with SqlDB.sql_session() as session:
            return [instance.as_dict() for instance in session.query(Job).all()]

    @classmethod
    def get_job(cls, job_id):
        """Return the job with ``job_id`` as a dict, or None if absent."""
        with SqlDB.sql_session() as session:
            job = session.query(Job).filter(Job.id == job_id).one_or_none()
            return None if job is None else job.as_dict()