package org.apache.flume.sink.hive;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.auth.FlumeAuthenticationUtil;
import org.apache.flume.auth.PrivilegedExecutor;
import org.apache.flume.conf.BatchSizeSupported;
import org.apache.flume.conf.Configurable;
import org.apache.flume.formatter.output.BucketPath;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.flume.sink.hive.HiveWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/sink/hive/HiveSink.class */
public class HiveSink extends AbstractSink implements Configurable, BatchSizeSupported {
    private static final Logger LOG = LoggerFactory.getLogger(HiveSink.class);
    private static final int DEFAULT_MAXOPENCONNECTIONS = 500;
    private static final int DEFAULT_TXNSPERBATCH = 100;
    private static final int DEFAULT_BATCHSIZE = 15000;
    private static final int DEFAULT_CALLTIMEOUT = 10000;
    private static final int DEFAULT_IDLETIMEOUT = 0;
    private static final int DEFAULT_HEARTBEATINTERVAL = 240;
    private Map<HiveEndPoint, HiveWriter> allWriters;
    private SinkCounter sinkCounter;
    private volatile int idleTimeout;
    private String metaStoreUri;
    private String proxyUser;
    private String database;
    private String table;
    private List<String> partitionVals;
    private Integer txnsPerBatchAsk;
    private Integer batchSize;
    private Integer maxOpenConnections;
    private boolean autoCreatePartitions;
    private String serializerType;
    private HiveEventSerializer serializer;
    private String partitionTransformerClass;
    private PartitionTransformer transformer;
    private PrivilegedExecutor privExecutor;
    private Integer callTimeout;
    private Integer heartBeatInterval;
    private ExecutorService callTimeoutPool;
    private boolean useLocalTime;
    private TimeZone timeZone;
    private boolean needRounding;
    private int roundUnit;
    private Integer roundValue;
    private Timer heartBeatTimer = new Timer();
    private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false);
    private Boolean generatePartitionFromEventBody;
    private HiveConf hiveConf;

    @VisibleForTesting
    Map<HiveEndPoint, HiveWriter> getAllWriters() {
        return this.allWriters;
    }

    public HiveConf getHiveConf() {
        return this.hiveConf;
    }

    public void configure(Context context) {
        this.metaStoreUri = context.getString(Config.HIVE_METASTORE);
        if (this.metaStoreUri == null) {
            throw new IllegalArgumentException("hive.metastore config setting is not specified for sink " + getName());
        }
        if (this.metaStoreUri.equalsIgnoreCase("null")) {
            this.metaStoreUri = null;
        }
        loadConfiguration(context);
        String string = context.getString("hive.kerberosPrincipal");
        String string2 = context.getString("hive.kerberosKeytab");
        this.proxyUser = null;
        this.privExecutor = FlumeAuthenticationUtil.getAuthenticator(string, string2).proxyAs(this.proxyUser);
        this.database = context.getString(Config.HIVE_DATABASE);
        if (this.database == null) {
            throw new IllegalArgumentException("hive.database config setting is not specified for sink " + getName());
        }
        this.table = context.getString(Config.HIVE_TABLE);
        if (this.table == null) {
            throw new IllegalArgumentException("hive.table config setting is not specified for sink " + getName());
        }
        String string3 = context.getString(Config.HIVE_PARTITION);
        if (string3 != null) {
            this.partitionVals = Arrays.asList(string3.split(HiveDelimitedTextSerializer.defaultDelimiter));
        }
        this.txnsPerBatchAsk = context.getInteger(Config.HIVE_TXNS_PER_BATCH_ASK, Integer.valueOf(DEFAULT_TXNSPERBATCH));
        if (this.txnsPerBatchAsk.intValue() < 0) {
            LOG.warn(getName() + ". hive.txnsPerBatchAsk must be  positive number. Defaulting to " + DEFAULT_TXNSPERBATCH);
            this.txnsPerBatchAsk = Integer.valueOf(DEFAULT_TXNSPERBATCH);
        }
        this.batchSize = context.getInteger(Config.BATCH_SIZE, Integer.valueOf(DEFAULT_BATCHSIZE));
        if (this.batchSize.intValue() < 0) {
            LOG.warn(getName() + ". batchSize must be  positive number. Defaulting to " + DEFAULT_BATCHSIZE);
            this.batchSize = Integer.valueOf(DEFAULT_BATCHSIZE);
        }
        this.idleTimeout = context.getInteger(Config.IDLE_TIMEOUT, Integer.valueOf(DEFAULT_IDLETIMEOUT)).intValue();
        if (this.idleTimeout < 0) {
            LOG.warn(getName() + ". idleTimeout must be  positive number. Defaulting to " + DEFAULT_IDLETIMEOUT);
            this.idleTimeout = DEFAULT_IDLETIMEOUT;
        }
        this.callTimeout = context.getInteger(Config.CALL_TIMEOUT, Integer.valueOf(DEFAULT_CALLTIMEOUT));
        if (this.callTimeout.intValue() < 0) {
            LOG.warn(getName() + ". callTimeout must be  positive number. Defaulting to " + DEFAULT_CALLTIMEOUT);
            this.callTimeout = Integer.valueOf(DEFAULT_CALLTIMEOUT);
        }
        this.heartBeatInterval = context.getInteger(Config.HEART_BEAT_INTERVAL, Integer.valueOf(DEFAULT_HEARTBEATINTERVAL));
        if (this.heartBeatInterval.intValue() < 0) {
            LOG.warn(getName() + ". heartBeatInterval must be  positive number. Defaulting to " + DEFAULT_HEARTBEATINTERVAL);
            this.heartBeatInterval = Integer.valueOf(DEFAULT_HEARTBEATINTERVAL);
        }
        this.maxOpenConnections = context.getInteger(Config.MAX_OPEN_CONNECTIONS, Integer.valueOf(DEFAULT_MAXOPENCONNECTIONS));
        this.autoCreatePartitions = context.getBoolean("autoCreatePartitions", true).booleanValue();
        this.useLocalTime = context.getBoolean(Config.USE_LOCAL_TIME_STAMP, false).booleanValue();
        String string4 = context.getString(Config.TIME_ZONE);
        this.timeZone = string4 == null ? null : TimeZone.getTimeZone(string4);
        this.needRounding = context.getBoolean(Config.ROUND, false).booleanValue();
        String string5 = context.getString(Config.ROUND_UNIT, Config.MINUTE);
        if (string5.equalsIgnoreCase(Config.HOUR)) {
            this.roundUnit = 11;
        } else if (string5.equalsIgnoreCase(Config.MINUTE)) {
            this.roundUnit = 12;
        } else if (string5.equalsIgnoreCase(Config.SECOND)) {
            this.roundUnit = 13;
        } else {
            LOG.warn(getName() + ". Rounding unit is not valid, please set one of minute, hour or second. Rounding will be disabled");
            this.needRounding = false;
        }
        this.roundValue = context.getInteger(Config.ROUND_VALUE, 1);
        if (this.roundUnit == 13 || this.roundUnit == 12) {
            Preconditions.checkArgument(this.roundValue.intValue() > 0 && this.roundValue.intValue() <= 60, "Round value must be > 0 and <= 60");
        } else if (this.roundUnit == 11) {
            Preconditions.checkArgument(this.roundValue.intValue() > 0 && this.roundValue.intValue() <= 24, "Round value must be > 0 and <= 24");
        }
        this.generatePartitionFromEventBody = context.getBoolean(Config.GENERATE_PARTITION_FROM_EVENT_BODY, false);
        this.serializerType = context.getString(Config.SERIALIZER, "");
        if (this.serializerType.isEmpty()) {
            throw new IllegalArgumentException("serializer config setting is not specified for sink " + getName());
        }
        this.serializer = createSerializer(this.serializerType);
        this.serializer.configure(context);
        partitionTransformerCreator(context);
        Preconditions.checkArgument(this.batchSize.intValue() > 0, "batchSize must be greater than 0");
        if (this.sinkCounter == null) {
            this.sinkCounter = new SinkCounter(getName());
        }
    }

    private void partitionTransformerCreator(Context context) {
        this.partitionTransformerClass = context.getString(Config.PARTITION_TRANSFORMER_CLASS, "org.apache.flume.sink.hive.DefaultPartitionTransformerImpl");
        try {
            this.transformer = (PartitionTransformer) Class.forName(this.partitionTransformerClass).newInstance();
            this.transformer.configure(context);
        } catch (Exception e) {
            throw new FlumeException("The given \"partitionTransformerClass\" is not a valid transformer.", e);
        }
    }

    @VisibleForTesting
    protected SinkCounter getCounter() {
        return this.sinkCounter;
    }

    private HiveEventSerializer createSerializer(String str) {
        if (str.compareToIgnoreCase(HiveDelimitedTextSerializer.ALIAS) == 0 || str.compareTo(HiveDelimitedTextSerializer.class.getName()) == 0) {
            return new HiveDelimitedTextSerializer();
        }
        if (str.compareToIgnoreCase(HiveJsonSerializer.ALIAS) == 0 || str.compareTo(HiveJsonSerializer.class.getName()) == 0) {
            return new HiveJsonSerializer();
        }
        try {
            return (HiveEventSerializer) Class.forName(str).newInstance();
        } catch (Exception e) {
            throw new IllegalArgumentException("Unable to instantiate serializer: " + str + " on sink: " + getName(), e);
        }
    }

    public Sink.Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        try {
            try {
                try {
                    if (this.timeToSendHeartBeat.compareAndSet(true, false)) {
                        enableHeartBeatOnAllWriters();
                    }
                    int drainOneBatch = drainOneBatch(channel);
                    transaction.commit();
                    if (drainOneBatch < 1) {
                        Sink.Status status = Sink.Status.BACKOFF;
                        if (1 == 0) {
                            transaction.rollback();
                        }
                        transaction.close();
                        return status;
                    }
                    Sink.Status status2 = Sink.Status.READY;
                    if (1 == 0) {
                        transaction.rollback();
                    }
                    transaction.close();
                    return status2;
                } catch (Exception e) {
                    this.sinkCounter.incrementEventWriteOrChannelFail(e);
                    throw new EventDeliveryException(e);
                }
            } catch (InterruptedException e2) {
                LOG.warn(getName() + ": Thread was interrupted.", e2);
                Sink.Status status3 = Sink.Status.BACKOFF;
                if (DEFAULT_IDLETIMEOUT == 0) {
                    transaction.rollback();
                }
                transaction.close();
                return status3;
            }
        } catch (Throwable th) {
            if (DEFAULT_IDLETIMEOUT == 0) {
                transaction.rollback();
            }
            transaction.close();
            throw th;
        }
    }

    private int drainOneBatch(Channel channel) throws Exception {
        Event take;
        int i = DEFAULT_IDLETIMEOUT;
        try {
            HashMap newHashMap = Maps.newHashMap();
            while (i < this.batchSize.intValue() && (take = channel.take()) != null) {
                if (this.generatePartitionFromEventBody.booleanValue()) {
                    take.getHeaders().put("timestamp", this.transformer.transformDateTime(this.serializer.getTimeString(new String(take.getBody(), StandardCharsets.UTF_8))));
                }
                HiveEndPoint makeEndPoint = makeEndPoint(this.metaStoreUri, this.database, this.table, this.partitionVals, take.getHeaders(), this.timeZone, this.needRounding, this.roundUnit, this.roundValue, this.useLocalTime);
                HiveWriter orCreateWriter = getOrCreateWriter(newHashMap, makeEndPoint);
                LOG.debug("{} : Writing event to {}", getName(), makeEndPoint);
                orCreateWriter.write(take);
                i++;
            }
            if (i == 0) {
                this.sinkCounter.incrementBatchEmptyCount();
            } else if (i == this.batchSize.intValue()) {
                this.sinkCounter.incrementBatchCompleteCount();
            } else {
                this.sinkCounter.incrementBatchUnderflowCount();
            }
            this.sinkCounter.addToEventDrainAttemptCount(i);
            Iterator<HiveWriter> it = newHashMap.values().iterator();
            while (it.hasNext()) {
                it.next().flush(true);
            }
            this.sinkCounter.addToEventDrainSuccessCount(i);
            return i;
        } catch (HiveWriter.Failure e) {
            LOG.warn(getName() + " : " + e.getMessage(), e);
            abortAllWriters();
            closeAllWriters();
            throw e;
        }
    }

    private void enableHeartBeatOnAllWriters() {
        Iterator<HiveWriter> it = this.allWriters.values().iterator();
        while (it.hasNext()) {
            it.next().setHearbeatNeeded();
        }
    }

    private HiveWriter getOrCreateWriter(Map<HiveEndPoint, HiveWriter> map, HiveEndPoint hiveEndPoint) throws HiveWriter.ConnectException, InterruptedException {
        try {
            HiveWriter hiveWriter = this.allWriters.get(hiveEndPoint);
            if (hiveWriter == null) {
                LOG.info(getName() + ": Creating Writer to Hive end point : " + hiveEndPoint);
                hiveWriter = new HiveWriter(hiveEndPoint, this.txnsPerBatchAsk.intValue(), this.autoCreatePartitions, this.callTimeout.intValue(), this.callTimeoutPool, this.hiveConf, this.serializer, this.sinkCounter);
                this.sinkCounter.incrementConnectionCreatedCount();
                if (this.allWriters.size() > this.maxOpenConnections.intValue() && closeIdleWriters() == 0) {
                    closeEldestWriter();
                }
                this.allWriters.put(hiveEndPoint, hiveWriter);
                map.put(hiveEndPoint, hiveWriter);
            } else if (map.get(hiveEndPoint) == null) {
                map.put(hiveEndPoint, hiveWriter);
            }
            return hiveWriter;
        } catch (HiveWriter.ConnectException e) {
            this.sinkCounter.incrementConnectionFailedCount();
            throw e;
        }
    }

    private HiveEndPoint makeEndPoint(String str, String str2, String str3, List<String> list, Map<String, String> map, TimeZone timeZone, boolean z, int i, Integer num, boolean z2) {
        if (list == null) {
            return new HiveEndPoint(str, str2, str3, (List) null);
        }
        ArrayList newArrayList = Lists.newArrayList();
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            newArrayList.add(BucketPath.escapeString(it.next(), map, timeZone, z, i, num.intValue(), z2));
        }
        return new HiveEndPoint(str, str2, str3, newArrayList);
    }

    private void closeEldestWriter() throws InterruptedException {
        long currentTimeMillis = System.currentTimeMillis();
        HiveEndPoint hiveEndPoint = DEFAULT_IDLETIMEOUT;
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            if (entry.getValue().getLastUsed() < currentTimeMillis) {
                hiveEndPoint = entry.getKey();
                currentTimeMillis = entry.getValue().getLastUsed();
            }
        }
        try {
            this.sinkCounter.incrementConnectionCreatedCount();
            LOG.info(getName() + ": Closing least used Writer to Hive EndPoint : " + hiveEndPoint);
            this.allWriters.remove(hiveEndPoint).close();
        } catch (InterruptedException e) {
            LOG.warn(getName() + ": Interrupted when attempting to close writer for end point: " + hiveEndPoint, e);
            throw e;
        }
    }

    private int closeIdleWriters() throws InterruptedException {
        int i = DEFAULT_IDLETIMEOUT;
        long currentTimeMillis = System.currentTimeMillis();
        ArrayList newArrayList = Lists.newArrayList();
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            if (currentTimeMillis - entry.getValue().getLastUsed() > this.idleTimeout) {
                i++;
                newArrayList.add(entry.getKey());
            }
        }
        Iterator it = newArrayList.iterator();
        while (it.hasNext()) {
            HiveEndPoint hiveEndPoint = (HiveEndPoint) it.next();
            this.sinkCounter.incrementConnectionClosedCount();
            LOG.info(getName() + ": Closing idle Writer to Hive end point : {}", hiveEndPoint);
            this.allWriters.remove(hiveEndPoint).close();
        }
        return i;
    }

    private void closeAllWriters() throws InterruptedException {
        Iterator<Map.Entry<HiveEndPoint, HiveWriter>> it = this.allWriters.entrySet().iterator();
        while (it.hasNext()) {
            it.next().getValue().close();
        }
        this.allWriters.clear();
    }

    private void abortAllWriters() throws InterruptedException {
        Iterator<Map.Entry<HiveEndPoint, HiveWriter>> it = this.allWriters.entrySet().iterator();
        while (it.hasNext()) {
            it.next().getValue().abort();
        }
    }

    public void stop() {
        Iterator<Map.Entry<HiveEndPoint, HiveWriter>> it = this.allWriters.entrySet().iterator();
        while (it.hasNext()) {
            try {
                it.next().getValue().close();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        this.callTimeoutPool.shutdown();
        while (!this.callTimeoutPool.isTerminated()) {
            try {
                this.callTimeoutPool.awaitTermination(Math.max(DEFAULT_CALLTIMEOUT, this.callTimeout.intValue()), TimeUnit.MILLISECONDS);
            } catch (InterruptedException e2) {
                LOG.warn(getName() + ":Shutdown interrupted on " + this.callTimeoutPool, e2);
            }
        }
        this.callTimeoutPool = null;
        this.allWriters.clear();
        this.allWriters = null;
        this.sinkCounter.stop();
        super.stop();
        LOG.info("Hive Sink {} stopped", getName());
    }

    public void start() {
        this.callTimeoutPool = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat("hive-" + getName() + "-call-runner-%d").build());
        this.allWriters = Maps.newHashMap();
        this.sinkCounter.start();
        super.start();
        setupHeartBeatTimer();
        LOG.info(getName() + ": Hive Sink {} started", getName());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setupHeartBeatTimer() {
        if (this.heartBeatInterval.intValue() > 0) {
            this.heartBeatTimer.schedule(new TimerTask() { // from class: org.apache.flume.sink.hive.HiveSink.1
                @Override // java.util.TimerTask, java.lang.Runnable
                public void run() {
                    HiveSink.this.timeToSendHeartBeat.set(true);
                    HiveSink.this.setupHeartBeatTimer();
                }
            }, this.heartBeatInterval.intValue() * 1000);
        }
    }

    public long getBatchSize() {
        return this.batchSize.intValue();
    }

    public String toString() {
        return "{ Sink type:" + getClass().getSimpleName() + ", name:" + getName() + " }";
    }

    void loadConfiguration(Context context) {
        String string = context.getString(Config.HIVE_SITE);
        String string2 = context.getString(Config.CORE_SITE);
        String string3 = context.getString(Config.METASTORE_SITE);
        if (StringUtils.isEmpty(string) || StringUtils.isEmpty(string2) || StringUtils.isEmpty(string3)) {
            return;
        }
        Configuration configuration = new Configuration();
        configuration.addResource(new Path(string));
        configuration.addResource(new Path(string3));
        configuration.addResource(new Path(string2));
        this.hiveConf = new HiveConf();
        this.hiveConf.addResource(configuration);
    }
}
