In this lab, you will write a MapReduce library. You will also write several MapReduce programs that make use of your library. You will be programming in Go.
A straightforward use of the library looks as follows:
package main

import mr "mapreduce"

func mapF(fileName string, contents string) (res []mr.KeyValue) {
    for _, word := range words(contents) {
        res = append(res, mr.KeyValue{Key: word, Value: ""})
    }
    return
}

func reduceF(key string, values []string) string {
    return ""
}

func main() {
    mr.Run("example", mapF, reduceF)
}
The result of executing the MapReduce program above (named "example") is a
sorted list of non-duplicated words from any number of documents. The merged
output is stored in data/output/mr.result.{jobName}
(data/output/mr.result.example here). Excepting the words(...) pseudo-function,
the program above will directly execute on top of the library you will build.
The mapF
function is called once per map task; in this library, there is one
map task per input file. The mapF
function receives as input the name of the
input file and the contents of the file. It returns a slice of key-value pairs.
The reduceF
function is called once for each unique key emitted by mappers.
Keys are distributed among many reducers; the maximum number of reducers is
chosen by the user via the command line and defaults to 20. The inputs to the
reduce function are a key and all of the values that were mapped from that key
by map tasks.
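As a second illustration of the interface, consider a hypothetical program (not part of the lab) whose map function emits one pair per file recording the file's length, and whose reduce function simply echoes the single value back. The KeyValue type below is an illustrative stand-in for the library's mr.KeyValue:

```go
package main

import (
	"fmt"
	"strconv"
)

// KeyValue mirrors the library's pair type for illustration.
type KeyValue struct {
	Key, Value string
}

// mapF emits a single pair per input file: the file's name mapped to
// its length in bytes.
func mapF(fileName string, contents string) []KeyValue {
	return []KeyValue{{Key: fileName, Value: strconv.Itoa(len(contents))}}
}

// reduceF sees each unique key once, along with every value mapped to
// it. Each file name here has exactly one value, so it is returned as-is.
func reduceF(key string, values []string) string {
	return values[0]
}

func main() {
	kvs := mapF("a.txt", "hello world")
	fmt.Println(kvs[0].Key, reduceF(kvs[0].Key, []string{kvs[0].Value})) // a.txt 11
}
```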
Your library will be able to execute MapReduce programs sequentially, using a
single worker, and in parallel, using as many workers as desired. When executing
a job in parallel, the master and workers communicate via RPC calls. When a
worker launches, it sends a Register RPC to the master. The master registers
the worker and begins sending DoMap and/or DoReduce RPCs to the worker,
depending on what the master wants the worker to do. The master continues
sending work to its workers, accounting for worker failures as it proceeds. When
the computation is complete, the master sends Shutdown RPCs to its workers. In
response, the workers terminate. During the course of the lab, you will
implement a sequential master, a worker node, a parallel master, and several
MapReduce programs.
This lab is divided into 5 phases and an additional extra credit phase. There are unit tests corresponding to each phase, allowing you to mark your progress as you work through the lab. You may find the list of references to the right (or below if on mobile), particularly those on the Go language, useful as you proceed.
First, ensure that you are working on the lab using a compatible machine. For this lab, compatible means that the machine meets the following requirements:
If you don't have a machine that meets these requirements, please contact the
course staff. Alternatively, you can use virtual machine software, such as
VirtualBox, to launch a copy of Linux.

Note: The lab will not function properly on corn. This is because corn uses the
AFS file system, which does not support Unix domain sockets on non-global
directories.
To work on this lab on Windows, you’ll need to install a bit of software:
During the Git install, we recommend that you select “Use Git from the Windows Command Prompt” and “Checkout as-is, commit Unix-style line endings”.
After you’ve installed these three packages, open Git Bash and create the file
~/.bashrc
with the following content:
export PATH="/c/msys/1.0/bin":"/c/Go/bin":"${PATH}"
You may need to adjust the paths if you were already using Bash on Windows or if
you installed MSYS or Go in alternate directories. Then, restart Git Bash. You
should see a message indicating that Git Bash has discovered the file and has
created a few other files for you. Restart Git Bash one more time. Try calling
go version
and make -v
now. If the commands succeed, your environment is
almost ready.
Finally, you’ll need to ensure that your firewall settings allow the master and its workers to communicate. On Windows, this communication will happen via TCP. Because workers will use a random port to communicate, we suggest that you disable the Windows firewall for local (private) network entirely while working on this lab.
To begin, clone the lab skeleton git repository to your development machine:
git clone https://web.stanford.edu/class/cs240/lab2-skeleton.git lab2
The MapReduce library code is in the src/mapreduce/
directory. This directory
contains the following files:
- common.go contains type definitions and functions useful for all library code
- master.go contains interface definitions and functions useful for masters
- parallel_master.go contains the implementation of the parallel master
- parse_cmd_line.go exports functions which simplify the execution of MapReduce programs
- rpc.go implements RPC servers and functions for workers and masters
- sequential_master.go contains the implementation of the sequential master
- test_test.go is the unit testing code
- worker.go contains the implementation of a worker node

The following files contain MapReduce program/job code:
- src/wordcount/main.go contains the code for the wordcount program
- src/invertedindex/main.go contains the code for the invertedindex program
- src/pagerank/main.go contains the code for the pagerank program

The data/ directory contains the following subdirectories:
- input/ contains text files that will be inputs for your MapReduce programs
- output/ will contain data output from your library and the tester
- expected/ contains the expected output from your MapReduce programs

Finally, the test.sh file contains the code to run all of the tests. This shell
script executes unit tests and integration tests. The integration tests are
defined in the script itself, while the unit tests are written in Go and are in
src/mapreduce/test_test.go. If a test you expect to pass fails, you can look at
these files for clues.
Although Make is not commonly used with Go programs, we have provided a
Makefile that can simplify calling some of the go commands. The following
targets are defined in the Makefile:

- all: compiles wordcount, invertedindex, and pagerank
- wordcount: compiles just wordcount
- invertedindex: compiles just invertedindex
- pagerank: compiles just pagerank
- test: runs unit and integration tests using test.sh
- submission: creates the lab2.tar.gz submission file
- clean: deletes files that can be remade

Calling make in your shell will implicitly run the all target. The other
targets can be invoked using make {target}. Ensure that you are in the root of
your lab directory before invoking make. Try calling make test now. You should
see five failing test suites. If this doesn't work, your machine may not meet
the requirements.
Before you continue, read through the files in src/
to get a feel for how the
code is structured. Read the function comments and ensure that they make sense
to you. Once you have an idea for how the library works at a high level, you are
ready to start writing code.
In this phase, you will implement the Start method of the SequentialMaster in
src/mapreduce/sequential_master.go. The implementation should be simple and
straightforward: this phase is meant to provide a working understanding of the
library.
MapReduce can be implemented in many different ways. Among them is the simple sequential implementation. In this implementation, a single worker performs each map task one after another, serially. Once they are all complete, the same worker performs each reduce task, one after another, serially.
The SequentialMaster
is responsible for scheduling work in exactly this
manner. In particular, in this library, the master should create a new worker
using the NewWorker
function, schedule one map task for each input file by
calling the DoMap
worker method, and finally schedule one reduce task for each
requested reduce task by calling the DoReduce
worker method. The number of
requested reduce tasks can be found in the NumReducers
field of the
SequentialMaster
structure. Since the DoMap
and DoReduce
functions haven’t
been implemented yet, no real work will happen. To test your code, we have
provided dummy implementations of these functions which you will remove in the
next phase.
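To make the intended control flow concrete, here is a toy sketch of one worker running every map task and then every reduce task in order. The types and method signatures below are illustrative stand-ins; the real NewWorker, DoMap, and DoReduce are defined in the skeleton:

```go
package main

import "fmt"

// toyWorker is an illustrative stand-in for the library's worker; its
// methods just record the order in which tasks run.
type toyWorker struct {
	trace []string
}

func (w *toyWorker) DoMap(file string, mapTask int) {
	w.trace = append(w.trace, fmt.Sprintf("map %d (%s)", mapTask, file))
}

func (w *toyWorker) DoReduce(reduceTask int) {
	w.trace = append(w.trace, fmt.Sprintf("reduce %d", reduceTask))
}

// start mirrors the shape of SequentialMaster.Start: all map tasks
// first, one per input file, then all reduce tasks, one per requested
// reducer.
func start(w *toyWorker, inputFiles []string, numReducers int) {
	for i, f := range inputFiles {
		w.DoMap(f, i)
	}
	for r := 0; r < numReducers; r++ {
		w.DoReduce(r)
	}
}

func main() {
	w := &toyWorker{}
	start(w, []string{"a.txt", "b.txt"}, 2)
	for _, line := range w.trace {
		fmt.Println(line)
	}
}
```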
Implement the Start
method in src/mapreduce/sequential_master.go
. Once you
have successfully implemented this method, you should pass the phase 1 unit
tests.
In this phase, you will implement the DoMap
and DoReduce
worker methods in
src/mapreduce/worker.go
.
A worker is responsible for reading the required input and calling the
user-supplied map or reduce functions. For map, the worker is responsible for
reading the input file, calling the user's mapF function, partitioning the
key-value pairs among reducers using a hash of the key, and then writing out
the key-value pairs to files with names that the reducers will later retrieve.
Because coming up with a good file name can be tricky, we've provided you with
the reduceInputName function that, given a job name, a mapper number, and a
reducer number, returns the name of the corresponding reduce input file.
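One common way to pick the reducer for a key is to hash the key and take the result modulo the number of reducers. The helper below is a sketch, not necessarily the hash the library uses:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// ihash maps a key to a reducer number in [0, nReduce). This is an
// illustrative helper; check whether the skeleton already provides one.
func ihash(key string, nReduce int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(nReduce))
}

func main() {
	for _, k := range []string{"apple", "banana", "cherry"} {
		fmt.Printf("%s -> reducer %d\n", k, ihash(k, 5))
	}
}
```

The important property is determinism: every mapper must send a given key to the same reducer, or the grouping in the reduce phase breaks.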
You’ll need to ensure that key-value pair data is written out and read in reliably, regardless of the contents. As a result, you’ll need to use some reliable form of serialization. Thankfully, Go provides a JSON encoding and decoding library. While you’re not required to use JSON between mappers and reducers, reducers are required to emit JSON for the final merge phase, which is implemented for you. As a result, we highly encourage you to use JSON.
To write a series of JSON-encoded key-value pairs to a file, you may use a
snippet such as:

enc := json.NewEncoder(file)
for _, kv := ... {
    err := enc.Encode(&kv)
}
file.Close()
To read all of the JSON-encoded key-value pairs from a file, you may use a
snippet such as:

dec := json.NewDecoder(file)
var kv KeyValue
for err := dec.Decode(&kv); err == nil; err = dec.Decode(&kv) {
    ... kv.Key, kv.Value ...
}
For reduce, the worker is responsible for reading in all of its input files
generated by every mapper, grouping together all of the values for a given key,
and passing each key and its values to the user’s reduce function. The value
output from the user’s reduce function should be coupled with the key into a
KeyValue
structure, serialized, and written out to the merger’s input file,
whose name can be obtained by calling the ReduceOutputName
function with the
proper arguments.
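The grouping step can be sketched as follows, using toy data; in DoReduce the pairs would come from the decoded map outputs, and each key with its values would be passed to the user's reduceF:

```go
package main

import (
	"fmt"
	"sort"
)

// KeyValue mirrors the library's pair type for illustration.
type KeyValue struct {
	Key, Value string
}

// group collects all values for each key and returns the keys in
// sorted order: the shape of work DoReduce performs before invoking
// the user's reduce function.
func group(kvs []KeyValue) (keys []string, grouped map[string][]string) {
	grouped = make(map[string][]string)
	for _, kv := range kvs {
		grouped[kv.Key] = append(grouped[kv.Key], kv.Value)
	}
	for k := range grouped {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return
}

func main() {
	keys, grouped := group([]KeyValue{{"b", "1"}, {"a", "2"}, {"b", "3"}})
	for _, k := range keys {
		fmt.Println(k, grouped[k]) // here you would call reduceF(k, grouped[k])
	}
}
```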
Implement the DoMap and DoReduce worker methods in src/mapreduce/worker.go.
Remove the dummy implementations that are marked as such. You should consult
the Go package documentation as needed. Once you have successfully completed
this phase, you should pass the phase 2 unit tests.
In this phase, you will implement the mapF
and reduceF
functions in
src/wordcount/main.go
.
Now that we have a working master and worker, we can implement MapReduce
programs. In this phase, you will implement the classic word count example. In
particular, you will implement mapF
and reduceF
in src/wordcount/main.go
so that running the program with many input files reports the number of
occurrences of each word in the input files. A word is any contiguous sequence
of letters as determined by unicode.IsLetter
.
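A convenient way to split contents into words under exactly this definition is strings.FieldsFunc with a separator predicate built from unicode.IsLetter:

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// splitWords breaks text into maximal runs of letters; every
// non-letter rune acts as a separator and is discarded.
func splitWords(text string) []string {
	return strings.FieldsFunc(text, func(r rune) bool {
		return !unicode.IsLetter(r)
	})
}

func main() {
	fmt.Println(splitWords("It was the best of times, it was the worst of times."))
}
```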
To compile the wordcount
binary, you may call go build wordcount
after
setting your GOPATH to the lab’s root
directory. Alternatively, you can call make wordcount
which will automatically
(but temporarily) set the GOPATH
. Once the binary has compiled, you can call
it as: ./wordcount -s data/input/pg-*.txt
. This invokes the SequentialMaster
you wrote in phase 1 with the word count map and reduce functions, and uses the
file names of 16 text files in the data/input/
directory as the input files.
You’ll likely want to use fewer files for testing to speed things up. The output
from the reducers is merged by Run
and placed in a single output file named
data/output/mr.result.wordcount
.
Implement the word count program. After you have successfully implemented this
phase, you should pass the phase 3 unit tests. The tests in test.sh
verify
that your output matches the expected output, which can be found in
data/expected/wc-expected.txt
. You should be able to compute the expected
output from your output using the following command:
sort -n -k2 data/output/mr.result.wordcount | tail -10
Note: The text files are sourced from Project Gutenberg.
In this phase, you will implement the Start
and schedule
methods in
src/mapreduce/parallel_master.go
.
While the SequentialMaster
does its job as expected, we’d ideally like for
tasks to run in parallel when possible to maximize performance. In Google’s
MapReduce implementation, this meant running map and reduce tasks in parallel on
multiple machines. To simplify the lab implementation, we’ve constrained
parallelization to multiple cores on a single machine. Nonetheless, we have
designed the library such that extending it to work across multiple machines
would not require too much work.
The ParallelMaster
will receive a Register
RPC call when a new worker is
available to take on work. We’ve implemented the RPC handler for you. The
handler simply sends the worker’s address on the freeWorkers
channel for later
scheduling.
The Start
method should create a buffered channel and fill it with arguments
for tasks to schedule using the schedule
method. For map tasks, it should
create *DoMapArgs
structures. For reduce tasks, it should create
*DoReduceArgs
structures. The schedule
method, in turn, should ensure that
each task in the channel in its input is completed by a worker. It should not
return until all the tasks in the input channel have been completed. You can
send a worker a task
via RPC as follows:
ok := callWorker(workerAddress, task.TaskName(), task, new(interface{}))
Note that workers may fail at any point in time. Because a worker failure is generally indistinguishable from a connection failure, it suffices to check whether the RPC call failed. If the call fails, you’ll need to reschedule the work on the next available worker. Be aware that workers may resume normal operation after failing; you should continue giving those resumed workers tasks.
Implement the Start
and schedule
methods for the parallel master. Once you
have successfully implemented these methods, you should pass the phase 4 unit
tests.
In this phase, you will implement the mapF
and reduceF
functions in
src/invertedindex/main.go
.
To wrap up, you’ll write one more MapReduce program: inverted index generation. An inverted index maps a piece of content to its location. For example, it might map a keyword to the document where that keyword is found. As you might imagine, this type of map turns out to be extremely useful in practice (as opposed to, say, word count…). Your inverted index program will create a mapping from a word to the number of files the word appears in and the names of those files. For a given word, your program should output a string of the form:
#files file,names,sorted,and,separated,by,commas
For example:
about: 2 data/input/pg-moby_dick.txt,data/input/pg-tom_sawyer.txt
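For the formatting itself, sort.Strings and strings.Join cover the sorted, comma-separated list. The file names below are illustrative, taken from the example above:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// formatEntry renders an inverted-index value: the number of files,
// a space, then the sorted file names joined by commas.
func formatEntry(files []string) string {
	sort.Strings(files)
	return fmt.Sprintf("%d %s", len(files), strings.Join(files, ","))
}

func main() {
	fmt.Println(formatEntry([]string{
		"data/input/pg-tom_sawyer.txt",
		"data/input/pg-moby_dick.txt",
	})) // 2 data/input/pg-moby_dick.txt,data/input/pg-tom_sawyer.txt
}
```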
You can compile the invertedindex
binary just like the wordcount
binary: by
calling go build invertedindex
or make invertedindex
. Once the binary has
compiled, you can call it as: ./invertedindex -s data/input/pg-*.txt
to use
the sequential master, or ./invertedindex -p data/input/pg-*.txt
to use the
parallel master. You may also pass in -r n
where n
is some integer to
specify the number of reducers to use. If you launch the parallel master, you’ll
need to run some workers as well: ./invertedindex -w
.
Implement the inverted index program. After you have successfully implemented
this phase, you should pass the phase 5 unit tests. The tests in test.sh
verify that your output matches the expected output, which can be found in
data/expected/ii-expected.txt
. You should be able to compute the expected
output from your output using the following command:
sort -k1,1 data/output/mr.result.invertedindex | sort -snk2,2 | grep -v '16' | tail -10
Once you’ve completed the tasks above, you’re done and ready to submit! Any
changes made to files outside of the src/
directory will not be submitted
with your lab. Ensure that your submission is not dependent on those changes,
if any.
Finally, run make submission
and then proceed to the
submission page to upload your submission.
This phase is entirely optional and lack of completion will have no adverse effects on your score. Successful completion of this phase will result in a 20% boost to your score. You must successfully complete all other phases to be awarded the extra credit.
Legend has it that PageRank was one of the primary MapReduce workloads. As
such, it is only fitting that you implement PageRank on top of your MapReduce
library! You should implement PageRank in src/pagerank/main.go
. To test your
implementation, uncomment the TODO
test line in test.sh
.
The idea behind PageRank is fairly simple: a page’s “rank” should be equal to the probability that a random user surfing the web lands on that page. To calculate this probability, you can take an iterative approach:
Assign each page an initial page rank of $\frac{1}{N}$, where $N$ is the total number of pages.
Update the page rank $PR$ of a given page $p$ using the following formula:
$$ PR(p) = \sum\limits_{v \in I_p} \frac{PR(v)}{L(v)} $$
where $I_p$ is the set of pages linking to page $p$, and $L(v)$ is the number of pages that $v$ links to. In words: the page rank of some page $p$ is equal to the sum of the page ranks of all of its inbound links ($I_p$), where each page rank is divided by the number of outbound links for the given page ($L(v)$).
Step 2 is repeated many times until the page rank converges. The page rank computation is usually dampened using a factor of $d$ by changing the formula into the following:
$$ PR(p) = \frac{1 - d}{N} + d \left( \sum\limits_{v \in I_p} \frac{PR(v)}{L(v)} \right) $$
Your implementation will use the formula with dampening with $d = 0.85$. You will be computing the page rank of 20,000 pages, so $N = 20000$. Your implementation should iterate exactly 10 times, disregarding convergence.
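To sanity-check the formula, here is one dampened iteration on a tiny made-up 4-page graph (not the lab's input data):

```go
package main

import "fmt"

// step applies one dampened PageRank update,
//   PR(p) = (1-d)/N + d * sum over inbound v of PR(v)/L(v),
// given each page's outbound links and the current ranks.
func step(out map[string][]string, pr map[string]float64, d float64) map[string]float64 {
	n := float64(len(pr))
	next := make(map[string]float64, len(pr))
	for p := range pr {
		next[p] = (1 - d) / n
	}
	for v, links := range out {
		// v passes d * PR(v)/L(v) to each page it links to.
		share := d * pr[v] / float64(len(links))
		for _, p := range links {
			next[p] += share
		}
	}
	return next
}

func main() {
	out := map[string][]string{"A": {"B", "C"}, "B": {"C"}, "C": {"A"}, "D": {"C"}}
	pr := map[string]float64{"A": 0.25, "B": 0.25, "C": 0.25, "D": 0.25}
	for p, r := range step(out, pr, 0.85) {
		fmt.Printf("%s: %.4f\n", p, r)
	}
}
```

Since every page here has at least one outbound link, the updated ranks still sum to 1, which is a quick invariant to check in your own implementation.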
The format of the input files is a series of lines of the form:
p: PR, v1, v2, ..., vk
where p
is the identifier for a given page, PR
is the current page rank of
that page, and v1, v2, ..., vk
are the list of links on that page, i.e.,
outbound links from p
. The file data/input/tiny-pr-input.txt
contains an
example. You can use this file to quickly test your implementation before
running it on the full set of 20,000 pages which is split into 20 files:
pr-input*.txt
. $N$ for the tiny file is 4, while $N$ for the set of 20 files
is 20,000.
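Parsing a line of this format might look like the following sketch. parseLine is a hypothetical helper; adjust it to the exact spacing used in the real input files:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseLine splits a "p: PR, v1, v2, ..., vk" line into the page id,
// its current rank, and its outbound links.
func parseLine(line string) (page string, rank float64, links []string) {
	head, tail, _ := strings.Cut(line, ":")
	page = strings.TrimSpace(head)
	parts := strings.Split(tail, ",")
	rank, _ = strconv.ParseFloat(strings.TrimSpace(parts[0]), 64)
	for _, l := range parts[1:] {
		links = append(links, strings.TrimSpace(l))
	}
	return
}

func main() {
	p, pr, links := parseLine("1: 0.25, 2, 3, 4")
	fmt.Println(p, pr, links) // 1 0.25 [2 3 4]
}
```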
After you have successfully implemented this phase, you should pass the phase 6
unit tests. The tests in test.sh
verify that your output matches the expected
output, which can be found in data/expected/pr-expected.txt
. You should be
able to compute the expected output from your output using the following
command:
sort -k2,2 -srn data/output/mr.result.pagerank | cut -c-13 | head -n 15
To receive the extra credit, please add a file named extra_credit.txt
inside
the src/
directory explaining your implementation. Then, resubmit.
This lab is inspired by and partly based on a similar offering from MIT’s 6.824.
MapReduce Paper
Google's 2004 OSDI paper on MapReduce. Section 3 is particularly useful for this lab.
Tour of Go
A fairly quick, concise, and interactive introduction to the Go Programming language.
Effective Go
From Google: A document that gives tips for writing clear, idiomatic Go code. A must read for any new Go programmer.
How to Write Go Code
A document describing the pragmatics behind writing software projects in Go.
Channels Wiki Page
A pithy, yet informal, description of channels in programming.
PageRank Wiki Page
Wikipedia's PageRank explanation.