source: indico/indico/tests/python/unit/indico_tests/modules_tests/scheduler_tests/server_test.py @ 41ca8c

burotelhello-world-walkthroughipv6v0.98-seriesv0.98.2v0.98.3v0.98b1v0.98b2v0.99v1.0v1.1
Last change on this file since 41ca8c was 41ca8c, checked in by Pedro Ferreira <jose.pedro.ferreira@…>, 3 years ago

[IMP] Faster tests

  • The execution time is now acceptable;
  • Abstracted functions that count time (TimeSource?);
  • Property mode set to 100644
File size: 10.7 KB
Line 
1# -*- coding: utf-8 -*-
2##
3##
4## This file is part of CDS Indico.
5## Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007 CERN.
6##
7## CDS Indico is free software; you can redistribute it and/or
8## modify it under the terms of the GNU General Public License as
9## published by the Free Software Foundation; either version 2 of the
10## License, or (at your option) any later version.
11##
12## CDS Indico is distributed in the hope that it will be useful, but
13## WITHOUT ANY WARRANTY; without even the implied warranty of
14## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15## General Public License for more details.
16##
17## You should have received a copy of the GNU General Public License
18## along with CDS Indico; if not, write to the Free Software Foundation, Inc.,
19## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
20
21"""
22Tests for scheduler base classes
23"""
24import unittest, threading, multiprocessing
25import time
26from datetime import timedelta
27from dateutil import rrule
28
29from MaKaC.common.db import DBMgr
30
31from indico.util.date_time import nowutc
32from indico.modules.scheduler import Scheduler, SchedulerModule, Client, base
33from indico.modules.scheduler.tasks import OneShotTask, PeriodicTask
34
35terminated = None
36
37
38class TestTimeSource(base.TimeSource):
39    def __init__(self, factor):
40        self._startTime = nowutc()
41        self._factor = factor
42
43    def getCurrentTime(self):
44        realDiff = nowutc() - self._startTime
45        seconds = (realDiff.seconds + realDiff.microseconds / 1E6)
46
47        fakeDiff = timedelta(seconds = seconds*self._factor)
48
49        return self._startTime + fakeDiff
50
51    def sleep(self, amount):
52        time.sleep(amount/float(self._factor))
53
54
55base.TimeSource.set(TestTimeSource(5))
56
57class TestTask(OneShotTask):
58
59    def __init__(self, myid, date_time):
60        super(TestTask, self).__init__(date_time)
61        self._id = myid
62
63    def run(self):
64        base.TimeSource.get().sleep(1)
65        terminated[self._id] = 1
66
67
68class TestFailTask(TestTask):
69    def run(self):
70        base.TimeSource.get().sleep(1)
71        terminated[self._id] = 1
72        raise Exception('I fail!')
73
74
75class TestPeriodicTask(PeriodicTask):
76    def __init__(self, myid, freq, **kwargs):
77        super(TestPeriodicTask, self).__init__(freq, **kwargs)
78        self._id = myid
79
80    def run(self):
81        terminated[self._id] += 1
82
83
84class TestPeriodicFailTask(PeriodicTask):
85    def __init__(self, myid, freq, **kwargs):
86        super(TestPeriodicFailTask, self).__init__(freq, **kwargs)
87        self._id = myid
88
89    def run(self):
90
91        terminated[self._id] += 1
92
93        if terminated[self._id] % 2 == 0:
94            raise Exception('I fail! %s' % terminated[self._id])
95
96
97class Worker(threading.Thread):
98
99    def __init__(self, myid, task_t, time, **kwargs):
100        super(Worker, self).__init__()
101        self._id = myid
102        self.finished = False
103        self._task_t = task_t
104        self._time = time
105        self._extra_args = kwargs
106
107    def run(self):
108        DBMgr.getInstance().startRequest()
109        c = Client()
110        c.enqueue(self._task_t(self._id, self._time, **self._extra_args))
111        DBMgr.getInstance().endRequest()
112        pass
113
114
115class SchedulerThread(threading.Thread):
116
117    def __init__(self, mode):
118        super(SchedulerThread, self).__init__()
119        self._mode = mode
120
121    def run(self):
122        DBMgr.getInstance().startRequest()
123        s = Scheduler(sleep_interval = 1,
124                      task_max_tries = 1,
125                      multitask_mode = self._mode)
126        self.result = s.run()
127        DBMgr.getInstance().endRequest()
128
129
130class _TestScheduler(unittest.TestCase):
131
132    def setUp(self):
133
134        DBMgr.getInstance()._conn = {}
135
136        DBMgr.getInstance().startRequest()
137        self._smodule = SchedulerModule.getDBInstance()
138        DBMgr.getInstance().commit()
139
140        self._sched = SchedulerThread(self._mode)
141        self._sched.start()
142
143    def _checkWorkersFinished(self, timeout, value=1):
144
145        global terminated
146
147        timewaited = 0
148
149        for w in self._workers:
150            while terminated[w] != value:
151                # timeout at 10 sec
152                if timewaited < timeout:
153                    base.TimeSource.get().sleep(1)
154                    timewaited += 1
155                else:
156                    # print "bad news... timeout @ %s, value=%s" % \
157                    #       (w, terminated[w])
158                    return False
159        return True
160
161    def _startSomeWorkers(self, type_tasks, time_tasks, **kwargs):
162        self._workers = {}
163
164        global terminated
165        terminated= multiprocessing.Array('i',[0]*len(type_tasks))
166
167        for i in range(0, len(type_tasks)):
168            w = Worker(i, type_tasks[i], time_tasks[i], **kwargs)
169            w.start()
170            self._workers[i] = w
171
172        for i in range(0, len(type_tasks)):
173            self._workers[i].join()
174
175    def _shutdown(self):
176        c = Client()
177        c.shutdown()
178
179        DBMgr.getInstance().commit()
180
181        self._sched.join()
182
183    def _assertStatus(self, expectedStatus):
184        DBMgr.getInstance().sync()
185        status = self._smodule.getStatus()
186
187        self.assertEqual(status, expectedStatus)
188
189    def testSimpleFinish(self):
190        """
191        Creating 1 tasks that will succeed
192        """
193
194        self._startSomeWorkers([TestTask],
195                               [base.TimeSource.get().getCurrentTime() + \
196                                timedelta(seconds=2)])
197        self.assertEqual(self._checkWorkersFinished(10),
198                         True)
199
200        self._shutdown()
201
202        self._assertStatus({'state': False,
203                            'waiting': 0,
204                            'running': 0,
205                            'spooled': 0,
206                            'finished': 1,
207                            'failed': 0})
208
209    def testPriority(self):
210        """
211        Checking that one task is executed before another
212        """
213
214        self._workers = {}
215
216        global terminated
217
218        terminated = multiprocessing.Array('i', [0, 0])
219
220        terminated[0] = 0
221        terminated[1] = 0
222
223        w1 = Worker(0, TestTask, base.TimeSource.get().getCurrentTime() + \
224                    timedelta(seconds=4))
225        w1.start()
226
227        base.TimeSource.get().sleep(1)
228
229        w2 = Worker(1, TestTask, base.TimeSource.get().getCurrentTime() + \
230                    timedelta(seconds=0))
231        w2.start()
232
233        self._workers[0] = w1
234        self._workers[1] = w2
235
236        w1.join()
237        w2.join()
238
239        self.assertEqual(self._checkWorkersFinished(10),
240                         True)
241
242        c = Client()
243        c.shutdown()
244        DBMgr.getInstance().commit()
245
246        self._sched.join()
247
248        self._assertStatus({'state': False,
249                            'waiting': 0,
250                            'running': 0,
251                            'spooled': 0,
252                            'finished': 2,
253                            'failed': 0})
254
255
256        t1 = c.getTask(0)
257        t2 = c.getTask(1)
258
259
260        self.assertEqual(t1.endedOn > t2.endedOn, True)
261
262    def testSeveralFailFinish(self):
263        """
264        Creating 5 tasks, 2 of which will fail
265        """
266
267        self._startSomeWorkers([TestFailTask] * 2 +
268                               [TestTask] * 3,
269                               [base.TimeSource.get().getCurrentTime() + \
270                                timedelta(seconds=2)]*5)
271        self.assertEqual(self._checkWorkersFinished(10),
272                         True)
273
274        self._shutdown()
275
276        self._assertStatus({'state': False,
277                            'waiting': 0,
278                            'running': 0,
279                            'spooled': 0,
280                            'finished': 3,
281                            'failed': 2})
282
283    def testSeveralFailFinishWaiting(self):
284        """
285        Creating 10 tasks, 3 will fail and 2 still be waiting
286        """
287
288        self._startSomeWorkers([TestFailTask for i in range(0, 3)] +
289                               [TestTask for i in range(3, 10)],
290                               [base.TimeSource.get().getCurrentTime() + \
291                                timedelta(seconds=2)]*8 +
292                               [base.TimeSource.get().getCurrentTime() + \
293                                timedelta(minutes=200)]*2)
294
295        # Not all workers will have finished
296        self.assertEqual(self._checkWorkersFinished(10),
297                         False)
298
299        self._shutdown()
300
301        self._assertStatus({'state': False,
302                            'waiting': 2,
303                            'running': 0,
304                            'spooled': 0,
305                            'finished': 5,
306                            'failed': 3})
307
308    def testPeriodicTasks(self):
309        """
310        Creating 5 periodic tasks
311        """
312
313        now = base.TimeSource.get().getCurrentTime()
314
315        s = ((now.second / 10) + 1) % 6
316
317        seconds = [s*10]
318
319        # get intervals of 10 seconds
320        for i in range(0,2):
321            s = s + 1
322            seconds.append((s % 6) * 10)
323
324        self._startSomeWorkers([TestPeriodicTask] * 5,
325                               [rrule.MINUTELY] * 5,
326                               bysecond = tuple(seconds))
327
328        # Not all workers will have finished
329        self.assertEqual(self._checkWorkersFinished(60, value=3),
330                         True)
331
332        self._shutdown()
333
334        self._assertStatus({'state': False,
335                            'waiting': 5,
336                            'running': 0,
337                            'spooled': 0,
338                            'finished': 15,
339                            'failed': 0})
340
341    def testPeriodicFailTasks(self):
342        """
343        Creating 1 periodic task that fails every second time
344        """
345
346        now = base.TimeSource.get().getCurrentTime()
347
348        s = ((now.second / 10) + 1) % 6
349
350        seconds = [s*10]
351
352        # get intervals of 10 seconds
353        for i in range(0,3):
354            s = s + 1
355            seconds.append((s % 6) * 10)
356
357        self._startSomeWorkers([TestPeriodicFailTask],
358                               [rrule.MINUTELY],
359                               bysecond = tuple(seconds))
360
361        # All workers will have finished
362        self.assertEqual(self._checkWorkersFinished(60, value=4),
363                         True)
364
365        self._shutdown()
366
367        self._assertStatus({'state': False,
368                            'waiting': 1,
369                            'running': 0,
370                            'spooled': 0,
371                            'finished': 2,
372                            'failed': 2})
373
374 # TODO:
375 # some tasks running (test resume)
376 # some tasks spooled (test resume)
377
378    def tearDown(self):
379
380        self._smodule.destroyDBInstance()
381        DBMgr.getInstance().endRequest()
382
383class TestProcessScheduler(_TestScheduler):
384    _mode = 'processes'
385
386class TestThreadScheduler(_TestScheduler):
387    _mode = 'threads'
Note: See TracBrowser for help on using the repository browser.