Changeset b6021d in indico


Ignore:
Timestamp:
01/26/11 18:37:38 (2 years ago)
Author:
Pedro Ferreira <jose.pedro.ferreira@…>
Branches:
master, burotel, hello-world-walkthrough, ipv6, v0.98-series, v0.98.2, v0.98.3, v0.98b1, v0.98b2, v0.99, 051b2622c51afb171a1dedb46a0df4fbb0cbd02e, d9941f8582b36b24821a11ea5ba16fda6a457fb1
Children:
756437
Parents:
1df37b
git-author:
Pedro Ferreira <jose.pedro.ferreira@…> (01/18/11 17:48:28)
git-committer:
Pedro Ferreira <jose.pedro.ferreira@…> (01/26/11 18:37:38)
Message:

[IMP] Back to single threaded :(

Location:
indico/ext/livesync
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • indico/ext/livesync/agent.py

    r1df37b rb6021d  
    342342 
    343343 
    344 class UploaderSlave(Thread): 
    345     """ 
    346     A generic threaded "work slave" for agents 
    347     """ 
    348  
    349     def __init__(self, name, queue, logger, agent): 
    350         self._keepRunning = True 
     344class RecordUploader(object): 
     345 
     346    DEFAULT_BATCH_SIZE, DEFAULT_NUM_SLAVES = 1000, 2 
     347 
     348    def __init__(self, logger, agent, batchSize=DEFAULT_BATCH_SIZE): 
    351349        self._logger = logger 
    352         self._terminate = False 
    353         self.result = True 
    354         self._queue = queue 
    355         self._name = name 
    356         self._uploaded = 0 
    357350        self._agent = agent 
    358  
    359         super(UploaderSlave, self).__init__(name=name) 
    360  
    361     def run(self): 
    362  
    363         DBMgr.getInstance().startRequest() 
    364  
    365         self._logger.debug('Worker [%s] started' % self._name) 
    366         try: 
    367             while True: 
    368                 taskFetched = False 
    369                 try: 
    370                     batch = self._queue.get(True, 2) 
    371                     taskFetched = True 
    372                     self.result &= self._uploadBatch(batch) 
    373                     self._uploaded += len(batch) 
    374                 except Empty: 
    375                     pass 
    376                 finally: 
    377                     if taskFetched: 
    378                         self._queue.task_done() 
    379  
    380                 if self._terminate: 
    381                     break 
    382         except: 
    383             self._logger.exception('Worker [%s]:' % self._name) 
    384             return 1 
    385         finally: 
    386             DBMgr.getInstance().endRequest() 
    387             self._logger.debug('Worker [%s] finished' % self._name) 
    388         self._logger.debug('Worker [%s] returning %s' % (self._name, self.result)) 
    389  
    390     def terminate(self): 
    391         self._terminate = True 
     351        self._batchSize = batchSize 
    392352 
    393353    def _uploadBatch(self, batch): 
    394354        """ 
    395         Sends a batch of records 
    396         Overloaded by agent. 
    397         """ 
    398         raise Exception("Unimplemented method") 
    399  
    400     def _getMetadata(self, records): 
    401         """ 
    402         Generates the metadata for a batch of records. 
    403         Should be overloaded. 
    404         """ 
    405         raise Exception("Unimplemented method") 
    406  
     355        To be overloaded by uploaders 
     356        """ 
     357        raise Exception("Unimplemented method!") 
     358 
     359    def iterateOver(self, iterator): 
     360        """ 
     361        Consumes an iterator 
     362        """ 
     363 
     364        currentBatch = [] 
     365 
     366        # take operations and choose which records to send 
     367        for record in iterator: 
     368 
     369            if len(currentBatch) > (self._batchSize - 1): 
     370                self._uploadBatch(currentBatch) 
     371                currentBatch = [] 
     372 
     373            currentBatch.append(record) 
     374 
     375        if currentBatch: 
     376            self._uploadBatch(currentBatch) 
     377 
     378        return True 
     379 
     380################### 
     381# This code is now unused. 
     382# 
     383# 
    407384 
    408385class ThreadedRecordUploader(object): 
     
    529506        stream.write("%d uploaded (total)\n\n" % totalUp) 
    530507        stream.flush() 
     508 
     509 
     510class UploaderSlave(Thread): 
     511    """ 
     512    A generic threaded "work slave" for agents 
     513    """ 
     514 
     515    def __init__(self, name, queue, logger, agent): 
     516        self._keepRunning = True 
     517        self._logger = logger 
     518        self._terminate = False 
     519        self.result = True 
     520        self._queue = queue 
     521        self._name = name 
     522        self._uploaded = 0 
     523        self._agent = agent 
     524        self._startTime = time.time() 
     525        self._currentBatch = [] 
     526        self._dead = False 
     527 
     528        super(UploaderSlave, self).__init__(name=name) 
     529 
     530    def run(self): 
     531 
     532        DBMgr.getInstance().startRequest() 
     533 
     534        self._logger.debug('Worker [%s] started' % self._name) 
     535        try: 
     536            while True: 
     537                taskFetched = False 
     538                try: 
     539                    self._currentBatch = self._queue.get(True, 2) 
     540                    taskFetched = True 
     541                    self._startTime = time.time() 
     542                    self.result &= self._uploadBatch(self._currentBatch) 
     543                    self._uploaded += len(self._currentBatch) 
     544                except Empty: 
     545                    pass 
     546                finally: 
     547                    if taskFetched: 
     548                        self._queue.task_done() 
     549 
     550                if self._terminate: 
     551                    break 
     552        except: 
     553            self._logger.exception('Worker [%s]:' % self._name) 
     554            return 1 
     555        finally: 
     556            DBMgr.getInstance().endRequest() 
     557            self._logger.debug('Worker [%s] finished' % self._name) 
     558        self._logger.debug('Worker [%s] returning %s' % (self._name, self.result)) 
     559 
     560    def terminate(self): 
     561        self._terminate = True 
     562 
     563    def _uploadBatch(self, batch): 
     564        """ 
     565        Sends a batch of records 
     566        Overloaded by agent. 
     567        """ 
     568        raise Exception("Unimplemented method") 
     569 
     570    def _getMetadata(self, records): 
     571        """ 
     572        Generates the metadata for a batch of records. 
     573        Should be overloaded. 
     574        """ 
     575        raise Exception("Unimplemented method") 
     576 
  • indico/ext/livesync/invenio/agent.py

    r3c93c8 rb6021d  
    2424# plugin imports 
    2525from indico.ext.livesync.agent import PushSyncAgent, AgentProviderComponent, \ 
    26      UploaderSlave, ThreadedRecordUploader 
     26     RecordUploader 
    2727from indico.ext.livesync.invenio.invenio_connector import InvenioConnector 
    2828 
     
    3838 
    3939 
    40 class InvenioUploaderSlave(UploaderSlave): 
    41     """ 
    42     A worker that uploads data using HTTP 
    43     """ 
    44  
    45     def __init__(self, name, queue, logger, agent, server): 
    46         super(InvenioUploaderSlave, self).__init__(name, queue, logger, agent) 
    47         self._server = server 
    48  
    49     def _uploadBatch(self, batch): 
    50         """ 
    51         Uploads a batch to the Invenio server 
    52         """ 
    53  
    54         self._logger.debug('getting a batch') 
    55  
    56         tstart = time.time() 
    57         # get a batch 
    58         data = self._agent._getMetadata(batch) 
    59  
    60         tgen = time.time() - tstart 
    61  
    62         result = self._server.upload_marcxml(data, "-ir").read() 
    63  
    64         tupload = time.time() - (tstart + tgen) 
    65  
    66         self._logger.debug('rec %s result: %s' % (batch, result)) 
    67  
    68         if result.startswith('[INFO]'): 
    69             fpath = result.strip().split(' ')[-1] 
    70             self._logger.info('Batch of %d records stored in server (%s) ' 
    71                               '[%f s %f s]- %d batches left' % \ 
    72                               (len(batch), fpath, tgen, tupload, self._queue.qsize())) 
    73         else: 
    74             self._logger.error('Records: %s output: %s' % (batch, result)) 
    75             raise Exception('upload failed') 
    76  
    77         return True 
    78  
     40# Attention: if this class is not declared, the LiveSync management interface 
     41# will never know this plugin exists! 
    7942 
    8043class InvenioRecordProcessor(object): 
     
    168131    """ 
    169132 
    170     _workerClass = InvenioUploaderSlave 
    171133    _creationState = STATUS_CREATED 
    172134    _extraOptions = {'url': 'Server URL'} 
     
    206168 
    207169        # the uploader will manage everything for us... 
    208         uploader = ThreadedRecordUploader(InvenioUploaderSlave, 
    209                                           self._v_logger, 
    210                                           extraSlaveArgs=(server,), 
    211                                           monitor=monitor) 
    212         uploader.spawn() 
     170 
     171        uploader = InvenioRecordUploader(logger, self, server) 
    213172 
    214173        if self._v_logger: 
     
    216175 
    217176        # iterate over the returned records and upload them 
    218         uploader.iterateOver(records) 
    219  
    220         # wait for uploader to finish 
    221         result = uploader.join() 
    222  
    223         return result 
    224  
    225  
    226 # Attention: if this class is not declared, the LiveSync management interface 
    227 # will never know this plugin exists! 
     177        return uploader.iterateOver(records) 
     178 
     179 
     180class InvenioRecordUploader(RecordUploader): 
     181    """ 
     182    A worker that uploads data using HTTP 
     183    """ 
     184 
     185    def __init__(self, logger, agent, server): 
     186        super(InvenioRecordUploader, self).__init__(logger, agent) 
     187        self._server = server 
     188 
     189    def _uploadBatch(self, batch): 
     190        """ 
     191        Uploads a batch to the Invenio server 
     192        """ 
     193 
     194        self._logger.debug('getting a batch') 
     195 
     196        tstart = time.time() 
     197        # get a batch 
     198 
     199        self._logger.info('Generating metadata') 
     200        data = self._agent._getMetadata(batch) 
     201        self._logger.info('Metadata ready ') 
     202 
     203        tgen = time.time() - tstart 
     204 
     205        result = self._server.upload_marcxml(data, "-ir").read() 
     206 
     207        tupload = time.time() - (tstart + tgen) 
     208 
     209        self._logger.debug('rec %s result: %s' % (batch, result)) 
     210 
     211        if result.startswith('[INFO]'): 
     212            fpath = result.strip().split(' ')[-1] 
     213            self._logger.info('Batch of %d records stored in server (%s) ' 
     214                              '[%f s %f s]' % \ 
     215                              (len(batch), fpath, tgen, tupload)) 
     216        else: 
     217            self._logger.error('Records: %s output: %s' % (batch, result)) 
     218            raise Exception('upload failed') 
     219 
     220        return True 
     221 
    228222 
    229223class InvenioAgentProviderComponent(AgentProviderComponent): 
Note: See TracChangeset for help on using the changeset viewer.