package org.apache.atlas.hook;

import java.io.IOException;
import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode$Enforced$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.p001sparkproject.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:org/apache/atlas/hook/AtlasTopicCreator.class */
/**
 * Creates the Kafka topics used for Atlas notifications, when topic creation is enabled
 * through {@link #ATLAS_NOTIFICATION_CREATE_TOPICS_KEY}.
 *
 * <p>Topic creation is best effort: a failure for one topic is logged and the remaining
 * topics are still attempted. The protected methods are overridable seams so that tests can
 * replace the Kerberos login and the ZooKeeper/Kafka interactions.
 */
public class AtlasTopicCreator {
    private static final Logger LOG = LoggerFactory.getLogger(AtlasTopicCreator.class);

    /** Boolean configuration key that switches topic creation on or off; treated as {@code true} when absent. */
    public static final String ATLAS_NOTIFICATION_CREATE_TOPICS_KEY = "atlas.notification.create.topics";

    /**
     * Creates each of the given topics unless it already exists.
     *
     * <p>Nothing is created when {@link #ATLAS_NOTIFICATION_CREATE_TOPICS_KEY} is {@code false},
     * when {@link #handleSecurity(Configuration)} reports a failed login, or when no topic names
     * are supplied.
     *
     * @param configuration Atlas configuration to read the creation flag and connection settings from
     * @param strArr        names of the topics to create; may be empty or {@code null}
     */
    public void createAtlasTopic(Configuration configuration, String... strArr) {
        if (!configuration.getBoolean(ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)) {
            LOG.info("Not creating topics {} as {} is false", StringUtils.join(strArr, ","), ATLAS_NOTIFICATION_CREATE_TOPICS_KEY);
            return;
        }
        if (!handleSecurity(configuration)) {
            return;
        }
        // Guard before connecting: iterating a null array would throw after the ZooKeeper
        // connection was opened, and an empty list gives the connection nothing to do.
        if (strArr == null || strArr.length == 0) {
            LOG.info("No topic names supplied, nothing to create");
            return;
        }
        ZkUtils zkUtils = createZkUtils(configuration);
        try {
            for (String topicName : strArr) {
                try {
                    LOG.warn("Attempting to create topic {}", topicName);
                    if (ifTopicExists(topicName, zkUtils)) {
                        LOG.warn("Ignoring call to create topic {}, as it already exists.", topicName);
                    } else {
                        createTopic(configuration, topicName, zkUtils);
                    }
                } catch (Throwable th) {
                    // Best effort: keep going with the remaining topics.
                    LOG.error("Failed while creating topic {}", topicName, th);
                }
            }
        } finally {
            // Always release the ZooKeeper connection, whatever happened in the loop.
            zkUtils.close();
        }
    }

    /**
     * Performs a keytab login when Kerberos authentication is enabled in the configuration.
     *
     * <p>The principal is read from {@code atlas.notification.kafka.service.principal} and the
     * keytab path from {@code atlas.notification.kafka.keytab.location}.
     *
     * @param configuration Atlas configuration
     * @return {@code true} when Kerberos is not enabled or the login succeeded; {@code false}
     *         when the login failed with an {@link IOException} (which is logged)
     */
    @VisibleForTesting
    protected boolean handleSecurity(Configuration configuration) {
        if (!AuthenticationUtil.isKerberosAuthenticationEnabled(configuration)) {
            return true;
        }
        String principal = configuration.getString("atlas.notification.kafka.service.principal");
        String keytabLocation = configuration.getString("atlas.notification.kafka.keytab.location");
        org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
        SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, hadoopConf);
        try {
            String serverPrincipal = SecurityUtil.getServerPrincipal(principal, (String) null);
            UserGroupInformation.setConfiguration(hadoopConf);
            UserGroupInformation.loginUserFromKeytab(serverPrincipal, keytabLocation);
            return true;
        } catch (IOException e) {
            LOG.warn("Could not login as {} from keytab file {}", principal, keytabLocation, e);
            return false;
        }
    }

    /**
     * Checks whether a topic already exists.
     *
     * @param str     topic name
     * @param zkUtils ZooKeeper handle used for the lookup
     * @return the result of {@code AdminUtils.topicExists} for the given topic
     */
    @VisibleForTesting
    protected boolean ifTopicExists(String str, ZkUtils zkUtils) {
        return AdminUtils.topicExists(zkUtils, str);
    }

    /**
     * Creates a single topic with rack-aware mode {@code Enforced} and no extra topic properties.
     *
     * <p>The partition count is read from {@code atlas.notification.hook.numthreads} and the
     * replication factor from {@code atlas.notification.replicas}; both default to 1.
     *
     * @param configuration Atlas configuration
     * @param str           topic name
     * @param zkUtils       ZooKeeper handle passed to {@code AdminUtils.createTopic}
     */
    @VisibleForTesting
    protected void createTopic(Configuration configuration, String str, ZkUtils zkUtils) {
        int partitions = configuration.getInt("atlas.notification.hook.numthreads", 1);
        int replicas = configuration.getInt("atlas.notification.replicas", 1);
        AdminUtils.createTopic(zkUtils, str, partitions, replicas, new Properties(), RackAwareMode$Enforced$.MODULE$);
        LOG.warn("Created topic {} with partitions {} and replicas {}", str, partitions, replicas);
    }

    /**
     * Opens a ZooKeeper client and connection and wraps them in a {@link ZkUtils}.
     *
     * <p>The connect string is read from {@code atlas.kafka.zookeeper.connect}. The session and
     * connection timeouts are read from {@code atlas.kafka.zookeeper.session.timeout.ms}
     * (default 400) and {@code atlas.kafka.zookeeper.connection.timeout.ms} (default 200).
     * The caller is responsible for closing the returned instance.
     *
     * @param configuration Atlas configuration
     * @return a new {@link ZkUtils} constructed with its third argument set to {@code false}
     */
    @VisibleForTesting
    protected ZkUtils createZkUtils(Configuration configuration) {
        String zkConnect = configuration.getString("atlas.kafka.zookeeper.connect");
        int sessionTimeout = configuration.getInt("atlas.kafka.zookeeper.session.timeout.ms", 400);
        int connectionTimeout = configuration.getInt("atlas.kafka.zookeeper.connection.timeout.ms", 200);
        Tuple2<ZkClient, ZkConnection> clientAndConnection =
                ZkUtils.createZkClientAndConnection(zkConnect, sessionTimeout, connectionTimeout);
        // scala.Tuple2 exposes its elements as _1() and _2().
        return new ZkUtils(clientAndConnection._1(), clientAndConnection._2(), false);
    }

    /**
     * Command-line entry point: creates the topics named by the arguments using the
     * configuration returned by {@code ApplicationProperties.get()}.
     *
     * @param strArr topic names to create
     * @throws AtlasException if the application properties cannot be obtained
     */
    public static void main(String[] strArr) throws AtlasException {
        new AtlasTopicCreator().createAtlasTopic(ApplicationProperties.get(), strArr);
    }
}
