Changeset 10fb61 in indico
- Timestamp:
- 01/24/11 19:09:03 (2 years ago)
- Branches:
- master, burotel, hello-world-walkthrough, ipv6, v0.98-series, v0.98.2, v0.98.3, v0.98b1, v0.98b2, v0.99, 051b2622c51afb171a1dedb46a0df4fbb0cbd02e, 0da0c1403bae8e51d8229f460181c71b9e6dda72
- Children:
- c25a2a
- Parents:
- 597fec
- git-author:
- Pedro Ferreira <jose.pedro.ferreira@…> (01/17/11 13:21:19)
- git-committer:
- Pedro Ferreira <jose.pedro.ferreira@…> (01/24/11 19:09:03)
- Location:
- indico
- Files:
-
- 4 edited
-
MaKaC/common/contextManager.py (modified) (2 diffs)
-
ext/livesync/agent.py (modified) (10 diffs)
-
ext/livesync/console.py (modified) (4 diffs)
-
ext/livesync/invenio/agent.py (modified) (5 diffs)
Legend:
- Unmodified
- Added
- Removed
-
indico/MaKaC/common/contextManager.py
rbac36b r10fb61 142 142 return cls.DummyContext() 143 143 144 145 144 @classmethod 146 145 def set(cls, name, value): … … 152 151 except cls.NoContextException: 153 152 return cls.DummyContext() 154 -
indico/ext/livesync/agent.py
r597fec r10fb61 23 23 """ 24 24 # standard lib imports 25 import datetime 25 import datetime, time 26 26 from threading import Thread 27 27 from Queue import Queue, Empty … … 194 194 self._lastTry = None 195 195 196 def _run(self, data, logger=None ):196 def _run(self, data, logger=None, monitor=None): 197 197 """ 198 198 Overloaded - will contain the specific agent code … … 212 212 """ 213 213 214 def run(self, currentTS, logger=None ):214 def run(self, currentTS, logger=None, monitor=None): 215 215 """ 216 216 Main method, called when agent needs to be run … … 237 237 records = self._generateRecords(data, till) 238 238 # run agent-specific cycle 239 result = self._run(records, logger=logger )239 result = self._run(records, logger=logger, monitor=monitor) 240 240 except: 241 241 if logger: … … 354 354 self._queue = queue 355 355 self._name = name 356 self._uploaded = 0 356 357 357 358 super(UploaderSlave, self).__init__(name=name) … … 369 370 taskFetched = True 370 371 self.result &= self._uploadBatch(batch) 372 self._uploaded += len(batch) 371 373 except Empty: 372 374 pass … … 408 410 """ 409 411 410 DEFAULT_BATCH_SIZE, DEFAULT_NUM_SLAVES = 500, 2 412 DEFAULT_BATCH_SIZE, DEFAULT_NUM_SLAVES = 1000, 2 413 MAX_DELAYED_BATCHES = 5 411 414 412 415 def __init__(self, slaveClass, logger, 413 416 extraSlaveArgs=(), 414 417 batchSize=DEFAULT_BATCH_SIZE, 415 numSlaves=DEFAULT_NUM_SLAVES): 418 numSlaves=DEFAULT_NUM_SLAVES, 419 maxDelayed=MAX_DELAYED_BATCHES, 420 monitor=None): 416 421 self._logger = logger 417 422 self._slaveClass = slaveClass … … 422 427 self._currentBatch = [] 423 428 self._extraSlaveArgs = extraSlaveArgs 429 self._enqueued = 0 430 self._monitor = monitor 431 self._maxDelayed = maxDelayed 424 432 425 433 def spawn(self): … … 443 451 if len(self._currentBatch) > (self._batchSize - 1): 444 452 self._queue.put(self._currentBatch) 453 self._enqueued += len(self._currentBatch) 445 454 self._currentBatch = [] 455 456 if self._monitor: 457 self.reportStatus(self._monitor) 446 458 447 459 self._currentBatch.append(record) … … 477 489 for entry in iterator: 478 490 self.enqueue(entry) 491 492 while (self._queue.qsize() > self._maxDelayed): 493 self._logger.info('Too many delayed batches, sleeping') 494 time.sleep(10) 495 496 def reportStatus(self, stream): 497 totalUp = 0 498 stream.write("%d enqueued\n" % self._enqueued) 499 for i in range(0, self._numSlaves): 500 slave = self._slaves[i] 501 stream.write("\t [wrk %s] %d uploaded\n" % (slave.getName(), 502 slave._uploaded)) 503 totalUp += slave._uploaded 504 stream.write("%d uploaded (total)\n\n" % totalUp) 505 stream.flush() -
indico/ext/livesync/console.py
r597fec r10fb61 137 137 return logger 138 138 139 139 140 def _wrapper(iterator, operation): 140 141 for elem in iterator: 141 142 yield elem, operation 143 142 144 143 145 class AgentCommand(ConsoleLiveSyncCommand): … … 164 166 165 167 root = CategoryManager().getById(0) 168 169 if 'monitor' in args: 170 monitor = open(args.monitor, 'w') 171 else: 172 monitor = None 173 166 174 agent._run(_wrapper(categoryIterator(root, 0), 167 175 agent._creationState), 168 logger=logger) 176 logger=logger, 177 monitor=monitor) 178 179 if monitor: 180 monitor.close() 169 181 170 182 self._dbi.abort() … … 224 236 225 237 parser_agent_export.add_argument("--output", "-o", type=str, 226 metavar="FILE ",238 metavar="FILE_PATH", 227 239 help="file to write to (offline export)" ) 228 240 … … 231 243 required=True, 232 244 help="agent to export data with" ) 245 246 parser_agent_export.add_argument("--monitor", "-m", type=str, 247 metavar="FILE_PATH", 248 help="File to write monitoring info to" ) 233 249 234 250 args = parser.parse_args() -
indico/ext/livesync/invenio/agent.py
rd0c6d5 r10fb61 19 19 ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. 20 20 21 # standard lib imports 22 import time 23 21 24 # plugin imports 22 25 from indico.ext.livesync.agent import PushSyncAgent, AgentProviderComponent, \ … … 70 73 self._logger.debug('getting a batch') 71 74 75 tstart = time.time() 72 76 # get a batch 73 77 data = self._getMetadata(batch) 74 78 79 tgen = time.time() - tstart 80 75 81 result = self._server.upload_marcxml(data, "-ir").read() 82 83 tupload = time.time() - (tstart + tgen) 76 84 77 85 self._logger.debug('rec %s result: %s' % (batch, result)) … … 79 87 if result.startswith('[INFO]'): 80 88 fpath = result.strip().split(' ')[-1] 81 self._logger.info('Batch of %s records stored in server (%s)' % \ 82 (len(batch), fpath)) 89 self._logger.info('Batch of %d records stored in server (%s) ' 90 '[%f s %f s]- %d batches left' % \ 91 (len(batch), fpath, tgen, tupload, self._queue.qsize())) 83 92 else: 84 93 self._logger.error('Records: %s output: %s' % (batch, result)) … … 190 199 return InvenioRecordProcessor.computeRecords(data) 191 200 192 def _run(self, records, logger=None ):201 def _run(self, records, logger=None, monitor=None): 193 202 194 203 self._v_logger = logger … … 199 208 uploader = ThreadedRecordUploader(InvenioUploaderSlave, 200 209 self._v_logger, 201 extraSlaveArgs=(server,)) 210 extraSlaveArgs=(server,), 211 monitor=monitor) 202 212 uploader.spawn() 203 213
Note: See TracChangeset
for help on using the changeset viewer.
