# Source code for gridengineapp.main

import faulthandler
import logging
import sys
import traceback
from bdb import BdbQuit
from enum import Enum
from inspect import getmembers, ismethod
from types import SimpleNamespace

import networkx as nx

from .argument_handling import (
    setup_args_for_job, execution_parser
)
from .config import configuration
from .determine_executable import subprocess_executable
from .graph_choice import job_subset, execution_ordered
from .run_grid_app import launch_jobs
from .exceptions import NodeMisconfigurationError
from .multiprocess import graph_do
from .restart import restart_count

LOGGER = logging.getLogger(__name__)


def iterate_tasks(job, command_line_task_id):
    """
    Generate the runnable pieces of a job.

    A job whose resources carry a positive ``task_cnt`` is a task array.
    For such a job this yields either the single task selected on the
    command line or, when no positive task ID was given, every task
    from 1 through ``task_cnt``. Any other job is yielded unchanged.

    Args:
        job (Job): The job object. Its ``resources`` mapping is read
            and ``clone_task`` is used to build each task.
        command_line_task_id (int): A Task ID chosen on the command
            line, or None. According to the original notes this is
            populated from the ``SGE_TASK_ID`` environment variable
            when that is set.

    Yields:
        The job itself, or one clone of the job per selected task.
    """
    resources = job.resources
    is_task_array = "task_cnt" in resources and resources["task_cnt"] > 0
    if not is_task_array:
        yield job
        return

    # A positive ID on the command line narrows the run to that one task.
    if command_line_task_id is not None and command_line_task_id > 0:
        yield job.clone_task(command_line_task_id)
        return

    # Task IDs are 1-based.
    last_task_id = resources["task_cnt"]
    for task_id in range(1, last_task_id + 1):
        yield job.clone_task(task_id)


def run_jobs(app, args):
    """Run the selected jobs one after another inside this process.

    The jobs come from ``job_subset(app, args)`` and are visited in the
    order produced by ``execution_ordered``. Each job is expanded by
    ``iterate_tasks``; every resulting task is executed with
    ``task.run()``, or with ``task.mock_run()`` when ``args.mock_job``
    is set.

    Args:
        app: The application. ``app.job(identifier)`` is called for
            every job identifier in the graph.
        args: Parsed arguments. This function reads ``task_id``,
            ``mock_job`` and ``pdb``.

    Raises:
        Exception: Whatever the job raised, unless ``args.pdb`` is set
            and a post-mortem debugger session could be started.
            ``BdbQuit`` (quitting the debugger) is swallowed.
    """
    faulthandler.enable()
    try:
        job_graph = job_subset(app, args)
        for identifier in execution_ordered(job_graph):
            LOGGER.info(f"Run {identifier}.")
            for task in iterate_tasks(app.job(identifier), args.task_id):
                if args.mock_job:
                    task.mock_run()
                else:
                    task.run()

    except BdbQuit:
        pass
    except Exception:  # Too broad, but we re-raise.
        if not args.pdb:
            raise

        # A replaced stdout (e.g. an in-memory capture buffer) may have no
        # file descriptor at all; asking for one then raises. Treat that as
        # "captured" so the job's own exception is what propagates, not an
        # error from this handler.
        try:
            stdout_fd = sys.stdout.fileno()
        except (AttributeError, OSError, ValueError):
            stdout_fd = None

        if stdout_fd != 1:
            LOGGER.info("Not invoking pdb because stdout is captured")
            # The inner handler has finished, so this re-raises the
            # original job exception.
            raise

        # Imported lazily: only needed when debugging was requested.
        import pdb

        traceback.print_exc()
        pdb.post_mortem()


def job_task_ids(job):
    """Generate the task IDs of a job.

    A job with ``task_cnt`` greater than one yields 1 through
    ``task_cnt``. Every other job, including one with no ``task_cnt``
    entry, yields the single ID 0.
    """
    resources = job.resources
    task_total = int(resources["task_cnt"]) if "task_cnt" in resources else 0
    if task_total > 1:
        yield from range(1, task_total + 1)
    else:
        yield 0


def expand_task_arrays(job_graph, app):
    """Build the task-level graph that corresponds to a job graph.

    Each job contributes one node per task ID reported by
    ``job_task_ids``, so every job has at least one task node. Nodes of
    the result are ``(job_id, task_id)`` tuples, where ``job_id`` is a
    node of ``job_graph``. Every task of a predecessor job gets an edge
    to every task of the job that follows it.

    Args:
        job_graph: Directed graph of job identifiers.
        app: The application; ``app.job(job_id)`` supplies each job.

    Returns:
        nx.DiGraph: The expanded graph of tasks.
    """
    expanded = nx.DiGraph()
    for job_id in nx.topological_sort(job_graph):
        job = app.job(job_id)
        upstream_tasks = [
            (upstream_id, upstream_task)
            for upstream_id in job_graph.predecessors(job_id)
            for upstream_task in job_task_ids(app.job(upstream_id))
        ]
        own_tasks = [(job_id, task_id) for task_id in job_task_ids(job)]

        # Nodes go in explicitly so a job without predecessors still appears.
        expanded.add_nodes_from(own_tasks)
        expanded.add_edges_from(
            (source, target)
            for source in upstream_tasks
            for target in own_tasks
        )
    return expanded


def find_runnable(remaining):
    """List the nodes of ``remaining`` that nothing is holding back.

    A node is held back by any incoming edge whose data lacks a truthy
    ``"launch"`` entry; edges flagged with ``launch`` are not counted
    as dependencies. Nodes are examined, and returned, in the order
    given by ``execution_ordered``.

    Args:
        remaining: Directed graph of work that has not completed.

    Returns:
        list: The nodes that have no blocking incoming edge.
    """
    def is_blocked(node):
        return any(
            not edge_data.get("launch")
            for _source, _target, edge_data
            in remaining.in_edges(node, data=True)
        )

    return [
        node for node in execution_ordered(remaining)
        if not is_blocked(node)
    ]


class RunNext:
    """Callable that works out which tasks can be started next.

    An instance holds the application, the full task graph and the
    command-line pieces needed to build a command for each task.
    Calling it with the tasks that are already complete returns
    descriptions of the tasks that are now runnable.
    """

    def __init__(self, app, task_graph, arg_list, args_to_remove):
        self.app, self.task_graph = app, task_graph
        self.arg_list, self.args_to_remove = arg_list, args_to_remove

    def __call__(self, completed_jobs):
        """Describe the tasks that may run once ``completed_jobs`` are done.

        Args:
            completed_jobs: Container of task-graph nodes that have
                finished; only membership tests are performed on it.

        Returns:
            dict: See ``construct_descriptions``.
        """
        unfinished = [
            node for node in self.task_graph.nodes
            if node not in completed_jobs
        ]
        pending = nx.subgraph(self.task_graph, unfinished)
        return self.construct_descriptions(find_runnable(pending))

    def construct_descriptions(self, runnable):
        """Build a launch description for each runnable task.

        Args:
            runnable: Iterable of ``(job_id, task_id)`` pairs.

        Returns:
            dict: Maps each ``(job_id, task_id)`` to a namespace with
            ``memory`` (the job's ``memory_gigabytes`` resource) and
            ``args`` (the command line as a list).
        """
        descriptions = {}
        for job_id, task_id in runnable:
            interpreter, script_args = subprocess_executable(self.app)
            selection = self.app.job_id_to_arguments(job_id)
            job_args = setup_args_for_job(
                self.args_to_remove, selection, self.arg_list
            )
            # Task ID 0 marks a job that is not a task array.
            if task_id > 0:
                job_args.extend(["--task-id", str(task_id)])
            memory = self.app.job(job_id).resources["memory_gigabytes"]
            command = [str(interpreter)] + script_args + job_args
            descriptions[(job_id, task_id)] = SimpleNamespace(
                memory=memory,
                args=command,
            )
        return descriptions


def multiprocess_jobs(app, command_args, arg_list, args_to_remove):
    """Expand the selected jobs into tasks and hand them to ``graph_do``.

    Args:
        app: The application that supplies the jobs.
        command_args: Parsed arguments; used to select the job subset
            and read for ``memory_limit``.
        arg_list: Command-line arguments, forwarded to ``RunNext``.
        args_to_remove: Forwarded to ``RunNext``.
    """
    task_graph = expand_task_arrays(job_subset(app, command_args), app)
    LOGGER.debug(f"{len(task_graph)} tasks to run")
    graph_do(
        RunNext(app, task_graph, arg_list, args_to_remove),
        command_args.memory_limit,
    )


class GridEngineReturnCodes(Enum):
    """
    Process return codes that Grid Engine gives special meaning to.

    Every other return code is treated as OK. Unless the process
    returns 100, Grid Engine will go on to run the next job that is
    holding for this one. See "man sge_diagnostics" for details.
    """
    # Normal completion.
    OK = 0
    # Ask for this job to be run again.
    RequestRestart = 99
    # Fail, and remove the jobs holding for this one.
    FailAndDeleteHoldingJobs = 100


def configure_from_application(app):
    """Apply the application's own configuration, if it offers one.

    When ``app`` has a bound method named ``configuration``, that
    method is called and its result is passed to ``configuration``
    from this package. The method is expected to return a ConfigParser
    instance that has a section for this package. An application
    without such a method is left alone.
    """
    method_names = {name for name, _method in getmembers(app, ismethod)}
    if "configuration" not in method_names:
        return
    configuration(app.configuration())


def grid_child_guard(work, args):
    """Run ``work`` and translate its outcome into a Grid Engine return code.

    The codes follow sge_diagnostics, so a failure can demand either a
    restart or the deletion of holding jobs.

    Args:
        work: Zero-argument callable to execute.
        args: Object that may carry ``rerun_cnt``, the number of
            restarts allowed after a node misconfiguration.

    Returns:
        int: ``OK`` when ``work`` finishes. ``RequestRestart`` when it
        raised ``NodeMisconfigurationError`` and the restart count is
        still below ``rerun_cnt``. ``FailAndDeleteHoldingJobs`` for any
        other exception or when restarts are exhausted or not allowed.
    """
    restarts_so_far = restart_count()
    try:
        work()
    except NodeMisconfigurationError as node_error:
        LOGGER.exception(node_error)
        # A missing, zero or None rerun_cnt all mean "no restarts".
        rerun_limit = getattr(args, "rerun_cnt", None)
        if rerun_limit and restarts_so_far < rerun_limit:
            return GridEngineReturnCodes.RequestRestart.value
        return GridEngineReturnCodes.FailAndDeleteHoldingJobs.value
    except Exception as failure:
        # Log a one-line summary first so users see a short message,
        # then the full traceback.
        innermost_frame = traceback.format_tb(failure.__traceback__)[-1]
        LOGGER.error(
            f"{type(failure).__name__} {failure} {innermost_frame}"
        )
        LOGGER.exception("Exception in app")
        return GridEngineReturnCodes.FailAndDeleteHoldingJobs.value
    return GridEngineReturnCodes.OK.value


def entry(app, arg_list=None):
    """
    This starts the application. Use it with::

        if __name__ == "__main__":
            application = MyApplication()
            entry(application)

    Args:
        app (application.Application): The main application to run.
        arg_list (list[str]|None): Arguments to the command line, handed
            to ``parser.parse_args``. This is usually None and is used
            for testing. Pass this around instead of using sys.argv
            because pytest makes it hard to set sys.argv.

    Returns:
        int: The return code produced by ``grid_child_guard``.
    """
    configure_from_application(app)
    parser, args_to_remove = execution_parser()
    app.add_arguments(parser)
    args = parser.parse_args(arg_list)

    # Each quiet flag raises the log threshold one level (10) above INFO;
    # each verbose flag lowers it by one level.
    offset = 10 * (args.quiet_app - args.verbose_app)
    logging.basicConfig(level=logging.INFO + offset)

    def work():
        # Pick the execution mode: submit to the grid, run as local
        # subprocesses under a memory limit, or run in this process.
        app.initialize(args)
        if args.grid_engine:
            launch_jobs(app, args, arg_list, args_to_remove)
        elif args.memory_limit:
            multiprocess_jobs(app, args, arg_list, args_to_remove)
        else:
            run_jobs(app, args)

    # NOTE(review): grid_child_guard names its second parameter ``args`` and
    # reads ``rerun_cnt`` from it, yet ``app`` is passed here. Confirm whether
    # the parsed ``args`` was intended.
    return grid_child_guard(work, app)