Changeset 5e086e in indico


Ignore:
Timestamp:
03/17/11 18:41:28 (2 years ago)
Author:
Pedro Ferreira <jose.pedro.ferreira@…>
Branches:
master, burotel, hello-world-walkthrough, ipv6, v0.98-series, v0.98.2, v0.98.3, v0.98b1, v0.98b2, v0.99, 051b2622c51afb171a1dedb46a0df4fbb0cbd02e, 0da0c1403bae8e51d8229f460181c71b9e6dda72
Children:
7ca902
Parents:
af3a6d
Message:

[FIX] AWOL task handling and task termination

  • Corrected some data structure inconsistencies;
  • Improved logging a bit;
Location:
indico/modules/scheduler
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • indico/modules/scheduler/base.py

    r9edb65 r5e086e  
    2020 
    2121import time, types 
    22 from contextlib import contextmanager 
    2322from ZODB.POSException import ConflictError 
    2423 
    2524from indico.util.date_time import nowutc 
    2625 
    27 TASK_STATUS_NONE, TASK_STATUS_SPOOLED, TASK_STATUS_QUEUED, TASK_STATUS_RUNNING, \ 
    28 TASK_STATUS_FAILED, TASK_STATUS_ABORTED, TASK_STATUS_FINISHED = range(0,7) 
     26TASK_STATUS_NONE, TASK_STATUS_SPOOLED, TASK_STATUS_QUEUED, \ 
     27TASK_STATUS_RUNNING, TASK_STATUS_FAILED, TASK_STATUS_ABORTED, \ 
     28TASK_STATUS_FINISHED, TASK_STATUS_TERMINATED = range(0,8) 
    2929 
    30 # threading vs. multiprocessing 
     30 
     31def status(num): 
     32    return ['NN', 'SP', 'QD', 'RN', 'FA', 'AB', 
     33            'FI', 'TD'][num] 
     34 
    3135 
    3236CONFLICTERROR_MAX_RETRIES = 10 
     37 
    3338 
    3439class OperationManager(object): 
     
    7479        return retValue 
    7580 
     81 
    7682## Time Sources 
    7783 
     
    97103        time.sleep(amount) 
    98104 
    99     def getCurrentTime(): 
     105    def getCurrentTime(self): 
    100106        """ 
    101107        Returns the current datetime 
     
    111117TimeSource._source = UTCTimeSource() 
    112118 
     119 
    113120## Exceptions 
    114121 
     
    119126class TaskStillRunningException(SchedulerException): 
    120127    def __init__(self, task): 
    121         SchedulerException.__init__(self, 'Task %s (%s) is currently running' % 
    122                                     (task.id, task.typeId)) 
     128        SchedulerException.__init__(self, '%s is currently running' % task) 
     129 
    123130 
    124131class TaskNotFoundException(SchedulerException): 
     
    129136    pass 
    130137 
     138 
    131139class SchedulerQuitException(SchedulerException): 
    132140    pass 
     141 
    133142 
    134143class SchedulerUnknownOperationException(SchedulerException): 
    135144    pass 
    136145 
     146 
    137147class SchedulerConfigurationException(SchedulerException): 
    138148    pass 
    139  
  • indico/modules/scheduler/module.py

    r41ca8c r5e086e  
    6868        if task.status != status: 
    6969            raise base.TaskInconsistentStatusException( 
    70                 "%s's status is not %s" % 
    71                 (task, status)) 
     70                "%s status is not %s" % 
     71                (task, base.status(status))) 
    7272 
    7373        if status == base.TASK_STATUS_RUNNING and \ 
    7474               task not in self._runningList: 
    7575                raise base.TaskInconsistentStatusException( 
    76                     'task %s (%s) was not found in the running task list' % 
    77                     (task.id, task)) 
     76                    'task %s was not found in the running task list' % task) 
    7877 
    7978        # TODO: remaining elifs 
     
    140139        return True 
    141140 
    142     def moveTask(self, task, moveFrom, status, occurrence = None, nocheck = False): 
     141    def removeRunningTask(self, task): 
    143142        """ 
    144143        Remove a task from the running list 
    145144        """ 
    146  
     145        try: 
     146            self._runningList.remove(task) 
     147            self._p_changed = True 
     148        except ValueError: 
     149            logging.getLogger('scheduler').exception() 
     150 
     151    def moveTask(self, task, moveFrom, status, occurrence=None, nocheck=False): 
     152        """ 
     153        Move a task somewhere 
     154        """ 
    147155        if not occurrence: 
    148156            occurrence = task 
     
    153161        if moveFrom == base.TASK_STATUS_RUNNING: 
    154162            # actually remove it from list 
    155             self._runningList.remove(task) 
    156             self._p_changed = True 
     163            self.removeRunningTask(task) 
     164 
    157165        elif moveFrom == base.TASK_STATUS_QUEUED: 
    158166            idx_timestamp = int_timestamp(task.getStartOn()) 
     
    165173        if status == base.TASK_STATUS_FINISHED: 
    166174            self._finishedIndex.index_obj(occurrence) 
    167         elif status == base.TASK_STATUS_FAILED: 
     175        elif status in [base.TASK_STATUS_FAILED, 
     176                        base.TASK_STATUS_TERMINATED]: 
    168177            self._failedIndex.index_obj(occurrence) 
    169178        elif status == base.TASK_STATUS_QUEUED: 
    170179            self.addTaskToWaitingQueue(occurrence) 
    171180 
    172  
    173     def addTaskToWaitingQueue(self, task, index = False): 
     181    def addTaskToWaitingQueue(self, task, index=False): 
    174182 
    175183        if index: 
     
    185193 
    186194        logging.getLogger('scheduler').debug( 
    187             'Added task %s to waitingQueue..' % task.id) 
     195            'Added %s to waitingQueue..' % task) 
    188196 
    189197    def popNextWaitingTask(self): 
  • indico/modules/scheduler/server.py

    r5d546a r5e086e  
    111111 
    112112    @base.OperationManager 
    113     def _db_moveTask(self, task, moveFrom, status, occurrence = None, nocheck = False): 
     113    def _db_moveTask(self, task, moveFrom, status, occurrence = None, 
     114                     nocheck = False, setStatus = False): 
    114115        self._schedModule.moveTask(task, moveFrom, status, 
    115116                                   occurrence = occurrence, nocheck = nocheck) 
     117        if setStatus: 
     118            task.setStatus(status) 
    116119 
    117120    @base.OperationManager 
     
    144147 
    145148        if status == base.TASK_STATUS_FINISHED: 
    146             self._logger.info('Task %s says it has finished..' % task) 
     149            self._logger.info('Task %s says it has finished...' % task) 
    147150        elif status == base.TASK_STATUS_FAILED: 
    148151            self._logger.error('Task %s says it has failed..' % task) 
    149152        else: 
    150             raise Exception('Impossible task state') 
     153            raise Exception('Impossible task/slave state') 
    151154 
    152155            # clean up the mess 
    153156            task.tearDown() 
     157 
     158        # task forcefully terminated? 
     159        if task.getStatus() == base.TASK_STATUS_TERMINATED: 
     160            # well, we have a final status already, and the task is 
     161            # by now properly indexed 
     162            self._logger.warning("%s finished after being terminated, with status %s" % (task, base.status(status))) 
     163 
     164            # If the task has been left running 
     165            if task in self._schedModule.getRunningList(): 
     166                self._schedModule.removeRunningTask(task) 
     167            # We end here! 
     168            return 
     169        # else... 
    154170 
    155171        task.setStatus(status) 
     
    299315        """ 
    300316 
    301         self._logger.info("Checking finished tasks") 
     317        self._logger.debug("Checking finished tasks") 
    302318 
    303319        for taskId, thread in self._runningWorkers.items(): 
     
    433449                          base.TASK_STATUS_FAILED) 
    434450 
    435         self._logger.info("Task %s dequeued from status %s" % (task, oldStatus)) 
     451        self._logger.info("%s dequeued from status %s" % \ 
     452                          (task, base.status(oldStatus))) 
    436453 
    437454    def _checkAWOLTasks(self): 
    438455 
    439         self._logger.info('Checking AWOL tasks...') 
     456        self._logger.debug('Checking AWOL tasks...') 
    440457 
    441458        for task in self._schedModule.getRunningList(): 
     
    464481                        task, 
    465482                        base.TASK_STATUS_RUNNING, 
    466                         base.TASK_STATUS_FAILED) 
     483                        base.TASK_STATUS_TERMINATED, 
     484                        setStatus=True) 
     485 
     486                    self._logger.info("Task %s terminated." % (task.id)) 
  • indico/modules/scheduler/slave.py

    r23945f r5e086e  
    103103            except Exception, e: 
    104104                nextRunIn = i * 10  # secs 
    105                 self._logger.exception("Task %s failed with exception '%s'. " % 
    106                                              (self._task.id, e)) 
     105                self._logger.exception("%s failed with exception '%s'. " % \ 
     106                                       (self._task, e)) 
    107107 
    108108                if  i < self._config.task_max_tries: 
     
    122122                self._setResult(True) 
    123123            if i > 1: 
    124                 self._logger.warning("Task %s failed %d times before " 
    125                                      "finishing correctly" % (self._task.id, i - 1)) 
     124                self._logger.warning("%s failed %d times before " 
     125                                     "finishing correctly" % (self._task, i - 1)) 
    126126        else: 
    127127            with self._dbi.transaction(): 
    128128                self._setResult(False) 
    129             self._logger.error("Task %s failed too many (%d) times. " 
    130                                "Aborting its execution.." % (self._task.id, i)) 
     129            self._logger.error("%s failed too many (%d) times. " 
     130                               "Aborting its execution.." % (self._task, i)) 
    131131 
    132132            self._logger.info("exiting") 
  • indico/modules/scheduler/tasks.py

    ra7950b r5e086e  
    1919## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. 
    2020 
    21 import copy, logging, os 
     21import copy, logging, os, time 
    2222from dateutil import rrule 
    2323from datetime import timedelta 
     
    7979        self.id = None 
    8080        self.reset() 
     81        self.status = 0 
    8182 
    8283        self.startedOn = None 
     
    111112 
    112113    def setStatus(self, newstatus): 
    113         if hasattr(self, '_v_logger'): 
    114             self._v_logger.info("%s set status %s" % (self, newstatus)) 
     114        self.getLogger().info("%s set status %s" % (self, base.status(newstatus))) 
    115115        self.status = newstatus 
    116116 
     
    178178 
    179179    def __str__(self): 
    180         return "<%s %s %s %s>" % (self.typeId, self.id, self.status, self.getStartOn()) 
     180        return "[%s:%s|%s]" % (self.typeId, self.id, 
     181                               base.status(self.status)) 
    181182 
    182183 
     
    195196    def setStartOn(self, newtime): 
    196197        self.startDateTime = newtime 
     198 
     199    def suicide(self): 
     200        self.setStatus(base.TASK_STATUS_TERMINATED) 
     201        self.setEndedOn(self._getCurrentDateTime()) 
    197202 
    198203 
     
    344349    def run(self): 
    345350        from MaKaC.common.FoundationSync.foundationSync import FoundationSync 
    346         FoundationSync(self._v_logger).doAll() 
     351        FoundationSync(self.getLogger()).doAll() 
    347352 
    348353 
     
    584589                                     "Deleting alarm." % self.conf.getId()) 
    585590                self.conf.removeAlarm(self) 
     591                self.suicide() 
    586592            elif self.conf.getStartDate() <= self._getCurrentDateTime(): 
    587593                self.getLogger().warning("Conference %s already started. " 
    588594                                     "Deleting alarm." % self.conf.getId()) 
    589595                self.conf.removeAlarm(self) 
     596                self.suicide() 
    590597                return False 
    591598 
Note: See TracChangeset for help on using the changeset viewer.