package org.apache.flume.sink.hdfs;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.flume.Channel;
import org.apache.flume.Clock;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.SystemClock;
import org.apache.flume.Transaction;
import org.apache.flume.auth.FlumeAuthenticationUtil;
import org.apache.flume.auth.PrivilegedExecutor;
import org.apache.flume.conf.BatchSizeSupported;
import org.apache.flume.conf.Configurable;
import org.apache.flume.formatter.output.BucketPath;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.flume.tools.FlumeSendAlarmMgr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/sink/hdfs/HDFSEventSink.class */
public class HDFSEventSink extends AbstractSink implements Configurable, BatchSizeSupported {
    // ---- Defaults for the hdfs.* settings read in configure() ----
    // NOTE(review): this file looks like decompiler output. Several of these
    // constants are also referenced elsewhere in the class purely for their
    // numeric value (defaultRollTimerPoolSize as 1, defaultBatchCallTimeout as 0,
    // defaultRollCount as 10, defaultThreadPoolSize as the hdfs.failcount
    // default), which is unrelated to what their names suggest.
    private static final long defaultRollInterval = 30;
    private static final long defaultRollSize = 1024;
    private static final long defaultRollCount = 10;
    private static final String defaultFileName = "FlumeData";
    private static final String defaultSuffix = "";
    private static final String defaultInUsePrefix = "";
    private static final String defaultInUseSuffix = ".tmp";
    private static final long defaultBatchSize = 1000;
    private static final String defaultFileType = "SequenceFile";
    private static final int defaultMaxOpenFiles = 5000;
    // Default for hdfs.retryInterval.
    private static final long defaultRetryInterval = 180;
    // Default for hdfs.closeTries.
    private static final int defaultTryCount = Integer.MAX_VALUE;
    // Threshold read from hdfs.failcount; values <= 0 disable the failure alarm.
    private int sinkFailCount;
    // Failures counted since the last successful batch (reset to 0 on success).
    private int sinkCurrentFailCount;
    // Alarm channel used to report sink failure / recovery.
    private FlumeSendAlarmMgr sendAlarmMgr;
    // Alarm cause id; the alarm calls in this class pass the same value as a literal.
    private final int CAUSE_ID_39007 = 39007;
    public static final String IN_USE_SUFFIX_PARAM_NAME = "hdfs.inUseSuffix";
    // Default for hdfs.callTimeout; the value is used with TimeUnit.MILLISECONDS.
    private static final long defaultCallTimeout = 30000;
    // Default for hdfs.batchCallTimeout; the value is used with TimeUnit.MILLISECONDS.
    private static final long defaultBatchCallTimeout = 0;
    private static final int defaultThreadPoolSize = 10;
    private static final int defaultRollTimerPoolSize = 1;
    // Read from hdfs.fileCloseByEndEvent and handed to every BucketWriter.
    private boolean fileCloseByEndEvent;
    private final HDFSWriterFactory writerFactory;
    // Open bucket writers keyed by lookup path; created in start(), cleared and
    // nulled in stop(). Accesses in this class are made under sfWritersLock.
    private WriterLinkedHashMap sfWriters;
    private long rollInterval;
    private long rollSize;
    private long rollCount;
    private long batchSize;
    private int threadsPoolSize;
    private int rollTimerPoolSize;
    private CompressionCodec codeC;
    private SequenceFile.CompressionType compType;
    private String fileType;
    private String filePath;
    private String fileName;
    private String suffix;
    private String inUsePrefix;
    private String inUseSuffix;
    private TimeZone timeZone;
    private int maxOpenFiles;
    // Pool used for the HDFS health probe and for batch processing with a timeout.
    private ExecutorService callTimeoutPool;
    // Scheduler handed to each BucketWriter.
    private ScheduledExecutorService timedRollerPool;
    // Single-threaded scheduler that periodically runs checkHDFSPathState().
    private ScheduledThreadPoolExecutor checkHDFSStateExecutor;
    // Result of the most recent health probe; read and written under hdfsStatusLock.
    private boolean isHDFSOK;
    private Object hdfsStatusLock;
    private boolean needRounding;
    // Rounding unit as a number: configure() uses 11 (hour), 12 (minute), 13 (second),
    // which equal java.util.Calendar.HOUR_OF_DAY / MINUTE / SECOND.
    private int roundUnit;
    private int roundValue;
    private boolean useLocalTime;
    private long callTimeout;
    private long batchCallTimeout;
    // When true, process() runs each batch through processEventsWithTimeOut().
    private boolean batchCallTimeoutControl;
    private Context context;
    private SinkCounter sinkCounter;
    private volatile int idleTimeout;
    // Assigned in configure() when hdfs.useLocalTimeStamp is true; not read in this class.
    private Clock clock;
    // Test hooks: when mockFs is set, initializeBucketWriter() uses mockWriter and mockFs.
    private FileSystem mockFs;
    private HDFSWriter mockWriter;
    private final Object sfWritersLock;
    private long retryInterval;
    private int tryCount;
    private PrivilegedExecutor privExecutor;
    private int monTime;
    // True once a failure alarm has been sent and not yet cleared by a success.
    private boolean isSendAlarm;
    private static final Logger LOG = LoggerFactory.getLogger(HDFSEventSink.class);
    // Separator placed between the escaped directory and file prefix to build the writer key.
    private static String DIRECTORY_DELIMITER = System.getProperty("file.separator");
    // Event header key inspected by isFirstEvent() / isLastEvent().
    private static String EVENTPOSTION = "eventPostion";

    /* loaded from: input_file:org/apache/flume/sink/hdfs/HDFSEventSink$WriterCallback.class */
    /**
     * Callback handed to each {@code BucketWriter} created by this sink. The
     * implementation built in {@code processEvents} removes the entry with the
     * given key from the sink's map of open writers.
     */
    public interface WriterCallback {
        /**
         * @param str key of the writer in the sink's open-writer map
         */
        void run(String str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flume/sink/hdfs/HDFSEventSink$WriterLinkedHashMap.class */
    /**
     * Access-ordered map of open bucket writers. Once it holds more than
     * {@code maxOpenFiles} entries, the eldest (least recently accessed) writer
     * is closed and dropped from the map.
     */
    public static class WriterLinkedHashMap extends LinkedHashMap<String, BucketWriter> {
        private final int maxOpenFiles;

        /**
         * @param i maximum number of writers kept open at the same time
         */
        public WriterLinkedHashMap(int i) {
            // capacity 16, load factor 0.75, accessOrder = true
            super(16, 0.75f, true);
            maxOpenFiles = i;
        }

        /**
         * Closes the eldest writer and asks the map to evict it when the size
         * limit has been exceeded. An interrupt during the close is logged, the
         * thread's interrupt flag is restored, and the entry is still evicted.
         */
        @Override
        protected boolean removeEldestEntry(Map.Entry<String, BucketWriter> entry) {
            final boolean overLimit = size() > maxOpenFiles;
            if (overLimit) {
                try {
                    entry.getValue().close();
                } catch (InterruptedException interrupted) {
                    HDFSEventSink.LOG.warn(entry.getKey().toString(), interrupted);
                    Thread.currentThread().interrupt();
                }
            }
            return overLimit;
        }
    }

    /** Creates a sink that uses a freshly constructed {@link HDFSWriterFactory}. */
    public HDFSEventSink() {
        this(new HDFSWriterFactory());
    }

    /**
     * Creates a sink that obtains its HDFS writers from the given factory and
     * initialises all non-configured state to its starting value.
     *
     * @param hDFSWriterFactory factory queried for a writer each time a new
     *                          bucket is opened
     */
    public HDFSEventSink(HDFSWriterFactory hDFSWriterFactory) {
        this.sinkFailCount = 0;
        this.sinkCurrentFailCount = 0;
        this.sendAlarmMgr = FlumeSendAlarmMgr.getInstance();
        // CAUSE_ID_39007 is final and already initialised at its declaration;
        // assigning it again here would not compile, so it is not touched.
        this.checkHDFSStateExecutor = null;
        // Assume HDFS is reachable until the first health probe says otherwise.
        this.isHDFSOK = true;
        this.hdfsStatusLock = new Object();
        this.needRounding = false;
        this.roundUnit = Calendar.SECOND;
        this.roundValue = 1;
        this.useLocalTime = false;
        this.sfWritersLock = new Object();
        this.monTime = 0;
        this.isSendAlarm = false;
        this.writerFactory = hDFSWriterFactory;
    }

    /**
     * Exposes the live map of open bucket writers (not a copy) to tests.
     *
     * @return the writer map; {@code null} before {@code start()} and after {@code stop()}
     */
    @VisibleForTesting
    Map<String, BucketWriter> getSfWriters() {
        return sfWriters;
    }

    /**
     * Reads the sink settings from the given context and validates them.
     *
     * @param context configuration holding the {@code hdfs.*} properties,
     *                {@code monTime} and {@code hdfs.failcount}
     * @throws NullPointerException     if {@code hdfs.path} is missing, or if
     *                                  {@code hdfs.fileType} is CompressedStream
     *                                  and no codec was resolved
     * @throws IllegalArgumentException if the batch size, rounding value, codec /
     *                                  file type combination or fail count is invalid
     */
    public void configure(Context context) {
        this.context = context;

        filePath = Preconditions.checkNotNull(context.getString("hdfs.path"), "hdfs.path is required");
        fileName = context.getString("hdfs.filePrefix", defaultFileName);
        suffix = context.getString("hdfs.fileSuffix", "");
        inUsePrefix = context.getString("hdfs.inUsePrefix", "");
        if (context.getBoolean("hdfs.emptyInUseSuffix", false)) {
            inUseSuffix = "";
            if (context.getString(IN_USE_SUFFIX_PARAM_NAME) != null) {
                LOG.warn("Ignoring parameter hdfs.inUseSuffix for hdfs sink: " + getName());
            }
        } else {
            inUseSuffix = context.getString(IN_USE_SUFFIX_PARAM_NAME, defaultInUseSuffix);
        }

        String timeZoneName = context.getString("hdfs.timeZone");
        timeZone = timeZoneName == null ? null : TimeZone.getTimeZone(timeZoneName);

        rollInterval = context.getLong("hdfs.rollInterval", defaultRollInterval);
        rollSize = context.getLong("hdfs.rollSize", defaultRollSize);
        rollCount = context.getLong("hdfs.rollCount", defaultRollCount);
        batchSize = context.getLong("hdfs.batchSize", defaultBatchSize);
        idleTimeout = context.getInteger("hdfs.idleTimeout", 0);
        String codecName = context.getString("hdfs.codeC");
        fileType = context.getString("hdfs.fileType", defaultFileType);
        maxOpenFiles = context.getInteger("hdfs.maxOpenFiles", defaultMaxOpenFiles);
        callTimeout = context.getLong("hdfs.callTimeout", defaultCallTimeout);
        batchCallTimeout = context.getLong("hdfs.batchCallTimeout", defaultBatchCallTimeout);
        // Recomputed on every call so that a reconfiguration which removes the
        // batch timeout also turns the timed processing path off again.
        batchCallTimeoutControl = batchCallTimeout > 0;
        threadsPoolSize = context.getInteger("hdfs.threadsPoolSize", defaultThreadPoolSize);
        rollTimerPoolSize = context.getInteger("hdfs.rollTimerPoolSize", defaultRollTimerPoolSize);

        String kerberosPrincipal = context.getString("hdfs.kerberosPrincipal");
        String kerberosKeytab = context.getString("hdfs.kerberosKeytab");
        String proxyUser = context.getString("hdfs.proxyUser");

        tryCount = context.getInteger("hdfs.closeTries", defaultTryCount);
        if (tryCount <= 0) {
            LOG.warn("Retry count value : " + tryCount + " is not valid. The sink will try to close the file until the file is eventually closed.");
            tryCount = defaultTryCount;
        }
        retryInterval = context.getLong("hdfs.retryInterval", defaultRetryInterval);
        if (retryInterval <= 0) {
            LOG.warn("Retry Interval value: " + retryInterval + " is not valid. If the first close of a file fails, it may remain open and will not be renamed.");
            // Without a usable interval only a single close attempt is made.
            tryCount = 1;
        }

        Preconditions.checkArgument(batchSize > 0, "batchSize must be greater than 0");

        if (codecName == null) {
            codeC = null;
            compType = SequenceFile.CompressionType.NONE;
        } else {
            codeC = getCodec(codecName);
            compType = SequenceFile.CompressionType.BLOCK;
        }
        if (fileType.equalsIgnoreCase("DataStream") && codecName != null) {
            throw new IllegalArgumentException("fileType: " + fileType + " which does NOT support compressed output. Please don't set codeC or change the fileType if compressed output is desired.");
        }
        if (fileType.equalsIgnoreCase("CompressedStream")) {
            Preconditions.checkNotNull(codeC, "It's essential to set compress codec when fileType is: " + fileType);
        }

        privExecutor = FlumeAuthenticationUtil.getAuthenticator(kerberosPrincipal, kerberosKeytab).proxyAs(proxyUser);
        fileCloseByEndEvent = context.getBoolean("hdfs.fileCloseByEndEvent", true);

        needRounding = context.getBoolean("hdfs.round", false);
        if (needRounding) {
            String unit = context.getString("hdfs.roundUnit", "second");
            if (unit.equalsIgnoreCase("hour")) {
                roundUnit = Calendar.HOUR_OF_DAY;
            } else if (unit.equalsIgnoreCase("minute")) {
                roundUnit = Calendar.MINUTE;
            } else if (unit.equalsIgnoreCase("second")) {
                roundUnit = Calendar.SECOND;
            } else {
                LOG.warn("Rounding unit is not valid, please set one of minute, hour, or second. Rounding will be disabled");
                needRounding = false;
            }
            // The value is still range-checked against the current unit even
            // when an invalid unit has just disabled rounding.
            roundValue = context.getInteger("hdfs.roundValue", 1);
            if (roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE) {
                Preconditions.checkArgument(roundValue > 0 && roundValue <= 60, "Round value must be > 0 and <= 60");
            } else if (roundUnit == Calendar.HOUR_OF_DAY) {
                Preconditions.checkArgument(roundValue > 0 && roundValue <= 24, "Round value must be > 0 and <= 24");
            }
        }

        useLocalTime = context.getBoolean("hdfs.useLocalTimeStamp", false);
        if (useLocalTime) {
            clock = new SystemClock();
        }

        if (sinkCounter == null) {
            sinkCounter = new SinkCounter(getName());
        }
        monTime = context.getInteger("monTime", 0);
        sinkCounter.setMonTime(monTime);

        // 0 is accepted and disables the failure alarm; only negatives are rejected.
        sinkFailCount = context.getInteger("hdfs.failcount", 10);
        if (sinkFailCount < 0) {
            throw new IllegalArgumentException("sinkFailCount is invalid, it must be greater than or equal to zero");
        }
    }

    /**
     * Tells whether a codec class is what the user asked for. A class matches
     * when {@code str} equals its fully qualified name (case-sensitive), its
     * simple name (ignoring case), or its simple name with a trailing
     * {@code "Codec"} removed (ignoring case).
     *
     * @param cls candidate codec class
     * @param str codec name from the configuration
     * @return {@code true} if the class matches the requested name
     */
    private static boolean codecMatches(Class<? extends CompressionCodec> cls, String str) {
        final String shortName = cls.getSimpleName();
        if (cls.getName().equals(str)) {
            return true;
        }
        if (shortName.equalsIgnoreCase(str)) {
            return true;
        }
        final String codecSuffix = "Codec";
        if (!shortName.endsWith(codecSuffix)) {
            return false;
        }
        final String stem = shortName.substring(0, shortName.length() - codecSuffix.length());
        return stem.equalsIgnoreCase(str);
    }

    /**
     * Resolves a codec name to an instance of one of the codec classes known to
     * Hadoop's {@link CompressionCodecFactory}.
     *
     * @param str codec name; see {@code codecMatches} for the accepted spellings,
     *            or {@code "None"} (any case) for no compression
     * @return the configured codec instance, or {@code null} for {@code "None"}
     * @throws IllegalArgumentException if no codec could be created for a name
     *                                  other than {@code "None"}
     */
    @VisibleForTesting
    static CompressionCodec getCodec(String str) {
        Configuration conf = new Configuration();
        List<Class<? extends CompressionCodec>> codecClasses = CompressionCodecFactory.getCodecClasses(conf);
        CompressionCodec codec = null;
        // Collected only to build a helpful error message.
        List<String> supportedNames = new ArrayList<String>();
        supportedNames.add("None");
        for (Class<? extends CompressionCodec> cls : codecClasses) {
            supportedNames.add(cls.getSimpleName());
            if (codecMatches(cls, str)) {
                try {
                    codec = cls.newInstance();
                } catch (IllegalAccessException e) {
                    LOG.error("Unable to access " + cls + " class", e);
                } catch (InstantiationException e) {
                    LOG.error("Unable to instantiate " + cls + " class", e);
                }
            }
        }
        if (codec == null) {
            if (!str.equalsIgnoreCase("None")) {
                throw new IllegalArgumentException("Unsupported compression codec " + str + ".  Please choose from: " + supportedNames);
            }
        } else if (codec instanceof org.apache.hadoop.conf.Configurable) {
            // Codecs that are Hadoop-Configurable need the configuration before use.
            ((org.apache.hadoop.conf.Configurable) codec).setConf(conf);
        }
        return codec;
    }

    /**
     * Takes up to {@code batchSize} events from the channel inside one
     * transaction, appends them to their bucket writers, flushes every writer
     * touched and commits.
     *
     * @return {@code READY} when at least one event was committed;
     *         {@code BACKOFF} when the last HDFS health probe failed, the channel
     *         was empty, or an {@link IOException} occurred (transaction rolled back)
     * @throws EventDeliveryException for any other failure, after rolling the
     *                                transaction back; {@link Error}s are rethrown as-is
     */
    public Sink.Status process() throws EventDeliveryException {
        this.sinkCounter.setUpdateTime();
        if (!getHDFSSatatus()) {
            LOG.error("opreation the hdfs file errors.");
            increseCountAndSendAlarm();
            this.sinkCounter.setUpdateTime();
            return Sink.Status.BACKOFF;
        }

        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        try {
            // Writers touched by this transaction, flushed before the commit.
            Set<BucketWriter> writers = new LinkedHashSet<BucketWriter>();
            List<Event> batch = new ArrayList<Event>();
            int txnEventCount = 0;
            while (txnEventCount < this.batchSize) {
                Event event = channel.take();
                if (event == null) {
                    break;
                }
                batch.add(event);
                txnEventCount++;
            }

            if (this.batchCallTimeoutControl) {
                processEventsWithTimeOut(batch, writers);
            } else {
                processEvents(batch, writers);
            }

            if (txnEventCount == 0) {
                this.sinkCounter.incrementBatchEmptyCount();
            } else if (txnEventCount == this.batchSize) {
                this.sinkCounter.incrementBatchCompleteCount();
            } else {
                this.sinkCounter.incrementBatchUnderflowCount();
            }

            for (BucketWriter writer : writers) {
                writer.flush();
            }
            transaction.commit();
            initCountAndSendAlarm();

            if (txnEventCount < 1) {
                return Sink.Status.BACKOFF;
            }
            this.sinkCounter.addToEventDrainSuccessCount(txnEventCount);
            return Sink.Status.READY;
        } catch (IOException e) {
            transaction.rollback();
            LOG.warn("HDFS IO error", e);
            increseCountAndSendAlarm();
            return Sink.Status.BACKOFF;
        } catch (Throwable th) {
            transaction.rollback();
            increseCountAndSendAlarm();
            LOG.error("process failed", th);
            if (th instanceof Error) {
                throw (Error) th;
            }
            throw new EventDeliveryException(th);
        } finally {
            // Runs on every exit path, replacing the per-branch close() calls.
            transaction.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Appends each event to the bucket writer selected by its escaped path and
     * file prefix, creating (or re-creating, if the bucket was closed
     * concurrently) the writer on demand.
     *
     * @param list events to write, in order
     * @param set  receives every writer that was appended to, so the caller can
     *             flush them
     * @throws IOException          if obtaining a writer or appending fails
     * @throws InterruptedException if an append is interrupted
     */
    public void processEvents(List<Event> list, Set<BucketWriter> set) throws IOException, InterruptedException {
        // The callback keeps no per-event state, so one instance serves the whole batch.
        final WriterCallback closeCallback = new WriterCallback() {
            @Override
            public void run(String bucketPath) {
                LOG.info("Writer callback called.");
                synchronized (sfWritersLock) {
                    sfWriters.remove(bucketPath);
                }
            }
        };

        for (Event event : list) {
            String realPath = BucketPath.escapeString(this.filePath, event.getHeaders(), this.timeZone, this.needRounding, this.roundUnit, this.roundValue, this.useLocalTime);
            String realName = BucketPath.escapeString(this.fileName, event.getHeaders(), this.timeZone, this.needRounding, this.roundUnit, this.roundValue, this.useLocalTime);
            String lookupPath = realPath + DIRECTORY_DELIMITER + realName;

            BucketWriter bucketWriter;
            synchronized (this.sfWritersLock) {
                bucketWriter = this.sfWriters.get(lookupPath);
                if (bucketWriter == null) {
                    bucketWriter = initializeBucketWriter(realPath, realName, lookupPath, this.writerFactory.getWriter(this.fileType), closeCallback);
                    this.sfWriters.put(lookupPath, bucketWriter);
                }
            }

            try {
                bucketWriter.append(event);
            } catch (BucketClosedException e) {
                LOG.info("Bucket was closed while trying to append, reinitializing bucket and writing event.");
                bucketWriter = initializeBucketWriter(realPath, realName, lookupPath, this.writerFactory.getWriter(this.fileType), closeCallback);
                synchronized (this.sfWritersLock) {
                    this.sfWriters.put(lookupPath, bucketWriter);
                }
                // Append outside the lock, like the first attempt: the write can
                // block, and the close callback needs this lock from other threads.
                bucketWriter.append(event);
            }

            // Set.add is already a no-op for a writer recorded earlier in the batch.
            set.add(bucketWriter);
        }
    }

    /**
     * Runs {@code processEvents} on {@code callTimeoutPool} and waits at most
     * {@code batchCallTimeout} milliseconds for it to finish.
     *
     * @param list events to write
     * @param set  receives the writers that were appended to
     * @throws IOException if the task fails, times out, cannot be submitted, or
     *                     the wait is interrupted; the original exception is the cause
     */
    private void processEventsWithTimeOut(final List<Event> list, final Set<BucketWriter> set) throws IOException {
        Future<Void> pending = null;
        try {
            pending = this.callTimeoutPool.submit(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    processEvents(list, set);
                    return null;
                }
            });
            pending.get(this.batchCallTimeout, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            // The caller rolls the transaction back on failure, so a task that is
            // still running must not keep appending these events. cancel() has
            // no effect on a task that has already completed.
            if (pending != null) {
                pending.cancel(true);
            }
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            LOG.error("get the result form future error.", e);
            throw new IOException(e);
        }
    }

    /**
     * Records one more failure and, when alarming is enabled
     * ({@code sinkFailCount > 0}) and the running count has passed the
     * threshold, sends the sink-failure alarm. The count is then pinned to the
     * threshold, so the next failure exceeds it again.
     */
    private void increseCountAndSendAlarm() {
        sinkCurrentFailCount++;
        final boolean alarmEnabled = sinkFailCount > 0;
        if (alarmEnabled && sinkCurrentFailCount > sinkFailCount) {
            sendAlarmMgr.sendSinkFailAlarm(getName(), 39007);
            sinkCurrentFailCount = sinkFailCount;
            isSendAlarm = true;
        }
    }

    /**
     * Resets the failure count after a successful batch and, if a failure alarm
     * is outstanding, sends the matching "normal" alarm and clears the flag.
     */
    private void initCountAndSendAlarm() {
        sinkCurrentFailCount = 0;
        final boolean alarmOutstanding = isSendAlarm;
        isSendAlarm = false;
        if (alarmOutstanding) {
            sendAlarmMgr.sendSinkNormalAlarm(getName(), 39007);
        }
    }

    /**
     * Builds a {@code BucketWriter} from the sink's current settings.
     *
     * @param str            escaped directory path of the bucket
     * @param str2           escaped file name prefix
     * @param str3           key under which the writer is stored in the writer map
     * @param hDFSWriter     writer to use; replaced by the mock writer when a
     *                       mock file system has been installed
     * @param writerCallback callback passed on to the bucket writer
     * @return the new, not yet registered, bucket writer
     */
    @VisibleForTesting
    BucketWriter initializeBucketWriter(String str, String str2, String str3, HDFSWriter hDFSWriter, WriterCallback writerCallback) {
        final boolean mocked = mockFs != null;
        final HDFSWriter effectiveWriter = mocked ? mockWriter : hDFSWriter;
        BucketWriter writer = new BucketWriter(
                rollInterval, rollSize, rollCount, batchSize, context,
                str, str2, inUsePrefix, inUseSuffix, suffix,
                codeC, compType, effectiveWriter, timedRollerPool, privExecutor,
                sinkCounter, idleTimeout, writerCallback, str3,
                callTimeout, callTimeoutPool, retryInterval, tryCount,
                fileCloseByEndEvent);
        writer.setBatchCallTimeoutControl(batchCallTimeoutControl);
        if (mocked) {
            writer.setFileSystem(mockFs);
        }
        return writer;
    }

    /**
     * Closes every open bucket writer, stops the HDFS health probe, shuts both
     * thread pools down (waiting until they have terminated) and releases the
     * writer map. Resources that were never created are skipped, so this is
     * safe to call even if {@code start()} did not run or did not complete.
     */
    public void stop() {
        boolean interrupted = false;

        synchronized (this.sfWritersLock) {
            if (this.sfWriters != null) {
                for (Map.Entry<String, BucketWriter> entry : this.sfWriters.entrySet()) {
                    LOG.info("Closing {}", entry.getKey());
                    try {
                        entry.getValue().close(false, true);
                    } catch (Exception e) {
                        LOG.warn("Exception while closing " + entry.getKey() + ". Exception follows.", e);
                        if (e instanceof InterruptedException) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }
        }

        stopCheckHDFS();
        this.checkHDFSStateExecutor = null;

        ExecutorService[] pools = {this.callTimeoutPool, this.timedRollerPool};
        for (ExecutorService pool : pools) {
            if (pool == null) {
                continue;
            }
            pool.shutdown();
            while (!pool.isTerminated()) {
                try {
                    pool.awaitTermination(Math.max(defaultCallTimeout, this.callTimeout), TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    LOG.warn("shutdown interrupted on " + pool, e);
                    // Keep waiting for a clean shutdown, but remember the request.
                    interrupted = true;
                }
            }
        }
        this.callTimeoutPool = null;
        this.timedRollerPool = null;

        synchronized (this.sfWritersLock) {
            if (this.sfWriters != null) {
                this.sfWriters.clear();
                this.sfWriters = null;
            }
        }
        this.sinkCounter.stop();
        super.stop();

        if (interrupted) {
            // Re-assert the interrupt that was consumed while waiting above.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Creates the call-timeout and roll-timer pools, starts the HDFS health
     * probe, allocates the writer map, starts the counter and reports the sink
     * as normal to the alarm manager.
     */
    public void start() {
        final String callRunnerFormat = "hdfs-" + getName() + "-call-runner-%d";
        callTimeoutPool = Executors.newFixedThreadPool(
                threadsPoolSize,
                new ThreadFactoryBuilder().setNameFormat(callRunnerFormat).build());

        final String rollTimerFormat = "hdfs-" + getName() + "-roll-timer-%d";
        timedRollerPool = Executors.newScheduledThreadPool(
                rollTimerPoolSize,
                new ThreadFactoryBuilder().setNameFormat(rollTimerFormat).build());

        startCheckHDFS();
        sfWriters = new WriterLinkedHashMap(maxOpenFiles);

        sinkCounter.setMonTime(monTime);
        sinkCounter.start();

        sendAlarmMgr.sendSinkNormalAlarm(getName(), 39007);
        isSendAlarm = false;
        super.start();
    }

    /**
     * @return a short description of the form
     *         <code>{ Sink type:&lt;simple class name&gt;, name:&lt;sink name&gt; }</code>
     */
    public String toString() {
        StringBuilder text = new StringBuilder("{ Sink type:");
        text.append(getClass().getSimpleName());
        text.append(", name:");
        text.append(getName());
        text.append(" }");
        return text.toString();
    }

    /**
     * Test hook: forwards the given clock to the static {@code BucketPath.setClock}.
     *
     * @param clock clock to install on {@code BucketPath}
     */
    @VisibleForTesting
    void setBucketClock(Clock clock) {
        BucketPath.setClock(clock);
    }

    /**
     * Test hook: installs a file system that is set on every bucket writer
     * created afterwards; it also makes those writers use the mock writer.
     *
     * @param fileSystem file system to inject, or {@code null} to disable mocking
     */
    @VisibleForTesting
    void setMockFs(FileSystem fileSystem) {
        mockFs = fileSystem;
    }

    /**
     * Test hook: sets the writer used in place of the factory-provided one
     * whenever a mock file system has been installed.
     *
     * @param hDFSWriter writer to inject
     */
    @VisibleForTesting
    void setMockWriter(HDFSWriter hDFSWriter) {
        mockWriter = hDFSWriter;
    }

    /**
     * @return the effective number of close attempts computed by {@code configure}
     */
    @VisibleForTesting
    int getTryCount() {
        return tryCount;
    }

    /**
     * Checks the event's position header for a "first event" marker.
     *
     * @param event event whose headers are inspected
     * @return {@code true} if the position header is {@code "F"} or {@code "FE"}
     * @throws IOException declared for callers; not thrown by this implementation
     */
    public boolean isFirstEvent(Event event) throws IOException {
        final String position = (String) event.getHeaders().get(EVENTPOSTION);
        if ("F".equals(position)) {
            return true;
        }
        return "FE".equals(position);
    }

    /**
     * Checks the event's position header for a "last event" marker.
     *
     * @param event event whose headers are inspected
     * @return {@code true} if the position header is {@code "E"} or {@code "FE"}
     * @throws IOException declared for callers; not thrown by this implementation
     */
    public boolean isLastEvent(Event event) throws IOException {
        final String position = (String) event.getHeaders().get(EVENTPOSTION);
        if ("E".equals(position)) {
            return true;
        }
        return "FE".equals(position);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Probes HDFS by making sure the configured path (escaped with an empty
     * header map) exists, creating it if necessary. The probe runs on
     * {@code callTimeoutPool} and is given {@code callTimeout} milliseconds.
     *
     * @return {@code true} if the path exists or was created; {@code false} on
     *         any error, timeout or interruption
     */
    public boolean checkHDFSPathState() {
        Future<Boolean> probe = null;
        try {
            probe = this.callTimeoutPool.submit(new Callable<Boolean>() {
                @Override
                public Boolean call() throws Exception {
                    Configuration configuration = new Configuration();
                    // NOTE(review): no event headers are available here, so a path
                    // pattern that relies on them may not escape as it does for
                    // real events - confirm against BucketPath.escapeString.
                    Path path = new Path(BucketPath.escapeString(filePath, new HashMap<String, String>(), timeZone, needRounding, roundUnit, roundValue, useLocalTime));
                    try {
                        FileSystem fileSystem = path.getFileSystem(configuration);
                        if (fileSystem == null) {
                            return Boolean.FALSE;
                        }
                        if (fileSystem.exists(path) || fileSystem.mkdirs(path)) {
                            return Boolean.TRUE;
                        }
                        LOG.error("do mkdir {} errors.", filePath);
                        return Boolean.FALSE;
                    } catch (Exception e) {
                        LOG.error("execute hdfs error.", e);
                        return Boolean.FALSE;
                    }
                }
            });
            return probe.get(this.callTimeout, TimeUnit.MILLISECONDS).booleanValue();
        } catch (Exception e) {
            // Do not leave a hung probe occupying a pool thread that event
            // writes also need; cancel() has no effect on a finished task.
            if (probe != null) {
                probe.cancel(true);
            }
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            LOG.error("get the result form future error.", e);
            return false;
        }
    }

    /**
     * Schedules the HDFS health probe: after an initial 10 second delay, and
     * then 10 seconds after each run finishes, {@code checkHDFSPathState()} is
     * executed and its result stored in {@code isHDFSOK} under
     * {@code hdfsStatusLock}. A probe that throws counts as "not OK".
     */
    private void startCheckHDFS() {
        if (checkHDFSStateExecutor == null) {
            // one scheduler thread
            checkHDFSStateExecutor = new ScheduledThreadPoolExecutor(1);
        }
        final Runnable healthProbe = new Runnable() {
            @Override
            public void run() {
                boolean healthy;
                try {
                    healthy = checkHDFSPathState();
                } catch (Throwable th) {
                    LOG.error("get hdfs state exception:", th);
                    healthy = false;
                }
                synchronized (hdfsStatusLock) {
                    isHDFSOK = healthy;
                }
            }
        };
        final long periodSeconds = 10L;
        checkHDFSStateExecutor.scheduleWithFixedDelay(healthProbe, periodSeconds, periodSeconds, TimeUnit.SECONDS);
    }

    /**
     * Stops the HDFS health-probe scheduler, if one exists, and waits in
     * 10 second steps until it has terminated. Failures are logged, not thrown;
     * an interrupt ends the wait and is re-asserted on the current thread.
     */
    private void stopCheckHDFS() {
        LOG.info("stopping check hdfs state");
        if (this.checkHDFSStateExecutor != null) {
            try {
                this.checkHDFSStateExecutor.shutdownNow();
                while (!this.checkHDFSStateExecutor.isTerminated()) {
                    this.checkHDFSStateExecutor.awaitTermination(10L, TimeUnit.SECONDS);
                }
            } catch (InterruptedException e) {
                // Restore the flag so callers can still observe the interrupt.
                Thread.currentThread().interrupt();
                LOG.error("stop check hdfs state occur exception:", e);
            } catch (Throwable th) {
                LOG.error("stop check hdfs state occur exception:", th);
            }
        }
        LOG.info("stop check hdfs state success");
    }

    /**
     * @return the result of the most recent HDFS health probe, read under
     *         {@code hdfsStatusLock}
     */
    private boolean getHDFSSatatus() {
        synchronized (hdfsStatusLock) {
            return isHDFSOK;
        }
    }

    /**
     * @return the maximum number of events taken from the channel per transaction
     */
    public long getBatchSize() {
        return batchSize;
    }
}
