Python 에서 Scheduler 기능을 구현하기 위해 사용한 APScheduler 를 소개합니다.
Scheduler 기능을 제공하는 Package 는 몇 가지가 있습니다.
제가 조사한 Package 는 총 3가지 입니다.
- celery - 분산 작업큐 역할을 담당하는 프레임웍
- twisted - 비동기 IO 를 이용한 이벤트 방식의 네트워크 엔진 프레임웍
- apscheduler
여기서 apscheduler 가 다른 Package 에 비해 간단히 구현이 가능합니다. 또한 기타 다른 Package 와 통합할 수 있는 기능도 제공합니다.
(Python 공식문서에도 링크가 등록되어 있습니다.)
APScheduler 란?
Advanced Python Scheduler 의 약자로 Python code 를 주기적으로 수행할 수 있게 도와주는 Python Library 입니다.
APScheduler 는 자체적으로는 Daemon 이나 Service 가 아닙니다. 이미 존재하는 Application 내에서 수행됩니다.
APScheduler 수행방식
3가지 수행방식을 지원합니다.
- Cron 방식 - Cron 표현식으로 Python code 를 수행
- Date 방식 - 특정 날짜에 Python code 를 수행
- Interval 방식 - 일정 주기로 Python code 를 수행
각 방식마다 trigger parameter 가 달라집니다.
이는 아래 Sample 예제에서 소개하도록 하겠습니다.
JobStore
APScheduler 는 Job(Python code) 을 저장해 두고 주기적으로 수행하게 됩니다.
이 때 4가지 저장방식을 지원합니다.
- Memory(기본)
- SQLAlchemy - RDBMS 에서 동작
- MongoDB
- Redis
APScheduler 설치
$ pip install apscheduler
|
현재 3.2.0 버전이 최신 버전입니다.(at 2016/07/26)
Scheduler 종류
- BlockingScheduler - 단일 Job 수행시
- BackgroundScheduler - 다수 Job 수행시
- AsyncIOScheduler
- GeventScheduler
- TornadoScheduler
- TwistedScheduler
- QtScheduler
BackgroundScheduler
background 에서 Job 들이 수행되며, 여러 Job 들을 등록하여 동시에 수행할 수 있습니다.
import logging
import time
from apscheduler.schedulers.background import BackgroundScheduler
logging = logging.getLogger(__name__)
def tick():
logger.debug(“tick!!!”)
sched = BackgroundScheduler()
sched.add_job(tick, ‘interval’, seconds=3)
sched.start()
|
위 샘플은 3초마다 tick() 함수를 실행하는 코드입니다.
log 에 “tick!!!” 메시지가 3초마다 출력되는 것을 확인 할 수 있습니다.
add_job() 함수의 명세는 아래와 같습니다.
def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None,
misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined, next_run_time=undefined, jobstore='default', executor='default', replace_existing=False, **trigger_args) |
"""
add_job(func, trigger=None, args=None, kwargs=None, id=None, \ name=None, misfire_grace_time=undefined, coalesce=undefined, \ max_instances=undefined, next_run_time=undefined, \ jobstore='default', executor='default', \ replace_existing=False, **trigger_args) Adds the given job to the job list and wakes up the scheduler if it's already running. Any option that defaults to ``undefined`` will be replaced with the corresponding default value when the job is scheduled (which happens when the scheduler is started, or immediately if the scheduler is already running). The ``func`` argument can be given either as a callable object or a textual reference in the ``package.module:some.object`` format, where the first half (separated by ``:``) is an importable module and the second half is a reference to the callable object, relative to the module. The ``trigger`` argument can either be: #. the alias name of the trigger (e.g. ``date``, ``interval`` or ``cron``), in which case any extra keyword arguments to this method are passed on to the trigger's constructor #. an instance of a trigger class :param func: callable (or a textual reference to one) to run at the given time :param str|apscheduler.triggers.base.BaseTrigger trigger: trigger that determines when ``func`` is called :param list|tuple args: list of positional arguments to call func with :param dict kwargs: dict of keyword arguments to call func with :param str|unicode id: explicit identifier for the job (for modifying it later) :param str|unicode name: textual description of the job :param int misfire_grace_time: seconds after the designated runtime that the job is still allowed to be run :param bool coalesce: run once instead of many times if the scheduler determines that the job should be run more than once in succession :param int max_instances: maximum number of concurrently running instances allowed for this job :param datetime next_run_time: when to first run the job, regardless of the trigger (pass ``None`` to add the job as paused) :param str|unicode jobstore: alias of the job store to store the job in :param str|unicode executor: alias of the executor to run the job with :param bool replace_existing: ``True`` to replace an existing job with the same ``id`` (but retain the number of runs from the existing one) :rtype: Job """ |
- func - 주기적으로 수행할 함수
- trigger - 수행방식
- cron
- date
- interval
- args - 주기적으로 수행할 함수에 전달될 파라메터
- id - Job ID, 추후 Job 을 수정할 때 사용
- **trigger_args - trigger 설정을 위한 정보
trigger 방식에 따라 필요한 정보에 차이가 있습니다.
샘플코드
#Cron 방식
from apscheduler.scheduler import BlockingScheduler
def job_function(): print "Hello World" sched = BlockingScheduler() # Schedules job_function to be run on the third Friday # of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00 sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3') sched.start() |
#Inteval 방식
from datetime import datetime
from apscheduler.scheduler import BlockingScheduler def job_function(): print("Hello World") sched = BlockingScheduler() # Schedule job_function to be called every two hours sched.add_job(job_function, 'interval', hours=2) sched.start() |
BackgroundScheduler 샘플
Django 와 연동하여 web api 를 제공하기 위해 Wrapping 을 하여 만들어 보았습니다.
참고만 하시면 되겠습니다. Job 을 등록/삭제 기능만 사용하였습니다.(중지/재시작도 APScheduler 는 제공합니다.)
import logging
import time from apscheduler.schedulers.background import BackgroundScheduler logger = logging.getLogger(__name__) class JobLauncher: _sched = None def __init__(self): JobLauncher._sched = BackgroundScheduler() JobLauncher._sched.start() def __str__(self): return "JobLauncher" def run(self, job, request_params): job.job_params.update(request_params) return self.run_job(job) def stop(self, job): JobLauncher._sched.remove_job(job.name) def shutdown(self): if JobLauncher._sched.running(): logger.debug('Scheduler is shutting down.') JobLauncher._sched.shutdown() else: logger.warn("Cannot shutdown scheduler because scheduler is not running at the moment. please check scheduler status.") def run_job(self, job): if JobLauncher._sched.get_job(job.name) is None: if job.schedule_type == "cron": logger.debug(job) _job_trigger_params = JobTriggerParameterBuilder.build_cron_type_params(job.job_trigger_params) _job = JobLauncher._sched.add_job(job.execute, job.schedule_type, id=job.name, args=job.job_params.items(), year=_job_trigger_params['year'], month=_job_trigger_params['month'], day=_job_trigger_params['day'], hour=_job_trigger_params['hour'], minute=_job_trigger_params['minute'], second=_job_trigger_params['second']) return True return False class JobTriggerParameterBuilder: @staticmethod def build_cron_type_params(params): params.setdefault("year", "*") params.setdefault("month", "*") params.setdefault("day", "*") params.setdefault("hour", "*") params.setdefault("minute", "*") params.setdefault("second", "*") return params class CommonJob:
def __str__(self): return "Job Infos : {name : %s, schedule_type : %s, job_trigger_params : %s, job_params : %s}" % (self.name, self.schedule_type, self.job_trigger_params, self.job_params) @property def name(self): return self._name @property def schedule_type(self): return self._schedule_type @property def job_trigger_params(self): return self._job_trigger_params @property def job_params(self): return self._job_params def execute(self, *args, **kwargs): pass class JobLauncherHolder: _launcher = None @staticmethod def getInstance(): if not JobLauncherHolder._launcher: JobLauncherHolder._launcher = JobLauncher() return JobLauncherHolder._launcher |
import logging
from django.http import JsonResponse
from schedule.CommonScheduler import *
logger = logging.getLogger(__name__)
def start_job(request, job_name, mode): if request.method == "GET": if mode == "start": logger.debug(request.GET) launcher = JobLauncherHolder.getInstance() if launcher.run(job_dict[job_name], request.GET): result = get_result("success", "mode is %s" % mode) else: result = get_result("success", "fail to run job because of job's already started") elif mode == "stop": launcher = JobLauncherHolder.getInstance() launcher.stop(job_dict[job_name]) result = get_result("success", "mode is %s" % mode) else:
logger.warn(“¹미정의 mode(%s) 입니다." % mode)
result = get_result("fail", “정의되지 않은 Job Mode(%s) 입니다." % mode)
else:
result = get_result("fail", “지원하지 않는 방식입니다. GET 방식을 이용하세요.")
return JsonResponse(result, safe=False) def shutdown_job_launcher(request): launcher = JobLauncherHolder.getInstance() launcher.shutdown() result = {"flag":"success", "msg":"job_launcher stoped"} return JsonResponse(result, safe=False) def get_result(flag, message): return { "flag" : flag, "message" : message } |
참고
- https://pypi.python.org/pypi/APScheduler/3.2.0
- http://apscheduler.readthedocs.io/en/3.0/
- http://apscheduler.readthedocs.io/en/3.0/py-modindex.html
'Programing > Python' 카테고리의 다른 글
PyJNIus 사용기 (0) | 2016.07.28 |
---|---|
PEP8 이란? (0) | 2016.07.27 |
JPype 사용기 (0) | 2016.07.19 |
Python Intellij Project 설정시 Django 인식 오류 (0) | 2016.07.14 |
Window 환경에서 Python VirtualEnv 사용하기 (0) | 2016.07.06 |