Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion bin/wmagent-mod-config
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,10 @@ def modifyConfiguration(config, **args):
config.RucioInjector.rucioAccount = args["rucio_account"]
config.RucioInjector.rucioUrl = args["rucio_host"]
config.RucioInjector.rucioAuthUrl = args["rucio_auth"]
# define a different expression for container rule replication in testbed
# define a different expression for block/container rule replication in testbed
if "-int.cern.ch" in args["rucio_host"]:
config.RucioInjector.containerDiskRuleRSEExpr = "(tier=2|tier=1)&cms_type=int&rse_type=DISK"
config.RucioInjector.blockDiskRuleRSEExpr = "(tier=2|tier=1)&cms_type=int&rse_type=DISK"

# custom AgentStatusWatcher
if hasattr(config, "AgentStatusWatcher"):
Expand Down
6 changes: 5 additions & 1 deletion etc/WMAgentConfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,10 +437,14 @@
config.RucioInjector.pollIntervalRules = 43200
config.RucioInjector.cacheExpiration = 2 * 24 * 60 * 60 # two days
config.RucioInjector.createBlockRules = True
config.RucioInjector.createContainerRules = False
config.RucioInjector.RSEPostfix = False # enable it to append _Test to the RSE names
config.RucioInjector.metaDIDProject = "Production"
config.RucioInjector.containerDiskRuleParams = {"weight": "dm_weight", "copies": 2, "grouping": "DATASET"}
config.RucioInjector.blockRuleParams = {}
config.RucioInjector.blockRuleParams = {"copies": 1}
# blockDiskRuleRSEExpr: broader expression used for non-T0 block rules.
# With copies=1, Rucio pins the existing origin-T2 replica without replicating elsewhere.
config.RucioInjector.blockDiskRuleRSEExpr = "(tier=2|tier=1)&cms_type=real&rse_type=DISK"
# this RSEExpr below might be updated by wmagent-mod-config script
config.RucioInjector.containerDiskRuleRSEExpr = "(tier=2|tier=1)&cms_type=real&rse_type=DISK"
config.RucioInjector.rucioAccount = "OVER_WRITE_BY_SECRETS"
Expand Down
37 changes: 31 additions & 6 deletions src/python/WMComponent/RucioInjector/RucioInjectorPoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ def __init__(self, config):
self.pollRules = config.RucioInjector.pollIntervalRules
self.lastRulesExecTime = 0
self.createBlockRules = config.RucioInjector.createBlockRules
self.createContainerRules = getattr(config.RucioInjector, "createContainerRules", True)
self.containerDiskRuleParams = config.RucioInjector.containerDiskRuleParams
self.blockRuleParams = config.RucioInjector.blockRuleParams
self.blockDiskRuleRSEExpr = getattr(config.RucioInjector, "blockDiskRuleRSEExpr", None)
self.containerDiskRuleRSEExpr = config.RucioInjector.containerDiskRuleRSEExpr
if config.RucioInjector.metaDIDProject not in RUCIO_VALID_PROJECT:
msg = "Component configured with an invalid 'project' DID: %s"
Expand Down Expand Up @@ -118,6 +120,7 @@ def __init__(self, config):
self.getDeletableWorkflows = None

logging.info("Component configured to create block rules: %s", self.createBlockRules)
logging.info("Component configured to create container rules: %s", self.createContainerRules)

def setup(self, parameters):
"""
Expand Down Expand Up @@ -248,9 +251,20 @@ def insertBlocks(self, uninjectedData):

def insertBlockRules(self):
"""
Creates a simple replication rule for every single block that
is under production in a given site/RSE.
Also persist the rule ID in the database.
Creates a replication rule for every single block that is under production.
For non-T0 agents with blockDiskRuleRSEExpr configured, uses a broader RSE
expression with copies=1 to pin the existing origin-T2 replica without
replicating to a second site.
For T0 agents (or when blockDiskRuleRSEExpr is not set), pins to the specific
origin T2 RSE with copies=1.
Also persists the rule ID in the database.

Note: if blockRuleParams copies is set to more than 1, Rucio will replicate
the block to additional RSEs matching blockDiskRuleRSEExpr to satisfy the
extra copies, creating new block replicas at other sites beyond the origin T2.
During the disk or tape transfer phase, MSRuleCleaner expires these block rules
block by block as soon as all files in each block are confirmed safe in all
wmcore_output rules, rather than waiting for the entire workflow to complete.
"""
if not self.createBlockRules:
return
Expand All @@ -265,10 +279,17 @@ def insertBlockRules(self):
logging.warning("Block: %s not yet in Rucio. Retrying later..", item['blockname'])
continue
kwargs = dict(activity="Production Output", account=self.rucioAcct,
grouping="DATASET", comment="WMAgent automatic container rule",
grouping="DATASET", comment="WMAgent automatic block rule",
ignore_availability=True, meta=self.metaData)
rseName = "%s_Test" % item['pnn'] if self.testRSEs else item['pnn']
# DATASET = replicates all files in the same block to the same RSE
# Non-T0 agents with blockDiskRuleRSEExpr: use the broader expression. With copies=1
# Rucio pins the existing origin-T2 replica and does not replicate elsewhere; additional
# copies are only created if blockRuleParams sets copies to more than 1.
if not self.isT0agent and self.blockDiskRuleRSEExpr:
rseName = self.blockDiskRuleRSEExpr
if self.testRSEs:
rseName = rseName.replace("cms_type=real", "cms_type=test")
else:
rseName = "%s_Test" % item['pnn'] if self.testRSEs else item['pnn']
# DATASET grouping: all files in the same block go to the same RSE (per copy)
kwargs.update(self.blockRuleParams)
resp = self.rucio.createReplicationRule(item['blockname'],
rseExpression=rseName, **kwargs)
Expand Down Expand Up @@ -479,6 +500,10 @@ def insertContainerRules(self):
* T0 Tape is created as defined, with a special rule activity for Tape
* T0 Disk is created as defined, with a special rule activity for Disk/Export
"""
if not self.isT0agent and not self.createContainerRules:
logging.info("Container rule creation disabled for non-T0 agent. Skipping.")
return

logging.info("Starting insertContainerRules method")

ruleComment = "WMAgent automatic container rule"
Expand Down
149 changes: 141 additions & 8 deletions src/python/WMCore/MicroService/MSRuleCleaner/MSRuleCleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,16 +310,16 @@ def _dispatchWflow(self, wflow):
msg += " Will retry again in the next cycle."
self.logger.info(msg)
self._checkStatusAdvanceExpired(wflow, additionalInfo=msg)
elif wflow['RequestStatus'] == 'announced' and not wflow['TransferTape']:
# Given that we are still waiting for the tape transfers to be fulfilled,
# we can go ahead and start cleaning up the input data.
msg = "Skipping workflow: %s - tape transfers are not yet completed." % wflow['RequestName']
msg += "Workflow in 'announced' state, hence proceeding only with MSTransferor / input data removal. "
elif wflow['RequestStatus'] == 'announced' and ((not wflow['TransferDisk']) or (not wflow['TransferTape'])):
# Given that we are still waiting for the disk or tape transfers to be fulfilled,
# we can go ahead and start cleaning up the input data, and also the wma_prod block rules
# of those output blocks whose files are all in OK state in every wmcore_output (disk and tape) rule.
msg = "Workflow: %s - disk or tape transfers are not yet completed. " % wflow['RequestName']
msg += "Workflow in 'announced' state, hence proceeding only with MSTransferor / input data removal and block rule cleanup in wma_prod. "
msg += "Will retry the remaining in the next cycle."
self.logger.info(msg)
self._checkStatusAdvanceExpired(wflow, additionalInfo=msg)
for pline in self.mstrlines:
try:
try:
pline.run(wflow)
except Exception as ex:
msg = f"{pline.name}: General error from pipeline"
Expand All @@ -328,10 +328,19 @@ def _dispatchWflow(self, wflow):
msg += "\nWill retry again in the next cycle."
self.logger.exception(msg)
continue
# return now to avoid exeecuting the archival pipeline
# Free origin T2 disk block by block: expire the wma_prod block rule for each
# block whose files are confirmed safe (OK locks) in all wmcore_output rules,
# without waiting for all tape transfers to finish before starting cleanup.
try:
self.cleanOutputBlockRules(wflow)
except Exception as ex:
msg = "cleanOutputBlockRules: error for workflow: %s. Error: %s. Will retry in the next cycle."
self.logger.exception(msg, wflow['RequestName'], str(ex))
# return now to avoid executing the archival pipeline
return
elif wflow['RequestStatus'] in ['announced', 'rejected', 'aborted-completed']:
# Workflows reaching this block are ready for the full pipeline execution
# announced: TransferDone=True, TransferTape=True, TransferDisk=True — safe to clean all
# rejected/aborted-completed: no transfer flags required
for pline in self.cleanuplines:
try:
pline.run(wflow)
Expand Down Expand Up @@ -642,6 +651,40 @@ def getMSOutputTransferInfo(self, wflow):
# Set Transfer status - information fetched from MSOutput only
wflow['TransferDone'] = True

# Collect all wmcore_output rule IDs (disk + tape) per dataset.
# A file must be OK in ALL of these rules to be eligible for block rule cleanup.
for mapRecord in transferInfo['OutputMap']:
ruleIds = []
if mapRecord.get('DiskRuleID'):
ruleIds.append(mapRecord['DiskRuleID'])
if mapRecord.get('TapeRuleID'):
ruleIds.append(mapRecord['TapeRuleID'])
if ruleIds:
wflow['WmcOutputRulesMap'][mapRecord['Dataset']] = ruleIds

# Set Disk rules status — TransferDisk=True when every wmcore_output disk rule is in state OK.
# NOTE: it is also set to True when the workflow has no disk rules at all, since all([]) is True.
# This confirms data has physically arrived at the final disk destination and it is
# safe to expire wma_prod block rules (tape can still pull from the disk destination).
diskRulesStatusList = []
for mapRecord in transferInfo['OutputMap']:
if not mapRecord.get('DiskRuleID'):
continue
rucioRule = self.rucio.getRule(mapRecord['DiskRuleID'])
if not rucioRule:
msg = "Disk rule: %s not found for workflow: %s. Possible server side error."
self.logger.error(msg, mapRecord['DiskRuleID'], wflow['RequestName'])
rucioRule = {'state': 'Missing'}
if rucioRule['state'] == 'OK':
diskRulesStatusList.append(True)
msg = "Disk rule: %s in final state: %s for workflow: %s"
self.logger.info(msg, mapRecord['DiskRuleID'], rucioRule['state'], wflow['RequestName'])
else:
diskRulesStatusList.append(False)
msg = "Disk rule: %s in non final state: %s for workflow: %s"
self.logger.info(msg, mapRecord['DiskRuleID'], rucioRule['state'], wflow['RequestName'])
if all(diskRulesStatusList):
wflow['TransferDisk'] = True

# Set Tape rules status - information fetched from Rucio (tape rule ids from MSOutput)
# For setting 'TransferTape' = True we require either no tape rules for the
# workflow have been created or all existing tape rules to be in status 'OK',
Expand Down Expand Up @@ -778,6 +821,95 @@ def cleanRucioRules(self, wflow):
wflow['CleanupStatus'][currPline] = all(delResults)
return wflow

def _getOkFilesFromRule(self, ruleId):
    """
    Return the set of file LFNs whose replica locks are ALL in state OK
    for the given rule id.

    A file qualifies only if it has at least one lock in state 'OK' and no
    lock in any other state. Every state different from 'OK' (REPLICATING,
    STUCK, or anything unrecognised) is treated as "not yet safe".

    :param ruleId: string with the Rucio rule id
    :return: set of file LFN strings
    """
    # NOTE(review): each lock record is assumed to be a dict carrying at least
    # the 'name' and 'state' keys - confirm against self.rucio.listReplicaLocks
    okNames = set()
    pendingNames = set()
    for lock in self.rucio.listReplicaLocks(ruleId):
        if lock['state'] == 'OK':
            okNames.add(lock['name'])
        else:
            # a single non-OK lock is enough to disqualify the whole file
            pendingNames.add(lock['name'])
    return okNames - pendingNames

def cleanOutputBlockRules(self, wflow):
    """
    Expire, block by block, the block rules owned by the configured
    'rucioWmaAccount' for the output datasets of a workflow.

    For every container in wflow['OutputDatasets'] the rule ids stored in
    wflow['WmcOutputRulesMap'] are looked up, and the set of files that are
    OK in ALL of those rules (intersection) is computed. A block rule is
    then set to lifetime=0 only when every file of that block belongs to
    that common set. Containers without mapped rules, containers with no
    common OK files, containers not found in Rucio and empty blocks are
    skipped.

    When msConfig['enableRealMode'] is false nothing is updated in Rucio and
    the intended action is only logged (dry-run).

    :param wflow: A MSRuleCleaner workflow representation
    :return: the workflow object
    """
    wmaAcct = self.msConfig['rucioWmaAccount']
    # loop invariant: read the run mode once instead of once per rule
    realMode = self.msConfig['enableRealMode']

    for container in wflow['OutputDatasets']:
        # --- Get all wmcore_output rule IDs for this container (disk + tape) ---
        wmcRuleIds = wflow['WmcOutputRulesMap'].get(container)
        if not wmcRuleIds:
            self.logger.info("cleanOutputBlockRules: no wmcore_output rules for %s, skipping", container)
            continue

        # --- A file is safe only if OK in ALL wmcore_output rules ---
        commonOkFiles = None
        for ruleId in wmcRuleIds:
            okFilesForRule = self._getOkFilesFromRule(ruleId)
            self.logger.info("cleanOutputBlockRules: %s — OK in wmcore_output rule %s: %d",
                             container, ruleId, len(okFilesForRule))
            if commonOkFiles is None:
                # copy, so that the intersection below never mutates the helper's result
                commonOkFiles = set(okFilesForRule)
            else:
                commonOkFiles &= okFilesForRule
            if not commonOkFiles:
                # the intersection can only shrink: no point in listing the
                # locks of the remaining rules once it is already empty
                break

        if not commonOkFiles:
            self.logger.info("cleanOutputBlockRules: no common OK files yet for %s, skipping", container)
            continue

        self.logger.info("cleanOutputBlockRules: %s — common OK files across all wmcore_output rules: %d",
                         container, len(commonOkFiles))

        # --- Check each block: expire wma_prod block rule if all files are in common OK set ---
        try:
            blocks = self.rucio.getBlocksInContainer(container)
        except WMRucioDIDNotFoundException:
            self.logger.info("cleanOutputBlockRules: container %s not found in Rucio, skipping", container)
            continue

        for block in blocks:
            blockFiles = {f['name'] for f in self.rucio.listContent(block)}
            if not blockFiles:
                continue
            if not blockFiles.issubset(commonOkFiles):
                self.logger.info("cleanOutputBlockRules: block %s not fully OK (%d/%d files), skipping",
                                 block, len(blockFiles & commonOkFiles), len(blockFiles))
                continue

            # All files in block are OK in all wmcore_output rules — expire wma_prod block rule
            for blockRule in self.rucio.listDataRules(block, account=wmaAcct):
                if realMode:
                    self.logger.info("cleanOutputBlockRules: setting lifetime=0 on block rule %s for %s",
                                     blockRule['id'], block)
                    self.rucio.updateRule(blockRule['id'], {"lifetime": 0})
                else:
                    self.logger.info("cleanOutputBlockRules: DRY-RUN: would set lifetime=0 on block rule %s for %s",
                                     blockRule['id'], block)

    return wflow

def getRequestRecords(self, reqStatus):
"""
Queries ReqMgr2 for requests in a given status.
Expand Down Expand Up @@ -807,6 +939,7 @@ def alertStatusAdvanceExpired(self, wflow):
alertDescription += "\nWorkflow: \n{}".format(pformat({wfKey: wflow[wfKey] for wfKey in ['RequestName',
'ParentageResolved',
'TransferDone',
'TransferDisk',
'TransferTape',
'TapeRulesStatus']}))
alertDescription += "\nHas exceeded the Status Advance Timeout of: {} hours".format(self.msConfig['archiveAlarmHours'])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ def docSchema(self):
'plineAgentCont': False,
'plineAgentBlock': False},
"TransferDone": False # information - returned by the MSOutput REST call.
"TransferDisk": False # information - fetched by Rucio about disk rules completion
"TransferTape": False # information - fetched by Rucio about tape rules completion
"TapeRulesStatus": [('36805b823062415c8ee60300b0e60378', 'OK', '/AToZHToLLTTbar_MA-1900_MH-1200_TuneCP5_13TeV-amcatnlo-pythia8/RunIISummer20UL16RECO-106X_mcRun2_asymptotic_v13-v2/AODSIM'),
('5b75fb7503524449b0f304ea0e52f0de', 'STUCK', '/AToZHToLLTTbar_MA-1900_MH-1200_TuneCP5_13TeV-amcatnlo-pythia8/RunIISummer20UL16MiniAODv2-106X_mcRun2_asymptotic_v17-v2/MINIAODSIM')]
Expand All @@ -207,6 +208,7 @@ def docSchema(self):
('RulesToClean', {}, dict),
('CleanupStatus', {}, dict),
('TransferDone', False, bool),
('TransferDisk', False, bool),
('TransferTape', False, bool),
('TapeRulesStatus', [], list),
('TargetStatus', None, (bytes, str)),
Expand All @@ -220,7 +222,8 @@ def docSchema(self):
('IncludeParents', False, bool),
('InputDataset', None, (bytes, str)),
('ParentDataset', None, (bytes, str)),
('StatusAdvanceExpiredMsg', "", str)]
('StatusAdvanceExpiredMsg', "", str),
('WmcOutputRulesMap', {}, dict)]

# NOTE: ParentageResolved is set by default to True it will be False only if:
# - RequestType is StepChain
Expand Down
Loading