Example: DAGMan Completes When Some Percentage of Nodes Succeed
Sometimes it is desired to have a DAG set up as a bag of nodes that completes after a certain percentage of successfully executed nodes. The following example adds a POST SCRIPT to all nodes in the DAG to calculate the percent of nodes that finished successfully that informs the DAG to exit when the specified threshold is achieved.
The following Python script uses the HTCondor python bindings to get the DAG_NodesTotal and DAG_NodesDone from the workflows manager job from the local condor_schedd queue.
#!/usr/bin/env python3
# Example Script Provided by:
# https://htcondor.readthedocs.io/en/latest/auto-redirect.html?category=example&tag=bagman-percent-done
import sys
try:
import htcondor
except ImportError:
error("Failed to import htcondor python bindings", 1)
def error(msg: str, code: int):
"""Print error message and exit with specified exit code"""
print(f"ERROR: {msg}", file=sys.stderr)
exit(code)
def parse_args():
"""Parse command line arguments"""
if len(sys.argv) != 3:
error("Missing argument(s) Job Id and/or Job return code", 1)
# Parse this nodes Job ID
try:
ID = int(sys.argv[1].split(".")[0])
except ValueError:
error(f"Failed to convert Job Id ({sys.argv[1]}) to integer", 1)
# Parse this nodes exit code to preserve node success/failure based on job exit
try:
CODE = int(sys.argv[2])
except ValueError:
error(f"Failed to convert Job exit code ({sys.argv[2]}) to integer", 1)
return (ID, CODE)
def get_job_ad(job_id: int, exit_code: int):
"""Query and return the parent DAGMan proper job ad"""
DAG_ATTRS = ["DAG_NodesTotal", "DAG_NodesDone"]
found = False
schedd = htcondor.Schedd()
# Get workflow Job ID from this a job history ad for this node
for ad in schedd.history(f"ClusterId=={job_id}", ["DAGManJobId"], match=1):
if "DAGManJobId" in ad:
found = True
DAG_ID = int(ad["DAGManJobId"])
if not found:
error(f"Failed to query job ad for cluster {job_id}", exit_code)
# Query workflow information from DAGMan proper job
ads = schedd.query(f"ClusterId=={DAG_ID}", DAG_ATTRS)
if len(ads) != 1:
error(f"Failed to query DAGMan job ad for ID:{DAG_ID}", exit_code)
dag_ad = ads[0]
for attr in DAG_ATTRS:
if attr not in dag_ad:
error(f"DAGMan job ad is missing '{attr}' attribute", exit_code)
return dag_ad
def main():
# Threshold to exit if 75% of DAG is complete
THRESHOLD = 0.75
job_id, exit_code = parse_args()
ad = get_job_ad(job_id, exit_code)
# If this node job was successful then add 1 to count of completed nodes
num_done = int(ad["DAG_NodesDone"]) + int(exit_code == 0)
print(f"Nodes Successfully Completed: {num_done}")
p_done = float(num_done / int(ad["DAG_NodesTotal"]))
print(f"DAG: {p_done}%")
# If threshold is passed then exit with specific code (124) to inform DAGMan to exit
if p_done >= THRESHOLD:
print(f"DAG is {p_done}% done!")
sys.exit(124)
sys.exit(exit_code)
if __name__ == "__main__":
main()
The above Python script can then be applied in the DAG file in coordination with the ABORT-DAG-ON command to inform DAGMan to exit successfully when the specified threshold is achieved as follows:
# Bag of 1000 nodes
# Example DAG Provided By:
# https://htcondor.readthedocs.io/en/latest/auto-redirect.html?category=example&tag=bagman-percent-done
SCRIPT POST ALL_NODES dag_percent.py $JOBID $RETURN
ABORT-DAG-ON ALL_NODES 124 RETURN 0
JOB Node_1 awesome-science.sub
JOB Node_2 awesome-science.sub
...
JOB Node_999 awesome-science.sub
JOB Node_1000 awesome-science.sub
Note
ALL_NODES
will apply the POST Script and abort DAG semantics onto every node
declared in the DAG.