package org.apache.hadoop.hdfs.server.federation.router;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.FederationTestUtils;
import org.apache.hadoop.hdfs.server.federation.MockNamenode;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests how the Routers behave when one of the subclusters behind a mount
 * point is unavailable, covering both write and read operations.
 */
public class TestRouterFaultTolerant {
    /** Logger shared by all the tests in this class. */
    private static final Logger LOG = LoggerFactory.getLogger(TestRouterFaultTolerant.class);
    /**
     * Number of directories/files created concurrently in each check. The
     * setup also uses it for the Namenode handler count and the size of the
     * thread pool.
     */
    private static final int NUM_FILES = 10;
    /** Number of Routers started by the setup. */
    private static final int NUM_ROUTERS = 2;
    /** Mock Namenodes indexed by nameservice identifier. */
    private final Map<String, MockNamenode> namenodes = new HashMap<>();
    /** Routers started for the current test. */
    private final List<Router> routers = new ArrayList<>();
    /** Executor that runs the concurrent tasks; created in setup, shut down in cleanup. */
    private ExecutorService service;

    /**
     * Thread-safe tally of how many tasks succeeded and how many failed.
     * Both counters are {@link AtomicInteger}s so they can be bumped from
     * several threads.
     */
    public static class TaskResults {
        /** Number of tasks that reported success. */
        private final AtomicInteger success = new AtomicInteger(0);
        /** Number of tasks that reported failure. */
        private final AtomicInteger failure = new AtomicInteger(0);

        TaskResults() {
        }

        /** Record one more successful task. */
        public void incrSuccess() {
            this.success.incrementAndGet();
        }

        /** Record one more failed task. */
        public void incrFailure() {
            this.failure.incrementAndGet();
        }

        /**
         * @return Number of successes recorded so far.
         */
        public int getSuccess() {
            return this.success.get();
        }

        /**
         * @return Number of failures recorded so far.
         */
        public int getFailure() {
            return this.failure.get();
        }

        /**
         * @return Sum of the successes and failures recorded so far. The two
         *         counters are read one after the other, not as a single
         *         atomic snapshot.
         */
        public int getTotal() {
            return getSuccess() + getFailure();
        }

        @Override
        public String toString() {
            return "Success=" + getSuccess() + " Failure=" + getFailure();
        }
    }

    /**
     * Start the mock Namenodes (ns0 and ns1) and the Routers, register the
     * subclusters in the Routers and create the executor used to run the
     * concurrent tasks.
     *
     * @throws Exception If any of the services cannot be started.
     */
    @Before
    public void setup() throws Exception {
        LOG.info("Start the Namenodes");
        final HdfsConfiguration nnConf = new HdfsConfiguration();
        nnConf.setInt("dfs.namenode.handler.count", NUM_FILES);
        final String[] nsIds = {"ns0", "ns1"};
        for (final String nsId : nsIds) {
            final MockNamenode nn = new MockNamenode(nsId, nnConf);
            nn.transitionToActive();
            nn.addFileSystemMock();
            this.namenodes.put(nsId, nn);
        }

        LOG.info("Start the Routers");
        final Configuration routerConf = new RouterConfigBuilder().stateStore().admin().rpc().build();
        // Bind every Router endpoint to an ephemeral port
        routerConf.set("dfs.federation.router.rpc-address", "0.0.0.0:0");
        routerConf.set("dfs.federation.router.http-address", "0.0.0.0:0");
        routerConf.set("dfs.federation.router.admin-address", "0.0.0.0:0");
        routerConf.setTimeDuration("dfs.federation.router.connect.timeout", 500L, TimeUnit.MILLISECONDS);

        final Configuration storeConf = FederationStateStoreTestUtils.getStateStoreConfiguration();
        storeConf.setClass("dfs.federation.router.namenode.resolver.client.class", MembershipNamenodeResolver.class, ActiveNamenodeResolver.class);
        storeConf.setClass("dfs.federation.router.file.resolver.client.class", MultipleDestinationMountTableResolver.class, FileSubclusterResolver.class);
        routerConf.addResource(storeConf);

        for (int idx = 0; idx < NUM_ROUTERS; idx++) {
            // The first Router (index 0) does not allow partial listing; the others do
            final boolean allowPartialListing = idx != 0;
            routerConf.setBoolean("dfs.federation.router.client.allow-partial-listing", allowPartialListing);
            final Router router = new Router();
            router.init(routerConf);
            router.start();
            this.routers.add(router);
        }

        LOG.info("Registering the subclusters in the Routers");
        MockNamenode.registerSubclusters(this.routers, this.namenodes.values(), Collections.singleton("ns1"));

        this.service = Executors.newFixedThreadPool(NUM_FILES);
    }

    /**
     * Stop the mock Namenodes and the Routers, then shut down the executor.
     *
     * @throws Exception If a Namenode cannot be stopped.
     */
    @After
    public void cleanup() throws Exception {
        LOG.info("Stopping the cluster");
        for (final MockNamenode nn : this.namenodes.values()) {
            nn.stop();
        }
        this.namenodes.clear();

        for (final Router router : this.routers) {
            router.stop();
        }
        this.routers.clear();

        // Nothing else to release if the executor was never created
        if (this.service == null) {
            return;
        }
        this.service.shutdown();
        this.service = null;
    }

    /**
     * Mark the first mount table entry returned for a mount point as fault
     * tolerant, using the admin client of a random Router, and refresh the
     * caches of all the Routers afterwards.
     *
     * @param str Mount point to update.
     * @throws IOException If the mount table cannot be read or updated.
     */
    private void updateMountPointFaultTolerant(String str) throws IOException {
        final Router router = getRandomRouter();
        final MountTableManager manager = FederationTestUtils.getAdminClient(router).getMountTableManager();

        final GetMountTableEntriesRequest lookup = GetMountTableEntriesRequest.newInstance(str);
        final MountTable entry = (MountTable) manager.getMountTableEntries(lookup).getEntries().get(0);
        entry.setFaultTolerant(true);

        final UpdateMountTableEntryRequest update = UpdateMountTableEntryRequest.newInstance(entry);
        final boolean updated = manager.updateMountTableEntry(update).getStatus();
        Assert.assertTrue(updated);

        FederationTestUtils.refreshRoutersCaches(this.routers);
    }

    /**
     * Stop ns1 and run the write checks concurrently for several destination
     * orders; every one of them must complete successfully.
     *
     * @throws Exception If the checks cannot be executed.
     */
    @Test
    public void testWriteWithFailedSubcluster() throws Exception {
        LOG.info("Stop ns1 to simulate an unavailable subcluster");
        this.namenodes.get("ns1").stop();

        final List<DestinationOrder> orders = Arrays.asList(DestinationOrder.HASH_ALL, DestinationOrder.SPACE, DestinationOrder.RANDOM, DestinationOrder.HASH);
        final List<Callable<Boolean>> tasks = new ArrayList<>();
        for (final DestinationOrder order : orders) {
            tasks.add(() -> {
                testWriteWithFailedSubcluster(order);
                return true;
            });
        }

        // collectResults() empties the task list, so the expected count has
        // to come from the list of orders and not from the tasks
        final TaskResults results = collectResults("Full tests", tasks);
        Assert.assertEquals(orders.size(), results.getSuccess());
    }

    /**
     * Create a mount point with the given order over all the subclusters and
     * check writes through it, first as is and then after trying to make the
     * mount point fault tolerant.
     *
     * For orders in {@code DestinationOrder.FOLDER_ALL} the update must
     * succeed and the checks are repeated in fault tolerant mode. For any
     * other order the update must be rejected with an "Invalid entry" error.
     *
     * @param destinationOrder Order of the mount point under test.
     * @throws Exception If the checks cannot be executed.
     */
    private void testWriteWithFailedSubcluster(DestinationOrder destinationOrder) throws Exception {
        final FileSystem router0Fs = FederationTestUtils.getFileSystem(this.routers.get(0));
        final FileSystem router1Fs = FederationTestUtils.getFileSystem(this.routers.get(1));
        final FileSystem ns0Fs = FederationTestUtils.getFileSystem(this.namenodes.get("ns0").getRPCPort());

        final String mountPoint = "/" + destinationOrder + "-failsubcluster";
        final Path mountPath = new Path(mountPoint);
        LOG.info("Setup {} with order {}", mountPoint, destinationOrder);
        FederationTestUtils.createMountTableEntry(getRandomRouter(), mountPoint, destinationOrder, this.namenodes.keySet());

        LOG.info("Write in {} should succeed writing in ns0 and fail for ns1", mountPath);
        checkDirectoriesFaultTolerant(mountPath, destinationOrder, router0Fs, router1Fs, ns0Fs, false);
        checkFilesFaultTolerant(mountPath, destinationOrder, router0Fs, router1Fs, ns0Fs, false);

        LOG.info("Make {} fault tolerant and everything succeeds", mountPath);
        IOException updateFailure = null;
        try {
            updateMountPointFaultTolerant(mountPoint);
        } catch (IOException e) {
            updateFailure = e;
        }

        if (DestinationOrder.FOLDER_ALL.contains(destinationOrder)) {
            Assert.assertNull(updateFailure);
            checkDirectoriesFaultTolerant(mountPath, destinationOrder, router0Fs, router1Fs, ns0Fs, true);
            checkFilesFaultTolerant(mountPath, destinationOrder, router0Fs, router1Fs, ns0Fs, true);
        } else {
            // Fail with a readable message instead of a NullPointerException
            // if the update unexpectedly goes through
            Assert.assertNotNull("Making " + mountPoint + " fault tolerant should be rejected for order " + destinationOrder, updateFailure);
            Assert.assertTrue(updateFailure.getMessage().startsWith("Invalid entry, fault tolerance only supported for ALL order"));
        }
    }

    /**
     * Create {@code NUM_FILES} directories concurrently through random
     * Routers and verify the creation results, the listing and the content
     * summary of the parent path.
     *
     * @param path Mount point path where the directories are created.
     * @param destinationOrder Order of the mount point.
     * @param fileSystem File system whose listing and content summary are
     *                   expected to fail (the caller passes Router 0).
     * @param fileSystem2 File system whose listing and content summary are
     *                    expected to succeed (the caller passes Router 1).
     * @param fileSystem3 File system of ns0; not used by this check.
     * @param z If the mount point is fault tolerant.
     * @throws Exception If the checks cannot be executed.
     */
    private void checkDirectoriesFaultTolerant(Path path, DestinationOrder destinationOrder, FileSystem fileSystem, FileSystem fileSystem2, FileSystem fileSystem3, boolean z) throws Exception {
        // Entries already present before creating anything
        final FileStatus[] before = listStatus(fileSystem2, path);

        LOG.info("Create directories in {}", path);
        final List<Callable<Boolean>> tasks = new ArrayList<>();
        for (int i = 0; i < NUM_FILES; i++) {
            final FileSystem randomFs = getRandomRouterFileSystem();
            final Path dir = new Path(path, String.format("dir-%s-%03d", z, i));
            tasks.add(getDirCreateTask(randomFs, dir));
        }
        final TaskResults results = collectResults("Create dir " + path, tasks);

        LOG.info("Check directories results for {}: {}", path, results);
        if (z || DestinationOrder.FOLDER_ALL.contains(destinationOrder)) {
            Assert.assertEquals(NUM_FILES, results.getSuccess());
            Assert.assertEquals(0L, results.getFailure());
        } else {
            assertBothResults("check dir " + path, NUM_FILES, results);
        }

        // collectResults() empties the task list, so it can be reused below
        LOG.info("Check directories listing for {}", path);
        tasks.add(getListFailTask(fileSystem, path));
        final int expected = before.length + results.getSuccess();
        tasks.add(getListSuccessTask(fileSystem2, path, expected));
        Assert.assertEquals("Failed listing", 2L, collectResults("List " + path, tasks).getSuccess());

        tasks.add(getContentSummaryFailTask(fileSystem, path));
        tasks.add(getContentSummarySuccessTask(fileSystem2, path, expected));
        Assert.assertEquals("Failed content summary", 2L, collectResults("Content summary " + path, tasks).getSuccess());
    }

    /**
     * Create {@code NUM_FILES} files concurrently, through random Routers,
     * inside the first directory listed under the path, and verify the
     * creation results, the listing and the content summary of that
     * directory.
     *
     * @param path Mount point path; it must already contain a directory.
     * @param destinationOrder Order of the mount point; not used by this check.
     * @param fileSystem File system whose listing and content summary are
     *                   expected to fail (the caller passes Router 0).
     * @param fileSystem2 File system whose listing and content summary are
     *                    expected to succeed (the caller passes Router 1).
     * @param fileSystem3 File system used to verify each created file (the
     *                    caller passes ns0).
     * @param z If the mount point is fault tolerant; when true every creation
     *          must succeed, otherwise every creation must fail.
     * @throws Exception If the checks cannot be executed.
     */
    private void checkFilesFaultTolerant(Path path, DestinationOrder destinationOrder, FileSystem fileSystem, FileSystem fileSystem2, FileSystem fileSystem3, boolean z) throws Exception {
        final FileStatus[] existing = listStatus(fileSystem2, path);
        // Report a clear failure instead of an ArrayIndexOutOfBoundsException
        Assert.assertTrue("No directory available to create files in " + path, existing.length > 0);
        final Path dir0 = Path.getPathWithoutSchemeAndAuthority(existing[0].getPath());

        LOG.info("Create files in {}", dir0);
        final List<Callable<Boolean>> tasks = new ArrayList<>();
        for (int i = 0; i < NUM_FILES; i++) {
            final FileSystem randomFs = getRandomRouterFileSystem();
            final String file = String.format("%s/file-%03d.txt", dir0, i);
            tasks.add(getFileCreateTask(randomFs, file, fileSystem3));
        }
        final TaskResults results = collectResults("Create file " + dir0, tasks);

        LOG.info("Check files results for {}: {}", dir0, results);
        if (z) {
            Assert.assertEquals("Not enough success in " + path, NUM_FILES, results.getSuccess());
            Assert.assertEquals("Nothing should fail in " + path, 0L, results.getFailure());
        } else {
            Assert.assertEquals("Nothing should succeed in " + path, 0L, results.getSuccess());
            Assert.assertEquals("Everything should fail in " + path, NUM_FILES, results.getFailure());
        }

        // collectResults() empties the task list, so it can be reused below
        LOG.info("Check files listing for {}", dir0);
        tasks.add(getListFailTask(fileSystem, dir0));
        tasks.add(getListSuccessTask(fileSystem2, dir0, results.getSuccess()));
        Assert.assertEquals(2L, collectResults("List " + dir0, tasks).getSuccess());

        tasks.add(getContentSummaryFailTask(fileSystem, dir0));
        tasks.add(getContentSummarySuccessTask(fileSystem2, dir0, results.getSuccess()));
        Assert.assertEquals(2L, collectResults("Content summary " + dir0, tasks).getSuccess());
    }

    /**
     * Render the paths of a listing, without scheme and authority, as a
     * bracketed and comma separated string.
     *
     * @param fileStatusArr Listing to render.
     * @return String of the form {@code [path1, path2]}.
     */
    private static String toString(FileStatus[] fileStatusArr) {
        final StringBuilder out = new StringBuilder("[");
        for (int idx = 0; idx < fileStatusArr.length; idx++) {
            // Anything beyond the opening bracket means an entry was already written
            if (out.length() > 1) {
                out.append(", ");
            }
            out.append(Path.getPathWithoutSchemeAndAuthority(fileStatusArr[idx].getPath()));
        }
        return out.append("]").toString();
    }

    /**
     * List a path, treating a missing path as an empty listing.
     *
     * @param fileSystem File system to query.
     * @param path Path to list.
     * @return Entries under the path, or an empty array if it does not exist.
     * @throws IOException If the listing fails for any other reason.
     */
    private FileStatus[] listStatus(FileSystem fileSystem, Path path) throws IOException {
        try {
            return fileSystem.listStatus(path);
        } catch (FileNotFoundException notFound) {
            LOG.debug("File not found: {}", notFound.getMessage());
            return new FileStatus[0];
        }
    }

    /**
     * Build a task that creates a file and verifies it through a second
     * file system.
     *
     * @param fileSystem File system used to create the file.
     * @param str Path of the file to create.
     * @param fileSystem2 File system used to check the created file.
     * @return Task returning true if the file was created, or false if a
     *         {@link RemoteException} was thrown.
     */
    private static Callable<Boolean> getFileCreateTask(FileSystem fileSystem, String str, FileSystem fileSystem2) {
        return () -> {
            boolean created;
            try {
                final Path target = new Path(str);
                // The stream is closed right away without writing any data
                final FSDataOutputStream out = fileSystem.create(target);
                out.close();
                final FileStatus status = fileSystem2.getFileStatus(target);
                final boolean hasLength = status.getLen() > 0;
                Assert.assertTrue("File not created properly: " + status, hasLength);
                created = true;
            } catch (RemoteException ignored) {
                created = false;
            }
            return created;
        };
    }

    /**
     * Build a task that creates a directory.
     *
     * @param fileSystem File system used to create the directory.
     * @param path Directory to create.
     * @return Task returning true if {@code mkdirs} completed, or false if a
     *         {@link RemoteException} was thrown. The value returned by
     *         {@code mkdirs} itself is not checked.
     */
    private static Callable<Boolean> getDirCreateTask(FileSystem fileSystem, Path path) {
        return () -> {
            boolean created;
            try {
                fileSystem.mkdirs(path);
                created = true;
            } catch (RemoteException ignored) {
                created = false;
            }
            return created;
        };
    }

    /**
     * Build a task that expects the listing of a path to fail.
     *
     * @param fileSystem File system to query.
     * @param path Path to list.
     * @return Task returning true if the listing threw a
     *         {@link RemoteException}, or false if it completed.
     */
    private static Callable<Boolean> getListFailTask(FileSystem fileSystem, Path path) {
        return () -> {
            boolean failedAsExpected;
            try {
                fileSystem.listStatus(path);
                failedAsExpected = false;
            } catch (RemoteException expected) {
                failedAsExpected = true;
            }
            return failedAsExpected;
        };
    }

    /**
     * Build a task that lists a path and asserts the number of entries.
     *
     * @param fileSystem File system to query.
     * @param path Path to list.
     * @param i Expected number of entries.
     * @return Task returning true when the listing has the expected size; the
     *         assertion fails otherwise, with the listed paths as message.
     */
    private static Callable<Boolean> getListSuccessTask(FileSystem fileSystem, Path path, int i) {
        return () -> {
            // List once and reuse the result for both the message and the size
            final FileStatus[] listing = fileSystem.listStatus(path);
            Assert.assertEquals(toString(listing), i, listing.length);
            return true;
        };
    }

    /**
     * Build a task that expects the content summary of a path to fail.
     *
     * @param fileSystem File system to query.
     * @param path Path to summarize.
     * @return Task returning true if the call threw a
     *         {@link RemoteException}, or false if it completed.
     */
    private static Callable<Boolean> getContentSummaryFailTask(FileSystem fileSystem, Path path) {
        return () -> {
            boolean failedAsExpected;
            try {
                fileSystem.getContentSummary(path);
                failedAsExpected = false;
            } catch (RemoteException expected) {
                failedAsExpected = true;
            }
            return failedAsExpected;
        };
    }

    /**
     * Build a task that asserts the file and directory count reported by the
     * content summary of a path.
     *
     * @param fileSystem File system to query.
     * @param path Path to summarize.
     * @param i Expected file and directory count.
     * @return Task returning true when the count matches; the assertion
     *         fails otherwise.
     */
    private static Callable<Boolean> getContentSummarySuccessTask(FileSystem fileSystem, Path path, int i) {
        return () -> {
            final long expectedCount = i;
            final long actualCount = fileSystem.getContentSummary(path).getFileAndDirectoryCount();
            Assert.assertEquals("Wrong summary for " + path, expectedCount, actualCount);
            return true;
        };
    }

    /**
     * Run all the tasks in the executor, wait for them and count how many
     * returned true (success) and how many returned false (failure). A task
     * that ends with an exception fails the test, with the stack trace in the
     * failure message.
     *
     * @param str Description of the tasks, used for logging and messages.
     * @param collection Tasks to run; the collection is emptied before
     *                   returning so the caller can reuse it.
     * @return Tally of the successes and failures.
     * @throws Exception If the tasks cannot be executed.
     */
    private TaskResults collectResults(String str, Collection<Callable<Boolean>> collection) throws Exception {
        final TaskResults tally = new TaskResults();
        this.service.invokeAll(collection).forEach(pending -> {
            try {
                final boolean succeeded = pending.get();
                if (!succeeded) {
                    LOG.info("Got failure for {}", str);
                    tally.incrFailure();
                } else {
                    LOG.info("Got success for {}", str);
                    tally.incrSuccess();
                }
            } catch (Exception ex) {
                // Report the failure of the task itself, not the wrapper
                final Throwable reported = (ex instanceof ExecutionException) ? ex.getCause() : ex;
                final StringWriter trace = new StringWriter();
                reported.printStackTrace(new PrintWriter(trace));
                Assert.fail("Failed to run \"" + str + "\": " + trace);
            }
        });
        collection.clear();
        return tally;
    }

    /**
     * Assert that the tasks added up to the expected total and that there
     * was at least one success and at least one failure.
     *
     * @param str Description used in the assertion messages.
     * @param i Expected total number of tasks.
     * @param taskResults Tally to check.
     */
    private static void assertBothResults(String str, int i, TaskResults taskResults) {
        final int total = taskResults.getTotal();
        Assert.assertEquals(str, i, total);

        final boolean someSucceeded = taskResults.getSuccess() > 0;
        Assert.assertTrue("Expected some success for " + str, someSucceeded);

        final boolean someFailed = taskResults.getFailure() > 0;
        Assert.assertTrue("Expected some failure for " + str, someFailed);
    }

    /**
     * Pick one of the started Routers at random.
     *
     * @return Randomly selected Router.
     */
    private Router getRandomRouter() {
        final Random rnd = new Random();
        final int pick = rnd.nextInt(this.routers.size());
        return this.routers.get(pick);
    }

    /**
     * Get a file system pointing to a random Router, created on behalf of a
     * new test user with a unique name.
     *
     * @return File system for a random Router.
     * @throws Exception If the file system cannot be created.
     */
    private FileSystem getRandomRouterFileSystem() throws Exception {
        final UserGroupInformation user = UserGroupInformation.createUserForTesting("user-" + UUID.randomUUID(), new String[]{"group"});
        final Router router = getRandomRouter();
        // doAs() is overloaded for PrivilegedAction and
        // PrivilegedExceptionAction; the cast picks the overload explicitly
        // because a bare lambda matches both
        return user.doAs((PrivilegedExceptionAction<FileSystem>) () -> FederationTestUtils.getFileSystem(router));
    }

    /**
     * Create a file through a Router in a HASH_ALL mount point, find the
     * subcluster that holds it, stop that subcluster and check that opening
     * the file fails with an unavailable exception.
     *
     * @throws Exception If the test cannot be executed.
     */
    @Test
    public void testReadWithFailedSubcluster() throws Exception {
        final DestinationOrder order = DestinationOrder.HASH_ALL;
        final String mountPoint = "/" + order + "-testread";
        final Path mountPath = new Path(mountPoint);
        LOG.info("Setup {} with order {}", mountPoint, order);
        FederationTestUtils.createMountTableEntry(this.routers, mountPoint, order, this.namenodes.keySet());

        final FileSystem fs = getRandomRouterFileSystem();
        final Path fileExisting = new Path(mountPath, "fileexisting");
        final Path fileNotExisting = new Path(mountPath, "filenotexisting");
        try (FSDataOutputStream os = fs.create(fileExisting)) {
            Assert.assertNotNull(os);
        }

        // Close the stream once checked so it is not leaked
        try (FSDataInputStream is = fs.open(fileExisting)) {
            Assert.assertNotNull("We should be able to read the file", is);
        }
        LambdaTestUtils.intercept(FileNotFoundException.class, () -> {
            return fs.open(fileNotExisting);
        });

        // Find the single subcluster that holds the file
        String nsWithFile = null;
        for (Map.Entry<String, MockNamenode> entry : this.namenodes.entrySet()) {
            final String nsId = entry.getKey();
            try {
                final FileSystem nnFs = FederationTestUtils.getFileSystem(entry.getValue().getRPCPort());
                Assert.assertNotNull(nnFs.getFileStatus(fileExisting));
                Assert.assertNull("The file cannot be in two subclusters", nsWithFile);
                nsWithFile = nsId;
            } catch (FileNotFoundException e) {
                LOG.debug("File not found in {}", nsId);
            }
        }
        Assert.assertNotNull("The file has to be in one subcluster", nsWithFile);

        LOG.info("Stop {} to simulate an unavailable subcluster", nsWithFile);
        this.namenodes.get(nsWithFile).stop();

        // If the open unexpectedly succeeds the stream still gets closed
        try (FSDataInputStream is = fs.open(fileExisting)) {
            Assert.fail("It should throw an unavailable cluster exception");
        } catch (RemoteException re) {
            final IOException ioe = re.unwrapRemoteException();
            Assert.assertTrue("Expected an unavailable exception for:" + ioe.getClass(), RouterRpcClient.isUnavailableException(ioe));
        }
    }
}
