package org.apache.hudi.util;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieHeartbeatException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/util/ClientIds.class */
/**
 * Heartbeat-backed client id registry.
 *
 * <p>Each client owns one heartbeat file under {@code <basePath>/.hoodie/.aux/.ids}. The file is
 * named {@code "_" + clientId} (just {@code "_"} for the initial, empty client id). Once
 * {@link #start()} is called, the file is re-created every {@code heartbeatIntervalInMs} so that
 * its modification time keeps advancing. A file whose modification time is older than
 * {@code numTolerableHeartbeatMisses * heartbeatIntervalInMs} is considered expired, and its id
 * can be handed out again by {@link #nextId(Configuration)}.
 *
 * <p>NOTE(review): {@code fs} is {@code transient} while {@code executor} is not; behaviour of an
 * instance after Java serialization is not established by this class - confirm with callers.
 */
public class ClientIds implements AutoCloseable, Serializable {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(ClientIds.class);

    /** Name of the folder (below {@code .hoodie/.aux}) that holds the heartbeat files. */
    private static final String HEARTBEAT_FOLDER_NAME = ".ids";

    /** Prefix put in front of the client id to form the heartbeat file name. */
    private static final String HEARTBEAT_FILE_NAME_PREFIX = "_";

    /** Client id used when no other heartbeat file exists yet. */
    public static final String INIT_CLIENT_ID = "";

    /** Default period between two heartbeat refreshes, in milliseconds. */
    public static final long DEFAULT_HEARTBEAT_INTERVAL_IN_MS = 60000;

    /** Default number of heartbeat intervals that may elapse before a heartbeat counts as expired. */
    public static final int DEFAULT_NUM_TOLERABLE_HEARTBEAT_MISSES = 5;

    private final transient FileSystem fs;
    private final Path heartbeatFilePath;
    private final long heartbeatIntervalInMs;
    private final long heartbeatTimeoutThresholdInMs;
    private ScheduledExecutorService executor;
    private boolean started;

    /** Fluent builder for {@link ClientIds}; {@code fs} and {@code basePath} are mandatory. */
    public static class Builder {
        private FileSystem fs;
        private String basePath;
        private String clientId = ClientIds.INIT_CLIENT_ID;
        private long heartbeatIntervalInMs = ClientIds.DEFAULT_HEARTBEAT_INTERVAL_IN_MS;
        private int numTolerableHeartbeatMisses = ClientIds.DEFAULT_NUM_TOLERABLE_HEARTBEAT_MISSES;

        /** Sets the file system used to write the heartbeat file. */
        public Builder fs(FileSystem fileSystem) {
            this.fs = fileSystem;
            return this;
        }

        /** Sets the table base path under which the heartbeat folder lives. */
        public Builder basePath(String str) {
            this.basePath = str;
            return this;
        }

        /** Sets the client id whose heartbeat file this instance maintains. */
        public Builder clientId(String str) {
            this.clientId = str;
            return this;
        }

        /**
         * Populates base path, file system and client id from the given Flink configuration
         * ({@code FlinkOptions.PATH} and {@code FlinkOptions.WRITE_CLIENT_ID}).
         */
        public Builder conf(Configuration configuration) {
            this.basePath = configuration.getString(FlinkOptions.PATH);
            this.fs = HadoopFSUtils.getFs(this.basePath, HadoopConfigurations.getHadoopConf(configuration));
            this.clientId = configuration.getString(FlinkOptions.WRITE_CLIENT_ID);
            return this;
        }

        /** Sets the period between two heartbeat refreshes, in milliseconds. */
        public Builder heartbeatIntervalInMs(long j) {
            this.heartbeatIntervalInMs = j;
            return this;
        }

        /** Sets how many heartbeat intervals may elapse before a heartbeat counts as expired. */
        public Builder numTolerableHeartbeatMisses(int i) {
            this.numTolerableHeartbeatMisses = i;
            return this;
        }

        /**
         * Creates the {@link ClientIds} instance.
         *
         * @throws NullPointerException if the file system or the base path has not been set
         */
        public ClientIds build() {
            return new ClientIds(
                Objects.requireNonNull(this.fs, "fs"),
                Objects.requireNonNull(this.basePath, "basePath"),
                this.clientId,
                this.heartbeatIntervalInMs,
                this.numTolerableHeartbeatMisses);
        }
    }

    private ClientIds(FileSystem fileSystem, String str, String str2, long j, int i) {
        this.fs = fileSystem;
        this.heartbeatFilePath = getHeartbeatFilePath(str, str2);
        this.heartbeatIntervalInMs = j;
        // int * long is evaluated as long, so the product does not overflow int range.
        this.heartbeatTimeoutThresholdInMs = i * j;
    }

    /**
     * Writes the heartbeat file once and then schedules a refresh every
     * {@code heartbeatIntervalInMs} milliseconds. Calling this on an already started instance is
     * a no-op.
     *
     * @throws HoodieHeartbeatException if the initial heartbeat file cannot be written
     */
    public void start() {
        if (this.started) {
            LOG.info("The service heartbeat client is already started, skips the action");
            // Without this return a second executor would be created and the first one leaked.
            return;
        }
        updateHeartbeat();
        this.executor = Executors.newScheduledThreadPool(1);
        this.executor.scheduleAtFixedRate(
            this::refreshHeartbeatQuietly,
            this.heartbeatIntervalInMs,
            this.heartbeatIntervalInMs,
            TimeUnit.MILLISECONDS);
        this.started = true;
    }

    /** Returns a new {@link Builder}. */
    public static Builder builder() {
        return new Builder();
    }

    /** Stops the periodic heartbeat refresh, if running. The heartbeat file itself is not removed. */
    @Override // java.lang.AutoCloseable
    public void close() {
        if (this.executor != null) {
            this.executor.shutdownNow();
            this.executor = null;
        }
        this.started = false;
    }

    /**
     * Tells whether the heartbeat file at {@code path} has not been modified for more than
     * {@code j} milliseconds.
     *
     * @param fileSystem file system holding the heartbeat file
     * @param path       heartbeat file to check
     * @param j          timeout threshold in milliseconds
     * @return {@code true} if the file exists and its modification time is older than the
     *         threshold; {@code false} if it does not exist, is recent enough, or the check failed
     *         with an {@link IOException} (which is logged)
     */
    public static boolean isHeartbeatExpired(FileSystem fileSystem, Path path, long j) {
        try {
            if (!fileSystem.exists(path)) {
                return false;
            }
            long lastModified = fileSystem.getFileStatus(path).getModificationTime();
            return System.currentTimeMillis() - lastModified > j;
        } catch (IOException e) {
            // Keep the cause in the log so the underlying file system failure can be diagnosed.
            LOG.error("Check heartbeat file existence error: " + path, e);
            return false;
        }
    }

    /** Builds the heartbeat folder location {@code <basePath>/.hoodie/.aux/.ids}. */
    private String getHeartbeatFolderPath(String str) {
        return str + "/.hoodie/.aux/" + HEARTBEAT_FOLDER_NAME;
    }

    /** Builds the heartbeat file path for a client id; a null/empty id maps to the bare prefix. */
    private Path getHeartbeatFilePath(String str, String str2) {
        String fileName = StringUtils.isNullOrEmpty(str2)
            ? HEARTBEAT_FILE_NAME_PREFIX
            : HEARTBEAT_FILE_NAME_PREFIX + str2;
        return new Path(getHeartbeatFolderPath(str), fileName);
    }

    /** Refreshes this client's own heartbeat file. */
    private void updateHeartbeat() throws HoodieHeartbeatException {
        updateHeartbeat(this.heartbeatFilePath);
    }

    /**
     * Periodic-task variant of {@link #updateHeartbeat()}: a failure is logged instead of being
     * rethrown, because {@code scheduleAtFixedRate} cancels every subsequent run once a run
     * throws, which would silently stop the heartbeat after one transient error.
     */
    private void refreshHeartbeatQuietly() {
        try {
            updateHeartbeat();
        } catch (RuntimeException e) {
            LOG.warn("Unable to refresh heartbeat file " + this.heartbeatFilePath
                + ", will retry at the next interval", e);
        }
    }

    /**
     * (Re-)creates the given file with overwrite enabled and closes it immediately, so that only
     * its modification time changes.
     *
     * @throws HoodieHeartbeatException if the file cannot be created or closed
     */
    private void updateHeartbeat(Path path) throws HoodieHeartbeatException {
        try (FSDataOutputStream ignored = this.fs.create(path, true)) {
            // Nothing to write: creating the file is the heartbeat.
        } catch (IOException e) {
            throw new HoodieHeartbeatException("Unable to generate heartbeat for file path " + path, e);
        }
    }

    /**
     * Allocates the next client id for the table at {@code FlinkOptions.PATH} and immediately
     * writes a heartbeat file for it.
     *
     * @param configuration Flink configuration providing the table path and Hadoop settings
     * @return the allocated client id ({@link #INIT_CLIENT_ID} when no heartbeat file exists yet)
     */
    @VisibleForTesting
    public String nextId(Configuration configuration) {
        String basePath = configuration.getString(FlinkOptions.PATH);
        String allocatedId = nextId(configuration, basePath);
        updateHeartbeat(getHeartbeatFilePath(basePath, allocatedId));
        return allocatedId;
    }

    /**
     * Picks the next client id by inspecting the heartbeat folder of {@code str}.
     *
     * <ul>
     *   <li>No folder or no heartbeat file: {@link #INIT_CLIENT_ID}.</li>
     *   <li>Some heartbeat files are expired: all expired files are deleted and the id of the
     *       smallest expired one is reused.</li>
     *   <li>Otherwise: the largest existing id plus one ({@code "1"} if the largest id is the
     *       initial, empty one).</li>
     * </ul>
     *
     * @throws RuntimeException      if the folder cannot be listed or cleaned up
     * @throws NumberFormatException if the largest existing id is not an integer
     */
    private String nextId(Configuration configuration, String str) {
        Path folder = new Path(getHeartbeatFolderPath(str));
        FileSystem folderFs = HadoopFSUtils.getFs(folder, HadoopConfigurations.getHadoopConf(configuration));
        try {
            if (!folderFs.exists(folder)) {
                return INIT_CLIENT_ID;
            }
            // Order by name length first, then by name: for the canonical decimal ids generated
            // here this is numeric order ("_", "_1", "_2", ..., "_9", "_10"), whereas a plain
            // lexicographic sort would put "_10" before "_2" and hand out an id already in use.
            Comparator<Path> byClientId =
                Comparator.comparingInt((Path p) -> p.getName().length()).thenComparing(Path::getName);
            List<Path> sortedPaths = Arrays.stream(folderFs.listStatus(folder))
                .map(status -> status.getPath())
                .sorted(byClientId)
                .collect(Collectors.toList());
            if (sortedPaths.isEmpty()) {
                return INIT_CLIENT_ID;
            }
            List<Path> expiredPaths = sortedPaths.stream()
                .filter(candidate -> isHeartbeatExpired(folderFs, candidate, this.heartbeatTimeoutThresholdInMs))
                .collect(Collectors.toList());
            if (!expiredPaths.isEmpty()) {
                // Clean up every expired heartbeat and reuse the smallest freed id.
                for (Path expired : expiredPaths) {
                    folderFs.delete(expired, true);
                    LOG.warn("Delete inactive ckp metadata path: " + expired);
                }
                return getClientId(expiredPaths.get(0));
            }
            // No id can be reused: auto-increment from the largest one.
            String largestId = getClientId(sortedPaths.get(sortedPaths.size() - 1));
            return INIT_CLIENT_ID.equals(largestId) ? "1" : (Integer.parseInt(largestId) + 1) + "";
        } catch (IOException e) {
            throw new RuntimeException("Generate next client id error", e);
        }
    }

    /**
     * Extracts the client id from a heartbeat file path: the file name is split on the prefix
     * character and the second token is returned, or {@code ""} when there is none.
     */
    private static String getClientId(Path path) {
        String[] tokens = path.getName().split(HEARTBEAT_FILE_NAME_PREFIX);
        return tokens.length > 1 ? tokens[1] : INIT_CLIENT_ID;
    }
}
