Job submission
In this part, you will implement logic to allow the coordinator to handle `SubmitJob` RPCs from the client.
Data Structures
Start by thinking about what information you need to track the status of each MapReduce job.
Here are some points to keep in mind:
- When a job is submitted to the coordinator, it should receive a unique job ID. Once a job ID has been used, it should never be used again.
- You will need to implement queueing behavior, as described below.
- Looking up information about a job given the job’s ID should be efficient.
Look at the RPC definitions in `proto/coordinator.proto` to see what information is given to you when a job is submitted.
Think carefully about what data structures you will use to manage the state of each job and its position on the job queue. Once you’ve thought about your data structures, convert them to Rust code.
Having well-designed data structures will make implementing and debugging the rest of your code much easier, so we encourage you to spend some time on this part. Feel free to iterate on your data structures as you progress through the assignment.
You might find `tokio::sync::Mutex`, `std::collections::VecDeque`, and `std::collections::HashMap` useful here.

Design for simplicity where possible. We are not testing you on efficiency, so try to find a solution that is not too difficult to implement but also satisfies the given constraints. If you are unsure about how to go about this, feel free to come to OH; we would be happy to answer any design questions.
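For concreteness, here is one possible shape for this state. This is a minimal sketch, not a required design: `JobState`, `CoordinatorState`, and all of their fields are illustrative names, and the task-tracking fields anticipate later parts of the assignment.

```rust
use std::collections::{HashMap, VecDeque};

// Illustrative per-job bookkeeping; keep whatever fields you end up needing.
pub struct JobState {
    pub files: Vec<String>,
    pub output_dir: String,
    pub app: String,
    pub n_reduce: u32,
    pub args: Vec<u8>, // opaque bytes, forwarded to map/reduce as `aux`
    pub pending_maps: VecDeque<u32>,    // map task IDs not yet assigned
    pub maps_remaining: u32,            // map tasks not yet completed
    pub pending_reduces: VecDeque<u32>, // reduce task IDs not yet assigned
    pub done: bool,
    pub failed: bool,
    pub errors: Vec<String>,
}

pub struct CoordinatorState {
    pub next_job_id: u32,             // monotonically increasing, never reused
    pub jobs: HashMap<u32, JobState>, // efficient lookup by job ID
    pub queue: VecDeque<u32>,         // job IDs in first-come-first-served order
}
```

With this layout, assigning a unique ID is an increment, first-come-first-served ordering falls out of the `VecDeque`, and lookup by job ID is a single `HashMap` access.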
Job Queueing
Clients may concurrently submit jobs to the coordinator. The coordinator should prioritize jobs first-come-first-served, in the order in which they were received. That is, if job 1 was submitted before job 2, tasks for job 1 should have priority over tasks for job 2.
However, the coordinator should never waste time: workers should not sit idle if there are available tasks.
Here are some examples. Assume job 1 was submitted before job 2, which was submitted before job 3. Assume each job has 10 map tasks and 5 reduce tasks, and that all workers are initially idle.
- If there is only one worker in the cluster, it should sequentially complete the map tasks for job 1, then the reduce tasks for job 1, then the map tasks for job 2, then the reduce tasks for job 2, and so on.
- If there are 20 workers in the cluster, 10 of them should be assigned map tasks for job 1. The next 10 should be assigned map tasks for job 2. Even though job 1 is not complete, we don’t want workers to idle while waiting for other tasks to complete.
Suppose there are 20 workers, and all 10 map tasks for job 1 are complete. 5 workers are working on reduce tasks for job 1; the rest are working on map tasks for jobs 2 and 3. Now suppose one worker that executed a map task for job 1 fails. The map task(s) completed by that worker will need to be re-run if not all reduce tasks have read the data. The map task re-execution (and subsequent re-execution of failed reduce tasks) should take priority over tasks for jobs 2 and 3.
Don’t implement “pre-emption” here; just assign the job 1 tasks to the next available worker. That is, don’t try to interrupt a worker working on other tasks in order to make it run the higher-priority job 1 task.
Don’t overthink queueing; even though the examples may sound complicated, the expected behavior is simple: each worker should be assigned the first available task. Reduce tasks only become “available” once all map tasks for their job are complete.
Think about how you will implement this queueing behavior using the data structures you designed earlier. If necessary, modify your data structures.
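To make the expected behavior concrete, here is one way task selection could look on top of the `CoordinatorState` sketched above. This is a hedged sketch under our earlier assumptions, not a required design; `TaskAssignment` is a hypothetical type. Note that fault handling falls out naturally: if a failed map task’s ID is pushed back onto its job’s pending queue, the same scan re-assigns it before any later job’s tasks.

```rust
// Hypothetical result of task selection; not part of the starter code.
pub enum TaskAssignment {
    Map { job_id: u32, task_id: u32 },
    Reduce { job_id: u32, task_id: u32 },
}

// Hand out the first available task, scanning jobs in submission order
// so that earlier jobs always have priority.
pub fn next_available_task(state: &mut CoordinatorState) -> Option<TaskAssignment> {
    for &job_id in &state.queue {
        let Some(job) = state.jobs.get_mut(&job_id) else { continue };
        // Map tasks are available as soon as the job is queued.
        if let Some(task_id) = job.pending_maps.pop_front() {
            return Some(TaskAssignment::Map { job_id, task_id });
        }
        // Reduce tasks become available only once every map task
        // for this job has completed.
        if job.maps_remaining == 0 {
            if let Some(task_id) = job.pending_reduces.pop_front() {
                return Some(TaskAssignment::Reduce { job_id, task_id });
            }
        }
    }
    None // no available work; the worker idles until state changes
}
```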
SubmitJob RPC
You should now implement the `SubmitJob` RPC to allow the client to queue jobs on the MapReduce cluster. The protocol buffers have already been provided for you in `proto/coordinator.proto`:
```proto
message SubmitJobRequest {
  repeated string files = 1;
  string output_dir = 2;
  string app = 3;
  uint32 n_reduce = 4;
  bytes args = 5;
}

message SubmitJobReply {
  uint32 job_id = 1;
}
```
Implement the `submit_job` stub on the coordinator to set up relevant data structures for the new job. You will need to modify `src/coordinator/mod.rs`. For an example of how to implement an RPC call, take a look at the `Example` RPC stub.
Keep in mind that your RPC stub needs to be asynchronous and thread-safe, so it may help to take a look at the documentation for `std::sync::Arc` and `tokio::sync::Mutex`. We do not expect you to use fine-grained synchronization, so feel free to protect all of your mutable state with a single mutex. The suggested way to do this is shown below:

```rust
pub struct CoordinatorState {
    // Put mutable state here
}

pub struct Coordinator {
    // Put immutable state here
    inner: Arc<Mutex<CoordinatorState>>,
}
```
If you choose to use multiple mutexes to improve performance, you will need to be incredibly careful that there are no race conditions in your code. We will not help debug logic with multiple mutexes and will instead suggest you switch over to using a single mutex if you run into issues.
For this part, you should only need to modify `src/coordinator/mod.rs`. Your RPC implementation is required to do the following:
- Assign a unique job ID
- Keep track of the current order of jobs in the queue
- Store job information in a manner that allows quick lookup by job ID
- Validate the provided application name
You can check if the provided application name is valid using the `crate::app::named` function. If you find that the provided application does not exist, return `Err(Status::new(Code::InvalidArgument, e.to_string()))`.
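Putting these requirements together, `submit_job` might look something like the sketch below. This is not a definitive implementation: it reuses the illustrative `CoordinatorState`/`JobState` from earlier, assumes `crate::app::named` returns a `Result` whose error converts via `to_string()`, and assumes one map task per input file; the exact request/reply types are generated from `proto/coordinator.proto`.

```rust
use tonic::{Code, Request, Response, Status};

// Inside the Coordinator's service impl.
async fn submit_job(
    &self,
    req: Request<SubmitJobRequest>,
) -> Result<Response<SubmitJobReply>, Status> {
    let req = req.into_inner();

    // Validate the application name before touching any state.
    if let Err(e) = crate::app::named(&req.app) {
        return Err(Status::new(Code::InvalidArgument, e.to_string()));
    }

    let mut state = self.inner.lock().await;

    // Assign a fresh job ID; IDs are never reused.
    let job_id = state.next_job_id;
    state.next_job_id += 1;

    // Record the job for quick lookup and enqueue it in submission order.
    let n_map = req.files.len() as u32; // assuming one map task per file
    state.jobs.insert(
        job_id,
        JobState {
            files: req.files,
            output_dir: req.output_dir,
            app: req.app,
            n_reduce: req.n_reduce,
            args: req.args,
            pending_maps: (0..n_map).collect(),
            maps_remaining: n_map,
            pending_reduces: (0..req.n_reduce).collect(),
            done: false,
            failed: false,
            errors: Vec::new(),
        },
    );
    state.queue.push_back(job_id);

    Ok(Response::new(SubmitJobReply { job_id }))
}
```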
The `args` field of the `SubmitJobRequest` is a sequence of bytes. It should be passed as the `aux` argument to application `map` and `reduce` functions. Different applications may interpret `args` differently; you should not modify the `args` in any way.
Recall that the `map` and `reduce` functions have these signatures:
```rust
pub type MapFn = fn(kv: KeyValue, aux: Bytes) -> MapOutput;

pub type ReduceFn = fn(
    key: Bytes,
    values: Box<dyn Iterator<Item = Bytes> + '_>,
    aux: Bytes,
) -> anyhow::Result<Bytes>;
```
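As a purely illustrative example of why the bytes must pass through untouched: a grep-like application might decode `aux` as a UTF-8 search pattern, while another might deserialize a struct from it. The helper below is hypothetical and not part of the starter code.

```rust
use bytes::Bytes;

// Hypothetical: one application's interpretation of its aux bytes.
// The coordinator never does this; it forwards `args` verbatim.
fn pattern_from_aux(aux: &Bytes) -> anyhow::Result<String> {
    Ok(std::str::from_utf8(aux)?.to_string())
}
```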
PollJob RPC
You should now implement the `PollJob` RPC to allow the client to check the status of jobs. The protocol buffers have already been provided for you in `proto/coordinator.proto`:
```proto
message PollJobRequest {
  uint32 job_id = 1;
}

message PollJobReply {
  bool done = 1;
  bool failed = 2;
  repeated string errors = 3;
}
```
Implement the `poll_job` stub on the coordinator to return the current status of the relevant job. Right now, your response will always have `done = false` and `failed = false` unless you receive an invalid `job_id`, in which case you should return `Err(Status::new(Code::NotFound, "job id is invalid"))`. However, we recommend updating your data structures now in a way that will require minimal direct changes to `poll_job` as you work on other parts of the assignment.
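For instance, if each job already stores `done`, `failed`, and `errors` (as in the earlier illustrative `JobState`), `poll_job` can simply report those fields and never needs to change again. A sketch under those assumptions:

```rust
use tonic::{Code, Request, Response, Status};

// Inside the Coordinator's service impl.
async fn poll_job(
    &self,
    req: Request<PollJobRequest>,
) -> Result<Response<PollJobReply>, Status> {
    let req = req.into_inner();
    let state = self.inner.lock().await;

    // Unknown job IDs are an error; known jobs report their stored status.
    let job = state
        .jobs
        .get(&req.job_id)
        .ok_or_else(|| Status::new(Code::NotFound, "job id is invalid"))?;

    Ok(Response::new(PollJobReply {
        done: job.done,     // always false for now
        failed: job.failed, // always false for now
        errors: job.errors.clone(),
    }))
}
```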
If you implement `PollJob` incorrectly, you will likely not pass any autograder tests even after implementing later parts, as the tests use this RPC to check whether submitted jobs complete successfully.

Keep in mind that you will not pass any tests at this point, since all of the tests depend on the functionality you will be implementing in Tasks.