티스토리 뷰

Programing/Python

APScheduler 사용기

Tomining 2016. 7. 26. 21:57
Python 에서 Scheduler 기능을 구현하기 위해 사용한 APScheduler 를 소개합니다.

Scheduler 기능을 제공하는 Package 는 몇 가지가 있습니다.
제가 조사한 Package 는 총 3가지 입니다.
  • celery - 분산 작업큐 역할을 담당하는 프레임웍
  • twisted - 비동기 IO 를 이용한 이벤트 방식의 네트워크 엔진 프레임웍
  • apscheduler

여기서 apscheduler 가 다른 Package 에 비해 간단히 구현이 가능합니다. 또한 기타 다른 Package 와 통합할 수 있는 기능도 제공합니다.
(Python 공식문서에도 링크가 등록되어 있습니다.)


APScheduler 란?

Advanced Python Scheduler 의 약자로 Python code 를 주기적으로 수행할 수 있게 도와주는 Python Library 입니다.
APScheduler 는 자체적으로는 Daemon 이나 Service 가 아닙니다. 이미 존재하는 Application 내에서 수행됩니다.

APScheduler 수행방식

3가지 수행방식을 지원합니다.

  1. Cron 방식 - Cron 표현식으로 Python code 를 수행
  2. Date 방식 - 특정 날짜에 Python code 를 수행
  3. Interval 방식 - 일정 주기로 Python code 를 수행

각 방식마다 trigger parameter 가 달라집니다.
이는 아래 Sample 예제에서 소개하도록 하겠습니다.

JobStore

APScheduler 는 Job(Python code) 을 저장해 두고 주기적으로 수행하게 됩니다.
이 때 4가지 저장방식을 지원합니다.

  • Memory(기본)
  • SQLAlchemy - RDBMS 에서 동작
  • MongoDB
  • Redis

APScheduler 설치


$ pip install apscheduler

현재 3.2.0 버전이 최신 버전입니다.(at 2016/07/26)

Scheduler 종류


BackgroundScheduler

background 에서 Job 들이 수행되며, 여러 Job 들을 등록하여 동시에 수행할 수 있습니다.

import logging
import time

from apscheduler.schedulers.background import BackgroundScheduler

logging = logging.getLogger(__name__)

def tick():
    logger.debug(“tick!!!”)

sched = BackgroundScheduler()
sched.add_job(tick, ‘interval’, seconds=3)
sched.start()

위 샘플은 3초마다 tick() 함수를 실행하는 코드입니다.
log 에 “tick!!!” 메시지가 3초마다 출력되는 것을 확인 할 수 있습니다.

add_job() 함수의 명세는 아래와 같습니다.

def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None,
            misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined,
            next_run_time=undefined, jobstore='default', executor='default',
            replace_existing=False, **trigger_args)
"""
add_job(func, trigger=None, args=None, kwargs=None, id=None, \
    name=None, misfire_grace_time=undefined, coalesce=undefined, \
    max_instances=undefined, next_run_time=undefined, \
    jobstore='default', executor='default', \
    replace_existing=False, **trigger_args)

Adds the given job to the job list and wakes up the scheduler if it's already running.

Any option that defaults to ``undefined`` will be replaced with the corresponding default
value when the job is scheduled (which happens when the scheduler is started, or
immediately if the scheduler is already running).

The ``func`` argument can be given either as a callable object or a textual reference in
the ``package.module:some.object`` format, where the first half (separated by ``:``) is an
importable module and the second half is a reference to the callable object, relative to
the module.

The ``trigger`` argument can either be:
  #. the alias name of the trigger (e.g. ``date``, ``interval`` or ``cron``), in which case
    any extra keyword arguments to this method are passed on to the trigger's constructor
  #. an instance of a trigger class

:param func: callable (or a textual reference to one) to run at the given time
:param str|apscheduler.triggers.base.BaseTrigger trigger: trigger that determines when
    ``func`` is called
:param list|tuple args: list of positional arguments to call func with
:param dict kwargs: dict of keyword arguments to call func with
:param str|unicode id: explicit identifier for the job (for modifying it later)
:param str|unicode name: textual description of the job
:param int misfire_grace_time: seconds after the designated runtime that the job is still
    allowed to be run
:param bool coalesce: run once instead of many times if the scheduler determines that the
    job should be run more than once in succession
:param int max_instances: maximum number of concurrently running instances allowed for this
    job
:param datetime next_run_time: when to first run the job, regardless of the trigger (pass
    ``None`` to add the job as paused)
:param str|unicode jobstore: alias of the job store to store the job in
:param str|unicode executor: alias of the executor to run the job with
:param bool replace_existing: ``True`` to replace an existing job with the same ``id``
    (but retain the number of runs from the existing one)
:rtype: Job

"""

  • func - 주기적으로 수행할 함수
  • trigger - 수행방식
    • cron
    • date
    • interval
  • args - 주기적으로 수행할 함수에 전달될 파라메터
  • id - Job ID, 추후 Job 을 수정할 때 사용
  • **trigger_args - trigger 설정을 위한 정보
    trigger  방식에 따라 필요한 정보에 차이가 있습니다.

샘플코드

#Cron 방식
from apscheduler.scheduler import BlockingScheduler


def job_function():
    print "Hello World"

sched = BlockingScheduler()

# Schedules job_function to be run on the third Friday
# of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')

sched.start()
#Inteval 방식
from datetime import datetime

from apscheduler.scheduler import BlockingScheduler


def job_function():
    print("Hello World")

sched = BlockingScheduler()

# Schedule job_function to be called every two hours
sched.add_job(job_function, 'interval', hours=2)

sched.start()


BackgroundScheduler 샘플

Django 와 연동하여 web api 를 제공하기 위해 Wrapping 을 하여 만들어 보았습니다.
참고만 하시면 되겠습니다. Job 을 등록/삭제 기능만 사용하였습니다.(중지/재시작도 APScheduler 는 제공합니다.)

import logging
import time

from apscheduler.schedulers.background import BackgroundScheduler

logger = logging.getLogger(__name__)


class JobLauncher:

    _sched = None

    def __init__(self):
        JobLauncher._sched = BackgroundScheduler()
        JobLauncher._sched.start()

    def __str__(self):
        return "JobLauncher"

    def run(self, job, request_params):
        job.job_params.update(request_params)
        return self.run_job(job)

    def stop(self, job):
        JobLauncher._sched.remove_job(job.name)

    def shutdown(self):
        if JobLauncher._sched.running():
            logger.debug('Scheduler is shutting down.')
            JobLauncher._sched.shutdown()
        else:
            logger.warn("Cannot shutdown scheduler because scheduler is not running at the moment. please check scheduler status.")

    def run_job(self, job):
        if JobLauncher._sched.get_job(job.name) is None:
            if job.schedule_type == "cron":
                logger.debug(job)
                _job_trigger_params = JobTriggerParameterBuilder.build_cron_type_params(job.job_trigger_params)
                _job = JobLauncher._sched.add_job(job.execute, job.schedule_type, id=job.name, args=job.job_params.items(),
                                                  year=_job_trigger_params['year'], month=_job_trigger_params['month'], day=_job_trigger_params['day'],
                                                  hour=_job_trigger_params['hour'], minute=_job_trigger_params['minute'], second=_job_trigger_params['second'])
                return True

        return False


class JobTriggerParameterBuilder:

    @staticmethod
    def build_cron_type_params(params):
        params.setdefault("year""*")
        params.setdefault("month""*")
        params.setdefault("day""*")
        params.setdefault("hour""*")
        params.setdefault("minute""*")
        params.setdefault("second""*")
        return params



class CommonJob:

    def __str__(self):
        return "Job Infos : {name : %s, schedule_type : %s, job_trigger_params : %s, job_params : %s}" % (self.name, self.schedule_type, self.job_trigger_params, self.job_params)

    @property
    def name(self):
        return self._name

    @property
    def schedule_type(self):
        return self._schedule_type

    @property
    def job_trigger_params(self):
        return self._job_trigger_params

    @property
    def job_params(self):
        return self._job_params

    def execute(self, *args, **kwargs):
        pass


class JobLauncherHolder:

    _launcher = None

    @staticmethod
    def getInstance():
        if not JobLauncherHolder._launcher:
            JobLauncherHolder._launcher = JobLauncher()

        return JobLauncherHolder._launcher
import logging

from django.http import JsonResponse
from schedule.CommonScheduler import *

logger = logging.getLogger(__name__)


def start_job(request, job_name, mode):
    if request.method == "GET":
        if mode == "start":
            logger.debug(request.GET)
            launcher = JobLauncherHolder.getInstance()
            if launcher.run(job_dict[job_name], request.GET):
                result = get_result("success""mode is %s" % mode)
            else:
                result = get_result("success""fail to run job because of job's already started")
        elif mode == "stop":
            launcher = JobLauncherHolder.getInstance()
            launcher.stop(job_dict[job_name])
            result = get_result("success""mode is %s" % mode)
        else:
            logger.warn(“¹미정의 mode(%s) 입니다." % mode)
            result = get_result("fail"“정의되지 않은 Job Mode(%s) 입니다." % mode)
    else:
        result = get_result("fail"“지원하지 않는 방식입니다. GET 방식을 이용하세요.")

    return JsonResponse(result, safe=False)


def shutdown_job_launcher(request):
    launcher = JobLauncherHolder.getInstance()
    launcher.shutdown()
    result = {"flag":"success""msg":"job_launcher stoped"}

    return JsonResponse(result, safe=False)


def get_result(flag, message):
    return {
        "flag" : flag,
        "message" : message
    }




참고



'Programing > Python' 카테고리의 다른 글

PyJNIus 사용기  (0) 2016.07.28
PEP8 이란?  (0) 2016.07.27
JPype 사용기  (0) 2016.07.19
Python Intellij Project 설정시 Django 인식 오류  (0) 2016.07.14
Window 환경에서 Python VirtualEnv 사용하기  (0) 2016.07.06
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
«   2024/04   »
1 2 3 4 5 6
7 8 9 10 11 12 13
14 15 16 17 18 19 20
21 22 23 24 25 26 27
28 29 30
글 보관함