Changeset 10fb61 in indico


Ignore:
Timestamp:
01/24/11 19:09:03 (2 years ago)
Author:
Pedro Ferreira <jose.pedro.ferreira@…>
Branches:
master, burotel, hello-world-walkthrough, ipv6, v0.98-series, v0.98.2, v0.98.3, v0.98b1, v0.98b2, v0.99, 051b2622c51afb171a1dedb46a0df4fbb0cbd02e, 0da0c1403bae8e51d8229f460181c71b9e6dda72
Children:
c25a2a
Parents:
597fec
git-author:
Pedro Ferreira <jose.pedro.ferreira@…> (01/17/11 13:21:19)
git-committer:
Pedro Ferreira <jose.pedro.ferreira@…> (01/24/11 19:09:03)
Message:

[IMP] Process starvation, monitoring

  • Concurrent access to DB by threads was leading to reading access being blocked most of the time, slowing down everyone else;
  • Added some (optional) monitoring support;
Location:
indico
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • indico/MaKaC/common/contextManager.py

    rbac36b r10fb61  
    142142            return cls.DummyContext() 
    143143 
    144  
    145144    @classmethod 
    146145    def set(cls, name, value): 
     
    152151        except cls.NoContextException: 
    153152            return cls.DummyContext() 
    154  
  • indico/ext/livesync/agent.py

    r597fec r10fb61  
    2323""" 
    2424# standard lib imports 
    25 import datetime 
     25import datetime, time 
    2626from threading import Thread 
    2727from Queue import Queue, Empty 
     
    194194        self._lastTry = None 
    195195 
    196     def _run(self, data, logger=None): 
     196    def _run(self, data, logger=None, monitor=None): 
    197197        """ 
    198198        Overloaded - will contain the specific agent code 
     
    212212        """ 
    213213 
    214     def run(self, currentTS, logger=None): 
     214    def run(self, currentTS, logger=None, monitor=None): 
    215215        """ 
    216216        Main method, called when agent needs to be run 
     
    237237            records = self._generateRecords(data, till) 
    238238            # run agent-specific cycle 
    239             result = self._run(records, logger=logger) 
     239            result = self._run(records, logger=logger, monitor=monitor) 
    240240        except: 
    241241            if logger: 
     
    354354        self._queue = queue 
    355355        self._name = name 
     356        self._uploaded = 0 
    356357 
    357358        super(UploaderSlave, self).__init__(name=name) 
     
    369370                    taskFetched = True 
    370371                    self.result &= self._uploadBatch(batch) 
     372                    self._uploaded += len(batch) 
    371373                except Empty: 
    372374                    pass 
     
    408410    """ 
    409411 
    410     DEFAULT_BATCH_SIZE, DEFAULT_NUM_SLAVES = 500, 2 
     412    DEFAULT_BATCH_SIZE, DEFAULT_NUM_SLAVES = 1000, 2 
     413    MAX_DELAYED_BATCHES = 5 
    411414 
    412415    def __init__(self, slaveClass, logger, 
    413416                 extraSlaveArgs=(), 
    414417                 batchSize=DEFAULT_BATCH_SIZE, 
    415                  numSlaves=DEFAULT_NUM_SLAVES): 
     418                 numSlaves=DEFAULT_NUM_SLAVES, 
     419                 maxDelayed=MAX_DELAYED_BATCHES, 
     420                 monitor=None): 
    416421        self._logger = logger 
    417422        self._slaveClass = slaveClass 
     
    422427        self._currentBatch = [] 
    423428        self._extraSlaveArgs = extraSlaveArgs 
     429        self._enqueued = 0 
     430        self._monitor = monitor 
     431        self._maxDelayed = maxDelayed 
    424432 
    425433    def spawn(self): 
     
    443451        if len(self._currentBatch) > (self._batchSize - 1): 
    444452            self._queue.put(self._currentBatch) 
     453            self._enqueued += len(self._currentBatch) 
    445454            self._currentBatch = [] 
     455 
     456            if self._monitor: 
     457                self.reportStatus(self._monitor) 
    446458 
    447459        self._currentBatch.append(record) 
     
    477489        for entry in iterator: 
    478490            self.enqueue(entry) 
     491 
     492            while (self._queue.qsize() > self._maxDelayed): 
     493                self._logger.info('Too many delayed batches, sleeping') 
     494                time.sleep(10) 
     495 
     496    def reportStatus(self, stream): 
     497        totalUp = 0 
     498        stream.write("%d enqueued\n" % self._enqueued) 
     499        for i in range(0, self._numSlaves): 
     500            slave = self._slaves[i] 
     501            stream.write("\t [wrk %s] %d uploaded\n" % (slave.getName(), 
     502                                                     slave._uploaded)) 
     503            totalUp += slave._uploaded 
     504        stream.write("%d uploaded (total)\n\n" % totalUp) 
     505        stream.flush() 
  • indico/ext/livesync/console.py

    r597fec r10fb61  
    137137    return logger 
    138138 
     139 
    139140def _wrapper(iterator, operation): 
    140141    for elem in iterator: 
    141142        yield elem, operation 
     143 
    142144 
    143145class AgentCommand(ConsoleLiveSyncCommand): 
     
    164166 
    165167                root = CategoryManager().getById(0) 
     168 
     169                if 'monitor' in args: 
     170                    monitor = open(args.monitor, 'w') 
     171                else: 
     172                    monitor = None 
     173 
    166174                agent._run(_wrapper(categoryIterator(root, 0), 
    167175                                    agent._creationState), 
    168                            logger=logger) 
     176                           logger=logger, 
     177                           monitor=monitor) 
     178 
     179                if monitor: 
     180                    monitor.close() 
    169181 
    170182        self._dbi.abort() 
     
    224236 
    225237    parser_agent_export.add_argument("--output", "-o", type=str, 
    226                                      metavar="FILE", 
     238                                     metavar="FILE_PATH", 
    227239                                     help="file to write to (offline export)" ) 
    228240 
     
    231243                                     required=True, 
    232244                                     help="agent to export data with" ) 
     245 
     246    parser_agent_export.add_argument("--monitor", "-m", type=str, 
     247                                     metavar="FILE_PATH", 
     248                                     help="File to write monitoring info to" ) 
    233249 
    234250    args = parser.parse_args() 
  • indico/ext/livesync/invenio/agent.py

    rd0c6d5 r10fb61  
    1919## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. 
    2020 
     21# standard lib imports 
     22import time 
     23 
    2124# plugin imports 
    2225from indico.ext.livesync.agent import PushSyncAgent, AgentProviderComponent, \ 
     
    7073        self._logger.debug('getting a batch') 
    7174 
     75        tstart = time.time() 
    7276        # get a batch 
    7377        data = self._getMetadata(batch) 
    7478 
     79        tgen = time.time() - tstart 
     80 
    7581        result = self._server.upload_marcxml(data, "-ir").read() 
     82 
     83        tupload = time.time() - (tstart + tgen) 
    7684 
    7785        self._logger.debug('rec %s result: %s' % (batch, result)) 
     
    7987        if result.startswith('[INFO]'): 
    8088            fpath = result.strip().split(' ')[-1] 
    81             self._logger.info('Batch of %s records stored in server (%s)' % \ 
    82                               (len(batch), fpath)) 
     89            self._logger.info('Batch of %d records stored in server (%s) ' 
     90                              '[%f s %f s]- %d batches left' % \ 
     91                              (len(batch), fpath, tgen, tupload, self._queue.qsize())) 
    8392        else: 
    8493            self._logger.error('Records: %s output: %s' % (batch, result)) 
     
    190199        return InvenioRecordProcessor.computeRecords(data) 
    191200 
    192     def _run(self, records, logger=None): 
     201    def _run(self, records, logger=None, monitor=None): 
    193202 
    194203        self._v_logger = logger 
     
    199208        uploader = ThreadedRecordUploader(InvenioUploaderSlave, 
    200209                                          self._v_logger, 
    201                                           extraSlaveArgs=(server,)) 
     210                                          extraSlaveArgs=(server,), 
     211                                          monitor=monitor) 
    202212        uploader.spawn() 
    203213 
Note: See TracChangeset for help on using the changeset viewer.