package org.apache.flink.mesos.runtime.clusterframework;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.pattern.Patterns;
import com.netflix.fenzo.TaskScheduler;
import com.netflix.fenzo.VirtualMachineLease;
import com.netflix.fenzo.functions.Action1;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.scheduler.ConnectionMonitor;
import org.apache.flink.mesos.scheduler.LaunchCoordinator;
import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
import org.apache.flink.mesos.scheduler.TaskMonitor;
import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
import org.apache.flink.mesos.scheduler.Tasks;
import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
import org.apache.flink.mesos.scheduler.messages.Disconnected;
import org.apache.flink.mesos.scheduler.messages.ExecutorLost;
import org.apache.flink.mesos.scheduler.messages.FrameworkMessage;
import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
import org.apache.flink.mesos.scheduler.messages.ReRegistered;
import org.apache.flink.mesos.scheduler.messages.Registered;
import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
import org.apache.flink.mesos.scheduler.messages.SlaveLost;
import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
import org.apache.flink.mesos.util.MesosArtifactServer;
import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.yarn.webapp.YarnWebParams;
import org.apache.mesos.Protos;
import org.apache.mesos.Scheduler;
import org.apache.mesos.SchedulerDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.class */
public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerNode> {
    /** Logger for this resource manager; also used by the nested Akka adapter. */
    protected static final Logger LOG;
    /** Flink configuration; handed to the worker store factory and to the supporting actors' props. */
    private final Configuration flinkConfig;
    /** Mesos configuration supplied at construction; its framework info is cloned in {@code initialize()}. */
    private final MesosConfiguration mesosConfig;
    /** Mesos services; source of the worker store, the local actor system and the artifact server. */
    private final MesosServices mesosServices;
    /** TaskManager parameters; used to build launchable workers and to derive {@link #slotsPerWorker}. */
    private final MesosTaskManagerParameters taskManagerParameters;
    /** TaskManager container specification; registered with the artifact server and passed to launchable workers. */
    private final ContainerSpecification taskManagerContainerSpec;
    /** Artifact server obtained from the Mesos services. */
    private final MesosArtifactServer artifactServer;
    /** Worker store; created and started in {@code initialize()}. */
    private MesosWorkerStore workerStore;
    /** Actor system in which the adapter actor and the supporting actors are created. */
    private final ActorSystem actorSystem;

    /** Web UI URL written into the framework info when non-null. */
    @Nullable
    private final String webUiUrl;
    /** Default slot profiles of a worker, computed in the constructor; returned by {@code startNewWorker} on success. */
    private final Collection<ResourceProfile> slotsPerWorker;
    /** Scheduler driver; created in {@code prepareLeadershipAsync()}. */
    private SchedulerDriver schedulerDriver;
    /** The {@code AkkaAdapter} actor, created in {@code initialize()}; used as sender for messages to the supporting actors. */
    private ActorRef selfActor;
    /** Connection monitor actor; created in {@code prepareLeadershipAsync()}, reset to null when stopped. */
    private ActorRef connectionMonitor;
    /** Task monitor actor; created in {@code prepareLeadershipAsync()}, reset to null when stopped. */
    private ActorRef taskMonitor;
    /** Launch coordinator actor; created in {@code prepareLeadershipAsync()}, reset to null when stopped. */
    private ActorRef launchCoordinator;
    /** Reconciliation coordinator actor; created in {@code prepareLeadershipAsync()}, reset to null when stopped. */
    private ActorRef reconciliationCoordinator;
    /** Workers requested through {@code startNewWorker} that have not yet been launched by an accepted offer. */
    final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
    /** Workers that were launched by an accepted offer, or recovered in state {@code Launched}. */
    final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
    /** Workers that were released by {@code stopWorker}, or recovered in state {@code Released}. */
    final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;
    /** Mesos configuration carrying the prepared framework info; set in {@code initialize()}, required by {@code prepareLeadershipAsync()}. */
    private MesosConfiguration initializedMesosConfig;
    /** Decompiler rendering of the assertion flag; assigned in the static initializer, true when assertions are disabled for this class. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager$AkkaAdapter.class */
    /**
     * Akka actor that receives messages addressed to the resource manager and hands each
     * recognized message to the matching handler of the enclosing {@link MesosResourceManager}
     * via {@code runAsync}.
     */
    private class AkkaAdapter extends UntypedActor {
        private AkkaAdapter() {
        }

        /**
         * Dispatches an incoming message by type.
         *
         * <p>{@code Reconcile}, {@code TaskTerminated} and {@code AcceptOffers} messages are
         * forwarded to {@code reconcile}, {@code taskTerminated} and {@code acceptOffers}
         * respectively; any other message is logged as an error.
         *
         * @param obj the received message
         */
        @Override // akka.actor.UntypedActor
        public void onReceive(final Object obj) throws Exception {
            if (obj instanceof ReconciliationCoordinator.Reconcile) {
                MesosResourceManager.this.runAsync(
                        () -> MesosResourceManager.this.reconcile((ReconciliationCoordinator.Reconcile) obj));
            } else if (obj instanceof TaskMonitor.TaskTerminated) {
                MesosResourceManager.this.runAsync(
                        () -> MesosResourceManager.this.taskTerminated((TaskMonitor.TaskTerminated) obj));
            } else if (obj instanceof AcceptOffers) {
                MesosResourceManager.this.runAsync(
                        () -> MesosResourceManager.this.acceptOffers((AcceptOffers) obj));
            } else {
                MesosResourceManager.LOG.error("unrecognized message: " + obj);
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager$MesosResourceManagerSchedulerCallback.class */
    /**
     * Mesos {@link Scheduler} implementation that wraps every driver callback into the
     * corresponding message object and forwards it to the enclosing
     * {@link MesosResourceManager} through {@code runAsync} (or
     * {@code runAsyncWithoutFencing} for {@code disconnected}).
     */
    private class MesosResourceManagerSchedulerCallback implements Scheduler {
        private MesosResourceManagerSchedulerCallback() {
        }

        /** Forwards a {@link Registered} message built from the framework ID and master info. */
        @Override // org.apache.mesos.Scheduler
        public void registered(SchedulerDriver schedulerDriver, final Protos.FrameworkID frameworkID, final Protos.MasterInfo masterInfo) {
            MesosResourceManager.this.runAsync(
                    () -> MesosResourceManager.this.registered(new Registered(frameworkID, masterInfo)));
        }

        /** Forwards a {@link ReRegistered} message built from the master info. */
        @Override // org.apache.mesos.Scheduler
        public void reregistered(SchedulerDriver schedulerDriver, final Protos.MasterInfo masterInfo) {
            MesosResourceManager.this.runAsync(
                    () -> MesosResourceManager.this.reregistered(new ReRegistered(masterInfo)));
        }

        /** Forwards the offered resources as a {@link ResourceOffers} message. */
        @Override // org.apache.mesos.Scheduler
        public void resourceOffers(SchedulerDriver schedulerDriver, final List<Protos.Offer> list) {
            MesosResourceManager.this.runAsync(
                    () -> MesosResourceManager.this.resourceOffers(new ResourceOffers(list)));
        }

        /** Forwards an {@link OfferRescinded} message for the given offer ID. */
        @Override // org.apache.mesos.Scheduler
        public void offerRescinded(SchedulerDriver schedulerDriver, final Protos.OfferID offerID) {
            MesosResourceManager.this.runAsync(
                    () -> MesosResourceManager.this.offerRescinded(new OfferRescinded(offerID)));
        }

        /** Forwards a {@link StatusUpdate} message for the given task status. */
        @Override // org.apache.mesos.Scheduler
        public void statusUpdate(SchedulerDriver schedulerDriver, final Protos.TaskStatus taskStatus) {
            MesosResourceManager.this.runAsync(
                    () -> MesosResourceManager.this.statusUpdate(new StatusUpdate(taskStatus)));
        }

        /** Forwards a {@link FrameworkMessage} carrying the executor ID, slave ID and payload. */
        @Override // org.apache.mesos.Scheduler
        public void frameworkMessage(SchedulerDriver schedulerDriver, final Protos.ExecutorID executorID, final Protos.SlaveID slaveID, final byte[] bArr) {
            MesosResourceManager.this.runAsync(
                    () -> MesosResourceManager.this.frameworkMessage(new FrameworkMessage(executorID, slaveID, bArr)));
        }

        /**
         * Forwards a {@link Disconnected} message. Unlike the other callbacks this one is
         * scheduled with {@code runAsyncWithoutFencing}.
         */
        @Override // org.apache.mesos.Scheduler
        public void disconnected(SchedulerDriver schedulerDriver) {
            MesosResourceManager.this.runAsyncWithoutFencing(
                    () -> MesosResourceManager.this.disconnected(new Disconnected()));
        }

        /** Forwards a {@link SlaveLost} message for the given slave ID. */
        @Override // org.apache.mesos.Scheduler
        public void slaveLost(SchedulerDriver schedulerDriver, final Protos.SlaveID slaveID) {
            MesosResourceManager.this.runAsync(
                    () -> MesosResourceManager.this.slaveLost(new SlaveLost(slaveID)));
        }

        /** Forwards an {@link ExecutorLost} message with the executor ID, slave ID and status code. */
        @Override // org.apache.mesos.Scheduler
        public void executorLost(SchedulerDriver schedulerDriver, final Protos.ExecutorID executorID, final Protos.SlaveID slaveID, final int i) {
            MesosResourceManager.this.runAsync(
                    () -> MesosResourceManager.this.executorLost(new ExecutorLost(executorID, slaveID, i)));
        }

        /** Reports the driver error as a fatal {@link ResourceManagerException}. */
        @Override // org.apache.mesos.Scheduler
        public void error(SchedulerDriver schedulerDriver, final String str) {
            MesosResourceManager.this.runAsync(
                    () -> MesosResourceManager.this.onFatalError(new ResourceManagerException(str)));
        }
    }

    /**
     * Creates the Mesos resource manager.
     *
     * <p>All Mesos-specific arguments except {@code str2} (the web UI URL) must be non-null, as
     * must the actor system and artifact server exposed by {@code mesosServices}. The three
     * worker bookkeeping maps start out empty and the default per-worker slot profiles are derived
     * from the containered parameters of {@code mesosTaskManagerParameters}.
     *
     * @param str2 web UI URL to advertise in the framework info, or {@code null}
     */
    public MesosResourceManager(
            RpcService rpcService,
            String str,
            ResourceID resourceID,
            HighAvailabilityServices highAvailabilityServices,
            HeartbeatServices heartbeatServices,
            SlotManager slotManager,
            JobLeaderIdService jobLeaderIdService,
            ClusterInformation clusterInformation,
            FatalErrorHandler fatalErrorHandler,
            Configuration configuration,
            MesosServices mesosServices,
            MesosConfiguration mesosConfiguration,
            MesosTaskManagerParameters mesosTaskManagerParameters,
            ContainerSpecification containerSpecification,
            @Nullable String str2,
            ResourceManagerMetricGroup resourceManagerMetricGroup) {
        super(
                rpcService,
                str,
                resourceID,
                highAvailabilityServices,
                heartbeatServices,
                slotManager,
                jobLeaderIdService,
                clusterInformation,
                fatalErrorHandler,
                resourceManagerMetricGroup);

        // Null checks run in this order; the first violated one determines the failure.
        this.mesosServices = Preconditions.checkNotNull(mesosServices);
        this.actorSystem = Preconditions.checkNotNull(mesosServices.getLocalActorSystem());
        this.flinkConfig = Preconditions.checkNotNull(configuration);
        this.mesosConfig = Preconditions.checkNotNull(mesosConfiguration);
        this.artifactServer = Preconditions.checkNotNull(mesosServices.getArtifactServer());
        this.taskManagerParameters = Preconditions.checkNotNull(mesosTaskManagerParameters);
        this.taskManagerContainerSpec = Preconditions.checkNotNull(containerSpecification);
        this.webUiUrl = str2;

        this.workersInNew = new HashMap<>(8);
        this.workersInLaunch = new HashMap<>(8);
        this.workersBeingReturned = new HashMap<>(8);

        this.slotsPerWorker =
                TaskExecutorProcessUtils.createDefaultWorkerSlotProfiles(
                        mesosTaskManagerParameters.containeredParameters().getTaskExecutorProcessSpec(),
                        mesosTaskManagerParameters.containeredParameters().numSlots());
    }

    /**
     * Creates the {@link AkkaAdapter} actor through which the supporting actors send messages
     * back to this resource manager.
     *
     * <p>The actor is registered under the name {@code "ResourceManager"}. The name is spelled
     * out as a literal here rather than borrowed from Hadoop's {@code YarnWebParams.RM_WEB_UI}
     * constant (which happens to hold the same string), so this Mesos class does not depend on
     * YARN classes being present.
     *
     * @return reference to the newly created adapter actor
     */
    protected ActorRef createSelfActor() {
        // The enclosing instance is passed as constructor argument because AkkaAdapter is an inner class.
        return this.actorSystem.actorOf(Props.create((Class<?>) AkkaAdapter.class, this), "ResourceManager");
    }

    /**
     * Creates the connection monitor actor, named {@code "connectionMonitor"}, configured with
     * the Flink configuration.
     *
     * @return reference to the newly created actor
     */
    protected ActorRef createConnectionMonitor() {
        final Props monitorProps = ConnectionMonitor.createActorProps(ConnectionMonitor.class, flinkConfig);
        return actorSystem.actorOf(monitorProps, "connectionMonitor");
    }

    /**
     * Creates the task monitor actor, named {@code "tasks"}, which is given the self actor, the
     * Flink configuration and the scheduler driver.
     *
     * @param schedulerDriver driver handed to the created actor
     * @return reference to the newly created actor
     */
    protected ActorRef createTaskMonitor(SchedulerDriver schedulerDriver) {
        final Props tasksProps =
                Tasks.createActorProps(Tasks.class, selfActor, flinkConfig, schedulerDriver, TaskMonitor.class);
        return actorSystem.actorOf(tasksProps, "tasks");
    }

    /**
     * Creates the launch coordinator actor, named {@code "launchCoordinator"}, using a freshly
     * built optimizer from {@code createOptimizer()}.
     *
     * @param schedulerDriver driver handed to the created actor
     * @param actorRef actor reference handed to the created actor
     * @return reference to the newly created actor
     */
    protected ActorRef createLaunchCoordinator(SchedulerDriver schedulerDriver, ActorRef actorRef) {
        final TaskSchedulerBuilder optimizer = createOptimizer();
        final Props coordinatorProps =
                LaunchCoordinator.createActorProps(
                        LaunchCoordinator.class, actorRef, flinkConfig, schedulerDriver, optimizer);
        return actorSystem.actorOf(coordinatorProps, "launchCoordinator");
    }

    /**
     * Creates the reconciliation coordinator actor, named {@code "reconciliationCoordinator"}.
     *
     * @param schedulerDriver driver handed to the created actor
     * @return reference to the newly created actor
     */
    protected ActorRef createReconciliationCoordinator(SchedulerDriver schedulerDriver) {
        final Props reconcilerProps =
                ReconciliationCoordinator.createActorProps(
                        ReconciliationCoordinator.class, flinkConfig, schedulerDriver);
        return actorSystem.actorOf(reconcilerProps, "reconciliationCoordinator");
    }

    /**
     * Prepares this resource manager for operation.
     *
     * <p>The steps are, in order:
     * <ol>
     *   <li>create and start the worker store;</li>
     *   <li>clone the configured framework info, enable checkpointing and set the web UI URL
     *       if one was provided;</li>
     *   <li>look up a previously stored framework ID and, if present, re-use it;</li>
     *   <li>store the resulting Mesos configuration and create the self actor;</li>
     *   <li>configure the artifact server with the TaskManager container artifacts.</li>
     * </ol>
     *
     * <p>Each fallible stage has its own {@code try} block so that a failure is reported exactly
     * once and under the message of the stage that actually failed. Unchecked exceptions raised
     * by the steps between those stages propagate unwrapped.
     *
     * @throws ResourceManagerException if the worker store cannot be initialized, the framework
     *     ID cannot be recovered, or the artifact server cannot be configured
     */
    @Override // org.apache.flink.runtime.resourcemanager.ResourceManager
    protected void initialize() throws ResourceManagerException {
        // Stage 1: create and start the worker store.
        try {
            this.workerStore = this.mesosServices.createMesosWorkerStore(this.flinkConfig, getRpcService().getExecutor());
            this.workerStore.start();
        } catch (Exception e) {
            throw new ResourceManagerException("Unable to initialize the worker store.", e);
        }

        // Stage 2: prepare the framework info on a copy, leaving the supplied configuration untouched.
        Protos.FrameworkInfo.Builder frameworkInfo = this.mesosConfig.frameworkInfo().clone().setCheckpoint(true);
        if (this.webUiUrl != null) {
            frameworkInfo.setWebuiUrl(this.webUiUrl);
        }

        // Stage 3: re-use a stored framework ID if there is one.
        try {
            Option<Protos.FrameworkID> frameworkID = this.workerStore.getFrameworkID();
            if (frameworkID.isEmpty()) {
                LOG.info("Registering as new framework.");
            } else {
                LOG.info("Recovery scenario: re-registering using framework ID {}.", frameworkID.get().getValue());
                frameworkInfo.setId(frameworkID.get());
            }
        } catch (Exception e) {
            throw new ResourceManagerException("Unable to recover the framework ID.", e);
        }

        // Stage 4: publish the final configuration and create the adapter actor.
        this.initializedMesosConfig = this.mesosConfig.withFrameworkInfo(frameworkInfo);
        MesosConfiguration.logMesosConfig(LOG, this.initializedMesosConfig);
        this.selfActor = createSelfActor();

        // Stage 5: make the TaskManager container artifacts available through the artifact server.
        try {
            LaunchableMesosWorker.configureArtifactServer(this.artifactServer, this.taskManagerContainerSpec);
        } catch (IOException e) {
            throw new ResourceManagerException("Unable to configure the artifact server with TaskManager artifacts.", e);
        }
    }

    /**
     * Creates the scheduler driver and the supporting actors, then asynchronously recovers the
     * stored workers and starts the connection monitor and the driver.
     *
     * <p>Requires that {@code initialize()} has already produced the initialized Mesos
     * configuration. The recovery continuation runs on the main thread executor.
     *
     * @return future that completes once the driver has been started
     * @throws IllegalStateException if the initialized Mesos configuration is missing
     */
    @Override // org.apache.flink.runtime.resourcemanager.ResourceManager
    protected CompletableFuture<Void> prepareLeadershipAsync() {
        Preconditions.checkState(initializedMesosConfig != null);

        final SchedulerDriver driver =
                initializedMesosConfig.createDriver(new MesosResourceManagerSchedulerCallback(), false);
        schedulerDriver = driver;

        connectionMonitor = createConnectionMonitor();
        launchCoordinator = createLaunchCoordinator(driver, selfActor);
        reconciliationCoordinator = createReconciliationCoordinator(driver);
        taskMonitor = createTaskMonitor(driver);

        return getWorkersAsync().thenApplyAsync(
                recoveredWorkers -> {
                    recoverWorkers(recoveredWorkers);
                    connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor);
                    schedulerDriver.start();
                    LOG.info("Mesos resource manager started.");
                    return null;
                },
                (Executor) getMainThreadExecutor());
    }

    /**
     * Stops the scheduler driver (passing {@code true}), empties the three worker bookkeeping
     * maps and stops the supporting actors.
     *
     * @return future that completes when all supporting actors have been stopped
     */
    @Override // org.apache.flink.runtime.resourcemanager.ResourceManager
    protected CompletableFuture<Void> clearStateAsync() {
        schedulerDriver.stop(true);

        workersBeingReturned.clear();
        workersInLaunch.clear();
        workersInNew.clear();

        return stopSupportingActorsAsync();
    }

    /**
     * Loads the workers from the worker store on the RPC service's executor.
     *
     * <p>Workers found in state {@code New} are removed from the store, but they are still part
     * of the returned list.
     *
     * @return future holding every recovered worker; it completes exceptionally with a
     *     {@link CompletionException} wrapping a {@link ResourceManagerException} if the store
     *     access fails
     */
    private CompletableFuture<List<MesosWorkerStore.Worker>> getWorkersAsync() {
        return CompletableFuture.supplyAsync(
                () -> {
                    final List<MesosWorkerStore.Worker> stored;
                    try {
                        stored = workerStore.recoverWorkers();
                        for (MesosWorkerStore.Worker candidate : stored) {
                            if (candidate.state() != MesosWorkerStore.WorkerState.New) {
                                continue;
                            }
                            workerStore.removeWorker(candidate.taskID());
                        }
                    } catch (Exception storeFailure) {
                        throw new CompletionException(new ResourceManagerException(storeFailure));
                    }
                    return stored;
                },
                getRpcService().getExecutor());
    }

    /**
     * Rebuilds the in-memory worker bookkeeping from the given recovered workers.
     *
     * <p>{@code Launched} workers are put into {@code workersInLaunch} and collected for
     * re-assignment to their host; {@code Released} workers are put into
     * {@code workersBeingReturned}. For every worker, whatever its state, the task monitor is
     * told the worker's goal state. If at least one worker was collected, a single
     * {@code Assign} message is sent to the launch coordinator.
     *
     * @param list workers recovered from the worker store
     */
    private void recoverWorkers(List<MesosWorkerStore.Worker> list) {
        // Decompiled assertions: all bookkeeping maps are expected to be empty at this point.
        if (!$assertionsDisabled && !workersInNew.isEmpty()) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && !workersInLaunch.isEmpty()) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && !workersBeingReturned.isEmpty()) {
            throw new AssertionError();
        }

        if (list.isEmpty()) {
            return;
        }
        LOG.info("Retrieved {} TaskManagers from previous attempt", list.size());

        // Raw types are kept as in the original block; the element type is a
        // (task request, hostname) tuple.
        final List toAssign = new ArrayList(list.size());
        for (MesosWorkerStore.Worker recovered : list) {
            final Protos.TaskID taskId = recovered.taskID();
            switch (recovered.state()) {
                case Launched:
                    workersInLaunch.put(extractResourceID(taskId), recovered);
                    toAssign.add(
                            new Tuple2(
                                    createLaunchableMesosWorker(recovered.taskID()).taskRequest(),
                                    recovered.hostname().get()));
                    break;
                case Released:
                    workersBeingReturned.put(extractResourceID(taskId), recovered);
                    break;
            }
            taskMonitor.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(recovered)), selfActor);
        }

        if (!toAssign.isEmpty()) {
            launchCoordinator.tell(new LaunchCoordinator.Assign(toAssign), selfActor);
        }
    }

    /**
     * Requests a stop of the four supporting actors with a timeout of five seconds each and
     * clears the corresponding fields.
     *
     * <p>Each field is reset to {@code null} right after its stop has been requested, in the
     * order task monitor, connection monitor, launch coordinator, reconciliation coordinator.
     *
     * @return future that completes when all four stop futures have completed
     */
    private CompletableFuture<Void> stopSupportingActorsAsync() {
        final FiniteDuration stopTimeout = new FiniteDuration(5L, TimeUnit.SECONDS);

        final CompletableFuture<Boolean> taskMonitorStopped = stopActor(taskMonitor, stopTimeout);
        taskMonitor = null;

        final CompletableFuture<Boolean> connectionMonitorStopped = stopActor(connectionMonitor, stopTimeout);
        connectionMonitor = null;

        final CompletableFuture<Boolean> launchCoordinatorStopped = stopActor(launchCoordinator, stopTimeout);
        launchCoordinator = null;

        final CompletableFuture<Boolean> reconcilerStopped = stopActor(reconciliationCoordinator, stopTimeout);
        reconciliationCoordinator = null;

        return CompletableFuture.allOf(
                taskMonitorStopped, connectionMonitorStopped, launchCoordinatorStopped, reconcilerStopped);
    }

    /**
     * Stops the supporting actors first and then delegates to the superclass' stop logic.
     *
     * @return future of the superclass' {@code onStop()}, chained after the actors have stopped
     */
    @Override // org.apache.flink.runtime.resourcemanager.ResourceManager, org.apache.flink.runtime.rpc.RpcEndpoint
    public CompletableFuture<Void> onStop() {
        final CompletableFuture<Void> actorsStopped = stopSupportingActorsAsync();
        return actorsStopped.thenCompose(ignored -> super.onStop());
    }

    /**
     * Stops the scheduler driver (passing {@code false}) and stops the worker store (passing
     * {@code true}).
     *
     * <p>Both steps are always attempted. If both fail, the worker store failure becomes the
     * primary cause and the driver failure is combined with it through
     * {@code ExceptionUtils.firstOrSuppressed}.
     *
     * @param applicationStatus final application status (not used by this implementation)
     * @param str optional diagnostics (not used by this implementation)
     * @throws ResourceManagerException if either step failed
     */
    @Override // org.apache.flink.runtime.resourcemanager.ResourceManager
    protected void internalDeregisterApplication(ApplicationStatus applicationStatus, @Nullable String str) throws ResourceManagerException {
        LOG.info("Shutting down and unregistering as a Mesos framework.");

        Exception failure = null;

        try {
            schedulerDriver.stop(false);
        } catch (Exception driverFailure) {
            failure = new Exception("Could not unregister the Mesos framework.", driverFailure);
        }

        try {
            workerStore.stop(true);
        } catch (Exception storeFailure) {
            final Exception wrapped = new Exception("Could not stop the Mesos worker store.", storeFailure);
            failure = ExceptionUtils.firstOrSuppressed(wrapped, failure);
        }

        if (failure == null) {
            return;
        }
        throw new ResourceManagerException("Could not properly shut down the Mesos application.", failure);
    }

    /**
     * Requests a new worker for the given resource profile.
     *
     * <p>The request is refused (empty result) when the first default slot profile does not match
     * {@code resourceProfile}. Otherwise a new worker is persisted in the worker store, tracked in
     * {@code workersInNew}, announced to the task monitor and handed to the launch coordinator.
     * Any exception on that path is reported through {@code onFatalError}.
     *
     * @param resourceProfile profile the new worker has to satisfy
     * @return the default slot profiles of a worker on success, otherwise an empty list
     */
    @Override // org.apache.flink.runtime.resourcemanager.ResourceManager
    public Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile) {
        final ResourceProfile defaultSlotProfile = slotsPerWorker.iterator().next();
        if (!defaultSlotProfile.isMatching(resourceProfile)) {
            return Collections.emptyList();
        }

        LOG.info("Starting a new worker.");
        try {
            // Persist the worker before tracking it in memory.
            final MesosWorkerStore.Worker requested =
                    MesosWorkerStore.Worker.newWorker(workerStore.newTaskID(), resourceProfile);
            workerStore.putWorker(requested);
            workersInNew.put(extractResourceID(requested.taskID()), requested);

            final LaunchableMesosWorker launchable = createLaunchableMesosWorker(requested.taskID());
            LOG.info(
                    "Scheduling Mesos task {} with ({} MB, {} cpus, {} gpus, {} disk MB, {} Mbps).",
                    launchable.taskID().getValue(),
                    launchable.taskRequest().getMemory(),
                    launchable.taskRequest().getCPUs(),
                    launchable.taskRequest().getScalarRequests().get("gpus"),
                    launchable.taskRequest().getDisk(),
                    launchable.taskRequest().getNetworkMbps());

            taskMonitor.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(requested)), selfActor);
            launchCoordinator.tell(
                    new LaunchCoordinator.Launch(Collections.singletonList(launchable)), selfActor);
            return slotsPerWorker;
        } catch (Exception requestFailure) {
            onFatalError(new ResourceManagerException("Unable to request new workers.", requestFailure));
            return Collections.emptyList();
        }
    }

    /**
     * Releases the given worker.
     *
     * <p>A worker tracked in {@code workersInLaunch} is moved to {@code workersBeingReturned},
     * persisted in its released state and announced to the task monitor; if its hostname is
     * defined, the launch coordinator is told to unassign it. A worker that is already being
     * returned is only logged, and an unknown worker produces a warning. Exceptions are reported
     * through {@code onFatalError}.
     *
     * @param registeredMesosWorkerNode the worker to stop
     * @return always {@code true}
     */
    @Override // org.apache.flink.runtime.resourcemanager.ResourceManager
    public boolean stopWorker(RegisteredMesosWorkerNode registeredMesosWorkerNode) {
        LOG.info("Stopping worker {}.", registeredMesosWorkerNode.getResourceID());
        try {
            final ResourceID workerId = registeredMesosWorkerNode.getResourceID();

            if (workersInLaunch.containsKey(workerId)) {
                final MesosWorkerStore.Worker released = workersInLaunch.remove(workerId).releaseWorker();
                workerStore.putWorker(released);
                workersBeingReturned.put(extractResourceID(released.taskID()), released);

                taskMonitor.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(released)), selfActor);
                if (released.hostname().isDefined()) {
                    launchCoordinator.tell(
                            new LaunchCoordinator.Unassign(released.taskID(), released.hostname().get()),
                            selfActor);
                }
            } else if (workersBeingReturned.containsKey(workerId)) {
                LOG.info("Ignoring request to stop worker {} because it is already being stopped.", workerId);
            } else {
                LOG.warn("Unrecognized worker {}.", workerId);
            }
        } catch (Exception releaseFailure) {
            onFatalError(new ResourceManagerException("Unable to release a worker.", releaseFailure));
        }
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    /**
     * Looks up the worker with the given resource ID among the launched workers.
     *
     * @param resourceID ID of the worker that started
     * @return a node wrapping the worker if it is tracked in {@code workersInLaunch}, otherwise
     *     {@code null}
     */
    @Override // org.apache.flink.runtime.resourcemanager.ResourceManager
    public RegisteredMesosWorkerNode workerStarted(ResourceID resourceID) {
        final MesosWorkerStore.Worker launched = workersInLaunch.get(resourceID);
        return launched == null ? null : new RegisteredMesosWorkerNode(launched);
    }

    /**
     * Handles the framework registration.
     *
     * <p>The connection monitor is notified first. The assigned framework ID is then stored in
     * the worker store and, only if that succeeds, the launch coordinator, the reconciliation
     * coordinator and the task monitor are notified as well. A failure is reported through
     * {@code onFatalError}.
     *
     * @param registered the registration message
     */
    protected void registered(Registered registered) {
        connectionMonitor.tell(registered, selfActor);
        try {
            workerStore.setFrameworkID(Option.apply(registered.frameworkId()));

            launchCoordinator.tell(registered, selfActor);
            reconciliationCoordinator.tell(registered, selfActor);
            taskMonitor.tell(registered, selfActor);
        } catch (Exception storeFailure) {
            onFatalError(
                    new ResourceManagerException("Unable to store the assigned framework ID.", storeFailure));
        }
    }

    /**
     * Broadcasts the re-registration message to the connection monitor, the launch coordinator,
     * the reconciliation coordinator and the task monitor, in that order.
     *
     * @param reRegistered the re-registration message
     */
    protected void reregistered(ReRegistered reRegistered) {
        final ActorRef[] recipients = {
            connectionMonitor, launchCoordinator, reconciliationCoordinator, taskMonitor
        };
        for (ActorRef recipient : recipients) {
            recipient.tell(reRegistered, selfActor);
        }
    }

    /**
     * Broadcasts the disconnect message to the connection monitor, the launch coordinator, the
     * reconciliation coordinator and the task monitor, in that order.
     *
     * @param disconnected the disconnect message
     */
    protected void disconnected(Disconnected disconnected) {
        final ActorRef[] recipients = {
            connectionMonitor, launchCoordinator, reconciliationCoordinator, taskMonitor
        };
        for (ActorRef recipient : recipients) {
            recipient.tell(disconnected, selfActor);
        }
    }

    /**
     * Passes the received resource offers on to the launch coordinator.
     *
     * @param resourceOffers the offers message
     */
    protected void resourceOffers(ResourceOffers resourceOffers) {
        final ActorRef coordinator = launchCoordinator;
        coordinator.tell(resourceOffers, selfActor);
    }

    /**
     * Passes the rescinded-offer notification on to the launch coordinator.
     *
     * @param offerRescinded the rescind message
     */
    protected void offerRescinded(OfferRescinded offerRescinded) {
        final ActorRef coordinator = launchCoordinator;
        coordinator.tell(offerRescinded, selfActor);
    }

    /**
     * Forwards a task status update to the task monitor and the reconciliation coordinator and
     * then acknowledges the status with the scheduler driver.
     *
     * @param statusUpdate the status update message
     */
    protected void statusUpdate(StatusUpdate statusUpdate) {
        final ActorRef[] recipients = {taskMonitor, reconciliationCoordinator};
        for (ActorRef recipient : recipients) {
            recipient.tell(statusUpdate, selfActor);
        }
        schedulerDriver.acknowledgeStatusUpdate(statusUpdate.status());
    }

    /**
     * Handler for framework messages. Currently a no-op: the message is ignored.
     *
     * @param frameworkMessage the received framework message
     */
    protected void frameworkMessage(FrameworkMessage frameworkMessage) {
    }

    /**
     * Handler for slave-lost notifications. Currently a no-op: the message is ignored.
     *
     * @param slaveLost the received notification
     */
    protected void slaveLost(SlaveLost slaveLost) {
    }

    /**
     * Handler for executor-lost notifications. Currently a no-op: the message is ignored.
     *
     * @param executorLost the received notification
     */
    protected void executorLost(ExecutorLost executorLost) {
    }

    /**
     * Accepts the given offers and records the workers that are launched by them.
     *
     * <p>For every task of every {@code LAUNCH} operation the matching worker is moved from
     * {@code workersInNew} to {@code workersInLaunch} and persisted in its launched state. Once
     * all operations are processed, the task monitor is told the new goal state of each launched
     * worker and the offers are accepted through the scheduler driver. Any exception is reported
     * through {@code onFatalError}.
     *
     * @param acceptOffers message describing the offers, operations and filters to accept
     */
    public void acceptOffers(AcceptOffers acceptOffers) {
        try {
            final List<TaskMonitor.TaskGoalStateUpdated> goalStateUpdates =
                    new ArrayList<>(acceptOffers.operations().size());

            for (Protos.Offer.Operation operation : acceptOffers.operations()) {
                if (operation.getType() != Protos.Offer.Operation.Type.LAUNCH) {
                    continue;
                }
                for (Protos.TaskInfo taskInfo : operation.getLaunch().getTaskInfosList()) {
                    final MesosWorkerStore.Worker pending =
                            workersInNew.remove(extractResourceID(taskInfo.getTaskId()));
                    // Decompiled assertion: the task must belong to a worker requested earlier.
                    if (!$assertionsDisabled && pending == null) {
                        throw new AssertionError();
                    }

                    final MesosWorkerStore.Worker launched =
                            pending.launchWorker(taskInfo.getSlaveId(), acceptOffers.hostname());
                    workerStore.putWorker(launched);
                    workersInLaunch.put(extractResourceID(launched.taskID()), launched);

                    LOG.info(
                            "Launching Mesos task {} on host {}.",
                            launched.taskID().getValue(),
                            launched.hostname().get());
                    goalStateUpdates.add(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(launched)));
                }
            }

            // Goal state updates are sent only after every operation has been recorded.
            for (TaskMonitor.TaskGoalStateUpdated update : goalStateUpdates) {
                taskMonitor.tell(update, selfActor);
            }

            schedulerDriver.acceptOffers(
                    acceptOffers.offerIds(), acceptOffers.operations(), acceptOffers.filters());
        } catch (Exception acceptFailure) {
            onFatalError(new ResourceManagerException("unable to accept offers", acceptFailure));
        }
    }

    /**
     * Passes a reconciliation request on to the reconciliation coordinator.
     *
     * @param reconcile the reconciliation request
     */
    public void reconcile(ReconciliationCoordinator.Reconcile reconcile) {
        final ActorRef coordinator = reconciliationCoordinator;
        coordinator.tell(reconcile, selfActor);
    }

    /**
     * Handles the termination of a Mesos task.
     *
     * <p>The worker is removed from the worker store; if the store does not know it, the notice
     * is only logged. A worker that was being returned is logged as finished. Otherwise the
     * worker is taken out of {@code workersInLaunch}, logged as failed, and a replacement with
     * the same profile is requested. In both known-worker cases the TaskManager connection is
     * closed afterwards. Any exception is reported through {@code onFatalError}.
     *
     * @param taskTerminated message carrying the task ID and final task status
     */
    public void taskTerminated(TaskMonitor.TaskTerminated taskTerminated) {
        final Protos.TaskID terminatedTask = taskTerminated.taskID();
        final Protos.TaskStatus finalStatus = taskTerminated.status();
        final ResourceID workerId = extractResourceID(terminatedTask);

        try {
            final boolean knownToStore = workerStore.removeWorker(terminatedTask);
            if (!knownToStore) {
                LOG.info("Received a termination notice for an unrecognized worker: {}", workerId);
                return;
            }

            // Decompiled assertion: a terminated task is not expected to still be in state New.
            if (!$assertionsDisabled && workersInNew.containsKey(workerId)) {
                throw new AssertionError();
            }

            final boolean wasBeingReturned = workersBeingReturned.remove(workerId) != null;
            if (wasBeingReturned) {
                LOG.info("Worker {} finished successfully with message: {}", workerId, finalStatus.getMessage());
            } else {
                final MesosWorkerStore.Worker failed = workersInLaunch.remove(workerId);
                if (!$assertionsDisabled && failed == null) {
                    throw new AssertionError();
                }
                LOG.info(
                        "Worker {} failed with status: {}, reason: {}, message: {}.",
                        workerId,
                        finalStatus.getState(),
                        finalStatus.getReason(),
                        finalStatus.getMessage());
                startNewWorker(failed.profile());
            }

            closeTaskManagerConnection(workerId, new Exception(finalStatus.getMessage()));
        } catch (Exception removalFailure) {
            onFatalError(new ResourceManagerException("unable to remove worker", removalFailure));
        }
    }

    /**
     * Stops the given actor gracefully within the given timeout.
     *
     * <p>If the graceful stop fails, the actor is stopped through the actor system, a warning is
     * logged and the returned future completes with {@code false}.
     *
     * @param actorRef actor to stop; {@code null} yields an already completed {@code true} future
     * @param finiteDuration timeout for the graceful stop
     * @return future with the outcome of the stop attempt
     */
    private CompletableFuture<Boolean> stopActor(@Nullable ActorRef actorRef, FiniteDuration finiteDuration) {
        if (actorRef == null) {
            return CompletableFuture.completedFuture(true);
        }

        final CompletableFuture<Boolean> gracefulStop =
                FutureUtils.toJava(Patterns.gracefulStop(actorRef, finiteDuration));
        return gracefulStop.exceptionally(
                stopFailure -> {
                    actorSystem.stop(actorRef);
                    this.log.warn("Could not stop actor {} gracefully.", actorRef.path(), stopFailure);
                    return false;
                });
    }

    /**
     * Builds a launchable worker for the given task ID from the artifact server, the TaskManager
     * parameters, the container specification and the Mesos configuration.
     *
     * @param taskID ID of the Mesos task the worker represents
     * @return the launchable worker
     */
    private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID) {
        LOG.debug("LaunchableMesosWorker parameters: {}", taskManagerParameters);
        final LaunchableMesosWorker launchable =
                new LaunchableMesosWorker(
                        artifactServer, taskManagerParameters, taskManagerContainerSpec, taskID, mesosConfig);
        return launchable;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Derives the resource ID of a worker from its Mesos task ID by re-using the task ID's value.
     *
     * @param taskID the Mesos task ID
     * @return resource ID carrying the same string value
     */
    public static ResourceID extractResourceID(Protos.TaskID taskID) {
        final String taskIdValue = taskID.getValue();
        return new ResourceID(taskIdValue);
    }

    /**
     * Translates the state of a stored worker into the goal state understood by the task monitor.
     *
     * @param worker the stored worker
     * @return {@code New}, {@code Launched} or {@code Released} goal state for the worker's task
     * @throws IllegalArgumentException if the worker is in any other state
     */
    static TaskMonitor.TaskGoalState extractGoalState(MesosWorkerStore.Worker worker) {
        final TaskMonitor.TaskGoalState goalState;
        switch (worker.state()) {
            case New:
                goalState = new TaskMonitor.New(worker.taskID());
                break;
            case Launched:
                goalState = new TaskMonitor.Launched(worker.taskID(), worker.slaveID().get());
                break;
            case Released:
                goalState = new TaskMonitor.Released(worker.taskID(), worker.slaveID().get());
                break;
            default:
                throw new IllegalArgumentException("unsupported worker state");
        }
        return goalState;
    }

    /**
     * Creates a {@link TaskSchedulerBuilder} that delegates every call to a fresh Fenzo
     * {@link TaskScheduler.Builder}.
     *
     * @return a new delegating builder
     */
    private static TaskSchedulerBuilder createOptimizer() {
        return new TaskSchedulerBuilder() { // from class: org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager.1
            /** The Fenzo builder all settings are applied to. */
            TaskScheduler.Builder delegate = new TaskScheduler.Builder();

            /** Sets the lease reject action on the delegate and returns this builder for chaining. */
            @Override // org.apache.flink.mesos.scheduler.TaskSchedulerBuilder
            public TaskSchedulerBuilder withLeaseRejectAction(Action1<VirtualMachineLease> action1) {
                delegate.withLeaseRejectAction(action1);
                return this;
            }

            /** Enables rejection of all expired offers on the delegate and returns this builder. */
            @Override // org.apache.flink.mesos.scheduler.TaskSchedulerBuilder
            public TaskSchedulerBuilder withRejectAllExpiredOffers() {
                delegate.withRejectAllExpiredOffers();
                return this;
            }

            /** Sets the lease offer expiry in seconds on the delegate and returns this builder. */
            @Override // org.apache.flink.mesos.scheduler.TaskSchedulerBuilder
            public TaskSchedulerBuilder withLeaseOfferExpirySecs(long j) {
                delegate.withLeaseOfferExpirySecs(j);
                return this;
            }

            /** Builds the task scheduler from the delegate. */
            @Override // org.apache.flink.mesos.scheduler.TaskSchedulerBuilder
            public TaskScheduler build() {
                final TaskScheduler scheduler = delegate.build();
                return scheduler;
            }
        };
    }

    // Initializes the class-level logger and the decompiled assertion flag
    // (true when assertions are disabled for this class).
    static {
        LOG = LoggerFactory.getLogger(MesosResourceManager.class);
        $assertionsDisabled = !MesosResourceManager.class.desiredAssertionStatus();
    }
}
