Advanced Job Submission and Management

The two most common HTCondor command line tools are condor_q and condor_submit. In the previous module, we learned about the xquery() method that corresponds to condor_q. Here, we will learn the Python binding equivalent of condor_submit in greater detail.

We start by importing the relevant modules:

[1]:
import htcondor

Submitting Jobs

We will submit jobs utilizing the dedicated Submit object.

Submit objects consist of key-value pairs. Unlike ClassAds, the values do not have an inherent type (such as strings, integers, or booleans); they are evaluated with macro expansion at submit time. Where reasonable, they behave like Python dictionaries:

[2]:
sub = htcondor.Submit({"foo": "1", "bar": "2", "baz": "$(foo)"})
print(sub)
foo = 1
bar = 2
baz = $(foo)

[3]:
sub["qux"] = 3
print(sub)
foo = 1
bar = 2
baz = $(foo)
qux = 3

[4]:
print(sub.expand("baz"))
1
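To build intuition for what expand() is doing, here is a minimal pure-Python sketch of `$(name)` substitution. It is a toy model and not HTCondor's actual implementation: the `expand` function, the `MACRO` pattern, and the choice to replace undefined names with an empty string are all assumptions made for illustration.

```python
import re

# Toy re-implementation of $(name) substitution, for illustration only;
# this is NOT how HTCondor implements macro expansion.
MACRO = re.compile(r"\$\((\w+)\)")


def expand(values, key, max_depth=10):
    """Return values[key] with $(name) references replaced, repeatedly."""
    text = values[key]
    for _ in range(max_depth):  # bounded, so self-references cannot loop forever
        # Assumption of this sketch: undefined names become an empty string.
        new_text = MACRO.sub(lambda m: values.get(m.group(1), ""), text)
        if new_text == text:
            break
        text = new_text
    return text


values = {"foo": "1", "bar": "2", "baz": "$(foo)"}
print(expand(values, "baz"))  # prints 1
```

The stored value of baz stays `$(foo)`; only the expanded view changes, which matches the difference between print(sub) and sub.expand("baz") above.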

The available attributes and their semantics are documented in the condor_submit manual, so we won’t repeat them here. A minimal realistic submit object may look like the following:

[5]:
sub = htcondor.Submit({
    "executable": "/bin/sleep",
    "arguments": "5m"
})

To go from a submit object to a job in a schedd, one must do three things:

  1. Create a new transaction in the schedd using transaction().

  2. Call the queue() method, passing the transaction object.

  3. Commit the transaction.

Since the transaction object is a Python context manager, (1) and (3) can be achieved using Python’s with statement:

[6]:
schedd = htcondor.Schedd()         # create a schedd object connected to the local schedd
with schedd.transaction() as txn:
    cluster_id = sub.queue(txn)    # queue one job in the current transaction; get back the submission's cluster ID

print(cluster_id)
16

If the code block inside the with statement completes successfully, the transaction is automatically committed. If an exception is thrown (or Python abruptly exits), the transaction is aborted.
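The commit-or-abort behavior follows from the general context manager protocol. The sketch below uses a hypothetical `ToyTransaction` class (it is not part of the htcondor module) to show how `__exit__` can tell a clean exit from an exception; the real transaction object presumably does something comparable, but its internals are not shown in this tutorial.

```python
class ToyTransaction:
    """Hypothetical stand-in for a schedd transaction (not an htcondor class)."""

    def __init__(self):
        self.state = "open"

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Commit only if the with block finished without raising.
        self.state = "committed" if exc_type is None else "aborted"
        return False  # do not swallow the exception


# Case 1: the block completes normally.
with ToyTransaction() as ok_txn:
    pass

# Case 2: the block raises, so the transaction is aborted.
failed_txn = ToyTransaction()
try:
    with failed_txn:
        raise RuntimeError("submission failed")
except RuntimeError:
    pass

print(ok_txn.state, failed_txn.state)  # prints: committed aborted
```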

By default, each invocation of queue will submit a single job. A more common use case is to submit many jobs at once, often identical ones. Suppose we want to submit not one “sleep” job but 10; instead of writing a for-loop around the queue method, we can use the count argument:

[7]:
with schedd.transaction() as txn:          # start a new transaction
    cluster_id = sub.queue(txn, count=10)  # submit 10 identical jobs

print(cluster_id)
17

We can now query for those jobs in the queue:

[8]:
schedd.query(
    constraint='ClusterId =?= {}'.format(cluster_id),
    projection=["ClusterId", "ProcId", "JobStatus", "EnteredCurrentStatus"],
)
[8]:
[[ ClusterId = 17; ProcId = 0; EnteredCurrentStatus = 1606229405; JobStatus = 1; ServerTime = 1606229405 ],
 [ ClusterId = 17; ProcId = 1; EnteredCurrentStatus = 1606229405; JobStatus = 1; ServerTime = 1606229405 ],
 [ ClusterId = 17; ProcId = 2; EnteredCurrentStatus = 1606229405; JobStatus = 1; ServerTime = 1606229405 ],
 [ ClusterId = 17; ProcId = 3; EnteredCurrentStatus = 1606229405; JobStatus = 1; ServerTime = 1606229405 ],
 [ ClusterId = 17; ProcId = 4; EnteredCurrentStatus = 1606229405; JobStatus = 1; ServerTime = 1606229405 ],
 [ ClusterId = 17; ProcId = 5; EnteredCurrentStatus = 1606229405; JobStatus = 1; ServerTime = 1606229405 ],
 [ ClusterId = 17; ProcId = 6; EnteredCurrentStatus = 1606229405; JobStatus = 1; ServerTime = 1606229405 ],
 [ ClusterId = 17; ProcId = 7; EnteredCurrentStatus = 1606229405; JobStatus = 1; ServerTime = 1606229405 ],
 [ ClusterId = 17; ProcId = 8; EnteredCurrentStatus = 1606229405; JobStatus = 1; ServerTime = 1606229405 ],
 [ ClusterId = 17; ProcId = 9; EnteredCurrentStatus = 1606229405; JobStatus = 1; ServerTime = 1606229405 ]]
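Time-valued attributes such as EnteredCurrentStatus and ServerTime are shown as integers. Assuming they are Unix epoch timestamps in seconds, which matches the values above, the standard library can turn them into something readable:

```python
from datetime import datetime, timezone

# Value copied from the query output above; assumed to be seconds since the Unix epoch.
entered_current_status = 1606229405

when = datetime.fromtimestamp(entered_current_status, tz=timezone.utc)
print(when.isoformat())  # prints 2020-11-24T14:50:05+00:00
```

Passing an explicit timezone keeps the result independent of the machine's local time settings.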

Submitting many identical jobs is rarely useful on its own; usually each job needs to vary slightly based on its ID (the “process ID”) within the job cluster. For this, the Submit object in Python behaves similarly to submit files: references within the submit command are evaluated as macros at submit time.

For example, suppose we want the argument to sleep to vary based on the process ID:

[9]:
sub = htcondor.Submit({"executable": "/bin/sleep", "arguments": "$(Process)s"})

Here, the $(Process) string will be substituted with the process ID at submit time.

[10]:
with schedd.transaction() as txn:         # Start a new transaction
    cluster_id = sub.queue(txn, count=10) # Submit 10 jobs; $(Process) makes each job's arguments differ

print(cluster_id)

schedd.query(
    constraint='ClusterId=?={}'.format(cluster_id),
    projection=["ClusterId", "ProcId", "JobStatus", "Args"],
)
18
[10]:
[[ Args = "0s"; ClusterId = 18; ProcId = 0; JobStatus = 1; ServerTime = 1606229405 ],
 [ Args = "1s"; ClusterId = 18; ProcId = 1; JobStatus = 1; ServerTime = 1606229405 ],
 [ Args = "2s"; ClusterId = 18; ProcId = 2; JobStatus = 1; ServerTime = 1606229405 ],
 [ Args = "3s"; ClusterId = 18; ProcId = 3; JobStatus = 1; ServerTime = 1606229405 ],
 [ Args = "4s"; ClusterId = 18; ProcId = 4; JobStatus = 1; ServerTime = 1606229405 ],
 [ Args = "5s"; ClusterId = 18; ProcId = 5; JobStatus = 1; ServerTime = 1606229405 ],
 [ Args = "6s"; ClusterId = 18; ProcId = 6; JobStatus = 1; ServerTime = 1606229405 ],
 [ Args = "7s"; ClusterId = 18; ProcId = 7; JobStatus = 1; ServerTime = 1606229405 ],
 [ Args = "8s"; ClusterId = 18; ProcId = 8; JobStatus = 1; ServerTime = 1606229405 ],
 [ Args = "9s"; ClusterId = 18; ProcId = 9; JobStatus = 1; ServerTime = 1606229405 ]]

The macro evaluation behavior (and the various usable tricks and techniques) is identical between the Python bindings and the condor_submit executable.

Managing Jobs

Once a job is in queue, the schedd will try its best to execute it to completion. There are several cases where a user may want to interrupt the normal flow of jobs. Perhaps the results are no longer needed; perhaps the job needs to be edited to correct a submission error. These actions fall under the purview of job management.

There are two Schedd methods dedicated to job management:

  • edit(): Change an attribute for a set of jobs to a given expression. If invoked within a transaction, multiple calls to edit are visible atomically. The set of jobs to change can be given as a ClassAd expression. If no jobs match the filter, then an exception is thrown.

  • act(): Change the state of a job to a given state (remove, hold, suspend, etc.).

Both methods take a job specification: either a ClassAd expression (such as Owner =?= "janedoe") or a list of job IDs (such as ["1.1", "2.2", "2.3"]). The act method takes an argument from the JobAction enum. The commonly-used values are:

  • Hold: Put a job on hold, vacating a running job if necessary. A job will stay in the hold state until explicitly acted upon by the admin or owner.

  • Release: Release a job from the hold state, returning it to Idle.

  • Remove: Remove a job from the Schedd’s queue, cleaning it up first on the remote host (if running). This requires the remote host to acknowledge it has successfully vacated the job, meaning Remove may not be instantaneous.

  • Vacate: Cause a running job to be killed on the remote resource and return to idle state. With Vacate, jobs may be given significant time to cleanly shut down.
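Both forms of job specification described above are ordinary Python values, so they can be built with small helpers before being handed to edit() or act(). The functions below, `job_ids` and `cluster_constraint`, are hypothetical names introduced for this sketch; they only construct the list and the expression string and do not contact a schedd.

```python
def job_ids(cluster_id, proc_ids):
    """Build a list of "cluster.proc" job ID strings."""
    return [f"{cluster_id}.{proc_id}" for proc_id in proc_ids]


def cluster_constraint(cluster_id, min_proc=0):
    """Build a ClassAd expression selecting jobs of one cluster from min_proc upward."""
    return f"ClusterId == {cluster_id} && ProcId >= {min_proc}"


print(job_ids(18, range(2)))      # prints ['18.0', '18.1']
print(cluster_constraint(18, 2))  # prints ClusterId == 18 && ProcId >= 2
```

A list of IDs is convenient when the exact jobs are already known; a constraint expression is convenient when the selection depends on job attributes.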

Here’s an example of job management in action:

[11]:
with schedd.transaction() as txn:
    cluster_id = sub.queue(txn, 5)  # queues 5 copies of this job
    schedd.edit([f"{cluster_id}.{idx}" for idx in range(2)], "foo", '"bar"')    # sets attribute foo to the string "bar" for the first two jobs

for ad in schedd.xquery(
    constraint=f"ClusterId == {cluster_id}",
    projection=["ProcId", "JobStatus", "foo"],
):
    print(repr(ad))
[ ServerTime = 1606229405; ProcId = 0; JobStatus = 1; foo = "bar" ]
[ ServerTime = 1606229405; ProcId = 1; JobStatus = 1; foo = "bar" ]
[ ServerTime = 1606229405; ProcId = 2; JobStatus = 1 ]
[ ServerTime = 1606229405; ProcId = 3; JobStatus = 1 ]
[ ServerTime = 1606229405; ProcId = 4; JobStatus = 1 ]

[12]:
schedd.act(htcondor.JobAction.Hold, f"ClusterId == {cluster_id} && ProcId >= 2")

for ad in schedd.xquery(
    constraint=f"ClusterId == {cluster_id}",
    projection=["ProcId", "JobStatus", "foo"],
):
    print(repr(ad))
[ ServerTime = 1606229405; ProcId = 0; JobStatus = 1; foo = "bar" ]
[ ServerTime = 1606229405; ProcId = 1; JobStatus = 1; foo = "bar" ]
[ ServerTime = 1606229405; ProcId = 2; JobStatus = 5 ]
[ ServerTime = 1606229405; ProcId = 3; JobStatus = 5 ]
[ ServerTime = 1606229405; ProcId = 4; JobStatus = 5 ]
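The JobStatus values in these outputs are integer codes: 1 corresponds to Idle and 5 to Held, which is why the last three jobs changed from 1 to 5 after the Hold action. A small lookup table makes query results easier to read. The mapping below reflects the commonly documented HTCondor status codes as we understand them, and the helper name `status_name` is our own; check the manual for your HTCondor version before relying on it.

```python
# Commonly documented JobStatus codes (treat as an assumption; verify for your version).
JOB_STATUS_NAMES = {
    1: "Idle",
    2: "Running",
    3: "Removed",
    4: "Completed",
    5: "Held",
    6: "Transferring Output",
    7: "Suspended",
}


def status_name(code):
    """Translate a numeric JobStatus into a readable name."""
    return JOB_STATUS_NAMES.get(code, f"Unknown ({code})")


# ProcId -> JobStatus pairs copied from the output above.
statuses = {0: 1, 1: 1, 2: 5, 3: 5, 4: 5}
for proc_id, code in statuses.items():
    print(proc_id, status_name(code))
```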

Finally, let’s clean up after ourselves (this will remove all of the jobs you own from the queue).

[13]:
import getpass

schedd.act(htcondor.JobAction.Remove, f'Owner == "{getpass.getuser()}"')
[13]:
[ TotalJobAds = 12; TotalPermissionDenied = 0; TotalAlreadyDone = 0; TotalNotFound = 0; TotalSuccess = 26; TotalChangedAds = 1; TotalBadStatus = 0; TotalError = 0 ]

That’s It!

You’ve made it through the very basics of the Python bindings. While there are many other features the Python module has to offer, we have covered enough to replace the command line tools of condor_q, condor_submit, condor_status, condor_rm and others.