source: indico/indico/modules/scheduler/tasks.py @ 7257af

Branches/tags: burotel, hello-world-walkthrough, ipv6, v0.98-series, v0.98.2, v0.98.3, v0.98b1, v0.98b2, v0.99, v1.0, v1.1
Last change on this file since 7257af was 7257af, checked in by Pedro Ferreira <jose.pedro.ferreira@…>, 3 years ago

[FIX] Several bug fixes

  • Periodic tasks - and some tests added;
  • Running tasks on demand now works;
  • Still a lot of tests to be added;
  • Occurrence calculation;
  • Aesthetic improvements;
  • Property mode set to 100644
File size: 18.5 KB
Line 
1# -*- coding: utf-8 -*-
2##
3##
4## This file is part of CDS Indico.
5## Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007 CERN.
6##
7## CDS Indico is free software; you can redistribute it and/or
8## modify it under the terms of the GNU General Public License as
9## published by the Free Software Foundation; either version 2 of the
10## License, or (at your option) any later version.
11##
12## CDS Indico is distributed in the hope that it will be useful, but
13## WITHOUT ANY WARRANTY; without even the implied warranty of
14## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15## General Public License for more details.
16##
17## You should have received a copy of the GNU General Public License
18## along with CDS Indico; if not, write to the Free Software Foundation, Inc.,
19## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
20
21import copy
22import logging
23import time
24from dateutil import rrule
25
26from datetime import timedelta
27from pytz import timezone
28from pytz import common_timezones
29import zope.interface
30
31# Required by specific tasks
32from MaKaC.user import Avatar
33from MaKaC.i18n import _
34from MaKaC.common.Configuration import Config
35from MaKaC.common.info import HelperMaKaCInfo
36from MaKaC.common.Counter import Counter
37# end required
38
39from persistent import Persistent
40from BTrees.IOBTree import IOBTree
41
42from indico.util.fossilize import fossilizes, Fossilizable
43from indico.util.date_time import nowutc, int_timestamp
44from indico.modules.scheduler.fossils import ITaskFossil, ITaskOccurrenceFossil
45from indico.modules.scheduler import base
46from indico.core.index import IUniqueIdProvider, IIndexableByArbitraryDateTime
47
48"""
49Defines base classes for tasks, and some specific tasks as well
50"""
51
class TimedEvent(Persistent, Fossilizable):
    """
    Persistent, fossilizable object that can be indexed by date/time.

    Declares `IUniqueIdProvider` and `IIndexableByArbitraryDateTime`; the
    indexing side is implemented here, `getUniqueId` is left to subclasses.
    """

    zope.interface.implements(IUniqueIdProvider,
                              IIndexableByArbitraryDateTime)

    def getIndexingDateTime(self):
        """
        Returns the timestamp to index this object under: the current UTC
        time, converted through `int_timestamp`.
        """
        now = nowutc()
        return int_timestamp(now)

    def __conform__(self, proto):
        """
        Adaptation hook: gives the indexing timestamp when asked for
        `IIndexableByArbitraryDateTime`, None for any other protocol.
        """
        isDateTimeIndex = (proto == IIndexableByArbitraryDateTime)
        return self.getIndexingDateTime() if isDateTimeIndex else None
67
68
69
class BaseTask(TimedEvent):
    """
    A base class for tasks.
    `expiryDate` is the last point in time when the task can run. A task will refuse
    to run if current time is past `expiryDate`

    Subclasses are expected to provide `run()` (the actual work) and
    `getStartOn()` (the date/time the task should start at); both are used
    by this class but not defined in it.
    """

    fossilizes(ITaskFossil)

    def __init__(self, expiryDate=None):
        """
        - expiryDate - optional date/time after which `prepare()` refuses
          to let the task run
        """
        self.createdOn = nowutc()
        self.expiryDate = expiryDate
        self.typeId = self.__class__.__name__
        self.id = None
        self.reset()

        self.startedOn = None
        self.endedOn = None

    def reset(self):
        '''Resets a task to its state before being run'''

        self.running = False
        self.onRunningListSince = None

    # Time methods

    def getCreatedOn(self):
        return self.createdOn

    def getEndedOn(self):
        return self.endedOn

    def setEndedOn(self, dateTime):
        self.endedOn = dateTime

    def getStartedOn(self):
        return self.startedOn

    def setOnRunningListSince(self, sometime):
        self.onRunningListSince = sometime
        self._p_changed = 1

    def getOnRunningListSince(self):
        return self.onRunningListSince

    def setStatus(self, newstatus):
        """
        Stores the new status, logging the change if a logger is plugged.
        """
        # `_v_logger` may be missing altogether or explicitly set to None
        logger = getattr(self, '_v_logger', None)
        if logger:
            logger.info("set status %s" % newstatus)
        self.status = newstatus

    def getStatus(self):
        return self.status

    def getId(self):
        return self.id

    def getUniqueId(self):
        return "task%s" % self.id

    def getTypeId(self):
        return self.typeId

    def initialize(self, newid, newstatus):
        """
        Assigns the task its id and initial status.
        """
        self.id = newid
        self.status = newstatus

    def plugLogger(self, logger):
        self._v_logger = logger

    def getLogger(self):
        """
        Returns the plugged logger, falling back to (and caching) a
        'task/<typeId>' logger when none has been plugged.
        """
        # a default is mandatory here: without it, getattr() raises
        # AttributeError whenever plugLogger() has not been called yet
        if not getattr(self, '_v_logger', None):
            self._v_logger = logging.getLogger('task/%s' % self.typeId)
        return self._v_logger

    def prepare(self):
        """
        This information will be saved regardless of the task being repeated or not

        Waits until the start time if it is still in the future, then marks
        the task as running. Returns False if the task has expired and must
        not be executed; returns None otherwise.
        """

        # positive when the start date/time is still ahead of us
        secondsToStart = int_timestamp(self.getStartOn()) - int_timestamp(nowutc())

        if secondsToStart > 0:
            self.getLogger().debug('Task %s will wait for some time. (%s) > (%s)' % (self.id, self.getStartOn(), nowutc()))
            # time.sleep() rejects negative values, so the amount has to be
            # "start - now" and not the other way around
            time.sleep(secondsToStart)

        if self.expiryDate and nowutc() > self.expiryDate:
            self.getLogger().warning('Task %s will not be executed, expiryDate (%s) < current time (%s)' % (self.id, self.expiryDate, nowutc()))
            return False

        self.startedOn = nowutc()
        self.running = True
        self.status = base.TASK_STATUS_RUNNING

    def start(self):
        """
        Runs the task; the running flag is cleared and the end time is
        recorded even if `run()` raises.
        """

        try:
            self.run()
        finally:
            self.running = False
            self.endedOn = nowutc()

    def tearDown(self):
        '''If a task needs to do something once it has run and been removed from runningList
        overload this method'''
        pass

    def __str__(self):
        return "<%s %s %s %s>" % (self.typeId, self.id, self.status, self.getStartOn())
179
180
class OneShotTask(BaseTask):
    """
    A task that runs a single time, at a fixed start date/time
    """

    def __init__(self, startDateTime, expiryDate=None):
        """
        - startDateTime - date/time the task should start at
        - expiryDate - passed on to `BaseTask`
        """
        super(OneShotTask, self).__init__(expiryDate=expiryDate)
        self.startDateTime = startDateTime

    def getStartOn(self):
        """Date/time the task should start at"""
        return self.startDateTime

    def setStartOn(self, newtime):
        """Replaces the date/time the task should start at"""
        self.startDateTime = newtime
195
196
class PeriodicTask(BaseTask):
    """
    A task that is executed repeatedly, following a dateutil recurrence rule
    """

    def __init__(self, frequency, **kwargs):
        """
        - frequency - a valid dateutil frequency specifier (DAILY, HOURLY, etc...)
        - kwargs - passed as they are to `rrule.rrule`; `dtstart` defaults
          to the current UTC time
        """
        super(PeriodicTask, self).__init__()

        self._nextOccurrence = None
        self._lastFinishedOn = None
        self._occurrences = IOBTree()
        self._occurrenceCount = 0
        self._repeat = True

        if 'dtstart' not in kwargs:
            kwargs['dtstart'] = nowutc()
        firstDate = kwargs['dtstart']

        self._rule = rrule.rrule(frequency, **kwargs)

        # the first occurrence may be `dtstart` itself (inclusive search)
        self._nextOccurrence = self._rule.after(firstDate, inc=True)

    def start(self):
        super(PeriodicTask, self).start()

    def tearDown(self):
        super(PeriodicTask, self).tearDown()

    def setNextOccurrence(self, dateAfter=None):
        """
        Moves to the first rule occurrence that comes after the current one
        and is not earlier than `dateAfter` (default: current UTC time).

        Returns the new occurrence, which is None once the rule has been
        exhausted. Does nothing (and returns None) if there was no next
        occurrence to begin with.
        """

        if not self._nextOccurrence:
            # if it's None already, it means there's no "future"
            return

        if not dateAfter:
            dateAfter = nowutc()

        # start from the occurrence right after the current one...
        candidate = self._rule.after(self._nextOccurrence, inc=False)

        # ...and keep skipping while it is too early, until the rule
        # runs out of dates
        while candidate and candidate < dateAfter:
            candidate = self._rule.after(candidate, inc=False)

        self._nextOccurrence = candidate
        return candidate

    def getStartOn(self):
        """Date/time of the next occurrence (None if there is none left)"""
        return self._nextOccurrence

    def getLastFinishedOn(self):
        return self._lastFinishedOn

    def addOccurrence(self, occurrence):
        """
        Stores an occurrence, giving it the next sequential id
        """
        occId = self._occurrenceCount
        occurrence.setId(occId)
        self._occurrences[occId] = occurrence
        self._occurrenceCount += 1

    def dontComeBack(self):
        """Flags the task as not to be repeated"""
        self._repeat = False

    def shouldComeBack(self):
        """Whether the task is still flagged as to be repeated"""
        return self._repeat
272
273
class PeriodicUniqueTask(PeriodicTask):
    """
    Singleton periodic task: at any given time, at most one task of a
    given PeriodicUniqueTask class is supposed to be queued or running.
    This class only acts as a marker - nothing is enforced here.
    """
279
280
class TaskOccurrence(TimedEvent):
    """
    One occurrence of a PeriodicTask: keeps a reference to the task and a
    snapshot of the start/end times it had when the occurrence was created
    """

    fossilizes(ITaskOccurrenceFossil)

    def __init__(self, task):
        """
        - task - the task this occurrence belongs to; its current
          `getStartedOn()`/`getEndedOn()` values are copied
        """
        self._id = None
        self._task = task
        self._startedOn = task.getStartedOn()
        self._endedOn = task.getEndedOn()

    def getId(self):
        return self._id

    def setId(self, occId):
        self._id = occId

    def getUniqueId(self):
        """Unique id, in the form <task unique id>:<occurrence id>"""
        taskUniqueId = self._task.getUniqueId()
        return "%s:%s" % (taskUniqueId, self._id)

    def getTask(self):
        return self._task

    def getStartedOn(self):
        return self._startedOn

    def getEndedOn(self):
        return self._endedOn
312
class CategoryStatisticsUpdaterTask(PeriodicUniqueTask):
    """
    Periodic task that updates the statistics of a category
    """

    def __init__(self, cat, **kwargs):
        """
        - cat - the category that will be handed to
          `CategoryStatistics.updateStatistics`
        - kwargs - passed on to the periodic task constructor
        """
        super(CategoryStatisticsUpdaterTask, self).__init__(**kwargs)
        self._cat = cat

    def run(self):
        # imported at run time, as the other tasks in this module do
        from MaKaC.statistics import CategoryStatistics

        category = self._cat
        CategoryStatistics.updateStatistics(category)
323
324
325# TODO CERN Specific
class FoundationSyncTask(PeriodicUniqueTask):
    """
    Synchronizes room data (along with associated room managers
    and equipment) with Foundation database.

    Also, updates list of CERN Official Holidays
    """
    def __init__(self, **kwargs):
        """
        - kwargs - passed on to the periodic task constructor
        """
        # the former `obj.__init__(self)` call was removed: `obj` is not
        # defined anywhere in this module, so it raised NameError on every
        # instantiation
        super(FoundationSyncTask, self).__init__(**kwargs)

    def run(self):
        """
        Runs the whole Foundation synchronization (`FoundationSync().doAll()`)
        """
        from MaKaC.common.FoundationSync.foundationSync import FoundationSync
        FoundationSync().doAll()
342
343
class SendMailTask(OneShotTask):
    """
    One-shot task that sends a single e-mail through `GenericMailer`.

    The recipients are the addresses in `toAddr` plus the e-mail addresses
    of the objects in `toUser`; `ccAddr` holds the carbon-copy addresses.
    """
    def __init__(self, **kwargs):
        """
        - kwargs - passed on to `OneShotTask` (`startDateTime`, `expiryDate`)
        """
        super(SendMailTask, self).__init__(**kwargs)
        self.fromAddr = ""
        self.toAddr = []
        self.toUser = []
        self.ccAddr = []
        self.subject = ""
        self.text = ""
        self.smtpServer = Config.getInstance().getSmtpServer()

    def run(self):
        """
        Builds the notification out of the stored fields and sends it
        """
        import smtplib
        from MaKaC.webinterface.mail import GenericMailer, GenericNotification

        addrs = [smtplib.quoteaddr(x) for x in self.toAddr]
        ccaddrs = [smtplib.quoteaddr(x) for x in self.ccAddr]

        for user in self.toUser:
            addrs.append(smtplib.quoteaddr(user.getEmail()))

        GenericMailer.send(GenericNotification({"fromAddr": self.fromAddr,
                                                "toList": addrs,
                                                "ccList": ccaddrs,
                                                "subject": self.subject,
                                                "body": self.text }))

    def getConference(self):
        """
        Returns the conference attached to the task, or None if there is none
        """
        # `conf` is not set by this class (subclasses may set it), so a
        # plain attribute access would raise AttributeError here
        return getattr(self, 'conf', None)

    def setFromAddr(self, addr):
        self.fromAddr = addr
        self._p_changed = 1

    def getFromAddr(self):
        return self.fromAddr

    # The address/user lists are plain Python lists: in-place changes are
    # flagged explicitly through `_p_changed`

    def initialiseToAddr( self ):
        self.toAddr = []
        self._p_changed=1

    def addToAddr(self, addr):
        """Adds a recipient address, unless it is already there"""
        if addr not in self.toAddr:
            self.toAddr.append(addr)
            self._p_changed=1

    def addCcAddr(self, addr):
        """Adds a carbon-copy address, unless it is already there"""
        if addr not in self.ccAddr:
            self.ccAddr.append(addr)
            self._p_changed=1

    def removeToAddr(self, addr):
        """Removes a recipient address, if present"""
        if addr in self.toAddr:
            self.toAddr.remove(addr)
            self._p_changed=1

    def setToAddrList(self, addrList):
        """Params: addrList -- addresses of type : list of str."""
        self.toAddr = addrList
        self._p_changed=1

    def getToAddrList(self):
        return self.toAddr

    def setCcAddrList(self, addrList):
        """Params: addrList -- addresses of type : list of str."""
        self.ccAddr = addrList
        self._p_changed=1

    def getCcAddrList(self):
        return self.ccAddr

    def addToUser(self, user):
        """Adds a recipient user, unless it is already there"""
        if user not in self.toUser:
            self.toUser.append(user)
            self._p_changed=1

    def removeToUser(self, user):
        """Removes a recipient user, if present"""
        if user in self.toUser:
            self.toUser.remove(user)
            self._p_changed=1

    def getToUserList(self):
        return self.toUser

    def setSubject(self, subject):
        self.subject = subject

    def getSubject(self):
        return self.subject

    def setText(self, text):
        self.text = text

    def getText(self):
        return self.text
442
443
444
class AlarmTask(SendMailTask):
    """
    Alarm component: an e-mail reminder ("Event reminder: ...") about a
    conference.

    The alarm date is either relative to the start of the conference
    (`timeBefore`) or the absolute start date/time of the task.
    """
    def __init__(self, conf, **kwargs):
        """
        - conf - the conference the alarm is about
        - kwargs - passed on to `SendMailTask`/`OneShotTask`
          (`startDateTime`, `expiryDate`)
        """
        super(AlarmTask, self).__init__(**kwargs)
        self.conf = conf
        self.timeBefore = None
        self.text = ""
        self.note = ""
        self.confSumary = False
        self.toAllParticipants = False

    def getToAllParticipants(self):
        try:
            return self.toAllParticipants
        except AttributeError:
            # object that does not have the attribute yet: default to False
            self.toAllParticipants = False
            return self.toAllParticipants

    def setToAllParticipants(self, toAllParticipants):
        self.toAllParticipants = toAllParticipants

    def clone(self, conference):
        """
        Returns a copy of this alarm, attached to `conference`
        """
        # OneShotTask requires `startDateTime`, so the absolute start
        # date/time is handed over at construction time
        alarm = AlarmTask(conference,
                          startDateTime=copy.copy(self.getStartOn()))
        alarm.initialiseToAddr()
        for addr in self.getToAddrList():
            alarm.addToAddr(addr)
        alarm.setFromAddr(self.getFromAddr())
        alarm.setSubject(self.getSubject())
        alarm.setConfSumary(self.getConfSumary())
        alarm.setNote(self.getNote())
        alarm.setText(self.getText())
        if self.getTimeBefore():
            alarm.setTimeBefore(copy.copy(self.getTimeBefore()))
        alarm.setToAllParticipants(self.getToAllParticipants())
        return alarm

    def getStartDate(self):
        """
        Returns the alarm date: conference start minus `timeBefore` if the
        alarm is relative, the task's own start date/time otherwise
        """
        if self.timeBefore:
            return self.conf.getStartDate() - self.timeBefore
        return self.getStartOn()

    def getAdjustedStartDate(self,tz=None):
        """
        Same as `getStartDate`, but expressed in timezone `tz` (defaults to
        the conference timezone; anything that is not a common timezone
        name falls back to UTC). Returns None if there is no date.
        """
        if not tz:
            tz = self.conf.getTimezone()
        if tz not in common_timezones:
            tz = 'UTC'
        tzInfo = timezone(tz)
        if self.timeBefore:
            return self.conf.getStartDate().astimezone(tzInfo) - self.timeBefore
        startOn = self.getStartOn()
        if startOn:
            return startOn.astimezone(tzInfo)
        return None

    def setTimeBefore(self, timeDelta):
        #we don't need startDate if timeBefore is set
        self.timeBefore = timeDelta
        # NOTE(review): `startDate` is not read anywhere in this class; the
        # start date/time of the task (`startDateTime`) is left untouched
        self.startDate = None
        self._p_changed=1

    def getTimeBefore(self):
        return self.timeBefore

    def addToUser(self, user):
        super(AlarmTask, self).addToUser(user)
        if isinstance(user, Avatar):
            user.linkTo(self, "to")

    def removeToUser(self, user):
        super(AlarmTask, self).removeToUser(user)
        if isinstance(user, Avatar):
            user.unlinkTo(self, "to")

    def getText(self):
        return self.text

    def getLocator(self):
        d = self.conf.getLocator()
        d["alarmId"] = self.getId()
        return d

    def canAccess(self, aw):
        return self.conf.canAccess(aw)

    def canModify(self, aw):
        return self.conf.canModify(aw)

    def _setMailText(self):
        """
        Appends the note and (if enabled) the text version of the conference
        to the current text, and stores the result as the mail text.

        NOTE(review): the result is stored back in `self.text`, so calling
        this repeatedly without resetting the text first accumulates content.
        """
        import os

        text = self.text
        if self.note:
            text = text + "Note: %s" % self.note
        if self.confSumary:
            from MaKaC.common.output import outputGenerator
            from MaKaC.accessControl import AdminList, AccessWrapper
            import MaKaC.webinterface.urlHandlers as urlHandlers

            stylepath = "%s/text.xsl" % Config.getInstance().getStylesheetsDir()
            if os.path.exists(stylepath):
                # the output is generated with the access rights of the
                # first administrator in the list
                admin = AdminList().getInstance().getList()[0]
                aw = AccessWrapper()
                aw.setUser(admin)
                outGen = outputGenerator(aw)
                urlGens = {
                    "modifyURL": urlHandlers.UHConferenceModification.getURL( self.conf ),
                    "sessionModifyURLGen": urlHandlers.UHSessionModification.getURL,
                    "contribModifyURLGen": urlHandlers.UHContributionModification.getURL,
                    "subContribModifyURLGen":  urlHandlers.UHSubContribModification.getURL,
                    "materialURLGen": urlHandlers.UHMaterialDisplay.getURL,
                    "resourceURLGen": urlHandlers.UHFileAccess.getURL }
                confText = outGen.getOutput(self.conf, stylepath, urlGens)
                text += "\n\n\n" + confText
            else:
                # no stylesheet, no summary: the mail still goes out with
                # the rest of the text
                logging.getLogger('task/%s' % self.__class__.__name__).warning(
                    'Stylesheet %s not found, event summary not embedded' % stylepath)
        super(AlarmTask, self).setText(text)

    def setNote(self, note):
        self.note = note
        self._setMailText()
        self._p_changed=1

    def getNote(self):
        return self.note

    def setConfSumary(self, val):
        self.confSumary = val
        self._setMailText()
        self._p_changed=1

    def getConfSumary(self):
        return self.confSumary

    def run(self):
        """
        Prepares the reminder and sends it.

        Returns True if the alarm was discarded instead (the conference no
        longer exists or has already started), False once the reminder has
        been handed to the mailer.
        """

        # Date checkings...
        from MaKaC.conference import ConferenceHolder
        if not ConferenceHolder().hasKey(self.conf.getId()) or \
                self.conf.getStartDate() <= nowutc():
            self.conf.removeAlarm(self)
            return True
        # Email
        self.setSubject("Event reminder: %s"%self.conf.getTitle())
        try:
            locationText = self.conf.getLocation().getName()
            if self.conf.getLocation().getAddress() != "":
                locationText += ", %s" % self.conf.getLocation().getAddress()
            if self.conf.getRoom().getName() != "":
                locationText += " (%s)" % self.conf.getRoom().getName()
        except Exception:
            # best effort: missing location/room data just means that no
            # location line is shown
            locationText = ""
        if locationText != "":
            locationText = _(""" _("Location"): %s""") % locationText

        if self.getToAllParticipants() :
            for p in self.conf.getParticipation().getParticipantList():
                self.addToUser(p)

        from MaKaC.webinterface import urlHandlers
        if Config.getInstance().getShortEventURL() != "":
            url = "%s%s" % (Config.getInstance().getShortEventURL(),self.conf.getId())
        else:
            url = urlHandlers.UHConferenceDisplay.getURL( self.conf )
        self.setText("""Hello,
    Please note that the event "%s" will start on %s (%s).
    %s

    You can access the full event here:
    %s

Best Regards

    """%(self.conf.getTitle(),\
                self.conf.getAdjustedStartDate().strftime("%A %d %b %Y at %H:%M"),\
                self.conf.getTimezone(),\
                locationText,\
                url,\
                ))
        self._setMailText()
        # this method overrides SendMailTask.run(), so the actual sending
        # has to be requested explicitly - otherwise nothing is ever sent
        super(AlarmTask, self).run()
        return False
631
632
class SampleOneShotTask(OneShotTask):
    """
    Sample one-shot task: logs a message, sleeps for one second and logs
    that it has been executed
    """

    def run(self):
        logger = self.getLogger()
        logger.debug('Now i shall sleeeeeeeep!')
        time.sleep(1)
        taskName = self.__class__.__name__
        self.getLogger().debug('%s executed' % taskName)
638
639
class SamplePeriodicTask(PeriodicTask):
    """
    Sample periodic task: sleeps for one second and logs that it has been
    executed
    """

    def run(self):
        time.sleep(1)
        taskName = self.__class__.__name__
        self.getLogger().debug('%s executed' % taskName)
Note: See TracBrowser for help on using the repository browser.