Changeset 4ac79f9 in indico


Ignore:
Timestamp:
03/28/11 16:31:43 (2 years ago)
Author:
Pedro Ferreira <jose.pedro.ferreira@…>
Branches:
master, burotel, hello-world-walkthrough, ipv6, v0.98-series, v0.98.2, v0.98.3, v0.98b1, v0.98b2, v0.99, 051b2622c51afb171a1dedb46a0df4fbb0cbd02e, 4c7d4152dff271ba5df5a8606605969cab454080
Children:
55c97c
Parents:
c745fd
git-author:
Pedro Ferreira <jose.pedro.ferreira@…> (03/28/11 16:19:24)
git-committer:
Pedro Ferreira <jose.pedro.ferreira@…> (03/28/11 16:31:43)
Message:

[IMP] livesync - exclude categories

  • Also removed unnecessary code;
Location:
indico
Files:
1 added
4 edited

Legend:

Unmodified
Added
Removed
  • indico/MaKaC/webinterface/tpls/AdminPlugins.tpl

    ra92d2a r4ac79f9  
    7676        </td> 
    7777    </tr> 
    78     <% if PluginType.getOptions() is not None and len(PluginType.getOptions()) > 0: %> 
    79         <% includeTpl('AdminPluginsOptionList', Object = PluginType, ObjectType = "PluginType", Favorites = Favorites, Index = 0, rbActive = rbActive, baseURL = baseURL) %> 
    80     <% end %> 
    81     <% elif True in self._notify('hasPluginSettings', PluginType.getId(), None): %> 
     78    <% if True in self._notify('hasPluginSettings', PluginType.getId(), None): %> 
    8279    <tr> 
    8380        <td> 
     
    8784    </tr> 
    8885    <% end %> 
    89     <% else: %> 
     86    <% if PluginType.getOptions() is not None and len(PluginType.getOptions()) > 0: %> 
     87        <% includeTpl('AdminPluginsOptionList', Object = PluginType, ObjectType = "PluginType", Favorites = Favorites, Index = 0, rbActive = rbActive, baseURL = baseURL) %> 
     88    <% end %> 
     89    <% if not PluginType.getOptions() and True not in self._notify('hasPluginSettings', PluginType.getId(), None): %> 
    9090    <tr> 
    9191        <td> 
  • indico/ext/livesync/agent.py

    rc08b10 r4ac79f9  
    2323""" 
    2424# standard lib imports 
    25 import datetime, time 
    26 from threading import Thread 
    27 from Queue import Queue, Empty 
     25import datetime 
    2826 
    2927# dependency libs 
     
    4139from indico.ext.livesync.base import ILiveSyncAgentProvider, MPT_GRANULARITY 
    4240 
    43 # legacy indico 
    44 from MaKaC.common import DBMgr 
    45  
     41# legacy indico imports 
     42from MaKaC import conference 
    4643 
    4744class QueryException(Exception): 
     
    371368        return self._agents 
    372369 
     370    def objectExcluded(self, obj): 
     371        """ 
     372        Decides whether a particular object should be ignored or not 
     373        """ 
     374        excluded = getPluginType().getOption('excludedCategories').getValue() 
     375        if isinstance(obj, conference.Category): 
     376            return obj.getId() in excluded 
     377        elif isinstance(obj, conference.Conference): 
     378            return obj.getOwner().getId() in excluded 
     379        else: 
     380            return obj.getConference().getOwner().getId() in excluded 
     381 
    373382 
    374383class RecordUploader(object): 
     
    416425 
    417426        return True 
    418  
    419  
    420 ################### 
    421 # This code is now unused. 
    422 # 
    423 # 
    424  
    425 class ThreadedRecordUploader(object): 
    426     """ 
    427     A record uploading mechanism, based on worker threads 
    428     """ 
    429  
    430     DEFAULT_BATCH_SIZE, DEFAULT_NUM_SLAVES = 1000, 2 
    431     MAX_DELAYED_BATCHES = 5 
    432     MAX_THREAD_REQUEST_TIME = 300 
    433  
    434     def __init__(self, slaveClass, agent, logger, 
    435                  extraSlaveArgs=(), 
    436                  batchSize=DEFAULT_BATCH_SIZE, 
    437                  numSlaves=DEFAULT_NUM_SLAVES, 
    438                  maxDelayed=MAX_DELAYED_BATCHES, 
    439                  monitor=None): 
    440         self._logger = logger 
    441         self._agent = agent 
    442         self._slaveClass = slaveClass 
    443         self._batchSize = batchSize 
    444         self._numSlaves = numSlaves 
    445         self._queue = Queue() 
    446         self._slaves = {} 
    447         self._currentBatch = [] 
    448         self._extraSlaveArgs = extraSlaveArgs 
    449         self._enqueued = 0 
    450         self._monitor = monitor 
    451         self._maxDelayed = maxDelayed 
    452  
    453     def spawn(self): 
    454         """ 
    455         Starts the uploader (spawns slave threads) 
    456         """ 
    457         for i in range(0, self._numSlaves): 
    458             self._slaves[i] = self._slaveClass("Uploader%s" % i, 
    459                                                self._queue, 
    460                                                self._logger, 
    461                                                self._agent, 
    462                                                *self._extraSlaveArgs) 
    463             self._slaves[i].start() 
    464  
    465     def enqueue(self, record): 
    466         """ 
    467         Adds a record to the queue. 
    468         We actually accumulate them and queue them in batches. 
    469         """ 
    470  
    471         # when BATCH_SIZE is passed, enqueue it 
    472         if len(self._currentBatch) > (self._batchSize - 1): 
    473             self._queue.put(self._currentBatch) 
    474             self._enqueued += len(self._currentBatch) 
    475             self._currentBatch = [] 
    476  
    477             if self._monitor: 
    478                 self.reportStatus(self._monitor) 
    479  
    480         self._currentBatch.append(record) 
    481  
    482     def join(self): 
    483         """ 
    484         Waits for the slave threads to finish working 
    485         """ 
    486  
    487         result = True 
    488  
    489         # first, if there is an incomplete batch remaining, enqueue it first 
    490         if self._currentBatch: 
    491             self._queue.put(self._currentBatch) 
    492  
    493         # now, join the queue (wait for them to be uploaded) 
    494         self._logger.debug('joining queue') 
    495         self._queue.join() 
    496         self._logger.debug('joining slaves') 
    497  
    498         for slave in self._slaves.values(): 
    499             slave.terminate() 
    500             slave.join() 
    501             result &= (not slave.is_alive() and slave.result) 
    502  
    503         return result 
    504  
    505     def _checkThreadHealth(self): 
    506         for i in range(0, self._numSlaves): 
    507             slave = self._slaves[i] 
    508             if time.time() > (slave._startTime + \ 
    509                               ThreadedRecordUploader.MAX_THREAD_REQUEST_TIME) \ 
    510                and slave._dead == False: 
    511                 self._logger.warning("Slave '%s' seems to be dead. " 
    512                                      "Adding another one and recovering batch." % \ 
    513                                      slave.getName()) 
    514  
    515                 if slave._currentBatch: 
    516                     self._queue.put(slave._currentBatch) 
    517  
    518                 newSlave = self._slaveClass("Uploader%s" % self._numSlaves, 
    519                                             self._queue, 
    520                                             self._logger, 
    521                                             self._agent, 
    522                                             *self._extraSlaveArgs) 
    523                 self._slaves[self._numSlaves] = newSlave 
    524                 slave._dead = True 
    525                 self._numSlaves += 1 
    526  
    527                 newSlave.start() 
    528  
    529     def iterateOver(self, iterator): 
    530         """ 
    531         Consumes an iterator 
    532         """ 
    533         # take operations and choose which records to send 
    534         for entry in iterator: 
    535             self.enqueue(entry) 
    536  
    537             while (self._queue.qsize() > self._maxDelayed): 
    538                 self._logger.info('Too many delayed batches, sleeping') 
    539                 time.sleep(10) 
    540  
    541             self._checkThreadHealth() 
    542  
    543     def reportStatus(self, stream): 
    544         totalUp = 0 
    545         stream.write("%d enqueued\n" % self._enqueued) 
    546         for i in range(0, self._numSlaves): 
    547             slave = self._slaves[i] 
    548             stream.write("\t [wrk %s] %d uploaded\n" % (slave.getName(), 
    549                                                      slave._uploaded)) 
    550             totalUp += slave._uploaded 
    551         stream.write("%d uploaded (total)\n\n" % totalUp) 
    552         stream.flush() 
    553  
    554  
    555 class UploaderSlave(Thread): 
    556     """ 
    557     A generic threaded "work slave" for agents 
    558     """ 
    559  
    560     def __init__(self, name, queue, logger, agent): 
    561         self._keepRunning = True 
    562         self._logger = logger 
    563         self._terminate = False 
    564         self.result = True 
    565         self._queue = queue 
    566         self._name = name 
    567         self._uploaded = 0 
    568         self._agent = agent 
    569         self._startTime = time.time() 
    570         self._currentBatch = [] 
    571         self._dead = False 
    572  
    573         super(UploaderSlave, self).__init__(name=name) 
    574  
    575     def run(self): 
    576  
    577         DBMgr.getInstance().startRequest() 
    578  
    579         self._logger.debug('Worker [%s] started' % self._name) 
    580         try: 
    581             while True: 
    582                 taskFetched = False 
    583                 try: 
    584                     self._currentBatch = self._queue.get(True, 2) 
    585                     taskFetched = True 
    586                     self._startTime = time.time() 
    587                     self.result &= self._uploadBatch(self._currentBatch) 
    588                     self._uploaded += len(self._currentBatch) 
    589                 except Empty: 
    590                     pass 
    591                 finally: 
    592                     if taskFetched: 
    593                         self._queue.task_done() 
    594  
    595                 if self._terminate: 
    596                     break 
    597         except: 
    598             self._logger.exception('Worker [%s]:' % self._name) 
    599             return 1 
    600         finally: 
    601             DBMgr.getInstance().endRequest() 
    602             self._logger.debug('Worker [%s] finished' % self._name) 
    603         self._logger.debug('Worker [%s] returning %s' % (self._name, self.result)) 
    604  
    605     def terminate(self): 
    606         self._terminate = True 
    607  
    608     def _uploadBatch(self, batch): 
    609         """ 
    610         Sends a batch of records 
    611         Overloaded by agent. 
    612         """ 
    613         raise Exception("Unimplemented method") 
    614  
    615     def _getMetadata(self, records): 
    616         """ 
    617         Generates the metadata for a batch of records. 
    618         Should be overloaded. 
    619         """ 
    620         raise Exception("Unimplemented method") 
  • indico/ext/livesync/bistate.py

    r655cb4 r4ac79f9  
    3737CACHE_SWEEP_LIMIT = 10000 
    3838 
    39 # Attention: if this class is not declared, the LiveSync management interface 
    40 # will never know this plugin exists! 
    4139 
    4240class BistateRecordProcessor(object): 
     
    107105            obj = aw.getObject() 
    108106 
    109             ## TODO: enable this, whenn config is possible from interface 
    110             ## if obj.canAccess(AccessWrapper(access)): 
     107            ## TODO: enable this, when config is possible from interface 
     108            ## if not obj.canAccess(AccessWrapper(access)): 
    111109            ##     # no access? jump over this one 
    112110            ##     continue 
  • indico/ext/livesync/components.py

    rd285dc r4ac79f9  
    7474                Logger.get('ext.livesync').debug((objId, action)) 
    7575                # TODO: remove redundant items 
    76             sm.add(timestamp, 
    77                    ActionWrapper(timestamp, obj, actions, objId)) 
     76            if not sm.objectExcluded(obj): 
     77                sm.add(timestamp, 
     78                       ActionWrapper(timestamp, obj, actions, objId)) 
    7879 
    7980    def requestRetry(self, obj, req, nretry): 
Note: See TracChangeset for help on using the changeset viewer.