source: indico/indico/modules/scheduler/server.py @ dfc79a

burotel, hello-world-walkthrough, ipv6, v0.98-series, v0.98.2, v0.98.3, v0.98b1, v0.98b2, v0.99, v1.0, v1.1
Last change on this file since dfc79a was dfc79a, checked in by Pedro Ferreira <jose.pedro.ferreira@…>, 3 years ago

[IMP] First draft of Periodic tasks

  • Some minor fixes in server and workers, slightly better tests;
  • Moved task_max_retries to config option;
  • Periodic tasks - moved some things around;
  • New documentation for the scheduler module (still in progress);
  • Property mode set to 100644
File size: 13.1 KB
Line 
1# -*- coding: utf-8 -*-
2##
3##
4## This file is part of CDS Indico.
5## Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007 CERN.
6##
7## CDS Indico is free software; you can redistribute it and/or
8## modify it under the terms of the GNU General Public License as
9## published by the Free Software Foundation; either version 2 of the
10## License, or (at your option) any later version.
11##
12## CDS Indico is distributed in the hope that it will be useful, but
13## WITHOUT ANY WARRANTY; without even the implied warranty of
14## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15## General Public License for more details.
16##
17## You should have received a copy of the GNU General Public License
18## along with CDS Indico; if not, write to the Free Software Foundation, Inc.,
19## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
20
21import logging
22import os
23import thread
24import time
25import random
26
27from MaKaC.common import db
28from MaKaC.common.logger import Logger
29
30
31from indico.modules.scheduler import SchedulerModule, base, tasks
32from indico.modules.scheduler.slave import Worker
33from indico.util.date_time import nowutc, int_timestamp
34
35
class Scheduler(object):
    """
    A :py:class:`~indico.modules.scheduler.Scheduler` object provides a job scheduler
    based on a waiting queue, that communicates with its clients through the database.
    Things have been done in a way that the probability of conflict is minimized, and
    operations are repeated in case one happens.

    The entry point of the process consists of a 'spooler' that periodically takes
    tasks out of a `conflict-safe` FIFO (spool) and adds them to an ``IOBTree``-based
    waiting queue. The waiting queue is then checked periodically for the next task,
    and when the time comes the task is executed.

    Tasks are executed in different threads.

    The :py:class:`~indico.modules.scheduler.Client` class works as a transparent
    remote proxy for this class.
    """

    __instance = None

    # configuration options (defaults); any keyword passed to the constructor
    # must be one of these keys
    _options = {
        # time to wait between cycles (seconds, passed to ``time.sleep``)
        'sleep_interval': 10,

        # AWOL = Absent Without Leave
        # [0.0, 1.0) probability that after a Scheduler tick it will check for AWOL
        # tasks in the runningList; the lower the number, the fewer the checks
        'awol_tasks_check_probability': 0.3,

        # seconds to consider a task AWOL
        # (the key is misspelt, but it is part of the public configuration
        # interface, so it is kept as is)
        'awol_tasks_thresold': 6000,

        # Number of times to try to run a task before aborting (min 1)
        'task_max_tries': 10
    }

    def __init__(self, **config):
        """
        Set up the scheduler: read the configuration, get a logger, open a
        database request and fetch the scheduler module from the database.

        :param config: configuration parameters; each key must be one of
                       the keys of ``Scheduler._options``
        :raises base.SchedulerConfigurationException: on an unknown option
        """

        self._readConfig(config)

        self._logger = logging.getLogger('scheduler')

        self._dbi = db.DBMgr.getInstance()
        self._op = base.OperationManager(self._dbi, self._logger)

        self._dbi.startRequest()
        self._schedModule = SchedulerModule.getDBInstance()

        # task id -> Worker that is (or was) executing that task
        self._runningThreads = {}

    def _readConfig(self, config):
        """
        Reads the config dictionary and verifies the parameters are ok.
        If it's the case, it sets them.

        The result is stored in ``self._config``, an object whose attributes
        are the default options overridden by the values in ``config``.

        :param config: dictionary of option name -> value
        :raises base.SchedulerConfigurationException: if an option name is
            not present in ``Scheduler._options``
        """

        class DummyType(object):
            pass

        self._config = DummyType()
        # copy the defaults, so that the class-level dictionary is never changed
        self._config.__dict__ = dict(Scheduler._options)

        for name, value in config.items():
            if name not in Scheduler._options:
                # the option name has to be interpolated into the message
                raise base.SchedulerConfigurationException(
                    'Option %s is not supported!' % name)
            setattr(self._config, name, value)

    def _relaunchRunningListItems(self):
        """
        Put the tasks found in the running list back in the queue.

        During startup any item in runningList will have died prematurely
        (except for AWOL tasks), so we relaunch them. If a task cannot be
        moved back to the 'queued' state, it is marked as failed instead.
        """

        # iterate over a snapshot: moveTaskFromRunningList() changes the
        # running list while we are going through it
        for task in list(self._schedModule.getRunningList()):
            self._logger.warning('Task %s found in runningList on startup. Relaunching..' % task.id)
            task.tearDown()
            try:
                self._schedModule.moveTaskFromRunningList(task,
                                                          base.TASK_STATUS_QUEUED)
            except base.TaskInconsistentStatusException:
                self._logger.exception("Problem relaunching task %s - "
                                       "setting it as failed" % task)
                self._schedModule.moveTaskFromRunningList(
                    task,
                    base.TASK_STATUS_FAILED,
                    nocheck=True)

                # NOTE(review): this commit only happens on the failure path;
                # the caller (run) already wraps this method in
                # self._op.commit() - confirm whether it is still needed
                self._dbi.commit()

    def _iterateTasks(self):
        """
        Iterate over all the tasks in the waiting queue, blocking
        in case there are none.

        This is a generator that never finishes by itself: it yields
        ``(timestamp, task)`` pairs for the tasks whose time has come and,
        when there is nothing to execute, it processes the spool, handles
        finished tasks, occasionally checks for AWOL tasks and sleeps.

        Exceptions raised while processing the spool (e.g.
        ``base.SchedulerQuitException``) are propagated to the caller.
        """
        while True:

            currentTimestamp = int_timestamp(nowutc())

            # this will basically abort the transaction, so, make sure
            # everything important before this was committed
            self._readFromDb()

            # print a status message (only in debug mode)
            self._printStatus(mode='debug')

            # get the next task in queue
            res = self._schedModule.peekNextWaitingTask()

            if res:
                # it's actually a timestamp, task tuple
                nextTS, nextTask = res

                self._logger.info((nextTS, currentTimestamp))

                # if it's time to execute the task
                if nextTS <= currentTimestamp:
                    yield nextTS, nextTask

                    # don't sleep, jump back to the beginning of the cycle
                    continue

            # we assume the task cycle has been completed (if there was one)
            # so, we can just reset the db status (sync)
            self._readFromDb()

            # move the tasks in the spool to the waiting queue
            try:
                self._processSpool()
            finally:
                # this `finally` makes sure finished tasks are handled
                # even if a shutdown order is sent

                # process tasks that have finished meanwhile
                # (tasks have been running in different threads, so, the sync
                # that was done above won't hurt)
                self._checkFinishedTasks()

            # we also check AWOL tasks from time to time
            if random.random() < self._config.awol_tasks_check_probability:
                self._checkAWOLTasks()

            # if we get here, we have nothing else to do...
            self._sleep('Nothing to do. Sleeping for %d secs...' %
                        self._config.sleep_interval)

    def _checkFinishedTasks(self):
        """
        Check if there are any tasks that have finished recently, and
        need to be moved to the correct places.

        A task is considered finished when its worker is no longer alive;
        the worker's ``success`` attribute decides whether the task is
        notified as finished or as failed.
        """

        self._logger.info("Checking finished tasks")

        # iterate over a snapshot, since entries are deleted inside the loop
        for taskId, worker in list(self._runningThreads.items()):

            # the worker is still alive? nothing to do for now
            if worker.isAlive():
                continue

            # the worker is dead? good, it's finished
            task = self._schedModule._taskIdx[taskId]

            # let's check if it was successful or not
            # and write it in the db
            if worker.success:
                self._notifyTaskFinished(task)
            elif worker.success == False:
                # explicit comparison on purpose: any other falsy value
                # (e.g. None) is handled by the branch below
                self._notifyTaskFailed(task)
            else:
                # something weird happened; the task is left untouched in
                # the database
                self._logger.warning("task %s finished, but the return value "
                                     "was %s" %
                                     (task, worker.success))

            # delete the entry from the dictionary
            del self._runningThreads[taskId]

    def _printStatus(self, mode='debug'):
        """
        Print an informative message with some run-time parameters

        :param mode: ``'debug'`` logs at DEBUG level, anything else at INFO
        """
        status = self._schedModule.getStatus()

        if mode == 'debug':
            func = self._logger.debug
        else:
            func = self._logger.info

        func("Status: waiting: [%(waiting)d] | "
             "running: [%(running)d] | "
             "spooled: [%(spooled)d]" % status)

    def run(self):
        """
        Main loop, should only be called from scheduler

        :returns: 0 when the scheduler is shut down through a
                  ``base.SchedulerQuitException``; any other exception is
                  logged and re-raised
        """

        try:
            self._logger.info('**** Scheduler started')
            self._printStatus()

            # relaunch items that were running in the last session
            with self._op.commit():
                self._relaunchRunningListItems()

            # iterate over the tasks in the waiting queue
            # that should be running
            for timestamp, curTask in self._iterateTasks():
                # execute the "task cycle" for each new task
                self._taskCycle(timestamp, curTask)

        except base.SchedulerQuitException as e:
            self._logger.info('Scheduler was shut down: %s' % e)
            return 0
        except Exception:
            # log with the traceback and re-raise without losing it
            self._logger.exception('Unexpected error')
            raise

    def _taskCycle(self, timestamp, curTask):
        """
        Move a task from the waiting queue to the running list and start
        a worker for it.

        :param timestamp: the timestamp under which the task is stored in
                          the waiting queue
        :param curTask: the task to be executed
        """

        # we commit at the end
        with self._op.commit():
            # remove task from waiting list
            self._schedModule.removeWaitingTask(timestamp, curTask)

            # mark the task as being in the running list
            curTask.setOnRunningListSince(nowutc())

            # add it to the running list
            self._schedModule.addTaskToRunningList(curTask)

        # Start a worker and add it to the thread dict, so that
        # _checkFinishedTasks can find it later
        worker = Worker(curTask.id, self._config)
        self._runningThreads[curTask.id] = worker
        worker.start()

    def _popFromSpool(self):
        """
        Get the element at the top of the spool

        :returns: whatever ``spool.pull()`` returns (an ``(operation, object)``
                  pair, judging by how _processSpool uses it), or ``None`` if
                  pulling raised ``IndexError``
        """
        spool = self._schedModule.getSpool()

        try:
            with self._op.commit():
                pair = spool.pull()
        except IndexError:
            pair = None

        return pair

    def _processSpool(self):
        """
        Adds all the tasks in the spool to the waiting list

        :raises base.SchedulerQuitException: when a 'shutdown' operation
            is found in the spool
        :raises base.SchedulerUnknownOperationException: for any operation
            other than 'add' and 'shutdown'
        """
        # pop the first one
        pair = self._popFromSpool()

        while pair:
            op, obj = pair

            if op == 'add':
                self._addTaskToQueue(obj)
            elif op == 'shutdown':
                raise base.SchedulerQuitException(obj)
            else:
                raise base.SchedulerUnknownOperationException(op)
            pair = self._popFromSpool()

    def _sleep(self, msg):
        """
        Log ``msg`` (DEBUG level) and sleep for ``sleep_interval`` seconds
        """
        self._logger.debug(msg)
        time.sleep(self._config.sleep_interval)

    def _readFromDb(self):
        """
        Synchronize with the database (``sync()`` on the DB manager)
        """
        self._logger.debug('_readFromDb()..')
        self._dbi.sync()

    def _abortDb(self):
        """
        Abort the current database transaction
        """
        self._logger.debug('_abortDb()..')
        self._dbi.abort()

    def _addTaskToQueue(self, task):
        """
        Submits a new task: sets its status to 'queued' and adds it
        to the waiting queue.

        :param task: the task to be queued
        """
        task.setStatus(base.TASK_STATUS_QUEUED)

        with self._op.commit('taskIdx'):
            SchedulerModule.getDBInstance().addTaskToWaitingQueue(task)

        self._logger.info("Task %s queued for execution" % task)

    # notification mechanisms

    def _notifyTaskFinished(self, task):
        """
        Handle a task that finished successfully (called from
        _checkFinishedTasks): the task is moved out of the running list
        with the 'finished' status and torn down. Periodic tasks get
        their next occurrence set and are added to the waiting queue again.

        :param task: the task that has finished
        """

        self._logger.debug('Task %s says it\'s finished..' % task)

        with self._op.commit():
            self._schedModule.moveTaskFromRunningList(
                task,
                base.TASK_STATUS_FINISHED)
            task.tearDown()

        if isinstance(task, tasks.PeriodicTask):
            with self._op.commit('taskIdx'):
                task.setNextOccurrence()
                SchedulerModule.getDBInstance().addTaskToWaitingQueue(task)

    def _notifyTaskFailed(self, task):
        """
        Handle a task that failed (called from _checkFinishedTasks): the
        task is moved out of the running list with the 'failed' status
        and torn down.

        :param task: the task that has failed
        """

        self._logger.error('Task %s failed..' % task)

        with self._op.commit():
            self._schedModule.moveTaskFromRunningList(
                task,
                base.TASK_STATUS_FAILED)
            task.tearDown()

    def _checkAWOLTasks(self):
        """
        Look for AWOL (Absent Without Leave) tasks in the running list.

        Tasks with no 'on running list since' value are torn down and put
        back in the queue. Tasks that have been in the running list for more
        than ``awol_tasks_thresold`` seconds are torn down and marked as
        failed.
        """

        self._logger.info('Checking AWOL tasks...')

        # iterate over a snapshot: moveTaskFromRunningList() changes the
        # running list while we are going through it
        for task in list(self._schedModule.getRunningList()):
            runningSince = task.getOnRunningListSince()

            if not runningSince:
                self._logger.warning("Task %s is in the runningList but has no "
                               "onRunningListSince value! Removing from runningList "
                               "and relaunching..." % (task.id))
                task.tearDown()

                # relaunch it
                with self._op.commit():
                    self._schedModule.moveTaskFromRunningList(
                        task,
                        base.TASK_STATUS_QUEUED)
                continue

            runForSecs = int_timestamp(nowutc()) - int_timestamp(runningSince)

            if runForSecs > self._config.awol_tasks_thresold:
                self._logger.warning("Task %s has been running for %d secs. "
                               "Assuming it has died abnormally and forcibly "
                               "calling its tearDown()..." % (task.id, runForSecs))
                task.tearDown()

                # same status constant that is used for failed tasks
                # everywhere else in this class
                with self._op.commit():
                    self._schedModule.moveTaskFromRunningList(
                        task,
                        base.TASK_STATUS_FAILED)
393
394
395
Note: See TracBrowser for help on using the repository browser.