diff --git a/src/python/WMComponent/JobSubmitter/JobSubmitterPoller.py b/src/python/WMComponent/JobSubmitter/JobSubmitterPoller.py index 65d23a5273..81021319c5 100644 --- a/src/python/WMComponent/JobSubmitter/JobSubmitterPoller.py +++ b/src/python/WMComponent/JobSubmitter/JobSubmitterPoller.py @@ -34,7 +34,7 @@ from WMCore.Services.ReqMgrAux.ReqMgrAux import ReqMgrAux from WMComponent.JobSubmitter.JobSubmitAPI import availableScheddSlots - +from WMCore.BossAir.Plugins.SimpleCondorPlugin import CondorScheddUnavailable def jobSubmitCondition(jobStats): for jobInfo in jobStats: @@ -754,7 +754,25 @@ def submitJobs(self, jobsToSubmit): myThread.transaction.begin() # Run the actual underlying submit code using bossAir - successList, failList = self.bossAir.submit(jobs=jobList) + try: + successList, failList = self.bossAir.submit(jobs=jobList) + + except CondorScheddUnavailable as ex: + msg = "Condor Schedd is unavailable: %s" % str(ex) + logging.error(msg) + myThread.logdbClient.post("JobSubmitter_submitWork", msg, "error") + # don't raise WMException, just return + logging.warning("JobSubmitter didn't submit any jobs due to condor schedd being unavailable.") + # TODO: verify if we should rollback the transaction or not? 
+ myThread.transaction.rollback() + return + + except Exception as ex: + msg = "Error submitting jobs: %s" % str(ex) + logging.error(msg) + myThread.logdbClient.post("JobSubmitter_submitWork", msg, "error") + raise WMException(msg) from ex + logging.info("Jobs that succeeded/failed submission: %d/%d.", len(successList), len(failList)) # Propagate states in the WMBS database diff --git a/src/python/WMComponent/JobUpdater/JobUpdaterPoller.py b/src/python/WMComponent/JobUpdater/JobUpdaterPoller.py index 4c2a546eaa..8fd5123615 100644 --- a/src/python/WMComponent/JobUpdater/JobUpdaterPoller.py +++ b/src/python/WMComponent/JobUpdater/JobUpdaterPoller.py @@ -170,6 +170,7 @@ def synchronizeJobPriority(self): # if there are jobs in wmbs executing state, update their prio in condor if self.executingJobsDAO.execute(workflow) > 0: logging.info("Updating condor jobs priority for request: %s", workflow) + # TODO: verify if we should wrap this in a try/except for CondorScheddUnavailable as well? self.bossAir.updateJobInformation(workflow, requestPriority=priorityCache[workflow]) workflowsToUpdateWMBS[workflow] = priorityCache[workflow] diff --git a/src/python/WMCore/BossAir/Plugins/SimpleCondorPlugin.py b/src/python/WMCore/BossAir/Plugins/SimpleCondorPlugin.py index 591edf7e7e..06ed9dfb03 100644 --- a/src/python/WMCore/BossAir/Plugins/SimpleCondorPlugin.py +++ b/src/python/WMCore/BossAir/Plugins/SimpleCondorPlugin.py @@ -21,6 +21,7 @@ from WMCore.WMInit import getWMBASE from WMCore.Lexicon import getIterMatchObjectOnRegexp, WMEXCEPTION_REGEXP, CONDOR_LOG_FILTER_REGEXP from WMCore.Services.TagCollector.TagCollector import TagCollector +from WMCore.WMException import WMException def activityToType(jobActivity): """ @@ -42,6 +43,22 @@ def activityToType(jobActivity): return activityMap.get(jobActivity, "unknown") +class CondorScheddUnavailable(WMException): + """ + _CondorScheddUnavailable_ + + Exception raised when we fail to create a condor schedd object + """ + + def 
__init__(self, msg): + """ + _CondorScheddUnavailable_ + + Create a new exception + """ + WMException.__init__(self, msg) + + class SimpleCondorPlugin(BasePlugin): """ _SimpleCondorPlugin_ @@ -144,6 +161,21 @@ def __init__(self, config): self.useCMSToken = getattr(config.JobSubmitter, 'useOauthToken', False) return + + def getScheddObject(self): + """ + _getScheddObject_ + + Return a Schedd object for the current condor schedd + """ + try: + schedd = htcondor.Schedd() + except RuntimeError as ex: + msg = "Failed to create a condor schedd object: %s" % str(ex) + raise CondorScheddUnavailable(msg) + + return schedd + def submit(self, jobs, info=None): """ @@ -158,7 +190,7 @@ def submit(self, jobs, info=None): # Then was have nothing to do return successfulJobs, failedJobs - schedd = htcondor.Schedd() + schedd = self.getScheddObject() # Submit the jobs for jobsReady in grouper(jobs, self.jobsPerSubmit): @@ -208,7 +240,7 @@ def track(self, jobs): # get info about all active and recent jobs logging.debug("SimpleCondorPlugin is going to track %s jobs", len(jobs)) - schedd = htcondor.Schedd() + schedd = self.getScheddObject() logging.debug("Start: Retrieving classAds using Condor Python query") try: @@ -369,7 +401,8 @@ def updateSiteInformation(self, jobs, siteName, excludeSite): Parameters: excludeSite = False when moving to Normal excludeSite = True when moving to Down, Draining or Aborted """ - sd = htcondor.Schedd() + + sd = self.getScheddObject() jobIdToKill = [] jobtokill = [] origSiteLists = set() @@ -436,8 +469,8 @@ def kill(self, jobs, raiseEx=False): Kill can happen for schedd running on localhost... TBC. """ logging.info("Killing %i jobs from the queue", len(jobs)) - - schedd = htcondor.Schedd() + + schedd = self.getScheddObject() gridIds = [job['gridid'] for job in jobs] try: schedd.act(htcondor.JobAction.Remove, gridIds) @@ -455,8 +488,8 @@ def killWorkflowJobs(self, workflow): Kill all the jobs belonging to a specific workflow. 
""" logging.info("Going to remove all the jobs for workflow %s", workflow) - - schedd = htcondor.Schedd() + + schedd = self.getScheddObject() try: schedd.act(htcondor.JobAction.Remove, "WMAgent_RequestName == %s" % classad.quote(str(workflow))) @@ -478,7 +511,8 @@ def updateJobInformation(self, workflow, **kwargs): Since the default priority is very high, we only need to adjust new priorities for processing/production task types (which have a task priority of 0) """ - schedd = htcondor.Schedd() + + schedd = self.getScheddObject() if 'requestPriority' in kwargs: newPriority = int(kwargs['requestPriority']) diff --git a/src/python/WMCore/BossAir/StatusPoller.py b/src/python/WMCore/BossAir/StatusPoller.py index d016370cd5..0304e3b459 100644 --- a/src/python/WMCore/BossAir/StatusPoller.py +++ b/src/python/WMCore/BossAir/StatusPoller.py @@ -21,6 +21,7 @@ from WMCore.WMExceptions import WM_JOB_ERROR_CODES from WMCore.WorkerThreads.BaseWorkerThread import BaseWorkerThread from WMCore.BossAir.BossAirAPI import BossAirAPI +from WMCore.BossAir.Plugins.SimpleCondorPlugin import CondorScheddUnavailable class StatusPollerException(WMException): """ @@ -68,6 +69,14 @@ def algorithm(self, parameters=None): try: logging.info("Running job status poller algorithm...") self.checkStatus() + + except CondorScheddUnavailable as ex: + msg = "Condor Schedd is unavailable: %s" % str(ex) + logging.error(msg) + if getattr(myThread, 'transaction', None): + myThread.transaction.rollbackForError() + logging.info("JobStatusLite failed to run, will retry in next cycle") + except WMException as ex: if getattr(myThread, 'transaction', None): myThread.transaction.rollbackForError() @@ -90,8 +99,14 @@ def checkStatus(self): and then check for jobs that have timed out. 
""" + try: + runningJobs = self.bossAir.track() + except Exception as ex: + msg = "Error in BossAir track call: %s" % str(ex) + logging.error(msg) + runningJobs = [] + raise WMException(msg)from ex - runningJobs = self.bossAir.track() if len(runningJobs) < 1: # Then we have no jobs