Advanced Schedd Interaction

Launch this tutorial in a Jupyter Notebook on Binder.

The introductory tutorial only scratches the surface of what the Python bindings can do with the condor_schedd; this module covers a wider range of functionality:

  • Job and history querying.

  • Advanced job submission.

  • Python-based negotiation with the Schedd.

As usual, we start by importing the relevant modules:

[1]:
import htcondor
import classad

Job and History Querying

In the HTCondor Introduction tutorial, we covered the Schedd.xquery method and its two most important keywords:

  • requirements: Filters the jobs the schedd should return.

  • projection: Filters the attributes returned for each job.

For those familiar with SQL, requirements is the equivalent of the WHERE clause, while projection is the equivalent of the column list in SELECT.

There are two other keywords worth mentioning:

  • limit: Limits the number of returned ads; equivalent to SQL’s LIMIT.

  • opts: Additional flags to send to the schedd to alter query behavior. The flag used in this tutorial is QueryOpts.AutoCluster, which groups the returned results by the current set of “auto-cluster” attributes used by the pool. It’s analogous to GROUP BY in SQL, except that the schedd, not the caller, chooses the grouping columns.
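To make the SQL analogy concrete, the sketch below loads a few hypothetical job records into an in-memory sqlite3 table and runs the SQL counterparts of requirements, projection, limit, and auto-cluster grouping. It is an illustration only: the schedd does not use SQL, the table layout and values are invented, and here we pick the grouping columns ourselves, whereas the schedd chooses its own auto-cluster attributes.

```python
import sqlite3

# Hypothetical job records (ClusterId, ProcId, Owner, RequestMemory); not real schedd data.
JOBS = [
    (7, 0, "alice", 1024),
    (7, 1, "alice", 1024),
    (7, 2, "alice", 2048),
    (8, 0, "bob", 1024),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE jobs (ClusterId INT, ProcId INT, Owner TEXT, RequestMemory INT)")
conn.executemany("INSERT INTO jobs VALUES (?, ?, ?, ?)", JOBS)

# Roughly: schedd.xquery(requirements='Owner == "alice"',
#                        projection=["ClusterId", "ProcId"], limit=2)
# (ORDER BY is only here to make the illustration deterministic.)
selected = conn.execute(
    "SELECT ClusterId, ProcId FROM jobs WHERE Owner = 'alice' "
    "ORDER BY ClusterId, ProcId LIMIT 2"
).fetchall()

# Roughly: opts=htcondor.QueryOpts.AutoCluster, i.e. one row per group of jobs
# sharing the grouping attributes, plus a count (compare JobCount in auto-cluster ads).
grouped = conn.execute(
    "SELECT Owner, RequestMemory, COUNT(*) AS JobCount FROM jobs "
    "GROUP BY Owner, RequestMemory ORDER BY Owner, RequestMemory"
).fetchall()

print(selected)  # [(7, 0), (7, 1)]
print(grouped)   # [('alice', 1024, 2), ('alice', 2048, 1), ('bob', 1024, 1)]
```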

To illustrate these additional keywords, let’s first submit a few jobs:

[2]:
schedd = htcondor.Schedd()
sub = htcondor.Submit({
    "executable": "/bin/sleep",
    "arguments": "5m",
    "hold": "True",
})
with schedd.transaction() as txn:
    cluster_id = sub.queue(txn, 10)
print(cluster_id)
7

Note: In this example, we used the hold submit command so that the jobs start out in the condor_schedd in the Hold state; this simply prevents them from running to completion while you work through the tutorial.
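The held jobs stay in the queue until you act on them. As a sketch, Schedd.act applies a job action to the jobs matching a constraint; the helper below assumes `schedd` behaves like htcondor.Schedd and that the caller passes a member of htcondor.JobAction (for example Release to let the jobs run, or Remove to clean up after the tutorial). The helper name act_on_cluster is hypothetical.

```python
from typing import Any


def act_on_cluster(schedd: Any, action: Any, cluster_id: int) -> Any:
    """Apply `action` (e.g. htcondor.JobAction.Release) to every job in a cluster.

    `schedd` is assumed to behave like htcondor.Schedd; this returns whatever
    Schedd.act returns.
    """
    # The job specification is a ClassAd constraint selecting the whole cluster.
    return schedd.act(action, f"ClusterId == {cluster_id}")
```

For example, act_on_cluster(schedd, htcondor.JobAction.Remove, cluster_id) would remove the ten jobs submitted above.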

We now have 10 jobs in cluster cluster_id (held, not running); apart from their ProcId, they should all be identical:

[3]:
print(sum(1 for _ in schedd.xquery(requirements=f"ClusterId == {cluster_id}", projection=["ProcId"])))
10

The sum(1 for _ in ...) idiom counts the items produced by an iterator without buffering them all in memory.
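The counting idiom can be wrapped in a small helper. This sketch assumes only that its `schedd` argument has an xquery method accepting the requirements and projection keywords and returning an iterator, as htcondor.Schedd does; the name count_cluster_jobs is hypothetical. Unlike len(list(...)), the helper never builds a list of all the matching ads.

```python
from typing import Any


def count_cluster_jobs(schedd: Any, cluster_id: int) -> int:
    """Count the jobs in one cluster without collecting their ads into a list.

    `schedd` is assumed to behave like htcondor.Schedd: xquery takes
    `requirements` and `projection` keywords and returns an iterator.
    """
    ads = schedd.xquery(
        requirements=f"ClusterId == {cluster_id}",
        # Request a single small attribute; we only need to count the ads.
        projection=["ProcId"],
    )
    # Each ad is dropped as soon as it has been counted.
    return sum(1 for _ in ads)
```

Called as count_cluster_jobs(schedd, cluster_id), it performs the same count as the cell above.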

Querying many Schedds

On larger pools, it’s common to write Python scripts that interact with not one but many schedds. For example, to implement a “global query” (the equivalent of condor_q -g, which concatenates the jobs from all schedds), it might be tempting to write code like this:

[4]:
jobs = []
for schedd_ad in htcondor.Collector().locateAll(htcondor.DaemonTypes.Schedd):
    schedd = htcondor.Schedd(schedd_ad)
    jobs += schedd.xquery()
print(len(jobs))
10

This is sub-optimal for two reasons:

  • xquery is not given any projection, so it pulls every attribute of every job: much more data than is needed just to count jobs.

  • The schedds are queried one at a time, so a single slow or unresponsive schedd (a “bad apple”) delays the whole loop.

We can instead begin the query for all schedds simultaneously, then read the responses as they are sent back. First, we start all the queries without reading responses:

[5]:
queries = []
coll_query = htcondor.Collector().locateAll(htcondor.DaemonTypes.Schedd)
for schedd_ad in coll_query:
    schedd_obj = htcondor.Schedd(schedd_ad)
    # Request a single attribute per job: we only need to count jobs.
    queries.append(schedd_obj.xquery(projection=["ProcId"]))

The iterators will yield the matching jobs; to return the autoclusters instead of jobs, use the AutoCluster option (schedd_obj.xquery(opts=htcondor.QueryOpts.AutoCluster)). One auto-cluster ad is returned for each set of jobs that have identical values for all significant attributes. A sample auto-cluster looks like:

[
 RequestDisk = DiskUsage;
 Rank = 0.0;
 FileSystemDomain = "hcc-briantest7.unl.edu";
 MemoryUsage = ( ( ResidentSetSize + 1023 ) / 1024 );
 ImageSize = 1000;
 JobUniverse = 5;
 DiskUsage = 1000;
 JobCount = 1;
 Requirements = ( TARGET.Arch == "X86_64" ) && ( TARGET.OpSys == "LINUX" ) && ( TARGET.Disk >= RequestDisk ) && ( TARGET.Memory >= RequestMemory ) && ( ( TARGET.HasFileTransfer ) || ( TARGET.FileSystemDomain == MY.FileSystemDomain ) );
 RequestMemory = ifthenelse(MemoryUsage isnt undefined,MemoryUsage,( ImageSize + 1023 ) / 1024);
 ResidentSetSize = 0;
 ServerTime = 1483758177;
 AutoClusterId = 2
]
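Because each auto-cluster ad carries a JobCount attribute, you can summarize a queue by auto-cluster without transferring every job ad. A minimal sketch, assuming `schedd` behaves like htcondor.Schedd and the caller passes htcondor.QueryOpts.AutoCluster as `opts`; the helper name jobs_per_autocluster is hypothetical.

```python
from typing import Any, Dict


def jobs_per_autocluster(schedd: Any, opts: Any) -> Dict[int, int]:
    """Map each AutoClusterId to the JobCount reported for it.

    `schedd` is assumed to behave like htcondor.Schedd, and `opts` should be
    htcondor.QueryOpts.AutoCluster so that xquery yields auto-cluster ads.
    """
    counts: Dict[int, int] = {}
    for ad in schedd.xquery(opts=opts):
        counts[int(ad["AutoClusterId"])] = int(ad["JobCount"])
    return counts
```

Usage: jobs_per_autocluster(htcondor.Schedd(), htcondor.QueryOpts.AutoCluster).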

We then use htcondor.poll, which yields each query as soon as it has results available:

[6]:
job_counts = {}
for query in htcondor.poll(queries):
    schedd_name = query.tag()
    job_counts.setdefault(schedd_name, 0)
    count = len(query.nextAdsNonBlocking())
    job_counts[schedd_name] += count
    print("Got {} results from {}.".format(count, schedd_name))
print(job_counts)
Got 10 results from jovyan@4726328b203e.
{'jovyan@4726328b203e': 10}

The QueryIterator.tag method identifies which query has returned results; the tag defaults to the Schedd’s name but can be set manually through the tag keyword argument to Schedd.xquery.
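For instance, you may prefer your own labels to schedd names. The sketch below starts one projected query per schedd with a caller-chosen tag, then tallies results as poll hands back ready queries. It takes the schedds and the poll function as arguments (pass htcondor.poll in real use); the helper name count_jobs_by_label and the labels are hypothetical.

```python
from typing import Any, Callable, Dict, Iterable, Mapping


def count_jobs_by_label(
    schedds: Mapping[str, Any],
    poll: Callable[[list], Iterable[Any]],
) -> Dict[str, int]:
    """Query several schedds concurrently and count jobs per caller-chosen label.

    `schedds` maps a label (used as the query tag) to an htcondor.Schedd-like
    object; `poll` is assumed to be htcondor.poll.
    """
    queries = [
        # Fetch one small attribute per job and tag the query with our label.
        schedd.xquery(projection=["ProcId"], tag=label)
        for label, schedd in schedds.items()
    ]
    counts = {label: 0 for label in schedds}
    # poll yields a query whenever it has results ready; the same query may
    # come back several times before it is exhausted.
    for query in poll(queries):
        counts[query.tag()] += len(query.nextAdsNonBlocking())
    return counts
```

Usage: count_jobs_by_label({"local": htcondor.Schedd()}, htcondor.poll).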

History Queries

After a job has finished in the Schedd, it moves from the queue to the history file. The history can be queried (locally or remotely) with the Schedd.history method:

[7]:
schedd = htcondor.Schedd()
for ad in schedd.history(
    constraint='true',
    projection=['ProcId', 'ClusterId', 'JobStatus'],
    match=2,  # limit to 2 returned results
):
    print(ad)

    [
        JobStatus = 4;
        ProcId = 0;
        ClusterId = 1
    ]

    [
        JobStatus = 4;
        ProcId = 0;
        ClusterId = 6
    ]
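History ads can be post-processed like any other query result. A minimal sketch that tallies history ads by JobStatus (3 is Removed, 4 is Completed), assuming `schedd` behaves like htcondor.Schedd; the helper name history_status_counts is hypothetical.

```python
from collections import Counter
from typing import Any

# HTCondor JobStatus codes most likely to appear in the history.
STATUS_NAMES = {3: "Removed", 4: "Completed"}


def history_status_counts(schedd: Any, constraint: str = "true", match: int = 100) -> Counter:
    """Tally up to `match` history ads by job status name."""
    tally: Counter = Counter()
    for ad in schedd.history(
        constraint=constraint,
        projection=["JobStatus"],  # only the status is needed
        match=match,
    ):
        status = int(ad["JobStatus"])
        tally[STATUS_NAMES.get(status, f"Status {status}")] += 1
    return tally
```

Usage: history_status_counts(htcondor.Schedd(), match=2).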