Changeset 136ed9a in indico


Ignore:
Timestamp:
03/15/11 11:13:44 (2 years ago)
Author:
Pedro Ferreira <jose.pedro.ferreira@…>
Branches:
master, burotel, hello-world-walkthrough, ipv6, v0.98-series, v0.98.2, v0.98.3, v0.98b1, v0.98b2, v0.99, 051b2622c51afb171a1dedb46a0df4fbb0cbd02e, 0da0c1403bae8e51d8229f460181c71b9e6dda72
Children:
33e0d7
Parents:
ee3e75
git-author:
Pedro Ferreira <jose.pedro.ferreira@…> (03/14/11 18:25:09)
git-committer:
Pedro Ferreira <jose.pedro.ferreira@…> (03/15/11 11:13:44)
Message:

[FIX] Solved RB DB problems with tasks

  • DB commits seemed to execute for both DBs, causing inconsistencies;
  • Improved thread-safety of RB DB code;
  • Isolated DB code inside with statements;
  • Improved also the logging of e-mail tasks;
Location:
indico
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • indico/MaKaC/authentication/LDAPAuthentication.py

    r7d30d5 r136ed9a  
    380380    av = {} 
    381381    av["email"] = [ret['mail']] 
    382     av["name"] = ret['givenName'] 
    383     av["surName"] = ret['sn'] 
     382    av["name"] = ret.get('givenName', '') 
     383    av["surName"] = ret.get('sn', '') 
    384384 
    385385    if 'o' in ret: 
  • indico/MaKaC/common/db.py

    r755a70 r136ed9a  
    197197        self._storage.tpc_finish(trans) 
    198198 
     199    def transaction(self): 
     200        """ 
     201        Calls the ZODB context manager (`with`) 
     202        """ 
     203        return self._db.transaction() 
     204 
    199205    # ZODB version check 
    200206    try: 
  • indico/MaKaC/plugins/RoomBooking/default/dalManager.py

    r336214 r136ed9a  
    1818## along with CDS Indico; if not, write to the Free Software Foundation, Inc., 
    1919## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. 
     20 
     21import threading 
     22from contextlib import contextmanager 
    2023 
    2124from MaKaC.common import Configuration 
     
    3033from ZODB.DB import DB 
    3134import transaction 
     35 
     36 
     37@contextmanager 
     38def dummyContextManager(): 
     39    yield 
     40 
     41 
     42class DummyConnection(): 
     43    """ 
     44    Used so that we can use context managers for database connections 
     45    without producing failures when RB is not active. 
     46    Of course tis is not the ideal solution, but it's the best possible 
     47    without changing the whole RB DB code. 
     48    """ 
     49    def transaction(self): 
     50        return dummyContextManager() 
    3251 
    3352 
     
    110129        self.db.pack(days=days) 
    111130 
     131    def transaction(self): 
     132        """ 
     133        Calls the ZODB context manager for the connection 
     134        """ 
     135        return self.db.transaction() 
     136 
    112137 
    113138class DALManager(DALManagerBase): 
    114139    """ ZODB specific implementation. """ 
    115140 
    116     _instance = None 
     141    _instances = {} 
    117142 
    118143    @staticmethod 
     
    126151 
    127152    @staticmethod 
    128     def getInstance(): 
    129         if not DALManager._instance: 
     153    def getInstance(create=True): 
     154        tid = threading._get_ident() 
     155        instance = DALManager._instances.get(tid) 
     156 
     157        if not instance and create: 
    130158            minfo = info.HelperMaKaCInfo.getMaKaCInfoInstance() 
    131             DALManager._instance = DBConnection(minfo) 
    132         if not DALManager._instance: 
    133             raise "cannot open DB backend" 
    134         return DALManager._instance 
     159            instance = DBConnection(minfo) 
     160            DALManager._instances[tid] = instance 
     161        return instance 
    135162 
    136163    @staticmethod 
     
    139166        Returns true if the current DALManager is connected 
    140167        """ 
    141         return DALManager._instance.isConnected() 
     168        if DALManager.getInstance(create=False): 
     169            return DALManager.getInstance().isConnected() 
     170        else: 
     171            return False 
    142172 
    143173    @staticmethod 
    144174    def getRoot(name=""): 
    145         return DALManager._instance.getRoot(name) 
     175        return DALManager.getInstance().getRoot(name) 
    146176 
    147177    @staticmethod 
     
    151181    @staticmethod 
    152182    def disconnect(): 
    153         DALManager._instance.disconnect() 
     183        DALManager.getInstance().disconnect() 
    154184 
    155185    @staticmethod 
    156186    def commit(): 
    157         DALManager._instance.commit() 
     187        DALManager.getInstance().commit() 
    158188 
    159189    @staticmethod 
    160190    def rollback(): 
    161         DALManager._instance.rollback() 
     191        DALManager.getInstance().rollback() 
    162192 
    163193    @staticmethod 
    164194    def sync(): 
    165         DALManager._instance.sync() 
     195        DALManager.getInstance().sync() 
    166196 
    167197    @staticmethod 
    168198    def pack(days=1): 
    169         DALManager._instance.pack() 
     199        DALManager.getInstance().pack() 
     200 
     201    @staticmethod 
     202    def dummyConnection(): 
     203        return DummyConnection() 
  • indico/modules/scheduler/slave.py

    r336214 r136ed9a  
    2929import threading 
    3030 
     31from ZODB.POSException import ConflictError 
     32 
    3133class _Worker(object): 
    3234 
     
    4749        self._dbi.startRequest() 
    4850 
    49         info = HelperMaKaCInfo.getMaKaCInfoInstance() 
    50         schedMod = SchedulerModule.getDBInstance() 
    51         self._rbEnabled = info.getRoomBookingModuleActive() 
     51        with self._dbi.transaction(): 
     52            schedMod = SchedulerModule.getDBInstance() 
     53            self._task = schedMod.getTaskById(self._taskId) 
    5254 
    53         if self._rbEnabled: 
    54             self._rbdbi = DBConnection(info) 
     55            info = HelperMaKaCInfo.getMaKaCInfoInstance() 
     56            self._rbEnabled = info.getRoomBookingModuleActive() 
    5557 
    56         self._task = schedMod.getTaskById(self._taskId) 
     58            if self._rbEnabled: 
     59                self._rbdbi = DALManager.getInstance() 
     60                self._rbdbi.connect() 
     61            else: 
     62                self._rbdbi = DALManager.dummyConnection() 
    5763 
    5864        # open a logging channel 
    5965        self._task.plugLogger(self._logger) 
    60  
    61         self._dbi.endRequest() 
    6266 
    6367    def run(self): 
     
    7175        i = 0 
    7276 
    73         self._dbi.startRequest() 
    7477        # RoomBooking forces us to connect to its own DB if needed 
    7578        # Maybe we should add some extension point here that lets plugins 
    7679        # define their own actions on DB connect/disconnect/commit/abort 
    7780 
    78         if self._rbEnabled: 
    79             self._rbdbi.connect() 
    80  
    8181        # potentially conflict-prone (!) 
    82         self._task.prepare() 
    83         self._dbi.commit() 
    84         if self._rbEnabled: 
    85             self._rbdbi.commit() 
     82        with self._dbi.transaction(): 
     83            with self._rbdbi.transaction(): 
     84                self._task.prepare() 
    8685 
    8786        while i < self._config.task_max_tries: 
     87            try: 
     88                with self._dbi.transaction(): 
     89                    with self._rbdbi.transaction(): 
    8890 
    89             self._logger.info('Task cycle %d' % i) 
     91                        self._logger.info('Task cycle %d' % i) 
     92                        i = i + 1 
     93                        try: 
     94                            self._task.start() 
     95                            break 
    9096 
    91             i = i + 1 
    92             try: 
    93                 self._task.start() 
     97                        except Exception, e: 
     98                            nextRunIn = i * 10  # secs 
     99                            self._logger.exception('Error message') 
    94100 
    95             except Exception, e: 
    96                 nextRunIn = i * 10 # secs 
     101                            raise 
     102            except: 
    97103                self._logger.warning("Task %s failed with exception '%s'. " % 
    98                                      (self._task.id, e)) 
    99  
    100                 self._logger.exception('Error message') 
     104                                             (self._task.id, e)) 
    101105 
    102106                if  i < self._config.task_max_tries: 
    103                     self._logger.warning("Retrying for the %dth time in %d secs.." % 
     107                    self._logger.warning("Retrying for the %dth time in %d secs.." % \ 
    104108                                         (i + 1, nextRunIn)) 
    105109 
     
    109113                    base.TimeSource.get().sleep(nextRunIn) 
    110114 
    111                 # abort transaction and synchronize 
    112                 self._dbi.sync() 
    113                 if self._rbEnabled: 
    114                     self._rbdbi.abort() 
    115             else: 
    116                 break 
    117  
    118115        self._logger.info('Ended on: %s' % self._task.endedOn) 
    119116 
    120         if self._task.endedOn: # task successfully finished 
    121             self._setResult(True) 
     117        # task successfully finished 
     118        if self._task.endedOn: 
     119            with self._dbi.transaction(): 
     120                self._setResult(True) 
    122121            if i > 1: 
    123122                self._logger.warning("Task %s failed %d times before " 
    124123                                     "finishing correctly" % (self._task.id, i - 1)) 
    125124        else: 
    126             self._setResult(False) 
     125            with self._dbi.transaction(): 
     126                self._setResult(False) 
    127127            self._logger.error("Task %s failed too many (%d) times. " 
    128128                               "Aborting its execution.." % (self._task.id, i)) 
    129129 
    130         if self._rbEnabled: 
    131             self._rbdbi.commit() 
    132             self._rbdbi.disconnect() 
    133         self._dbi.endRequest() 
    134         self._logger.info("exiting") 
     130            self._logger.info("exiting") 
    135131 
    136132 
  • indico/modules/scheduler/tasks.py

    r4ec29c r136ed9a  
    1919## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. 
    2020 
    21 import copy, logging, time, os 
     21import copy, logging, os 
    2222from dateutil import rrule 
    2323 
    24 from datetime import timedelta 
    25 from pytz import timezone 
    26 from pytz import common_timezones 
    2724import zope.interface 
    2825 
     
    3128from MaKaC.i18n import _ 
    3229from MaKaC.common import Config 
    33 from MaKaC.common.info import HelperMaKaCInfo 
    34 from MaKaC.common.Counter import Counter 
    3530# end required 
    3631 
     
    372367        addrs = [smtplib.quoteaddr(x) for x in self.toAddr] 
    373368        ccaddrs = [smtplib.quoteaddr(x) for x in self.ccAddr] 
     369 
     370        if len(addrs) + len(ccaddrs) == 0: 
     371            self._v_logger.warning("Attention: mail contains no recipients!") 
     372        else: 
     373            self._v_logger.info("Sending mail To: %s, CC: %s" % (addrs, ccaddrs)) 
    374374 
    375375        for user in self.toUser: 
     
    567567        if check: 
    568568            from MaKaC.conference import ConferenceHolder 
    569             if not ConferenceHolder().hasKey(self.conf.getId()) or \ 
    570                    self.conf.getStartDate() <= self._getCurrentDateTime(): 
     569            if not ConferenceHolder().hasKey(self.conf.getId()): 
     570                self._logger.warning("Conference %s no longer exists! " 
     571                                     "Deleting alarm." % self.conf.getId()) 
     572                self.conf.removeAlarm(self) 
     573            elif self.conf.getStartDate() <= self._getCurrentDateTime(): 
     574                self._logger.warning("Conference %s already started. " 
     575                                     "Deleting alarm." % self.conf.getId()) 
    571576                self.conf.removeAlarm(self) 
    572577                return True 
Note: See TracChangeset for help on using the changeset viewer.