package org.apache.hadoop.hbase.replication.regionserver;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.WALKeyWriteTimeBasedFilter;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.class */
public class ReplicationSource implements ReplicationSourceInterface {
    private static final String HBASE_REPLICATION_FILTER_SYSTEM_WAL_ENTRY_FILTER = "hbase.replication.filter.systemWALEntryFilter";
    private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
    protected int queueSizePerGroup;
    protected ReplicationSourceLogQueue logQueue;
    protected ReplicationQueueStorage queueStorage;
    protected ReplicationPeer replicationPeer;
    protected Configuration conf;
    protected ReplicationQueueInfo replicationQueueInfo;
    private String peerId;
    protected ReplicationSourceManager manager;
    protected Server server;
    private long sleepForRetries;
    protected FileSystem fs;
    private UUID clusterId;
    protected String queueId;
    private int maxRetriesMultiplier;
    private MetricsSource metrics;
    private volatile ReplicationEndpoint replicationEndpoint;
    private boolean abortOnError;
    protected volatile WALEntryFilter walEntryFilter;
    private ReplicationThrottler throttler;
    private long defaultBandwidth;
    private long currentBandwidth;
    private WALFileLengthProvider walFileLengthProvider;
    private AtomicLong totalBufferUsed;
    public static final String WAIT_ON_ENDPOINT_SECONDS = "hbase.replication.wait.on.endpoint.seconds";
    public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
    private Thread initThread;
    private AtomicLong totalReplicatedEdits = new AtomicLong(0);
    volatile boolean sourceRunning = false;
    private AtomicBoolean startupOngoing = new AtomicBoolean(false);
    private AtomicBoolean retryStartup = new AtomicBoolean(false);
    private Class<? extends WALEntryFilter> implClass = null;

    @VisibleForTesting
    protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads = new ConcurrentHashMap<>();
    private int waitOnEndpointSeconds = -1;

    /* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/ReplicationSource$LogsComparator.class */
    public static class LogsComparator implements Comparator<Path> {
        @Override // java.util.Comparator
        public int compare(Path path, Path path2) {
            return Long.compare(getTS(path), getTS(path2));
        }

        private static long getTS(Path path) {
            return Long.parseLong(path.getName().substring(path.getName().lastIndexOf(46) + 1));
        }
    }

    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public void init(Configuration configuration, FileSystem fileSystem, ReplicationSourceManager replicationSourceManager, ReplicationQueueStorage replicationQueueStorage, ReplicationPeer replicationPeer, Server server, String str, UUID uuid, WALFileLengthProvider wALFileLengthProvider, MetricsSource metricsSource) throws IOException {
        this.server = server;
        this.conf = HBaseConfiguration.create(configuration);
        this.waitOnEndpointSeconds = this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, 30);
        decorateConf();
        this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000L);
        this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
        this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
        this.logQueue = new ReplicationSourceLogQueue(configuration, metricsSource, this);
        this.queueStorage = replicationQueueStorage;
        this.replicationPeer = replicationPeer;
        this.manager = replicationSourceManager;
        this.fs = fileSystem;
        this.metrics = metricsSource;
        this.clusterId = uuid;
        this.queueId = str;
        this.replicationQueueInfo = new ReplicationQueueInfo(str);
        this.peerId = this.replicationQueueInfo.getPeerId();
        this.defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0L);
        this.currentBandwidth = getCurrentBandwidth();
        this.throttler = new ReplicationThrottler(this.currentBandwidth / 10.0d);
        this.totalBufferUsed = replicationSourceManager.getTotalBufferUsed();
        this.walFileLengthProvider = wALFileLengthProvider;
        this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort", true);
        LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", new Object[]{str, replicationPeer.getId(), Long.valueOf(this.currentBandwidth)});
        this.implClass = initCustomSystemTableWALEntryFilter(this.conf);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private Class<? extends WALEntryFilter> initCustomSystemTableWALEntryFilter(Configuration configuration) {
        Class cls = null;
        try {
            cls = Class.forName(configuration.get(HBASE_REPLICATION_FILTER_SYSTEM_WAL_ENTRY_FILTER, SystemTableWALEntryFilter.class.getName()), true, getClass().getClassLoader());
            LOG.info("Custom WAL entry filter {} is loaded successfully.", cls);
            return cls;
        } catch (Throwable th) {
            LOG.error("Error while loading custom WAL entry filter {}", cls);
            return SystemTableWALEntryFilter.class;
        }
    }

    private void decorateConf() {
        String str = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
        if (StringUtils.isNotEmpty(str)) {
            this.conf.set(HConstants.RPC_CODEC_CONF_KEY, str);
        }
    }

    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public void enqueueLog(Path path) {
        String wALPrefixFromWALName = AbstractFSWALProvider.getWALPrefixFromWALName(path.getName());
        if (!this.logQueue.enqueueLog(path, wALPrefixFromWALName) && isSourceActive() && this.walEntryFilter != null) {
            tryStartNewShipper(wALPrefixFromWALName);
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("{} Added log file {} to queue of source {}.", new Object[]{logPeerId(), wALPrefixFromWALName, this.replicationQueueInfo.getQueueId()});
        }
    }

    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public void addHFileRefs(TableName tableName, byte[] bArr, List<Pair<Path, Path>> list) throws ReplicationException {
        Map<TableName, List<String>> tableCFs = this.replicationPeer.getTableCFs();
        if (tableCFs == null) {
            this.queueStorage.addHFileRefs(this.peerId, list);
            this.metrics.incrSizeOfHFileRefsQueue(list.size());
            return;
        }
        List<String> list2 = tableCFs.get(tableName);
        if (!tableCFs.containsKey(tableName) || (list2 != null && !list2.contains(Bytes.toString(bArr)))) {
            LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}", new Object[]{tableName, Bytes.toString(bArr), this.peerId});
        } else {
            this.queueStorage.addHFileRefs(this.peerId, list);
            this.metrics.incrSizeOfHFileRefsQueue(list.size());
        }
    }

    private ReplicationEndpoint createReplicationEndpoint() throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
        ReplicationEndpoint replicationEndpoint;
        ReplicationEndpoint postCreateReplicationEndPoint;
        RegionServerCoprocessorHost regionServerCoprocessorHost = null;
        if (this.server instanceof HRegionServer) {
            regionServerCoprocessorHost = ((HRegionServer) this.server).getRegionServerCoprocessorHost();
        }
        String replicationEndpointImpl = this.replicationPeer.getPeerConfig().getReplicationEndpointImpl();
        if (replicationEndpointImpl == null) {
            replicationEndpoint = new HBaseInterClusterReplicationEndpoint();
        } else {
            try {
                replicationEndpoint = (ReplicationEndpoint) Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
            } catch (NoSuchMethodException | InvocationTargetException e) {
                throw new IllegalArgumentException(e);
            }
        }
        if (regionServerCoprocessorHost != null && (postCreateReplicationEndPoint = regionServerCoprocessorHost.postCreateReplicationEndPoint(replicationEndpoint)) != null) {
            replicationEndpoint = postCreateReplicationEndPoint;
        }
        return replicationEndpoint;
    }

    private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint) throws IOException, TimeoutException {
        TableDescriptors tableDescriptors = null;
        if (this.server instanceof HRegionServer) {
            tableDescriptors = ((HRegionServer) this.server).getTableDescriptors();
        }
        replicationEndpoint.init(new ReplicationEndpoint.Context(this.conf, this.replicationPeer.getConfiguration(), this.fs, this.peerId, this.clusterId, this.replicationPeer, this.metrics, tableDescriptors, this.server));
        replicationEndpoint.start();
        replicationEndpoint.awaitRunning(this.waitOnEndpointSeconds, TimeUnit.SECONDS);
    }

    private void initializeWALEntryFilter(UUID uuid) {
        ArrayList newArrayList = Lists.newArrayList();
        newArrayList.add(createSystemTableWalEntryFilterInstance());
        WALEntryFilter initWALKeyWriteTimeFilter = initWALKeyWriteTimeFilter();
        if (initWALKeyWriteTimeFilter != null) {
            newArrayList.add(initWALKeyWriteTimeFilter);
            LOG.info("WALKey writeTime based filter for replication is enabled.");
        }
        WALEntryFilter wALEntryfilter = this.replicationEndpoint.getWALEntryfilter();
        if (wALEntryfilter != null) {
            newArrayList.add(wALEntryfilter);
        }
        newArrayList.add(new ClusterMarkingEntryFilter(this.clusterId, uuid, this.replicationEndpoint));
        this.walEntryFilter = new ChainWALEntryFilter(newArrayList);
    }

    private WALEntryFilter createSystemTableWalEntryFilterInstance() {
        try {
            return this.implClass.newInstance();
        } catch (IllegalAccessException e) {
            LOG.error("Error while instantiating {}.", this.implClass.getName(), e);
            return new SystemTableWALEntryFilter();
        } catch (InstantiationException e2) {
            LOG.error("Error while instantiating {}.", this.implClass.getName(), e2);
            return new SystemTableWALEntryFilter();
        }
    }

    private WALEntryFilter initWALKeyWriteTimeFilter() {
        if (this.conf.getBoolean(HConstants.HBASE_REPLICATION_WAL_KEY_WRITE_TIME_FILTER_ENABLED, true)) {
            return new WALKeyWriteTimeBasedFilter(this.replicationPeer.getPeerCreationTime());
        }
        return null;
    }

    private void tryStartNewShipper(String str) {
        this.workerThreads.compute(str, (str2, replicationSourceShipper) -> {
            if (replicationSourceShipper != null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} Someone has beat us to start a worker thread for wal group {}", logPeerId(), str2);
                }
                return replicationSourceShipper;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("{} Starting up worker for wal group {}", logPeerId(), str2);
            }
            ReplicationSourceShipper createNewShipper = createNewShipper(str);
            ReplicationSourceWALReader createNewWALReader = createNewWALReader(str, createNewShipper.getStartPosition());
            Threads.setDaemonThreadRunning(createNewWALReader, Thread.currentThread().getName() + ".replicationSource.wal-reader." + str + "," + this.queueId, (thread, th) -> {
                uncaughtException(thread, th, this.manager, getPeerId());
            });
            createNewShipper.setWALReader(createNewWALReader);
            createNewShipper.startup((thread2, th2) -> {
                uncaughtException(thread2, th2, this.manager, getPeerId());
            });
            return createNewShipper;
        });
    }

    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public Map<String, ReplicationStatus> getWalGroupStatus() {
        TreeMap treeMap = new TreeMap();
        for (Map.Entry<String, ReplicationSourceShipper> entry : this.workerThreads.entrySet()) {
            String key = entry.getKey();
            ReplicationSourceShipper value = entry.getValue();
            long lastTimeStampOfWalGroup = this.metrics.getLastTimeStampOfWalGroup(key);
            long ageOfLastShippedOp = this.metrics.getAgeOfLastShippedOp(key);
            int queueSize = this.logQueue.getQueueSize(key);
            long calculateReplicationDelay = ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStampOfWalGroup, queueSize);
            Path currentPath = value.getCurrentPath();
            long j = -1;
            if (currentPath != null) {
                try {
                    j = getFileSize(currentPath);
                } catch (IOException e) {
                    LOG.warn("Ignore the exception as the file size of HLog only affects the web ui", e);
                }
            } else {
                currentPath = new Path("NO_LOGS_IN_QUEUE");
                LOG.warn("{} No replication ongoing, waiting for new log", logPeerId());
            }
            ReplicationStatus.ReplicationStatusBuilder newBuilder = ReplicationStatus.newBuilder();
            newBuilder.withPeerId(getPeerId()).withQueueSize(queueSize).withWalGroup(key).withCurrentPath(currentPath).withCurrentPosition(value.getCurrentPosition()).withFileSize(j).withAgeOfLastShippedOp(ageOfLastShippedOp).withReplicationDelay(calculateReplicationDelay);
            treeMap.put(getPeerId() + "=>" + key, newBuilder.build());
        }
        return treeMap;
    }

    private long getFileSize(Path path) throws IOException {
        long length;
        try {
            length = this.fs.getContentSummary(path).getLength();
        } catch (FileNotFoundException e) {
            length = this.fs.getContentSummary(AbstractFSWALProvider.getArchivedLogPath(path, this.conf)).getLength();
        }
        return length;
    }

    protected ReplicationSourceShipper createNewShipper(String str) {
        return new ReplicationSourceShipper(this.conf, str, this.logQueue, this);
    }

    private ReplicationSourceWALReader createNewWALReader(String str, long j) {
        return this.replicationPeer.getPeerConfig().isSerial() ? new SerialReplicationSourceWALReader(this.fs, this.conf, this.logQueue, j, this.walEntryFilter, this, str) : new ReplicationSourceWALReader(this.fs, this.conf, this.logQueue, j, this.walEntryFilter, this, str);
    }

    WALEntryFilter getWalEntryFilter() {
        return this.walEntryFilter;
    }

    protected final void uncaughtException(Thread thread, Throwable th, ReplicationSourceManager replicationSourceManager, String str) {
        RSRpcServices.exitIfOOME(th);
        LOG.error("Unexpected exception in {} currentPath={}", new Object[]{thread.getName(), getCurrentPath(), th});
        if (this.abortOnError) {
            this.server.abort("Unexpected exception in " + thread.getName(), th);
        }
        if (replicationSourceManager == null) {
            return;
        }
        while (true) {
            try {
                LOG.info("Refreshing replication sources now due to previous error on thread: {}", thread.getName());
                replicationSourceManager.refreshSources(str);
                return;
            } catch (IOException e) {
                LOG.error("Replication sources refresh failed.", e);
                sleepForRetries("Sleeping before try refreshing sources again", this.maxRetriesMultiplier);
            }
        }
    }

    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public ReplicationEndpoint getReplicationEndpoint() {
        return this.replicationEndpoint;
    }

    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public ReplicationSourceManager getSourceManager() {
        return this.manager;
    }

    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public void tryThrottle(int i) throws InterruptedException {
        checkBandwidthChangeAndResetThrottler();
        if (this.throttler.isEnabled()) {
            long nextSleepInterval = this.throttler.getNextSleepInterval(i);
            if (nextSleepInterval > 0) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("{} To sleep {}ms for throttling control", logPeerId(), Long.valueOf(nextSleepInterval));
                }
                Thread.sleep(nextSleepInterval);
                this.throttler.resetStartTick();
            }
        }
    }

    private void checkBandwidthChangeAndResetThrottler() {
        long currentBandwidth = getCurrentBandwidth();
        if (currentBandwidth != this.currentBandwidth) {
            this.currentBandwidth = currentBandwidth;
            this.throttler.setBandwidth(this.currentBandwidth / 10.0d);
            LOG.info("ReplicationSource : " + this.peerId + " bandwidth throttling changed, currentBandWidth=" + this.currentBandwidth);
        }
    }

    private long getCurrentBandwidth() {
        long peerBandwidth = this.replicationPeer.getPeerBandwidth();
        return peerBandwidth != 0 ? peerBandwidth : this.defaultBandwidth;
    }

    protected boolean sleepForRetries(String str, int i) {
        try {
            if (LOG.isTraceEnabled()) {
                LOG.trace("{} {}, sleeping {} times {}", new Object[]{logPeerId(), str, Long.valueOf(this.sleepForRetries), Integer.valueOf(i)});
            }
            Thread.sleep(this.sleepForRetries * i);
        } catch (InterruptedException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
            }
            Thread.currentThread().interrupt();
        }
        return i < this.maxRetriesMultiplier;
    }

    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public boolean isPeerEnabled() {
        return this.replicationPeer.isPeerEnabled();
    }

    private void initialize() {
        UUID peerUUID;
        int i = 1;
        while (isSourceActive()) {
            try {
                ReplicationEndpoint createReplicationEndpoint = createReplicationEndpoint();
                try {
                    initAndStartReplicationEndpoint(createReplicationEndpoint);
                    this.replicationEndpoint = createReplicationEndpoint;
                    break;
                } catch (Exception e) {
                    LOG.warn("{} Error starting ReplicationEndpoint, retry", logPeerId(), e);
                    createReplicationEndpoint.stop();
                    if (!sleepForRetries("Error starting ReplicationEndpoint", i)) {
                        this.retryStartup.set(!this.abortOnError);
                        this.startupOngoing.set(false);
                        throw new RuntimeException("Exhausted retries to start replication endpoint.");
                    }
                    i++;
                }
            } catch (Exception e2) {
                LOG.warn("{} error creating ReplicationEndpoint, retry", logPeerId(), e2);
                if (sleepForRetries("Error creating ReplicationEndpoint", i)) {
                    i++;
                }
            }
        }
        if (!isSourceActive()) {
            this.retryStartup.set(!this.abortOnError);
            this.startupOngoing.set(false);
            throw new IllegalStateException("Source should be active.");
        }
        int i2 = 1;
        while (true) {
            peerUUID = this.replicationEndpoint.getPeerUUID();
            if (!isSourceActive() || peerUUID != null) {
                break;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("{} Could not connect to Peer ZK. Sleeping for {} millis", logPeerId(), Long.valueOf(this.sleepForRetries * i2));
            }
            if (sleepForRetries("Cannot contact the peer's zk ensemble", i2)) {
                i2++;
            }
        }
        if (!isSourceActive()) {
            this.retryStartup.set(!this.abortOnError);
            this.startupOngoing.set(false);
            throw new IllegalStateException("Source should be active.");
        }
        if (null != this.clusterId && this.clusterId.equals(peerUUID) && !this.replicationEndpoint.canReplicateToSameCluster()) {
            terminate("ClusterId " + this.clusterId + " is replicating to itself: peerClusterId " + peerUUID + " which is not allowed by ReplicationEndpoint:" + this.replicationEndpoint.getClass().getName(), null, false);
            this.manager.removeSource(this);
            return;
        }
        LOG.info("{} Source: {}, is now replicating from cluster: {}; to peer cluster: {};", new Object[]{logPeerId(), this.replicationQueueInfo.getQueueId(), this.clusterId, peerUUID});
        initializeWALEntryFilter(peerUUID);
        Iterator<String> it = this.logQueue.getQueues().keySet().iterator();
        while (it.hasNext()) {
            tryStartNewShipper(it.next());
        }
        this.startupOngoing.set(false);
    }

    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public void startup() {
        this.sourceRunning = true;
        this.startupOngoing.set(true);
        this.initThread = new Thread(this::initialize);
        Threads.setDaemonThreadRunning(this.initThread, Thread.currentThread().getName() + ".replicationSource," + this.queueId, (thread, th) -> {
            this.sourceRunning = false;
            uncaughtException(thread, th, null, null);
            this.retryStartup.set(!this.abortOnError);
            do {
                if (this.retryStartup.get()) {
                    this.sourceRunning = true;
                    this.startupOngoing.set(true);
                    this.retryStartup.set(false);
                    try {
                        initialize();
                    } catch (Throwable th) {
                        this.sourceRunning = false;
                        uncaughtException(thread, th, null, null);
                        this.retryStartup.set(!this.abortOnError);
                    }
                }
                if (!this.startupOngoing.get() && !this.retryStartup.get()) {
                    return;
                }
            } while (!this.abortOnError);
        });
    }

    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public void terminate(String str) {
        terminate(str, null);
    }

    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public void terminate(String str, Exception exc) {
        terminate(str, exc, true);
    }

    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public void terminate(String str, Exception exc, boolean z) {
        terminate(str, exc, z, true);
    }

    public void terminate(String str, Exception exc, boolean z, boolean z2) {
        if (exc == null) {
            LOG.info("{} Closing source {} because: {}", new Object[]{logPeerId(), this.queueId, str});
        } else {
            LOG.error("{} Closing source {} because an error occurred: {}", new Object[]{logPeerId(), this.queueId, str, exc});
        }
        this.sourceRunning = false;
        if (this.initThread != null && Thread.currentThread() != this.initThread) {
            this.initThread.interrupt();
            Threads.shutdown(this.initThread, this.sleepForRetries);
        }
        Collection<ReplicationSourceShipper> values = this.workerThreads.values();
        for (ReplicationSourceShipper replicationSourceShipper : values) {
            replicationSourceShipper.stopWorker();
            replicationSourceShipper.entryReader.setReaderRunning(false);
        }
        for (ReplicationSourceShipper replicationSourceShipper2 : values) {
            if (replicationSourceShipper2.isAlive() || replicationSourceShipper2.entryReader.isAlive()) {
                try {
                    Thread.sleep(this.sleepForRetries);
                } catch (InterruptedException e) {
                    LOG.info("{} Interrupted while waiting {} to stop", logPeerId(), replicationSourceShipper2.getName());
                    Thread.currentThread().interrupt();
                }
                if (replicationSourceShipper2.isAlive()) {
                    replicationSourceShipper2.interrupt();
                }
                if (replicationSourceShipper2.entryReader.isAlive()) {
                    replicationSourceShipper2.entryReader.interrupt();
                }
            }
            replicationSourceShipper2.clearWALEntryBatch();
        }
        if (this.replicationEndpoint != null) {
            this.replicationEndpoint.stop();
        }
        if (z2) {
            for (ReplicationSourceShipper replicationSourceShipper3 : values) {
                Threads.shutdown(replicationSourceShipper3, this.sleepForRetries);
                LOG.info("{} ReplicationSourceWorker {} terminated", logPeerId(), replicationSourceShipper3.getName());
            }
            if (this.replicationEndpoint != null) {
                try {
                    this.replicationEndpoint.awaitTerminated(this.sleepForRetries * this.maxRetriesMultiplier, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e2) {
                    LOG.warn("{} Got exception while waiting for endpoint to shutdown for replication source : {}", new Object[]{logPeerId(), this.queueId, e2});
                }
            }
        }
        if (z) {
            this.metrics.clear();
        }
    }

    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public String getQueueId() {
        return this.queueId;
    }

    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public String getPeerId() {
        return this.peerId;
    }

    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public Path getCurrentPath() {
        for (ReplicationSourceShipper replicationSourceShipper : this.workerThreads.values()) {
            if (replicationSourceShipper.getCurrentPath() != null) {
                return replicationSourceShipper.getCurrentPath();
            }
        }
        return null;
    }

    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public boolean isSourceActive() {
        return !this.server.isStopped() && this.sourceRunning;
    }

    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public String getStats() {
        StringBuilder sb = new StringBuilder();
        sb.append("Total replicated edits: ").append(this.totalReplicatedEdits).append(", current progress: \n");
        for (Map.Entry<String, ReplicationSourceShipper> entry : this.workerThreads.entrySet()) {
            String key = entry.getKey();
            ReplicationSourceShipper value = entry.getValue();
            long currentPosition = value.getCurrentPosition();
            Path currentPath = value.getCurrentPath();
            sb.append("walGroup [").append(key).append("]: ");
            if (currentPath != null) {
                sb.append("currently replicating from: ").append(currentPath).append(" at position: ").append(currentPosition).append("\n");
            } else {
                sb.append("no replication ongoing, waiting for new log");
            }
        }
        return sb.toString();
    }

    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public MetricsSource getSourceMetrics() {
        return this.metrics;
    }

    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public void postShipEdits(List<WAL.Entry> list, int i) {
        if (this.throttler.isEnabled()) {
            this.throttler.addPushSize(i);
        }
        this.totalReplicatedEdits.addAndGet(list.size());
        this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(this.totalBufferUsed.addAndGet(-i));
    }

    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public WALFileLengthProvider getWALFileLengthProvider() {
        return this.walFileLengthProvider;
    }

    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public ServerName getServerWALsBelongTo() {
        return this.server.getServerName();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Server getServer() {
        return this.server;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ReplicationQueueStorage getQueueStorage() {
        return this.queueStorage;
    }

    public String logPeerId() {
        return "[Source for peer " + getPeerId() + "]:";
    }

    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public void updateAdvanceReplicationMetricsData() {
        long j = 0;
        long j2 = 0;
        long j3 = 0;
        for (Map.Entry<String, ReplicationSourceShipper> entry : this.workerThreads.entrySet()) {
            j3 += entry.getValue().getCurrentPosition();
            PriorityBlockingQueue<Path> priorityBlockingQueue = this.logQueue.getQueues().get(entry.getKey());
            if (priorityBlockingQueue != null) {
                Iterator<Path> it = priorityBlockingQueue.iterator();
                while (it.hasNext()) {
                    Path next = it.next();
                    OptionalLong logFileSizeIfBeingWritten = this.walFileLengthProvider.getLogFileSizeIfBeingWritten(next);
                    if (logFileSizeIfBeingWritten.isPresent()) {
                        j2 += logFileSizeIfBeingWritten.getAsLong();
                    } else {
                        j += getFileSizeOnDisk(next);
                    }
                }
            }
        }
        this.metrics.setTotalOfWALInQueue(j);
        this.metrics.setSizeOfWritingWAL(j2);
        this.metrics.setSizeOfShippedWAL(j3);
        this.metrics.setSizeOfLogToReplicate();
    }

    private long getFileSizeOnDisk(Path path) {
        long j = 0;
        try {
            j = getFileSize(path);
        } catch (IOException e) {
            LOG.warn("Error while getting file {} size. ", path, e);
        }
        return j;
    }
}
