{ "cells": [ { "cell_type": "markdown", "metadata": { "pycharm": {} }, "source": [ "# Submitting and Managing Jobs\n", "\n", "Launch this tutorial in a Jupyter Notebook on Binder: \n", "[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/htcondor/htcondor-python-bindings-tutorials/master?urlpath=lab/tree/Submitting-and-Managing-Jobs.ipynb)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## What is HTCondor?\n", "\n", "An HTCondor pool provides a way for you (as a user) to submit units of work, called **jobs**, to be executed on a distributed network of computing resources.\n", "HTCondor provides tools to monitor your jobs as they run, and make certain kinds of changes to them after submission, which we call \"managing\" jobs.\n", "\n", "In this tutorial, we will learn how to submit and manage jobs *from Python*. \n", "We will see how to submit jobs with various toy executables, how to ask HTCondor for information about them, and how to tell HTCondor to do things with them.\n", "All of these things are possible from the command line as well, using tools like `condor_submit`, `condor_qedit`, and `condor_hold`.\n", "However, working from Python instead of the command line gives us access to the full power of Python to do things like generate jobs programmatically based on user input, pass information consistently from submission to management, or even expose an HTCondor pool to a web application." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We start by importing the HTCondor Python bindings modules, which provide the functions we will need to talk to HTCondor." 
] }, { "cell_type": "code", "execution_count": 1, "metadata": { "pycharm": {} }, "outputs": [], "source": [ "import htcondor # for submitting jobs, querying HTCondor daemons, etc.\n", "import classad # for interacting with ClassAds, HTCondor's internal data format" ] }, { "cell_type": "markdown", "metadata": { "pycharm": {} }, "source": [ "## Submitting a Simple Job\n", "\n", "To submit a job, we must first describe it.\n", "A submit description is held in a `Submit` object.\n", "`Submit` objects consist of key-value pairs, and generally behave like Python dictionaries.\n", "If you're familiar with HTCondor's submit file syntax, you should think of each line in the submit file as a single key-value pair in the `Submit` object.\n", "\n", "Let's start by writing a `Submit` object that describes a job that executes the `hostname` command on an execute node, which prints out the \"name\" of the node.\n", "Since `hostname` prints its results to standard output (stdout), we will capture stdout and bring it back to the submit machine so we can see the name." 
] }, { "cell_type": "code", "execution_count": 2, "metadata": { "pycharm": {} }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "executable = /bin/hostname\n", "output = hostname.out\n", "error = hostname.err\n", "log = hostname.log\n", "request_cpus = 1\n", "request_memory = 128MB\n", "request_disk = 128MB\n", "\n" ] } ], "source": [ "hostname_job = htcondor.Submit({\n", " \"executable\": \"/bin/hostname\", # the program to run on the execute node\n", " \"output\": \"hostname.out\", # anything the job prints to standard output will end up in this file\n", " \"error\": \"hostname.err\", # anything the job prints to standard error will end up in this file\n", " \"log\": \"hostname.log\", # this file will contain a record of what happened to the job\n", " \"request_cpus\": \"1\", # how many CPU cores we want\n", " \"request_memory\": \"128MB\", # how much memory we want\n", " \"request_disk\": \"128MB\", # how much disk space we want\n", "})\n", "\n", "print(hostname_job)" ] }, { "cell_type": "markdown", "metadata": { "pycharm": {} }, "source": [ "The available descriptors are documented in the `condor_submit` [manual page](https://htcondor.readthedocs.io/en/latest/man-pages/condor_submit.html).\n", "The keys of the Python dictionary you pass to `htcondor.Submit` should be the same as for the submit descriptors, and the values should be **strings containing exactly what would go on the right-hand side**.\n", "\n", "Note that we gave the `Submit` object several relative filepaths.\n", "These paths are relative to the directory containing this Jupyter notebook (or, more generally, the current working directory).\n", "When we run the job, you should see those files appear in the file browser on the left as HTCondor creates them.\n", "\n", "Now that we have a job description, let's submit a job.\n", "The `htcondor.Schedd.submit` method returns a `SubmitResult` object that contains information about the job, such as its `ClusterId`." 
] }, { "cell_type": "code", "execution_count": 3, "metadata": { "pycharm": {} }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "12\n" ] } ], "source": [ "schedd = htcondor.Schedd() # get the Python representation of the scheduler\n", "submit_result = schedd.submit(hostname_job) # submit the job\n", "print(submit_result.cluster()) # print the job's ClusterId" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The job's `ClusterId` uniquely identifies this submission.\n", "Later in this module, we will use it to ask the HTCondor scheduler for information about our jobs.\n", "\n", "For now, our job will hopefully have finished running.\n", "You should be able to see the files in the file browser on the left.\n", "Try opening one of them and seeing what's inside.\n", "\n", "We can also look at the output from inside Python:" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "2a8198d0534b\n", "\n" ] } ], "source": [ "import os\n", "import time\n", "\n", "output_path = \"hostname.out\"\n", "\n", "# this is a crude way to wait for the job to finish\n", "# see the Advanced tutorial \"Scalable Job Tracking\" for better methods!\n", "while not os.path.exists(output_path):\n", " print(\"Output file doesn't exist yet; sleeping for one second\")\n", " time.sleep(1)\n", "\n", "with open(output_path, mode = \"r\") as f:\n", " print(f.read())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you got some text, it worked!\n", "\n", "If the file never shows up, it means your job didn't run.\n", "You might try looking at the `log` or `error` files specified in the submit description to see if there is any useful information in them about why the job failed." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Submitting Multiple Jobs" ] }, { "cell_type": "markdown", "metadata": { "pycharm": {} }, "source": [ "By default, each `submit` will submit a single job.\n", "A more common use case is to submit many jobs at once, often sharing some base submit description.\n", "Let's write a new submit description which runs `sleep`.\n", "\n", "When we have multiple **jobs** in a single **cluster**, each job will be identified not just by its **ClusterId** but also by a **ProcID**.\n", "We can use the ProcID to separate the output and error files for each individual job.\n", "Anything that looks like `$(...)` in a submit description is a **macro**, a placeholder which will be \"expanded\" later by HTCondor into a real value for that particular job.\n", "The ProcID expands to a series of incrementing integers, starting at 0.\n", "So the first job in a cluster will have ProcID 0, the next will have ProcID 1, etc." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "executable = /bin/sleep\n", "arguments = 10s\n", "output = sleep-$(ProcId).out\n", "error = sleep-$(ProcId).err\n", "log = sleep.log\n", "request_cpus = 1\n", "request_memory = 128MB\n", "request_disk = 128MB\n", "\n" ] } ], "source": [ "sleep_job = htcondor.Submit({\n", " \"executable\": \"/bin/sleep\", \n", " \"arguments\": \"10s\", # sleep for 10 seconds\n", " \"output\": \"sleep-$(ProcId).out\", # output and error for each job, using the $(ProcId) macro\n", " \"error\": \"sleep-$(ProcId).err\", \n", " \"log\": \"sleep.log\", # we still send all of the HTCondor logs for every job to the same file (not split up!)\n", " \"request_cpus\": \"1\", \n", " \"request_memory\": \"128MB\", \n", " \"request_disk\": \"128MB\", \n", "})\n", "\n", "print(sleep_job)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We will submit 10 of these jobs.\n", "All we need to change 
from our previous `submit` call is to add the `count` keyword argument." ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "13\n" ] } ], "source": [ "schedd = htcondor.Schedd() \n", "submit_result = schedd.submit(sleep_job, count=10) # submit 10 jobs\n", "\n", "print(submit_result.cluster())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that we have a bunch of jobs in flight, we might want to check how they're doing.\n", "We can ask the HTCondor scheduler about jobs by using its `query` method.\n", "We give it a **constraint**, which tells it which jobs to look for, and a **projection**, which tells it what information to return." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[[ ProcId = 0; Out = \"sleep-0.out\"; ClusterId = 13; ServerTime = 1695159772 ],\n", " [ ProcId = 1; Out = \"sleep-1.out\"; ClusterId = 13; ServerTime = 1695159772 ],\n", " [ ProcId = 2; Out = \"sleep-2.out\"; ClusterId = 13; ServerTime = 1695159772 ],\n", " [ ProcId = 3; Out = \"sleep-3.out\"; ClusterId = 13; ServerTime = 1695159772 ],\n", " [ ProcId = 4; Out = \"sleep-4.out\"; ClusterId = 13; ServerTime = 1695159772 ],\n", " [ ProcId = 5; Out = \"sleep-5.out\"; ClusterId = 13; ServerTime = 1695159772 ],\n", " [ ProcId = 6; Out = \"sleep-6.out\"; ClusterId = 13; ServerTime = 1695159772 ],\n", " [ ProcId = 7; Out = \"sleep-7.out\"; ClusterId = 13; ServerTime = 1695159772 ],\n", " [ ProcId = 8; Out = \"sleep-8.out\"; ClusterId = 13; ServerTime = 1695159772 ],\n", " [ ProcId = 9; Out = \"sleep-9.out\"; ClusterId = 13; ServerTime = 1695159772 ]]" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "schedd.query(\n", " constraint=f\"ClusterId == {submit_result.cluster()}\",\n", " projection=[\"ClusterId\", \"ProcId\", \"Out\"],\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ 
"There are a few things to notice here:\n", "- Depending on how long it took you to run the cell, you may get only some of your 10 jobs in the query. Jobs that have finished **leave the queue**, and will no longer show up in queries. To see those jobs, you must use the `history` method instead, which behaves like `query`, but **only** looks at jobs that have left the queue.\n", "- The results may not have come back in ProcID-sorted order. If you want to guarantee the order of the results, you must do so yourself.\n", "- Attributes are often renamed between the submit description and the actual job description in the queue. See [the manual](https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html) for a description of the job attribute names.\n", "- The objects returned by the query are instances of `ClassAd`. ClassAds are the common data exchange format used by HTCondor. In Python, they mostly behave like dictionaries." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Using Itemdata to Vary Over Parameters\n", "\n", "By varying some part of the submit description using the ProcID, we can change how each individual job behaves.\n", "Perhaps it will use a different input file, or a different argument.\n", "However, we often want more flexibility than that.\n", "Perhaps our input files are named after different cities, or by timestamp, or some other naming scheme that already exists.\n", "\n", "To use such information in the submit description, we need to use **itemdata**.\n", "Itemdata lets us pass arbitrary extra information when we queue, which we can reference with macros inside the submit description.\n", "This lets us use the full power of Python to generate the submit descriptions for our jobs.\n", "\n", "Let's mock this situation out by generating some files with randomly-chosen names.\n", "We'll also switch to using `pathlib.Path`, Python's more modern file path manipulation library." 
] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "from pathlib import Path\n", "import random\n", "import string\n", "import shutil\n", "\n", "def random_string(length):\n", " \"\"\"Produce a random lowercase ASCII string with the given length.\"\"\"\n", " return \"\".join(random.choices(string.ascii_lowercase, k = length))\n", "\n", "# make a directory to hold the input files, clearing away any existing directory\n", "input_dir = Path.cwd() / \"inputs\"\n", "shutil.rmtree(input_dir, ignore_errors = True)\n", "input_dir.mkdir()\n", "\n", "# make 5 input files\n", "for idx in range(5):\n", " rs = random_string(5)\n", " input_file = input_dir / \"{}.txt\".format(rs)\n", " input_file.write_text(\"Hello from job {}\".format(rs))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we'll get a list of all the files we just created in the input directory.\n", "This is precisely the kind of situation where Python affords us a great deal of flexibility over a submit file: we can use Python instead of the HTCondor submit language to generate and inspect the information we're going to put into the submit description." 
] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "/home/jovyan/tutorials/inputs/kaffh.txt\n", "/home/jovyan/tutorials/inputs/glmws.txt\n", "/home/jovyan/tutorials/inputs/qrxxw.txt\n", "/home/jovyan/tutorials/inputs/acxvc.txt\n", "/home/jovyan/tutorials/inputs/efnkm.txt\n" ] } ], "source": [ "input_files = list(input_dir.glob(\"*.txt\"))\n", "\n", "for path in input_files:\n", " print(path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we'll make our submit description.\n", "Our goal is just to print out the text held in each file, which we can do using `cat`.\n", "\n", "We will tell HTCondor to transfer the input file to the execute location by including it in `transfer_input_files`.\n", "We also need to call `cat` on the right file via `arguments`. \n", "Keep in mind that HTCondor will move the files in `transfer_input_files` directly to the scratch directory on the execute machine, so instead of the full path, we just need the file's \"name\", the last component of its path.\n", "`pathlib` will make it easy to extract this information." 
] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "executable = /bin/cat\n", "arguments = $(input_file_name)\n", "transfer_input_files = $(input_file)\n", "should_transfer_files = yes\n", "output = cat-$(ProcId).out\n", "error = cat-$(ProcId).err\n", "log = cat.log\n", "request_cpus = 1\n", "request_memory = 128MB\n", "request_disk = 128MB\n", "\n" ] } ], "source": [ "cat_job = htcondor.Submit({\n", " \"executable\": \"/bin/cat\", \n", " \"arguments\": \"$(input_file_name)\", # we will pass in the value for this macro via itemdata\n", " \"transfer_input_files\": \"$(input_file)\", # we also need HTCondor to move the file to the execute node\n", " \"should_transfer_files\": \"yes\", # force HTCondor to transfer files even though we're running entirely inside a container (and it normally wouldn't need to)\n", " \"output\": \"cat-$(ProcId).out\", \n", " \"error\": \"cat-$(ProcId).err\", \n", " \"log\": \"cat.log\", \n", " \"request_cpus\": \"1\", \n", " \"request_memory\": \"128MB\", \n", " \"request_disk\": \"128MB\", \n", "})\n", "\n", "print(cat_job)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The itemdata should be passed as a list of dictionaries, where the keys are the macro names to replace in the submit description.\n", "In our case, the keys are `input_file` and `input_file_name`, so we should have a list of 5 dictionaries (one per input file), each with two entries.\n", "HTCondor expects the input file list to be a comma-separated list of POSIX-style paths, so we explicitly convert our `Path` to a POSIX string." 
] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{'input_file': '/home/jovyan/tutorials/inputs/kaffh.txt', 'input_file_name': 'kaffh.txt'}\n", "{'input_file': '/home/jovyan/tutorials/inputs/glmws.txt', 'input_file_name': 'glmws.txt'}\n", "{'input_file': '/home/jovyan/tutorials/inputs/qrxxw.txt', 'input_file_name': 'qrxxw.txt'}\n", "{'input_file': '/home/jovyan/tutorials/inputs/acxvc.txt', 'input_file_name': 'acxvc.txt'}\n", "{'input_file': '/home/jovyan/tutorials/inputs/efnkm.txt', 'input_file_name': 'efnkm.txt'}\n" ] } ], "source": [ "itemdata = [{\"input_file\": path.as_posix(), \"input_file_name\": path.name} for path in input_files]\n", "\n", "for item in itemdata:\n", " print(item)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we'll submit the jobs, adding the `itemdata` parameter to the `submit` call:" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "14\n" ] } ], "source": [ "schedd = htcondor.Schedd()\n", "submit_result = schedd.submit(cat_job, itemdata = iter(itemdata)) # submit one job for each item in the itemdata\n", "\n", "print(submit_result.cluster())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's do a query to make sure we got the itemdata right (these jobs run fast, so you might need to re-run the jobs if your first run has already left the queue):" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[[ ProcId = 0; Args = \"kaffh.txt\"; Out = \"cat-0.out\"; ClusterId = 14; TransferInput = \"/home/jovyan/tutorials/inputs/kaffh.txt\"; ServerTime = 1695159772 ],\n", " [ ProcId = 1; Args = \"glmws.txt\"; Out = \"cat-1.out\"; ClusterId = 14; TransferInput = \"/home/jovyan/tutorials/inputs/glmws.txt\"; ServerTime = 1695159772 ],\n", " [ ProcId = 2; Args = \"qrxxw.txt\"; Out = 
\"cat-2.out\"; ClusterId = 14; TransferInput = \"/home/jovyan/tutorials/inputs/qrxxw.txt\"; ServerTime = 1695159772 ],\n", " [ ProcId = 3; Args = \"acxvc.txt\"; Out = \"cat-3.out\"; ClusterId = 14; TransferInput = \"/home/jovyan/tutorials/inputs/acxvc.txt\"; ServerTime = 1695159772 ],\n", " [ ProcId = 4; Args = \"efnkm.txt\"; Out = \"cat-4.out\"; ClusterId = 14; TransferInput = \"/home/jovyan/tutorials/inputs/efnkm.txt\"; ServerTime = 1695159772 ]]" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "schedd.query(\n", " constraint=f\"ClusterId == {submit_result.cluster()}\",\n", " projection=[\"ClusterId\", \"ProcId\", \"Out\", \"Args\", \"TransferInput\"],\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And let's take a look at all the output:" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "/home/jovyan/tutorials/cat-0.out -> Hello from job dlbgw\n", "/home/jovyan/tutorials/cat-3.out -> Hello from job mvgmu\n", "/home/jovyan/tutorials/cat-1.out -> Hello from job pyidg\n", "/home/jovyan/tutorials/cat-2.out -> Hello from job sxpny\n", "/home/jovyan/tutorials/cat-4.out -> Hello from job dumud\n" ] } ], "source": [ "# again, this is very crude - see the advanced tutorials!\n", "while not len(list(Path.cwd().glob(\"cat-*.out\"))) == len(itemdata):\n", " print(\"Not all output files exist yet; sleeping for one second\")\n", " time.sleep(1)\n", "\n", "for output_file in Path.cwd().glob(\"cat-*.out\"):\n", " print(output_file, \"->\", output_file.read_text())" ] }, { "cell_type": "markdown", "metadata": { "pycharm": {} }, "source": [ "## Managing Jobs\n", "\n", "Once a job is in queue, the scheduler will try its best to execute it to completion. \n", "There are several cases where you may want to interrupt the normal flow of jobs. 
\n", "Perhaps the results are no longer needed; perhaps the job needs to be edited to correct a submission error. \n", "These actions fall under the purview of **job management**.\n", "\n", "There are two `Schedd` methods dedicated to job management:\n", "\n", "* `edit()`: Change an attribute for a set of jobs.\n", "* `act()`: Change the state of a job (remove it from the queue, hold it, suspend it, etc.).\n", "\n", "The `act` method takes an argument from the `JobAction` enum.\n", "Commonly-used values include:\n", "\n", "* `Hold`: Put a job on hold, vacating a running job if necessary. A job will stay in the hold\n", " state until told otherwise.\n", "* `Release`: Release a job from the hold state, returning it to Idle.\n", "* `Remove`: Remove a job from the queue. If it is running, it will stop running.\n", " This requires the execute node to acknowledge it has successfully vacated the job, so `Remove` may\n", " not be instantaneous.\n", "* `Vacate`: Cause a running job to be killed on the remote resource and return to the Idle state. With\n", " `Vacate`, jobs may be given significant time to cleanly shut down.\n", "\n", "To play with this, let's bring back our sleep submit description, but increase the sleep time significantly so that we have time to interact with the jobs." 
] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "executable = /bin/sleep\n", "arguments = 10m\n", "output = sleep-$(ProcId).out\n", "error = sleep-$(ProcId).err\n", "log = sleep.log\n", "request_cpus = 1\n", "request_memory = 128MB\n", "request_disk = 128MB\n", "\n" ] } ], "source": [ "long_sleep_job = htcondor.Submit({\n", " \"executable\": \"/bin/sleep\", \n", " \"arguments\": \"10m\", # sleep for 10 minutes\n", " \"output\": \"sleep-$(ProcId).out\", \n", " \"error\": \"sleep-$(ProcId).err\", \n", " \"log\": \"sleep.log\", \n", " \"request_cpus\": \"1\", \n", " \"request_memory\": \"128MB\", \n", " \"request_disk\": \"128MB\", \n", "})\n", "\n", "print(long_sleep_job)" ] }, { "cell_type": "code", "execution_count": 16, "metadata": { "pycharm": {} }, "outputs": [], "source": [ "schedd = htcondor.Schedd()\n", "submit_result = schedd.submit(long_sleep_job, count=5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As an experiment, let's set an arbitrary attribute on the jobs and check that it worked.\n", "When we're really working, we could do things like change the amount of memory a job has requested by editing its `RequestMemory` attribute.\n", "The job attributes that are built-in to HTCondor are described [here](https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html), but your site may specify additional, custom attributes as well." 
] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[[ ProcId = 0; foo = \"bar\"; ClusterId = 15; JobStatus = 1; ServerTime = 1695159772 ],\n", " [ ProcId = 1; foo = \"bar\"; ClusterId = 15; JobStatus = 1; ServerTime = 1695159772 ],\n", " [ ProcId = 2; foo = \"bar\"; ClusterId = 15; JobStatus = 1; ServerTime = 1695159772 ],\n", " [ ProcId = 3; foo = \"bar\"; ClusterId = 15; JobStatus = 1; ServerTime = 1695159772 ],\n", " [ ProcId = 4; foo = \"bar\"; ClusterId = 15; JobStatus = 1; ServerTime = 1695159772 ]]" ] }, "execution_count": 17, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# sets attribute foo to the string \"bar\" for all of our jobs\n", "# note the nested quotes around bar! The outer \"\" make it a Python string; the inner \"\" make it a ClassAd string.\n", "schedd.edit(f\"ClusterId == {submit_result.cluster()}\", \"foo\", \"\\\"bar\\\"\")\n", "\n", "# do a query to check the value of attribute foo\n", "schedd.query(\n", " constraint=f\"ClusterId == {submit_result.cluster()}\",\n", " projection=[\"ClusterId\", \"ProcId\", \"JobStatus\", \"foo\"],\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Although the job status appears to be an attribute, we cannot `edit` it directly.\n", "As mentioned above, we must instead `act` on the job.\n", "Let's hold the first two jobs so that they stop running, but leave the others going." 
] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "ProcID = 0 has JobStatus = 5\n", "ProcID = 1 has JobStatus = 5\n", "ProcID = 2 has JobStatus = 1\n", "ProcID = 3 has JobStatus = 1\n", "ProcID = 4 has JobStatus = 1\n" ] } ], "source": [ "# hold the first two jobs\n", "schedd.act(htcondor.JobAction.Hold, f\"ClusterId == {submit_result.cluster()} && ProcID <= 1\")\n", "\n", "# check the status of the jobs\n", "ads = schedd.query(\n", " constraint=f\"ClusterId == {submit_result.cluster()}\",\n", " projection=[\"ClusterId\", \"ProcId\", \"JobStatus\"],\n", ")\n", "\n", "for ad in ads:\n", " # the ClassAd objects returned by the query act like dictionaries, so we can extract individual values out of them using []\n", " print(f\"ProcID = {ad['ProcID']} has JobStatus = {ad['JobStatus']}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The various job statuses are represented by numbers. `1` means `Idle`, `2` means `Running`, and `5` means `Held`. If you see `JobStatus = 5` above for `ProcID = 0` and `ProcID = 1`, then we succeeded!\n", "\n", "The opposite of `JobAction.Hold` is `JobAction.Release`.\n", "Let's release those jobs and let them go back to `Idle`." 
] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "ProcID = 0 has JobStatus = 1\n", "ProcID = 1 has JobStatus = 1\n", "ProcID = 2 has JobStatus = 1\n", "ProcID = 3 has JobStatus = 1\n", "ProcID = 4 has JobStatus = 1\n" ] } ], "source": [ "schedd.act(htcondor.JobAction.Release, f\"ClusterId == {submit_result.cluster()}\")\n", "\n", "ads = schedd.query(\n", " constraint=f\"ClusterId == {submit_result.cluster()}\",\n", " projection=[\"ClusterId\", \"ProcId\", \"JobStatus\"],\n", ")\n", "\n", "for ad in ads:\n", " # the ClassAd objects returned by the query act like dictionaries, so we can extract individual values out of them using []\n", " print(f\"ProcID = {ad['ProcID']} has JobStatus = {ad['JobStatus']}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note that we simply released all the jobs in the cluster.\n", "Releasing a job that is not held doesn't do anything, so we don't have to be extremely careful.\n", "\n", "Finally, let's clean up after ourselves:" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[ TotalChangedAds = 1; TotalJobAds = 0; TotalPermissionDenied = 0; TotalAlreadyDone = 0; TotalBadStatus = 0; TotalNotFound = 0; TotalSuccess = 5; TotalError = 0 ]" ] }, "execution_count": 20, "metadata": {}, "output_type": "execute_result" } ], "source": [ "schedd.act(htcondor.JobAction.Remove, f\"ClusterId == {submit_result.cluster()}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Exercises\n", "\n", "Now let's practice what we've learned.\n", "\n", "- In each exercise, you will be given a piece of code and a test that does not yet pass.\n", "- The exercises are vaguely in order of increasing difficulty.\n", "- Modify the code, or add new code to it, to pass the test. 
Do whatever it takes!\n", "- You can run the test by running the block it is in.\n", "- Feel free to look at the test for clues as to how to modify the code.\n", "- Many of the exercises can be solved either by using Python to generate inputs, or by using advanced features of the [ClassAd language](https://htcondor.readthedocs.io/en/latest/classads/classad-mechanism.html#htcondor-s-classad-mechanism). Either way is valid!\n", "- Don't modify the test. That's cheating!" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Exercise 1: Incrementing Sleeps\n", "\n", "Submit five jobs which sleep for `5`, `6`, `7`, `8`, and `9` seconds, respectively." ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [], "source": [ "# MODIFY OR ADD TO THIS BLOCK...\n", "\n", "incrementing_sleep = htcondor.Submit({\n", " \"executable\": \"/bin/sleep\", \n", " \"arguments\": \"1\",\n", " \"output\": \"ex1-$(ProcId).out\",\n", " \"error\": \"ex1-$(ProcId).err\", \n", " \"log\": \"ex1.log\",\n", " \"request_cpus\": \"1\",\n", " \"request_memory\": \"128MB\",\n", " \"request_disk\": \"128MB\",\n", "})\n", "\n", "schedd = htcondor.Schedd()\n", "submit_result = schedd.submit(incrementing_sleep)" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Expected ['5', '6', '7', '8', '9']\n", "Got ['1']\n" ] }, { "ename": "AssertionError", "evalue": "Arguments were not what we expected!", "output_type": "error", "traceback": [ "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[0;31mAssertionError\u001b[0m Traceback (most recent call last)", "\u001b[0;32m/tmp/ipykernel_404/3067880786.py\u001b[0m in \u001b[0;36m\u001b[0;34m\u001b[0m\n\u001b[1;32m 8\u001b[0m \u001b[0mprint\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"Got \"\u001b[0m\u001b[0;34m,\u001b[0m 
\u001b[0marguments\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 9\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 10\u001b[0;31m \u001b[0;32massert\u001b[0m \u001b[0marguments\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0mexpected\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m\"Arguments were not what we expected!\"\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 11\u001b[0m \u001b[0mprint\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"The test passed. Good job!\"\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;31mAssertionError\u001b[0m: Arguments were not what we expected!" ] } ], "source": [ "# ... TO MAKE THIS TEST PASS\n", "\n", "expected = [str(i) for i in range(5, 10)]\n", "print(\"Expected \", expected)\n", "\n", "ads = schedd.query(f\"ClusterId == {submit_result.cluster()}\", projection = [\"Args\"])\n", "arguments = sorted(ad[\"Args\"] for ad in ads)\n", "print(\"Got \", arguments)\n", "\n", "assert arguments == expected, \"Arguments were not what we expected!\"\n", "print(\"The test passed. Good job!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Exercise 2: Echo to Target\n", "\n", "Run a job that makes the text `Echo to Target` appear in a file named `ex3.txt`." 
] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [ { "ename": "HTCondorInternalError", "evalue": "No 'executable' parameter was provided", "output_type": "error", "traceback": [ "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[0;31mHTCondorInternalError\u001b[0m Traceback (most recent call last)", "\u001b[0;32m/tmp/ipykernel_404/2917236442.py\u001b[0m in \u001b[0;36m\u001b[0;34m\u001b[0m\n\u001b[1;32m 8\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 9\u001b[0m \u001b[0mschedd\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mhtcondor\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mSchedd\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 10\u001b[0;31m \u001b[0msubmit_result\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mschedd\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msubmit\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mecho\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m", "\u001b[0;32m/opt/conda/lib/python3.9/site-packages/htcondor/_lock.py\u001b[0m in \u001b[0;36mwrapper\u001b[0;34m(*args, **kwargs)\u001b[0m\n\u001b[1;32m 68\u001b[0m \u001b[0macquired\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mLOCK\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0macquire\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 69\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 70\u001b[0;31m \u001b[0mrv\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mfunc\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 71\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 72\u001b[0m \u001b[0;31m# if the function returned a context 
manager,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;31mHTCondorInternalError\u001b[0m: No 'executable' parameter was provided" ] } ], "source": [ "# MODIFY OR ADD TO THIS BLOCK...\n", "\n", "echo = htcondor.Submit({\n", " \"request_cpus\": \"1\",\n", " \"request_memory\": \"128MB\",\n", " \"request_disk\": \"128MB\", \n", "})\n", "\n", "schedd = htcondor.Schedd()\n", "submit_result = schedd.submit(echo)" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [ { "ename": "AssertionError", "evalue": "ex3.txt does not exist!", "output_type": "error", "traceback": [ "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[0;31mAssertionError\u001b[0m Traceback (most recent call last)", "\u001b[0;32m/tmp/ipykernel_404/1707749984.py\u001b[0m in \u001b[0;36m\u001b[0;34m\u001b[0m\n\u001b[1;32m 2\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 3\u001b[0m \u001b[0mdoes_file_exist\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mos\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mexists\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"ex3.txt\"\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 4\u001b[0;31m \u001b[0;32massert\u001b[0m \u001b[0mdoes_file_exist\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m\"ex3.txt does not exist!\"\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 5\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 6\u001b[0m \u001b[0mexpected\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m\"Echo to Target\"\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;31mAssertionError\u001b[0m: ex3.txt does not exist!" ] } ], "source": [ "# ... 
TO MAKE THIS TEST PASS\n", "\n", "does_file_exist = os.path.exists(\"ex3.txt\")\n", "assert does_file_exist, \"ex3.txt does not exist!\"\n", "\n", "expected = \"Echo to Target\"\n", "print(\"Expected \", expected)\n", "\n", "with open(\"ex3.txt\", mode = \"r\") as f:\n", "    contents = f.read().strip()\n", "print(\"Got \", contents)\n", "\n", "assert expected in contents, \"Contents were not what we expected!\"\n", "\n", "print(\"The test passed. Good job!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Exercise 3: Holding Odds\n", "\n", "Hold all of the odd-numbered jobs in this large cluster.\n", "\n", "- Note that the test block **removes all of the jobs you own** when it runs, to prevent these long-running jobs from corrupting other tests!" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [], "source": [ "# MODIFY OR ADD TO THIS BLOCK...\n", "\n", "long_sleep = htcondor.Submit({\n", " \"executable\": \"/bin/sleep\", \n", " \"arguments\": \"10m\",\n", " \"output\": \"ex2-$(ProcId).out\",\n", " \"error\": \"ex2-$(ProcId).err\", \n", " \"log\": \"ex2.log\",\n", " \"request_cpus\": \"1\",\n", " \"request_memory\": \"128MB\",\n", " \"request_disk\": \"128MB\", \n", "})\n", "\n", "schedd = htcondor.Schedd()\n", "submit_result = schedd.submit(long_sleep, count=100)" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Proc 0 has status 1\n", "Proc 1 has status 1\n", "Proc 2 has status 1\n", "Proc 3 has status 1\n", "Proc 4 has status 1\n", "Proc 5 has status 1\n", "Proc 6 has status 1\n", "Proc 7 has status 1\n", "Proc 8 has status 1\n", "Proc 9 has status 1\n", "Proc 10 has status 1\n", "Proc 11 has status 1\n", "Proc 12 has status 1\n", "Proc 13 has status 1\n", "Proc 14 has status 1\n", "Proc 15 has status 1\n", "Proc 16 has status 1\n", "Proc 17 has status 1\n", "Proc 18 has status 1\n", "Proc 19 has status 1\n", "Proc 20 has status 1\n", "Proc 21 has 
status 1\n", "Proc 22 has status 1\n", "Proc 23 has status 1\n", "Proc 24 has status 1\n", "Proc 25 has status 1\n", "Proc 26 has status 1\n", "Proc 27 has status 1\n", "Proc 28 has status 1\n", "Proc 29 has status 1\n", "Proc 30 has status 1\n", "Proc 31 has status 1\n", "Proc 32 has status 1\n", "Proc 33 has status 1\n", "Proc 34 has status 1\n", "Proc 35 has status 1\n", "Proc 36 has status 1\n", "Proc 37 has status 1\n", "Proc 38 has status 1\n", "Proc 39 has status 1\n", "Proc 40 has status 1\n", "Proc 41 has status 1\n", "Proc 42 has status 1\n", "Proc 43 has status 1\n", "Proc 44 has status 1\n", "Proc 45 has status 1\n", "Proc 46 has status 1\n", "Proc 47 has status 1\n", "Proc 48 has status 1\n", "Proc 49 has status 1\n", "Proc 50 has status 1\n", "Proc 51 has status 1\n", "Proc 52 has status 1\n", "Proc 53 has status 1\n", "Proc 54 has status 1\n", "Proc 55 has status 1\n", "Proc 56 has status 1\n", "Proc 57 has status 1\n", "Proc 58 has status 1\n", "Proc 59 has status 1\n", "Proc 60 has status 1\n", "Proc 61 has status 1\n", "Proc 62 has status 1\n", "Proc 63 has status 1\n", "Proc 64 has status 1\n", "Proc 65 has status 1\n", "Proc 66 has status 1\n", "Proc 67 has status 1\n", "Proc 68 has status 1\n", "Proc 69 has status 1\n", "Proc 70 has status 1\n", "Proc 71 has status 1\n", "Proc 72 has status 1\n", "Proc 73 has status 1\n", "Proc 74 has status 1\n", "Proc 75 has status 1\n", "Proc 76 has status 1\n", "Proc 77 has status 1\n", "Proc 78 has status 1\n", "Proc 79 has status 1\n", "Proc 80 has status 1\n", "Proc 81 has status 1\n", "Proc 82 has status 1\n", "Proc 83 has status 1\n", "Proc 84 has status 1\n", "Proc 85 has status 1\n", "Proc 86 has status 1\n", "Proc 87 has status 1\n", "Proc 88 has status 1\n", "Proc 89 has status 1\n", "Proc 90 has status 1\n", "Proc 91 has status 1\n", "Proc 92 has status 1\n", "Proc 93 has status 1\n", "Proc 94 has status 1\n", "Proc 95 has status 1\n", "Proc 96 has status 1\n", "Proc 97 has status 1\n", "Proc 98 
has status 1\n", "Proc 99 has status 1\n" ] }, { "ename": "AssertionError", "evalue": "Not all odd jobs were held.", "output_type": "error", "traceback": [ "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[0;31mAssertionError\u001b[0m Traceback (most recent call last)", "\u001b[0;32m/tmp/ipykernel_404/4042351238.py\u001b[0m in \u001b[0;36m\u001b[0;34m\u001b[0m\n\u001b[1;32m 11\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 12\u001b[0m \u001b[0;32massert\u001b[0m \u001b[0mlen\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mproc_to_status\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0;36m100\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m\"Wrong number of jobs (perhaps you need to resubmit them?).\"\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 13\u001b[0;31m \u001b[0;32massert\u001b[0m \u001b[0mall\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mstatus\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0;36m5\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mproc\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mstatus\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mproc_to_status\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mitems\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mproc\u001b[0m \u001b[0;34m%\u001b[0m \u001b[0;36m2\u001b[0m \u001b[0;34m!=\u001b[0m \u001b[0;36m0\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m\"Not all odd jobs were held.\"\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 14\u001b[0m \u001b[0;32massert\u001b[0m \u001b[0mall\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mstatus\u001b[0m \u001b[0;34m!=\u001b[0m \u001b[0;36m5\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mproc\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mstatus\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mproc_to_status\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mitems\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mif\u001b[0m 
\u001b[0mproc\u001b[0m \u001b[0;34m%\u001b[0m \u001b[0;36m2\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0;36m0\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m\"An even job was held.\"\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 15\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;31mAssertionError\u001b[0m: Not all odd jobs were held." ] } ], "source": [ "# ... TO MAKE THIS TEST PASS\n", "\n", "import getpass\n", "\n", "try:\n", " ads = schedd.query(f\"ClusterId == {submit_result.cluster()}\", projection = [\"ProcID\", \"JobStatus\"])\n", " proc_to_status = {int(ad[\"ProcID\"]): ad[\"JobStatus\"] for ad in sorted(ads, key = lambda ad: ad[\"ProcID\"])}\n", "\n", " for proc, status in proc_to_status.items():\n", " print(\"Proc {} has status {}\".format(proc, status))\n", " \n", " assert len(proc_to_status) == 100, \"Wrong number of jobs (perhaps you need to resubmit them?).\"\n", " assert all(status == 5 for proc, status in proc_to_status.items() if proc % 2 != 0), \"Not all odd jobs were held.\"\n", " assert all(status != 5 for proc, status in proc_to_status.items() if proc % 2 == 0), \"An even job was held.\"\n", " \n", " print(\"The test passed. Good job!\")\n", "finally:\n", " schedd.act(htcondor.JobAction.Remove, f'Owner==\"{getpass.getuser()}\"')" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.6" } }, "nbformat": 4, "nbformat_minor": 4 }