package org.apache.hadoop.hbase.replication.regionserver;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.class */
public class HFileReplicator implements Callable<Void>, Closeable {
    /** Configuration key for the maximum number of threads in the hfile copy pool. */
    public static final String REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY = "hbase.replication.bulkload.copy.maxthreads";
    /** Default value for {@link #REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY}. */
    public static final int REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT = 10;
    /** Configuration key for how many hfiles are handed to a single copy task. */
    public static final String REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY = "hbase.replication.bulkload.copy.hfiles.perthread";
    /** Default value for {@link #REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY}. */
    public static final int REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT = 10;
    /** Replacement for the separator character in table names when building staging directory names. */
    private static final String UNDERSCORE = "_";
    /** Configuration used to open the source cluster's file system. */
    private Configuration sourceClusterConf;
    /** Base directory on the source file system against which hfile paths are resolved. */
    private String sourceBaseNamespaceDirPath;
    /** Fallback directory on the source file system, tried when an hfile is not found under the base directory. */
    private String sourceHFileArchiveDirPath;
    /**
     * HFiles to replicate, keyed by table name string. Each pair's first element names the
     * sub-directory created under the table's staging directory (presumably the column family —
     * confirm against the caller); the second element lists hfile paths relative to
     * {@link #sourceBaseNamespaceDirPath}.
     */
    private Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap;
    /** File system of the sink cluster; staging directories and copied hfiles live here. */
    private FileSystem sinkFs;
    /** Delegation token acquired on {@link #sinkFs} around each table's bulk load and released in cleanup. */
    private FsDelegationToken fsDelegationToken;
    /** Supplies the current user, whose short name becomes part of the staging directory name. */
    private UserProvider userProvider;
    /** Sink cluster configuration. */
    private Configuration conf;
    /** Connection used to obtain the target {@code Table} and {@code RegionLocator} instances. */
    private Connection connection;
    /** Parent directory, under the sink's root dir, in which per-table staging directories are created. */
    private Path hbaseStagingDir;
    /** Pool that runs the {@code Copier} tasks; shut down once copying finishes and again on {@link #close()}. */
    private ThreadPoolExecutor exec;
    /** Upper bound on copy threads, read from {@link #REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY}. */
    private int maxCopyThreads;
    /** Number of hfiles per copy task, read from {@link #REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY}. */
    private int copiesPerThread;
    /** Cluster ids passed on to the bulk load tool via {@code setClusterIds}. */
    private List<String> sourceClusterIds;
    private static final Logger LOG = LoggerFactory.getLogger(HFileReplicator.class);
    /** Owner-only (rwx------) permission applied to staging directories and copied hfiles. */
    private static final FsPermission PERM_700 = FsPermission.valueOf("-rwx------");

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/HFileReplicator$Copier.class */
    public class Copier implements Callable<Void> {
        private FileSystem sourceFs;
        private Path stagingDir;
        private List<String> hfiles;

        public Copier(FileSystem fileSystem, Path path, List<String> list) throws IOException {
            this.sourceFs = fileSystem;
            this.stagingDir = path;
            this.hfiles = list;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() throws IOException {
            int size = this.hfiles.size();
            HFileReplicator.LOG.debug("Total HFiles received for copy: " + size);
            if (HFileReplicator.LOG.isTraceEnabled()) {
                HFileReplicator.LOG.trace("HFiles are: " + this.hfiles);
            }
            for (int i = 0; i < size; i++) {
                Path path = new Path(HFileReplicator.this.sourceBaseNamespaceDirPath, this.hfiles.get(i));
                Path path2 = new Path(this.stagingDir, path.getName());
                try {
                    FileUtil.copy(this.sourceFs, path, HFileReplicator.this.sinkFs, path2, false, HFileReplicator.this.conf);
                } catch (FileNotFoundException e) {
                    HFileReplicator.LOG.warn("Failed to copy hfile from " + path + " to " + path2 + ". Trying to copy from hfile archive directory.", e);
                    Path path3 = new Path(HFileReplicator.this.sourceHFileArchiveDirPath, this.hfiles.get(i));
                    try {
                        FileUtil.copy(this.sourceFs, path3, HFileReplicator.this.sinkFs, path2, false, HFileReplicator.this.conf);
                    } catch (FileNotFoundException e2) {
                        HFileReplicator.LOG.error("Failed to copy hfile from " + path3 + " to " + path2 + ". Hence ignoring this hfile from replication..", e2);
                    }
                }
                HFileReplicator.this.sinkFs.setPermission(path2, HFileReplicator.PERM_700);
            }
            return null;
        }
    }

    public HFileReplicator(Configuration configuration, String str, String str2, Map<String, List<Pair<byte[], List<String>>>> map, Configuration configuration2, Connection connection, List<String> list) throws IOException {
        this.sourceClusterConf = configuration;
        this.sourceBaseNamespaceDirPath = str;
        this.sourceHFileArchiveDirPath = str2;
        this.bulkLoadHFileMap = map;
        this.conf = configuration2;
        this.connection = connection;
        this.sourceClusterIds = list;
        this.userProvider = UserProvider.instantiate(configuration2);
        this.fsDelegationToken = new FsDelegationToken(this.userProvider, "renewer");
        this.hbaseStagingDir = new Path(CommonFSUtils.getRootDir(configuration2), HConstants.BULKLOAD_STAGING_DIR_NAME);
        this.maxCopyThreads = this.conf.getInt(REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY, 10);
        this.exec = Threads.getBoundedCachedThreadPool(this.maxCopyThreads, 60L, TimeUnit.SECONDS, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HFileReplicationCopier-%1$d-" + this.sourceBaseNamespaceDirPath).build());
        this.copiesPerThread = configuration2.getInt(REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY, 10);
        this.sinkFs = FileSystem.get(configuration2);
    }

    /**
     * Initiates shutdown of the copy executor, if one was created.
     */
    @Override
    public void close() throws IOException {
        final ThreadPoolExecutor pool = this.exec;
        if (pool == null) {
            return;
        }
        pool.shutdown();
    }

    /**
     * Runs the complete replication of this batch and always closes this replicator afterwards,
     * whether the bulk load succeeded or threw.
     *
     * @return always {@code null}
     * @throws Exception whatever the bulk load raised
     */
    @Override
    public Void call() throws Exception {
        try {
            // doBulkLoad() is typed Void, so its only possible result is null.
            doBulkLoad();
        } finally {
            close();
        }
        return null;
    }

    /**
     * Copies all hfiles into per-table staging directories and bulk loads each staged table.
     *
     * <p>A table whose staging directory turns out to contain no loadable files is logged and
     * skipped; the remaining tables are still processed. If loading aborts with an exception,
     * the staging directories of the tables that were not reached yet are removed as well, so
     * that no staged data is left behind.
     *
     * @return always {@code null}
     * @throws Exception if copying or bulk loading fails
     */
    private Void doBulkLoad() throws Exception {
        Map<String, Path> stagingDirsByTable = copyHFilesToStagingDir();
        Iterator<Map.Entry<String, Path>> pendingTables = stagingDirsByTable.entrySet().iterator();
        try {
            while (pendingTables.hasNext()) {
                Map.Entry<String, Path> staged = pendingTables.next();
                loadStagedTable(staged.getKey(), staged.getValue());
            }
        } finally {
            // Only non-empty when the loop above was aborted by an exception.
            while (pendingTables.hasNext()) {
                cleanup(pendingTables.next().getValue().toString(), null);
            }
        }
        return null;
    }

    /**
     * Bulk loads the hfiles staged for one table, then releases the table, the delegation token
     * and the staging directory regardless of the outcome.
     */
    private void loadStagedTable(String tableNameString, Path stagingDir) throws Exception {
        Table table = null;
        try {
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(this.conf);
            loader.setClusterIds(this.sourceClusterIds);
            Configuration loaderConf = HBaseConfiguration.create(this.conf);
            loaderConf.set("create.table", "no");
            loader.setConf(loaderConf);

            TableName tableName = TableName.valueOf(tableNameString);
            table = this.connection.getTable(tableName);

            Deque<LoadIncrementalHFiles.LoadQueueItem> queue = new LinkedList<>();
            loader.prepareHFileQueue(stagingDir, table, queue, false);
            if (queue.isEmpty()) {
                LOG.warn("Did not find any files to replicate in directory {}", stagingDir.toUri());
                return;
            }

            try (RegionLocator locator = this.connection.getRegionLocator(tableName)) {
                Pair<byte[][], byte[][]> startEndKeys = locator.getStartEndKeys();
                this.fsDelegationToken.acquireDelegationToken(this.sinkFs);
                // The staging directory path doubles as the bulk token handed to the loader.
                loader.setBulkToken(stagingDir.toString());
                loader.loadHFileQueue(table, this.connection, queue, startEndKeys);
                if (!queue.isEmpty()) {
                    retryBulkLoadForFailedQueues(queue, loader, table, locator);
                }
            }
        } finally {
            cleanup(stagingDir.toString(), table);
        }
    }

    /**
     * Best-effort release of everything held for one table's bulk load: the delegation token,
     * the staging directory (deleted recursively) and the table handle. I/O failures are logged
     * and never propagated.
     *
     * @param str path of the staging directory to delete, or {@code null} to skip deletion
     * @param table table to close, or {@code null} to skip closing
     */
    private void cleanup(String str, Table table) {
        this.fsDelegationToken.releaseDelegationToken();

        if (str != null) {
            Path stagingPath = new Path(str);
            try {
                this.sinkFs.delete(stagingPath, true);
            } catch (IOException deleteFailure) {
                LOG.warn("Failed to delete the staging directory " + str, deleteFailure);
            }
        }

        if (table == null) {
            return;
        }
        try {
            table.close();
        } catch (IOException closeFailure) {
            LOG.warn("Failed to close the table.", closeFailure);
        }
    }

    /**
     * Keeps re-running the bulk load for the items still left in the queue until the queue is
     * drained or the retry budget is used up.
     *
     * <p>The budget is read from {@code hbase.loadincremental.retries.number} (default 10); a
     * value of 0 disables the limit. Region start/end keys are fetched anew for every attempt.
     *
     * @param deque items that the previous load attempt left unprocessed
     * @param loadIncrementalHFiles loader used for the attempts
     * @param table target table
     * @param regionLocator source of the table's current region boundaries
     * @throws IOException if a load attempt fails
     * @throws ReplicationException if the attempt counter reaches the configured limit
     */
    private void retryBulkLoadForFailedQueues(Deque<LoadIncrementalHFiles.LoadQueueItem> deque, LoadIncrementalHFiles loadIncrementalHFiles, Table table, RegionLocator regionLocator) throws IOException, ReplicationException {
        final int maxRetries = this.conf.getInt("hbase.loadincremental.retries.number", 10);
        for (int attempt = 1; ; attempt++) {
            LOG.warn("Error occured while replicating HFiles, retry attempt " + attempt + " with " + deque.size() + " files still remaining to replicate.");
            boolean limited = maxRetries != 0;
            if (limited && attempt >= maxRetries) {
                throw new ReplicationException("Retry attempted " + attempt + " times without completing, bailing out.");
            }
            Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
            loadIncrementalHFiles.loadHFileQueue(table, this.connection, deque, startEndKeys);
            if (deque.isEmpty()) {
                return;
            }
        }
    }

    /**
     * Copies every hfile of the batch from the source cluster into a freshly created staging
     * directory per table on the sink file system.
     *
     * <p>Within a table, the files of each map entry go into their own sub-directory and are
     * split into chunks of at most {@code copiesPerThread} files, each chunk copied by one
     * {@code Copier} task on the executor. All tasks of a sub-directory are awaited before the
     * next one starts. The source file system and the executor are released in every case.
     *
     * @return staging directory per table name string
     * @throws IOException if a staging directory cannot be created, a copy task fails, or the
     *         calling thread is interrupted while waiting (the interrupt flag is restored)
     */
    private Map<String, Path> copyHFilesToStagingDir() throws IOException {
        Map<String, Path> stagingDirsByTable = new HashMap<>();
        // A non-positive batch size would keep the chunking loop below from ever advancing.
        final int batchSize = Math.max(1, this.copiesPerThread);
        FileSystem sourceFs = null;
        try {
            Path sourceBaseDir = new Path(this.sourceBaseNamespaceDirPath);
            // Bypass the file system cache for the source scheme; presumably so that closing
            // sourceFs below cannot affect an instance shared with other users — confirm.
            String disableCacheKey = String.format("fs.%s.impl.disable.cache", sourceBaseDir.toUri().getScheme());
            this.sourceClusterConf.setBoolean(disableCacheKey, true);
            sourceFs = sourceBaseDir.getFileSystem(this.sourceClusterConf);

            User user = this.userProvider.getCurrent();
            for (Map.Entry<String, List<Pair<byte[], List<String>>>> tableEntry : this.bulkLoadHFileMap.entrySet()) {
                String tableNameString = tableEntry.getKey();
                Path tableStagingDir = createStagingDir(this.hbaseStagingDir, user, TableName.valueOf(tableNameString));
                for (Pair<byte[], List<String>> group : tableEntry.getValue()) {
                    Path groupStagingDir = new Path(tableStagingDir, Bytes.toString(group.getFirst()));
                    List<String> hfiles = group.getSecond();
                    int total = hfiles.size();

                    List<Future<Void>> pendingCopies = new ArrayList<>();
                    int start = 0;
                    while (start < total) {
                        // Written as a subtraction so that a very large batch size cannot overflow.
                        int end = (total - start > batchSize) ? start + batchSize : total;
                        pendingCopies.add(this.exec.submit(new Copier(sourceFs, groupStagingDir, hfiles.subList(start, end))));
                        start = end;
                    }

                    for (Future<Void> copy : pendingCopies) {
                        try {
                            copy.get();
                        } catch (InterruptedException e) {
                            // Restore the flag so callers further up can still observe the interrupt.
                            Thread.currentThread().interrupt();
                            throw new IOException("Failed to copy HFiles to local file system.", e);
                        } catch (ExecutionException e) {
                            throw new IOException("Failed to copy HFiles to local file system.", e);
                        }
                    }
                }
                stagingDirsByTable.put(tableNameString, tableStagingDir);
            }
            return stagingDirsByTable;
        } finally {
            // NOTE(review): staging directories created before a failure are not removed here,
            // because copy tasks that are still running may be writing into them.
            try {
                if (sourceFs != null) {
                    sourceFs.close();
                }
            } finally {
                if (this.exec != null) {
                    this.exec.shutdown();
                }
            }
        }
    }

    /**
     * Creates a uniquely named staging directory for a table. The directory name is
     * {@code <user short name>__<table name with the separator replaced by '_'>__<random suffix>},
     * where the suffix is a 320-bit random number rendered in base 32.
     *
     * @param path parent directory for staging directories
     * @param user user whose short name prefixes the directory name
     * @param tableName table the directory is created for
     * @return the created staging directory
     * @throws IOException if the directory cannot be created or its permission cannot be set
     */
    private Path createStagingDir(Path path, User user, TableName tableName) throws IOException {
        String owner = user.getShortName();
        String flattenedTableName = tableName.getNameAsString().replace(Addressing.HOSTNAME_PORT_SEPARATOR, UNDERSCORE);
        String randomSuffix = new BigInteger(320, ThreadLocalRandom.current()).toString(32);
        String dirName = owner + "__" + flattenedTableName + "__" + randomSuffix;
        return createStagingDir(path, user, dirName);
    }

    /**
     * Creates the directory {@code str} below {@code path} on the sink file system with
     * owner-only permissions. The permission is set a second time after creation; presumably
     * because the create call may not apply it exactly as requested — confirm.
     *
     * @param path parent directory
     * @param user not used by this overload
     * @param str name of the directory to create
     * @return the created directory
     * @throws IOException if creating the directory or setting its permission fails
     */
    private Path createStagingDir(Path path, User user, String str) throws IOException {
        final Path stagingDir = new Path(path, str);
        final FileSystem fs = this.sinkFs;
        fs.mkdirs(stagingDir, PERM_700);
        fs.setPermission(stagingDir, PERM_700);
        return stagingDir;
    }
}
