Example MapReduce job
This section walks through the steps involved in running word count on the MapReduce cluster you will implement in this assignment. This is only an example; don’t hardcode any of the numbers listed here. MapReduce may run differently depending on whether and when failures happen.
The order in which workers poll for tasks and the time it takes them to complete those tasks are non-deterministic. The specifics of which worker receives which task are not important for your implementation.
Setup
- The coordinator (`mr-coordinator`) is started.
- After a few seconds, 3 workers (`mr-worker`) are started. They also start their own RPC servers in case other workers need to contact them.
- The workers register with the coordinator by issuing `Register` RPCs, and receive unique worker IDs. We’ll refer to the workers as worker 1, worker 2, and worker 3.
- The workers begin sending regular heartbeats to the coordinator by issuing `Heartbeat` RPCs to indicate that they are still running.
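To make the registration and heartbeat bookkeeping concrete, here is a minimal Rust sketch of the idea, assuming the coordinator simply hands out increasing IDs and remembers when each worker last checked in. The names (`CoordinatorState`, `register`, `heartbeat`, `crashed_workers`) and the timeout are illustrative assumptions, not the starter code’s actual types or RPC handlers.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical coordinator-side bookkeeping; not the starter code's actual types.
struct CoordinatorState {
    next_worker_id: u32,
    last_heartbeat: HashMap<u32, Instant>,
}

impl CoordinatorState {
    /// Register handler: hand out a fresh worker ID and start tracking it.
    fn register(&mut self) -> u32 {
        let id = self.next_worker_id;
        self.next_worker_id += 1;
        self.last_heartbeat.insert(id, Instant::now());
        id
    }

    /// Heartbeat handler: remember when this worker last checked in.
    fn heartbeat(&mut self, worker_id: u32) {
        self.last_heartbeat.insert(worker_id, Instant::now());
    }

    /// Workers whose last heartbeat is older than `timeout` are presumed crashed.
    fn crashed_workers(&self, timeout: Duration) -> Vec<u32> {
        self.last_heartbeat
            .iter()
            .filter(|(_, last)| last.elapsed() > timeout)
            .map(|(id, _)| *id)
            .collect()
    }
}

fn main() {
    let mut state = CoordinatorState {
        next_worker_id: 1,
        last_heartbeat: HashMap::new(),
    };
    let w1 = state.register(); // worker 1
    state.heartbeat(w1);
    // With a fresh heartbeat, no worker is considered crashed yet.
    println!("crashed workers: {:?}", state.crashed_workers(Duration::from_secs(5)));
}
```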
Job submission
- A client (`mr-client`) submits this job via the `SubmitJob` RPC:

      files = ["data/gutenberg/p.txt", "data/gutenberg/q.txt", "data/gutenberg/r.txt", "data/gutenberg/s.txt"]
      output_dir = "/tmp/hw-map-reduce/gutenberg"
      app = "wc"
      n_reduce = 2
      args = [1, 2, 3]

- The `args` are not used by the word count map and reduce functions, but they are included to show you how they should be used for other applications that may depend on `args`.
- Since there are 4 input files, there are 4 map tasks (one per input file). Since `n_reduce` is 2, there are two reduce tasks.
- The coordinator accepts the job, assigns it an ID of 1, and returns this ID to the client. The job is added to the coordinator’s queue.
- The client periodically polls the status of the job using the `PollJob` RPC with `job_id = 1` to see when it completes or fails.
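As a rough sketch of what the coordinator can derive from this submission, the example below mirrors the request fields above in a plain Rust struct and computes one map task per input file plus `n_reduce` reduce tasks. The `JobConfig` and `accept_job` names, and the choice to model `args` as bytes, are assumptions for illustration and may not match the starter code’s actual message types.

```rust
/// Hypothetical mirror of the SubmitJob request shown above.
struct JobConfig {
    files: Vec<String>,
    output_dir: String,
    app: String,
    n_reduce: u32,
    args: Vec<u8>,
}

/// One map task per input file, n_reduce reduce tasks, and a fresh job ID.
fn accept_job(config: &JobConfig, next_job_id: &mut u32) -> (u32, usize, u32) {
    let job_id = *next_job_id;
    *next_job_id += 1;
    (job_id, config.files.len(), config.n_reduce)
}

fn main() {
    let config = JobConfig {
        files: vec![
            "data/gutenberg/p.txt".into(),
            "data/gutenberg/q.txt".into(),
            "data/gutenberg/r.txt".into(),
            "data/gutenberg/s.txt".into(),
        ],
        output_dir: "/tmp/hw-map-reduce/gutenberg".into(),
        app: "wc".into(),
        n_reduce: 2,
        args: vec![1, 2, 3],
    };
    let mut next_job_id = 1;
    let (job_id, n_map, n_reduce) = accept_job(&config, &mut next_job_id);
    // For this example: job_id = 1, n_map = 4, n_reduce = 2.
    println!("job {job_id}: {n_map} map tasks, {n_reduce} reduce tasks");
    println!("app {} writing to {}, args {:?}", config.app, config.output_dir, config.args);
}
```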
Map task execution
- Worker 1 polls the coordinator for a task using the `GetTask` RPC, and is assigned map task 0 for job 1.
- Worker 2 polls the coordinator for a task using the `GetTask` RPC, and is assigned map task 1 for job 1. Similarly, worker 3 is assigned map task 2 for job 1.
- Each worker executes its assigned map task. This part is already implemented for you, so you do not have to understand this fully. If you are interested, you can look at Daniel Zhu’s notes for a more concrete example.
  - They create `n_reduce` (2 in this case) temporary buffers in memory. We’ll refer to these buffers as buckets.
  - They read the input file into memory (e.g. map task 0 would read `data/gutenberg/p.txt`).
  - They call the map function corresponding to the `wc` application. The key is the input filename; the value is the file’s contents. The auxiliary args `[1, 2, 3]` are also passed to the map function.
  - They iterate over the resulting key-value pairs. For each KV pair (see the sketch after this list):
    - The key is hashed using the `ihash` function in `src/lib.rs`.
    - A reduce bucket is selected by computing `ihash(key) % n_reduce`.
    - The KV pair is written into the corresponding buffer using `codec::LengthDelimitedWriter`. The key is sent first, then the value.
  - The worker saves all the reduce buffers in memory for later.
- Workers 1, 2, and 3 finish their map tasks and notify the coordinator using the `FinishTask` RPC. However, immediately after notifying the coordinator, worker 3 fails (crashes).
- The coordinator assigns the final map task (task 3) to worker 1. Worker 2 sits idle, since there are no available tasks. It periodically polls the coordinator to see if new tasks are available.
- After some time, the coordinator realizes it hasn’t received a heartbeat from worker 3, and assumes that worker 3 has crashed. Since worker 3 ran map task 2 and stored the results in its own memory, those results have been lost, so the coordinator notes that map task 2 will need to be re-executed.
- Worker 2 polls the coordinator for tasks, and is assigned map task 2.
- Workers 1 and 2 finish their map tasks and notify the coordinator. All map tasks are now complete.
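The bucketing step referenced in the list above comes down to hashing each key and taking it modulo `n_reduce`. Here is a minimal, self-contained sketch of that idea; the real `ihash` in `src/lib.rs` and the starter code’s key-value and buffer types differ, and this sketch skips the `codec::LengthDelimitedWriter` encoding entirely, so treat all names here as placeholders.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical key-value pair; the starter code's representation may differ.
struct KeyValue {
    key: Vec<u8>,
    value: Vec<u8>,
}

/// Stand-in for the provided ihash in src/lib.rs; the real hash function
/// differs, but any deterministic hash gives the same partitioning scheme.
fn ihash(key: &[u8]) -> u64 {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    h.finish()
}

/// Split one map task's output into n_reduce in-memory buckets, the way the
/// walkthrough describes: bucket = ihash(key) % n_reduce.
fn partition(kvs: Vec<KeyValue>, n_reduce: usize) -> Vec<Vec<KeyValue>> {
    let mut buckets: Vec<Vec<KeyValue>> = (0..n_reduce).map(|_| Vec::new()).collect();
    for kv in kvs {
        let bucket = (ihash(&kv.key) % n_reduce as u64) as usize;
        buckets[bucket].push(kv);
    }
    buckets
}

fn main() {
    let kvs = vec![
        KeyValue { key: b"the".to_vec(), value: b"1".to_vec() },
        KeyValue { key: b"cat".to_vec(), value: b"1".to_vec() },
        KeyValue { key: b"the".to_vec(), value: b"1".to_vec() },
    ];
    for (i, bucket) in partition(kvs, 2).into_iter().enumerate() {
        // All pairs with the same key always land in the same bucket.
        println!("bucket {}: {} pairs", i, bucket.len());
    }
}
```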
Reduce task execution
- Worker 1 polls the coordinator and is assigned reduce task 0. Immediately after this, worker 2 crashes.
- Worker 1 begins executing reduce task 0. It reads bucket 0 of map task 0 and map task 3 from its own in-memory buckets. It then tries to contact worker 2 to read bucket 0 of map task 1 and map task 2 using the worker’s `ReadMap` RPC, but fails to connect. Worker 1 notifies the coordinator that it cannot complete the reduce task using the `FailTask` RPC.
- A new worker joins the cluster and is assigned ID 4 after issuing a `Register` RPC to the coordinator.
- Workers 1 and 4 continually poll the coordinator for tasks. Depending on your implementation, you may have the coordinator tell the workers to idle, to retry the failed reduce task, or to re-execute the necessary map tasks. We’ll assume that your coordinator is not particularly sophisticated, and just tells them to retry the failed task.
- After some time passes without a heartbeat from worker 2, the coordinator realizes that worker 2 has crashed. Since worker 2 executed map tasks 1 and 2, the coordinator schedules map tasks 1 and 2 for re-execution.
- The next time worker 1 polls the coordinator, it is told to execute map task 1. Similarly, worker 4 is told to execute map task 2.
- The map tasks complete successfully, and the coordinator is notified. All map tasks are now done, and reduce tasks become eligible for scheduling again.
- The next time worker 1 polls the coordinator, it is assigned reduce task 0. Worker 4 receives reduce task 1.
- Worker 1 executes reduce task 0, reading bucket 0 of map tasks 0, 1, and 3 from memory. It reads bucket 0 of map task 2 via an RPC to worker 4.
- Worker 1 concatenates all the key-value pairs it obtains, and then sorts the pairs by key (see the sketch after this list). Once again, this logic is implemented for you, but feel free to look at Daniel’s notes or the starter code for a better understanding of how it works.
- For each run of key-value pairs corresponding to the same key `K`, worker 1 does the following:
  - Calls the word count reduce function with key `K`, the list of values corresponding to `K`, and auxiliary args `[1, 2, 3]`. The reduce function returns a single value `V`.
  - Writes `(K, V)` to the output file `/tmp/hw-map-reduce/gutenberg/mr-out-0`.
- Similarly, worker 4 executes reduce task 1, reading bucket 1 of map tasks 0, 1, and 3 from worker 1 via RPC and bucket 1 of map task 2 from memory.
- The workers notify the coordinator that they completed the reduce tasks.
- The coordinator notes that all tasks for job 1 have been completed.
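To illustrate the sort-and-group step described above, the sketch below sorts the collected pairs by key, calls a reduce function once per run of equal keys, and collects one output line per key. The `KeyValue` type, the reduce closure signature, and the output line format are assumptions for illustration; the starter code’s actual types, encoding, and output writing differ.

```rust
/// Hypothetical key-value pair type; the starter code's encoding may differ.
struct KeyValue {
    key: String,
    value: String,
}

/// Sort all collected pairs by key, then call `reduce` once per run of equal
/// keys, producing one output line per key. A real worker would stream these
/// lines to its mr-out-<reduce task id> file instead of returning them.
fn reduce_task(
    mut kvs: Vec<KeyValue>,
    reduce: impl Fn(&str, &[String]) -> String,
) -> Vec<String> {
    kvs.sort_by(|a, b| a.key.cmp(&b.key));
    let mut out = Vec::new();
    let mut i = 0;
    while i < kvs.len() {
        // Find the end of the run of pairs sharing kvs[i].key.
        let mut j = i + 1;
        while j < kvs.len() && kvs[j].key == kvs[i].key {
            j += 1;
        }
        let values: Vec<String> = kvs[i..j].iter().map(|kv| kv.value.clone()).collect();
        let v = reduce(&kvs[i].key, &values);
        out.push(format!("{} {}", kvs[i].key, v));
        i = j;
    }
    out
}

fn main() {
    // Word count: each map output value is "1"; reduce counts the values.
    let kvs = vec![
        KeyValue { key: "the".into(), value: "1".into() },
        KeyValue { key: "cat".into(), value: "1".into() },
        KeyValue { key: "the".into(), value: "1".into() },
    ];
    let wc_reduce = |_k: &str, vs: &[String]| vs.len().to_string();
    for line in reduce_task(kvs, wc_reduce) {
        println!("{line}"); // e.g. "cat 1", then "the 2"
    }
}
```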
Job completion
- On the next `PollJob` RPC issued by the MapReduce client, the coordinator notifies the client that the job was completed.
- The client runs some post-processing on the MapReduce output files to convert them to a human-readable format. Our autograder will inspect this final output. Your output files must be named `mr-out-i`, where `0 <= i < n_reduce` is the reduce task number.
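As a small sketch of the required naming scheme (assuming output files go directly under the job’s `output_dir`; the helper name is hypothetical):

```rust
use std::path::PathBuf;

/// Hypothetical helper: output path for reduce task `i`, following mr-out-i.
fn output_path(output_dir: &str, i: usize) -> PathBuf {
    PathBuf::from(output_dir).join(format!("mr-out-{}", i))
}

fn main() {
    let n_reduce = 2;
    for i in 0..n_reduce {
        // For the example job: /tmp/hw-map-reduce/gutenberg/mr-out-0 and mr-out-1.
        println!("{}", output_path("/tmp/hw-map-reduce/gutenberg", i).display());
    }
}
```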