void launchTasks(const vector<OfferID>& offerIds,
const vector<TaskInfo>& tasks,
const Filters& filters)
{
Offer::Operation operation;
operation.set_type(Offer::Operation::LAUNCH);
Offer::Operation::Launch* launch = operation.mutable_launch();
foreach (const TaskInfo& task, tasks) {
launch->add_task_infos()->CopyFrom(task);
}
acceptOffers(offerIds, {operation}, filters);
}
// Builds an ACCEPT call carrying `offerIds`, `operations` and `filters`
// and sends it to the master.
//
// When the driver is not connected nothing is sent. Instead, a TASK_LOST
// status update is passed to statusUpdate() for every task found in a
// LAUNCH operation, and the function returns.
//
// Every offer ID handled here is erased from `savedOffers`. Before an
// offer is erased, the PIDs it recorded for the slaves referenced by the
// launched tasks are copied into `savedSlavePids`.
void acceptOffers(
    const vector<OfferID>& offerIds,
    const vector<Offer::Operation>& operations,
    const Filters& filters)
{
  // TODO(jieyu): Move all driver side verification to master since
  // we are moving towards supporting pure language scheduler.
  if (!connected) {
    VLOG(1) << "Ignoring accept offers message as master is disconnected";

    // NOTE: Reply to the framework with a TASK_LOST message for each
    // task that this call attempted to launch.
    foreach (const Offer::Operation& operation, operations) {
      if (operation.type() == Offer::Operation::LAUNCH) {
        foreach (const TaskInfo& task, operation.launch().task_infos()) {
          StatusUpdate lostUpdate = protobuf::createStatusUpdate(
              framework.id(),
              None(),
              task.task_id(),
              TASK_LOST,
              TaskStatus::SOURCE_MASTER,
              None(),
              "Master disconnected",
              TaskStatus::REASON_MASTER_DISCONNECTED);

          statusUpdate(UPID(), lostUpdate, UPID());
        }
      }
    }

    return;
  }

  CHECK(framework.has_id());

  Call call;
  call.set_type(Call::ACCEPT);
  call.mutable_framework_id()->CopyFrom(framework.id());

  Call::Accept* accept = call.mutable_accept();

  // Populate accept.operations.
  foreach (const Offer::Operation& operation, operations) {
    accept->add_operations()->CopyFrom(operation);
  }

  // Populate accept.offer_ids and update the saved offer bookkeeping.
  foreach (const OfferID& offerId, offerIds) {
    accept->add_offer_ids()->CopyFrom(offerId);

    if (savedOffers.contains(offerId)) {
      // The offer is known, so this lookup refers to the existing entry.
      auto& offeredSlavePids = savedOffers[offerId];

      // Keep only the slave PIDs where we run tasks so we can send
      // framework messages directly.
      foreach (const Offer::Operation& operation, operations) {
        if (operation.type() == Offer::Operation::LAUNCH) {
          foreach (const TaskInfo& task, operation.launch().task_infos()) {
            const SlaveID& slaveId = task.slave_id();

            if (!offeredSlavePids.contains(slaveId)) {
              LOG(WARNING) << "Attempting to launch task " << task.task_id()
                           << " with the wrong slave id " << slaveId;
              continue;
            }

            savedSlavePids[slaveId] = offeredSlavePids[slaveId];
          }
        }
      }
    } else {
      // TODO(jieyu): A duplicated offer ID could also cause this
      // warning to be printed. Consider refining this message.
      LOG(WARNING) << "Attempting to accept an unknown offer " << offerId;
    }

    // Remove the offer since we saved all the PIDs we might use.
    savedOffers.erase(offerId);
  }

  // Populate accept.filters.
  accept->mutable_filters()->CopyFrom(filters);

  CHECK_SOME(master);
  send(master.get().pid(), call);
}