package io.hetu.core.heuristicindex;

import io.airlift.log.Logger;
import io.hetu.core.filesystem.HetuLocalFileSystemClient;
import io.hetu.core.filesystem.LocalConfig;
import io.hetu.core.heuristicindex.util.IndexConstants;
import io.hetu.core.heuristicindex.util.IndexServiceUtils;
import io.prestosql.spi.connector.CreateIndexMetadata;
import io.prestosql.spi.filesystem.HetuFileSystemClient;
import io.prestosql.spi.heuristicindex.Index;
import io.prestosql.spi.heuristicindex.IndexWriter;
import io.prestosql.spi.heuristicindex.Pair;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/* loaded from: input_file:io/hetu/core/heuristicindex/FileIndexWriter.class */
public class FileIndexWriter implements IndexWriter {
    private static final HetuFileSystemClient LOCAL_FS_CLIENT = new HetuLocalFileSystemClient(new LocalConfig(new Properties()), Paths.get("/", new String[0]));
    private static final Logger LOG = Logger.get(FileIndexWriter.class);
    private final String dataSourceFileName;
    private final String dataSourceFileLastModifiedTime;
    private final Map<Long, Map<String, List<Map.Entry<List<Object>, Integer>>>> indexPages = new ConcurrentHashMap();
    private final Map<Long, AtomicInteger> pageCountExpected = new ConcurrentHashMap();
    private final CreateIndexMetadata createIndexMetadata;
    private final HetuFileSystemClient fs;
    private final Path root;
    private Path tmpPath;

    public FileIndexWriter(CreateIndexMetadata createIndexMetadata, Properties properties, HetuFileSystemClient hetuFileSystemClient, Path path) {
        this.createIndexMetadata = createIndexMetadata;
        this.dataSourceFileName = Paths.get(properties.getProperty("datasource_file_path"), new String[0]).toString();
        this.dataSourceFileLastModifiedTime = properties.getProperty("datasource_file_modification");
        this.fs = (HetuFileSystemClient) Objects.requireNonNull(hetuFileSystemClient);
        this.root = path;
    }

    public void addData(Map<String, List<Object>> map, Properties properties) throws IOException {
        long parseLong = Long.parseLong(properties.getProperty("datasource_stripe_offset"));
        this.indexPages.computeIfAbsent(Long.valueOf(parseLong), l -> {
            return new ConcurrentHashMap();
        });
        for (Map.Entry<String, List<Object>> entry : map.entrySet()) {
            this.indexPages.get(Long.valueOf(parseLong)).computeIfAbsent(entry.getKey(), str -> {
                return Collections.synchronizedList(new LinkedList());
            }).add(new AbstractMap.SimpleEntry(entry.getValue(), Integer.valueOf(Integer.parseInt(properties.getProperty("datasource_page_number")))));
        }
        int decrementAndGet = this.pageCountExpected.computeIfAbsent(Long.valueOf(parseLong), l2 -> {
            return new AtomicInteger();
        }).decrementAndGet();
        if (properties.getProperty("datasource_total_pages") != null) {
            int parseInt = Integer.parseInt(properties.getProperty("datasource_total_pages"));
            LOG.debug("offset %d finishing page received, expected page count: %d, actual received: %d, remaining: %d", new Object[]{Long.valueOf(parseLong), Integer.valueOf(parseInt), Integer.valueOf(-decrementAndGet), Integer.valueOf(this.pageCountExpected.get(Long.valueOf(parseLong)).addAndGet(parseInt))});
        }
        if (this.pageCountExpected.get(Long.valueOf(parseLong)).get() == 0) {
            synchronized (this.pageCountExpected.get(Long.valueOf(parseLong))) {
                if (this.indexPages.containsKey(Long.valueOf(parseLong))) {
                    LOG.debug("All pages for offset %d have been received. Persisting.", new Object[]{Long.valueOf(parseLong)});
                    ArrayList arrayList = new ArrayList();
                    for (Map.Entry<String, List<Map.Entry<List<Object>, Integer>>> entry2 : this.indexPages.get(Long.valueOf(parseLong)).entrySet()) {
                        entry2.getValue().sort(Comparator.comparingInt((v0) -> {
                            return v0.getValue();
                        }));
                        arrayList.add(new Pair<>(entry2.getKey(), (List) entry2.getValue().stream().map((v0) -> {
                            return v0.getKey();
                        }).flatMap((v0) -> {
                            return v0.stream();
                        }).collect(Collectors.toList())));
                    }
                    persistStripe(Long.valueOf(parseLong), arrayList);
                    this.indexPages.remove(Long.valueOf(parseLong));
                } else {
                    LOG.debug("All pages for offset %d have been received, but the values are missing. This stripe should have already been persisted by another thread.", new Object[]{Long.valueOf(parseLong)});
                }
            }
        }
    }

    public void persist() throws IOException {
        for (Long l : this.indexPages.keySet()) {
            LOG.error("Offset %d data is NOT PERSISTED. Current page count: %d. Check debug log.", new Object[]{l, Integer.valueOf(this.pageCountExpected.get(l).get())});
        }
        Path path = Paths.get(this.root.toString(), this.createIndexMetadata.getTableName(), (String) ((Pair) this.createIndexMetadata.getIndexColumns().iterator().next()).getFirst(), this.createIndexMetadata.getIndexType(), this.dataSourceFileName, IndexConstants.LAST_MODIFIED_FILE_PREFIX + this.dataSourceFileLastModifiedTime + ".tar");
        try {
            try {
                IndexServiceUtils.writeToHdfs(LOCAL_FS_CLIENT, this.fs, this.tmpPath, path);
                LOCAL_FS_CLIENT.deleteRecursively(this.tmpPath);
            } catch (IOException e) {
                LOG.debug("Error copying index files to remote filesystem: ", new Object[]{e});
                this.fs.delete(path);
                LOCAL_FS_CLIENT.deleteRecursively(this.tmpPath);
            }
        } catch (Throwable th) {
            LOCAL_FS_CLIENT.deleteRecursively(this.tmpPath);
            throw th;
        }
    }

    private void persistStripe(Long l, List<Pair<String, List<Object>>> list) throws IOException {
        synchronized (this) {
            if (this.tmpPath == null) {
                this.tmpPath = Files.createTempDirectory("tmp-indexwriter-", new FileAttribute[0]);
            }
        }
        int i = 0;
        Iterator<Pair<String, List<Object>>> it = list.iterator();
        while (it.hasNext()) {
            i += ((List) it.next().getSecond()).size();
        }
        Index createIndex = HeuristicIndexFactory.createIndex(this.createIndexMetadata.getIndexType());
        Throwable th = null;
        try {
            createIndex.setProperties(this.createIndexMetadata.getProperties());
            createIndex.setExpectedNumOfEntries(i);
            createIndex.addValues(list);
            OutputStream newOutputStream = LOCAL_FS_CLIENT.newOutputStream(this.tmpPath.resolve(l + "." + createIndex.getId()), new OpenOption[0]);
            Throwable th2 = null;
            try {
                try {
                    createIndex.serialize(newOutputStream);
                    if (newOutputStream != null) {
                        if (0 != 0) {
                            try {
                                newOutputStream.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            newOutputStream.close();
                        }
                    }
                    if (createIndex != null) {
                        if (0 == 0) {
                            createIndex.close();
                            return;
                        }
                        try {
                            createIndex.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (newOutputStream != null) {
                    if (th2 != null) {
                        try {
                            newOutputStream.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        newOutputStream.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (createIndex != null) {
                if (0 != 0) {
                    try {
                        createIndex.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    createIndex.close();
                }
            }
            throw th8;
        }
    }
}
