package org.apache.iceberg.spark.source;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.MicroBatches;
import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.connector.read.streaming.ReadLimit;
import org.apache.spark.sql.connector.read.streaming.SupportsAdmissionControl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iceberg/spark/source/SparkMicroBatchStream.class */
public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissionControl {
    // Joins path segments with "/"; used to build the initial-offset file location.
    private static final Joiner SLASH = Joiner.on("/");
    private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class);
    // Struct type with no fields, passed as the grouping key type of every planned input partition.
    private static final Types.StructType EMPTY_GROUPING_KEY_TYPE = Types.StructType.of(new Types.NestedField[0]);
    // Source table; refreshed before the latest offset is computed.
    private final Table table;
    // Read options captured from SparkReadConf at construction time and forwarded to each input partition.
    private final String branch;
    private final boolean caseSensitive;
    // JSON form of the schema handed to the constructor.
    private final String expectedSchema;
    // Broadcast copy of the table shared with the input partitions.
    private final Broadcast<Table> tableBroadcast;
    // Split planning parameters passed to TableScanUtil.
    private final long splitSize;
    private final int splitLookback;
    private final long splitOpenFileCost;
    // Also decides whether partition creation runs on the worker pool.
    private final boolean localityPreferred;
    // Offset read from, or written to, the initial offset file during construction.
    private final StreamingOffset initialOffset;
    // When false, encountering a delete / overwrite snapshot fails the stream instead of skipping it.
    private final boolean skipDelete;
    private final boolean skipOverwrite;
    // Snapshots whose timestamp is older than this value are not used as a starting point.
    private final long fromTimestamp;
    // Per-micro-batch caps; Integer.MAX_VALUE is treated as "no limit" by getDefaultReadLimit().
    private final int maxFilesPerMicroBatch;
    private final int maxRecordsPerMicroBatch;

    /* loaded from: input_file:org/apache/iceberg/spark/source/SparkMicroBatchStream$InitialOffsetStore.class */
    /**
     * Reads the stream's initial offset from a file, or computes and persists it when the file
     * does not exist yet.
     *
     * <p>The file lives at {@code <baseLocation>/offsets/0}.
     */
    private static class InitialOffsetStore {
        private final Table table;
        private final FileIO io;
        private final String initialOffsetLocation;
        private final Long fromTimestamp;

        /**
         * @param table table whose {@link FileIO} is used to read and write the offset file
         * @param baseLocation location under which {@code offsets/0} is stored (presumably the
         *     streaming checkpoint location; confirm against the caller)
         * @param fromTimestamp timestamp passed to {@code determineStartingOffset}; may be null
         */
        InitialOffsetStore(Table table, String baseLocation, Long fromTimestamp) {
            this.table = table;
            this.io = table.io();
            this.initialOffsetLocation = SparkMicroBatchStream.SLASH.join(baseLocation, "offsets/0");
            this.fromTimestamp = fromTimestamp;
        }

        /**
         * Returns the stored initial offset if the offset file exists; otherwise refreshes the
         * table, determines the starting offset, writes it to the offset file and returns it.
         */
        public StreamingOffset initialOffset() {
            InputFile existing = this.io.newInputFile(this.initialOffsetLocation);
            if (existing.exists()) {
                return readOffset(existing);
            }

            this.table.refresh();
            StreamingOffset startingOffset =
                    SparkMicroBatchStream.determineStartingOffset(this.table, this.fromTimestamp);
            OutputFile target = this.io.newOutputFile(this.initialOffsetLocation);
            writeOffset(startingOffset, target);
            return startingOffset;
        }

        /**
         * Writes the offset as UTF-8 JSON to the given file.
         *
         * @throws UncheckedIOException if creating, writing or closing the stream fails
         */
        private void writeOffset(StreamingOffset streamingOffset, OutputFile outputFile) {
            // try-with-resources closes the stream even when write/flush throws.
            try (PositionOutputStream out = outputFile.create()) {
                BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
                writer.write(streamingOffset.json());
                // Flush only: closing the writer would close the underlying stream a second time.
                writer.flush();
            } catch (IOException e) {
                throw new UncheckedIOException(String.format("Failed writing offset to: %s", this.initialOffsetLocation), e);
            }
        }

        /**
         * Parses the offset stored as JSON in the given file.
         *
         * @throws UncheckedIOException if opening, reading or closing the stream fails
         */
        private StreamingOffset readOffset(InputFile inputFile) {
            // try-with-resources closes the stream even when parsing throws.
            try (SeekableInputStream in = inputFile.newStream()) {
                return StreamingOffset.fromJson(in);
            } catch (IOException e) {
                throw new UncheckedIOException(String.format("Failed reading offset from: %s", this.initialOffsetLocation), e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SparkMicroBatchStream(JavaSparkContext javaSparkContext, Table table, SparkReadConf sparkReadConf, Schema schema, String str) {
        this.table = table;
        this.branch = sparkReadConf.branch();
        this.caseSensitive = sparkReadConf.caseSensitive();
        this.expectedSchema = SchemaParser.toJson(schema);
        this.localityPreferred = sparkReadConf.localityEnabled();
        this.tableBroadcast = javaSparkContext.broadcast(SerializableTableWithSize.copyOf(table));
        this.splitSize = sparkReadConf.splitSize();
        this.splitLookback = sparkReadConf.splitLookback();
        this.splitOpenFileCost = sparkReadConf.splitOpenFileCost();
        this.fromTimestamp = sparkReadConf.streamFromTimestamp();
        this.maxFilesPerMicroBatch = sparkReadConf.maxFilesPerMicroBatch();
        this.maxRecordsPerMicroBatch = sparkReadConf.maxRecordsPerMicroBatch();
        this.initialOffset = new InitialOffsetStore(table, str, Long.valueOf(this.fromTimestamp)).initialOffset();
        this.skipDelete = sparkReadConf.streamingSkipDeleteSnapshots();
        this.skipOverwrite = sparkReadConf.streamingSkipOverwriteSnapshots();
    }

    /**
     * Refreshes the table and returns an offset pointing past the last added file of the current
     * snapshot.
     *
     * @return {@code StreamingOffset.START_OFFSET} when the table has no current snapshot or that
     *     snapshot is older than {@code fromTimestamp}; otherwise an offset at the current snapshot
     *     whose position is its added-files count
     */
    @Override
    public Offset latestOffset() {
        this.table.refresh();

        Snapshot latest = this.table.currentSnapshot();
        if (latest == null || latest.timestampMillis() < this.fromTimestamp) {
            return StreamingOffset.START_OFFSET;
        }

        long endPosition = addedFilesCount(latest);
        return new StreamingOffset(latest.snapshotId(), endPosition, false);
    }

    /**
     * Plans the input partitions covering the files between two streaming offsets.
     *
     * @param offset start offset; must be a {@code StreamingOffset}
     * @param offset2 end offset; must be a {@code StreamingOffset}
     * @return one partition per planned task group, or an empty array when the end offset is
     *     {@code StreamingOffset.START_OFFSET}
     * @throws IllegalArgumentException if either offset is not a {@code StreamingOffset}
     */
    @Override
    public InputPartition[] planInputPartitions(Offset offset, Offset offset2) {
        Preconditions.checkArgument(offset2 instanceof StreamingOffset, "Invalid end offset: %s is not a StreamingOffset", offset2);
        Preconditions.checkArgument(offset instanceof StreamingOffset, "Invalid start offset: %s is not a StreamingOffset", offset);

        if (offset2.equals(StreamingOffset.START_OFFSET)) {
            return new InputPartition[0];
        }

        // Files -> splits -> combined task groups.
        List<FileScanTask> fileTasks = planFiles((StreamingOffset) offset, (StreamingOffset) offset2);
        CloseableIterable<FileScanTask> splitTasks =
                TableScanUtil.splitFiles(CloseableIterable.withNoopClose(fileTasks), this.splitSize);
        List<ScanTaskGroup<FileScanTask>> taskGroups = Lists.newArrayList(
                TableScanUtil.planTasks(splitTasks, this.splitSize, this.splitLookback, this.splitOpenFileCost));

        InputPartition[] partitions = new InputPartition[taskGroups.size()];

        // The worker pool is only used when locality is preferred; otherwise no executor is supplied.
        Tasks.range(partitions.length)
                .stopOnFailure()
                .executeWith(this.localityPreferred ? ThreadPools.getWorkerPool() : null)
                .run(index -> {
                    partitions[index] = new SparkInputPartition(
                            EMPTY_GROUPING_KEY_TYPE,
                            taskGroups.get(index),
                            this.tableBroadcast,
                            this.branch,
                            this.expectedSchema,
                            this.caseSensitive,
                            this.localityPreferred);
                });

        return partitions;
    }

    /** Returns a new row-based reader factory for the planned partitions. */
    @Override
    public PartitionReaderFactory createReaderFactory() {
        PartitionReaderFactory rowReaderFactory = new SparkRowReaderFactory();
        return rowReaderFactory;
    }

    /** Returns the initial offset that was resolved when this stream was constructed. */
    @Override
    public Offset initialOffset() {
        return initialOffset;
    }

    /**
     * Parses a JSON string into a {@code StreamingOffset}.
     *
     * @param str JSON representation of the offset
     */
    @Override
    public Offset deserializeOffset(String str) {
        StreamingOffset parsed = StreamingOffset.fromJson(str);
        return parsed;
    }

    /** No-op: this stream performs no work when an offset is committed. */
    @Override
    public void commit(Offset offset) {
        // Intentionally empty.
    }

    /** No-op: this stream performs no work on stop. */
    @Override
    public void stop() {
        // Intentionally empty.
    }

    /**
     * Collects the file scan tasks of every snapshot from the start offset through the end offset
     * (inclusive of the end offset's snapshot).
     *
     * <p>A start offset equal to {@code StreamingOffset.START_OFFSET} is first resolved through
     * {@link #determineStartingOffset(Table, Long)}. Snapshots rejected by
     * {@link #shouldProcess(Snapshot)} contribute no tasks.
     *
     * @param streamingOffset start offset
     * @param streamingOffset2 end offset
     * @return tasks in snapshot order
     * @throws IllegalStateException if a snapshot on the path is missing, or cannot be processed
     */
    private List<FileScanTask> planFiles(StreamingOffset streamingOffset, StreamingOffset streamingOffset2) {
        List<FileScanTask> fileScanTasks = Lists.newArrayList();

        StreamingOffset batchStartOffset = StreamingOffset.START_OFFSET.equals(streamingOffset)
                ? determineStartingOffset(this.table, this.fromTimestamp)
                : streamingOffset;

        StreamingOffset currentOffset = null;
        do {
            if (currentOffset == null) {
                currentOffset = batchStartOffset;
            } else if (currentOffset.snapshotId() != streamingOffset2.snapshotId()) {
                // Advance to the start of the next snapshot.
                Snapshot next = SnapshotUtil.snapshotAfter(this.table, currentOffset.snapshotId());
                currentOffset = new StreamingOffset(next.snapshotId(), 0L, false);
            } else {
                currentOffset = streamingOffset2;
            }

            Snapshot snapshot = this.table.snapshot(currentOffset.snapshotId());
            validateCurrentSnapshotExists(snapshot, currentOffset);

            if (!shouldProcess(snapshot)) {
                LOG.debug("Skipping snapshot: {} of table {}", Long.valueOf(currentOffset.snapshotId()), this.table.name());
                continue;
            }

            // Within the end snapshot stop at the end offset's position; otherwise take all added files.
            long endFileIndex = currentOffset.snapshotId() == streamingOffset2.snapshotId()
                    ? streamingOffset2.position()
                    : addedFilesCount(snapshot);

            // NOTE(review): the third generate() argument is TableProperties.MAX_REF_AGE_MS_DEFAULT,
            // which looks like a decompiler constant substitution; confirm the intended value.
            fileScanTasks.addAll(MicroBatches.from(snapshot, this.table.io())
                    .caseSensitive(this.caseSensitive)
                    .specsById(this.table.specs())
                    .generate(currentOffset.position(), endFileIndex, TableProperties.MAX_REF_AGE_MS_DEFAULT, currentOffset.shouldScanAllFiles())
                    .tasks());
        } while (currentOffset.snapshotId() != streamingOffset2.snapshotId());

        return fileScanTasks;
    }

    /**
     * Decides whether the files of a snapshot should be read by the stream, based on the
     * snapshot's operation.
     *
     * @param snapshot snapshot to inspect
     * @return {@code true} for append snapshots; {@code false} for replace snapshots and for
     *     delete / overwrite snapshots when the corresponding skip option is enabled
     * @throws IllegalStateException if a delete or overwrite snapshot is encountered while its
     *     skip option is disabled, or if the operation is not one of the four known operations
     */
    private boolean shouldProcess(Snapshot snapshot) {
        String operation = snapshot.operation();
        switch (operation) {
            case DataOperations.APPEND:
                return true;
            case DataOperations.REPLACE:
                return false;
            case DataOperations.DELETE:
                Preconditions.checkState(this.skipDelete, "Cannot process delete snapshot: %s, to ignore deletes, set %s=true", snapshot.snapshotId(), SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);
                return false;
            case DataOperations.OVERWRITE:
                Preconditions.checkState(this.skipOverwrite, "Cannot process overwrite snapshot: %s, to ignore overwrites, set %s=true", snapshot.snapshotId(), SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS);
                return false;
            default:
                throw new IllegalStateException(String.format("Cannot process unknown snapshot operation: %s (snapshot id %s)", operation.toLowerCase(Locale.ROOT), Long.valueOf(snapshot.snapshotId())));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Resolves the offset a stream should start from.
     *
     * @param table table to inspect
     * @param l lower bound timestamp in milliseconds, or null for no bound
     * @return {@code StreamingOffset.START_OFFSET} when the table has no current snapshot, when
     *     the current snapshot is older than {@code l}, or when no ancestor after {@code l} is
     *     found; otherwise an offset at position 0 of the chosen snapshot
     */
    public static StreamingOffset determineStartingOffset(Table table, Long l) {
        Snapshot current = table.currentSnapshot();
        if (current == null) {
            return StreamingOffset.START_OFFSET;
        }

        // Without a timestamp bound, start from the oldest ancestor.
        if (l == null) {
            long oldestId = SnapshotUtil.oldestAncestor(table).snapshotId();
            return new StreamingOffset(oldestId, 0L, false);
        }

        long fromTimestampMillis = l.longValue();
        if (table.currentSnapshot().timestampMillis() < fromTimestampMillis) {
            return StreamingOffset.START_OFFSET;
        }

        try {
            Snapshot firstAfter = SnapshotUtil.oldestAncestorAfter(table, fromTimestampMillis);
            if (firstAfter == null) {
                return StreamingOffset.START_OFFSET;
            }
            return new StreamingOffset(firstAfter.snapshotId(), 0L, false);
        } catch (IllegalStateException e) {
            // The lookup failed; fall back to the oldest ancestor.
            long oldestId = SnapshotUtil.oldestAncestor(table).snapshotId();
            return new StreamingOffset(oldestId, 0L, false);
        }
    }

    /**
     * Computes the end offset of the next micro-batch, walking forward from the start offset
     * until a per-batch file or record limit is hit or the current snapshot is exhausted.
     *
     * <p>The {@code readLimit} argument is not consulted; the limits applied are the
     * {@code maxFilesPerMicroBatch} and {@code maxRecordsPerMicroBatch} fields.
     *
     * @param offset start offset; must be a {@code StreamingOffset}
     * @param readLimit read limit supplied by the caller (unused)
     * @return {@code StreamingOffset.START_OFFSET} when the table has no current snapshot or that
     *     snapshot is older than {@code fromTimestamp}; {@code null} when the computed offset
     *     equals the resolved start offset; otherwise the new end offset, whose position is the
     *     index of the first file not included in the batch
     * @throws IllegalArgumentException if {@code offset} is not a {@code StreamingOffset}
     * @throws IllegalStateException if the start offset's snapshot is no longer present
     */
    @Override
    public Offset latestOffset(Offset offset, ReadLimit readLimit) {
        Preconditions.checkArgument(offset instanceof StreamingOffset, "Invalid start offset: %s is not a StreamingOffset", offset);

        this.table.refresh();
        if (this.table.currentSnapshot() == null
                || this.table.currentSnapshot().timestampMillis() < this.fromTimestamp) {
            return StreamingOffset.START_OFFSET;
        }

        StreamingOffset startingOffset = (StreamingOffset) offset;
        if (offset.equals(StreamingOffset.START_OFFSET)) {
            startingOffset = determineStartingOffset(this.table, this.fromTimestamp);
        }

        Snapshot curSnapshot = this.table.snapshot(startingOffset.snapshotId());
        validateCurrentSnapshotExists(curSnapshot, startingOffset);

        int startPosOfSnapshot = (int) startingOffset.position();
        boolean scanAllFiles = startingOffset.shouldScanAllFiles();

        boolean shouldContinueReading = true;
        int filesAdded = 0;
        int recordsAdded = 0;
        int curPos = 0;

        while (shouldContinueReading) {
            List<Pair<ManifestFile, Integer>> indexedManifests =
                    MicroBatches.skippedManifestIndexesFromSnapshot(this.table.io(), curSnapshot, startPosOfSnapshot, scanAllFiles);

            for (int idx = 0; idx < indexedManifests.size() && shouldContinueReading; idx++) {
                // Each pair carries the manifest and the file index at which it starts.
                curPos = indexedManifests.get(idx).second().intValue();

                try (CloseableIterable<FileScanTask> taskIterable = MicroBatches.openManifestFile(
                                this.table.io(),
                                this.table.specs(),
                                this.caseSensitive,
                                curSnapshot,
                                indexedManifests.get(idx).first(),
                                scanAllFiles);
                        CloseableIterator<FileScanTask> taskIter = taskIterable.iterator()) {
                    while (taskIter.hasNext()) {
                        FileScanTask task = taskIter.next();
                        // Files before the start position were consumed by an earlier batch.
                        if (curPos >= startPosOfSnapshot) {
                            if (filesAdded + 1 > this.maxFilesPerMicroBatch
                                    || recordsAdded + task.file().recordCount() > this.maxRecordsPerMicroBatch) {
                                // Adding this file would exceed a limit; curPos stays at this file.
                                shouldContinueReading = false;
                                break;
                            }
                            filesAdded++;
                            recordsAdded = (int) (recordsAdded + task.file().recordCount());
                        }
                        curPos++;
                    }
                } catch (IOException e) {
                    LOG.warn("Failed to close task iterable", e);
                }
            }

            // Nothing newer to read once the table's current snapshot has been reached.
            if (curSnapshot.snapshotId() == this.table.currentSnapshot().snapshotId()) {
                break;
            }

            // The snapshot was fully consumed without hitting a limit: move on to the next one.
            if (shouldContinueReading) {
                startPosOfSnapshot = -1;
                curSnapshot = SnapshotUtil.snapshotAfter(this.table, curSnapshot.snapshotId());
                scanAllFiles = false;
            }
        }

        StreamingOffset latestStreamingOffset = new StreamingOffset(curSnapshot.snapshotId(), curPos, scanAllFiles);

        // An unchanged offset is reported as null rather than as a repeated offset.
        if (latestStreamingOffset.equals(startingOffset)) {
            return null;
        }
        return latestStreamingOffset;
    }

    /**
     * Returns the number of data files added by a snapshot.
     *
     * <p>Uses the {@code added-data-files} summary property when present; otherwise counts the
     * snapshot's added data files directly.
     */
    private long addedFilesCount(Snapshot snapshot) {
        long fromSummary = PropertyUtil.propertyAsLong(snapshot.summary(), "added-data-files", -1L);
        if (fromSummary != -1) {
            return fromSummary;
        }
        // Summary has no count: enumerate the added files instead.
        return Iterables.size(snapshot.addedDataFiles(this.table.io()));
    }

    /**
     * Verifies that the snapshot referenced by an offset was found.
     *
     * @param snapshot result of looking up the offset's snapshot; null when missing
     * @param streamingOffset offset whose snapshot id is reported in the error
     * @throws IllegalStateException if {@code snapshot} is null
     */
    private void validateCurrentSnapshotExists(Snapshot snapshot, StreamingOffset streamingOffset) {
        if (snapshot != null) {
            return;
        }
        String message = String.format(
                "Cannot load current offset at snapshot %d, the snapshot was expired or removed",
                Long.valueOf(streamingOffset.snapshotId()));
        throw new IllegalStateException(message);
    }

    /**
     * Builds the default read limit from the configured per-micro-batch caps.
     *
     * <p>A cap equal to {@code Integer.MAX_VALUE} counts as unset.
     *
     * @return a composite of max-files and max-rows when both caps are set, the single matching
     *     limit when only one is set, and {@code ReadLimit.allAvailable()} when neither is set
     */
    @Override
    public ReadLimit getDefaultReadLimit() {
        boolean filesLimited = this.maxFilesPerMicroBatch != Integer.MAX_VALUE;
        boolean recordsLimited = this.maxRecordsPerMicroBatch != Integer.MAX_VALUE;

        if (filesLimited && recordsLimited) {
            // The row limit must use the record cap, not the file cap.
            return ReadLimit.compositeLimit(new ReadLimit[] {
                ReadLimit.maxFiles(this.maxFilesPerMicroBatch),
                ReadLimit.maxRows(this.maxRecordsPerMicroBatch)
            });
        }
        if (filesLimited) {
            return ReadLimit.maxFiles(this.maxFilesPerMicroBatch);
        }
        if (recordsLimited) {
            return ReadLimit.maxRows(this.maxRecordsPerMicroBatch);
        }
        return ReadLimit.allAvailable();
    }
}
