본문 바로가기
Programing/Python

APScheduler 사용기

by Tomining 2016. 7. 26.
Python 에서 Scheduler 기능을 구현하기 위해 사용한 APScheduler 를 소개합니다.

Scheduler 기능을 제공하는 Package 는 몇 가지가 있습니다.
제가 조사한 Package 는 총 3가지 입니다.
  • celery - 분산 작업큐 역할을 담당하는 프레임웍
  • twisted - 비동기 IO 를 이용한 이벤트 방식의 네트워크 엔진 프레임웍
  • apscheduler

여기서 apscheduler 가 다른 Package 에 비해 간단히 구현이 가능합니다. 또한 기타 다른 Package 와 통합할 수 있는 기능도 제공합니다.
(Python 공식문서에도 링크가 등록되어 있습니다.)


APScheduler 란?

Advanced Python Scheduler 의 약자로 Python code 를 주기적으로 수행할 수 있게 도와주는 Python Library 입니다.
APScheduler 는 자체적으로는 Daemon 이나 Service 가 아닙니다. 이미 존재하는 Application 내에서 수행됩니다.

APScheduler 수행방식

3가지 수행방식을 지원합니다.

  1. Cron 방식 - Cron 표현식으로 Python code 를 수행
  2. Date 방식 - 특정 날짜에 Python code 를 수행
  3. Interval 방식 - 일정 주기로 Python code 를 수행

각 방식마다 trigger parameter 가 달라집니다.
이는 아래 Sample 예제에서 소개하도록 하겠습니다.

JobStore

APScheduler 는 Job(Python code) 을 저장해 두고 주기적으로 수행하게 됩니다.
이 때 4가지 저장방식을 지원합니다.

  • Memory(기본)
  • SQLAlchemy - RDBMS 에서 동작
  • MongoDB
  • Redis

APScheduler 설치


$ pip install apscheduler

현재 3.2.0 버전이 최신 버전입니다.(at 2016/07/26)

Scheduler 종류


BackgroundScheduler

background 에서 Job 들이 수행되며, 여러 Job 들을 등록하여 동시에 수행할 수 있습니다.

import logging
import time

from apscheduler.schedulers.background import BackgroundScheduler

logging = logging.getLogger(__name__)

def tick():
    logger.debug(“tick!!!”)

sched = BackgroundScheduler()
sched.add_job(tick, ‘interval’, seconds=3)
sched.start()

위 샘플은 3초마다 tick() 함수를 실행하는 코드입니다.
log 에 “tick!!!” 메시지가 3초마다 출력되는 것을 확인 할 수 있습니다.

add_job() 함수의 명세는 아래와 같습니다.

def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None,
            misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined,
            next_run_time=undefined, jobstore='default', executor='default',
            replace_existing=False, **trigger_args)
"""
add_job(func, trigger=None, args=None, kwargs=None, id=None, \
    name=None, misfire_grace_time=undefined, coalesce=undefined, \
    max_instances=undefined, next_run_time=undefined, \
    jobstore='default', executor='default', \
    replace_existing=False, **trigger_args)

Adds the given job to the job list and wakes up the scheduler if it's already running.

Any option that defaults to ``undefined`` will be replaced with the corresponding default
value when the job is scheduled (which happens when the scheduler is started, or
immediately if the scheduler is already running).

The ``func`` argument can be given either as a callable object or a textual reference in
the ``package.module:some.object`` format, where the first half (separated by ``:``) is an
importable module and the second half is a reference to the callable object, relative to
the module.

The ``trigger`` argument can either be:
  #. the alias name of the trigger (e.g. ``date``, ``interval`` or ``cron``), in which case
    any extra keyword arguments to this method are passed on to the trigger's constructor
  #. an instance of a trigger class

:param func: callable (or a textual reference to one) to run at the given time
:param str|apscheduler.triggers.base.BaseTrigger trigger: trigger that determines when
    ``func`` is called
:param list|tuple args: list of positional arguments to call func with
:param dict kwargs: dict of keyword arguments to call func with
:param str|unicode id: explicit identifier for the job (for modifying it later)
:param str|unicode name: textual description of the job
:param int misfire_grace_time: seconds after the designated runtime that the job is still
    allowed to be run
:param bool coalesce: run once instead of many times if the scheduler determines that the
    job should be run more than once in succession
:param int max_instances: maximum number of concurrently running instances allowed for this
    job
:param datetime next_run_time: when to first run the job, regardless of the trigger (pass
    ``None`` to add the job as paused)
:param str|unicode jobstore: alias of the job store to store the job in
:param str|unicode executor: alias of the executor to run the job with
:param bool replace_existing: ``True`` to replace an existing job with the same ``id``
    (but retain the number of runs from the existing one)
:rtype: Job

"""

  • func - 주기적으로 수행할 함수
  • trigger - 수행방식
    • cron
    • date
    • interval
  • args - 주기적으로 수행할 함수에 전달될 파라메터
  • id - Job ID, 추후 Job 을 수정할 때 사용
  • **trigger_args - trigger 설정을 위한 정보
    trigger  방식에 따라 필요한 정보에 차이가 있습니다.

샘플코드

#Cron 방식
from apscheduler.scheduler import BlockingScheduler


def job_function():
    print "Hello World"

sched = BlockingScheduler()

# Schedules job_function to be run on the third Friday
# of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')

sched.start()
#Inteval 방식
from datetime import datetime

from apscheduler.scheduler import BlockingScheduler


def job_function():
    print("Hello World")

sched = BlockingScheduler()

# Schedule job_function to be called every two hours
sched.add_job(job_function, 'interval', hours=2)

sched.start()


BackgroundScheduler 샘플

Django 와 연동하여 web api 를 제공하기 위해 Wrapping 을 하여 만들어 보았습니다.
참고만 하시면 되겠습니다. Job 을 등록/삭제 기능만 사용하였습니다.(중지/재시작도 APScheduler 는 제공합니다.)

import logging
import time

from apscheduler.schedulers.background import BackgroundScheduler

logger = logging.getLogger(__name__)


class JobLauncher:

    _sched = None

    def __init__(self):
        JobLauncher._sched = BackgroundScheduler()
        JobLauncher._sched.start()

    def __str__(self):
        return "JobLauncher"

    def run(self, job, request_params):
        job.job_params.update(request_params)
        return self.run_job(job)

    def stop(self, job):
        JobLauncher._sched.remove_job(job.name)

    def shutdown(self):
        if JobLauncher._sched.running():
            logger.debug('Scheduler is shutting down.')
            JobLauncher._sched.shutdown()
        else:
            logger.warn("Cannot shutdown scheduler because scheduler is not running at the moment. please check scheduler status.")

    def run_job(self, job):
        if JobLauncher._sched.get_job(job.name) is None:
            if job.schedule_type == "cron":
                logger.debug(job)
                _job_trigger_params = JobTriggerParameterBuilder.build_cron_type_params(job.job_trigger_params)
                _job = JobLauncher._sched.add_job(job.execute, job.schedule_type, id=job.name, args=job.job_params.items(),
                                                  year=_job_trigger_params['year'], month=_job_trigger_params['month'], day=_job_trigger_params['day'],
                                                  hour=_job_trigger_params['hour'], minute=_job_trigger_params['minute'], second=_job_trigger_params['second'])
                return True

        return False


class JobTriggerParameterBuilder:

    @staticmethod
    def build_cron_type_params(params):
        params.setdefault("year""*")
        params.setdefault("month""*")
        params.setdefault("day""*")
        params.setdefault("hour""*")
        params.setdefault("minute""*")
        params.setdefault("second""*")
        return params



class CommonJob:

    def __str__(self):
        return "Job Infos : {name : %s, schedule_type : %s, job_trigger_params : %s, job_params : %s}" % (self.name, self.schedule_type, self.job_trigger_params, self.job_params)

    @property
    def name(self):
        return self._name

    @property
    def schedule_type(self):
        return self._schedule_type

    @property
    def job_trigger_params(self):
        return self._job_trigger_params

    @property
    def job_params(self):
        return self._job_params

    def execute(self, *args, **kwargs):
        pass


class JobLauncherHolder:

    _launcher = None

    @staticmethod
    def getInstance():
        if not JobLauncherHolder._launcher:
            JobLauncherHolder._launcher = JobLauncher()

        return JobLauncherHolder._launcher
import logging

from django.http import JsonResponse
from schedule.CommonScheduler import *

logger = logging.getLogger(__name__)


def start_job(request, job_name, mode):
    if request.method == "GET":
        if mode == "start":
            logger.debug(request.GET)
            launcher = JobLauncherHolder.getInstance()
            if launcher.run(job_dict[job_name], request.GET):
                result = get_result("success""mode is %s" % mode)
            else:
                result = get_result("success""fail to run job because of job's already started")
        elif mode == "stop":
            launcher = JobLauncherHolder.getInstance()
            launcher.stop(job_dict[job_name])
            result = get_result("success""mode is %s" % mode)
        else:
            logger.warn(“¹미정의 mode(%s) 입니다." % mode)
            result = get_result("fail"“정의되지 않은 Job Mode(%s) 입니다." % mode)
    else:
        result = get_result("fail"“지원하지 않는 방식입니다. GET 방식을 이용하세요.")

    return JsonResponse(result, safe=False)


def shutdown_job_launcher(request):
    launcher = JobLauncherHolder.getInstance()
    launcher.shutdown()
    result = {"flag":"success""msg":"job_launcher stoped"}

    return JsonResponse(result, safe=False)


def get_result(flag, message):
    return {
        "flag" : flag,
        "message" : message
    }




참고



'Programing > Python' 카테고리의 다른 글

PyJNIus 사용기  (0) 2016.07.28
PEP8 이란?  (0) 2016.07.27
JPype 사용기  (0) 2016.07.19
Python Intellij Project 설정시 Django 인식 오류  (0) 2016.07.14
Window 환경에서 Python VirtualEnv 사용하기  (0) 2016.07.06