package org.apache.hadoop.mapreduce.task.reduce;

import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.task.reduce.MapHost;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
@InterfaceStability.Unstable
/* loaded from: input_file:org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.class */
public class ShuffleSchedulerImpl<K, V> implements ShuffleScheduler<K, V> {
    private static final ThreadLocal<Long> SHUFFLE_START = new ThreadLocal<Long>() { // from class: org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl.1
        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.lang.ThreadLocal
        public Long initialValue() {
            return 0L;
        }
    };
    private static final Logger LOG = LoggerFactory.getLogger(ShuffleSchedulerImpl.class);
    private static final int MAX_MAPS_AT_ONCE = 20;
    private static final long INITIAL_PENALTY = 10000;
    private static final float PENALTY_GROWTH_RATE = 1.3f;
    private static final int REPORT_FAILURE_LIMIT = 10;
    private static final float BYTES_PER_MILLIS_TO_MBS = 9.536743E-4f;
    private final boolean[] finishedMaps;
    private final int totalMaps;
    private int remainingMaps;
    private final TaskAttemptID reduceId;
    private final TaskStatus status;
    private final ExceptionReporter reporter;
    private final int abortFailureLimit;
    private final Progress progress;
    private final Counters.Counter shuffledMapsCounter;
    private final Counters.Counter reduceShuffleBytes;
    private final Counters.Counter failedShuffleCounter;
    private final int maxFailedUniqueFetches;
    private final int maxFetchFailuresBeforeReporting;
    private final boolean reportReadErrorImmediately;
    private long maxPenalty;
    private int maxHostFailures;
    private Map<String, MapHost> mapLocations = new HashMap();
    private Set<MapHost> pendingHosts = new HashSet();
    private Set<TaskAttemptID> obsoleteMaps = new HashSet();
    private final Random random = new Random();
    private final DelayQueue<Penalty> penalties = new DelayQueue<>();
    private final ShuffleSchedulerImpl<K, V>.Referee referee = new Referee();
    private final Map<TaskAttemptID, IntWritable> failureCounts = new HashMap();
    private final Map<String, IntWritable> hostFailures = new HashMap();
    private volatile int maxMapRuntime = 0;
    private long totalBytesShuffledTillNow = 0;
    private final DecimalFormat mbpsFormat = new DecimalFormat("0.00");
    private final CopyTimeTracker copyTimeTracker = new CopyTimeTracker();
    private final long startTime = Time.monotonicNow();
    private long lastProgressTime = this.startTime;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl$CopyTimeTracker.class */
    public static class CopyTimeTracker {
        List<Interval> intervals = Collections.emptyList();
        long copyMillis = 0;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl$CopyTimeTracker$Interval.class */
        public static class Interval {
            final long start;
            final long end;

            public Interval(long j, long j2) {
                this.start = j;
                this.end = j2;
            }

            public long getIntervalLength() {
                return this.end - this.start;
            }
        }

        public void add(long j, long j2) {
            this.copyMillis = getTotalCopyMillis(new Interval(j, j2));
        }

        public long getCopyMillis() {
            return this.copyMillis;
        }

        private long getTotalCopyMillis(Interval interval) {
            if (interval == null) {
                return this.copyMillis;
            }
            ArrayList arrayList = new ArrayList(this.intervals.size() + 1);
            for (Interval interval2 : this.intervals) {
                if (interval2.end < interval.start) {
                    arrayList.add(interval2);
                } else if (interval2.start > interval.end) {
                    arrayList.add(interval);
                    interval = interval2;
                } else {
                    interval = new Interval(Math.min(interval2.start, interval.start), Math.max(interval.end, interval2.end));
                }
            }
            arrayList.add(interval);
            this.intervals = arrayList;
            long j = 0;
            Iterator<Interval> it = this.intervals.iterator();
            while (it.hasNext()) {
                j += it.next().getIntervalLength();
            }
            return j;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl$Penalty.class */
    public static class Penalty implements Delayed {
        MapHost host;
        private long endTime;

        Penalty(MapHost mapHost, long j) {
            this.host = mapHost;
            this.endTime = Time.monotonicNow() + j;
        }

        @Override // java.util.concurrent.Delayed
        public long getDelay(TimeUnit timeUnit) {
            return timeUnit.convert(this.endTime - Time.monotonicNow(), TimeUnit.MILLISECONDS);
        }

        @Override // java.lang.Comparable
        public int compareTo(Delayed delayed) {
            long j = ((Penalty) delayed).endTime;
            if (this.endTime == j) {
                return 0;
            }
            return this.endTime < j ? -1 : 1;
        }
    }

    /* loaded from: input_file:org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl$Referee.class */
    private class Referee extends Thread {
        public Referee() {
            setName("ShufflePenaltyReferee");
            setDaemon(true);
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    MapHost mapHost = ((Penalty) ShuffleSchedulerImpl.this.penalties.take()).host;
                    synchronized (ShuffleSchedulerImpl.this) {
                        if (mapHost.markAvailable() == MapHost.State.PENDING) {
                            ShuffleSchedulerImpl.this.pendingHosts.add(mapHost);
                            ShuffleSchedulerImpl.this.notifyAll();
                        }
                    }
                } catch (InterruptedException e) {
                    return;
                } catch (Throwable th) {
                    ShuffleSchedulerImpl.this.reporter.reportException(th);
                    return;
                }
            }
        }
    }

    public ShuffleSchedulerImpl(JobConf jobConf, TaskStatus taskStatus, TaskAttemptID taskAttemptID, ExceptionReporter exceptionReporter, Progress progress, Counters.Counter counter, Counters.Counter counter2, Counters.Counter counter3) {
        this.maxPenalty = 60000L;
        this.totalMaps = jobConf.getNumMapTasks();
        this.abortFailureLimit = Math.max(30, this.totalMaps / 10);
        this.remainingMaps = this.totalMaps;
        this.finishedMaps = new boolean[this.remainingMaps];
        this.reporter = exceptionReporter;
        this.status = taskStatus;
        this.reduceId = taskAttemptID;
        this.progress = progress;
        this.shuffledMapsCounter = counter;
        this.reduceShuffleBytes = counter2;
        this.failedShuffleCounter = counter3;
        this.referee.start();
        this.maxFailedUniqueFetches = Math.min(this.totalMaps, 5);
        this.maxFetchFailuresBeforeReporting = jobConf.getInt(MRJobConfig.SHUFFLE_FETCH_FAILURES, 10);
        this.reportReadErrorImmediately = jobConf.getBoolean(MRJobConfig.SHUFFLE_NOTIFY_READERROR, true);
        this.maxPenalty = jobConf.getLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY, 60000L);
        this.maxHostFailures = jobConf.getInt(MRJobConfig.MAX_SHUFFLE_FETCH_HOST_FAILURES, 5);
    }

    @Override // org.apache.hadoop.mapreduce.task.reduce.ShuffleScheduler
    public void resolve(TaskCompletionEvent taskCompletionEvent) {
        switch (taskCompletionEvent.getTaskStatus()) {
            case SUCCEEDED:
                URI baseURI = getBaseURI(this.reduceId, taskCompletionEvent.getTaskTrackerHttp());
                addKnownMapOutput(baseURI.getHost() + ":" + baseURI.getPort(), baseURI.toString(), taskCompletionEvent.getTaskAttemptId());
                this.maxMapRuntime = Math.max(this.maxMapRuntime, taskCompletionEvent.getTaskRunTime());
                return;
            case FAILED:
            case KILLED:
            case OBSOLETE:
                obsoleteMapOutput(taskCompletionEvent.getTaskAttemptId());
                LOG.info("Ignoring obsolete output of " + taskCompletionEvent.getTaskStatus() + " map-task: '" + taskCompletionEvent.getTaskAttemptId() + "'");
                return;
            case TIPFAILED:
                tipFailed(taskCompletionEvent.getTaskAttemptId().getTaskID());
                LOG.info("Ignoring output of failed map TIP: '" + taskCompletionEvent.getTaskAttemptId() + "'");
                return;
            default:
                return;
        }
    }

    static URI getBaseURI(TaskAttemptID taskAttemptID, String str) {
        StringBuffer stringBuffer = new StringBuffer(str);
        if (!str.endsWith("/")) {
            stringBuffer.append("/");
        }
        stringBuffer.append("mapOutput?job=");
        stringBuffer.append(taskAttemptID.getJobID());
        stringBuffer.append("&reduce=");
        stringBuffer.append(taskAttemptID.getTaskID().getId());
        stringBuffer.append("&map=");
        return URI.create(stringBuffer.toString());
    }

    public synchronized void copySucceeded(TaskAttemptID taskAttemptID, MapHost mapHost, long j, long j2, long j3, MapOutput<K, V> mapOutput) throws IOException {
        this.failureCounts.remove(taskAttemptID);
        this.hostFailures.remove(mapHost.getHostName());
        int id = taskAttemptID.getTaskID().getId();
        if (this.finishedMaps[id]) {
            LOG.warn("Aborting already-finished MapOutput for " + taskAttemptID);
            mapOutput.abort();
            return;
        }
        mapOutput.commit();
        this.finishedMaps[id] = true;
        this.shuffledMapsCounter.increment(1L);
        int i = this.remainingMaps - 1;
        this.remainingMaps = i;
        if (i == 0) {
            notifyAll();
        }
        long j4 = j3 - j2;
        if (j4 == 0) {
            j4 = 1;
        }
        String str = "copy task(" + taskAttemptID + " succeeded at " + this.mbpsFormat.format((((float) j) / ((float) j4)) * BYTES_PER_MILLIS_TO_MBS) + " MB/s)";
        this.copyTimeTracker.add(j2, j3);
        this.totalBytesShuffledTillNow += j;
        updateStatus(str);
        this.reduceShuffleBytes.increment(j);
        this.lastProgressTime = Time.monotonicNow();
        LOG.debug("map {} done {}", taskAttemptID, this.status.getStateString());
    }

    private synchronized void updateStatus(String str) {
        int i = this.totalMaps - this.remainingMaps;
        long copyMillis = this.copyTimeTracker.getCopyMillis();
        if (copyMillis == 0) {
            copyMillis = 1;
        }
        float f = (((float) this.totalBytesShuffledTillNow) / ((float) copyMillis)) * BYTES_PER_MILLIS_TO_MBS;
        this.progress.set(i / this.totalMaps);
        this.status.setStateString(i + " / " + this.totalMaps + " copied.");
        if (str != null) {
            this.progress.setStatus(str + " Aggregated copy rate(" + i + " of " + this.totalMaps + " at " + this.mbpsFormat.format(f) + " MB/s)");
        } else {
            this.progress.setStatus("copy(" + i + " of " + this.totalMaps + " at " + this.mbpsFormat.format(f) + " MB/s)");
        }
    }

    private void updateStatus() {
        updateStatus(null);
    }

    public synchronized void hostFailed(String str) {
        if (!this.hostFailures.containsKey(str)) {
            this.hostFailures.put(str, new IntWritable(1));
        } else {
            IntWritable intWritable = this.hostFailures.get(str);
            intWritable.set(intWritable.get() + 1);
        }
    }

    @VisibleForTesting
    synchronized int hostFailureCount(String str) {
        int i = 0;
        if (this.hostFailures.containsKey(str)) {
            i = this.hostFailures.get(str).get();
        }
        return i;
    }

    @VisibleForTesting
    synchronized int fetchFailureCount(TaskAttemptID taskAttemptID) {
        int i = 0;
        if (this.failureCounts.containsKey(taskAttemptID)) {
            i = this.failureCounts.get(taskAttemptID).get();
        }
        return i;
    }

    public synchronized void copyFailed(TaskAttemptID taskAttemptID, MapHost mapHost, boolean z, boolean z2) {
        int i = 1;
        if (this.failureCounts.containsKey(taskAttemptID)) {
            IntWritable intWritable = this.failureCounts.get(taskAttemptID);
            intWritable.set(intWritable.get() + 1);
            i = intWritable.get();
        } else {
            this.failureCounts.put(taskAttemptID, new IntWritable(1));
        }
        String hostName = mapHost.getHostName();
        if (this.hostFailures.get(hostName) == null) {
            this.hostFailures.put(hostName, new IntWritable(1));
        }
        boolean z3 = this.hostFailures.get(hostName).get() > getMaxHostFailures();
        if (i >= this.abortFailureLimit) {
            try {
                throw new IOException(i + " failures downloading " + taskAttemptID);
            } catch (IOException e) {
                this.reporter.reportException(e);
            }
        }
        checkAndInformMRAppMaster(i, taskAttemptID, z, z2, z3);
        checkReducerHealth();
        penalize(mapHost, Math.min((long) (10000.0d * Math.pow(1.2999999523162842d, i)), this.maxPenalty));
        this.failedShuffleCounter.increment(1L);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void penalize(MapHost mapHost, long j) {
        mapHost.penalize();
        this.penalties.add((DelayQueue<Penalty>) new Penalty(mapHost, j));
    }

    public void reportLocalError(IOException iOException) {
        try {
            LOG.error("Shuffle failed : local error on this node: " + InetAddress.getLocalHost());
        } catch (UnknownHostException e) {
            LOG.error("Shuffle failed : local error on this node");
        }
        this.reporter.reportException(iOException);
    }

    private void checkAndInformMRAppMaster(int i, TaskAttemptID taskAttemptID, boolean z, boolean z2, boolean z3) {
        if (z2 || ((this.reportReadErrorImmediately && z) || i % this.maxFetchFailuresBeforeReporting == 0 || z3)) {
            LOG.info("Reporting fetch failure for " + taskAttemptID + " to MRAppMaster.");
            this.status.addFetchFailedMap((org.apache.hadoop.mapred.TaskAttemptID) taskAttemptID);
        }
    }

    private void checkReducerHealth() {
        long value = this.failedShuffleCounter.getValue();
        int i = this.totalMaps - this.remainingMaps;
        boolean z = ((float) value) / ((float) (value + ((long) i))) < 0.5f;
        boolean z2 = ((float) i) / ((float) this.totalMaps) >= 0.5f;
        boolean z3 = ((float) ((int) (Time.monotonicNow() - this.lastProgressTime))) / ((float) Math.max((int) (this.lastProgressTime - this.startTime), this.maxMapRuntime)) >= 0.5f;
        if ((this.failureCounts.size() >= this.maxFailedUniqueFetches || this.failureCounts.size() == this.totalMaps - i) && !z) {
            if (!z2 || z3) {
                LOG.error("Shuffle failed with too many fetch failures and insufficient progress!");
                this.reporter.reportException(new IOException("Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out."));
            }
        }
    }

    public synchronized void tipFailed(TaskID taskID) {
        if (this.finishedMaps[taskID.getId()]) {
            return;
        }
        this.finishedMaps[taskID.getId()] = true;
        int i = this.remainingMaps - 1;
        this.remainingMaps = i;
        if (i == 0) {
            notifyAll();
        }
        updateStatus();
    }

    public synchronized void addKnownMapOutput(String str, String str2, TaskAttemptID taskAttemptID) {
        MapHost mapHost = this.mapLocations.get(str);
        if (mapHost == null) {
            mapHost = new MapHost(str, str2);
            this.mapLocations.put(str, mapHost);
        }
        mapHost.addKnownMap(taskAttemptID);
        if (mapHost.getState() == MapHost.State.PENDING) {
            this.pendingHosts.add(mapHost);
            notifyAll();
        }
    }

    public synchronized void obsoleteMapOutput(TaskAttemptID taskAttemptID) {
        this.obsoleteMaps.add(taskAttemptID);
    }

    public synchronized void putBackKnownMapOutput(MapHost mapHost, TaskAttemptID taskAttemptID) {
        mapHost.addKnownMap(taskAttemptID);
    }

    public synchronized MapHost getHost() throws InterruptedException {
        while (this.pendingHosts.isEmpty()) {
            wait();
        }
        Iterator<MapHost> it = this.pendingHosts.iterator();
        MapHost next = it.next();
        int nextInt = this.random.nextInt(this.pendingHosts.size());
        for (int i = 0; i < nextInt; i++) {
            next = it.next();
        }
        this.pendingHosts.remove(next);
        next.markBusy();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Assigning " + next + " with " + next.getNumKnownMapOutputs() + " to " + Thread.currentThread().getName());
        }
        SHUFFLE_START.set(Long.valueOf(Time.monotonicNow()));
        return next;
    }

    public synchronized List<TaskAttemptID> getMapsForHost(MapHost mapHost) {
        List<TaskAttemptID> andClearKnownMaps = mapHost.getAndClearKnownMaps();
        Iterator<TaskAttemptID> it = andClearKnownMaps.iterator();
        ArrayList arrayList = new ArrayList();
        int i = 0;
        int size = andClearKnownMaps.size();
        while (it.hasNext()) {
            TaskAttemptID next = it.next();
            if (!this.obsoleteMaps.contains(next) && !this.finishedMaps[next.getTaskID().getId()]) {
                arrayList.add(next);
                i++;
                if (i >= 20) {
                    break;
                }
            }
        }
        while (it.hasNext()) {
            TaskAttemptID next2 = it.next();
            if (!this.obsoleteMaps.contains(next2) && !this.finishedMaps[next2.getTaskID().getId()]) {
                mapHost.addKnownMap(next2);
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("assigned " + i + " of " + size + " to " + mapHost + " to " + Thread.currentThread().getName());
        }
        return arrayList;
    }

    public synchronized void freeHost(MapHost mapHost) {
        if (mapHost.getState() != MapHost.State.PENALIZED && mapHost.markAvailable() == MapHost.State.PENDING) {
            this.pendingHosts.add(mapHost);
            notifyAll();
        }
        LOG.info(mapHost + " freed by " + Thread.currentThread().getName() + " in " + (Time.monotonicNow() - SHUFFLE_START.get().longValue()) + "ms");
    }

    public synchronized void resetKnownMaps() {
        this.mapLocations.clear();
        this.obsoleteMaps.clear();
        this.pendingHosts.clear();
    }

    @Override // org.apache.hadoop.mapreduce.task.reduce.ShuffleScheduler
    public synchronized boolean waitUntilDone(int i) throws InterruptedException {
        if (this.remainingMaps <= 0) {
            return true;
        }
        wait(i);
        return this.remainingMaps == 0;
    }

    @Override // org.apache.hadoop.mapreduce.task.reduce.ShuffleScheduler
    public void close() throws InterruptedException {
        this.referee.interrupt();
        this.referee.join();
    }

    public int getMaxHostFailures() {
        return this.maxHostFailures;
    }
}
