Changeset 5e086e in indico
- Timestamp:
- 03/17/11 18:41:28 (2 years ago)
- Branches:
- master, burotel, hello-world-walkthrough, ipv6, v0.98-series, v0.98.2, v0.98.3, v0.98b1, v0.98b2, v0.99, 051b2622c51afb171a1dedb46a0df4fbb0cbd02e, 0da0c1403bae8e51d8229f460181c71b9e6dda72
- Children:
- 7ca902
- Parents:
- af3a6d
- Location:
- indico/modules/scheduler
- Files:
-
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
indico/modules/scheduler/base.py
r9edb65 r5e086e 20 20 21 21 import time, types 22 from contextlib import contextmanager23 22 from ZODB.POSException import ConflictError 24 23 25 24 from indico.util.date_time import nowutc 26 25 27 TASK_STATUS_NONE, TASK_STATUS_SPOOLED, TASK_STATUS_QUEUED, TASK_STATUS_RUNNING, \ 28 TASK_STATUS_FAILED, TASK_STATUS_ABORTED, TASK_STATUS_FINISHED = range(0,7) 26 TASK_STATUS_NONE, TASK_STATUS_SPOOLED, TASK_STATUS_QUEUED, \ 27 TASK_STATUS_RUNNING, TASK_STATUS_FAILED, TASK_STATUS_ABORTED, \ 28 TASK_STATUS_FINISHED, TASK_STATUS_TERMINATED = range(0,8) 29 29 30 # threading vs. multiprocessing 30 31 def status(num): 32 return ['NN', 'SP', 'QD', 'RN', 'FA', 'AB', 33 'FI', 'TD'][num] 34 31 35 32 36 CONFLICTERROR_MAX_RETRIES = 10 37 33 38 34 39 class OperationManager(object): … … 74 79 return retValue 75 80 81 76 82 ## Time Sources 77 83 … … 97 103 time.sleep(amount) 98 104 99 def getCurrentTime( ):105 def getCurrentTime(self): 100 106 """ 101 107 Returns the current datetime … … 111 117 TimeSource._source = UTCTimeSource() 112 118 119 113 120 ## Exceptions 114 121 … … 119 126 class TaskStillRunningException(SchedulerException): 120 127 def __init__(self, task): 121 SchedulerException.__init__(self, ' Task %s (%s) is currently running' %122 (task.id, task.typeId)) 128 SchedulerException.__init__(self, '%s is currently running' % task) 129 123 130 124 131 class TaskNotFoundException(SchedulerException): … … 129 136 pass 130 137 138 131 139 class SchedulerQuitException(SchedulerException): 132 140 pass 141 133 142 134 143 class SchedulerUnknownOperationException(SchedulerException): 135 144 pass 136 145 146 137 147 class SchedulerConfigurationException(SchedulerException): 138 148 pass 139 -
indico/modules/scheduler/module.py
r41ca8c r5e086e 68 68 if task.status != status: 69 69 raise base.TaskInconsistentStatusException( 70 "%s 'sstatus is not %s" %71 (task, status))70 "%s status is not %s" % 71 (task, base.status(status))) 72 72 73 73 if status == base.TASK_STATUS_RUNNING and \ 74 74 task not in self._runningList: 75 75 raise base.TaskInconsistentStatusException( 76 'task %s (%s) was not found in the running task list' % 77 (task.id, task)) 76 'task %s was not found in the running task list' % task) 78 77 79 78 # TODO: remaining elifs … … 140 139 return True 141 140 142 def moveTask(self, task, moveFrom, status, occurrence = None, nocheck = False):141 def removeRunningTask(self, task): 143 142 """ 144 143 Remove a task from the running list 145 144 """ 146 145 try: 146 self._runningList.remove(task) 147 self._p_changed = True 148 except ValueError: 149 logging.getLogger('scheduler').exception() 150 151 def moveTask(self, task, moveFrom, status, occurrence=None, nocheck=False): 152 """ 153 Move a task somewhere 154 """ 147 155 if not occurrence: 148 156 occurrence = task … … 153 161 if moveFrom == base.TASK_STATUS_RUNNING: 154 162 # actually remove it from list 155 self. _runningList.remove(task)156 self._p_changed = True 163 self.removeRunningTask(task) 164 157 165 elif moveFrom == base.TASK_STATUS_QUEUED: 158 166 idx_timestamp = int_timestamp(task.getStartOn()) … … 165 173 if status == base.TASK_STATUS_FINISHED: 166 174 self._finishedIndex.index_obj(occurrence) 167 elif status == base.TASK_STATUS_FAILED: 175 elif status in [base.TASK_STATUS_FAILED, 176 base.TASK_STATUS_TERMINATED]: 168 177 self._failedIndex.index_obj(occurrence) 169 178 elif status == base.TASK_STATUS_QUEUED: 170 179 self.addTaskToWaitingQueue(occurrence) 171 180 172 173 def addTaskToWaitingQueue(self, task, index = False): 181 def addTaskToWaitingQueue(self, task, index=False): 174 182 175 183 if index: … … 185 193 186 194 logging.getLogger('scheduler').debug( 187 'Added task %s to waitingQueue..' % task.id)195 'Added %s to waitingQueue..' % task) 188 196 189 197 def popNextWaitingTask(self): -
indico/modules/scheduler/server.py
r5d546a r5e086e 111 111 112 112 @base.OperationManager 113 def _db_moveTask(self, task, moveFrom, status, occurrence = None, nocheck = False): 113 def _db_moveTask(self, task, moveFrom, status, occurrence = None, 114 nocheck = False, setStatus = False): 114 115 self._schedModule.moveTask(task, moveFrom, status, 115 116 occurrence = occurrence, nocheck = nocheck) 117 if setStatus: 118 task.setStatus(status) 116 119 117 120 @base.OperationManager … … 144 147 145 148 if status == base.TASK_STATUS_FINISHED: 146 self._logger.info('Task %s says it has finished.. ' % task)149 self._logger.info('Task %s says it has finished...' % task) 147 150 elif status == base.TASK_STATUS_FAILED: 148 151 self._logger.error('Task %s says it has failed..' % task) 149 152 else: 150 raise Exception('Impossible task state')153 raise Exception('Impossible task/slave state') 151 154 152 155 # clean up the mess 153 156 task.tearDown() 157 158 # task forcefully terminated? 159 if task.getStatus() == base.TASK_STATUS_TERMINATED: 160 # well, we have a final status already, and the task is 161 # by now properly indexed 162 self._logger.warning("%s finished after being terminated, with status %s" % (task, base.status(status))) 163 164 # If the task has been left running 165 if task in self._schedModule.getRunningList(): 166 self._schedModule.removeRunningTask(task) 167 # We end here! 168 return 169 # else... 154 170 155 171 task.setStatus(status) … … 299 315 """ 300 316 301 self._logger. info("Checking finished tasks")317 self._logger.debug("Checking finished tasks") 302 318 303 319 for taskId, thread in self._runningWorkers.items(): … … 433 449 base.TASK_STATUS_FAILED) 434 450 435 self._logger.info("Task %s dequeued from status %s" % (task, oldStatus)) 451 self._logger.info("%s dequeued from status %s" % \ 452 (task, base.status(oldStatus))) 436 453 437 454 def _checkAWOLTasks(self): 438 455 439 self._logger. info('Checking AWOL tasks...')456 self._logger.debug('Checking AWOL tasks...') 440 457 441 458 for task in self._schedModule.getRunningList(): … … 464 481 task, 465 482 base.TASK_STATUS_RUNNING, 466 base.TASK_STATUS_FAILED) 483 base.TASK_STATUS_TERMINATED, 484 setStatus=True) 485 486 self._logger.info("Task %s terminated." % (task.id)) -
indico/modules/scheduler/slave.py
r23945f r5e086e 103 103 except Exception, e: 104 104 nextRunIn = i * 10 # secs 105 self._logger.exception(" Task %s failed with exception '%s'. " %106 (self._task.id, e))105 self._logger.exception("%s failed with exception '%s'. " % \ 106 (self._task, e)) 107 107 108 108 if i < self._config.task_max_tries: … … 122 122 self._setResult(True) 123 123 if i > 1: 124 self._logger.warning(" Task%s failed %d times before "125 "finishing correctly" % (self._task .id, i - 1))124 self._logger.warning("%s failed %d times before " 125 "finishing correctly" % (self._task, i - 1)) 126 126 else: 127 127 with self._dbi.transaction(): 128 128 self._setResult(False) 129 self._logger.error(" Task%s failed too many (%d) times. "130 "Aborting its execution.." % (self._task .id, i))129 self._logger.error("%s failed too many (%d) times. " 130 "Aborting its execution.." % (self._task, i)) 131 131 132 132 self._logger.info("exiting") -
indico/modules/scheduler/tasks.py
ra7950b r5e086e 19 19 ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. 20 20 21 import copy, logging, os 21 import copy, logging, os, time 22 22 from dateutil import rrule 23 23 from datetime import timedelta … … 79 79 self.id = None 80 80 self.reset() 81 self.status = 0 81 82 82 83 self.startedOn = None … … 111 112 112 113 def setStatus(self, newstatus): 113 if hasattr(self, '_v_logger'): 114 self._v_logger.info("%s set status %s" % (self, newstatus)) 114 self.getLogger().info("%s set status %s" % (self, base.status(newstatus))) 115 115 self.status = newstatus 116 116 … … 178 178 179 179 def __str__(self): 180 return "<%s %s %s %s>" % (self.typeId, self.id, self.status, self.getStartOn()) 180 return "[%s:%s|%s]" % (self.typeId, self.id, 181 base.status(self.status)) 181 182 182 183 … … 195 196 def setStartOn(self, newtime): 196 197 self.startDateTime = newtime 198 199 def suicide(self): 200 self.setStatus(base.TASK_STATUS_TERMINATED) 201 self.setEndedOn(self._getCurrentDateTime()) 197 202 198 203 … … 344 349 def run(self): 345 350 from MaKaC.common.FoundationSync.foundationSync import FoundationSync 346 FoundationSync(self. _v_logger).doAll()351 FoundationSync(self.getLogger()).doAll() 347 352 348 353 … … 584 589 "Deleting alarm." % self.conf.getId()) 585 590 self.conf.removeAlarm(self) 591 self.suicide() 586 592 elif self.conf.getStartDate() <= self._getCurrentDateTime(): 587 593 self.getLogger().warning("Conference %s already started. " 588 594 "Deleting alarm." % self.conf.getId()) 589 595 self.conf.removeAlarm(self) 596 self.suicide() 590 597 return False 591 598
Note: See TracChangeset
for help on using the changeset viewer.
