Skip to main content Link Menu Expand (external link) Document Search Copy Copied

Development

Table of contents

  1. Usage
  2. Documentation
  3. Async code
  4. Testing and debugging
    1. General tips
    2. Test cases
    3. Debugging fault tolerance
    4. Common Errors
    5. Detailed testing breakdown

The skeleton code implements a basic client that can submit jobs to the coordinator as well as basic gRPC servers for coordinators and workers to communicate with one another. You will be implementing RPC calls on the coordinator to allow these jobs to be carried out in a distributed and fault-tolerant manner. The API that your coordinator must expose has been specified for you in proto/coordinator.proto.

Usage

Running cargo build creates 3 binaries in the target/debug folder: mr-coordinator, mr-worker, and mr-client. The intended flow for starting your own MapReduce cluster and submitting jobs to it is explained below (commands are run from the root directory of the homework).

This procedure will not result in any output until you begin implementing your MapReduce system.

  1. Start the coordinator by running the mr-coordinator binary with no arguments. Specifically, you can run the following:

     ./target/debug/mr-coordinator
    
  2. Start one or more workers by running the mr-worker binary with no arguments in several terminal windows (or multiple times in the background of the same terminal). They will then connect to the coordinator’s RPC server to request tasks and will die once the coordinator is killed.

    We strongly recommend using tmux. Commands for running a single worker (should be run in multiple terminals) and several workers (in the background of one terminal) are provided below:

     ./target/debug/mr-worker                                   # Multiple terminals
     for i in {1..5}; do (./target/release/mr-worker &); done   # Single terminal (5 workers)
    
  3. Submit one or more jobs by running mr-client submit. Usage details are shown below.

     mr-client-submit
     Submit a job to a MapReduce cluster
    
     USAGE:
         mr-client submit [OPTIONS] --output-dir <OUTPUT_DIR> --app <APP> [FILES]... [-- <ARGS>...]
    
     ARGS:
         <FILES>...    The list of input files
         <ARGS>...     Auxiliary arguments to pass to the MapReduce application
    
     OPTIONS:
         -a, --app <APP>                  The name of the MapReduce application
         -h, --help                       Print help information
         -n, --num-reduce <NUM_REDUCE>    The number of reduce tasks [default: 5]
         -o, --output-dir <OUTPUT_DIR>    The directory in which output files should be stored
         -w, --wait                       Whether or not to wait until the job is finished
    

    The available options are:

    --app

    Chooses which app to run on the chosen input files. For example, --app wc runs the word count application.

    --num-reduce

    Number of reduce tasks to split key/value pairs into.

    --output-dir

    Directory to write MapReduce output files to.

    For example, to submit a word count job on the entire gutenberg dataset and wait for it to complete, you can run the following from the root directory of the homework:

     ./target/debug/mr-client submit -a wc -o out -w -n 10 data/gutenberg/*
    
  4. Once the job is complete, the final output is postprocessed by invoking mr-client process. Usage details are shown below.

     mr-client-process
     Post-process the output of a completed MapReduce job
    
     USAGE:
         mr-client process [OPTIONS] --output-dir <OUTPUT_DIR> --app <APP>
    
     OPTIONS:
         -a, --app <APP>                  The name of the MapReduce application
         -h, --help                       Print help information
         -n, --num-reduce <NUM_REDUCE>    The number of reduce tasks [default: 5]
         -o, --output-dir <OUTPUT_DIR>    The directory in which output files were stored
    

    The options are the same as above. Note that you do not need to specify input files here, however.

    To process the example job submitted in the previous step, you would run the following from the root directory of the homework:

     ./target/debug/mr-client process -a wc -o out -n 10
    

You can use the wc and grep commands to sanity check your outputs for the word count and grep applications.

Documentation

The starter code includes a reasonable number of comments and examples describing how to use the code we provide. You can read the comments directly, but if you prefer to view them in rustdoc format, you can run cargo doc --open from the hw-map-reduce directory.

The examples in the doc comments assume you are writing code outside the map_reduce crate. Since you will be writing code within the map_reduce crate, you should generally replace imports like

use map_reduce::app;

with

use crate::app;

Async code

The code for this assignment uses Tokio as an asynchronous execution framework. You should make sure that your code uses async-compatible functions. For example, use the functions in tokio::fs rather than std::fs.

In general, if you are calling a function that involves waiting (like sleeping, reading/writing a file, network operations, RPCs, etc.), you should make sure that you are using an async version of that function. Subtle bugs can sometimes arise from using synchronous (blocking) functions in async execution contexts.

If you find that you don’t have to await a function that involves waiting or I/O, you are probably doing something wrong! Also remember that if you don’t await an async function, it does nothing.

Testing and debugging

General tips

Debugging via GDB is possible using rust-gdb, but is likely to be quite difficult, since it is not easy to inspect the state of multiple processes at once.

We generally recommend inserting log statements to sanity-check that your code is correct. If you make your code modular, you can also write your own unit tests to verify that the data structures you implement operate correctly.

For more general debugging tips from TAs for MIT’s 6.824, see here and here.

Test cases

We include one example test case for you – see src/tests/mod.rs. If you wish to run it, remove the #[ignore] attribute, then run cargo test (or cargo t for short). Note that you will need to replace the todo! with an actual file path. This test does not check whether or not your final output is correct; it only checks that there are no issues submitting and waiting for a job. The test likely will not pass until you have finished the “Tasks” section of this assignment.

The sample test starts a fake cluster (with all workers and the coordinator running in the same process), then submits a word count job and waits for it to complete. The test will print the final output from running word count. Note that on the autograder, each worker and the coordinator will be run in separate processes, so they won’t be able to directly share memory.

Generally, cargo will only display test output if the test fails. If you wish to see the test output even if it passes, run

cargo test -- --nocapture

Debugging fault tolerance

When debugging the fault tolerance portion of this assignment, we recommend inserting log and sleep statements before workers begin executing map tasks. You can then manually crash a worker before it completes a task by pressing Ctrl-C, and verify that the coordinator reassigns the appropriate task(s) to other workers. Specifically, you will want to add a sleep statement right after the worker calls get_task on line 288 of src/worker/mod.rs in the starter code so that the worker crashes after it receives a task but before it executes it.

It might also help to add a print statement before the sleep to see what task the worker received and was about to execute before you crash it.

Common Errors

There are a couple common errors that can happen for any test for similar reasons across the board. The errors and the root cause are outlined below:

  • Transport error: This error occurs if the coordinator crashes due to an unhandled error. The most common cause of this error is in the poll-bad-id test, but it can occur during any test where the coordinator crashes during runtime.
  • No such file or directory (os error 2): This error occurs when reduce tasks exhibit unintended behavior. The post-processing of a job reads the files that reduce tasks are supposed to output, so you may see this error if your reduce tasks are either not fully completed when the job is marked as done, being assigned to multiple workers per reduce task, or being skipped when the job is marked as done. We recommend looking through your coordinator logic for marking jobs as complete as well as assigning reduce tasks.

Below, you will find more specific descriptions of test cases and their potential errors.

Detailed testing breakdown

While the autograder will provide you with basic information on what each test checks, it is sometimes difficult to figure out where to start debugging. Here are some additional details for each test as well as some hints on what issues may cause them to fail.

We recommend that you reference this section as little as possible since you will learn much more by coming up with potential issues with your implementation on your own, but we understand that sometimes it can be frustrating to debug opaque tests that are hard to reproduce locally. Refer to the hints below before you ask a question relating to autograder tests on Ed or in OH.

poll-bad-id

Checks that polling for an invalid job ID returns an error without crashing the coordinator. The test also submits valid jobs and ensures that their outputs are correct even when interspersed with invalid POLL_JOB requests.

Potential bugs:

  • Not checking that the job exists before trying to access its fields (Error: transport error).
  • Incorrectly assigning job IDs (Error: IDs that should be invalid are valid).
  • Not returning Err(Status::new(Code::InvalidArgument, e.to_string())) in the PollJob RPC when the provided ID is invalid (Error: IDs that are invalid do not result in an error, or timeout if done is set to false).
  • Incorrectly implementing Job submission or Tasks (see wc).

wc

Run the word count application. The test submits jobs with different types of inputs (small or large file sizes, a few or several input files, low or high n_reduce, etc.), so you may need to modify these parameters if you aren’t able to replicate the failure using the default parameters.

Potential bugs:

  • Incorrectly implementing Job submission.
    • Not returning done = true in the POLL_JOB RPC when a job completed (Error: timeout).
    • Returning done = true in the POLL_JOB RPC before a job is completed (Error: incorrect output).
  • Incorrectly implementing Tasks.
    • Incorrectly assigning tasks.
      • Assigning the same task multiple times (Error: incorrect output).
      • Forgetting to assign a certain task, (Error: timeout or incorrect output).
      • Not waiting for all map tasks to finish before assigning reduce tasks (Error: error opening intermediate files).
    • Incorrectly handling finished tasks.
      • Not marking jobs as completed when all tasks are completed (Error: timeout).
      • Marking jobs as completed before all tasks are completed (Error: timeout).

grep

Run the grep application.

Potential bugs:

  • Incorrectly passing args down to the workers (Error: incorrect output).
  • Incorrectly implementing Job submission or Tasks (see wc).

vertex-degree

Run the vertex degree application.

Potential bugs:

map-parallel

Check that map tasks can be run in parallel. Specifically, the test doesn’t allow any map tasks to complete until all are assigned to a pool of workers that is the same size as the number of available map tasks.

Potential bugs:

  • Waiting for a FINISH_TASK RPC for an assigned task before assigning new tasks (Error: timeout).
  • Assigning map tasks multiple times (Error: timeout).
  • Incorrectly implementing Job submission or Tasks (see wc).

map-parallel-add-worker

Check that map tasks can be run in parallel, even when workers join the cluster after a job has started.

Potential bugs:

  • Allowing reduce tasks to run before map tasks complete (Error: job completes without having enough workers to accept map tasks).
  • See map-parallel.

reduce-parallel

Check that reduce tasks can be run in parallel. Specifically, the test doesn’t allow any reduce tasks to complete until all are assigned to a pool of workers that is the same size as the number of available reduce tasks.

Potential bugs:

  • Waiting for a FINISH_TASK RPC for an assigned task before assigning new tasks (Error: timeout).
  • Assigning reduce tasks multiple times (Error: timeout).
  • Incorrectly implementing Job submission or Tasks (see wc).

reduce-parallel-add-worker

Check that reduce tasks can be run in parallel, even when workers join the cluster after a job has started.

Potential bugs:

  • Marking job as complete without all reduce tasks being complete (Error: job completes without having enough workers to accept reduce tasks).
  • See reduce-parallel.

map-slow

Check that a long-running map task does not result in task reassignment.

Potential bugs:

  • Re-assignment logic uses time since assignment rather than time since last heartbeat (Error: task reassigned unecessarily).
  • Incorrectly implementing Job submission or Tasks (see wc).

reduce-slow

Check that a long-running reduce task does not result in task reassignment.

Potential bugs:

  • Re-assignment logic uses time since assignment rather than time since last heartbeat (Error: task reassigned unecessarily).
  • Incorrectly implementing Job submission or Tasks (see wc).

mr-concurrent

Run multiple MapReduce jobs concurrently. The test submits several jobs and ensures that they all complete and produce the correct output.

Potential bugs:

  • Incorrect queuing logic where jobs submitted while another job is running are not registered (Error: timeout or invalid job ID error).
  • Incorrect task assignment logic with multiple jobs.
    • Finishing a task from job 2 marks a task from job 1 as completed (Error: timeout or incorrect output).
    • Jobs are removed from the queue before they are completed (Error: timeout).
    • Jobs are not removed from the queue when they are completed and tasks for later jobs are not assigned (Error: timeout).
  • Incorrectly implementing Job submission or Tasks (see wc).

coordinator-no-work

Check that the coordinator does not execute MapReduce tasks.

Potential bugs:

  • Calling worker functions from the coordinator directly instead of assigning tasks via RPCs.
  • Incorrectly implementing Job submission or Tasks (see wc).

mr-queue-order

Check that tasks are assigned to workers in the correct order. Jobs should run in the order they are submitted (i.e. tasks for job 1 should run before tasks for job 2).

Potential bugs:

  • Assigning tasks in a random order or not keeping track of the order in which jobs are submitted (Error: incorrect task assignment order).
  • Incorrectly implementing Job submission or Tasks (see wc).

queue-map-multiple

Check that map tasks for multiple jobs can run in parallel. Specifically, this tests that map tasks from job 2 are assigned to idle workers even while map tasks from job 1 are still in progress.

Potential bugs:

  • Waiting on a job to complete before running tasks from later jobs (Error: timeout).
  • Waiting for the map stage of the previous job to complete before running the map stage of the next job (Error: timeout).
  • Incorrectly implementing Job submission or Tasks (see wc).

mr-no-duplicates

Check that no tasks are assigned more than once.

Potential bugs:

  • Incorrect fault tolerance logic that re-assigns tasks before the necessary timeout has expired (Error: task assigned multiple times).
  • Assigning tasks solely based on whether they have completed or not instead of keeping track of which tasks have already been assigned (Error: task assigned multiple times).
  • Incorrectly implementing Job submission or Tasks (see wc).

crash-reassign

Check that tasks are re-executed if a worker crashes.

Potential bugs:

  • Incorrect fault tolerance logic.
    • Re-assigns tasks before the necessary timeout has expired (Error: incorrect output).
    • Does not correctly identify a worker as crashed if it does not send a heartbeat within the given timeout (Error: timeout).
    • Does not re-assign the task currently assigned to a crashed worker (Error: timeout).
    • Does not re-assign map tasks previously completed by a crashed worker (Error: timeout).
    • Continues assigning reduce tasks even though re-assigned map tasks have not finished (Error: timeout).
    • Marks the job as failed if a worker crashes instead of re-assigning the relevant tasks (Error: job failed).
    • Ignoring the FailTask RPC (Error: timeout).
    • Does not reassign the relevant task after worker sends a FailTask RPC with retry = true (Error: timeout).
  • Incorrectly implementing Job submission or Tasks (see wc).

map-crash

Check that MapReduce jobs complete even when a worker crashes during a map task. This test runs with several different worker pool sizes and a varying number of crashes.

Potential bugs:

reduce-crash

Check that MapReduce jobs complete even when a worker crashes during a reduce task. This test runs with several different worker pool sizes and a varying number of crashes.

Potential bugs:

rand-crash

Check that MapReduce jobs complete even when workers crash randomly.

Potential bugs:

crash-multi-job

Check that MapReduce jobs complete even when a worker that ran tasks for multiple jobs crashes.

Potential bugs:

  • Fault tolerance logic only works for a single job due to keeping track of assigned tasks based only on task ID, rather than both task ID and job ID (Error: timeout).
  • Incorrect fault tolerance logic (see crash-reassign).
  • Incorrectly implementing Job submission or Tasks (see wc).

crash-no-duplicates

Check that no extraneous tasks are scheduled when one or more workers crash.

Potential bugs:

  • Fault tolerance logic does not keep track of whether failed tasks are reassigned (Error: task assigned multiple times).
  • Incorrect fault tolerance logic (see crash-reassign).
  • Incorrectly implementing Job submission or Tasks (see wc).

submit-bad-app

Check that submitting a job with an invalid app does not crash the coordinator or workers.

Potential bugs:

  • Allowing successful SubmitJob RPC even if the submitted app is invalid (Error: missing error).
  • Incorrectly implementing Job submission or Tasks (see wc).

map-error

Check that the MapReduce cluster tolerates map function errors.

Potential bugs:

  • Ignoring the FailTask RPC (Error: timeout).
  • Trying to reassign tasks with retry = false even though these are permanent errors (Error: timeout).
  • Not replying with failed = true when PollJob is called on a failed job (Error: timeout or missing error).
  • Incorrectly implementing Job submission or Tasks (see wc).

map-iter-error

Check that the MapReduce cluster tolerates errors while iterating over map function results.

Potential bugs:

reduce-error

Check that the MapReduce cluster tolerates reduce function errors.

Potential bugs: