
Job submission

Table of contents

  1. Data Structures
  2. Job Queueing
  3. SubmitJob RPC
  4. PollJob RPC

In this part, you will implement logic to allow the coordinator to handle SubmitJob and PollJob RPCs from the client.

Data Structures

Start by thinking about what information you need to track the status of each MapReduce job.

Here are some points to keep in mind:

  1. When a job is submitted to the coordinator, it should receive a unique job ID. Once a job ID has been used, it should never be used again.

  2. You will need to implement queueing behavior, as described below.

  3. Looking for information about a job given the job’s ID should be efficient.

  4. Look at the RPC definitions in proto/coordinator.proto to see what information is given to you when a job is submitted.

Think carefully about what data structures you will use to manage the state of each job and its position on the job queue. Once you’ve thought about your data structures, convert them to Rust code.

Having well-designed data structures will make implementing and debugging the rest of your code much easier, so we encourage you to spend some time on this part. Feel free to iterate on your data structures as you progress through the assignment.

You might find tokio::sync::Mutex, std::collections::VecDeque, and std::collections::HashMap useful here.

Design for simplicity where possible. We are not testing you on efficiency, so try to find a solution that is not too difficult to implement but also satisfies the given constraints. If you are unsure about how to go about this, feel free to come to office hours (OH); we would be happy to answer any design questions.
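As one possible starting point, the points above can be sketched with a counter for job IDs, a HashMap for lookup by ID, and a VecDeque for submission order. This is only a sketch under assumptions: the names JobId, JobInfo, and JobTable are hypothetical, the record fields simply mirror the SubmitJobRequest and PollJobReply messages in proto/coordinator.proto, and your own design may reasonably look different.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical alias; matches the `uint32 job_id` used in the proto file.
pub type JobId = u32;

/// Hypothetical per-job record. The first five fields mirror SubmitJobRequest;
/// the last three mirror what PollJobReply will eventually need.
#[derive(Debug, Clone)]
pub struct JobInfo {
    pub files: Vec<String>,
    pub output_dir: String,
    pub app: String,
    pub n_reduce: u32,
    pub args: Vec<u8>,
    pub done: bool,
    pub failed: bool,
    pub errors: Vec<String>,
}

/// Hypothetical container for all job bookkeeping.
#[derive(Debug, Default)]
pub struct JobTable {
    next_id: JobId,                 // next unused ID
    jobs: HashMap<JobId, JobInfo>,  // lookup by job ID
    queue: VecDeque<JobId>,         // job IDs in submission order
}

impl JobTable {
    /// Stores a new job at the back of the queue and returns its fresh ID.
    pub fn insert(&mut self, info: JobInfo) -> JobId {
        let id = self.next_id;
        self.next_id += 1; // the counter only increases, so no ID is handed out twice
        self.jobs.insert(id, info);
        self.queue.push_back(id);
        id
    }

    /// Looks up a job by ID.
    pub fn get(&self, id: JobId) -> Option<&JobInfo> {
        self.jobs.get(&id)
    }

    /// Returns the job IDs in first-come-first-served order.
    pub fn queue_order(&self) -> Vec<JobId> {
        self.queue.iter().copied().collect()
    }
}
```

Keeping only IDs in the queue and the full record in the map avoids storing job data twice, at the cost of one extra lookup when walking the queue.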

Job Queueing

Clients may concurrently submit jobs to the coordinator. The coordinator should prioritize jobs first-come-first-served, in the order in which they were received. That is, if job 1 was submitted before job 2, tasks for job 1 should have priority over tasks for job 2.

However, the coordinator should never waste time: workers should not sit idle if there are available tasks.

Here are some examples. Assume job 1 was submitted before job 2, which was submitted before job 3. Assume each job has 10 map tasks and 5 reduce tasks, and that all workers are initially idle.

  • If there is only one worker in the cluster, it should sequentially complete the map tasks for job 1, then the reduce tasks for job 1, then the map tasks for job 2, then the reduce tasks for job 2, and so on.
  • If there are 20 workers in the cluster, 10 of them should be assigned map tasks for job 1. The next 10 should be assigned map tasks for job 2. Even though job 1 is not complete, we don’t want workers to idle while waiting for other tasks to complete.
  • Suppose there are 20 workers, and all 10 map tasks for job 1 are complete. 5 workers are working on reduce tasks for job 1; the rest are working on map tasks for jobs 2 and 3. Now suppose one worker that executed a map task for job 1 fails. The map task(s) completed by that worker will need to be re-run if not all reduce tasks have read the data. The map task re-execution (and subsequent re-execution of failed reduce tasks) should take priority over tasks for jobs 2 and 3.

    Don’t implement “pre-emption” here; just assign the job 1 tasks to the next available worker. That is, don’t try to interrupt a worker working on other tasks in order to make it run the higher-priority job 1 task.

Don’t overthink queueing; even though the examples may sound complicated, the expected behavior is simple: each worker should be assigned the first available task. Reduce tasks only become “available” once all map tasks for their job are complete.
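The "first available task" rule can be sketched as a single scan over the jobs in queue order. The sketch below is an illustration under assumptions, not the required design: TaskState, Task, JobTasks, and next_task are hypothetical names, and real task records would carry more information (for example, which worker holds the task).

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical task status.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskState {
    Idle,
    InProgress,
    Completed,
}

/// Hypothetical description of an assigned task.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Task {
    Map { job_id: u32, index: usize },
    Reduce { job_id: u32, index: usize },
}

/// Hypothetical per-job task bookkeeping.
pub struct JobTasks {
    pub map: Vec<TaskState>,
    pub reduce: Vec<TaskState>,
}

impl JobTasks {
    pub fn new(n_map: usize, n_reduce: usize) -> Self {
        JobTasks {
            map: vec![TaskState::Idle; n_map],
            reduce: vec![TaskState::Idle; n_reduce],
        }
    }
}

/// Returns the first available task in queue order and marks it in progress.
pub fn next_task(queue: &VecDeque<u32>, jobs: &mut HashMap<u32, JobTasks>) -> Option<Task> {
    for &job_id in queue {
        let job = match jobs.get_mut(&job_id) {
            Some(job) => job,
            None => continue,
        };
        // An idle map task of an earlier job always wins.
        if let Some(index) = job.map.iter().position(|s| *s == TaskState::Idle) {
            job.map[index] = TaskState::InProgress;
            return Some(Task::Map { job_id, index });
        }
        // Reduce tasks are available only once every map task is complete.
        if job.map.iter().all(|s| *s == TaskState::Completed) {
            if let Some(index) = job.reduce.iter().position(|s| *s == TaskState::Idle) {
                job.reduce[index] = TaskState::InProgress;
                return Some(Task::Reduce { job_id, index });
            }
        }
        // Nothing available for this job right now, so try the next job
        // instead of letting the worker idle.
    }
    None
}
```

With this shape, re-running a failed map task amounts to setting its state back to Idle: because the scan starts at the front of the queue, that task is handed to the next free worker ahead of any work for later jobs, with no pre-emption involved.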

Think about how you will implement this queueing behavior using the data structures you designed earlier. If necessary, modify your data structures.

SubmitJob RPC

You should now implement the SubmitJob RPC to allow the client to queue jobs on the MapReduce cluster.

The protocol buffers have already been provided for you in proto/coordinator.proto:

message SubmitJobRequest {
  repeated string files = 1;
  string output_dir = 2;
  string app = 3;
  uint32 n_reduce = 4;
  bytes args = 5;
}

message SubmitJobReply {
  uint32 job_id = 1;
}

Implement the submit_job stub on the coordinator to set up relevant data structures for the new job. You will need to modify src/coordinator/mod.rs. For an example of how to implement an RPC call, take a look at the Example RPC stub.

Keep in mind that your RPC stub needs to be asynchronous and thread safe, so it may help to take a look at the documentation for std::sync::Arc and tokio::sync::Mutex. We do not expect you to use fine-grained synchronization, so feel free to protect all of your mutable state with a single mutex. The suggested way to do this is shown below:

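use std::sync::Arc;
use tokio::sync::Mutex;

// All mutable coordinator state lives here, behind a single mutex.
pub struct CoordinatorState {
  // Put mutable state here
}

pub struct Coordinator {
  // Put immutable state here
  // Shared handle to the mutable state; lock it before reading or writing.
  inner: Arc<Mutex<CoordinatorState>>,
}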

If you choose to use multiple mutexes to improve performance, you will need to be incredibly careful that there are no race conditions in your code. We will not help debug logic with multiple mutexes and will instead suggest you switch over to using a single mutex if you run into issues.
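To illustrate why a single mutex is enough for unique job IDs, here is a small self-contained sketch. It makes two loud assumptions so that it runs with the standard library alone: it uses std::sync::Mutex and OS threads as stand-ins for tokio::sync::Mutex and async tasks (in the coordinator you would write `self.inner.lock().await` instead of `lock().unwrap()`), and the next_job_id field, the allocate_job_id method, and the concurrent_ids helper are hypothetical names.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

#[derive(Default)]
pub struct CoordinatorState {
    next_job_id: u32, // hypothetical field: next unused job ID
}

#[derive(Clone, Default)]
pub struct Coordinator {
    // Cloning the Coordinator clones the Arc, so every clone shares one state.
    inner: Arc<Mutex<CoordinatorState>>,
}

impl Coordinator {
    /// Reads and increments the counter while holding the lock, so two
    /// concurrent callers can never observe the same ID.
    pub fn allocate_job_id(&self) -> u32 {
        let mut state = self.inner.lock().unwrap();
        let id = state.next_job_id;
        state.next_job_id += 1;
        id
    }
}

/// Simulates `n` clients submitting at once and returns the sorted IDs.
pub fn concurrent_ids(n: usize) -> Vec<u32> {
    let coordinator = Coordinator::default();
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let c = coordinator.clone();
            thread::spawn(move || c.allocate_job_id())
        })
        .collect();
    let mut ids: Vec<u32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    ids.sort_unstable();
    ids
}
```

The important detail is that the read and the increment happen under the same lock guard; releasing the lock between them would reintroduce the race.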

For this part, you should only need to modify src/coordinator/mod.rs. Your RPC implementation is required to do the following:

  • Assign a unique job ID
  • Keep track of the current order of jobs in the queue
  • Store job information in a manner that allows quick lookup by job ID
  • Validate the provided application name

You can check if the provided application name is valid using the crate::app::named function. If it returns an error e because the provided application does not exist, return Err(Status::new(Code::InvalidArgument, e.to_string())).
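The four requirements above can be sketched as one function that validates first and mutates state second. Everything here is a stand-in chosen so the example compiles on its own: SubmitError replaces the gRPC Status type, lookup_app replaces crate::app::named (whose real signature may differ), the application names "wc" and "grep" are made up, and State and submit are hypothetical names.

```rust
use std::collections::{HashMap, VecDeque};

/// Stand-in for the gRPC status error returned by the real stub.
#[derive(Debug, PartialEq)]
pub enum SubmitError {
    InvalidArgument(String),
}

/// Stand-in for `crate::app::named`; the real function's signature may differ.
/// The application names accepted here are invented for the example.
fn lookup_app(name: &str) -> Result<(), String> {
    match name {
        "wc" | "grep" => Ok(()),
        other => Err(format!("no application named {}", other)),
    }
}

/// Hypothetical mutable state, reduced to what this example needs.
#[derive(Default)]
pub struct State {
    pub next_id: u32,
    pub jobs: HashMap<u32, String>, // job ID -> app name; a real record holds more
    pub queue: VecDeque<u32>,       // job IDs in submission order
}

/// Validates the request, then records the job and returns its ID.
pub fn submit(state: &mut State, app: &str) -> Result<u32, SubmitError> {
    // Validate before touching any state, so a rejected request
    // neither consumes an ID nor appears in the queue.
    lookup_app(app).map_err(SubmitError::InvalidArgument)?;
    let id = state.next_id;
    state.next_id += 1;
    state.jobs.insert(id, app.to_string());
    state.queue.push_back(id);
    Ok(id)
}
```

Validating before allocating an ID is a design choice rather than a requirement stated here; it simply keeps rejected submissions from leaving partial state behind.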

The args field of the SubmitJobRequest is a sequence of bytes. It should be passed as the aux argument to application map and reduce functions. Different applications may interpret args differently; you should not modify the args in any way.

Recall that the map and reduce functions have these signatures:

pub type MapFn = fn(kv: KeyValue, aux: Bytes) -> MapOutput;
pub type ReduceFn = fn(
    key: Bytes,
    values: Box<dyn Iterator<Item = Bytes> + '_>,
    aux: Bytes,
) -> anyhow::Result<Bytes>;
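As an illustration of why args must be passed through untouched, consider a hypothetical application that interprets aux as a UTF-8 search term. The sketch below does not use the framework's real types: KeyValue here is a simplified stand-in built on Vec<u8> rather than Bytes, the return type is a plain Vec instead of MapOutput, and grep_map is an invented function.

```rust
/// Simplified stand-in for the framework's key-value type.
pub struct KeyValue {
    pub key: Vec<u8>,
    pub value: Vec<u8>,
}

/// Hypothetical map function: treats `aux` as a UTF-8 search term and
/// emits one pair per input line that contains it.
pub fn grep_map(kv: KeyValue, aux: &[u8]) -> Vec<KeyValue> {
    // Only the application decides how to decode `aux`.
    let needle = String::from_utf8_lossy(aux);
    let text = String::from_utf8_lossy(&kv.value);
    text.lines()
        .filter(|line| line.contains(&*needle))
        .map(|line| KeyValue {
            key: kv.key.clone(),
            value: line.as_bytes().to_vec(),
        })
        .collect()
}
```

Another application might decode the same field as a number or a serialized struct, which is why the coordinator should store the bytes exactly as received and hand them to every map and reduce task for the job.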

PollJob RPC

You should now implement the PollJob RPC to allow the client to check the status of jobs.

The protocol buffers have already been provided for you in proto/coordinator.proto:

message PollJobRequest {
  uint32 job_id = 1;
}

message PollJobReply {
  bool done = 1;
  bool failed = 2;
  repeated string errors = 3;
}

Implement the poll_job stub on the coordinator to return the current status of the relevant job. Right now, your response will always have done = false and failed = false unless you receive an invalid job_id, in which case you should return Err(Status::new(Code::NotFound, "job id is invalid")). However, we recommend updating your data structures now in a way that will require minimal direct changes to poll_job as you work on other parts of the assignment.
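One way to keep poll_job stable as the assignment grows is to store the reply fields on the job record from the start and have the stub only read them. The sketch below shows that idea with stand-ins: JobStatus mirrors the PollJobReply fields, PollError replaces the gRPC Status type, and poll is a hypothetical helper rather than the actual stub signature.

```rust
use std::collections::HashMap;

/// Mirrors the fields of PollJobReply; defaults to not done and not failed.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct JobStatus {
    pub done: bool,
    pub failed: bool,
    pub errors: Vec<String>,
}

/// Stand-in for the gRPC status error returned by the real stub.
#[derive(Debug, PartialEq)]
pub enum PollError {
    NotFound(&'static str),
}

/// Returns a copy of the stored status, or NotFound for an unknown ID.
pub fn poll(jobs: &HashMap<u32, JobStatus>, job_id: u32) -> Result<JobStatus, PollError> {
    jobs.get(&job_id)
        .cloned()
        .ok_or(PollError::NotFound("job id is invalid"))
}
```

Later parts of the assignment would then update done, failed, and errors wherever tasks complete or fail, and this lookup would not need to change.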

If you implement PollJob incorrectly, you will likely not pass any autograder tests even after implementing later parts, because the tests use this RPC to check whether submitted jobs complete successfully.

Keep in mind that you will not pass any tests at this point since all of the tests depend on the functionality you will be implementing in Tasks.