Lab 2 MapReduce

Due: Monday May 22, 2017 11:59PM


Overview

In this lab, you will write a MapReduce library. You will also write several MapReduce programs that make use of your library. You will be programming in Go.

A straightforward use of the library looks as follows:

package main

import mr "mapreduce"

func mapF(fileName string, contents string) (res []mr.KeyValue) {
    for _, word := range words(contents) {
        res = append(res, mr.KeyValue{Key: word, Value: ""})
    }

    return
}

func reduceF(key string, values []string) string {
    return ""
}

func main() {
    mr.Run("example", mapF, reduceF)
}

The result of executing the MapReduce program above (named "example") is a sorted list of non-duplicated words from any number of documents. The merged output is stored in data/output/mr.result.{jobName} (/data/output/mr.result.example here). Excepting the words(...) pseudo-function, the program above will directly execute on top of the library you will build.

The mapF function is called once per map task; in this library, there is one map task per input file. The mapF function receives as input the name of the input file and the contents of the file. It returns a slice of key-value pairs. The reduceF function is called once for each unique key emitted by mappers. Keys are distributed among many reducers; the maximum number of reducers is chosen by the user via the command line and defaults to 20. The inputs to the reduce function are a key and all of the values that were mapped from that key by map tasks.

Your library will be able to execute MapReduce program sequentially, using a single worker, and in parallel, using as many workers as desired. When executing a job in parallel, the master and workers communicate via RPC calls. When a worker launches, it sends a Register RPC to the master. The master registers the worker and begins sending DoMap and/or DoReduce RPCs to the worker, depending on what the master wants to the worker to do. The master continues sending work to its workers, accounting for worker failures as it proceeds. When the computation is complete, the master sends Shutdown RPCs to its workers. In response, the workers terminate. During the course of the lab, you will implement a sequential master, a worker node, a parallel master, and several MapReduce programs.

This lab is divided into 5 phases and an additional extra credit phase. There are unit tests corresponding to each phase, allowing you to mark your progress as you work through the lab. You may find the list of references to the right (or below if on mobile), particularly those on the Go language, useful as your proceed.


Phase 0: Getting Started

First, ensure that you are working on the lab using a compatible machine. For this lab, compatible means that the machine meets the following requirements:

  • Runs a modern version of: Linux, BSD, OS X, or Windows
  • Runs a 64-bit variant of the OS (check with arch)
  • Has a recent version (>= 1.2, >= 1.5 preferred) of Go installed

If you don’t have a machine that meets these requirements, please contact the course staff. Alternatively, you can use virtual machine software, such VirtualBox, to launch a copy of Linux. Note: The lab will not function properly on corn. This is because corn uses the AFS file system which does not support Unix domain sockets on non-global directories.

Developing on Windows

To work on this lab on Windows, you’ll need to install a bit of software:

During the Git install, we recommend that you select “Use Git from the Windows Command Prompt” and “Checkout as-is, commit Unix-style line endings”.

After you’ve installed these three packages, open Git Bash and create the file ~/.bashrc with the following content:

export PATH="/c/msys/1.0/bin":"/c/Go/bin":"${PATH}"

You may need to adjust the paths if you were already using Bash on Windows of if you installed MSYS or Go in alternate directories. Then, restart Git Bash. You should see a message indicating that Git Bash has discovered the file and has created a few other files for you. Restart Git Bash one more time. Try calling go version and make -v now. If the commands succeed, your environment is almost ready.

Finally, you’ll need to ensure that your firewall settings allow the master and its workers to communicate. On Windows, this communication will happen via TCP. Because workers will use a random port to communicate, we suggest that you disable the Windows firewall for local (private) network entirely while working on this lab.

Getting the skeleton code

To begin, clone the lab 3 skeleton git repository to your development machine:

git clone https://web.stanford.edu/class/cs240/lab2-skeleton.git lab2

Exploring the skeleton code

The MapReduce library code is in the src/mapreduce/ directory. This directory contains the following files:

  • common.go contains type definitions and functions useful for all library code
  • master.go contains interface definitions and functions useful for masters
  • parallel_master.go contains the implementation of the parallel master
  • parse_cmd_line.go exports functions which simplify the execution of MapReduce programs
  • rpc.go implements RPC servers and functions for workers and masters
  • sequential_master.go contains the implementation of the sequential master
  • test_test.go is the unit testing code
  • worker.go contains the implementation of a worker node

The following files contains MapReduce program/job code:

  • src/wordcount/main.go contains the code for the wordcount program
  • src/invertedindex/main.go contains the code for the invertedindex program
  • src/pagerank/main.go contains the code for the pagerank program

The data/ directory contains the following subdirectories:

  • input/ contains text files that will be inputs for your MapReduce programs
  • output/ will contain data output from your library and the tester
  • expected/ contains the expected output from your MapReduce programs

Finally, the test.sh file contains the code to run all of the tests. This shell script executes unit tests and integration tests. The integration tests are defined in the script itself while the unit tests are written in Go and are in src/mapreduce/test_test.go. If a test you expect to pass fails, you can look at these files for clues.

Make targets

Although Make is not commonly used with Go programs, we have provided a Makefile that can simplify calling some of the go commands. The following targets are defined in the Makefile:

  • all: compiles wordcount, invertedindex, and pagerank
  • wordcount: compiles just wordcount
  • invertedindex: compiles just invertedindex
  • pagerank: compiles just pagerank
  • test: runs unit and integration tests using test.sh
  • submission: creates the lab2.tar.gz submission file
  • clean: deletes files that can be remade

Calling make in your shell will implicitly run the all target. The other targets can be invoked using make {target}. Ensure that you are in the root of your lab directory before invoking make. Try calling make test now. You should see five failing test suites. If this doesn’t work, your machine may not meet the requirements.

Getting familiar

Before you continue, read through the files in src/ to get a feel for how the code is structured. Read the function comments and ensure that they make sense to you. Once you have an idea for how the library works at a high level, you are ready to start writing code.


Phase 1: Sequential Scheduling

In this phase, you will implement the Start SequentialMaster method in src/mapreduce/sequential_master.go. The implementation should be simple and straightforward: this phase is meant to provide a working understanding of the library.

MapReduce can be implemented in many different ways. Among them is the simple sequential implementation. In this implementation, a single worker performs each map task one after another, serially. Once they are all complete, the same worker performs each reduce task, one after another, serially.

The SequentialMaster is responsible for scheduling work in exactly this manner. In particular, in this library, the master should create a new worker using the NewWorker function, schedule one map task for each input file by calling the DoMap worker method, and finally schedule one reduce task for each requested reduce task by calling the DoReduce worker method. The number of requested reduce tasks can be found in the NumReducers field of the SequentialMaster structure. Since the DoMap and DoReduce functions haven’t been implemented yet, no real work will happen. To test your code, we have provided dummy implementations of these functions which you will remove in the next phase.

Implement the Start method in src/mapreduce/sequential_master.go. Once you have successfully implemented this method, you should pass the phase 1 unit tests.


Phase 2: Worker

In this phase, you will implement the DoMap and DoReduce worker methods in src/mapreduce/worker.go.

A worker is responsible for reading the required input and calling the user’s supplied map or reduce functions. For map, the worker is responsible for reading the input file, calling the user’s mapF function, partitioning the key value pairs by a given hash for a given reducer, and then writing out the key value pairs to a file with a name that the reducer will later retrieve. Because coming up with a good file name can be tricky, we’ve provided you with the reduceInputName function that, given a job name, a mapper number, and a reducer number, returns the file name for the corresponding reduce input filename.

You’ll need to ensure that key-value pair data is written out and read in reliably, regardless of the contents. As a result, you’ll need to use some reliable form of serialization. Thankfully, Go provides a JSON encoding and decoding library. While you’re not required to use JSON between mappers and reducers, reducers are required to emit JSON for the final merge phase, which is implemented for you. As a result, we highly encourage you to use JSON.

To write a series of JSON-encoded key-value pairs to a file, you may use a snippet such as:

enc := json.NewEncoder(file)
for _, kv := ... {
    err := enc.Encode(&kv)
}
file.Close()

To read all of the JSON-encoded key-value pairs from a file, you may use a snippet such as:

dec := json.NewDecoder(file)

var kv KeyValue
for err := dec.Decode(&kv); err == nil; err = dec.Decode(&kv) {
    ... kv.Key, kv.Value ...
}

For reduce, the worker is responsible for reading in all of its input files generated by every mapper, grouping together all of the values for a given key, and passing each key and its values to the user’s reduce function. The value output from the user’s reduce function should be coupled with the key into a KeyValue structure, serialized, and written out to the merger’s input file, whose name can be obtained by calling the ReduceOutputName function with the proper arguments.

Implement the DoMap and DoReduce worker methods in src/mapreduce/worker.go. Remove the dummy implentations that are marked as such. You should consult the Go package documentation as needed. Once you have successfully completed this phase, you should pass the phase 2 unit tests.

Hint: You may find the ioutil and os packages useful.

Phase 3: Word Count

In this phase, you will implement the mapF and reduceF functions in src/wordcount/main.go.

Now that we have a working master and worker, we can implement MapReduce programs. In this phase, you will implement the classic word count example. In particular, you will implement mapF and reduceF in src/wordcount/main.go so that running the program with many input files reports the number of occurences of each word in the input files. A word is any contiguous sequence of letters as determined by unicode.IsLetter.

To compile the wordcount binary, you may call go build wordcount after setting your GOPATH to the lab’s root directory. Alternatively, you can call make wordcount which will automatically (but temporarily) set the GOPATH. Once the binary has compiled, you can call it as: ./wordcount -s data/input/pg-*.txt. This invokes the SequentialMaster you wrote in phase 1 with the word count map and reduce functions, and uses the file names of 16 text files in the data/input/ directory as the input files. You’ll likely want to use fewer files for testing to speed things up. The output from the reducers is merged by Run and placed in a single output file named data/output/mr.result.wordcount.

Implement the word count program. After you have successfuly implemented this phase, you should pass the phase 3 unit tests. The tests in test.sh verify that your output matches the expected output, which can be found in data/expected/wc-expected.txt. You should be able to compute the expected output from your output using the following command:

sort -n -k2 data/output/mr.result.wordcount | tail -10
Hint: The strings.FieldsFunc function is useful for splitting strings.
Hint: The strconv package has useful functions for converting to and from strings.

Note: The text files are sourced from Project Gutenberg.


Phase 4: Parallel Master

In this phase, you will implement the Start and schedule methods in src/mapreduce/parallel_master.go.

While the SequentialMaster does its job as expected, we’d ideally like for tasks to run in parallel when possible to maximize performance. In Google’s MapReduce implementation, this meant running map and reduce tasks in parallel on multiple machines. To simplify the lab implementation, we’ve constrained parallelization to multiple cores on a single machine. Nonetheless, we have designed the the library such that extending it to work across multiple machines would not require too much work.

The ParallelMaster will receive a Register RPC call when a new worker is available to take on work. We’ve implemented the RPC handler for you. The handler simply sends the worker’s address on the freeWorkers channel for later scheduling.

The Start method should create a buffered channel and fill it with arguments for tasks to schedule using the schedule method. For map tasks, it should create *DoMapArgs structures. For reduce tasks, it should create *DoReduceArgs structures. The schedule method, in turn, should ensure that each task in the channel in its input is completed by a worker. It should not return until all the tasks in the input channel have been completed. You can send a worker a task via RPC as follows:

ok := callWorker(workerAddress, task.TaskName(), task, new(interface{}))

Note that workers may fail at any point in time. Because a worker failure is generally indistinguishable from a connection failure, it suffices to check whether the RPC call failed. If the call fails, you’ll need to reschedule the work on the next available worker. Be aware that workers may resume normal operation after failing; you should continue giving those resumed workers tasks.

Implement the Start and schedule methods for the parallel master. Once you have successfully implemented these methods, you should pass the phase 4 unit tests.

Hint: Use the go statement to send RPCs to workers in parallel.
Hint: You may find the atomic package useful in your implementation.

Phase 5: Inverted Index

In this phase, you will implement the mapF and reduceF functions in src/invertedindex/main.go.

To wrap up, you’ll write one more MapReduce program: inverted index generation. An inverted index maps a piece of content to its location. For example, it might map a keyword to the document where that keyword is found. As you might imagine, this type of map turns out to be extremely useful in practice (as opposed to, say, word count…). Your inverted index program will create a mapping from a word to the number of files the word appears in and the names of those files. For a given word, your program should output a string of the form:

#files file,names,sorted,and,separated,by,commas

For example:

about: 2 data/input/pg-moby_dick.txt,data/input/pg-tom_sawyer.txt

You can compile the invertedindex binary just like the wordcount binary: by calling go build invertedindex or make invertedindex. Once the binary has compiled, you can call it as: ./invertedindex -s data/input/pg-*.txt to use the sequential master, or ./invertedindex -p data/input/pg-*.txt to use the parallel master. You may also pass in -r n where n is some integer to specify the number of reducers to use. If you launch the parallel master, you’ll need to run some workers as well: ./invertedindex -w.

Implement the inverted index program. After you have successfully implemented this phase, you should pass the phase 5 unit tests. The tests in test.sh verify that your output matches the expected output, which can be found in data/expected/ii-expected.txt. You should be able to compute the expected output from your output using the following command:

sort -k1,1 data/output/mr.result.invertedindex | sort -snk2,2 | grep -v '16' | tail -10

Submission

Once you’ve completed the tasks above, you’re done and ready to submit! Any changes made to files outside of the src/ directory will not be submitted with your lab. Ensure that your submission is not dependent on those changes, if any.

Warning: Ensure that your code compiles and all of the tests pass before submitting!

Finally, run make submission and then proceed to the submission page to upload your submission.


Extra Credit: PageRank

This phase is entirely optional and lack of completion will have no adverse effects on your score. Successful completion of this phase will result in a 20% boost to your score. You must successfully complete all other phases to be awarded the extra credit.

Legend has it that PageRank was one of the primary MapReduce workloads. As such, it is only fitting that you implement PageRank on top of your MapReduce library! You should implement PageRank in src/pagerank/main.go. To test your implementation, uncomment the TODO test line in test.sh.

The idea behind PageRank is fairly simple: a page’s “rank” should be equal to the probability that a random user surfing the web lands on that page. To calculate this probability, you can take an iterative approach:

  1. Assign each page an initial page rank of $\frac{1}{N}$, where $N$ is the total number of pages.

  2. Update the page rank $PR$ of a given page $p$ using the following formula:

    $$ PR(p) = \sum\limits_{v \in I_p} \frac{PR(v)}{L(v)} $$

    where $I_p$ is the set of pages linking to page $p$, and $L(v)$ is the number of pages that $v$ is linking to. In words: the page rank of some page $p$ is equal to the sum of the page ranks of all of its inbound links ($I_p$) where each page rank is divided by the number of outbounds links for the given page ($L(v)$).

Step 2 is repeated many times until the page rank converges. The page rank computation is usually dampened using a factor of $d$ by changing the formula into the following:

$$ PR(p) = \frac{1 - d}{N} + d \left( \sum\limits_{v \in I_p} \frac{PR(v)}{L(v)} \right) $$

Your implementation will use the formula with dampening with $d = 0.85$. You will be computing the page rank of 20,000 pages, so $N = 20000$. Your implementation should iterate exactly 10 times, disregarding convergence.

The format of the input files is a series of lines of the form:

p: PR, v1, v2, ..., vk

where p is the identifier for a given page, PR is the current page rank of that page, and v1, v2, ..., vk are the list of links on that page, i.e, outbounds links from p. The file data/input/tiny-pr-input.txt contains an example. You can use this file to quickly test your implementation before running it on the full set of 20,000 pages which is split into 20 files: pr-input*.txt. $N$ for the tiny file is 4, while $N$ for the set of 20 files is 20,000.

After you have successfully implemented this phase, you should pass the phase 6 unit tests. The tests in test.sh verify that your output matches the expected output, which can be found in data/expected/pr-expected.txt. You should be able to compute the expected output from your output using the following command:

sort -k2,2 -srn data/output/mr.result.pagerank | cut -c-13 | head -n 15

To receive the extra credit, please add a file named extra_credit.txt inside the src/ directory explaining your implementation. Then, resubmit.

Hint: Use strconv.ParseFloat to parse a string as a float, and strconv.FormatFloat to convert a float (f) to a string.

Acknowledgements

This lab is inspired by and partly based on a similar offering from MIT’s 6.824.

References

MapReduce Paper
Google's 2004 OSDI paper on MapReduce. Section 3 is particularly useful for this lab.

Tour of Go
A fairly quick, concise, and interactive introduction to the Go Programming language.

Effective Go
From Google: A document that gives tips for writing clear, idiomatic Go code. A must read for any new Go programmer.

How to Write Go
A document describing the pragmatics behind writing software projects in Go.

Channels Wiki Page
A pithy, yet informal, description of channels in programming.

PageRank Wiki Page
Wikipedia's PageRank explanation.