Changeset dfc79a in indico
- Timestamp:
- 10/13/10 17:22:33 (3 years ago)
- Branches:
- master, burotel, hello-world-walkthrough, ipv6, v0.98-series, v0.98.2, v0.98.3, v0.98b1, v0.98b2, v0.99, 051b2622c51afb171a1dedb46a0df4fbb0cbd02e, d9941f8582b36b24821a11ea5ba16fda6a457fb1
- Children:
- 48dc69
- Parents:
- 904b3f
- git-author:
- Pedro Ferreira <jose.pedro.ferreira@…> (08/03/10 18:53:46)
- git-committer:
- Pedro Ferreira <jose.pedro.ferreira@…> (10/13/10 17:22:33)
- Files:
-
- 3 added
- 14 edited
-
doc/api/source/fossilize.rst (modified) (1 diff)
-
doc/api/source/images/scheduler_arch.svg (added)
-
doc/api/source/images/task_workflow.svg (added)
-
doc/api/source/images/test_framework_structure.svg (modified) (1 diff)
-
doc/api/source/index.rst (modified) (2 diffs)
-
doc/api/source/scheduler.rst (added)
-
indico/modules/__init__.py (modified) (1 diff)
-
indico/modules/base.py (modified) (2 diffs)
-
indico/modules/scheduler/__init__.py (modified) (1 diff)
-
indico/modules/scheduler/base.py (modified) (1 diff)
-
indico/modules/scheduler/client.py (modified) (3 diffs)
-
indico/modules/scheduler/module.py (modified) (1 diff)
-
indico/modules/scheduler/server.py (modified) (10 diffs)
-
indico/modules/scheduler/slave.py (modified) (3 diffs)
-
indico/modules/scheduler/tasks.py (modified) (10 diffs)
-
indico/tests/util.py (modified) (1 diff)
-
indico/util/date_time.py (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
-
doc/api/source/fossilize.rst
r4a70b5 rdfc79a 1 :mod:` fossilize` -- "Serializing" elaborate Python objects to dictionaries and lists2 ==================================================================================== 1 :mod:`indico.util.fossilize` -- "Serializing" elaborate Python objects to dictionaries and lists 2 ================================================================================================ 3 3 4 .. automodule:: MaKaC.common.fossilize4 .. automodule:: indico.util.fossilize 5 5 :members: 6 6 -
doc/api/source/images/test_framework_structure.svg
rebc17a rdfc79a 12 12 width="415.62421" 13 13 height="199.9501" 14 viewBox="0 0 415.62421 199.9501" 14 15 id="svg18969" 15 16 version="1.1" -
doc/api/source/index.rst
rebc17a rdfc79a 1 .. cds-indico documentation master file, created by1 .. indico documentation master file, created by 2 2 sphinx-quickstart on Sun Nov 29 13:19:24 2009. 3 3 You can adapt this file completely to your liking, but it should at least 4 4 contain the root `toctree` directive. 5 5 6 cds-indico API7 ========== ====6 Indico API 7 ========== 8 8 9 9 Contents: … … 15 15 ./fossilize.rst 16 16 ./test.rst 17 ./scheduler.rst 17 18 18 19 Indices and tables -
indico/modules/__init__.py
rc79cb2 rdfc79a 19 19 ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. 20 20 21 """ 22 """ 23 21 24 from indico.modules.base import ModuleHolder, Module -
indico/modules/base.py
rc79cb2 rdfc79a 63 63 return "%s"%id 64 64 65 def getById( self, id ): 65 def destroyById(self, moduleId): 66 del self._getIdx()[str(moduleId)] 67 68 def getById(self, moduleId): 66 69 """returns an object from the index which id corresponds to the one 67 70 which is specified. 68 71 """ 69 72 70 if type( id) is int:71 id = str(id)72 if self._getIdx().has_key(str( id)):73 return self._getIdx()[str( id)]74 elif self._availableModules.has_key( id):75 newmod=self._availableModules[ id]()73 if type(moduleId) is int: 74 moduleId = str(moduleId) 75 if self._getIdx().has_key(str(moduleId)): 76 return self._getIdx()[str(moduleId)] 77 elif self._availableModules.has_key(moduleId): 78 newmod=self._availableModules[moduleId]() 76 79 self.add(newmod) 77 80 return newmod 78 81 else: 79 raise MaKaCError( ("Module id %s does not exist") % str( id) )82 raise MaKaCError( ("Module id %s does not exist") % str(moduleId) ) 80 83 81 84 … … 98 101 99 102 @classmethod 103 def destroyDBInstance(cls): 104 return ModuleHolder().destroyById(cls.id) 105 106 @classmethod 100 107 def getDBInstance(cls): 108 """ 109 Returns the module instance that is stored in the database 110 """ 101 111 return ModuleHolder().getById(cls.id) -
indico/modules/scheduler/__init__.py
r0ae298 rdfc79a 19 19 ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. 20 20 21 """ 22 The ``scheduler`` module provides Indico with a scheduling API that allows specific jobs 23 (tasks to be run at given times, with a certain repeatibility, if needed). 24 """ 25 21 26 from indico.modules.scheduler.module import SchedulerModule 22 27 from indico.modules.scheduler.server import Scheduler -
indico/modules/scheduler/base.py
r904b3f rdfc79a 21 21 from contextlib import contextmanager 22 22 from ZODB.POSException import ConflictError 23 import transaction24 23 25 24 TASK_STATUS_NONE, TASK_STATUS_SPOOLED, TASK_STATUS_QUEUED, TASK_STATUS_RUNNING, \ -
indico/modules/scheduler/client.py
r0ae298 rdfc79a 24 24 25 25 """ 26 `Client`, as the name says, represents a scheduler client, that 27 allows Indico client processes to interact with the scheduler. 26 :py:class:`~indico.modules.scheduler.Client` provices a transparent scheduler 27 client, that allows Indico client processes to interact with the Scheduler 28 without the need for a lot of code. 28 29 29 It acts as a sort of proxy between clients and the SchedulerModule.30 It acts as a remote proxy. 30 31 """ 31 32 … … 37 38 """ 38 39 Schedules a task for execution 39 Returns a tuple containing the task id and the40 next execution time41 40 """ 42 41 … … 45 44 def shutdown(self, msg = ""): 46 45 """ 47 Shuts down the scheduler 46 Shuts down the scheduler. `msg` is an optional paramater that provides 47 an information message that will be written in the logs 48 48 """ 49 49 50 50 return self._schedMod.spool('shutdown', msg) 51 52 def getStatus(self): 53 """ 54 Returns status information (dictionary), containing the lengths (tasks) of: 55 * spool; 56 * waiting queue; 57 * running queue; 58 * finished task index; 59 * failed task index; 60 """ 61 62 return self._schedMod.getStatus() 63 64 def getTask(self, tid): 65 """ 66 Returns a :py:class:`task <indico.modules.scheduler.tasks.BaseTask>` object, 67 given its task id 68 """ 69 70 return self._schedMod.getTaskIndex()[tid] -
indico/modules/scheduler/module.py
r0ae298 rdfc79a 101 101 'waiting': len(self._waitingQueue), 102 102 'running': len(self._runningList), 103 'spooled': len(self._taskSpool) 103 'spooled': len(self._taskSpool), 104 'failed': self._failedIndex._num_docs() , 105 'finished': self._finishedIndex._num_docs() 104 106 } 105 107 -
indico/modules/scheduler/server.py
r904b3f rdfc79a 29 29 30 30 31 from indico.modules.scheduler import SchedulerModule, base 31 from indico.modules.scheduler import SchedulerModule, base, tasks 32 32 from indico.modules.scheduler.slave import Worker 33 33 from indico.util.date_time import nowutc, int_timestamp … … 36 36 class Scheduler(object): 37 37 """ 38 38 A :py:class:`~indico.modules.scheduler.Scheduler` object provides a job scheduler 39 based on a waiting queue, that communicates with its clients through the database. 40 Things have been done in a way that the probability of conflict is minimized, and 41 operations are repeated in case one happens. 42 43 The entry point of the process consists of a 'spooler' that periodically takes 44 tasks out of a `conflict-safe` FIFO (spool) and adds them to an ``IOBTree``-based 45 waiting queue. The waiting queue is then checked periodically for the next task, 46 and when the time comes the task is executed. 47 48 Tasks are executed in different threads. 49 50 The :py:class:`~indico.modules.scheduler.Client` class works as a transparent 51 remote proxy for this class. 39 52 """ 40 53 … … 45 58 # time to wait between cycles 46 59 'sleep_interval': 10, 60 47 61 # AWOL = Absent Without Leave 48 62 # [0.0, 1.0) probability that after a Scheduler tick it will check for AWOL 49 63 # tasks in the runningList the lower the number the lower the number of checks 50 64 'awol_tasks_check_probability': 0.3, 65 51 66 # seconds to consider a task AWOL 52 'awol_tasks_thresold': 6000 67 'awol_tasks_thresold': 6000, 68 69 # Number of times to try to run a task before aborting (min 1) 70 'task_max_tries': 10 53 71 } 54 72 … … 132 150 nextTS, nextTask = res 133 151 152 self._logger.info((nextTS, currentTimestamp)) 153 134 154 # if it's time to execute the task 135 155 if (nextTS <= currentTimestamp): … … 144 164 145 165 # move the tasks in the spool to the waiting queue 146 self._processSpool() 147 148 # process tasks that have finished meanwhile 149 # (tasks have been running in different threads, so, the sync 150 # thas was done above won't hurt) 151 self._checkFinishedTasks() 166 try: 167 self._processSpool() 168 finally: 169 # this `finally` makes sure finished tasks are handled 170 # even if a shutdown order is sent 171 172 # process tasks that have finished meanwhile 173 # (tasks have been running in different threads, so, the sync 174 # thas was done above won't hurt) 175 self._checkFinishedTasks() 152 176 153 177 # we also check AWOL tasks from time to time … … 164 188 need to be moved to the correct places 165 189 """ 190 191 self._logger.info("Checking finished tasks") 166 192 167 193 for taskId, thread in self._runningThreads.items(): … … 214 240 self._relaunchRunningListItems() 215 241 216 # iterate over the tasks in the waiting queue 217 # that should be running218 242 243 # iterate over the tasks in the waiting queue 244 # that should be running 219 245 for timestamp, curTask in self._iterateTasks(): 220 246 # execute the "task cycle" for each new task … … 228 254 raise e 229 255 230 231 256 def _taskCycle(self, timestamp, curTask): 232 257 … … 244 269 # Start a worker subprocess 245 270 # Add it to the thread dict 246 self._runningThreads[curTask.id] = Worker(curTask.id )271 self._runningThreads[curTask.id] = Worker(curTask.id, self._config) 247 272 self._runningThreads[curTask.id].start() 248 273 … … 321 346 task.tearDown() 322 347 323 # TODO324 # Add periodic tasks back to the queue325 #if isinstance(task, base.PeriodicTask):326 # Scheduler.addTask(self)348 if isinstance(task, tasks.PeriodicTask): 349 with self._op.commit('taskIdx'): 350 task.setNextOccurrence() 351 SchedulerModule.getDBInstance().addTaskToWaitingQueue(task) 327 352 328 353 def _notifyTaskFailed(self, task): -
indico/modules/scheduler/slave.py
rc79cb2 rdfc79a 19 19 ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. 20 20 21 import logging21 import time, logging 22 22 23 23 from indico.modules.scheduler import SchedulerModule, base 24 24 from MaKaC.common import DBMgr 25 25 26 # Number of times to try to run a task before aborting27 TASK_MAX_RETRIES = 1028 29 26 30 27 class Worker(base._MT_UNIT): 31 28 32 def __init__(self, taskId ):29 def __init__(self, taskId, configData): 33 30 super(Worker, self).__init__() 34 31 35 self._logger = logging.getLogger('worker ')32 self._logger = logging.getLogger('worker/%s' % taskId) 36 33 self.success = None 37 34 self._taskId = taskId 35 self._config = configData 38 36 39 37 def _prepare(self): 40 38 """ 41 This acts as a second "constructor", that is executed in the39 This acts as a second 'constructor', that is executed in the 42 40 context of the thread (due to database reasons) 43 41 """ … … 64 62 self._dbi.startRequest() 65 63 66 while i < TASK_MAX_RETRIESand not self._task.endedOn:64 while i < self._config.task_max_tries and not self._task.endedOn: 67 65 i = i + 1 68 66 try: … … 77 75 self._logger.exception('Error message') 78 76 79 # We sleep progressivelly so that if the error is caused by concurrency 80 # we don't make the problem worse by hammering the server. 81 time.sleep(nextRunIn) 77 if i < self._config.task_max_tries: 78 # if i is still low enough, we sleep progressively more 79 # so that if the error is caused by concurrency we don't make 80 # the problem worse by hammering the server. 81 time.sleep(nextRunIn) 82 82 83 83 # abort transaction and synchronize -
indico/modules/scheduler/tasks.py
rc79cb2 rdfc79a 22 22 import logging 23 23 import time 24 import dateutil 24 25 25 26 from datetime import timedelta … … 50 51 class BaseTask(Persistent, Fossilizable): 51 52 """ 52 To create a new Task subclass Task and define a _run() method with 53 the tasks' actions 54 55 Description of each attribute: 56 AUTOMATIC ATTRS: 57 - startedOn: actual date the task started running 58 - endedOn: actual date the task's run method finished 59 - running: True or False depending on what the task thinks it's happening 60 61 USER-CONFIGURABLE ATTRS (through kw arguments to init and setters/getters) 62 - startOn: time at which the task creator wanted the task to start (can be blank) 63 - endOn: last point in time where the task can run. A task will never enter the 64 runningQueue if current time is past endOn 53 A base class for tasks. 54 `expiryDate` is the last point in time when the task can run. A task will refuse 55 to run if current time is past `expiryDate` 65 56 """ 66 57 67 58 fossilizes(ITaskFossil) 68 59 69 def __init__(self, **kwargs):60 def __init__(self, expiryDate=None): 70 61 self.createdOn = nowutc() 62 self.expiryDate = expiryDate 71 63 self.typeId = self.__class__.__name__ 72 self.reset(**kwargs) 73 74 def reset(self, **kwargs): 64 self.id = None 65 self.reset() 66 67 def reset(self): 75 68 '''Resets a task to its state before being run''' 76 69 77 self.startedOn = None78 self.endedOn = None79 70 self.running = False 80 71 self.onRunningListSince = None 81 self.startOn = None82 self.endOn = None83 72 self.status = base.TASK_STATUS_NONE 84 self.id = None85 86 for k in ('startOn', 'endOn'):87 if k in kwargs:88 setattr(self, k, kwargs[k])89 90 def getEndOn(self):91 return self.EndOn92 93 def getStartOn(self):94 return self.startOn95 73 96 74 def getCreatedOn(self): 97 75 return self.createdOn 98 76 99 def setStartOn(self, newtime): 100 self.startOn = newtime 101 102 def setEndOn(self, newtime): 103 self.endOn = newtime 77 def getStartOn(self): 78 """ 79 To be overloaded 80 """ 81 82 def getLastFinishedOn(self): 83 """ 84 To be overloaded 85 """ 104 86 105 87 def setOnRunningListSince(self, sometime): … … 108 90 109 91 def setStatus(self, newstatus): 110 self.status = base.TASK_STATUS_QUEUED92 self.status = newstatus 111 93 112 94 def getOnRunningListSince(self): … … 122 104 self.id = newid 123 105 self.status = newstatus 124 125 def isPeriodic(self):126 return False127 128 def isPeriodicUnique(self):129 return False130 131 def getStartedOn(self):132 return self.startedOn133 134 def getEndedOn(self):135 return self.endedOn136 106 137 107 def plugLogger(self, logger): … … 145 115 def start(self): 146 116 147 tsDiff = int_timestamp(nowutc()) - int_timestamp(self. startOn)117 tsDiff = int_timestamp(nowutc()) - int_timestamp(self.getStartOn()) 148 118 149 119 if tsDiff < 0: 150 self.getLogger().debug('Task %s will wait for some time. (%s) > (%s)' % (self.id, self. startOn, nowutc()))120 self.getLogger().debug('Task %s will wait for some time. (%s) > (%s)' % (self.id, self.getStartOn(), nowutc())) 151 121 time.sleep(tsDiff) 152 122 153 if self.e ndOn and nowutc() > self.endOn:154 self.getLogger().warning('Task %s will not be executed, e ndOn (%s) < current time (%s)' % (self.id, self.endOn, nowutc()))123 if self.expiryDate and nowutc() > self.expiryDate: 124 self.getLogger().warning('Task %s will not be executed, expiryDate (%s) < current time (%s)' % (self.id, self.expiryDate, nowutc())) 155 125 return False 156 126 … … 169 139 170 140 def __str__(self): 171 return "<%s %s %s %s>" % (self.typeId, self.id, self.status, self. startOn)141 return "<%s %s %s %s>" % (self.typeId, self.id, self.status, self.getStartOn()) 172 142 173 143 174 144 class OneShotTask(BaseTask): 175 145 '''Tasks that are executed only once''' 176 def __init__(self, **kwargs): 177 super(OneShotTask, self).__init__(**kwargs) 178 if 'runOn' in kwargs: 179 self.runOn = kwargs['runOn'] 180 else: 181 self.runOn = nowutc() 182 146 def __init__(self, startDateTime): 147 super(OneShotTask, self).__init__() 148 149 self.startDateTime = startDateTime 150 self.startedOn = None 151 self.endedOn = None 152 153 def getStartOn(self): 154 return self.startDateTime 155 156 def setStartOn(self, newtime): 157 self.startDateTime = newtime 158 159 def getEndedOn(self): 160 return self.endedOn 161 162 def setEndedOn(self, dateTime): 163 self.endedOn = dateTime 164 165 def getStartedOn(self): 166 return self.startedOn 183 167 184 168 def getRunOn(self): 185 169 return self.runOn 186 170 171 def getLastFinishedOn(self): 172 return self.getEndedOn() 187 173 188 174 class PeriodicTask(BaseTask): … … 191 177 """ 192 178 193 def __init__(self, **kwargs):179 def __init__(self, frequency, **kwargs): 194 180 """ 195 Must be fed: 196 - interval: seconds between each successive run 181 - frequency - a valid dateutil frequency specifier (DAILY, HOURLY, etc...) 197 182 """ 198 super(PeriodicTask, self).__init__(**kwargs) 199 if 'interval' not in kwargs: 200 raise Exception('Error: PeriodicTask was not given an interval') 201 202 self.interval = kwargs['interval'] 203 204 205 def isPeriodic(self): 206 return True 207 208 def getInterval(self): 209 return self.interval 183 super(PeriodicTask, self).__init__() 184 185 self.frequency = frequency 186 self.interval = kwargs 187 self.nextOccurrence = None 188 self._lastFinishedOn = None 210 189 211 190 def start(self): … … 219 198 self.reset() 220 199 200 def setNextOccurrence(self): 201 l = list(dateutil.rrule.rrule( 202 self.frequency, 203 dtstart = nowutc(), 204 count = 1, 205 **self.interval 206 )) 207 208 if l: 209 self.nextOccurrence = l[0] 210 else: 211 return None 212 213 def getStartOn(self): 214 # if it's the first time, compute the next occurrence 215 if not self.nextOccurrence: 216 self.setNextOccurrence() 217 218 return self.nextOccurrence 219 220 def getLastFinishedOn(self): 221 return self._lastFinishedOn 222 221 223 222 224 class PeriodicUniqueTask(PeriodicTask): 223 225 '''Singleton periodic tasks: no two or more PeriodicUniqueTask of this 224 226 class will be queued or running at the same time''' 225 def __init__(self, **kwargs):226 super(PeriodicUniqueTask, self).__init__(**kwargs)227 228 def isPeriodicUnique(self):229 return True230 227 231 228 … … 506 503 # Date checkings... 507 504 from MaKaC.conference import ConferenceHolder 508 from MaKaC.common.timezoneUtils import nowutc509 505 if not ConferenceHolder().hasKey(self.conf.getId()) or \ 510 506 self.conf.getStartDate() <= nowutc(): … … 555 551 def run(self): 556 552 self.getLogger().debug('Now i shall sleeeeeeeep!') 557 time.sleep(1 0)553 time.sleep(1) 558 554 self.getLogger().debug('%s executed' % self.__class__.__name__) 559 555 -
indico/tests/util.py
rb37548 rdfc79a 45 45 Actually starts the server 46 46 """ 47 print "spawning server on PID %s" % self.pid48 47 self.server.main() 49 48 -
indico/util/date_time.py
r8eae48 rdfc79a 19 19 ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. 20 20 21 import time, pytz 22 21 23 from MaKaC.common.timezoneUtils import nowutc 22 import time23 24 24 def int_timestamp(datetimeVal ):25 return int(time.mktime(datetimeVal. timetuple()))25 def int_timestamp(datetimeVal, tz = pytz.timezone('UTC')): 26 return int(time.mktime(datetimeVal.astimezone(tz).timetuple()))
Note: See TracChangeset
for help on using the changeset viewer.
