package org.apache.flume.sink.kite;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import java.net.URI;
import java.security.PrivilegedAction;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.auth.FlumeAuthenticationUtil;
import org.apache.flume.auth.PrivilegedExecutor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.flume.sink.kite.parser.EntityParser;
import org.apache.flume.sink.kite.parser.EntityParserFactory;
import org.apache.flume.sink.kite.policy.FailurePolicy;
import org.apache.flume.sink.kite.policy.FailurePolicyFactory;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.DatasetNotFoundException;
import org.kitesdk.data.DatasetWriter;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.Flushable;
import org.kitesdk.data.Format;
import org.kitesdk.data.Formats;
import org.kitesdk.data.Syncable;
import org.kitesdk.data.URIBuilder;
import org.kitesdk.data.View;
import org.kitesdk.data.spi.Registration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/sink/kite/DatasetSink.class */
public class DatasetSink extends AbstractSink implements Configurable {
    // Executor used to load the dataset in createWriter(); built in configure()
    // from the configured principal, keytab and proxy user.
    private PrivilegedExecutor privilegedExecutor;
    private static final Logger LOG = LoggerFactory.getLogger(DatasetSink.class);
    // Shared factories for the entity parser and the failure policy.
    private static final EntityParserFactory ENTITY_PARSER_FACTORY = new EntityParserFactory();
    private static final FailurePolicyFactory FAILURE_POLICY_FACTORY = new FailurePolicyFactory();
    // Configuration handed to configure(); kept because createWriter() and
    // enterTransaction() read settings from it again later.
    private Context context = null;
    // Dataset name: derived in configure(), then overwritten in createWriter()
    // with the name reported by the loaded dataset.
    private String datasetName = null;
    // URI of the target dataset, resolved in configure().
    private URI datasetUri = null;
    // Schema seen by the most recent createWriter(); a differing schema triggers
    // creation of a new parser.
    private Schema datasetSchema = null;
    // Currently open writer; null when no writer is open, which makes
    // shouldRoll() return true.
    private DatasetWriter<GenericRecord> writer = null;
    // Upper bound on the number of events taken per process() call.
    private long batchSize = DatasetSinkConstants.DEFAULT_BATCH_SIZE;
    // Age in seconds at which shouldRoll() asks for the writer to be replaced.
    private int rollIntervalSeconds = DatasetSinkConstants.DEFAULT_ROLL_INTERVAL;
    // Whether process() commits the channel transaction after every batch;
    // recomputed in createWriter() and only true for Avro datasets.
    private boolean commitOnBatch = DatasetSinkConstants.DEFAULT_FLUSHABLE_COMMIT_ON_BATCH;
    // Whether process() prefers sync() over flush() before a per-batch commit;
    // recomputed in createWriter() and only true for Avro datasets.
    private boolean syncOnBatch = DatasetSinkConstants.DEFAULT_SYNCABLE_SYNC_ON_BATCH;
    // Wall-clock millis of the last start()/createWriter(); roll() resets it to 0.
    private long lastRolledMillis = 0;
    // Sum of event body lengths parsed since the current writer was created
    // (reported in the log line emitted by closeWriter()).
    private long bytesParsed = 0;
    // Parser turning events into records; rebuilt in createWriter() when the schema changes.
    private EntityParser<GenericRecord> parser = null;
    // Policy that receives events write() could not deliver.
    private FailurePolicy failurePolicy = null;
    // Sink metrics, created in configure() and started/stopped with the sink.
    private SinkCounter counter = null;
    // Last parsed record; handed back to the parser for reuse when reuseEntity is true.
    private GenericRecord entity = null;
    // Set to false by createWriter() when the dataset format is Parquet.
    private boolean reuseEntity = true;
    // Open channel transaction, or null when none is in progress.
    private Transaction transaction = null;
    // True once at least one batch has been committed with the current writer;
    // reset by createWriter().
    private boolean committedBatch = false;

    /**
     * Names of the dataset formats this sink accepts. {@code createWriter()}
     * rejects any dataset whose format name is not in this list.
     *
     * @return a newly created list holding {@code "avro"} and {@code "parquet"}
     */
    protected List<String> allowedFormats() {
        return Lists.newArrayList("avro", "parquet");
    }

    /**
     * Reads the sink configuration: authentication settings, the target dataset
     * (either a full dataset URI, or a repository URI plus dataset name and
     * optional namespace), batching/rolling parameters and the failure policy.
     *
     * <p>The sink is named after the resolved dataset URI. Requesting
     * sync-on-batch without commit-on-batch is rejected as a configuration error.
     *
     * @param context the sink configuration; retained for later use
     */
    @Override
    public void configure(Context context) {
        this.context = context;

        // Authentication: principal + keytab, optionally proxying as another user.
        final String principal = context.getString(DatasetSinkConstants.AUTH_PRINCIPAL);
        final String keytab = context.getString(DatasetSinkConstants.AUTH_KEYTAB);
        final String proxyUser = context.getString(DatasetSinkConstants.AUTH_PROXY_USER);
        this.privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(principal, keytab).proxyAs(proxyUser);

        // Resolve the dataset URI and name.
        final String configuredUri = context.getString(DatasetSinkConstants.CONFIG_KITE_DATASET_URI);
        if (configuredUri == null) {
            // No dataset URI given: assemble one from repository URI, namespace and name.
            final String repositoryUri = context.getString(DatasetSinkConstants.CONFIG_KITE_REPO_URI);
            Preconditions.checkNotNull(repositoryUri, "No dataset configured. Setting kite.dataset.uri is required.");
            this.datasetName = context.getString(DatasetSinkConstants.CONFIG_KITE_DATASET_NAME);
            Preconditions.checkNotNull(this.datasetName, "No dataset configured. Setting kite.dataset.uri is required.");
            final String namespace = context.getString(DatasetSinkConstants.CONFIG_KITE_DATASET_NAMESPACE, DatasetSinkConstants.DEFAULT_NAMESPACE);
            this.datasetUri = new URIBuilder(repositoryUri, namespace, this.datasetName).build();
        } else {
            this.datasetUri = URI.create(configuredUri);
            this.datasetName = uriToName(this.datasetUri);
        }
        setName(this.datasetUri.toString());

        // Sync-on-batch only makes sense when batches are committed individually.
        final boolean syncRequested = context.getBoolean(DatasetSinkConstants.CONFIG_SYNCABLE_SYNC_ON_BATCH, Boolean.valueOf(DatasetSinkConstants.DEFAULT_SYNCABLE_SYNC_ON_BATCH)).booleanValue();
        if (syncRequested) {
            final boolean commitRequested = context.getBoolean(DatasetSinkConstants.CONFIG_FLUSHABLE_COMMIT_ON_BATCH, Boolean.valueOf(DatasetSinkConstants.DEFAULT_FLUSHABLE_COMMIT_ON_BATCH)).booleanValue();
            Preconditions.checkArgument(commitRequested, "Configuration error: kite.flushable.commiteOnBatch must be set to true when kite.syncable.syncOnBatch is set to true.");
        }

        this.failurePolicy = FAILURE_POLICY_FACTORY.newPolicy(context);

        final Long configuredBatchSize = context.getLong(DatasetSinkConstants.CONFIG_KITE_BATCH_SIZE, Long.valueOf(DatasetSinkConstants.DEFAULT_BATCH_SIZE));
        this.batchSize = configuredBatchSize.longValue();
        final Integer configuredRollInterval = context.getInteger(DatasetSinkConstants.CONFIG_KITE_ROLL_INTERVAL, Integer.valueOf(DatasetSinkConstants.DEFAULT_ROLL_INTERVAL));
        this.rollIntervalSeconds = configuredRollInterval.intValue();

        this.counter = new SinkCounter(this.datasetName);
    }

    /**
     * Starts the sink: records the current time as the last roll time, starts
     * the metrics counter and then delegates to the superclass.
     */
    @Override
    public synchronized void start() {
        // The roll timer starts counting from sink start-up.
        lastRolledMillis = System.currentTimeMillis();
        counter.start();

        final String sinkName = getName();
        LOG.info("Started DatasetSink " + sinkName);
        super.start();
    }

    /**
     * Test hook: zeroes the last-rolled timestamp that {@code shouldRoll()}
     * measures the writer's age from.
     */
    @VisibleForTesting
    void roll() {
        lastRolledMillis = 0L;
    }

    /**
     * Test hook exposing the current writer.
     *
     * @return the open writer, or {@code null} if none is open
     */
    @VisibleForTesting
    DatasetWriter<GenericRecord> getWriter() {
        return writer;
    }

    /**
     * Test hook replacing the current writer.
     *
     * @param datasetWriter the writer to use from now on; may be {@code null}
     */
    @VisibleForTesting
    void setWriter(DatasetWriter<GenericRecord> datasetWriter) {
        writer = datasetWriter;
    }

    /**
     * Test hook replacing the entity parser used by {@code write(Event)}.
     *
     * @param entityParser the parser to use from now on
     */
    @VisibleForTesting
    void setParser(EntityParser<GenericRecord> entityParser) {
        parser = entityParser;
    }

    /**
     * Test hook replacing the failure policy.
     *
     * <p>Note that {@code enterTransaction} installs a fresh policy whenever it
     * opens a new transaction, which overrides the value set here.
     *
     * @param failurePolicy the policy to use from now on
     */
    @VisibleForTesting
    void setFailurePolicy(FailurePolicy failurePolicy) {
        this.failurePolicy = failurePolicy;
    }

    /**
     * Stops the sink: stops the metrics counter, closes the writer and commits
     * the open transaction (no replacement writer is created), then delegates
     * to the superclass.
     *
     * <p>If closing or committing fails with an {@link EventDeliveryException},
     * the transaction is rolled back and the failure is logged rather than
     * propagated.
     */
    @Override
    public synchronized void stop() {
        counter.stop();

        try {
            closeWriter();
            commitTransaction();
        } catch (EventDeliveryException ex) {
            // Could not finish cleanly: undo the in-flight transaction and report it.
            rollbackTransaction();
            LOG.warn("Closing the writer failed: " + ex.getLocalizedMessage());
            LOG.debug("Exception follows.", ex);
        }

        final String sinkName = getName();
        LOG.info("Stopped dataset sink: " + sinkName);
        super.stop();
    }

    /**
     * Drains up to {@code batchSize} events from the channel into the dataset writer.
     *
     * <p>Steps: roll the writer if it is missing or too old (close, commit,
     * re-create); open a channel transaction if none is in progress; take and
     * write events until the batch is full or the channel is empty; and, when
     * commit-on-batch is active, sync or flush the writer and commit the
     * transaction.
     *
     * <p>On any failure the transaction is rolled back. If batches were already
     * committed with the current writer it is closed; otherwise the writer
     * reference is simply dropped.
     *
     * @return {@code BACKOFF} when no event was taken, {@code READY} otherwise
     * @throws EventDeliveryException for any failure that is not an {@link Error};
     *         {@code Error}s are rethrown unchanged
     */
    @Override
    public Sink.Status process() throws EventDeliveryException {
        long processedEvents = 0;
        try {
            if (shouldRoll()) {
                closeWriter();
                commitTransaction();
                createWriter();
            }
            Preconditions.checkNotNull(this.writer, "Can't process events with a null writer. This is likely a bug.");

            Channel channel = getChannel();
            enterTransaction(channel);

            for (; processedEvents < this.batchSize; processedEvents++) {
                Event event = channel.take();
                if (event == null) {
                    // Channel is empty for now.
                    break;
                }
                write(event);
            }

            if (this.commitOnBatch) {
                // Make the data durable before committing; a failure here rolls the
                // transaction back. sync()/flush() are declared on Syncable/Flushable,
                // not on DatasetWriter, hence the casts.
                if (this.syncOnBatch && this.writer instanceof Syncable) {
                    ((Syncable) this.writer).sync();
                } else if (this.writer instanceof Flushable) {
                    ((Flushable) this.writer).flush();
                }
                boolean committed = commitTransaction();
                Preconditions.checkState(committed, "Tried to commit a batch when there was no transaction");
                this.committedBatch |= committed;
            }

            if (processedEvents == 0) {
                this.counter.incrementBatchEmptyCount();
                return Sink.Status.BACKOFF;
            }
            if (processedEvents < this.batchSize) {
                this.counter.incrementBatchUnderflowCount();
            } else {
                this.counter.incrementBatchCompleteCount();
            }
            this.counter.addToEventDrainSuccessCount(processedEvents);
            return Sink.Status.READY;
        } catch (Throwable th) {
            // Catch-all so the transaction is always rolled back.
            rollbackTransaction();
            if (this.commitOnBatch && this.committedBatch) {
                // Batches were already committed with this writer, so try to close it.
                try {
                    closeWriter();
                } catch (EventDeliveryException ex) {
                    LOG.warn("Error closing writer there may be temp files that need to be manually recovered: " + ex.getLocalizedMessage());
                    LOG.debug("Exception follows.", ex);
                }
            } else {
                // Nothing committed with this writer: drop it without closing.
                this.writer = null;
            }
            Throwables.propagateIfInstanceOf(th, Error.class);
            Throwables.propagateIfInstanceOf(th, EventDeliveryException.class);
            throw new EventDeliveryException(th);
        }
    }

    /**
     * Parses one event into a record and writes it with the current writer.
     *
     * <p>A {@code NonRecoverableEventException} or an Avro
     * {@code AppendWriteException} is handed to the failure policy together with
     * the offending event. Any other runtime failure is wrapped in an
     * {@link EventDeliveryException}.
     *
     * @param event the event to parse and write
     * @throws EventDeliveryException if parsing or writing fails with an
     *         unexpected runtime exception, or if the failure policy throws
     */
    @VisibleForTesting
    void write(Event event) throws EventDeliveryException {
        try {
            // Hand the previous record back to the parser for reuse when allowed.
            this.entity = this.parser.parse(event, this.reuseEntity ? this.entity : null);
            this.bytesParsed += event.getBody().length;
            this.writer.write(this.entity);
        } catch (NonRecoverableEventException ex) {
            this.failurePolicy.handle(event, ex);
        } catch (DataFileWriter.AppendWriteException ex) {
            // Must be caught before RuntimeException: it is a RuntimeException
            // subclass and would otherwise never reach the failure policy.
            this.failurePolicy.handle(event, ex);
        } catch (RuntimeException ex) {
            Throwables.propagateIfInstanceOf(ex, EventDeliveryException.class);
            throw new EventDeliveryException(ex);
        }
    }

    /**
     * Loads the dataset through the privileged executor and opens a new writer on it.
     *
     * <p>Side effects: clears {@code committedBatch}; rebuilds the parser when
     * the dataset schema differs from the one previously seen; recomputes
     * {@code reuseEntity} (off for Parquet) and {@code commitOnBatch} /
     * {@code syncOnBatch} (only on for Avro); refreshes the dataset name; and
     * resets the roll timestamp and the parsed-bytes counter.
     *
     * @throws EventDeliveryException if the dataset does not exist, its format is
     *         not in {@code allowedFormats()}, or any other runtime failure
     *         occurs while opening the writer
     */
    @VisibleForTesting
    void createWriter() throws EventDeliveryException {
        // A new writer has not committed anything yet.
        this.committedBatch = false;
        try {
            View<GenericRecord> view = this.privilegedExecutor.execute(
                    new PrivilegedAction<Dataset<GenericRecord>>() {
                        @Override
                        public Dataset<GenericRecord> run() {
                            return Datasets.load(DatasetSink.this.datasetUri);
                        }
                    });

            DatasetDescriptor descriptor = view.getDataset().getDescriptor();
            Format format = descriptor.getFormat();
            Preconditions.checkArgument(allowedFormats().contains(format.getName()), "Unsupported format: " + format.getName());

            Schema currentSchema = descriptor.getSchema();
            if (this.datasetSchema == null || !currentSchema.equals(this.datasetSchema)) {
                // First load or schema change: the parser must match the new schema.
                this.datasetSchema = currentSchema;
                this.parser = ENTITY_PARSER_FACTORY.newParser(this.datasetSchema, this.context);
            }

            this.reuseEntity = !Formats.PARQUET.equals(format);

            // Per-batch commit and sync are only enabled for Avro datasets.
            boolean avro = Formats.AVRO.equals(format);
            this.commitOnBatch = this.context.getBoolean(DatasetSinkConstants.CONFIG_FLUSHABLE_COMMIT_ON_BATCH, Boolean.valueOf(DatasetSinkConstants.DEFAULT_FLUSHABLE_COMMIT_ON_BATCH)).booleanValue() && avro;
            this.syncOnBatch = this.context.getBoolean(DatasetSinkConstants.CONFIG_SYNCABLE_SYNC_ON_BATCH, Boolean.valueOf(DatasetSinkConstants.DEFAULT_SYNCABLE_SYNC_ON_BATCH)).booleanValue() && avro;

            this.datasetName = view.getDataset().getName();
            this.writer = view.newWriter();

            // Restart the roll timer and the per-writer byte count.
            this.lastRolledMillis = System.currentTimeMillis();
            this.bytesParsed = 0L;
        } catch (DatasetNotFoundException ex) {
            throw new EventDeliveryException("Dataset " + this.datasetUri + " not found. The dataset must be created before Flume can write to it.", ex);
        } catch (RuntimeException ex) {
            throw new EventDeliveryException("Error trying to open a new writer for dataset " + this.datasetUri, ex);
        }
    }

    /**
     * Decides whether the writer must be replaced before processing.
     *
     * @return {@code true} if there is no writer, or if at least
     *         {@code rollIntervalSeconds} whole seconds have elapsed since the
     *         last roll
     */
    private boolean shouldRoll() {
        final long now = System.currentTimeMillis();
        final long elapsedSeconds = TimeUnit.MILLISECONDS.toSeconds(now - lastRolledMillis);

        final Object[] logArgs = {now, lastRolledMillis, elapsedSeconds};
        LOG.debug("Current time: {}, lastRolled: {}, diff: {} sec", logArgs);

        if (writer == null) {
            return true;
        }
        return elapsedSeconds >= rollIntervalSeconds;
    }

    /**
     * Closes the current writer, if any, and closes the failure policy.
     *
     * <p>Whether or not the close succeeds, the writer reference is cleared and
     * {@code failurePolicy.close()} is invoked exactly once. Does nothing when
     * no writer is open.
     *
     * @throws EventDeliveryException if closing the writer fails (with an
     *         HDFS-specific hint for {@link DatasetIOException}) or if closing
     *         the failure policy fails
     */
    @VisibleForTesting
    void closeWriter() throws EventDeliveryException {
        if (this.writer == null) {
            return;
        }
        try {
            this.writer.close();

            long elapsedSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - this.lastRolledMillis);
            LOG.info("Closed writer for {} after {} seconds and {} bytes parsed", new Object[]{this.datasetUri, Long.valueOf(elapsedSeconds), Long.valueOf(this.bytesParsed)});
        } catch (DatasetIOException ex) {
            // Must precede RuntimeException: DatasetIOException is one of its subclasses.
            throw new EventDeliveryException("Check HDFS permissions/health. IO error trying to close the  writer for dataset " + this.datasetUri, ex);
        } catch (RuntimeException ex) {
            throw new EventDeliveryException("Error trying to close the  writer for dataset " + this.datasetUri, ex);
        } finally {
            // Give up on the writer even if close() failed.
            this.writer = null;
            this.failurePolicy.close();
        }
    }

    /**
     * Opens a channel transaction unless one is already in progress. A freshly
     * opened transaction also gets a new failure policy built from the stored
     * configuration.
     *
     * @param channel the channel to obtain the transaction from
     * @throws EventDeliveryException declared for callers; see the failure
     *         policy factory for when it may be raised
     */
    private void enterTransaction(Channel channel) throws EventDeliveryException {
        if (transaction != null) {
            // Already inside a transaction: keep using it.
            return;
        }
        final Transaction opened = channel.getTransaction();
        transaction = opened;
        opened.begin();
        failurePolicy = FAILURE_POLICY_FACTORY.newPolicy(context);
    }

    /**
     * Commits and closes the open transaction, syncing the failure policy first.
     *
     * @return {@code true} if a transaction was open and has been committed,
     *         {@code false} if there was nothing to commit
     * @throws EventDeliveryException if syncing the failure policy fails
     */
    @VisibleForTesting
    boolean commitTransaction() throws EventDeliveryException {
        final boolean hasTransaction = transaction != null;
        if (hasTransaction) {
            // Sync the failure policy before the channel transaction is committed.
            failurePolicy.sync();
            transaction.commit();
            transaction.close();
            transaction = null;
        }
        return hasTransaction;
    }

    /**
     * Rolls back and closes the open transaction, if any.
     *
     * <p>A runtime failure during rollback is logged and swallowed. The
     * transaction is closed exactly once and the reference is cleared whether
     * or not the rollback succeeded.
     */
    private void rollbackTransaction() {
        if (this.transaction == null) {
            return;
        }
        try {
            this.transaction.rollback();
        } catch (RuntimeException ex) {
            LOG.error("Transaction rollback failed: " + ex.getLocalizedMessage());
            LOG.debug("Exception follows.", ex);
        } finally {
            try {
                this.transaction.close();
            } finally {
                // Clear the reference even if close() itself throws.
                this.transaction = null;
            }
        }
    }

    /**
     * Extracts the dataset name from a dataset URI by looking up the URI's raw
     * scheme-specific part in the registration and reading the
     * {@code "dataset"} entry of the resulting options.
     *
     * @param uri the dataset URI
     * @return the value stored under {@code "dataset"}, or {@code null} if absent
     */
    private static String uriToName(URI uri) {
        final URI schemeSpecific = URI.create(uri.getRawSchemeSpecificPart());
        final Map<?, ?> options = (Map<?, ?>) Registration.lookupDatasetUri(schemeSpecific).second();
        return (String) options.get("dataset");
    }
}
