From 522cf359fce1b945fad9e977cfee06307917f4fd Mon Sep 17 00:00:00 2001 From: Duong Date: Wed, 17 Jun 2026 16:10:24 -0400 Subject: [PATCH 1/2] output data Rucio block rule cleaning --- bin/wmagent-mod-config | 3 +- etc/WMAgentConfig.py | 6 +- .../RucioInjector/RucioInjectorPoller.py | 37 ++++- .../MSRuleCleaner/MSRuleCleaner.py | 149 +++++++++++++++++- .../MSRuleCleaner/MSRuleCleanerWflow.py | 5 +- src/python/WMCore/Services/Rucio/Rucio.py | 20 +++ 6 files changed, 203 insertions(+), 17 deletions(-) diff --git a/bin/wmagent-mod-config b/bin/wmagent-mod-config index df0eb73e41..34fbe00e0b 100755 --- a/bin/wmagent-mod-config +++ b/bin/wmagent-mod-config @@ -256,9 +256,10 @@ def modifyConfiguration(config, **args): config.RucioInjector.rucioAccount = args["rucio_account"] config.RucioInjector.rucioUrl = args["rucio_host"] config.RucioInjector.rucioAuthUrl = args["rucio_auth"] - # define a different expression for container rule replication in testbed + # define a different expression for block/container rule replication in testbed if "-int.cern.ch" in args["rucio_host"]: config.RucioInjector.containerDiskRuleRSEExpr = "(tier=2|tier=1)&cms_type=int&rse_type=DISK" + config.RucioInjector.blockDiskRuleRSEExpr = "(tier=2|tier=1)&cms_type=int&rse_type=DISK" # custom AgentStatusWatcher if hasattr(config, "AgentStatusWatcher"): diff --git a/etc/WMAgentConfig.py b/etc/WMAgentConfig.py index 3c45a6b52d..1b9d76f18a 100644 --- a/etc/WMAgentConfig.py +++ b/etc/WMAgentConfig.py @@ -437,10 +437,14 @@ config.RucioInjector.pollIntervalRules = 43200 config.RucioInjector.cacheExpiration = 2 * 24 * 60 * 60 # two days config.RucioInjector.createBlockRules = True +config.RucioInjector.createContainerRules = False config.RucioInjector.RSEPostfix = False # enable it to append _Test to the RSE names config.RucioInjector.metaDIDProject = "Production" config.RucioInjector.containerDiskRuleParams = {"weight": "dm_weight", "copies": 2, "grouping": "DATASET"} -config.RucioInjector.blockRuleParams = 
{} +config.RucioInjector.blockRuleParams = {"copies": 1} +# blockDiskRuleRSEExpr: broader expression used for non-T0 block rules. +# With copies=1, Rucio pins the existing origin-T2 replica without replicating elsewhere. +config.RucioInjector.blockDiskRuleRSEExpr = "(tier=2|tier=1)&cms_type=real&rse_type=DISK" # this RSEExpr below might be updated by wmagent-mod-config script config.RucioInjector.containerDiskRuleRSEExpr = "(tier=2|tier=1)&cms_type=real&rse_type=DISK" config.RucioInjector.rucioAccount = "OVER_WRITE_BY_SECRETS" diff --git a/src/python/WMComponent/RucioInjector/RucioInjectorPoller.py b/src/python/WMComponent/RucioInjector/RucioInjectorPoller.py index b63e25a89a..0d767fb9b9 100644 --- a/src/python/WMComponent/RucioInjector/RucioInjectorPoller.py +++ b/src/python/WMComponent/RucioInjector/RucioInjectorPoller.py @@ -64,8 +64,10 @@ def __init__(self, config): self.pollRules = config.RucioInjector.pollIntervalRules self.lastRulesExecTime = 0 self.createBlockRules = config.RucioInjector.createBlockRules + self.createContainerRules = getattr(config.RucioInjector, "createContainerRules", True) self.containerDiskRuleParams = config.RucioInjector.containerDiskRuleParams self.blockRuleParams = config.RucioInjector.blockRuleParams + self.blockDiskRuleRSEExpr = getattr(config.RucioInjector, "blockDiskRuleRSEExpr", None) self.containerDiskRuleRSEExpr = config.RucioInjector.containerDiskRuleRSEExpr if config.RucioInjector.metaDIDProject not in RUCIO_VALID_PROJECT: msg = "Component configured with an invalid 'project' DID: %s" @@ -118,6 +120,7 @@ def __init__(self, config): self.getDeletableWorkflows = None logging.info("Component configured to create block rules: %s", self.createBlockRules) + logging.info("Component configured to create container rules: %s", self.createContainerRules) def setup(self, parameters): """ @@ -248,9 +251,20 @@ def insertBlocks(self, uninjectedData): def insertBlockRules(self): """ - Creates a simple replication rule for every single 
block that - is under production in a given site/RSE. - Also persist the rule ID in the database. + Creates a replication rule for every single block that is under production. + For non-T0 agents with blockDiskRuleRSEExpr configured, uses a broader RSE + expression with copies=1 to pin the existing origin-T2 replica without + replicating to a second site. + For T0 agents (or when blockDiskRuleRSEExpr is not set), pins to the specific + origin T2 RSE with copies=1. + Also persists the rule ID in the database. + + Note: if blockRuleParams copies is set to more than 1, Rucio will replicate + the block to additional RSEs matching blockDiskRuleRSEExpr to satisfy the + extra copies, creating new block replicas at other sites beyond the origin T2. + During the disk or tape transfer phase, MSRuleCleaner expires these block rules + block by block as soon as all files in each block are confirmed safe in all + wmcore_output rules, rather than waiting for the entire workflow to complete. """ if not self.createBlockRules: return @@ -265,10 +279,17 @@ def insertBlockRules(self): logging.warning("Block: %s not yet in Rucio. Retrying later..", item['blockname']) continue kwargs = dict(activity="Production Output", account=self.rucioAcct, - grouping="DATASET", comment="WMAgent automatic container rule", + grouping="DATASET", comment="WMAgent automatic block rule", ignore_availability=True, meta=self.metaData) - rseName = "%s_Test" % item['pnn'] if self.testRSEs else item['pnn'] - # DATASET = replicates all files in the same block to the same RSE + # Non-T0 agents with blockDiskRuleRSEExpr: use broader expression so Rucio + # pins the existing origin-T2 replica; more replicas are made only if copies > 1. 
+ if not self.isT0agent and self.blockDiskRuleRSEExpr: + rseName = self.blockDiskRuleRSEExpr + if self.testRSEs: + rseName = rseName.replace("cms_type=real", "cms_type=test") + else: + rseName = "%s_Test" % item['pnn'] if self.testRSEs else item['pnn'] + # DATASET grouping: all files in the same block go to the same RSE (per copy) kwargs.update(self.blockRuleParams) resp = self.rucio.createReplicationRule(item['blockname'], rseExpression=rseName, **kwargs) @@ -479,6 +500,10 @@ def insertContainerRules(self): * T0 Tape is created as defined, with a special rule activity for Tape * T0 Disk is created as defined, with a special rule activity for Disk/Export """ + if not self.isT0agent and not self.createContainerRules: + logging.info("Container rule creation disabled for non-T0 agent. Skipping.") + return + logging.info("Starting insertContainerRules method") ruleComment = "WMAgent automatic container rule" diff --git a/src/python/WMCore/MicroService/MSRuleCleaner/MSRuleCleaner.py b/src/python/WMCore/MicroService/MSRuleCleaner/MSRuleCleaner.py index f1ab977790..3aa4e54980 100644 --- a/src/python/WMCore/MicroService/MSRuleCleaner/MSRuleCleaner.py +++ b/src/python/WMCore/MicroService/MSRuleCleaner/MSRuleCleaner.py @@ -310,16 +310,16 @@ def _dispatchWflow(self, wflow): msg += " Will retry again in the next cycle." self.logger.info(msg) self._checkStatusAdvanceExpired(wflow, additionalInfo=msg) - elif wflow['RequestStatus'] == 'announced' and not wflow['TransferTape']: - # Given that we are still waiting for the tape transfers to be fulfilled, - # we can go ahead and start cleaning up the input data. - msg = "Skipping workflow: %s - tape transfers are not yet completed." % wflow['RequestName'] - msg += "Workflow in 'announced' state, hence proceeding only with MSTransferor / input data removal. 
" + elif wflow['RequestStatus'] == 'announced' and ((not wflow['TransferDisk']) or (not wflow['TransferTape'])): + # Given that we are still waiting for the disk or tape transfers to be fulfilled, + # we can go ahead and start cleaning up the input data and output data blocks in wma_prod that have all files in OK state in disk or tape rules. + msg = "Workflow: %s - disk or tape transfers are not yet completed. " % wflow['RequestName'] + msg += "Workflow in 'announced' state, hence proceeding only with MSTransferor / input data removal and block rule cleanup in wma_prod. " msg += "Will retry the remaining in the next cycle." self.logger.info(msg) self._checkStatusAdvanceExpired(wflow, additionalInfo=msg) for pline in self.mstrlines: - try: + try: pline.run(wflow) except Exception as ex: msg = f"{pline.name}: General error from pipeline" @@ -328,10 +328,19 @@ def _dispatchWflow(self, wflow): msg += "\nWill retry again in the next cycle." self.logger.exception(msg) continue - # return now to avoid exeecuting the archival pipeline + # Free origin T2 disk block by block: expire the wma_prod block rule for each + # block whose files are confirmed safe (OK locks) in all wmcore_output rules, + # without waiting for all tape transfers to finish before starting cleanup. + try: + self.cleanOutputBlockRules(wflow) + except Exception as ex: + msg = "cleanOutputBlockRules: error for workflow: %s. Error: %s. Will retry in the next cycle." 
+ self.logger.exception(msg, wflow['RequestName'], str(ex)) + # return now to avoid executing the archival pipeline return elif wflow['RequestStatus'] in ['announced', 'rejected', 'aborted-completed']: - # Workflows reaching this block are ready for the full pipeline execution + # announced: TransferDone=True, TransferTape=True, TransferDisk=True — safe to clean all + # rejected/aborted-completed: no transfer flags required for pline in self.cleanuplines: try: pline.run(wflow) @@ -642,6 +651,40 @@ def getMSOutputTransferInfo(self, wflow): # Set Transfer status - information fetched from MSOutput only wflow['TransferDone'] = True + # Collect all wmcore_output rule IDs (disk + tape) per dataset. + # A file must be OK in ALL of these rules to be eligible for block rule cleanup. + for mapRecord in transferInfo['OutputMap']: + ruleIds = [] + if mapRecord.get('DiskRuleID'): + ruleIds.append(mapRecord['DiskRuleID']) + if mapRecord.get('TapeRuleID'): + ruleIds.append(mapRecord['TapeRuleID']) + if ruleIds: + wflow['WmcOutputRulesMap'][mapRecord['Dataset']] = ruleIds + + # Set Disk rules status — TransferDisk=True when all wmcore_output disk rules are OK. + # This confirms data has physically arrived at the final disk destination and it is + # safe to expire wma_prod block rules (tape can still pull from the disk destination). + diskRulesStatusList = [] + for mapRecord in transferInfo['OutputMap']: + if not mapRecord.get('DiskRuleID'): + continue + rucioRule = self.rucio.getRule(mapRecord['DiskRuleID']) + if not rucioRule: + msg = "Disk rule: %s not found for workflow: %s. Possible server side error." 
+ self.logger.error(msg, mapRecord['DiskRuleID'], wflow['RequestName']) + rucioRule = {'state': 'Missing'} + if rucioRule['state'] == 'OK': + diskRulesStatusList.append(True) + msg = "Disk rule: %s in final state: %s for workflow: %s" + self.logger.info(msg, mapRecord['DiskRuleID'], rucioRule['state'], wflow['RequestName']) + else: + diskRulesStatusList.append(False) + msg = "Disk rule: %s in non final state: %s for workflow: %s" + self.logger.info(msg, mapRecord['DiskRuleID'], rucioRule['state'], wflow['RequestName']) + if all(diskRulesStatusList): + wflow['TransferDisk'] = True + # Set Tape rules status - information fetched from Rucio (tape rule ids from MSOutput) # For setting 'TransferTape' = True we require either no tape rules for the # workflow have been created or all existing tape rules to be in status 'OK', @@ -778,6 +821,95 @@ def cleanRucioRules(self, wflow): wflow['CleanupStatus'][currPline] = all(delResults) return wflow + def _getOkFilesFromRule(self, ruleId): + """ + Return the set of file LFNs where ALL replica locks are in state OK + for the given rule id (no REPLICATING or STUCK copies remain). + :param ruleId: string with the Rucio rule id + :return: set of file LFN strings + """ + from collections import defaultdict + lockStates = defaultdict(lambda: {'OK': set(), 'REPLICATING': set(), 'STUCK': set()}) + for lock in self.rucio.listReplicaLocks(ruleId): + lockState = lock['state'] if lock['state'] in ('OK', 'REPLICATING', 'STUCK') else 'REPLICATING' + lockStates[lock['name']][lockState].add(lock['rse']) + okFiles = set() + for fname, states in lockStates.items(): + if states['OK'] and not states['REPLICATING'] and not states['STUCK']: + okFiles.add(fname) + return okFiles + + def cleanOutputBlockRules(self, wflow): + """ + During the disk or tape transfer phase (TransferDone=True, TransferDisk=False or + TransferTape=False), free origin T2 disk space block by block. 
A wma_prod block rule + is expired (lifetime=0) as soon as ALL files in that block have OK locks in ALL + wmcore_output rules (DiskRuleID and TapeRuleID when present) — intersection across + all wmcore_output rules. This allows early cleanup of completed blocks without + waiting for all disk or tape transfers across the entire workflow to finish. + + With wma_prod container rules disabled and block rules carrying copies=1, expiring a + block rule releases the single origin-T2 replica once the data is safely placed by + MSOutput. If copies > 1, all replicas created by the block rule are released. + + :param wflow: A MSRuleCleaner workflow representation + :return: the workflow object + """ + wmaAcct = self.msConfig['rucioWmaAccount'] + + for container in wflow['OutputDatasets']: + # --- Get all wmcore_output rule IDs for this container (disk + tape) --- + wmcRuleIds = wflow['WmcOutputRulesMap'].get(container) + if not wmcRuleIds: + self.logger.info("cleanOutputBlockRules: no wmcore_output rules for %s, skipping", container) + continue + + # --- A file is safe only if OK in ALL wmcore_output rules --- + commonOkFiles = None + for ruleId in wmcRuleIds: + okFilesForRule = self._getOkFilesFromRule(ruleId) + self.logger.info("cleanOutputBlockRules: %s — OK in wmcore_output rule %s: %d", + container, ruleId, len(okFilesForRule)) + if commonOkFiles is None: + commonOkFiles = okFilesForRule + else: + commonOkFiles &= okFilesForRule + + if not commonOkFiles: + self.logger.info("cleanOutputBlockRules: no common OK files yet for %s, skipping", container) + continue + + self.logger.info("cleanOutputBlockRules: %s — common OK files across all wmcore_output rules: %d", + container, len(commonOkFiles)) + + # --- Check each block: expire wma_prod block rule if all files are in common OK set --- + try: + blocks = self.rucio.getBlocksInContainer(container) + except WMRucioDIDNotFoundException: + self.logger.info("cleanOutputBlockRules: container %s not found in Rucio, skipping", 
container) + continue + + for block in blocks: + blockFiles = {f['name'] for f in self.rucio.listContent(block)} + if not blockFiles: + continue + if not blockFiles.issubset(commonOkFiles): + self.logger.info("cleanOutputBlockRules: block %s not fully OK (%d/%d files), skipping", + block, len(blockFiles & commonOkFiles), len(blockFiles)) + continue + + # All files in block are OK in all wmcore_output rules — expire wma_prod block rule + for blockRule in self.rucio.listDataRules(block, account=wmaAcct): + if self.msConfig['enableRealMode']: + self.logger.info("cleanOutputBlockRules: setting lifetime=0 on block rule %s for %s", + blockRule['id'], block) + self.rucio.updateRule(blockRule['id'], {"lifetime": 0}) + else: + self.logger.info("cleanOutputBlockRules: DRY-RUN: would set lifetime=0 on block rule %s for %s", + blockRule['id'], block) + + return wflow + def getRequestRecords(self, reqStatus): """ Queries ReqMgr2 for requests in a given status. @@ -807,6 +939,7 @@ def alertStatusAdvanceExpired(self, wflow): alertDescription += "\nWorkflow: \n{}".format(pformat({wfKey: wflow[wfKey] for wfKey in ['RequestName', 'ParentageResolved', 'TransferDone', + 'TransferDisk', 'TransferTape', 'TapeRulesStatus']})) alertDescription += "\nHas exceeded the Status Advance Timeout of: {} hours".format(self.msConfig['archiveAlarmHours']) diff --git a/src/python/WMCore/MicroService/MSRuleCleaner/MSRuleCleanerWflow.py b/src/python/WMCore/MicroService/MSRuleCleaner/MSRuleCleanerWflow.py index b12203c1e0..c40caf139e 100644 --- a/src/python/WMCore/MicroService/MSRuleCleaner/MSRuleCleanerWflow.py +++ b/src/python/WMCore/MicroService/MSRuleCleaner/MSRuleCleanerWflow.py @@ -181,6 +181,7 @@ def docSchema(self): 'plineAgentCont': False, 'plineAgentBlock': False}, "TransferDone": False # information - returned by the MSOutput REST call. 
+ "TransferDisk": False # information - fetched by Rucio about disk rules completion "TransferTape": False # information - fetched by Rucio about tape rules completion "TapeRulesStatus": [('36805b823062415c8ee60300b0e60378', 'OK', '/AToZHToLLTTbar_MA-1900_MH-1200_TuneCP5_13TeV-amcatnlo-pythia8/RunIISummer20UL16RECO-106X_mcRun2_asymptotic_v13-v2/AODSIM'), ('5b75fb7503524449b0f304ea0e52f0de', 'STUCK', '/AToZHToLLTTbar_MA-1900_MH-1200_TuneCP5_13TeV-amcatnlo-pythia8/RunIISummer20UL16MiniAODv2-106X_mcRun2_asymptotic_v17-v2/MINIAODSIM')] @@ -207,6 +208,7 @@ def docSchema(self): ('RulesToClean', {}, dict), ('CleanupStatus', {}, dict), ('TransferDone', False, bool), + ('TransferDisk', False, bool), ('TransferTape', False, bool), ('TapeRulesStatus', [], list), ('TargetStatus', None, (bytes, str)), @@ -220,7 +222,8 @@ def docSchema(self): ('IncludeParents', False, bool), ('InputDataset', None, (bytes, str)), ('ParentDataset', None, (bytes, str)), - ('StatusAdvanceExpiredMsg', "", str)] + ('StatusAdvanceExpiredMsg', "", str), + ('WmcOutputRulesMap', {}, dict)] # NOTE: ParentageResolved is set by default to True it will be False only if: # - RequestType is StepChain diff --git a/src/python/WMCore/Services/Rucio/Rucio.py b/src/python/WMCore/Services/Rucio/Rucio.py index f7055976fe..feeab77644 100644 --- a/src/python/WMCore/Services/Rucio/Rucio.py +++ b/src/python/WMCore/Services/Rucio/Rucio.py @@ -739,6 +739,26 @@ def deleteRule(self, ruleId, purgeReplicas=False): res = False return res + def listReplicaLocks(self, ruleId): + """ + List all file-level replica locks for a given rule id. + Returns one entry per (file, RSE) pair; for copies=N there are N entries + per file. 
Each entry is a dict with at least: + 'name' - the file LFN + 'rse' - the RSE name + 'state' - 'OK', 'REPLICATING', or 'STUCK' + :param ruleId: string with the rule id + :return: a list of lock dictionaries (empty on error) + """ + res = [] + try: + res = list(self.cli.list_replica_locks(ruleId)) + except RuleNotFound: + self.logger.error("listReplicaLocks: rule id %s not found", ruleId) + except Exception as ex: + self.logger.error("listReplicaLocks: failed for rule id %s. Error: %s", ruleId, str(ex)) + return res + def evaluateRSEExpression(self, rseExpr, useCache=True, returnTape=True): """ Provided an RSE expression, resolve it and return a flat list of RSEs From 75438178bf6fe23712cbb8f14a2e09b0dcbc5832 Mon Sep 17 00:00:00 2001 From: Duong Date: Wed, 17 Jun 2026 15:57:54 -0400 Subject: [PATCH 2/2] add unit tests --- .../MSRuleCleaner_t/MSRuleCleaner_t.py | 213 ++++++++++++++++++ 1 file changed, 213 insertions(+) diff --git a/test/python/WMCore_t/MicroService_t/MSRuleCleaner_t/MSRuleCleaner_t.py b/test/python/WMCore_t/MicroService_t/MSRuleCleaner_t/MSRuleCleaner_t.py index 5cfa4551c8..c1299dc072 100644 --- a/test/python/WMCore_t/MicroService_t/MSRuleCleaner_t/MSRuleCleaner_t.py +++ b/test/python/WMCore_t/MicroService_t/MSRuleCleaner_t/MSRuleCleaner_t.py @@ -129,8 +129,10 @@ def testPipelineAgentBlock(self): 'RulesToClean': {'plineAgentBlock': []}, 'TargetStatus': None, 'TransferDone': False, + 'TransferDisk': False, 'TransferTape': False, 'TapeRulesStatus': [], + 'WmcOutputRulesMap': {}, 'StatusAdvanceExpiredMsg': ""} self.assertDictEqual(wflow, expectedWflow) @@ -173,8 +175,10 @@ def testPipelineAgentCont(self): 'RulesToClean': {'plineAgentCont': []}, 'TargetStatus': None, 'TransferDone': False, + 'TransferDisk': False, 'TransferTape': False, 'TapeRulesStatus': [], + 'WmcOutputRulesMap': {}, 'StatusAdvanceExpiredMsg': ""} self.assertDictEqual(wflow, expectedWflow) @@ -219,8 +223,10 @@ def testPipelineMSTrBlock(self): 'RulesToClean': {'plineMSTrBlock': []}, 
'TargetStatus': None, 'TransferDone': False, + 'TransferDisk': False, 'TransferTape': False, 'TapeRulesStatus': [], + 'WmcOutputRulesMap': {}, 'StatusAdvanceExpiredMsg': ""} self.assertDictEqual(wflow, expectedWflow) @@ -265,8 +271,10 @@ def testPipelineMSTrCont(self): 'RulesToClean': {'plineMSTrCont': []}, 'TargetStatus': None, 'TransferDone': False, + 'TransferDisk': False, 'TransferTape': False, 'TapeRulesStatus': [], + 'WmcOutputRulesMap': {}, 'StatusAdvanceExpiredMsg': ""} self.assertDictEqual(wflow, expectedWflow) @@ -323,8 +331,10 @@ def testPipelineArchive(self): 'RulesToClean': {'plineAgentBlock': [], 'plineAgentCont': []}, 'TargetStatus': 'normal-archived', 'TransferDone': False, + 'TransferDisk': False, 'TransferTape': False, 'TapeRulesStatus': [], + 'WmcOutputRulesMap': {}, 'StatusAdvanceExpiredMsg': "Not properly cleaned workflow: TaskChain_LumiMask_multiRun_HG2011_Val_201029_112735_5891"} self.assertDictEqual(wflow, expectedWflow) @@ -378,8 +388,10 @@ def testPipelineArchiveStepChain(self): 'RulesToClean': {'plineAgentBlock': [], 'plineAgentCont': []}, 'TargetStatus': 'aborted-archived', 'TransferDone': False, + 'TransferDisk': False, 'TransferTape': False, 'TapeRulesStatus': [], + 'WmcOutputRulesMap': {}, 'StatusAdvanceExpiredMsg': ("Not properly cleaned workflow: StepChain_Tasks_HG2011_Val_201029_112731_6371" " - 'ParentageResolved' flag set to false.\n" "Not properly cleaned workflow: StepChain_Tasks_HG2011_Val_201029_112731_6371\n" @@ -428,6 +440,207 @@ def testCheckClean(self): 'PlineMarkers': ['plineAgentBlock', 'plineAgentCont']} self.assertFalse(self.msRuleCleaner._checkClean(wflowFlags)) + # ----------------------------------------------------------------------- + # Tests for cleanOutputBlockRules — block-by-block wma_prod rule expiry + # ----------------------------------------------------------------------- + + def _makeWflow(self, outputDatasets, wmcOutputRulesMap): + """Build a minimal wflow dict for cleanOutputBlockRules tests.""" + 
return { + 'OutputDatasets': outputDatasets, + 'WmcOutputRulesMap': wmcOutputRulesMap, + } + + def _setupRucioMock(self, getOkFilesMap, blocksInContainer, blockFiles, blockRules): + """ + Configure self.msRuleCleaner.rucio mock for cleanOutputBlockRules. + + :param getOkFilesMap: dict {ruleId: set of OK file names} + :param blocksInContainer: dict {container: [block names]} + :param blockFiles: dict {block: [{'name': filename}, ...]} + :param blockRules: dict {block: [{'id': ruleId}, ...]} + """ + self.msRuleCleaner._getOkFilesFromRule = lambda ruleId: getOkFilesMap.get(ruleId, set()) + self.msRuleCleaner.rucio.getBlocksInContainer.side_effect = \ + lambda container: blocksInContainer.get(container, []) + self.msRuleCleaner.rucio.listContent.side_effect = \ + lambda block: blockFiles.get(block, []) + self.msRuleCleaner.rucio.listDataRules.side_effect = \ + lambda block, account: blockRules.get(block, []) + + def testCleanOutputBlockRulesAllFilesOk(self): + """ + Block rule is expired when all files are OK in all wmcore_output rules. + Scenario: disk rule and tape rule both have file1 + file2 OK. + Block contains exactly file1 + file2 → fully covered → updateRule called. 
+ """ + self.msRuleCleaner.msConfig['enableRealMode'] = True + container = '/Dataset/Processed/TIER' + block = container + '#block1' + + self._setupRucioMock( + getOkFilesMap={ + 'disk-rule-1': {'file1', 'file2'}, + 'tape-rule-1': {'file1', 'file2'}, + }, + blocksInContainer={container: [block]}, + blockFiles={block: [{'name': 'file1'}, {'name': 'file2'}]}, + blockRules={block: [{'id': 'wma-block-rule-1'}]}, + ) + + wflow = self._makeWflow( + outputDatasets=[container], + wmcOutputRulesMap={container: ['disk-rule-1', 'tape-rule-1']}, + ) + self.msRuleCleaner.cleanOutputBlockRules(wflow) + + self.msRuleCleaner.rucio.updateRule.assert_called_once_with( + 'wma-block-rule-1', {'lifetime': 0} + ) + + def testCleanOutputBlockRulesPartialBlockOk(self): + """ + Block rule is NOT expired when only some files are OK. + Scenario: block has file1 + file2 + file3, but only file1 + file2 are OK. + """ + self.msRuleCleaner.msConfig['enableRealMode'] = True + container = '/Dataset/Processed/TIER' + block = container + '#block1' + + self._setupRucioMock( + getOkFilesMap={'disk-rule-1': {'file1', 'file2'}}, + blocksInContainer={container: [block]}, + blockFiles={block: [{'name': 'file1'}, {'name': 'file2'}, {'name': 'file3'}]}, + blockRules={block: [{'id': 'wma-block-rule-1'}]}, + ) + + wflow = self._makeWflow( + outputDatasets=[container], + wmcOutputRulesMap={container: ['disk-rule-1']}, + ) + self.msRuleCleaner.cleanOutputBlockRules(wflow) + + self.msRuleCleaner.rucio.updateRule.assert_not_called() + + def testCleanOutputBlockRulesNoCommonOkFiles(self): + """ + Nothing is cleaned when the intersection of OK files across rules is empty. + Scenario: disk rule has file1 + file2 OK, tape rule has no files OK yet. 
+ """ + self.msRuleCleaner.msConfig['enableRealMode'] = True + container = '/Dataset/Processed/TIER' + block = container + '#block1' + + self._setupRucioMock( + getOkFilesMap={ + 'disk-rule-1': {'file1', 'file2'}, + 'tape-rule-1': set(), # tape not done yet + }, + blocksInContainer={container: [block]}, + blockFiles={block: [{'name': 'file1'}, {'name': 'file2'}]}, + blockRules={block: [{'id': 'wma-block-rule-1'}]}, + ) + + wflow = self._makeWflow( + outputDatasets=[container], + wmcOutputRulesMap={container: ['disk-rule-1', 'tape-rule-1']}, + ) + self.msRuleCleaner.cleanOutputBlockRules(wflow) + + self.msRuleCleaner.rucio.updateRule.assert_not_called() + + def testCleanOutputBlockRulesMultipleBlocks(self): + """ + Only fully-OK blocks are cleaned; partially-OK blocks are skipped. + Scenario: block1 fully OK → expired. block2 partial → skipped. + """ + self.msRuleCleaner.msConfig['enableRealMode'] = True + container = '/Dataset/Processed/TIER' + block1 = container + '#block1' + block2 = container + '#block2' + + self._setupRucioMock( + getOkFilesMap={'disk-rule-1': {'file1', 'file2', 'file3'}}, + blocksInContainer={container: [block1, block2]}, + blockFiles={ + block1: [{'name': 'file1'}, {'name': 'file2'}], # fully OK + block2: [{'name': 'file2'}, {'name': 'file3'}, {'name': 'file4'}], # file4 missing + }, + blockRules={ + block1: [{'id': 'wma-block-rule-1'}], + block2: [{'id': 'wma-block-rule-2'}], + }, + ) + + wflow = self._makeWflow( + outputDatasets=[container], + wmcOutputRulesMap={container: ['disk-rule-1']}, + ) + self.msRuleCleaner.cleanOutputBlockRules(wflow) + + self.msRuleCleaner.rucio.updateRule.assert_called_once_with( + 'wma-block-rule-1', {'lifetime': 0} + ) + + def testCleanOutputBlockRulesDryRun(self): + """ + updateRule is NOT called when enableRealMode=False, even if block is fully OK. 
+ """ + self.msRuleCleaner.msConfig['enableRealMode'] = False + container = '/Dataset/Processed/TIER' + block = container + '#block1' + + self._setupRucioMock( + getOkFilesMap={'disk-rule-1': {'file1', 'file2'}}, + blocksInContainer={container: [block]}, + blockFiles={block: [{'name': 'file1'}, {'name': 'file2'}]}, + blockRules={block: [{'id': 'wma-block-rule-1'}]}, + ) + + wflow = self._makeWflow( + outputDatasets=[container], + wmcOutputRulesMap={container: ['disk-rule-1']}, + ) + self.msRuleCleaner.cleanOutputBlockRules(wflow) + + self.msRuleCleaner.rucio.updateRule.assert_not_called() + + def testCleanOutputBlockRulesContainerNotInRucio(self): + """ + WMRucioDIDNotFoundException is caught gracefully — no crash, no updateRule. + """ + from WMCore.Services.Rucio.Rucio import WMRucioDIDNotFoundException + self.msRuleCleaner.msConfig['enableRealMode'] = True + container = '/Dataset/Processed/TIER' + + self.msRuleCleaner._getOkFilesFromRule = lambda ruleId: {'file1'} + self.msRuleCleaner.rucio.getBlocksInContainer.side_effect = \ + WMRucioDIDNotFoundException("container not found") + + wflow = self._makeWflow( + outputDatasets=[container], + wmcOutputRulesMap={container: ['disk-rule-1']}, + ) + self.msRuleCleaner.cleanOutputBlockRules(wflow) # must not raise + + self.msRuleCleaner.rucio.updateRule.assert_not_called() + + def testCleanOutputBlockRulesNoRulesForContainer(self): + """ + Container missing from WmcOutputRulesMap is silently skipped. + """ + self.msRuleCleaner.msConfig['enableRealMode'] = True + container = '/Dataset/Processed/TIER' + + wflow = self._makeWflow( + outputDatasets=[container], + wmcOutputRulesMap={}, # no entry for this container + ) + self.msRuleCleaner.cleanOutputBlockRules(wflow) # must not raise + + self.msRuleCleaner.rucio.getBlocksInContainer.assert_not_called() + self.msRuleCleaner.rucio.updateRule.assert_not_called() + if __name__ == '__main__': unittest.main()