Changeset b6021d in indico
- Timestamp:
- 01/26/11 18:37:38 (2 years ago)
- Branches:
- master, burotel, hello-world-walkthrough, ipv6, v0.98-series, v0.98.2, v0.98.3, v0.98b1, v0.98b2, v0.99, 051b2622c51afb171a1dedb46a0df4fbb0cbd02e, d9941f8582b36b24821a11ea5ba16fda6a457fb1
- Children:
- 756437
- Parents:
- 1df37b
- git-author:
- Pedro Ferreira <jose.pedro.ferreira@…> (01/18/11 17:48:28)
- git-committer:
- Pedro Ferreira <jose.pedro.ferreira@…> (01/26/11 18:37:38)
- Location:
- indico/ext/livesync
- Files:
-
- 2 edited
-
agent.py (modified) (2 diffs)
-
invenio/agent.py (modified) (5 diffs)
Legend:
- Unmodified
- Added
- Removed
-
indico/ext/livesync/agent.py
r1df37b rb6021d 342 342 343 343 344 class UploaderSlave(Thread): 345 """ 346 A generic threaded "work slave" for agents 347 """ 348 349 def __init__(self, name, queue, logger, agent): 350 self._keepRunning = True 344 class RecordUploader(object): 345 346 DEFAULT_BATCH_SIZE, DEFAULT_NUM_SLAVES = 1000, 2 347 348 def __init__(self, logger, agent, batchSize=DEFAULT_BATCH_SIZE): 351 349 self._logger = logger 352 self._terminate = False 353 self.result = True 354 self._queue = queue 355 self._name = name 356 self._uploaded = 0 357 350 self._agent = agent 358 359 super(UploaderSlave, self).__init__(name=name) 360 361 def run(self): 362 363 DBMgr.getInstance().startRequest() 364 365 self._logger.debug('Worker [%s] started' % self._name) 366 try: 367 while True: 368 taskFetched = False 369 try: 370 batch = self._queue.get(True, 2) 371 taskFetched = True 372 self.result &= self._uploadBatch(batch) 373 self._uploaded += len(batch) 374 except Empty: 375 pass 376 finally: 377 if taskFetched: 378 self._queue.task_done() 379 380 if self._terminate: 381 break 382 except: 383 self._logger.exception('Worker [%s]:' % self._name) 384 return 1 385 finally: 386 DBMgr.getInstance().endRequest() 387 self._logger.debug('Worker [%s] finished' % self._name) 388 self._logger.debug('Worker [%s] returning %s' % (self._name, self.result)) 389 390 def terminate(self): 391 self._terminate = True 351 self._batchSize = batchSize 392 352 393 353 def _uploadBatch(self, batch): 394 354 """ 395 Sends a batch of records 396 Overloaded by agent. 397 """ 398 raise Exception("Unimplemented method") 399 400 def _getMetadata(self, records): 401 """ 402 Generates the metadata for a batch of records. 403 Should be overloaded. 
404 """ 405 raise Exception("Unimplemented method") 406 355 To be overloaded by uploaders 356 """ 357 raise Exception("Unimplemented method!") 358 359 def iterateOver(self, iterator): 360 """ 361 Consumes an iterator 362 """ 363 364 currentBatch = [] 365 366 # take operations and choose which records to send 367 for record in iterator: 368 369 if len(currentBatch) > (self._batchSize - 1): 370 self._uploadBatch(currentBatch) 371 currentBatch = [] 372 373 currentBatch.append(record) 374 375 if currentBatch: 376 self._uploadBatch(currentBatch) 377 378 return True 379 380 ################### 381 # This code is now unused. 382 # 383 # 407 384 408 385 class ThreadedRecordUploader(object): … … 529 506 stream.write("%d uploaded (total)\n\n" % totalUp) 530 507 stream.flush() 508 509 510 class UploaderSlave(Thread): 511 """ 512 A generic threaded "work slave" for agents 513 """ 514 515 def __init__(self, name, queue, logger, agent): 516 self._keepRunning = True 517 self._logger = logger 518 self._terminate = False 519 self.result = True 520 self._queue = queue 521 self._name = name 522 self._uploaded = 0 523 self._agent = agent 524 self._startTime = time.time() 525 self._currentBatch = [] 526 self._dead = False 527 528 super(UploaderSlave, self).__init__(name=name) 529 530 def run(self): 531 532 DBMgr.getInstance().startRequest() 533 534 self._logger.debug('Worker [%s] started' % self._name) 535 try: 536 while True: 537 taskFetched = False 538 try: 539 self._currentBatch = self._queue.get(True, 2) 540 taskFetched = True 541 self._startTime = time.time() 542 self.result &= self._uploadBatch(self._currentBatch) 543 self._uploaded += len(self._currentBatch) 544 except Empty: 545 pass 546 finally: 547 if taskFetched: 548 self._queue.task_done() 549 550 if self._terminate: 551 break 552 except: 553 self._logger.exception('Worker [%s]:' % self._name) 554 return 1 555 finally: 556 DBMgr.getInstance().endRequest() 557 self._logger.debug('Worker [%s] finished' % self._name) 558 
self._logger.debug('Worker [%s] returning %s' % (self._name, self.result)) 559 560 def terminate(self): 561 self._terminate = True 562 563 def _uploadBatch(self, batch): 564 """ 565 Sends a batch of records 566 Overloaded by agent. 567 """ 568 raise Exception("Unimplemented method") 569 570 def _getMetadata(self, records): 571 """ 572 Generates the metadata for a batch of records. 573 Should be overloaded. 574 """ 575 raise Exception("Unimplemented method") 576 -
indico/ext/livesync/invenio/agent.py
r3c93c8 rb6021d 24 24 # plugin imports 25 25 from indico.ext.livesync.agent import PushSyncAgent, AgentProviderComponent, \ 26 UploaderSlave, ThreadedRecordUploader 26 RecordUploader 27 27 from indico.ext.livesync.invenio.invenio_connector import InvenioConnector 28 28 … … 38 38 39 39 40 class InvenioUploaderSlave(UploaderSlave): 41 """ 42 A worker that uploads data using HTTP 43 """ 44 45 def __init__(self, name, queue, logger, agent, server): 46 super(InvenioUploaderSlave, self).__init__(name, queue, logger, agent) 47 self._server = server 48 49 def _uploadBatch(self, batch): 50 """ 51 Uploads a batch to the Invenio server 52 """ 53 54 self._logger.debug('getting a batch') 55 56 tstart = time.time() 57 # get a batch 58 data = self._agent._getMetadata(batch) 59 60 tgen = time.time() - tstart 61 62 result = self._server.upload_marcxml(data, "-ir").read() 63 64 tupload = time.time() - (tstart + tgen) 65 66 self._logger.debug('rec %s result: %s' % (batch, result)) 67 68 if result.startswith('[INFO]'): 69 fpath = result.strip().split(' ')[-1] 70 self._logger.info('Batch of %d records stored in server (%s) ' 71 '[%f s %f s]- %d batches left' % \ 72 (len(batch), fpath, tgen, tupload, self._queue.qsize())) 73 else: 74 self._logger.error('Records: %s output: %s' % (batch, result)) 75 raise Exception('upload failed') 76 77 return True 78 40 # Attention: if this class is not declared, the LiveSync management interface 41 # will never know this plugin exists! 79 42 80 43 class InvenioRecordProcessor(object): … … 168 131 """ 169 132 170 _workerClass = InvenioUploaderSlave 171 133 _creationState = STATUS_CREATED 172 134 _extraOptions = {'url': 'Server URL'} … … 206 168 207 169 # the uploader will manage everything for us... 
208 uploader = ThreadedRecordUploader(InvenioUploaderSlave, 209 self._v_logger, 210 extraSlaveArgs=(server,), 211 monitor=monitor) 212 uploader.spawn() 170 171 uploader = InvenioRecordUploader(logger, self, server) 213 172 214 173 if self._v_logger: … … 216 175 217 176 # iterate over the returned records and upload them 218 uploader.iterateOver(records) 219 220 # wait for uploader to finish 221 result = uploader.join() 222 223 return result 224 225 226 # Attention: if this class is not declared, the LiveSync management interface 227 # will never know this plugin exists! 177 return uploader.iterateOver(records) 178 179 180 class InvenioRecordUploader(RecordUploader): 181 """ 182 A worker that uploads data using HTTP 183 """ 184 185 def __init__(self, logger, agent, server): 186 super(InvenioRecordUploader, self).__init__(logger, agent) 187 self._server = server 188 189 def _uploadBatch(self, batch): 190 """ 191 Uploads a batch to the Invenio server 192 """ 193 194 self._logger.debug('getting a batch') 195 196 tstart = time.time() 197 # get a batch 198 199 self._logger.info('Generating metadata') 200 data = self._agent._getMetadata(batch) 201 self._logger.info('Metadata ready ') 202 203 tgen = time.time() - tstart 204 205 result = self._server.upload_marcxml(data, "-ir").read() 206 207 tupload = time.time() - (tstart + tgen) 208 209 self._logger.debug('rec %s result: %s' % (batch, result)) 210 211 if result.startswith('[INFO]'): 212 fpath = result.strip().split(' ')[-1] 213 self._logger.info('Batch of %d records stored in server (%s) ' 214 '[%f s %f s]' % \ 215 (len(batch), fpath, tgen, tupload)) 216 else: 217 self._logger.error('Records: %s output: %s' % (batch, result)) 218 raise Exception('upload failed') 219 220 return True 221 228 222 229 223 class InvenioAgentProviderComponent(AgentProviderComponent):
Note: See TracChangeset
for help on using the changeset viewer.
