Application of Map-Reduce in Scientific Computing

Hans Petter Langtangen [1, 2]
Mohammed Sourouri [1]

[1] Center for Biomedical Computing, Simula Research Laboratory
[2] Department of Informatics, University of Oslo

Jul 8, 2014

Summary. The map-reduce concept is a unified way of implementing algorithms such that one can easily utilize (large-scale) parallel computing resources. A famous version of the concept, branded as MapReduce, was introduced by Google in 2004 and plays a key role when Google's search engines work with large sets of documents on large clusters of computers. Many different software libraries support variants of map-reduce. For Python code development, Disco is a tool that allows parallel processing of large-scale computing problems expressed by map-reduce algorithms. The purpose of the forthcoming text is to illustrate and explore the map-reduce concept for numerical computing.

Numerical integration

Our first example on map-reduce regards the implementation of a particular type of numerical integration rules: $$ \int_a^b f(x)dx \approx \frac{b-a}{n}\sum_{i=0}^{n-1} f(x_i) .$$ Here, \( x_i \), \( i=0,\ldots,n-1 \), are points in \( [a,b] \). Choosing \( x_i = a + (i + 0.5)h \), \( h=(b-a)/n \), gives the Midpoint rule, where the area below the \( f(x) \) curve is approximated by \( n \) rectangles of width \( h \). With \( x_i \) as random points distributed uniformly in \( [a,b] \) we get the Monte Carlo integration method.

Plain Python implementation

The Midpoint integration rule can be coded directly in Python as

def midpoint(f, a, b, n):
    h = (b-a)/float(n)
    s = 0
    for i in range(n):
        x_i = a + (i + 0.5)*h
        s += f(x_i)
    return h*s

The Monte Carlo integration method follows a very similar implementation:

import random

def MonteCarlo(f, a, b, n):
    s = 0
    for i in range(n):
        x_i = random.uniform(a, b)
        s += f(x_i)
    return (b-a)/float(n)*s

Testing the two methods on the linear function \( f(x)=2x \) on \( [0,1] \),

f = lambda x: 2*x
a = 0; b = 1
print midpoint(f, a, b, 2)
print MonteCarlo(f, a, b, 4000)

results in

1.0
0.999962735888

The Midpoint rule integrates a linear function exactly for any \( n>0 \), while Monte Carlo integration is always approximate.

Vectorized Python implementation

Computations can be sped up by vectorization, which means that we replace the for loop by numpy operations on arrays. Three steps are required to vectorize the sum \( \sum_i f(x_i) \): 1) all the \( x_i \) values must be generated at once and stored in an array, 2) \( f \) must be applied to this array of \( x_i \) values, and 3) a summation function must be applied to the array of \( f(x_i) \) values. The resulting code looks like

"""Integration example: vectorized implementation."""
from functools import reduce
import numpy as np

def midpoint(f, a, b, n):
    h = (b-a)/float(n)
    x = np.linspace(a + 0.5*h, b - 0.5*h, n)
    s = np.sum(f(x))
    return h*s

def MonteCarlo(f, a, b, n):
    x = np.random.uniform(a, b, n)
    s = np.sum(f(x))
    return (b-a)/float(n)*s

a = 0; b = 1
print midpoint  (lambda x: 2*x, a, b, 2)
print MonteCarlo(lambda x: 2*x, a, b, 4000)

The vectorized code is shorter and (much) faster. Observe that we have also saved some space by using lambda functions instead of defining a named function f(x): lambda x, y, z, ...: expression is equivalent to

def f(x, y, z, ...):
    return expression

Classical map-reduce

Functional programming makes frequent use of two functions named map and reduce. Python also has these functions. Calling map(f, L) applies the function f(e) to every element e in a list L and returns the resulting values as a list. We can implement our own map function by

def mymap1(f, L):
    r = []
    for e in L:
        r.append(f(e))
    return r

def mymap2(f, L):
    return [f(e) for e in L]

List comprehensions, as used in mymap2, are recommended as replacement of map in modern Python.
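
A quick check on a made-up list confirms that the two home-made versions and the built-in map give the same result (Python 2 shown, where map returns a list):

print mymap1(lambda e: e**2, [1, 2, 3, 4])   # [1, 4, 9, 16]
print mymap2(lambda e: e**2, [1, 2, 3, 4])   # [1, 4, 9, 16]
print map   (lambda e: e**2, [1, 2, 3, 4])   # [1, 4, 9, 16]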

The reduce function is a bit more complicated. It operates on a list, usually the result of map, and reduces it to a single object. If a list L has 5 elements, the call reduce(f, L) calculates

f(f(f(f(L[0], L[1]), L[2]), L[3]), L[4])

For example, with f=lambda x,y: x+y we have

(((L[0] + L[1]) + L[2]) + L[3]) + L[4]

which is nothing but the sum of the elements in L. Here is a demo program:

from functools import reduce

def mapper(x):
    print 'mapper: %s -> %s' % (x, x**2)
    return x**2

def reducer(x, y):
    print 'reducer: %s, %s -> %s' % (x, y, x + y)
    return x + y

L = range(1, 5)
r = reduce(reducer, map(mapper, L))

yielding the output

mapper: 1 -> 1
mapper: 2 -> 4
mapper: 3 -> 9
mapper: 4 -> 16
reducer: 1, 4 -> 5
reducer: 5, 9 -> 14
reducer: 14, 16 -> 30

We see how x in reducer is always the result of the previous call to reducer and that y is the next element in the list.

The reduce function can take a third argument which acts as an initial value of the reduce operations. Calling

r = reduce(reducer, map(mapper, L), 0)

results in different output in the reducer function, since the first call will not consist of the first two elements in the list returned by map, but of the result so far (0) and the first element in the list:

reducer: 0, 1 -> 1
reducer: 1, 4 -> 5
reducer: 5, 9 -> 14
reducer: 14, 16 -> 30

The use of an initial value makes it conceptually clearer what the reducer does: reducer(res, e) computes the combination of the reduce results so far, res, and a new element e in the list returned from map. In these writings we will mostly use an initial value when calling reduce and write the reducer with the signature reducer(res, e):

def reducer(res, e):
    print 'reducer: %s, %s -> %s' % (res, e, res + e)
    return res + e

The numerical integration rules are easily implemented using the map-reduce concept since the map phase is to apply \( f \) to the list of \( x_i \) points, and the reduce phase is to sum the elements in the list returned from map:

"""Integration example: map-reduce implementation and numpy constructs."""
from functools import reduce
import numpy as np

def midpoint(f, a, b, n):
    h = (b-a)/float(n)
    x = np.linspace(a + 0.5*h, b - 0.5*h, n)
    fx = map(f, x)
    s = reduce(lambda res, e: res + e, fx, 0)
    return h*s

def MonteCarlo(f, a, b, n):
    x = np.random.uniform(a, b, n)
    fx = map(f, x)
    s = reduce(lambda res, e: res + e, fx, 0)
    return (b-a)/float(n)*s

a = 0; b = 1
print midpoint  (lambda x: 2*x, a, b, 2)
print MonteCarlo(lambda x: 2*x, a, b, 4000)

Basically, the vectorized operation f(x) is replaced by map, and np.sum is replaced by reduce.

The computation of x can also be done by map:

x = map(lambda i: a + (0.5+i)*h, range(n))
x = map(lambda i: random.uniform(a, b), range(n))

We remark that lambda functions, as well as functions defined inside midpoint or MonteCarlo, "remember" variables like a, b, and n (such functions are referred to as closures).
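
As a small, constructed illustration of the closure property (not part of the integration functions above), the lambda below keeps access to a and h from the enclosing scope even after make_points has returned:

def make_points(a, b, n):
    h = (b-a)/float(n)
    # the lambda "remembers" a and h from the enclosing scope (a closure)
    return map(lambda i: a + (0.5 + i)*h, range(n))

print make_points(0, 1, 4)   # [0.125, 0.375, 0.625, 0.875] (Python 2: map returns a list)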

Parallelism

Looking at the numerical integration formula, we realize that all the \( x_i \) points can be calculated in parallel, and the same goes for all the \( f(x_i) \) evaluations. This is particularly evident when these operations are expressed by map, since map applies the same function to each element of a list, and all these actions are independent of each other. That is, writing a map operation immediately makes it clear that the operations on the list are independent and can be done in parallel.

Imagine that a set of \( n \) compute units works in parallel such that unit \( i \) computes \( x_i \) and \( f(x_i) \). The summation step in the numerical integration formula requires the compute units to communicate: unit 0 needs to send its \( f(x_0) \) value to unit 1, which adds that value with its own value \( f(x_1) \) and sends the sum \( f(x_0)+f(x_1) \) to unit 2, which adds that value to \( f(x_2) \) and sends \( f(x_0)+f(x_1)+f(x_2) \) to unit 3, which adds that value to its own, and so forth until all partial sums have been communicated and we have the final sum. These actions are the same as expressed in the reduce step.

The idea is that software libraries can implement their own parallel version of map and the communicating version of reduce. If our algorithm can be expressed as calls to map and reduce, this is an easy way of parallelizing the algorithm. However, it does not make much sense to distribute each individual \( x_i \) and \( f(x_i) \) computation to a separate compute unit: we need to distribute a chunk of \( x_i \) and \( f(x_i) \) values to each compute unit. The next section explains the idea in a serial setting.

Large data sets

Suppose \( n \) is very large. We then want to compute parts of the sum \( \sum_i f(x_i) \) separately and thereafter merge the results. The collection \( x_0,\ldots,x_{n-1} \) is divided into \( N \) chunks of sizes \( c_0,\ldots,c_{N-1} \), such that \( n = \sum_{k=0}^{N-1} c_k \). The \( x_i \) values of chunk number \( k \) are written as \( x_{k,0}, x_{k,1}, \ldots, x_{k, c_k-1} \). Mathematically, we write $$ \sum_{i=0}^{n-1} f(x_i) = \sum_{k=0}^{N-1} \underbrace{\sum_{j=0}^{c_k-1} f(x_{k,j})}_{= s_k} . $$ The idea is that the map step computes \( s_k \), while the reduce step adds the \( s_k \) values.

Partitioning the array of \( x_i \) values into equal-sized chunks, and putting remaining elements into the last chunk can be done by this function:

def partition(x, N):
    """Return a list of N chunks of x (assumes N >= 2)."""
    chunk_size = int(len(x)/N)
    c = [chunk_size for i in range(N)]
    c[-1] = len(x) - (N-1)*chunk_size  # last chunk takes the remaining elements
    chunks = [x[:c[0]]]
    start = c[0]
    for k in range(1, N-1):
        chunks.append(x[start:start+c[k]])
        start = start + c[k]
    chunks.append(x[start:])
    return chunks

Note that the return value is a list of slices of (views into) the x array.

A simpler approach to partitioning would be to just make x a two-dimensional array where each row represents a chunk of data. We would then normally need to throw away some of the last elements in x to create a two-dimensional shape (i.e., to create equal-sized chunks):

def partition(x, N):
    chunk_size = int(len(x)/N)
    return x[:chunk_size*N].reshape(N, chunk_size)

The returned object is now a two-dimensional numpy array, but map can easily iterate over the first index, i.e., the rows in this object. Below, we use the first version of partition where the final chunk may have a different size from the other ones.
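
Before returning to the first version, here is a quick illustration with made-up numbers of how map iterates over the rows of the reshaped array (note that the last elements of x are simply dropped by this version of partition):

x = np.linspace(0, 10, 11)                     # [0, 1, ..., 10]
chunks = partition(x, 4)                       # 4 rows of 2 elements; [8, 9, 10] are dropped
print map(lambda row: np.sum(2*row), chunks)   # [2.0, 10.0, 18.0, 26.0]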

The map operation on each element in the chunk list must first apply the f function (on the slice), yielding a numpy array, and then apply the np.sum function on this array. The result is the partial sum \( s_k \) as defined above. The midpoint method can be implemented as

def midpoint(f, a, b, n, N=5):
    h = (b-a)/float(n)
    x = np.linspace(a + 0.5*h, b - 0.5*h, n)
    chunks = partition(x, N)
    mapped_values = map(lambda x_k: np.sum(f(x_k)), chunks)
    s = reduce(lambda res, e: res + e, mapped_values, 0)
    return h*s

To understand what is computed and whether the intermediate results are right, we call

print midpoint  (lambda x: 2*x, -0.5, 10.5, 11, N=4)

This results in x being [0, 1, 2, ..., 10]. With \( n=11 \) and \( N=4 \), chunk_size becomes 2 (recall that division between two integers implies integer division in Python 2.x, while Python 3.x produces a float, but the subsequent int conversion truncates the decimals, so the result is the same as with integer division). The resulting 4 chunks become [0, 1], [2, 3], [4, 5], and [6, 7, 8, 9, 10]. The list of mapped values returned from map is [2, 10, 18, 80].

Here is an example of how to run the code inside the debugger in ipython and inspect the intermediate results inside the midpoint function:

In [1]: run -d int_mrc.py
Breakpoint 1 at /some/path/int_mrc.py:1
NOTE: Enter 'c' at the ipdb>  prompt to start your script.
> <string>(1)<module>()

ipdb> c
> /some/path/int_mrc.py(1)<module>()
1---> 1 import numpy as np
      2
      3 def partition(x, N):

ipdb> break midpoint
Breakpoint 2 at /some/path/int_mrc.py:17
ipdb> c
> /some/path/int_mrc.py(18)midpoint()
2    17 def midpoint(f, a, b, n, N=5):
---> 18     h = (b-a)/float(n)
     19     x = np.linspace(a + 0.5*h, b - 0.5*h, n)

ipdb> n
> /some/path/int_mrc.py(19)midpoint()
     18     h = (b-a)/float(n)
---> 19     x = np.linspace(a + 0.5*h, b - 0.5*h, n)
     20     chunks = partition(x, N)

ipdb> n
> /some/path/int_mrc.py(20)midpoint()
     19     x = np.linspace(a + 0.5*h, b - 0.5*h, n)
---> 20     chunks = partition(x, N)
     21     s = map(lambda x_k: np.sum(f(x_k)), chunks)

ipdb> n
> /some/path/int_mrc.py(21)midpoint()
     20     chunks = partition(x, N)
---> 21     s = map(lambda x_k: np.sum(f(x_k)), chunks)
     22     s = reduce(lambda x,y: x+y, s)

ipdb> import pprint; pprint.pprint(chunks)
[array([ 0.,  1.]),
 array([ 2.,  3.]),
 array([ 4.,  5.]),
 array([  6.,   7.,   8.,   9.,  10.])]
ipdb> n
> /some/path/int_mrc.py(22)midpoint()
     21     s = map(lambda x_k: np.sum(f(x_k)), chunks)
---> 22     s = reduce(lambda x,y: x+y, s)
     23     return h*s

ipdb> print s
[2.0, 10.0, 18.0, 80.0]
ipdb> print sum(s), b**2-a**2
110.0 110.0

The MonteCarlo function working with chunks is similar to the midpoint function, but we can drop the partition function and instead generate the random numbers for each chunk directly. Equal-sized chunks will then be convenient, so we adjust \( n \) to fit \( N \) equal-sized chunks. The code reads

def MonteCarlo(f, a, b, n, N=5):
    chunk_size = c_k = int(n/N)
    n = N*c_k  # adjust n to ensure equal-sized chunks
    chunks = map(lambda i: np.random.uniform(a, b, c_k),
                 range(N))
    mapped_values = map(lambda x_k: np.sum(f(x_k)), chunks)
    s = reduce(lambda res, e: res + e, mapped_values, 0)
    return (b-a)/float(n)*s
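
The chunked formulation is what makes parallel execution straightforward: each partial sum \( s_k \) can be computed by a separate process. Below is a minimal sketch of this idea, assuming only Python's standard multiprocessing module (not the Disco framework mentioned in the summary): Pool.map plays the role of a parallel map over chunks for the midpoint rule, while the reduce step is an ordinary sum on the master process. Since Pool.map must pickle the mapper, the mapper is a module-level function and the test integrand f is hardcoded in this example.

from functools import reduce
from multiprocessing import Pool
import numpy as np

def f(x):
    return 2*x                      # test integrand (hypothetical example)

def chunk_sum(x_k):
    """Map step: the partial sum s_k for one chunk."""
    return np.sum(f(x_k))

def midpoint_parallel(a, b, n, N=4):
    h = (b - a)/float(n)
    x = np.linspace(a + 0.5*h, b - 0.5*h, n)
    chunks = np.array_split(x, N)                # N nearly equal-sized chunks
    pool = Pool(processes=N)
    mapped_values = pool.map(chunk_sum, chunks)  # parallel map step
    pool.close(); pool.join()
    s = reduce(lambda res, e: res + e, mapped_values, 0)  # reduce step on master
    return h*s

if __name__ == '__main__':
    print midpoint_parallel(0, 1, 1000, N=4)     # exact answer is 1.0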

Map with independent Monte Carlo integrals

Now we assume that the map step is a complete Monte Carlo integration procedure calculating the integral \( \int_a^b f(x)dx \) based on the \( c_k \) samples in chunk number \( k \): $$ I_k = \frac{b-a}{c_k}\sum_{j=0}^{c_k-1} f(x_{k,j}).$$ The reduce step is now not a simple sum. To compute the integral \( I=\int_a^b f(x)dx \) from the individual approximations \( I_0,\ldots,I_{N-1} \), we need a weighted average, $$ I = \frac{1}{n}\sum_{k=0}^{N-1} c_k I_k .$$ This means that map must return a list of \( (c_k, I_k) \) tuples, and reduce must combine these correctly. The forthcoming code will provide the details.

The vectorized MonteCarlo function shown earlier is a good candidate for computing \( I_k \):

def MonteCarlo(f, a, b, n):
    x = np.random.uniform(a, b, n)
    s = np.sum(f(x))
    return (b-a)/float(n)*s

a = 0; b = 1

The mapper function called for each element in the list that map operates on must call this MonteCarlo function. To this end, the mapper function needs to pass f, a, b, and the chunk size on to MonteCarlo. Since the mapper function can take only one argument, we need to pack the necessary data into this argument. A tuple, here called e (for element), is used for this purpose:

def mapper(e):
    c_k, a, b, f = e
    I_k = MonteCarlo(f, a, b, c_k)
    return c_k, I_k

Note that mapper returns a tuple \( (c_k, I_k) \) since the reduce step needs access to these data.

The map step consists of packing a list of such e objects as expected by mapper and passing this list on to map. With c_k denoting the chunk size we can write

data = [(c_k, a, b, f) for k in range(N)]
intermediate = map(mapper, data)

In this particular example, all the elements in data are equal, but this does not need to be the case. If we imagine that the mapper function is run on computers with very different speed and memory, we would adapt the chunk sizes to the capacity of the individual compute units.
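
For example (a constructed sketch, not part of the code above), the data list could carry individual chunk sizes proportional to the assumed relative speed of each compute unit. The reducer defined next handles this automatically, since every element carries its own \( c_k \), as long as the final result is divided by the total number of samples \( \sum_k c_k \):

speeds = [1, 1, 2, 4]                               # hypothetical relative speeds of N=4 units
n = 8000
c = [int(n*s/float(sum(speeds))) for s in speeds]   # chunk sizes: [1000, 1000, 2000, 4000]
data = [(c[k], a, b, f) for k in range(len(c))]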

The reducer function takes in general two arguments: the result of the reduce operations so far, res, and a new element e from the list returned by map. Recall that e is a tuple \( (c_k, I_k) \). The reducer must add \( c_kI_k \) to the previous result res:

def reducer(res, e):
    c_k, I_k = e
    return res + c_k*I_k

A remark is important here. The above reducer function will work if reduce is called with a third argument 0. If not, the first call to reducer will have the first two elements of the list returned from map as the res and e arguments. That case calls for special treatment:

def reducer(res, e):
    c_k, I_k = e
    if isinstance(res, tuple):
        c_k0, I_k0 = res
        res = c_k0*I_k0
    return res + c_k*I_k

Situations like this always arise if the object types of res and e can be different, which is a rather common case. The test on the type of res breaks the logic that res is always the result so far and that e is to update res. We therefore recommend always calling reduce with a third argument, at least in cases where reduce performs a summation.

Now we have the building blocks for creating a parallel Monte Carlo integration function where we essentially specify a chunk size, and the map-reduce construction runs independent integral approximations for each chunk (in parallel if desired) and combines the results:

def parallel_MonteCarlo(f, a, b, n, N=5):
    chunk_size = c_k = int(n/N)
    n = N*c_k   # adjust n to fit equal-sized chunks
    data = [(c_k, a, b, f) for k in range(N)]
    mapped_values = map(mapper, data)
    I = reduce(reducer, mapped_values, 0)/n
    return I

The mapped_values list is handy for debugging purposes, but is often omitted:

def parallel_MonteCarlo2(f, a, b, n, N=5):
    c_k = int(n/N)  # chunk size
    n = c_k*N       # adjust n to fit equal-sized chunks
    data = [(c_k, a, b, f) for k in range(N)]
    return reduce(reducer, map(mapper, data), 0)/n
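
A quick sanity check on the linear test integrand used earlier (the output varies with the random samples, but should be close to the exact value 1):

print parallel_MonteCarlo (lambda x: 2*x, 0, 1, 10000, N=4)
print parallel_MonteCarlo2(lambda x: 2*x, 0, 1, 10000, N=4)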

The MapReduce framework

So far we have applied the classical map and reduce functions known from functional programming. The MapReduce framework as used by Google and many software libraries differs from the classical map-reduce framework in that the map and reduce functions take different input and produce different output. There is also an intermediate step between map and reduce.

In the MapReduce framework, the map step takes an iterable object as input (list, tuple, array, or any object obj allowing for element in obj) and produces a list of (key, value) pairs as output. The next step is a partition step where the list of (key, value) pairs is turned into a list of (key, values) pairs, where all values associated with a key are collected in a list. The reduce step takes the list from the partition step and applies the reducer to each key and its values. There is no possibility for accumulation of results as in the classical reduce function. The reduce step in the MapReduce framework is basically a map step, because elements from a list are fed to the reducer (without any previous results of the reduce step), and the output is collected in a list. A more classical reduce step is then required to process that list and calculate the final results. The next example will illustrate the differences between classical map-reduce and the "modern" MapReduce framework.
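
As a small schematic example with made-up numbers (and plain summation as the per-key reduction), the data flows through the three steps like this:

mapped_values    = [('a', 1), ('b', 2), ('a', 3)]   # map: list of (key, value) pairs
partitioned_data = [('a', [1, 3]), ('b', [2])]      # partition: list of (key, values) pairs
reduced_values   = [4, 2]                           # reduce: one reducer call per key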

Monte Carlo integration with MapReduce

Let us turn the Monte Carlo implementation from the section Map with independent Monte Carlo integrals into the MapReduce formulation. We need to define what the keys and values are as produced by the mapper. Since our mapper function already returns a 2-tuple \( (c_k, I_k) \), we may take \( c_k \) as key and \( I_k \) as value. The mapper function can therefore be reused without changes.

The partition step "sorts" the (key, value) pairs to (key, values) pairs. That is, all the values associated with one key are collected in a list. In the present example, this means that all the \( I_k \) values with the same chunk size \( c_k \) are to be collected in a list. The following general function performs the task, using a dictionary for convenience to collect keys and values:

def partition(mapped_values):
    partitioned_data = {}
    for key, value in mapped_values:
        if key not in partitioned_data:
            partitioned_data[key] = [value]
        else:
            partitioned_data[key].append(value)
    return partitioned_data.items()

The return statement turns the dictionary into a list of pairs (key, partitioned_data[key]), as required for the partition step. Since all our chunks have the same size, there is only one key (\( c_k \)) and hence only one element in the list returned from partition.

Treatment of the special case where key is not yet registered in the dictionary can be avoided by using a dictionary with default values (here empty lists):

import collections

def partition(mapped_values):
    partitioned_data = collections.defaultdict(list)
    for key, value in mapped_values:
        partitioned_data[key].append(value)
    return partitioned_data.items()
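
A quick demonstration of partition on made-up (key, value) pairs, where two chunks happen to have size 2 and one has size 3 (the order of the keys in the output may vary):

print partition([(2, 0.98), (2, 1.03), (3, 1.01)])
# [(2, [0.98, 1.03]), (3, [1.01])]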

The reducer is supposed to have the signature reducer(e), where e is an element in the list produced by the partition step. In our case, there is only one key and reducer is called only once, but we code it as if there could be different keys (chunk sizes):

def reducer(e):
    c_k, I_values = e
    return c_k*sum(I_values)

To produce the final result, all the returned values c_k*sum(I_values) must be summed and divided by the total number of samples (n).

The reduce function to be used in the MapReduce framework is quite different from the classical reduce function in Python. Our new reduce takes a reducer and a sequence as input, but the reducer only receives elements from this sequence and no result from the previous reducer operation. Actually, the new reduce is simply a classical map operation, so we can set reduce=map. The result in our case of calling reduce(reducer, partitioned_data) is a list reduced_values of \( c_k\sum_j I_j \) values, where the \( I_j \) in each sum are integral approximations using the same number (\( c_k \)) of samples. The final integral is then computed by

    reduced_values = reduce(reducer, partitioned_data)
    I = np.sum(reduced_values)/n

The complete parallel Monte Carlo function using the MapReduce framework takes the form

def parallel_MonteCarlo(f, a, b, n, N=5):
    chunk_size = c_k = int(n/N)
    n = N*c_k   # adjust n to fit equal-sized chunks
    data = [(c_k, a, b, f) for k in range(N)]
    mapped_values = map(mapper, data)
    partitioned_data = partition(mapped_values)
    reduce = map
    reduced_values = reduce(reducer, partitioned_data)
    I = np.sum(reduced_values)/n
    return I
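
Calling this version on the linear test integrand should again give a value close to the exact answer 1:

print parallel_MonteCarlo(lambda x: 2*x, 0, 1, 10000, N=4)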

Monte Carlo integration with a MapReduce class

Rather than calling the traditional Python map function and making our own partition and reduce functions, we can use a simple class framework that has the appropriate map, partition, and reduce steps, and that also supports true parallel computing.

The MapReduce module contains the class SimpleMapReduce, which is used as follows:

from MapReduce import SimpleMapReduce
map_reduce = SimpleMapReduce(mapper, reducer)
reduced_values = map_reduce(data)

Exercises

Exercise 1: Mean and variance computations

Suppose we have some numbers in a one-dimensional array data and that we want to calculate the mean and variance of the data. This is easily done with

import numpy
mean = numpy.mean(data)
var  = numpy.var(data)

However, we want to independently run numpy.mean and numpy.var on a set of \( N \) chunks of data via the map-reduce concept. Applying a mapper function on chunk number \( k \) results in a tuple \( (c_k, m_k, v_k) \), where \( c_k \) is the chunk size, \( m_k \) is the mean of this chunk of data, and \( v_k \) is the variance. The reducer function must combine two means and two variances to yield a new mean and variance. Relevant formulas are $$ \begin{align*} m_i &= \frac{c_jm_j + c_km_k}{c_j + c_k},\\ v_i &= \frac{c_jv_j + c_kv_k}{c_j+c_k} + c_jc_k\left(\frac{m_j - m_k}{c_j+c_k}\right)^2 \end{align*} $$

Make a program that calculates the mean and variance using \( N \) chunks in a map-reduce implementation. Check that the results compare well with applying numpy.mean and numpy.var to the whole array data. Filename: meanvar_mr.py.
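
As a small numerical check of the combination formulas (not a solution to the exercise), one can verify with two made-up chunks d1 and d2 that the formulas reproduce numpy's results on the concatenated array:

import numpy as np

d1 = np.array([1.0, 2.0, 4.0]); d2 = np.array([0.0, 3.0])
c_j, m_j, v_j = len(d1), np.mean(d1), np.var(d1)
c_k, m_k, v_k = len(d2), np.mean(d2), np.var(d2)
m = (c_j*m_j + c_k*m_k)/(c_j + c_k)
v = (c_j*v_j + c_k*v_k)/(c_j + c_k) + c_j*c_k*((m_j - m_k)/(c_j + c_k))**2
d = np.concatenate([d1, d2])
print m, np.mean(d)   # both 2.0
print v, np.var(d)    # both 2.0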

Resources

Techniques behind: