# HG changeset patch # User paulb # Date 1212446623 0 # Node ID 614aa092ba4e7bcb0608e7c752fe2ab0d2eb8e4d # Parent c9753c628f21c86c33195a04cf172ac264f13933 [project @ 2008-06-02 22:43:43 by paulb] Added details of the persistent and background features and examples. diff -r c9753c628f21 -r 614aa092ba4e docs/reference.html --- a/docs/reference.html Mon Jun 02 22:43:11 2008 +0000 +++ b/docs/reference.html Mon Jun 02 22:43:43 2008 +0000 @@ -32,6 +32,8 @@
  • Map-style Processing
  • Reusing Processes and Channels
  • Making Existing Functions Parallel and Reusable
  • +
  • Background Processes and Callables
  • +
  • Background and Persistent Queues
  • Implementation Notes
  • @@ -331,6 +333,67 @@

    This wraps the callable in a similar fashion to MakeParallel, but provides the necessary mechanisms described above for reusable code.

    +

    Background Processes and Callables

    + +

    So far, all parallel computations have involved created processes which +depend on the existence of the created process to collect results and to +communicate with these created processes, preventing the created process from +terminating, even if the created processes actually perform work and potentially +create output which need not concern the process which created them. In order to +separate creating and created processes, the concept of a background process +(also known as a daemon process) is introduced.

    + +

    The BackgroundCallable class acts somewhat like the +manage method on exchange-based objects, although no exchange is +immediately involved, and instances of BackgroundCallable provide +wrappers around existing parallel-aware callables which then be invoked in order +to initiate a background computation in a created process. For example:

    + +
    +backgroundfn = pprocess.BackgroundCallable(address, fn)
    +
    + +

    This wraps the supplied callable (which can itself be the result of using +MakeParallel), with the resulting wrapper lending itself to +invocation like any other function. One distinguishing feature is that of the +address: in order to contact the background process after +invocation to (amongst other things) receive any result, a specific address +must be given to define the contact point between the created process and any +processes seeking to connect to it. Since these "persistent" communications +employ special files (specifically UNIX-domain sockets), the address must be a +suitable filename.

    + +

    Background and Persistent Queues

    + +

    Background processes employing persistent communications require adaptations +of the facilities described in the sections above. For a single background +process, the BackgroundQueue function is sufficient to create a +queue-like object which can monitor the communications channel between the +connecting process and a background process. For example:

    + +
    +queue = pprocess.BackgroundQueue(address)
    +
    + +

    This code will cause the process reachable via the given address +to be contacted and any results made available via the created queue-like +object.

    + +

    Where many background processes have been created, a single +PersistentQueue object can monitor their communications by being +connected to them all, as in the following example:

    + +
    +queue = pprocess.PersistentQueue()
    +for address in addresses:
    +    queue.connect(address)
    +
    + +

    Here, the queue monitors all previously created processes whose addresses +reside in the addresses sequence. Upon iterating over the queue, +results will be taken from whichever process happens to have data available in +no particular pre-defined order.

    +

    Implementation Notes

    Signals and Waiting

    diff -r c9753c628f21 -r 614aa092ba4e docs/tutorial.html --- a/docs/tutorial.html Mon Jun 02 22:43:11 2008 +0000 +++ b/docs/tutorial.html Mon Jun 02 22:43:43 2008 +0000 @@ -15,7 +15,18 @@ advantage of more than one processor to simultaneously process data - is to use the pmap function.

    -

    Converting Map-Style Code

    + + +

    Converting Map-Style Code

    Consider a program using the built-in map function and a sequence of inputs:

    @@ -99,7 +110,7 @@ variable is defined elsewhere), several calculations can now be performed in parallel.

    -

    Converting Invocations to Parallel Operations

    +

    Converting Invocations to Parallel Operations

    Although some programs make natural use of the map function, others may employ an invocation in a nested loop. This may also be converted @@ -198,7 +209,7 @@ list operations provides the results in the same order as their corresponding inputs.

    -

    Converting Arbitrarily-Ordered Invocations

    +

    Converting Arbitrarily-Ordered Invocations

    In some programs, it is not important to receive the results of computations in any particular order, usually because either the order of @@ -245,7 +256,7 @@

    We can bring the benefits of parallel processing to the above program with the following code:

    -
    +
         t = time.time()
     
         # Initialise the communications queue with a limit on the number of
    @@ -433,7 +444,7 @@
     start method still calls the provided callable, but using a
     different notation from that employed previously.

    -

    Converting Inline Computations

    +

    Converting Inline Computations

    Although many programs employ functions and other useful abstractions which can be treated as parallelisable units, some programs perform computations @@ -525,7 +536,7 @@ the results in the same order as the initiation of the computations which produced them.

    -

    Reusing Processes in Parallel Programs

    +

    Reusing Processes in Parallel Programs

    One notable aspect of the above programs when parallelised is that each invocation of a computation in parallel creates a new process in which the @@ -577,7 +588,273 @@ creating a new process for each computation and then discarding it, only to create a new process for the next computation.

    -

    Summary

    +

    Performing Computations in Background Processes

    + +

    Occasionally, it is desirable to initiate time-consuming computations and to +not only leave such processes running in the background, but to be able to detach +the creating process from them completely, potentially terminating the creating +process altogether, and yet also be able to collect the results of the created +processes at a later time, potentially in another completely different process. +For such situations, we can make use of the BackgroundCallable +class, which converts a parallel-aware callable into a callable which will run +in a background process when invoked.

    + +

    Consider this excerpt from a modified version of the simple_managed_queue program:

    + +
    +def task():
    +
    +    # Initialise the communications queue with a limit on the number of
    +    # channels/processes.
    +
    +    queue = pprocess.Queue(limit=limit)
    +
    +    # Initialise an array.
    +
    +    results = [0] * N * N
    +
    +    # Wrap the calculate function and manage it.
    +
    +    calc = queue.manage(pprocess.MakeParallel(calculate))
    +
    +    # Perform the work.
    +
    +    print "Calculating..."
    +    for i in range(0, N):
    +        for j in range(0, N):
    +            calc(i, j)
    +
    +    # Store the results as they arrive.
    +
    +    print "Finishing..."
    +    for i, j, result in queue:
    +        results[i*N+j] = result
    +
    +    return results
    +
    + +

    Here, we have converted the main program into a function, and instead of +printing out the results, we return the results list from the function.

    + +

    Now, let us consider the new main program (with the relevant mechanisms +highlighted):

    + +
    +    t = time.time()
    +
    +    if "--reconnect" not in sys.argv:
    +
    +        # Wrap the computation and manage it.
    +
    +        ptask = pprocess.BackgroundCallable("task.socket", pprocess.MakeParallel(task))
    +
    +        # Perform the work.
    +
    +        ptask()
    +
    +        # Discard the callable.
    +
    +        del ptask
    +        print "Discarded the callable."
    +
    +    if "--start" not in sys.argv:
    +
    +        # Open a queue and reconnect to the task.
    +
    +        print "Opening a queue."
    +        queue = pprocess.BackgroundQueue("task.socket")
    +
    +        # Wait for the results.
    +
    +        print "Waiting for persistent results"
    +        for results in queue:
    +            pass # should only be one element
    +
    +        # Show the results.
    +
    +        for i in range(0, N):
    +            for result in results[i*N:i*N+N]:
    +                print result,
    +            print
    +
    +        print "Time taken:", time.time() - t
    +
    + +

    (This code in context with import statements and functions is +found in the examples/simple_background_queue.py file.)

    + +

    This new main program has two parts: the part which initiates the +computation, and the part which connects to the computation in order to collect +the results. Both parts can be run in the same process, and this should result +in similar behaviour to that of the original +simple_managed_queue program.

    + +

    In the above program, however, we are free to specify --start as +an option when running the program, and the result of this is merely to initiate +the computation in a background process, using BackgroundCallable +to obtain a callable which, when invoked, creates the background process and +runs the computation. After doing this, the program will then exit, but it will +leave the computation running as a collection of background processes, and a +special file called task.socket will exist in the current working +directory.

    + +

    When the above program is run using the --reconnect option, an +attempt will be made to reconnect to the background processes already created by +attempting to contact them using the previously created task.socket +special file (which is, in fact, a UNIX-domain socket); this being done using +the BackgroundQueue function which will handle the incoming results +in a fashion similar to that of a Queue object. Since only one +result is returned by the computation (as defined by the return +statement in the task function), we need only expect one element to +be collected by the queue: a list containing all of the results produced in the +computation.

    + +

    Managing Several Background Processes

    + +

    In the above example, a single background process was used to manage a number +of other processes, with all of them running in the background. However, it can +be desirable to manage more than one background process.

    + +

    Consider this excerpt from a modified version of the simple_managed_queue program:

    + +
    +def task(i):
    +
    +    # Initialise the communications queue with a limit on the number of
    +    # channels/processes.
    +
    +    queue = pprocess.Queue(limit=limit)
    +
    +    # Initialise an array.
    +
    +    results = [0] * N
    +
    +    # Wrap the calculate function and manage it.
    +
    +    calc = queue.manage(pprocess.MakeParallel(calculate))
    +
    +    # Perform the work.
    +
    +    print "Calculating..."
    +    for j in range(0, N):
    +        calc(i, j)
    +
    +    # Store the results as they arrive.
    +
    +    print "Finishing..."
    +    for i, j, result in queue:
    +        results[j] = result
    +
    +    return i, results
    +
    + +

    Just as we see in the previous example, a function called task +has been defined to hold a background computation, and this function returns a +portion of the results. However, unlike the previous example or the original +example, the scope of the results of the computation collected in the function +have been changed: here, only results for calculations involving a certain value +of i are collected, with the particular value of i +returned along with the appropriate portion of the results.

    + +

    Now, let us consider the new main program (with the relevant mechanisms +highlighted):

    + +
    +    t = time.time()
    +
    +    if "--reconnect" not in sys.argv:
    +
    +        # Wrap the computation and manage it.
    +
    +        ptask = pprocess.MakeParallel(task)
    +
    +        for i in range(0, N):
    +
    +            # Make a distinct callable for each part of the computation.
    +
    +            ptask_i = pprocess.BackgroundCallable("task-%d.socket" % i, ptask)
    +
    +            # Perform the work.
    +
    +            ptask_i(i)
    +
    +        # Discard the callable.
    +
    +        del ptask
    +        print "Discarded the callable."
    +
    +    if "--start" not in sys.argv:
    +
    +        # Open a queue and reconnect to the task.
    +
    +        print "Opening a queue."
    +        queue = pprocess.PersistentQueue()
    +        for i in range(0, N):
    +            queue.connect("task-%d.socket" % i)
    +
    +        # Initialise an array.
    +
    +        results = [0] * N
    +
    +        # Wait for the results.
    +
    +        print "Waiting for persistent results"
    +        for i, result in queue:
    +            results[i] = result
    +
    +        # Show the results.
    +
    +        for i in range(0, N):
    +            for result in results[i]:
    +                print result,
    +            print
    +
    +        print "Time taken:", time.time() - t
    +
    + +

    (This code in context with import statements and functions is +found in the examples/simple_persistent_queue.py file.)

    + +

    In the first section, the process of making a parallel-aware callable is as +expected, but instead of then invoking a single background version of such a +callable, as in the previous example, we create a version for each value of +i (using BackgroundCallable) and then invoke each one. +The result of this is a total of N background processes, each +running an invocation of the task function with a distinct value of +i (which in turn perform computations), and each employing a +UNIX-domain socket for communication with a name of the form +task-i.socket.

    + +

    In the second section, since we now have more than one background process, we +must find a way to monitor them after reconnecting to them; to achieve this, a +PersistentQueue is created, which acts like a regular +Queue object but is instead focused on handling persistent +communications. Upon connecting the queue to each of the previously created +UNIX-domain sockets, the queue acts like a regular Queue and +exposes received results through an iterator. Here, the principal difference +from previous examples is the structure of results: instead of collecting each +individual value in a flat i by j array, a list is +returned for each value of i and is stored directly in another +list.

    + +

    Applications of Background Computations

    + +

    Background computations are useful because they provide flexibility in the +way the results can be collected. One area in which they can be useful is Web +programming, where a process handling an incoming HTTP request may need to +initiate a computation but then immediately send output to the Web client - such +as a page indicating that the computation is "in progress" - without having to +wait for the computation or to allocate resources to monitor it. Moreover, in +some Web architectures, notably those employing the Common Gateway Interface +(CGI), it is necessary for a process handling an incoming request to terminate +before its output will be sent to clients. By using a +BackgroundCallable, a Web server process can initiate a +communication, and then subsequent server processes can be used to reconnect to +the background computation and to wait efficiently for results.

    + +

    Summary

    The following table indicates the features used in converting one sequential example program to another parallel program:

    @@ -602,7 +879,7 @@ MakeParallel, Map, manage - simple2 + simple2 simple_managed_queue MakeParallel, Queue, manage @@ -615,6 +892,14 @@ Channel, Exchange (subclass), start, finish + simple_background_queue + MakeParallel, BackgroundCallable, BackgroundQueue + + + simple_persistent_queue + MakeParallel, BackgroundCallable, PersistentQueue + + simple simple_create_map Channel, Map, create, exit