Changeset 4ac79f9 in indico
- Timestamp:
- 03/28/11 16:31:43 (2 years ago)
- Branches:
- master, burotel, hello-world-walkthrough, ipv6, v0.98-series, v0.98.2, v0.98.3, v0.98b1, v0.98b2, v0.99, 051b2622c51afb171a1dedb46a0df4fbb0cbd02e, 4c7d4152dff271ba5df5a8606605969cab454080
- Children:
- 55c97c
- Parents:
- c745fd
- git-author:
- Pedro Ferreira <jose.pedro.ferreira@…> (03/28/11 16:19:24)
- git-committer:
- Pedro Ferreira <jose.pedro.ferreira@…> (03/28/11 16:31:43)
- Location:
- indico
- Files:
-
- 1 added
- 4 edited
-
MaKaC/webinterface/tpls/AdminPlugins.tpl (modified) (2 diffs)
-
ext/livesync/agent.py (modified) (4 diffs)
-
ext/livesync/bistate.py (modified) (2 diffs)
-
ext/livesync/components.py (modified) (1 diff)
-
ext/livesync/options.py (added)
Legend:
- Unmodified
- Added
- Removed
-
indico/MaKaC/webinterface/tpls/AdminPlugins.tpl
ra92d2a r4ac79f9 76 76 </td> 77 77 </tr> 78 <% if PluginType.getOptions() is not None and len(PluginType.getOptions()) > 0: %> 79 <% includeTpl('AdminPluginsOptionList', Object = PluginType, ObjectType = "PluginType", Favorites = Favorites, Index = 0, rbActive = rbActive, baseURL = baseURL) %> 80 <% end %> 81 <% elif True in self._notify('hasPluginSettings', PluginType.getId(), None): %> 78 <% if True in self._notify('hasPluginSettings', PluginType.getId(), None): %> 82 79 <tr> 83 80 <td> … … 87 84 </tr> 88 85 <% end %> 89 <% else: %> 86 <% if PluginType.getOptions() is not None and len(PluginType.getOptions()) > 0: %> 87 <% includeTpl('AdminPluginsOptionList', Object = PluginType, ObjectType = "PluginType", Favorites = Favorites, Index = 0, rbActive = rbActive, baseURL = baseURL) %> 88 <% end %> 89 <% if not PluginType.getOptions() and True not in self._notify('hasPluginSettings', PluginType.getId(), None): %> 90 90 <tr> 91 91 <td> -
indico/ext/livesync/agent.py
rc08b10 r4ac79f9 23 23 """ 24 24 # standard lib imports 25 import datetime, time 26 from threading import Thread 27 from Queue import Queue, Empty 25 import datetime 28 26 29 27 # dependency libs … … 41 39 from indico.ext.livesync.base import ILiveSyncAgentProvider, MPT_GRANULARITY 42 40 43 # legacy indico 44 from MaKaC.common import DBMgr 45 41 # legacy indico imports 42 from MaKaC import conference 46 43 47 44 class QueryException(Exception): … … 371 368 return self._agents 372 369 370 def objectExcluded(self, obj): 371 """ 372 Decides whether a particular object should be ignored or not 373 """ 374 excluded = getPluginType().getOption('excludedCategories').getValue() 375 if isinstance(obj, conference.Category): 376 return obj.getId() in excluded 377 elif isinstance(obj, conference.Conference): 378 return obj.getOwner().getId() in excluded 379 else: 380 return obj.getConference().getOwner().getId() in excluded 381 373 382 374 383 class RecordUploader(object): … … 416 425 417 426 return True 418 419 420 ###################421 # This code is now unused.422 #423 #424 425 class ThreadedRecordUploader(object):426 """427 A record uploading mechanism, based on worker threads428 """429 430 DEFAULT_BATCH_SIZE, DEFAULT_NUM_SLAVES = 1000, 2431 MAX_DELAYED_BATCHES = 5432 MAX_THREAD_REQUEST_TIME = 300433 434 def __init__(self, slaveClass, agent, logger,435 extraSlaveArgs=(),436 batchSize=DEFAULT_BATCH_SIZE,437 numSlaves=DEFAULT_NUM_SLAVES,438 maxDelayed=MAX_DELAYED_BATCHES,439 monitor=None):440 self._logger = logger441 self._agent = agent442 self._slaveClass = slaveClass443 self._batchSize = batchSize444 self._numSlaves = numSlaves445 self._queue = Queue()446 self._slaves = {}447 self._currentBatch = []448 self._extraSlaveArgs = extraSlaveArgs449 self._enqueued = 0450 self._monitor = monitor451 self._maxDelayed = maxDelayed452 453 def spawn(self):454 """455 Starts the uploader (spawns slave threads)456 """457 for i in range(0, self._numSlaves):458 self._slaves[i] = self._slaveClass("Uploader%s" % i,459 self._queue,460 self._logger,461 self._agent,462 *self._extraSlaveArgs)463 self._slaves[i].start()464 465 def enqueue(self, record):466 """467 Adds a record to the queue.468 We actually accumulate them and queue them in batches.469 """470 471 # when BATCH_SIZE is passed, enqueue it472 if len(self._currentBatch) > (self._batchSize - 1):473 self._queue.put(self._currentBatch)474 self._enqueued += len(self._currentBatch)475 self._currentBatch = []476 477 if self._monitor:478 self.reportStatus(self._monitor)479 480 self._currentBatch.append(record)481 482 def join(self):483 """484 Waits for the slave threads to finish working485 """486 487 result = True488 489 # first, if there is an incomplete batch remaining, enqueue it first490 if self._currentBatch:491 self._queue.put(self._currentBatch)492 493 # now, join the queue (wait for them to be uploaded)494 self._logger.debug('joining queue')495 self._queue.join()496 self._logger.debug('joining slaves')497 498 for slave in self._slaves.values():499 slave.terminate()500 slave.join()501 result &= (not slave.is_alive() and slave.result)502 503 return result504 505 def _checkThreadHealth(self):506 for i in range(0, self._numSlaves):507 slave = self._slaves[i]508 if time.time() > (slave._startTime + \509 ThreadedRecordUploader.MAX_THREAD_REQUEST_TIME) \510 and slave._dead == False:511 self._logger.warning("Slave '%s' seems to be dead. "512 "Adding another one and recovering batch." % \513 slave.getName())514 515 if slave._currentBatch:516 self._queue.put(slave._currentBatch)517 518 newSlave = self._slaveClass("Uploader%s" % self._numSlaves,519 self._queue,520 self._logger,521 self._agent,522 *self._extraSlaveArgs)523 self._slaves[self._numSlaves] = newSlave524 slave._dead = True525 self._numSlaves += 1526 527 newSlave.start()528 529 def iterateOver(self, iterator):530 """531 Consumes an iterator532 """533 # take operations and choose which records to send534 for entry in iterator:535 self.enqueue(entry)536 537 while (self._queue.qsize() > self._maxDelayed):538 self._logger.info('Too many delayed batches, sleeping')539 time.sleep(10)540 541 self._checkThreadHealth()542 543 def reportStatus(self, stream):544 totalUp = 0545 stream.write("%d enqueued\n" % self._enqueued)546 for i in range(0, self._numSlaves):547 slave = self._slaves[i]548 stream.write("\t [wrk %s] %d uploaded\n" % (slave.getName(),549 slave._uploaded))550 totalUp += slave._uploaded551 stream.write("%d uploaded (total)\n\n" % totalUp)552 stream.flush()553 554 555 class UploaderSlave(Thread):556 """557 A generic threaded "work slave" for agents558 """559 560 def __init__(self, name, queue, logger, agent):561 self._keepRunning = True562 self._logger = logger563 self._terminate = False564 self.result = True565 self._queue = queue566 self._name = name567 self._uploaded = 0568 self._agent = agent569 self._startTime = time.time()570 self._currentBatch = []571 self._dead = False572 573 super(UploaderSlave, self).__init__(name=name)574 575 def run(self):576 577 DBMgr.getInstance().startRequest()578 579 self._logger.debug('Worker [%s] started' % self._name)580 try:581 while True:582 taskFetched = False583 try:584 self._currentBatch = self._queue.get(True, 2)585 taskFetched = True586 self._startTime = time.time()587 self.result &= self._uploadBatch(self._currentBatch)588 self._uploaded += len(self._currentBatch)589 except Empty:590 pass591 finally:592 if taskFetched:593 self._queue.task_done()594 595 if self._terminate:596 break597 except:598 self._logger.exception('Worker [%s]:' % self._name)599 return 1600 finally:601 DBMgr.getInstance().endRequest()602 self._logger.debug('Worker [%s] finished' % self._name)603 self._logger.debug('Worker [%s] returning %s' % (self._name, self.result))604 605 def terminate(self):606 self._terminate = True607 608 def _uploadBatch(self, batch):609 """610 Sends a batch of records611 Overloaded by agent.612 """613 raise Exception("Unimplemented method")614 615 def _getMetadata(self, records):616 """617 Generates the metadata for a batch of records.618 Should be overloaded.619 """620 raise Exception("Unimplemented method") -
indico/ext/livesync/bistate.py
r655cb4 r4ac79f9 37 37 CACHE_SWEEP_LIMIT = 10000 38 38 39 # Attention: if this class is not declared, the LiveSync management interface40 # will never know this plugin exists!41 39 42 40 class BistateRecordProcessor(object): … … 107 105 obj = aw.getObject() 108 106 109 ## TODO: enable this, when nconfig is possible from interface110 ## if obj.canAccess(AccessWrapper(access)):107 ## TODO: enable this, when config is possible from interface 108 ## if not obj.canAccess(AccessWrapper(access)): 111 109 ## # no access? jump over this one 112 110 ## continue -
indico/ext/livesync/components.py
rd285dc r4ac79f9 74 74 Logger.get('ext.livesync').debug((objId, action)) 75 75 # TODO: remove redundant items 76 sm.add(timestamp, 77 ActionWrapper(timestamp, obj, actions, objId)) 76 if not sm.objectExcluded(obj): 77 sm.add(timestamp, 78 ActionWrapper(timestamp, obj, actions, objId)) 78 79 79 80 def requestRetry(self, obj, req, nretry):
Note: See TracChangeset
for help on using the changeset viewer.
