Changeset dfc79a in indico


Ignore:
Timestamp:
10/13/10 17:22:33 (3 years ago)
Author:
Pedro Ferreira <jose.pedro.ferreira@…>
Branches:
master, burotel, hello-world-walkthrough, ipv6, v0.98-series, v0.98.2, v0.98.3, v0.98b1, v0.98b2, v0.99, 051b2622c51afb171a1dedb46a0df4fbb0cbd02e, d9941f8582b36b24821a11ea5ba16fda6a457fb1
Children:
48dc69
Parents:
904b3f
git-author:
Pedro Ferreira <jose.pedro.ferreira@…> (08/03/10 18:53:46)
git-committer:
Pedro Ferreira <jose.pedro.ferreira@…> (10/13/10 17:22:33)
Message:

[IMP] First draft of Periodic tasks

  • Some minor fixes in server and workers, slightly better tests;
  • Moved task_max_retries to config option;
  • Periodic tasks - moved some things around;
  • New documentation for the scheduler module (still in progress);
Files:
3 added
14 edited

Legend:

Unmodified
Added
Removed
  • doc/api/source/fossilize.rst

    r4a70b5 rdfc79a  
    1 :mod:`fossilize` -- "Serializing" elaborate Python objects to dictionaries and lists 
    2 ==================================================================================== 
     1:mod:`indico.util.fossilize` -- "Serializing" elaborate Python objects to dictionaries and lists 
     2================================================================================================ 
    33 
    4 .. automodule:: MaKaC.common.fossilize 
     4.. automodule:: indico.util.fossilize 
    55   :members: 
    66 
  • doc/api/source/images/test_framework_structure.svg

    rebc17a rdfc79a  
    1212   width="415.62421" 
    1313   height="199.9501" 
     14   viewBox="0 0 415.62421 199.9501" 
    1415   id="svg18969" 
    1516   version="1.1" 
  • doc/api/source/index.rst

    rebc17a rdfc79a  
    1 .. cds-indico documentation master file, created by 
     1.. indico documentation master file, created by 
    22   sphinx-quickstart on Sun Nov 29 13:19:24 2009. 
    33   You can adapt this file completely to your liking, but it should at least 
    44   contain the root `toctree` directive. 
    55 
    6 cds-indico API 
    7 ============== 
     6Indico API 
     7========== 
    88 
    99Contents: 
     
    1515   ./fossilize.rst 
    1616   ./test.rst 
     17   ./scheduler.rst 
    1718 
    1819Indices and tables 
  • indico/modules/__init__.py

    rc79cb2 rdfc79a  
    1919## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. 
    2020 
     21""" 
     22""" 
     23 
    2124from indico.modules.base import ModuleHolder, Module 
  • indico/modules/base.py

    rc79cb2 rdfc79a  
    6363        return "%s"%id 
    6464 
    65     def getById( self, id ): 
     65    def destroyById(self, moduleId): 
     66        del self._getIdx()[str(moduleId)] 
     67 
     68    def getById(self, moduleId): 
    6669        """returns an object from the index which id corresponds to the one 
    6770            which is specified. 
    6871        """ 
    6972 
    70         if type(id) is int: 
    71             id = str(id) 
    72         if self._getIdx().has_key(str(id)): 
    73             return self._getIdx()[str(id)] 
    74         elif self._availableModules.has_key(id): 
    75             newmod=self._availableModules[id]() 
     73        if type(moduleId) is int: 
     74            moduleId = str(moduleId) 
     75        if self._getIdx().has_key(str(moduleId)): 
     76            return self._getIdx()[str(moduleId)] 
     77        elif self._availableModules.has_key(moduleId): 
     78            newmod=self._availableModules[moduleId]() 
    7679            self.add(newmod) 
    7780            return newmod 
    7881        else: 
    79             raise MaKaCError( ("Module id %s does not exist") % str(id) ) 
     82            raise MaKaCError( ("Module id %s does not exist") % str(moduleId) ) 
    8083 
    8184 
     
    98101 
    99102    @classmethod 
     103    def destroyDBInstance(cls): 
     104        return ModuleHolder().destroyById(cls.id) 
     105 
     106    @classmethod 
    100107    def getDBInstance(cls): 
     108        """ 
     109        Returns the module instance that is stored in the database 
     110        """ 
    101111        return ModuleHolder().getById(cls.id) 
  • indico/modules/scheduler/__init__.py

    r0ae298 rdfc79a  
    1919## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. 
    2020 
     21""" 
     22The ``scheduler`` module provides Indico with a scheduling API that allows specific jobs 
     23(tasks to be run at given times, with a certain repeatibility, if needed). 
     24""" 
     25 
    2126from indico.modules.scheduler.module import SchedulerModule 
    2227from indico.modules.scheduler.server import Scheduler 
  • indico/modules/scheduler/base.py

    r904b3f rdfc79a  
    2121from contextlib import contextmanager 
    2222from ZODB.POSException import ConflictError 
    23 import transaction 
    2423 
    2524TASK_STATUS_NONE, TASK_STATUS_SPOOLED, TASK_STATUS_QUEUED, TASK_STATUS_RUNNING, \ 
  • indico/modules/scheduler/client.py

    r0ae298 rdfc79a  
    2424 
    2525    """ 
    26     `Client`, as the name says, represents a scheduler client, that 
    27     allows Indico client processes to interact with the scheduler. 
     26    :py:class:`~indico.modules.scheduler.Client` provices a transparent scheduler 
     27    client, that allows Indico client processes to interact with the Scheduler 
     28    without the need for a lot of code. 
    2829 
    29     It acts as a sort of proxy between clients and the SchedulerModule. 
     30    It acts as a remote proxy. 
    3031    """ 
    3132 
     
    3738        """ 
    3839        Schedules a task for execution 
    39         Returns a tuple containing the task id and the 
    40         next execution time 
    4140        """ 
    4241 
     
    4544    def shutdown(self, msg = ""): 
    4645        """ 
    47         Shuts down the scheduler 
     46        Shuts down the scheduler. `msg` is an optional paramater that provides 
     47        an information message that will be written in the logs 
    4848        """ 
    4949 
    5050        return self._schedMod.spool('shutdown', msg) 
     51 
     52    def getStatus(self): 
     53        """ 
     54        Returns status information (dictionary), containing the lengths (tasks) of: 
     55          * spool; 
     56          * waiting queue; 
     57          * running queue; 
     58          * finished task index; 
     59          * failed task index; 
     60        """ 
     61 
     62        return self._schedMod.getStatus() 
     63 
     64    def getTask(self, tid): 
     65        """ 
     66        Returns a :py:class:`task <indico.modules.scheduler.tasks.BaseTask>` object, 
     67        given its task id 
     68        """ 
     69 
     70        return self._schedMod.getTaskIndex()[tid] 
  • indico/modules/scheduler/module.py

    r0ae298 rdfc79a  
    101101            'waiting': len(self._waitingQueue), 
    102102            'running': len(self._runningList), 
    103             'spooled': len(self._taskSpool) 
     103            'spooled': len(self._taskSpool), 
     104            'failed': self._failedIndex._num_docs() , 
     105            'finished': self._finishedIndex._num_docs() 
    104106            } 
    105107 
  • indico/modules/scheduler/server.py

    r904b3f rdfc79a  
    2929 
    3030 
    31 from indico.modules.scheduler import SchedulerModule, base 
     31from indico.modules.scheduler import SchedulerModule, base, tasks 
    3232from indico.modules.scheduler.slave import Worker 
    3333from indico.util.date_time import nowutc, int_timestamp 
     
    3636class Scheduler(object): 
    3737    """ 
    38  
     38    A :py:class:`~indico.modules.scheduler.Scheduler` object provides a job scheduler 
     39    based on a waiting queue, that communicates with its clients through the database. 
     40    Things have been done in a way that the probability of conflict is minimized, and 
     41    operations are repeated in case one happens. 
     42 
     43    The entry point of the process consists of a 'spooler' that periodically takes 
     44    tasks out of a `conflict-safe` FIFO (spool) and adds them to an ``IOBTree``-based 
     45    waiting queue. The waiting queue is then checked periodically for the next task, 
     46    and when the time comes the task is executed. 
     47 
     48    Tasks are executed in different threads. 
     49 
     50    The :py:class:`~indico.modules.scheduler.Client` class works as a transparent 
     51    remote proxy for this class. 
    3952    """ 
    4053 
     
    4558        # time to wait between cycles 
    4659        'sleep_interval': 10, 
     60 
    4761        # AWOL = Absent Without Leave 
    4862        # [0.0, 1.0) probability that after a Scheduler tick it will check for AWOL 
    4963        # tasks in the runningList the lower the number the lower the number of checks 
    5064        'awol_tasks_check_probability': 0.3, 
     65 
    5166        # seconds to consider a task AWOL 
    52         'awol_tasks_thresold': 6000 
     67        'awol_tasks_thresold': 6000, 
     68 
     69        # Number of times to try to run a task before aborting (min 1) 
     70        'task_max_tries': 10 
    5371        } 
    5472 
     
    132150                nextTS, nextTask = res 
    133151 
     152                self._logger.info((nextTS, currentTimestamp)) 
     153 
    134154                # if it's time to execute the task 
    135155                if  (nextTS <= currentTimestamp): 
     
    144164 
    145165            # move the tasks in the spool to the waiting queue 
    146             self._processSpool() 
    147  
    148             # process tasks that have finished meanwhile 
    149             # (tasks have been running in different threads, so, the sync 
    150             # thas was done above won't hurt) 
    151             self._checkFinishedTasks() 
     166            try: 
     167                self._processSpool() 
     168            finally: 
     169                # this `finally` makes sure finished tasks are handled 
     170                # even if a shutdown order is sent 
     171 
     172                # process tasks that have finished meanwhile 
     173                # (tasks have been running in different threads, so, the sync 
     174                # thas was done above won't hurt) 
     175                self._checkFinishedTasks() 
    152176 
    153177            # we also check AWOL tasks from time to time 
     
    164188        need to be moved to the correct places 
    165189        """ 
     190 
     191        self._logger.info("Checking finished tasks") 
    166192 
    167193        for taskId, thread in self._runningThreads.items(): 
     
    214240                self._relaunchRunningListItems() 
    215241 
    216                 # iterate over the tasks in the waiting queue 
    217                 # that should be running 
    218  
     242 
     243            # iterate over the tasks in the waiting queue 
     244            # that should be running 
    219245            for timestamp, curTask in self._iterateTasks(): 
    220246                # execute the "task cycle" for each new task 
     
    228254            raise e 
    229255 
    230  
    231256    def _taskCycle(self, timestamp, curTask): 
    232257 
     
    244269        # Start a worker subprocess 
    245270        # Add it to the thread dict 
    246         self._runningThreads[curTask.id] = Worker(curTask.id) 
     271        self._runningThreads[curTask.id] = Worker(curTask.id, self._config) 
    247272        self._runningThreads[curTask.id].start() 
    248273 
     
    321346            task.tearDown() 
    322347 
    323         # TODO 
    324         # Add periodic tasks back to the queue 
    325         #if isinstance(task, base.PeriodicTask): 
    326         #    Scheduler.addTask(self) 
     348        if isinstance(task, tasks.PeriodicTask): 
     349            with self._op.commit('taskIdx'): 
     350                task.setNextOccurrence() 
     351                SchedulerModule.getDBInstance().addTaskToWaitingQueue(task) 
    327352 
    328353    def _notifyTaskFailed(self, task): 
  • indico/modules/scheduler/slave.py

    rc79cb2 rdfc79a  
    1919## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. 
    2020 
    21 import logging 
     21import time, logging 
    2222 
    2323from indico.modules.scheduler import SchedulerModule, base 
    2424from MaKaC.common import DBMgr 
    2525 
    26 # Number of times to try to run a task before aborting 
    27 TASK_MAX_RETRIES = 10 
    28  
    2926 
    3027class Worker(base._MT_UNIT): 
    3128 
    32     def __init__(self, taskId): 
     29    def __init__(self, taskId, configData): 
    3330        super(Worker, self).__init__() 
    3431 
    35         self._logger = logging.getLogger('worker') 
     32        self._logger = logging.getLogger('worker/%s' % taskId) 
    3633        self.success = None 
    3734        self._taskId = taskId 
     35        self._config = configData 
    3836 
    3937    def _prepare(self): 
    4038        """ 
    41         This acts as a second "constructor", that is executed in the 
     39        This acts as a second 'constructor', that is executed in the 
    4240        context of the thread (due to database reasons) 
    4341        """ 
     
    6462        self._dbi.startRequest() 
    6563 
    66         while i < TASK_MAX_RETRIES and not self._task.endedOn: 
     64        while i < self._config.task_max_tries and not self._task.endedOn: 
    6765            i = i + 1 
    6866            try: 
     
    7775                self._logger.exception('Error message') 
    7876 
    79                 # We sleep progressivelly so that if the error is caused by concurrency 
    80                 # we don't make the problem worse by hammering the server. 
    81                 time.sleep(nextRunIn) 
     77                if  i < self._config.task_max_tries: 
     78                    # if i is still low enough, we sleep progressively more 
     79                    # so that if the error is caused by concurrency we don't make 
     80                    # the problem worse by hammering the server. 
     81                    time.sleep(nextRunIn) 
    8282 
    8383                # abort transaction and synchronize 
  • indico/modules/scheduler/tasks.py

    rc79cb2 rdfc79a  
    2222import logging 
    2323import time 
     24import dateutil 
    2425 
    2526from datetime import timedelta 
     
    5051class BaseTask(Persistent, Fossilizable): 
    5152    """ 
    52     To create a new Task subclass Task and define a _run() method with 
    53     the tasks' actions 
    54  
    55     Description of each attribute: 
    56     AUTOMATIC ATTRS: 
    57     - startedOn:  actual date the task started running 
    58     - endedOn:    actual date the task's run method finished 
    59     - running:    True or False depending on what the task thinks it's happening 
    60  
    61     USER-CONFIGURABLE ATTRS (through kw arguments to init and setters/getters) 
    62     - startOn:    time at which the task creator wanted the task to start (can be blank) 
    63     - endOn:      last point in time where the task can run. A task will never enter the 
    64                   runningQueue if current time is past endOn 
     53    A base class for tasks. 
     54    `expiryDate` is the last point in time when the task can run. A task will refuse 
     55    to run if current time is past `expiryDate` 
    6556    """ 
    6657 
    6758    fossilizes(ITaskFossil) 
    6859 
    69     def __init__(self, **kwargs): 
     60    def __init__(self, expiryDate=None): 
    7061        self.createdOn = nowutc() 
     62        self.expiryDate = expiryDate 
    7163        self.typeId = self.__class__.__name__ 
    72         self.reset(**kwargs) 
    73  
    74     def reset(self, **kwargs): 
     64        self.id = None 
     65        self.reset() 
     66 
     67    def reset(self): 
    7568        '''Resets a task to its state before being run''' 
    7669 
    77         self.startedOn = None 
    78         self.endedOn = None 
    7970        self.running = False 
    8071        self.onRunningListSince = None 
    81         self.startOn = None 
    82         self.endOn = None 
    8372        self.status = base.TASK_STATUS_NONE 
    84         self.id = None 
    85  
    86         for k in ('startOn', 'endOn'): 
    87             if k in kwargs: 
    88                 setattr(self, k, kwargs[k]) 
    89  
    90     def getEndOn(self): 
    91         return self.EndOn 
    92  
    93     def getStartOn(self): 
    94         return self.startOn 
    9573 
    9674    def getCreatedOn(self): 
    9775        return self.createdOn 
    9876 
    99     def setStartOn(self, newtime): 
    100         self.startOn = newtime 
    101  
    102     def setEndOn(self, newtime): 
    103         self.endOn = newtime 
     77    def getStartOn(self): 
     78        """ 
     79        To be overloaded 
     80        """ 
     81 
     82    def getLastFinishedOn(self): 
     83        """ 
     84        To be overloaded 
     85        """ 
    10486 
    10587    def setOnRunningListSince(self, sometime): 
     
    10890 
    10991    def setStatus(self, newstatus): 
    110         self.status = base.TASK_STATUS_QUEUED 
     92        self.status = newstatus 
    11193 
    11294    def getOnRunningListSince(self): 
     
    122104        self.id = newid 
    123105        self.status = newstatus 
    124  
    125     def isPeriodic(self): 
    126         return False 
    127  
    128     def isPeriodicUnique(self): 
    129         return False 
    130  
    131     def getStartedOn(self): 
    132         return self.startedOn 
    133  
    134     def getEndedOn(self): 
    135         return self.endedOn 
    136106 
    137107    def plugLogger(self, logger): 
     
    145115    def start(self): 
    146116 
    147         tsDiff = int_timestamp(nowutc()) - int_timestamp(self.startOn) 
     117        tsDiff = int_timestamp(nowutc()) - int_timestamp(self.getStartOn()) 
    148118 
    149119        if tsDiff < 0: 
    150             self.getLogger().debug('Task %s will wait for some time. (%s) > (%s)' % (self.id, self.startOn, nowutc())) 
     120            self.getLogger().debug('Task %s will wait for some time. (%s) > (%s)' % (self.id, self.getStartOn(), nowutc())) 
    151121            time.sleep(tsDiff) 
    152122 
    153         if self.endOn and nowutc() > self.endOn: 
    154             self.getLogger().warning('Task %s will not be executed, endOn (%s) < current time (%s)' % (self.id, self.endOn, nowutc())) 
     123        if self.expiryDate and nowutc() > self.expiryDate: 
     124            self.getLogger().warning('Task %s will not be executed, expiryDate (%s) < current time (%s)' % (self.id, self.expiryDate, nowutc())) 
    155125            return False 
    156126 
     
    169139 
    170140    def __str__(self): 
    171         return "<%s %s %s %s>" % (self.typeId, self.id, self.status, self.startOn) 
     141        return "<%s %s %s %s>" % (self.typeId, self.id, self.status, self.getStartOn()) 
    172142 
    173143 
    174144class OneShotTask(BaseTask): 
    175145    '''Tasks that are executed only once''' 
    176     def __init__(self, **kwargs): 
    177         super(OneShotTask, self).__init__(**kwargs) 
    178         if 'runOn' in kwargs: 
    179             self.runOn = kwargs['runOn'] 
    180         else: 
    181             self.runOn = nowutc() 
    182  
     146    def __init__(self, startDateTime): 
     147        super(OneShotTask, self).__init__() 
     148 
     149        self.startDateTime = startDateTime 
     150        self.startedOn = None 
     151        self.endedOn = None 
     152 
     153    def getStartOn(self): 
     154        return self.startDateTime 
     155 
     156    def setStartOn(self, newtime): 
     157        self.startDateTime = newtime 
     158 
     159    def getEndedOn(self): 
     160        return self.endedOn 
     161 
     162    def setEndedOn(self, dateTime): 
     163        self.endedOn = dateTime 
     164 
     165    def getStartedOn(self): 
     166        return self.startedOn 
    183167 
    184168    def getRunOn(self): 
    185169        return self.runOn 
    186170 
     171    def getLastFinishedOn(self): 
     172        return self.getEndedOn() 
    187173 
    188174class PeriodicTask(BaseTask): 
     
    191177    """ 
    192178 
    193     def __init__(self, **kwargs): 
     179    def __init__(self, frequency, **kwargs): 
    194180        """ 
    195         Must be fed: 
    196         - interval: seconds between each successive run 
     181        - frequency - a valid dateutil frequency specifier (DAILY, HOURLY, etc...) 
    197182        """ 
    198         super(PeriodicTask, self).__init__(**kwargs) 
    199         if 'interval' not in kwargs: 
    200             raise Exception('Error: PeriodicTask was not given an interval') 
    201  
    202         self.interval = kwargs['interval'] 
    203  
    204  
    205     def isPeriodic(self): 
    206         return True 
    207  
    208     def getInterval(self): 
    209         return self.interval 
     183        super(PeriodicTask, self).__init__() 
     184 
     185        self.frequency = frequency 
     186        self.interval = kwargs 
     187        self.nextOccurrence = None 
     188        self._lastFinishedOn = None 
    210189 
    211190    def start(self): 
     
    219198        self.reset() 
    220199 
     200    def setNextOccurrence(self): 
     201        l = list(dateutil.rrule.rrule( 
     202            self.frequency, 
     203            dtstart = nowutc(), 
     204            count = 1, 
     205            **self.interval 
     206            )) 
     207 
     208        if l: 
     209            self.nextOccurrence = l[0] 
     210        else: 
     211            return None 
     212 
     213    def getStartOn(self): 
     214        # if it's the first time, compute the next occurrence 
     215        if not self.nextOccurrence: 
     216            self.setNextOccurrence() 
     217 
     218        return self.nextOccurrence 
     219 
     220    def getLastFinishedOn(self): 
     221        return self._lastFinishedOn 
     222 
    221223 
    222224class PeriodicUniqueTask(PeriodicTask): 
    223225    '''Singleton periodic tasks: no two or more PeriodicUniqueTask of this 
    224226    class will be queued or running at the same time''' 
    225     def __init__(self, **kwargs): 
    226         super(PeriodicUniqueTask, self).__init__(**kwargs) 
    227  
    228     def isPeriodicUnique(self): 
    229         return True 
    230227 
    231228 
     
    506503        # Date checkings... 
    507504        from MaKaC.conference import ConferenceHolder 
    508         from MaKaC.common.timezoneUtils import nowutc 
    509505        if not ConferenceHolder().hasKey(self.conf.getId()) or \ 
    510506                self.conf.getStartDate() <= nowutc(): 
     
    555551    def run(self): 
    556552        self.getLogger().debug('Now i shall sleeeeeeeep!') 
    557         time.sleep(10) 
     553        time.sleep(1) 
    558554        self.getLogger().debug('%s executed' % self.__class__.__name__) 
    559555 
  • indico/tests/util.py

    rb37548 rdfc79a  
    4545        Actually starts the server 
    4646        """ 
    47         print "spawning server on PID %s" % self.pid 
    4847        self.server.main() 
    4948 
  • indico/util/date_time.py

    r8eae48 rdfc79a  
    1919## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. 
    2020 
     21import time, pytz 
     22 
    2123from MaKaC.common.timezoneUtils import nowutc 
    22 import time 
    2324 
    24 def int_timestamp(datetimeVal): 
    25     return int(time.mktime(datetimeVal.timetuple())) 
     25def int_timestamp(datetimeVal, tz = pytz.timezone('UTC')): 
     26    return int(time.mktime(datetimeVal.astimezone(tz).timetuple())) 
Note: See TracChangeset for help on using the changeset viewer.