package org.apache.hadoop.hbase.replication.regionserver;

import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RetryingCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.EntryBuffers;
import org.apache.hadoop.hbase.wal.OutputSink;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.util.StringUtils;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.class */
public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaReplicationEndpoint.class);
    private static String CLIENT_RETRIES_NUMBER = "hbase.region.replica.replication.client.retries.number";
    private Configuration conf;
    private ClusterConnection connection;
    private TableDescriptors tableDescriptors;
    private WALSplitter.PipelineController controller;
    private RegionReplicaOutputSink outputSink;
    private EntryBuffers entryBuffers;
    private int numWriterThreads;
    private int operationTimeout;
    private ExecutorService pool;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint$RegionReplicaOutputSink.class */
    public static class RegionReplicaOutputSink extends OutputSink {
        private final RegionReplicaSinkWriter sinkWriter;
        private final TableDescriptors tableDescriptors;
        private final Cache<TableName, Boolean> memstoreReplicationEnabled;

        public RegionReplicaOutputSink(WALSplitter.PipelineController pipelineController, TableDescriptors tableDescriptors, EntryBuffers entryBuffers, ClusterConnection clusterConnection, ExecutorService executorService, int i, int i2) {
            super(pipelineController, entryBuffers, i);
            this.sinkWriter = new RegionReplicaSinkWriter(this, clusterConnection, executorService, i2, tableDescriptors);
            this.tableDescriptors = tableDescriptors;
            this.memstoreReplicationEnabled = CacheBuilder.newBuilder().expireAfterWrite(clusterConnection.getConfiguration().getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs", 5000), TimeUnit.MILLISECONDS).initialCapacity(10).maximumSize(1000L).build();
        }

        @Override // org.apache.hadoop.hbase.wal.OutputSink
        public void append(EntryBuffers.RegionEntryBuffer regionEntryBuffer) throws IOException {
            List<WAL.Entry> entries = regionEntryBuffer.getEntries();
            if (entries.isEmpty() || entries.get(0).getEdit().getCells().isEmpty() || !requiresReplication(regionEntryBuffer.getTableName(), entries)) {
                return;
            }
            this.sinkWriter.append(regionEntryBuffer.getTableName(), regionEntryBuffer.getEncodedRegionName(), CellUtil.cloneRow(entries.get(0).getEdit().getCells().get(0)), entries);
        }

        void flush() throws IOException {
            this.entryBuffers.waitUntilDrained();
        }

        @Override // org.apache.hadoop.hbase.wal.OutputSink
        public boolean keepRegionEvent(WAL.Entry entry) {
            return true;
        }

        @Override // org.apache.hadoop.hbase.wal.OutputSink
        public List<Path> close() throws IOException {
            finishWriterThreads(true);
            return null;
        }

        @Override // org.apache.hadoop.hbase.wal.OutputSink
        public Map<String, Long> getOutputCounts() {
            return null;
        }

        @Override // org.apache.hadoop.hbase.wal.OutputSink
        public int getNumberOfRecoveredRegions() {
            return 0;
        }

        AtomicLong getSkippedEditsCounter() {
            return this.totalSkippedEdits;
        }

        private boolean requiresReplication(TableName tableName, List<WAL.Entry> list) throws IOException {
            if (this.tableDescriptors == null) {
                return true;
            }
            Boolean bool = (Boolean) this.memstoreReplicationEnabled.getIfPresent(tableName);
            if (bool == null) {
                TableDescriptor tableDescriptor = this.tableDescriptors.get(tableName);
                bool = Boolean.valueOf(tableDescriptor == null || tableDescriptor.hasRegionMemStoreReplication());
                this.memstoreReplicationEnabled.put(tableName, bool);
            }
            if (!bool.booleanValue()) {
                int i = 0;
                Iterator<WAL.Entry> it = list.iterator();
                while (it.hasNext()) {
                    if (it.next().getEdit().isMetaEdit()) {
                        bool = true;
                    } else {
                        it.remove();
                        i++;
                    }
                }
                this.totalSkippedEdits.addAndGet(i);
            }
            return bool.booleanValue();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.hadoop.hbase.wal.OutputSink
        public int getNumOpenWriters() {
            return 0;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint$RegionReplicaReplayCallable.class */
    public static class RegionReplicaReplayCallable extends RegionAdminServiceCallable<AdminProtos.ReplicateWALEntryResponse> {
        private final List<WAL.Entry> entries;
        private final byte[] initialEncodedRegionName;
        private final AtomicLong skippedEntries;

        public RegionReplicaReplayCallable(ClusterConnection clusterConnection, RpcControllerFactory rpcControllerFactory, TableName tableName, HRegionLocation hRegionLocation, RegionInfo regionInfo, byte[] bArr, List<WAL.Entry> list, AtomicLong atomicLong) {
            super(clusterConnection, rpcControllerFactory, hRegionLocation, tableName, bArr, regionInfo.getReplicaId());
            this.entries = list;
            this.skippedEntries = atomicLong;
            this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes();
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.hadoop.hbase.client.RegionAdminServiceCallable
        public AdminProtos.ReplicateWALEntryResponse call(HBaseRpcController hBaseRpcController) throws Exception {
            boolean z = false;
            if (!Bytes.equals(this.location.getRegionInfo().getEncodedNameAsBytes(), this.initialEncodedRegionName)) {
                z = true;
            }
            if (!this.entries.isEmpty() && !z) {
                Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> buildReplicateWALEntryRequest = ReplicationProtbufUtil.buildReplicateWALEntryRequest((WAL.Entry[]) this.entries.toArray(new WAL.Entry[this.entries.size()]), this.location.getRegionInfo().getEncodedNameAsBytes(), null, null, null);
                hBaseRpcController.setCellScanner(buildReplicateWALEntryRequest.getSecond());
                return this.stub.replay(hBaseRpcController, buildReplicateWALEntryRequest.getFirst());
            }
            if (z) {
                if (RegionReplicaReplicationEndpoint.LOG.isTraceEnabled()) {
                    RegionReplicaReplicationEndpoint.LOG.trace("Skipping " + this.entries.size() + " entries in table " + this.tableName + " because located region " + this.location.getRegionInfo().getEncodedName() + " is different than the original region " + Bytes.toStringBinary(this.initialEncodedRegionName) + " from WALEdit");
                    Iterator<WAL.Entry> it = this.entries.iterator();
                    while (it.hasNext()) {
                        RegionReplicaReplicationEndpoint.LOG.trace("Skipping : " + it.next());
                    }
                }
                this.skippedEntries.addAndGet(this.entries.size());
            }
            return AdminProtos.ReplicateWALEntryResponse.newBuilder().build();
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint$RegionReplicaSinkWriter.class */
    static class RegionReplicaSinkWriter {
        RegionReplicaOutputSink sink;
        ClusterConnection connection;
        RpcControllerFactory rpcControllerFactory;
        RpcRetryingCallerFactory rpcRetryingCallerFactory;
        int operationTimeout;
        ExecutorService pool;
        Cache<TableName, Boolean> disabledAndDroppedTables;
        TableDescriptors tableDescriptors;

        public RegionReplicaSinkWriter(RegionReplicaOutputSink regionReplicaOutputSink, ClusterConnection clusterConnection, ExecutorService executorService, int i, TableDescriptors tableDescriptors) {
            this.sink = regionReplicaOutputSink;
            this.connection = clusterConnection;
            this.operationTimeout = i;
            this.rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(clusterConnection.getConfiguration());
            this.rpcControllerFactory = RpcControllerFactory.instantiate(clusterConnection.getConfiguration());
            this.pool = executorService;
            this.tableDescriptors = tableDescriptors;
            this.disabledAndDroppedTables = CacheBuilder.newBuilder().expireAfterWrite(clusterConnection.getConfiguration().getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000), TimeUnit.MILLISECONDS).initialCapacity(10).maximumSize(1000L).build();
        }

        /* JADX WARN: Code restructure failed: missing block: B:117:0x00d8, code lost:
        
            throw new org.apache.hadoop.hbase.HBaseIOException("Cannot locate locations for " + r12 + ", row:" + org.apache.hadoop.hbase.util.Bytes.toStringBinary(r14));
         */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public void append(org.apache.hadoop.hbase.TableName r12, byte[] r13, byte[] r14, java.util.List<org.apache.hadoop.hbase.wal.WAL.Entry> r15) throws java.io.IOException {
            /*
                Method dump skipped, instructions count: 1128
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter.append(org.apache.hadoop.hbase.TableName, byte[], byte[], java.util.List):void");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint$RetryingRpcCallable.class */
    public static class RetryingRpcCallable<V> implements Callable<V> {
        RpcRetryingCallerFactory factory;
        RetryingCallable<V> callable;
        int timeout;

        public RetryingRpcCallable(RpcRetryingCallerFactory rpcRetryingCallerFactory, RetryingCallable<V> retryingCallable, int i) {
            this.factory = rpcRetryingCallerFactory;
            this.callable = retryingCallable;
            this.timeout = i;
        }

        @Override // java.util.concurrent.Callable
        public V call() throws Exception {
            return (V) this.factory.newCaller().callWithRetries(this.callable, this.timeout);
        }
    }

    @Override // org.apache.hadoop.hbase.replication.BaseReplicationEndpoint, org.apache.hadoop.hbase.replication.ReplicationEndpoint
    public void init(ReplicationEndpoint.Context context) throws IOException {
        super.init(context);
        this.conf = HBaseConfiguration.create(context.getConfiguration());
        this.tableDescriptors = context.getTableDescriptors();
        int i = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 15);
        if (i > 10) {
            i /= this.conf.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 3);
        }
        this.conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
        this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, this.conf.getInt(CLIENT_RETRIES_NUMBER, i));
        this.numWriterThreads = this.conf.getInt("hbase.region.replica.replication.writer.threads", 3);
        this.controller = new WALSplitter.PipelineController();
        this.entryBuffers = new EntryBuffers(this.controller, this.conf.getLong("hbase.region.replica.replication.buffersize", 134217728L));
        this.operationTimeout = this.conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 1200000);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint
    public void doStart() {
        try {
            this.connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
            this.pool = getDefaultThreadPool(this.conf);
            this.outputSink = new RegionReplicaOutputSink(this.controller, this.tableDescriptors, this.entryBuffers, this.connection, this.pool, this.numWriterThreads, this.operationTimeout);
            this.outputSink.startWriterThreads();
            super.doStart();
        } catch (IOException e) {
            LOG.warn("Received exception while creating connection :" + e);
            notifyFailed(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint
    public void doStop() {
        if (this.outputSink != null) {
            try {
                this.outputSink.close();
            } catch (IOException e) {
                LOG.warn("Got exception while trying to close OutputSink", e);
            }
        }
        if (this.pool != null) {
            this.pool.shutdownNow();
            try {
                if (!this.pool.awaitTermination(10000L, TimeUnit.MILLISECONDS)) {
                    LOG.warn("Failed to shutdown the thread pool after 10 seconds");
                }
            } catch (InterruptedException e2) {
                LOG.warn("Got interrupted while waiting for the thread pool to shut down" + e2);
            }
        }
        if (this.connection != null) {
            try {
                this.connection.close();
            } catch (IOException e3) {
                LOG.warn("Got exception closing connection :" + e3);
            }
        }
        super.doStop();
    }

    private ExecutorService getDefaultThreadPool(Configuration configuration) {
        int i = configuration.getInt("hbase.region.replica.replication.threads.max", Opcodes.ACC_NATIVE);
        if (i == 0) {
            i = Runtime.getRuntime().availableProcessors() * 8;
        }
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(i, i, configuration.getLong("hbase.region.replica.replication.threads.keepalivetime", 60L), TimeUnit.SECONDS, new LinkedBlockingQueue(i * configuration.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, 100)), Threads.newDaemonThreadFactory(getClass().getSimpleName() + "-rpc-shared-"));
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        return threadPoolExecutor;
    }

    @Override // org.apache.hadoop.hbase.replication.ReplicationEndpoint
    public boolean replicate(ReplicationEndpoint.ReplicateContext replicateContext) {
        while (isRunning()) {
            try {
                Iterator<WAL.Entry> it = replicateContext.getEntries().iterator();
                while (it.hasNext()) {
                    this.entryBuffers.appendEntry(it.next());
                }
                this.outputSink.flush();
                this.ctx.getMetrics().incrLogEditsFiltered(this.outputSink.getSkippedEditsCounter().getAndSet(0L));
                return true;
            } catch (IOException e) {
                LOG.warn("Received IOException while trying to replicate" + StringUtils.stringifyException(e));
                this.outputSink.restartWriterThreadsIfNeeded();
            } catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    @Override // org.apache.hadoop.hbase.replication.BaseReplicationEndpoint, org.apache.hadoop.hbase.replication.ReplicationEndpoint
    public boolean canReplicateToSameCluster() {
        return true;
    }

    @Override // org.apache.hadoop.hbase.replication.BaseReplicationEndpoint
    protected WALEntryFilter getScopeWALEntryFilter() {
        return null;
    }
}
