Summary. The map-reduce concept is a unified way of implementing algorithms so that one can easily utilize (large-scale) parallel computing resources. A famous version of the concept, branded as MapReduce, was introduced by Google in 2004 and plays a key role when Google's search engine works with large sets of documents on large clusters of computers. Many different software libraries support variants of map-reduce. For Python code development, Disco is a tool that allows parallel processing of large-scale computing problems expressed as map-reduce algorithms. The purpose of the forthcoming text is to illustrate and explore the map-reduce concept for numerical computing.
As an introductory example, consider numerical integration of a function \( f(x) \) on \( [a,b] \). A straightforward (scalar) implementation of the Midpoint rule with \( n \) points reads

def midpoint(f, a, b, n):
    h = (b-a)/float(n)
    s = 0
    for i in range(n):
        x_i = a + (i + 0.5)*h
        s += f(x_i)
    return h*s
The Monte Carlo integration method has a very similar implementation:

import random

def MonteCarlo(f, a, b, n):
    s = 0
    for i in range(n):
        x_i = random.uniform(a, b)
        s += f(x_i)
    return (b-a)/float(n)*s
Testing the two methods on the linear function \( f(x)=2x \) for \( x\in [0,1] \),

def f(x):
    return 2*x

a = 0; b = 1
print midpoint(f, a, b, 2)
print MonteCarlo(f, a, b, 4000)
results in
1.0
0.999962735888
The Midpoint rule integrates a linear function exactly for any \( n>0 \), while Monte Carlo integration is always approximate.
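As a small illustration using the functions above (the exact value is 1; the printed estimates will vary with the random seed), one can observe how the Monte Carlo error typically shrinks as the number of samples grows:

random.seed(1)     # fix the seed for reproducible output (illustration only)
for n in 10, 100, 1000, 10000, 100000:
    print n, MonteCarlo(lambda x: 2*x, 0, 1, n)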
The next step is to vectorize the implementations, i.e., to replace the explicit for loop by numpy operations on arrays.
Three steps are required to vectorize the sum \( \sum_i f(x_i) \): 1) all
the \( x_i \) values must be generated at once and stored in an array,
2) \( f \) must be applied
to the array of \( x_i \) values, and 3) a summation function must be
applied to the array of \( f(x_i) \) values. The resulting code looks like
"""Integration example: vectorized implementation."""
from functools import reduce
import numpy as np
def midpoint(f, a, b, n):
h = (b-a)/float(n)
x = np.linspace(a + 0.5*h, b - 0.5*h, n)
s = np.sum(f(x))
return h*s
def MonteCarlo(f, a, b, n):
x = np.random.uniform(a, b, n)
s = np.sum(f(x))
return (b-a)/float(n)*s
a = 0; b = 1
print midpoint (lambda x: 2*x, a, b, 2)
print MonteCarlo(lambda x: 2*x, a, b, 4000)
The vectorized code is shorter and (much) faster. Observe that we have also saved some space by using lambda functions instead of a separately defined f(x): the construction

lambda x, y, z, ...: expression

is the same as

def f(x, y, z, ...):
    return expression
The name map-reduce refers to the two operations map and reduce, which are classical constructs in functional programming. Python also has these functions. Calling map(f, L) applies the function f(e) to every element e in a list L and returns the resulting values as a list (in Python 3, map returns an iterator instead, which can be turned into a list with list()). We can implement our own map function by
def mymap1(f, L):
    r = []
    for e in L:
        r.append(f(e))
    return r

def mymap2(f, L):
    return [f(e) for e in L]
List comprehensions, as used in mymap2, are recommended as a replacement for map in modern Python.
The reduce function is a bit more complicated. It operates on a list, usually the result of map, and reduces it to a single object. If a list L has 5 elements, the call reduce(f, L) calculates

f(f(f(f(L[0], L[1]), L[2]), L[3]), L[4])

For example, with f=lambda x,y: x+y we have

(((L[0] + L[1]) + L[2]) + L[3]) + L[4]

which is nothing but the sum of the elements in L. Here is a demo program:
from functools import reduce   # built in to Python 2, must be imported in Python 3

def mapper(x):
    print 'mapper: %s -> %s' % (x, x**2)
    return x**2

def reducer(x, y):
    print 'reducer: %s, %s -> %s' % (x, y, x + y)
    return x + y

L = range(1, 5)
r = reduce(reducer, map(mapper, L))
yielding the output
mapper: 1 -> 1
mapper: 2 -> 4
mapper: 3 -> 9
mapper: 4 -> 16
reducer: 1, 4 -> 5
reducer: 5, 9 -> 14
reducer: 14, 16 -> 30
We see how x in reducer is always the result of the previous call to reducer and that y is the next element in the list.
The reduce function can take a third argument which acts as an initial value for the reduce operation. Calling

r = reduce(reducer, map(mapper, L), 0)

results in different output in the reducer function, since the first call does not involve the first two elements of the list returned by map, but the result so far (0) and the first element in the list:
reducer: 0, 1 -> 1
reducer: 1, 4 -> 5
reducer: 5, 9 -> 14
reducer: 14, 16 -> 30
The use of an initial value makes it conceptually clearer what the reducer does: reducer(res, e) computes the combination of the reduce result so far, res, and a new element e in the list returned from map. In these writings we will mostly use an initial value when calling reduce and write the reducer as reducer(res, e):
def reducer(res, e):
    print 'reducer: %s, %s -> %s' % (res, e, res + e)
    return res + e
The numerical integration rules are easily implemented using the
map-reduce concept since the map phase is to apply \( f \) to the list of
\( x_i \) points, and the reduce phase is to sum the elements in the
list returned from map:
"""Integration example: map-reduce implementation and numpy constructs."""
from functools import reduce
import numpy as np
def midpoint(f, a, b, n):
h = (b-a)/float(n)
x = np.linspace(a + 0.5*h, b - 0.5*h, n)
fx = map(f, x)
s = reduce(lambda res, e: res + e, fx, 0)
return h*s
def MonteCarlo(f, a, b, n):
x = np.random.uniform(a, b, n)
fx = map(f, x)
s = reduce(lambda res, e: res + e, fx, 0)
return (b-a)/float(n)*s
a = 0; b = 1
print midpoint (lambda x: 2*x, a, b, 2)
print MonteCarlo(lambda x: 2*x, a, b, 4000)
Basically, the vectorized operation f(x) is replaced by map, and np.sum is replaced by reduce. The computation of x can also be done by map:

x = map(lambda i: a + (0.5+i)*h, range(n))           # Midpoint rule points
x = map(lambda i: random.uniform(a, b), range(n))    # Monte Carlo points (needs import random)
We remark that lambda functions, as well as functions defined inside midpoint or MonteCarlo, "remember" variables like a, b, and n (such functions are referred to as closures).
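As a small illustrative sketch (the function name make_points is ours, not taken from the text above), a closure keeps the surrounding variables alive even after the enclosing function has returned:

def make_points(a, b, n):
    h = (b-a)/float(n)
    # the lambda below "remembers" a and h: it is a closure
    return map(lambda i: a + (0.5+i)*h, range(n))

points = make_points(0, 1, 4)   # [0.125, 0.375, 0.625, 0.875]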
The potential for parallel computing lies primarily in map, since map applies the same function to each element of a list, and all these actions are independent of each other. That is, writing a map operation immediately makes it clear that the operations on the list are independent and can be done in parallel.
Imagine that a set of \( n \) compute units works in parallel such that unit \( i \) computes \( x_i \) and \( f(x_i) \). The summation step in the numerical integration formula requires the compute units to communicate: unit 0 sends its value \( f(x_0) \) to unit 1, which adds it to its own value \( f(x_1) \) and sends the sum \( f(x_0)+f(x_1) \) to unit 2; unit 2 adds \( f(x_2) \) and sends \( f(x_0)+f(x_1)+f(x_2) \) to unit 3, which adds its own value, and so forth, until all partial sums have been communicated and the final sum is complete. These actions are the same as expressed in the reduce step.
The idea is that software libraries can implement their parallel version of map and the communicating version of reduce. If our algorithm can be expressed as calls to map and reduce, this is an easy way of parallelizing the algorithm. However, it does not make much sense to distribute each \( x_i \) and \( f(x_i) \) computation to separate compute units: we need to distribute a chunk of \( x_i \) and \( f(x_i) \) values to a compute unit.
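As a minimal concrete sketch of a library-provided parallel map (our own illustration using Python's standard multiprocessing module, not the framework used later in this text), each worker process can sum the \( f \) values of an entire chunk, in line with the chunking idea above:

import multiprocessing
import numpy as np

def chunk_sum(x_k):                  # must be a module-level function (picklable)
    return np.sum(2*x_k)             # sum of f(x_i) over one chunk, with f(x)=2x

if __name__ == '__main__':
    h = 1/4.0                                    # Midpoint rule on [0,1] with n=4
    x = np.linspace(0 + 0.5*h, 1 - 0.5*h, 4)     # [0.125, 0.375, 0.625, 0.875]
    chunks = [x[:2], x[2:]]                      # two chunks, one per worker
    pool = multiprocessing.Pool(processes=2)
    partial_sums = pool.map(chunk_sum, chunks)   # parallel version of map over chunks
    pool.close(); pool.join()
    print h*sum(partial_sums)                    # the reduce step; prints 1.0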
The next section explains the idea in a serial setting.
Partitioning the array of \( x_i \) values into equal-sized chunks, putting any remaining elements into the last chunk, can be done by this function:
def partition(x, N):
    """Return a list of N chunks of x."""
    chunk_size = int(len(x)/N)
    c = [chunk_size for i in range(N)]
    c[-1] = len(x) - (N-1)*chunk_size   # remaining elements go in the last chunk
    chunks = [x[:c[0]]]
    start = c[0]
    for k in range(1, N-1):
        chunks.append(x[start:start+c[k]])
        start = start + c[k]
    chunks.append(x[start:])
    return chunks
Note that the return value is a list of slices of (views into) the x array. A simpler approach to partitioning would be to just make x a two-dimensional array where each row represents a chunk of data. We would then normally need to throw away some of the last elements in x to create a two-dimensional shape (i.e., to create equal-sized chunks):
def partition(x, N):
    chunk_size = int(len(x)/N)
    return x[:chunk_size*N].reshape(N, chunk_size)
The returned object is now a two-dimensional numpy array, but map can easily iterate over the first index, i.e., the rows in this object.
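A quick check of this variant (with our own toy numbers): for 10 elements and N=3 chunks, the last element is discarded so that the data fit a 3-by-3 array:

import numpy as np
x = np.arange(10)
print partition(x, 3)
# [[0 1 2]
#  [3 4 5]
#  [6 7 8]]    (element 9 is thrown away)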
Below, we use the first version of partition, where the final chunk may have a different size from the other ones.
The map operation on each element in the chunks list must first apply the f function (on the slice), yielding a numpy array, and then apply the np.sum function on this array. The result is the partial sum \( s_k \) as defined above.
The midpoint method can be implemented as

def midpoint(f, a, b, n, N=5):
    h = (b-a)/float(n)
    x = np.linspace(a + 0.5*h, b - 0.5*h, n)
    chunks = partition(x, N)
    mapped_values = map(lambda x_k: np.sum(f(x_k)), chunks)
    s = reduce(lambda res, e: res + e, mapped_values, 0)
    return h*s
To understand what is computed and to check that the intermediate results are right, we call

print midpoint(lambda x: 2*x, -0.5, 10.5, 11, N=4)
This results in x as [0, 1, 2, ..., 10]. With \( n=11 \) and \( N=4 \), the chunk_size becomes 2 (recall that division between two integers in Python 2.x implies integer division, while in Python 3.x the result is a float, but the subsequent int conversion truncates the decimals, so the final result is identical to integer division).
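A one-line check of this claim:

print int(11/4)   # 11/4 is 2 in Python 2 (integer division); int(2.75) is 2 in Python 3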
The resulting 4 chunks now become [0, 1], [2, 3], [4, 5], and [6, 7, 8, 9, 10].
The list returned from map (stored in the variable s in the int_mrc.py file used in the debugging session below) is [2, 10, 18, 80].
Here is an example of how to run the code inside the debugger in ipython and inspect the intermediate results inside the midpoint function:
In [1]: run -d int_mrc.py
Breakpoint 1 at /some/path/int_mrc.py:1
NOTE: Enter 'c' at the ipdb> prompt to start your script.
> <string>(1)<module>()
ipdb> c
> /some/path/int_mrc.py(1)<module>()
1---> 1 import numpy as np
2
3 def partition(x, N):
ipdb> break midpoint
Breakpoint 2 at /some/path/int_mrc.py:17
ipdb> c
> /some/path/int_mrc.py(18)midpoint()
2 17 def midpoint(f, a, b, n, N=5):
---> 18 h = (b-a)/float(n)
19 x = np.linspace(a + 0.5*h, b - 0.5*h, n)
ipdb> n
> /some/path/int_mrc.py(19)midpoint()
18 h = (b-a)/float(n)
---> 19 x = np.linspace(a + 0.5*h, b - 0.5*h, n)
20 chunks = partition(x, N)
ipdb> n
> /some/path/int_mrc.py(20)midpoint()
19 x = np.linspace(a + 0.5*h, b - 0.5*h, n)
---> 20 chunks = partition(x, N)
21 s = map(lambda x_k: np.sum(f(x_k)), chunks)
ipdb> n
> /some/path/int_mrc.py(21)midpoint()
20 chunks = partition(x, N)
---> 21 s = map(lambda x_k: np.sum(f(x_k)), chunks)
22 s = reduce(lambda x,y: x+y, s)
ipdb> import pprint; pprint.pprint(chunks)
[array([ 0., 1.]),
array([ 2., 3.]),
array([ 4., 5.]),
array([ 6., 7., 8., 9., 10.])]
ipdb> n
> /some/path/int_mrc.py(22)midpoint()
21 s = map(lambda x_k: np.sum(f(x_k)), chunks)
---> 22 s = reduce(lambda x,y: x+y, s)
23 return h*s
ipdb> print s
[2.0, 10.0, 18.0, 80.0]
ipdb> print sum(s), b**2-a**2
110.0 110.0
The MonteCarlo function working with chunks is similar to the midpoint function, but we can drop the partition function and instead generate the random numbers for each chunk directly. Equal-sized chunks are then convenient, so we adjust \( n \) to fit \( N \) equal-sized chunks. The code reads
def MonteCarlo(f, a, b, n, N=5):
    chunk_size = c_k = int(n/N)
    n = N*c_k   # adjust n to ensure equal-sized chunks
    chunks = map(lambda i: np.random.uniform(a, b, c_k),
                 range(N))
    mapped_values = map(lambda x_k: np.sum(f(x_k)), chunks)
    s = reduce(lambda res, e: res + e, mapped_values, 0)
    return (b-a)/float(n)*s
Suppose now that each chunk may have its own size \( c_k \) and that the Monte Carlo computation over a chunk produces the integral approximation \( I_k \). Then map must return a list of \( (c_k, I_k) \) tuples, and reduce must combine these correctly. The forthcoming code will provide the details.
The vectorized MonteCarlo function shown earlier is a good candidate for computing \( I_k \):

def MonteCarlo(f, a, b, n):
    x = np.random.uniform(a, b, n)
    s = np.sum(f(x))
    return (b-a)/float(n)*s

a = 0; b = 1
The mapper function, called for each element in the list that map operates on, must call this MonteCarlo function. To this end, the mapper function needs to pass f, a, b, and the chunk size on to MonteCarlo. Since the mapper function can take only one argument, we need to pack the necessary data into this argument. A tuple, here called e (for element), is used for this purpose:
def mapper(e):
    c_k, a, b, f = e
    I_k = MonteCarlo(f, a, b, c_k)
    return c_k, I_k
Note that mapper returns a tuple \( (c_k, I_k) \) since the reduce step needs access to these data.
The map step consists of packing a list of e objects as expected by mapper and passing it on to map. With c_k denoting the chunk size, we can write

data = [(c_k, a, b, f) for k in range(N)]
intermediate = map(mapper, data)
In this particular example, all the elements in data are equal, but this does not need to be the case. If we imagine that the mapper function is run on computers with very different speed and memory, we would adapt the chunk sizes to the capacity of the individual compute units.
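For instance (a small sketch with made-up chunk sizes), the data list could assign a different number of samples to each compute unit; since the reducer defined below weights each chunk result \( I_k \) by its size \( c_k \), dividing by the total sample count still gives the correct overall estimate:

# made-up chunk sizes, e.g. reflecting compute units of different speed
chunk_sizes = [500, 1000, 2000, 500]
n = sum(chunk_sizes)                            # total number of samples
data = [(c_k, a, b, f) for c_k in chunk_sizes]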
The reducer function takes in general two arguments: the result of the reduce operations so far, res, and a new element e from the list returned by map. Recall that e is a tuple \( (c_k, I_k) \). The reducer must add \( c_kI_k \) to the previous result res:
def reducer(res, e):
    c_k, I_k = e
    return res + c_k*I_k
A remark is important here. The above reducer function will work if reduce is called with a third argument 0. If not, the first call to reducer will have the first two elements of the list returned from map as the res and e arguments. That case calls for special treatment:
def reducer(res, e):
    c_k, I_k = e
    if isinstance(res, tuple):
        # res is the first (c_k, I_k) element from map, not a partial sum yet
        c_k0, I_k0 = res
        res = c_k0*I_k0
    return res + c_k*I_k
Situations like this arise whenever the object types of res and e can differ, which is a rather common case. The test on the type of res breaks the logic that res is always the result so far and that e is the element used to update res. We therefore recommend always calling reduce with a third argument, at least in cases where reduce performs a summation.
Now we have the building blocks for creating a parallel Monte Carlo integration function where we essentially specify a chunk size, and the map-reduce construction runs independent integral approximations for each chunk (in parallel if desired) and combines the results:
def parallel_MonteCarlo(f, a, b, n, N=5):
    chunk_size = c_k = int(n/N)
    n = N*c_k   # adjust n to fit equal-sized chunks
    data = [(c_k, a, b, f) for k in range(N)]
    mapped_values = map(mapper, data)
    I = reduce(reducer, mapped_values, 0)/n
    return I
The mapped_values list is handy for debugging purposes, but is often omitted:
def parallel_MonteCarlo2(f, a, b, n, N=5):
    c_k = int(n/N)   # chunk size
    n = c_k*N        # adjust n to fit equal-sized chunks
    data = [(c_k, a, b, f) for k in range(N)]
    return reduce(reducer, map(mapper, data), 0)/n
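A quick usage example (our own test call, analogous to the earlier ones; the estimate of the exact value 1 varies slightly between runs):

print parallel_MonteCarlo(lambda x: 2*x, 0, 1, 4000, N=4)
print parallel_MonteCarlo2(lambda x: 2*x, 0, 1, 4000, N=4)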
In the MapReduce framework, the map step takes an iterable object as input (list, tuple, array, or any object obj allowing for element in obj), and produces a list of (key, value) pairs as output. The next step is a partition step where the list of (key, value) pairs is turned into a list of (key, values) pairs, in which all values associated with a key are collected in a list. The reduce step takes the list from the partition step and applies the reducer to each key and its values. There is no possibility for accumulation of results as in the classical reduce function. The reduce step in the MapReduce framework is basically a map step, because elements from a list are fed to the reducer (without any previous results from the reduce step), and the output is collected in a list. A more classical reduce step is then required to process that list and calculate the final results. The next example will illustrate the differences between classical map-reduce and the "modern" MapReduce framework.
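Schematically (a sketch in our own notation, anticipating the functions developed below), the three phases can be chained like this:

mapped_values    = map(mapper, data)                # 1) list of (key, value) pairs
partitioned_data = partition(mapped_values)         # 2) list of (key, [values]) pairs
reduced_values   = map(reducer, partitioned_data)   # 3) "reduce" step = a map over keys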
Since our mapper function already returns a 2-tuple \( (c_k, I_k) \), we may take \( c_k \) as the key and \( I_k \) as the value. The mapper function can therefore be reused without changes.
The partition step "sorts" the (key, value) pairs to (key, values) pairs. That is, all the values associated with one key are collected in a list. In the present example, this means that all the \( I_k \) values with the same chunk size \( c_k \) are to be collected in a list. The following general function performs the task, using a dictionary for convenience to collect keys and values:
def partition(mapped_values):
    partitioned_data = {}
    for key, value in mapped_values:
        if key not in partitioned_data:
            partitioned_data[key] = [value]
        else:
            partitioned_data[key].append(value)
    return partitioned_data.items()
The return statement turns the dictionary into a list of pairs (key, partitioned_data[key]), as required for the partition step. Since all our chunks have the same size, there is only one key (\( c_k \)) and hence one element in the list returned from partition.
Treatment of the special case that key is not yet registered as a list in the dictionary can be avoided by utilizing a dictionary with default values (here empty lists):
import collections

def partition(mapped_values):
    partitioned_data = collections.defaultdict(lambda: [])
    for key, value in mapped_values:
        partitioned_data[key].append(value)
    return partitioned_data.items()
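As a tiny, made-up check of how partition groups values by key:

print partition([(1000, 0.98), (1000, 1.01), (500, 1.05)])
# e.g. [(1000, [0.98, 1.01]), (500, [1.05])]   (the ordering of keys may vary)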
The reducer is supposed to have the signature reducer(e), where e is an element in the list produced by the partition step. In our case, there is only one key and reducer is called only once, but we code it as if there could be several different keys (chunk sizes):
def reducer(e):
    c_k, I_values = e
    return c_k*sum(I_values)
To produce the final result, all the returned values c_k*sum(I_values) must be summed and divided by the total number of samples (n).
The reduce function to be used in the MapReduce framework is quite different from the classical reduce function in Python. Our new reduce takes a reducer and a sequence as input, but the reducer only receives elements from this sequence and no result from the previous reducer operation. Actually, the new reduce is simply a classical map operation, so we can set reduce=map.
In our case, the result of calling reduce(reducer, partitioned_data) is a list reduced_values of \( c_k\sum_j I_j \) values, where the \( I_j \) in each sum are integral approximations using the same number (\( c_k \)) of samples. The final integral is then computed by

reduced_values = reduce(reducer, partitioned_data)
I = np.sum(reduced_values)/n
The complete parallel Monte Carlo function using the MapReduce framework takes the form
def parallel_MonteCarlo(f, a, b, n, N=5):
    chunk_size = c_k = int(n/N)
    n = N*c_k   # adjust n to fit equal-sized chunks
    data = [(c_k, a, b, f) for k in range(N)]
    mapped_values = map(mapper, data)
    partitioned_data = partition(mapped_values)
    reduce = map   # the MapReduce "reduce" step is a plain map
    reduced_values = reduce(reducer, partitioned_data)
    I = np.sum(reduced_values)/n
    return I
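A quick sanity check of this MapReduce-style version (our own test call; the estimate of the exact value 1 varies between runs and should be statistically equivalent to the classical map-reduce version above):

print parallel_MonteCarlo(lambda x: 2*x, 0, 1, 4000, N=4)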
Instead of relying on Python's built-in map function and writing our own partition and reduce functions, we can use a simple class framework that has the appropriate map, partition, and reduce steps, and that also supports true parallel computing.
The MapReduce module contains the class SimpleMapReduce, which is used as follows:

from MapReduce import SimpleMapReduce

map_reduce = SimpleMapReduce(mapper, reducer)
reduced_values = map_reduce(data)
Suppose we have an array data with numbers and that we want to calculate the mean and variance of the data. This is easily done with

mean = numpy.mean(data)
var = numpy.var(data)
However, we want to independently run numpy.mean and numpy.var on a set of \( N \) chunks of data via the map-reduce concept. Applying a mapper function on chunk number \( k \) results in a tuple \( (c_k, m_k, v_k) \), where \( c_k \) is the chunk size, \( m_k \) is the mean of this chunk of data, and \( v_k \) is the variance. The reducer function must combine two means and two variances to yield a new mean and variance. Relevant formulas are
$$
\begin{align*}
m_i &= \frac{c_jm_j + c_km_k}{c_j + c_k},\\
v_i &= \frac{c_jv_j + c_kv_k}{c_j+c_k} + c_jc_k\left(\frac{m_j - m_k}{c_j+c_k}\right)^2
\end{align*}
$$
Make a program that calculates the mean and variance using \( N \) chunks in a map-reduce implementation. Check that the results compare well to applying numpy.mean and numpy.var on the whole array data. Filename: meanvar_mr.py.