GridEngineApp Tutorial

This follows the location_app example in the examples directory.

The Application

We are going to build a graph of Jobs, where a Job is a class that holds code to run in a UGE (Univa Grid Engine) job on the cluster. For instance, our code could use the GBD location hierarchy, in which case we would build the graph as follows:

import networkx as nx
import db_queries

def location_graph(gbd_round_id, location_set_version_id):
    location_df = db_queries.get_location_metadata(
        gbd_round_id=gbd_round_id, location_set_version_id=location_set_version_id)
    G = nx.DiGraph()
    G.add_nodes_from([
        (int(row.location_id), row._asdict())
        for row in location_df.itertuples()])
    # GBD encodes the global node as having itself as a parent.
    G.add_edges_from([
        (int(row.parent_id), int(row.location_id))
        for row in location_df[location_df.location_id != 1].itertuples()])
    return G

The NetworkX library is a convenient way to build directed acyclic graphs, and the NetworkX Tutorial is a good introduction to it.
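
As a quick orientation, and separate from the framework itself, here is a minimal sketch of how NetworkX walks such a graph in dependency order; the three-node hierarchy is made up for illustration:

import networkx as nx

# A toy hierarchy: the global location (1) with two child locations.
G = nx.DiGraph()
G.add_edges_from([(1, 101), (1, 102)])

assert nx.is_directed_acyclic_graph(G)

# Topological order guarantees each parent is visited before its
# children, which is the order in which jobs can run.
for location_id in nx.topological_sort(G):
    print(location_id, "-> children:", list(G.successors(location_id)))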

The main code required to use this framework is the Application class. It has the following parts:

class GridExample:
    """The class name will be used as the base name for cluster job names."""
    def __init__(self):
        """An init that takes no arguments, because it will be
        called for the children."""
        self.location_set_version_id = None
        self.gbd_round_id = None

    def add_arguments(self, parser):
        """The same argument parser is used for both the initial
        call to run all the jobs and each time a job is run.
        These arguments both decide the shape of the graph and,
        later, the exact job to run within that graph."""
        parser.add_argument("--location-set-version-id", type=int,
                            default=429)
        parser.add_argument("--gbd-round-id", type=int, default=6)
        parser.add_argument("--job-idx", type=int, help="The job ID")

    def job_id_to_arguments(self, job_id):
        """Makes a list of arguments to add to a command line in
        order to run a specific job."""
        return ["--job-idx", str(job_id)]

    def job_identifiers(self, args):
        """Given arguments, return the jobs specified.
        This could be used to subset the whole graph, for instance
        to run a slice through the locations from global to
        most-detailed locations."""
        if args.job_idx is not None:
            return [args.job_idx]
        else:
            return self.job_graph().nodes

    def initialize(self, args):
        """Read the arguments in order to know what to do."""
        self.location_set_version_id = args.location_set_version_id
        self.gbd_round_id = args.gbd_round_id

    def job_graph(self):
        """Make the whole job graph and return it."""
        return location_graph(
            self.gbd_round_id, self.location_set_version_id)

    def job(self, location_id):
        """Make a job from its ID.
        We haven't said what this class is yet."""
        return LocationJob(location_id, self.gbd_round_id)

Most of that work is to define the job graph or parse arguments to specify parts of the job graph. The work we do is in a Job class.
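
For instance, the slice from the global location down to a single most-detailed location, mentioned in the job_identifiers docstring, could be computed from the same graph. This is only an illustrative sketch using NetworkX, not part of the framework:

import networkx as nx

def drill_down_jobs(job_graph, target_location_id):
    """Every location on the way from the root down to the target,
    plus the target itself."""
    ancestors = nx.ancestors(job_graph, target_location_id)
    return sorted(ancestors | {target_location_id})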

The Job Class

A Job itself inherits from a base class, Job. The most important parts of the Job are its run method and outputs. The run method does the work, and the framework uses the list of outputs to check whether the job completed. The class’s initialization is done by the Application class, so we can pass in whatever helps initialize the Job:

class LocationJob(Job):
    def __init__(self, location_id, gbd_round_id):
        super().__init__()
        self.location_id = location_id
        self.gbd_round_id = gbd_round_id
        out_file = Path("/data/home") / f"{location_id}.hdf"
        self.outputs["paf"] = FileEntity(out_file)

    @property
    def resources(self):
        """These can be computed from arguments to init."""
        return dict(
            memory_gigabytes=1,
            threads=1,
            run_time_minutes=1,
        )

    def run(self):
        pass  # Make that output file.

The outputs are a dictionary of objects that check whether a file is in a state where we consider this job to have done its work. The FileEntity checks that the file exists. The PandasEntity can check that particular data sets exist in the file.
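
As a rough illustration of what such a check amounts to, and not the framework's actual API, verifying that named data sets exist in an HDF file can be done with plain pandas:

from pathlib import Path
import pandas as pd

def hdf_has_datasets(path, required_keys):
    """True if the HDF file exists and contains every required key."""
    if not Path(path).exists():
        return False
    with pd.HDFStore(path, mode="r") as store:
        present = set(store.keys())
    # HDFStore reports keys with a leading slash.
    return all("/" + key.lstrip("/") in present for key in required_keys)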

The list of outputs enables the framework to know which jobs have definitely completed. We can also define self.inputs, which enable the framework to set up mock inputs, so that we can test individual jobs in a larger graph, without first running the whole graph.
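
For example, a job that reads its parent location's file before writing its own could declare that dependency as an input. The class name here is hypothetical, and the sketch assumes inputs are declared the same way as outputs; check the framework's documentation for the exact form:

class LocationWithParentJob(Job):
    def __init__(self, location_id, parent_id):
        super().__init__()
        base = Path("/data/home")
        # Assumed form: the parent's output file is this job's input.
        self.inputs["parent_paf"] = FileEntity(base / f"{parent_id}.hdf")
        self.outputs["paf"] = FileEntity(base / f"{location_id}.hdf")

    def run(self):
        pass  # Read the parent file, write this location's file.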

The Child Job Main

Finally, at the bottom of the file, under the Application, we put a snippet that is the main() for the jobs:

if __name__ == "__main__":
    app = GridExample()
    exit(entry(app))

The framework looks for this snippet in the same file as the Application class. If it doesn’t find one, it will attempt to construct a main() of its own.

Running

Debug One Job Locally

To start one job locally, run it directly, in this case with:

$ python location_app.py --job-idx 1 --pdb

The --pdb will make the job drop into an interactive debugger when it encounters an exception.

Check Outputs Match Inputs

One way to see that the graph is well-formed is to supply both an input list and an output list to each job and then run the whole graph with automatic mocking:

$ python location_app.py --mock

Because there is no --job-idx argument, it will try to run the whole graph. Because there is no --grid-engine argument, it will run the jobs as functions within this process. The --mock argument tells it to skip the real run() method and instead use self.outputs to generate fake files, and the self.inputs check that the correct fake files exist when a Job first starts.

Run on the Cluster

On the cluster, start the whole thing with the command:

$ python location_app.py --grid-engine --project proj_forecasting

It will launch jobs and return immediately. Those jobs will share a base name, something like location_app23f824_37, where the first part is the application name, followed by six hexadecimal characters that are (probably) unique to this run, and then an identifier for the particular location being run.

The framework looks at each Job’s requested run time (run_time_minutes) in order to determine which queue to use.

Smaller Run on One Node

If there is less work to do, it may be easier to run this application interactively, using all the cores of a single node. In that case, log in to a node, allocating, say, 16 GB of memory, and then run:

$ python location_app.py --memory-limit 16

It will then run all jobs as subprocesses, ensuring that the jobs running at any one time stay within that memory limit, given in gigabytes.
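
Conceptually, that local run behaves like the following scheduling sketch. It is not the framework’s code, and it simplifies by running batches in threads rather than subprocesses, but it shows how each Job’s declared memory_gigabytes interacts with the budget:

import concurrent.futures as futures
import networkx as nx

def run_with_memory_budget(job_graph, jobs, memory_limit_gb):
    """Run jobs in dependency order, never letting the declared memory
    of concurrently running jobs exceed the budget."""
    def flush(batch):
        with futures.ThreadPoolExecutor(max_workers=len(batch)) as pool:
            list(pool.map(lambda job_id: jobs[job_id].run(), batch))

    batch, batch_gb = [], 0.0
    for job_id in nx.topological_sort(job_graph):
        need = jobs[job_id].resources["memory_gigabytes"]
        waits_on_batch = any(p in batch for p in job_graph.predecessors(job_id))
        if batch and (waits_on_batch or batch_gb + need > memory_limit_gb):
            flush(batch)
            batch, batch_gb = [], 0.0
        batch.append(job_id)
        batch_gb += need
    if batch:
        flush(batch)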