source: indico/indico/modules/scheduler/tasks.py @ 43f6a0

burotelhello-world-walkthroughipv6v0.98-seriesv0.98.2v0.98.3v0.98b1v0.98b2v0.99v1.0v1.1
Last change on this file since 43f6a0 was 43f6a0, checked in by Pedro Ferreira <jose.pedro.ferreira@…>, 3 years ago

[FIX] Alarms (partial)

  • Adapted alarms to fit the paradigm of the new task daemon;
  • Still alarm modification to take care of (broken);
  • Still migration script to complete;
  • Property mode set to 100644
File size: 17.7 KB
Line 
1# -*- coding: utf-8 -*-
2##
3##
4## This file is part of CDS Indico.
5## Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007 CERN.
6##
7## CDS Indico is free software; you can redistribute it and/or
8## modify it under the terms of the GNU General Public License as
9## published by the Free Software Foundation; either version 2 of the
10## License, or (at your option) any later version.
11##
12## CDS Indico is distributed in the hope that it will be useful, but
13## WITHOUT ANY WARRANTY; without even the implied warranty of
14## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15## General Public License for more details.
16##
17## You should have received a copy of the GNU General Public License
18## along with CDS Indico; if not, write to the Free Software Foundation, Inc.,
19## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
20
21import copy, logging, time, os
22from dateutil import rrule
23
24from datetime import timedelta
25from pytz import timezone
26from pytz import common_timezones
27import zope.interface
28
29# Required by specific tasks
30from MaKaC.user import Avatar
31from MaKaC.i18n import _
32from MaKaC.common import Config
33from MaKaC.common.info import HelperMaKaCInfo
34from MaKaC.common.Counter import Counter
35# end required
36
37from persistent import Persistent
38from BTrees.IOBTree import IOBTree
39
40from indico.util.fossilize import fossilizes, Fossilizable
41from indico.util.date_time import nowutc, int_timestamp
42from indico.modules.scheduler.fossils import ITaskFossil, ITaskOccurrenceFossil
43from indico.modules.scheduler import base
44from indico.core.index import IUniqueIdProvider, IIndexableByArbitraryDateTime
45
46"""
47Defines base classes for tasks, and some specific tasks as well
48"""
49
50class TimedEvent(Persistent, Fossilizable):
51
52    zope.interface.implements(IUniqueIdProvider,
53                              IIndexableByArbitraryDateTime)
54
55    def getIndexingDateTime(self):
56        # just get current date/time
57        return int_timestamp(nowutc())
58
59    def __conform__(self, proto):
60
61        if proto == IIndexableByArbitraryDateTime:
62            return self.getIndexingDateTime()
63        else:
64            return None
65
66
67
68class BaseTask(TimedEvent):
69    """
70    A base class for tasks.
71    `expiryDate` is the last point in time when the task can run. A task will refuse
72    to run if current time is past `expiryDate`
73    """
74
75    fossilizes(ITaskFossil)
76
77    def __init__(self, expiryDate=None):
78        self.createdOn = nowutc()
79        self.expiryDate = expiryDate
80        self.typeId = self.__class__.__name__
81        self.id = None
82        self.reset()
83
84        self.startedOn = None
85        self.endedOn = None
86
87    def reset(self):
88        '''Resets a task to its state before being run'''
89
90        self.running = False
91        self.onRunningListSince = None
92
93    # Time methods
94
95    def getCreatedOn(self):
96        return self.createdOn
97
98    def getEndedOn(self):
99        return self.endedOn
100
101    def setEndedOn(self, dateTime):
102        self.endedOn = dateTime
103
104    def getStartedOn(self):
105        return self.startedOn
106
107    def setOnRunningListSince(self, sometime):
108        self.onRunningListSince = sometime
109        self._p_changed = 1
110
111    def getOnRunningListSince(self):
112        return self.onRunningListSince
113
114    def setStatus(self, newstatus):
115        if hasattr(self, '_v_logger'):
116            self._v_logger.info("set status %s" % newstatus)
117        self.status = newstatus
118
119    def getStatus(self):
120        return self.status
121
122    def getId(self):
123        return self.id
124
125    def getUniqueId(self):
126        return "task%s" % self.id
127
128    def getTypeId(self):
129        return self.typeId
130
131    def initialize(self, newid, newstatus):
132        self.id = newid
133        self.status = newstatus
134
135    def plugLogger(self, logger):
136        self._v_logger = logger
137
138    def getLogger(self):
139        if not getattr(self, '_v_logger') or not self._v_logger:
140            self._v_logger = logging.getLogger('task/%s' % self.typeId)
141        return self._v_logger
142
143    def prepare(self):
144        """
145        This information will be saved regardless of the task being repeated or not
146        """
147
148        tsDiff = int_timestamp(nowutc()) - int_timestamp(self.getStartOn())
149
150        if tsDiff < 0:
151            self.getLogger().debug('Task %s will wait for some time. (%s) > (%s)' % (self.id, self.getStartOn(), nowutc()))
152            time.sleep(tsDiff)
153
154        if self.expiryDate and nowutc() > self.expiryDate:
155            self.getLogger().warning('Task %s will not be executed, expiryDate (%s) < current time (%s)' % (self.id, self.expiryDate, nowutc()))
156            return False
157
158        self.startedOn = nowutc()
159        self.running = True
160        self.status = base.TASK_STATUS_RUNNING
161
162    def start(self):
163
164        try:
165            self.run()
166        finally:
167            self.running = False
168            self.endedOn = nowutc()
169
170    def tearDown(self):
171        '''If a task needs to do something once it has run and been removed from runningList
172        overload this method'''
173        pass
174
175    def __str__(self):
176        return "<%s %s %s %s>" % (self.typeId, self.id, self.status, self.getStartOn())
177
178
179class OneShotTask(BaseTask):
180    """
181    Tasks that are executed only once
182    """
183
184    def __init__(self, startDateTime, expiryDate = None):
185        super(OneShotTask, self).__init__(expiryDate = expiryDate)
186        self.startDateTime = startDateTime
187
188    def getStartOn(self):
189        return self.startDateTime
190
191    def setStartOn(self, newtime):
192        self.startDateTime = newtime
193
194
195class PeriodicTask(BaseTask):
196    """
197    Tasks that should be executed at regular intervals
198    """
199
200    def __init__(self, frequency, **kwargs):
201        """
202        - frequency - a valid dateutil frequency specifier (DAILY, HOURLY, etc...)
203        """
204        super(PeriodicTask, self).__init__()
205
206        self._nextOccurrence = None
207        self._lastFinishedOn = None
208        self._occurrences = IOBTree()
209        self._occurrenceCount = 0
210        self._repeat = True
211
212        if 'dtstart' not in kwargs:
213            kwargs['dtstart'] = nowutc()
214
215
216        self._rule = rrule.rrule(
217            frequency,
218            **kwargs
219            )
220
221        self._nextOccurrence = self._rule.after(kwargs['dtstart'],
222                                                inc = True)
223
224
225
226    def start(self):
227        super(PeriodicTask, self).start()
228
229    def tearDown(self):
230        super(PeriodicTask, self).tearDown()
231
232    def setNextOccurrence(self, dateAfter = None):
233
234        if not self._nextOccurrence:
235            # if it's None already, it means there's no "future"
236            return
237
238        if not dateAfter:
239            dateAfter = nowutc()
240
241        # find next date after
242        nextOcc = self._rule.after(self._nextOccurrence,
243                                   inc = False)
244
245        # repeat process till a satisfactory date is found
246        # or there is nothing left to check
247        while nextOcc and nextOcc < dateAfter:
248            nextOcc = self._rule.after(nextOcc,
249                                       inc = False)
250
251        self._nextOccurrence = nextOcc
252        return nextOcc
253
254    def getStartOn(self):
255        return self._nextOccurrence
256
257    def getLastFinishedOn(self):
258        return self._lastFinishedOn
259
260    def addOccurrence(self, occurrence):
261        occurrence.setId(self._occurrenceCount)
262        self._occurrences[self._occurrenceCount] = occurrence
263        self._occurrenceCount += 1
264
265    def dontComeBack(self):
266        self._repeat = False
267
268    def shouldComeBack(self):
269        return self._repeat
270
271
272class PeriodicUniqueTask(PeriodicTask):
273    """
274    Singleton periodic tasks: no two or more PeriodicUniqueTask of this
275    class will be queued or running at the same time
276    """
277
278
279class TaskOccurrence(TimedEvent):
280    """
281    Wraps around a PeriodicTask object, and represents an occurrence of this task
282    """
283
284    fossilizes(ITaskOccurrenceFossil)
285
286
287    def __init__(self, task):
288        self._task = task
289        self._startedOn = task.getStartedOn()
290        self._endedOn = task.getEndedOn()
291        self._id = None
292
293    def getId(self):
294        return self._id
295
296    def getUniqueId(self):
297        return "%s:%s" % (self._task.getUniqueId(), self._id)
298
299    def setId(self, occId):
300        self._id = occId
301
302    def getStartedOn(self):
303        return self._startedOn
304
305    def getEndedOn(self):
306        return self._endedOn
307
308    def getTask(self):
309        return self._task
310
311class CategoryStatisticsUpdaterTask(PeriodicUniqueTask):
312    '''Updates statistics associated with categories
313    '''
314    def __init__(self, cat, **kwargs):
315        super(CategoryStatisticsUpdaterTask, self).__init__(**kwargs)
316        self._cat = cat
317
318    def run(self):
319        from MaKaC.statistics import CategoryStatistics
320        CategoryStatistics.updateStatistics(self._cat)
321
322
323# TODO CERN Specific
324class FoundationSyncTask(PeriodicUniqueTask):
325    """
326    Synchronizes room data (along with associated room managers
327    and equipment) with Foundation database.
328
329    Also, updates list of CERN Official Holidays
330
331    (This is object for a task class)
332    """
333    def __init__(self, **kwargs):
334        super(FoundationSyncTask, self).__init__(**kwargs)
335        obj.__init__(self)
336
337    def run(self):
338        from MaKaC.common.FoundationSync.foundationSync import FoundationSync
339        FoundationSync().doAll()
340
341
342class SendMailTask(OneShotTask):
343    """
344    """
345    def __init__(self, startDateTime):
346        super(SendMailTask, self).__init__(startDateTime)
347        self.fromAddr = ""
348        self.toAddr = []
349        self.toUser = []
350        self.ccAddr = []
351        self.subject = ""
352        self.text = ""
353        self.smtpServer = Config.getInstance().getSmtpServer()
354
355    def run(self, check = False):
356        import smtplib
357        from MaKaC.webinterface.mail import GenericMailer, GenericNotification
358
359        # prepare the mail
360        self._prepare(check = check);
361
362        addrs = [smtplib.quoteaddr(x) for x in self.toAddr]
363        ccaddrs = [smtplib.quoteaddr(x) for x in self.ccAddr]
364
365        for user in self.toUser:
366            addrs.append(smtplib.quoteaddr(user.getEmail()))
367
368        GenericMailer.send(GenericNotification({"fromAddr": self.fromAddr,
369                                                "toList": addrs,
370                                                "ccList": ccaddrs,
371                                                "subject": self.subject,
372                                                "body": self.text }))
373
374    def getConference(self):
375        return self.conf
376
377    def setFromAddr(self, addr):
378        self.fromAddr = addr
379        self._p_changed = 1
380
381    def getFromAddr(self):
382        return self.fromAddr
383
384    def addToAddr(self, addr):
385        if not addr in self.toAddr:
386            self.toAddr.append(addr)
387            self._p_changed=1
388
389    def addCcAddr(self, addr):
390        if not addr in self.ccAddr:
391            self.ccAddr.append(addr)
392            self._p_changed=1
393
394    def removeToAddr(self, addr):
395        if addr in self.toAddr:
396            self.toAddr.remove(addr)
397            self._p_changed=1
398
399    def setToAddrList(self, addrList):
400        """Params: addrList -- addresses of type : list of str."""
401        self.toAddr = addrList
402        self._p_changed=1
403
404    def getToAddrList(self):
405        return self.toAddr
406
407    def setCcAddrList(self, addrList):
408        """Params: addrList -- addresses of type : list of str."""
409        self.ccAddr = addrList
410        self._p_changed=1
411
412    def getCcAddrList(self):
413        return self.ccAddr
414
415    def addToUser(self, user):
416        if not user in self.toUser:
417            self.toUser.append(user)
418            self._p_changed=1
419
420    def removeToUser(self, user):
421        if user in self.toUser:
422            self.toUser.remove(user)
423            self._p_changed=1
424
425    def getToUserList(self):
426        return self.toUser
427
428    def setSubject(self, subject):
429        self.subject = subject
430
431    def getSubject(self):
432        return self.subject
433
434    def setText(self, text):
435        self.text = text
436
437    def getText(self):
438        return self.text
439
440
441
442class AlarmTask(SendMailTask):
443    """
444    implement an alarm componment
445    """
446    def __init__(self, conf, confRelId, startDateTime):
447        super(AlarmTask, self).__init__(startDateTime)
448        self.conf = conf
449        self.timeBefore = None
450        self.text = ""
451        self.note = ""
452        self.confSumary = False
453        self.toAllParticipants = False
454        self._confRelId = confRelId
455
456    def getConfRelativeId(self):
457        return self._confRelId
458
459    def getToAllParticipants(self):
460        try:
461            return self.toAllParticipants
462        except:
463            self.toAllParticipants = False
464            return self.toAllParticipants
465
466    def setToAllParticipants(self, toAllParticipants):
467        self.toAllParticipants = toAllParticipants
468
469    def clone(self, conference):
470        alarm = AlarmTask(conference)
471        alarm.initialiseToAddr()
472        for addr in self.getToAddrList():
473            alarm.addToAddr(addr)
474        alarm.setFromAddr(self.getFromAddr())
475        alarm.setSubject(self.getSubject())
476        alarm.setConfSumary(self.getConfSumary())
477        alarm.setNote(self.getNote())
478        alarm.setText(self.getText())
479        alarm.setStartOn(copy.copy(self.getStartOn()))
480        alarm.setToAllParticipants(self.getToAllParticipants())
481        return alarm
482
483    def getTimeBefore(self):
484        return self.getConference().getStartDate() - self.getStartOn()
485
486    def addToUser(self, user):
487        super(AlarmTask, self).addToUser(user)
488        if isinstance(user, Avatar):
489            user.linkTo(self, "to")
490
491    def removeToUser(self, user):
492        super(AlarmTask, self).removeToUser(user)
493        if isinstance(user, Avatar):
494            user.unlinkTo(self, "to")
495
496    def getText(self):
497        return self.text
498
499    def getLocator(self):
500        d = self.conf.getLocator()
501        d["alarmId"] = self.getConfRelativeId()
502        return d
503
504    def canAccess(self, aw):
505        return self.conf.canAccess(aw)
506
507    def canModify(self, aw):
508        return self.conf.canModify(aw)
509
510    def _setMailText(self):
511        text = self.text
512        if self.note:
513            text = text + "Note: %s" % self.note
514        if self.confSumary:
515            #try:
516                from MaKaC.common.output import outputGenerator
517                from MaKaC.accessControl import AdminList, AccessWrapper
518                import MaKaC.webinterface.urlHandlers as urlHandlers
519                admin = AdminList().getInstance().getList()[0]
520                aw = AccessWrapper()
521                aw.setUser(admin)
522                path = Config.getInstance().getStylesheetsDir()
523                if os.path.exists("%s/text.xsl" % path):
524                    stylepath = "%s/text.xsl" % path
525                outGen = outputGenerator(aw)
526                vars = { \
527                        "modifyURL": urlHandlers.UHConferenceModification.getURL( self.conf ), \
528                        "sessionModifyURLGen": urlHandlers.UHSessionModification.getURL, \
529                        "contribModifyURLGen": urlHandlers.UHContributionModification.getURL, \
530                        "subContribModifyURLGen":  urlHandlers.UHSubContribModification.getURL, \
531                        "materialURLGen": urlHandlers.UHMaterialDisplay.getURL, \
532                        "resourceURLGen": urlHandlers.UHFileAccess.getURL }
533                confText = outGen.getOutput(self.conf,stylepath,vars)
534                text += "\n\n\n" + confText
535            #except:
536            #    text += "\n\n\nSorry could not embed text version of the agenda..."
537        super(AlarmTask, self).setText(text)
538
539    def setNote(self, note):
540        self.note = note
541        self._setMailText()
542        self._p_changed=1
543
544    def getNote(self):
545        return self.note
546
547    def setConfSumary(self, val):
548        self.confSumary = val
549        self._setMailText()
550        self._p_changed=1
551
552    def getConfSumary(self):
553        return self.confSumary
554
555    def _prepare(self, check = True):
556
557        # Date checks...
558        if check:
559            from MaKaC.conference import ConferenceHolder
560            if not ConferenceHolder().hasKey(self.conf.getId()) or \
561                   self.conf.getStartDate() <= nowutc():
562                self.conf.removeAlarm(self)
563                return True
564
565        # Email
566        self.setSubject("Event reminder: %s"%self.conf.getTitle())
567        try:
568            locationText = self.conf.getLocation().getName()
569            if self.conf.getLocation().getAddress() != "":
570                locationText += ", %s" % self.conf.getLocation().getAddress()
571            if self.conf.getRoom().getName() != "":
572                locationText += " (%s)" % self.conf.getRoom().getName()
573        except:
574            locationText = ""
575        if locationText != "":
576            locationText = _(""" _("Location"): %s""") % locationText
577
578        if self.getToAllParticipants() :
579            for p in self.conf.getParticipation().getParticipantList():
580                self.addToUser(p)
581
582        from MaKaC.webinterface import urlHandlers
583        if Config.getInstance().getShortEventURL() != "":
584            url = "%s%s" % (Config.getInstance().getShortEventURL(),self.conf.getId())
585        else:
586            url = urlHandlers.UHConferenceDisplay.getURL( self.conf )
587        self.setText("""Hello,
588    Please note that the event "%s" will start on %s (%s).
589    %s
590
591    You can access the full event here:
592    %s
593
594Best Regards
595
596    """%(self.conf.getTitle(),\
597                self.conf.getAdjustedStartDate().strftime("%A %d %b %Y at %H:%M"),\
598                self.conf.getTimezone(),\
599                locationText,\
600                url,\
601                ))
602        self._setMailText()
603        return False
604
605
606class SampleOneShotTask(OneShotTask):
607    def run(self):
608        self.getLogger().debug('Now i shall sleeeeeeeep!')
609        time.sleep(1)
610        self.getLogger().debug('%s executed' % self.__class__.__name__)
611
612
613class SamplePeriodicTask(PeriodicTask):
614    def run(self):
615        time.sleep(1)
616        self.getLogger().debug('%s executed' % self.__class__.__name__)
Note: See TracBrowser for help on using the repository browser.