source: indico/indico/modules/scheduler/module.py @ d71e61

burotelhello-world-walkthroughipv6v0.98-seriesv0.98.2v0.98.3v0.98b1v0.98b2v0.99v1.0v1.1
Last change on this file since d71e61 was d71e61, checked in by Pedro Ferreira <jose.pedro.ferreira@…>, 2 years ago

[FIX] Alarms - relativity bug

  • Dates were always being saved as absolute ones;
  • Alarm modification was not working properly;
  • Property mode set to 100644
File size: 8.7 KB
Line 
1# -*- coding: utf-8 -*-
2##
3##
4## This file is part of CDS Indico.
5## Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007 CERN.
6##
7## CDS Indico is free software; you can redistribute it and/or
8## modify it under the terms of the GNU General Public License as
9## published by the Free Software Foundation; either version 2 of the
10## License, or (at your option) any later version.
11##
12## CDS Indico is distributed in the hope that it will be useful, but
13## WITHOUT ANY WARRANTY; without even the implied warranty of
14## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15## General Public License for more details.
16##
17## You should have received a copy of the GNU General Public License
18## along with CDS Indico; if not, write to the Free Software Foundation, Inc.,
19## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
20
21
22import logging, time, datetime
23
24from BTrees.IOBTree import IOBTree
25from BTrees.Length import Length
26
27from MaKaC.trashCan import TrashCanManager
28
29from indico.modules import Module
30from indico.modules.scheduler import base, tasks
31from indico.util.struct.queue import PersistentWaitingQueue
32from indico.util.date_time import int_timestamp
33from indico.core.index import IOIndex, IIndexableByArbitraryDateTime
34
35from zc.queue import Queue
36
37class SchedulerModule(Module):
38    id = "scheduler"
39
40    def __init__(self):
41        # logging.getLogger('scheduler') = logging.getLogger('scheduler')
42        # logging.getLogger('scheduler').warning('Creating incomingQueue and runningList..')
43        self._waitingQueue = PersistentWaitingQueue()
44        self._runningList = []
45
46        # Failed tasks (there is an object still in the DB)
47        self._failedIndex = IOIndex(IIndexableByArbitraryDateTime)
48
49        # Finished tasks (no object data, just metadata)
50        self._finishedIndex = IOIndex(IIndexableByArbitraryDateTime)
51
52        # Stores all tasks
53        self._taskIdx = IOBTree()
54        self._taskCounter = Length(0)
55
56        # Is the scheduler running
57        self._schedulerStatus = False
58
59        # Temporary area where all the tasks stay before being
60        # added to the waiting list
61        self._taskSpool = Queue()
62
63    def _assertTaskStatus(self, task, status):
64        """
65        Confirm the status of this task
66        """
67
68        if task.status != status:
69            raise base.TaskInconsistentStatusException(
70                "%s status is not %s" %
71                (task, base.status(status)))
72
73        if status == base.TASK_STATUS_RUNNING and \
74               task not in self._runningList:
75                raise base.TaskInconsistentStatusException(
76                    'task %s was not found in the running task list' % task)
77
78        # TODO: remaining elifs
79
80    def _indexTask(self, task):
81        """
82        Provide the task with an id and add it to the
83        task index
84        """
85
86        # give it a serial id
87        task.initialize(self._taskCounter(), base.TASK_STATUS_SPOOLED)
88
89        # index it and increase the count
90        self._taskIdx[task.id] = task
91        self._taskCounter.change(1)
92
93        logging.getLogger('scheduler').debug(
94            'Added %s to index..' % task)
95
96
97    ## These are all interface methods, called by different modules
98
99    def getStatus(self):
100        """
101        Returns some basic info
102        """
103        return {
104            'state': self._schedulerStatus,
105            'waiting': len(self._waitingQueue),
106            'running': len(self._runningList),
107            'spooled': len(self._taskSpool),
108            'failed': self._failedIndex._num_objs() ,
109            'finished': self._finishedIndex._num_objs()
110            }
111
112    def getTaskById(self, taskId):
113        return self._taskIdx[taskId]
114
115    def getSpool(self):
116        return self._taskSpool
117
118    def clearSpool(self):
119        i = 0
120
121        try:
122            while(self._taskSpool.pull()):
123                i += 1
124        except IndexError:
125            pass
126
127        return i
128
129    def spool(self, op, obj):
130        """
131        Adds an 'instruction' to the spool, in the form (op, obj)
132        """
133
134        self._taskSpool.put((op, obj))
135
136        logging.getLogger('scheduler').debug(
137            'Added instruction %s to spool..' % ((op, obj),))
138
139        return True
140
141    def removeRunningTask(self, task):
142        """
143        Remove a task from the running list
144        """
145        try:
146            self._runningList.remove(task)
147            self._p_changed = True
148        except ValueError:
149            logging.getLogger('scheduler').exception()
150
151    def moveTask(self, task, moveFrom, status, occurrence=None, nocheck=False):
152        """
153        Move a task somewhere
154        """
155        if not occurrence:
156            occurrence = task
157
158        if not nocheck:
159            self._assertTaskStatus(task, moveFrom)
160
161        if moveFrom == base.TASK_STATUS_RUNNING:
162            # actually remove it from list
163            self.removeRunningTask(task)
164
165        elif moveFrom == base.TASK_STATUS_QUEUED:
166            idx_timestamp = int_timestamp(task.getStartOn())
167            self._waitingQueue.dequeue(idx_timestamp, task)
168        elif moveFrom == base.TASK_STATUS_FAILED:
169            self._failedIndex.unindex_obj(task)
170
171        # index it either in finished or failed
172        # (or queue it up again)
173        if status == base.TASK_STATUS_FINISHED:
174            self._finishedIndex.index_obj(occurrence)
175        elif status in [base.TASK_STATUS_FAILED,
176                        base.TASK_STATUS_TERMINATED]:
177            self._failedIndex.index_obj(occurrence)
178        elif status == base.TASK_STATUS_QUEUED:
179            self.addTaskToWaitingQueue(occurrence)
180
181    def changeTaskStartDate(self, oldTS, task):
182        i = 0
183
184        newTS = int_timestamp(task.getStartOn())
185
186        # enqueue-dequeue
187        try:
188            self._waitingQueue.dequeue(oldTS, task)
189        except:
190            logging.getLogger('scheduler').error(
191                "%s was supposed to be changed but it was not "
192                "found in the waiting queue!" % task)
193            return
194
195        self._waitingQueue.enqueue(newTS, task)
196
197        logging.getLogger('scheduler').info(
198            '%s moved from bin %s to %s...' % (task, oldTS, newTS))
199
200
201    def addTaskToWaitingQueue(self, task, index=False):
202
203        if index:
204            self._indexTask(task)
205
206        # get an int timestamp
207        timestamp = int_timestamp(task.getStartOn())
208
209        self._waitingQueue.enqueue(timestamp, task)
210
211        # make it "officially" queued
212        task.setStatus(base.TASK_STATUS_QUEUED)
213
214        logging.getLogger('scheduler').debug(
215            'Added %s to waitingQueue..' % task)
216
217    def popNextWaitingTask(self):
218        return self._waitingQueue.pop()
219
220    def peekNextWaitingTask(self):
221        return self._waitingQueue.peek()
222
223    def removeWaitingTask(self, timestamp, task):
224        return self._waitingQueue.dequeue(timestamp, task)
225
226    def getRunningList(self):
227        return self._runningList
228
229    def getWaitingQueue(self):
230        return self._waitingQueue
231
232    def getFailedIndex(self):
233        return self._failedIndex
234
235    def getFinishedIndex(self):
236        return self._finishedIndex
237
238    def getTaskIndex(self):
239        return self._taskIdx
240
241    def setSchedulerRunningStatus(self, status):
242        self._schedulerStatus = status
243
244    def addTaskToRunningList(self, task):
245
246        logging.getLogger('scheduler').debug(
247             'Added task %s to runningList..' % task.id)
248        self._runningList.append(task)
249        self._p_changed = True
250
251    ## def removeTaskFromQueue(self, task):
252    ##     """
253    ##     """
254
255    ##     index = None
256
257    ##     self._waitingQueue.dequeue(task)
258
259    ## def removeTask(self, task):
260    ##     """
261    ##     Remove a task, no matter what is its current state
262    ##     """
263
264    ##     # TODO - implement this using task code?
265
266    ##     # Task still running - throw exception
267    ##     if task.state  == base.TASK_STATUS_RUNNING:
268    ##         raise base.TaskStillRunningException(task)
269
270    ##     # task has failed - remove it from 'failed' index
271    ##     elif task.state == base.TASK_STATUS_FAILED:
272    ##         self._failedIndex.unindex_doc(task.id)
273
274    ##     # task has finished - remove it from 'finished' index
275    ##     elif task.state == base.TASK_STATUS_FINISHED:
276    ##         self._finishedIndex.unindex_doc(task.id)
277
278    ##     # task is queued - removed it from queue
279    ##     elif task.state == base.TASK_STATUS_QUEUED:
280    ##         self.removeTaskFromQueue(task)
281
282
283    ##     # task not found - throw exception
284    ##     else:
285    ##         raise base.TaskInconsistentStatusException()
286
287    ## def deleteTask(self, task):
288    ##     """
289    ##     Add a task to the TrashCanManager.
290    ##     No unindexing is done by this method
291    ##     """
292
293    ##     self.removeTask(task)
294
295    ##     TrashCanManager().add(task)
296
297
298
Note: See TracBrowser for help on using the repository browser.