package io.prestosql.snapshot;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.log.Logger;
import io.prestosql.execution.Lifespan;
import io.prestosql.metadata.Split;
import io.prestosql.spi.connector.CatalogName;
import io.prestosql.spi.connector.ConnectorPartitionHandle;
import io.prestosql.split.SplitSource;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

/* loaded from: input_file:io/prestosql/snapshot/MarkerSplitSource.class */
/**
 * A {@link SplitSource} wrapper that interleaves marker splits with the splits produced by the
 * wrapped source and that can replay already-returned splits after a snapshot is resumed.
 *
 * <p>Every split fetched from the wrapped source is appended to an internal buffer. Each time a
 * snapshot marker is emitted, the current read position in that buffer is recorded under the
 * snapshot id, so {@link #resumeSnapshot(long)} can rewind to that position and serve the
 * following splits from the buffer again.
 *
 * <p>NOTE(review): no synchronization is performed here, and buffer state is also mutated from
 * future callbacks run on a direct executor; presumably callers serialize access - confirm.
 */
public class MarkerSplitSource implements SplitSource {
    private static final Logger LOG = Logger.get(MarkerSplitSource.class);

    /** The wrapped source that supplies the real splits. */
    private final SplitSource source;
    /** Decides when markers are generated and tracks split counts / active sources. */
    private final MarkerAnnouncer announcer;

    /** Every split fetched from {@link #source} so far, in fetch order. */
    private final List<Split> splitBuffer = new ArrayList<>();
    /** Index into {@link #splitBuffer} of the next split to hand out. */
    private int bufferPosition;
    /** Set once the wrapped source has reported its last batch. */
    private boolean sourceExhausted;

    /** Buffer position recorded for each snapshot id; id 0 maps to position 0. */
    private final Map<Long, Integer> snapshotBufferPositions = new HashMap<>();
    /** Id of the first snapshot recorded since construction or since the last rewind before it. */
    private OptionalLong firstSnapshot = OptionalLong.empty();
    /** When present, the next batch returned is a single resume marker for this snapshot id. */
    private OptionalLong resumingSnapshotId = OptionalLong.empty();
    /** True once a batch flagged as the last batch has been returned. */
    private boolean sentFinalMarker;

    /** All sources registered through {@link #addDependency(MarkerSplitSource)}. */
    private final Set<MarkerSplitSource> allDependencies = new HashSet<>();
    /** Registered dependencies that have not yet been reported finished. */
    private final Set<MarkerSplitSource> remainingDependencies = new HashSet<>();

    /**
     * Creates a marker-aware wrapper around a split source.
     *
     * @param splitSource the source to wrap
     * @param markerAnnouncer the announcer consulted for marker generation
     */
    public MarkerSplitSource(SplitSource splitSource, MarkerAnnouncer markerAnnouncer) {
        this.source = splitSource;
        this.announcer = markerAnnouncer;
        // Snapshot id 0 always maps to the very beginning of the buffer.
        this.snapshotBufferPositions.put(0L, 0);
    }

    /** Returns the catalog name of the wrapped source. */
    @Override
    public CatalogName getCatalogName() {
        return this.source.getCatalogName();
    }

    /**
     * Registers a source as a dependency. While any dependency remains unfinished,
     * {@link #getNextBatch} returns empty batches instead of data splits.
     *
     * @param markerSplitSource the source this one depends on
     */
    public void addDependency(MarkerSplitSource markerSplitSource) {
        this.allDependencies.add(markerSplitSource);
        this.remainingDependencies.add(markerSplitSource);
    }

    /**
     * Marks a previously registered dependency as finished.
     *
     * @param markerSplitSource the dependency that has finished
     */
    public void finishDependency(MarkerSplitSource markerSplitSource) {
        this.remainingDependencies.remove(markerSplitSource);
    }

    /**
     * Returns the next batch, which is one of (checked in this order):
     * <ol>
     *   <li>a single resume marker, if {@link #resumeSnapshot(long)} requested one;</li>
     *   <li>a final snapshot marker (flagged as last batch), if the wrapped source is exhausted
     *       and the buffer has been fully handed out;</li>
     *   <li>a snapshot marker, if the announcer says one is due;</li>
     *   <li>an empty batch, if dependencies are still outstanding;</li>
     *   <li>data splits from the buffer and/or the wrapped source.</li>
     * </ol>
     *
     * @param connectorPartitionHandle partition handle forwarded to the wrapped source
     * @param lifespan lifespan forwarded to the wrapped source and attached to marker splits
     * @param i maximum number of splits to return; must be positive
     * @return a future for the next batch
     * @throws IllegalArgumentException if {@code i} is not positive
     */
    @Override
    public ListenableFuture<SplitSource.SplitBatch> getNextBatch(ConnectorPartitionHandle connectorPartitionHandle, Lifespan lifespan, int i) {
        Preconditions.checkArgument(i > 0, "Cannot fetch a batch of zero size");

        if (this.resumingSnapshotId.isPresent()) {
            return Futures.immediateFuture(nextResumeMarkerBatch(lifespan));
        }
        if (isFullyConsumed()) {
            return Futures.immediateFuture(recordSnapshot(lifespan, this.announcer.forceGenerateMarker(this), true));
        }
        OptionalLong dueSnapshotId = this.announcer.shouldGenerateMarker(this);
        if (dueSnapshotId.isPresent()) {
            return Futures.immediateFuture(recordSnapshot(lifespan, dueSnapshotId.getAsLong(), false));
        }
        if (!this.remainingDependencies.isEmpty()) {
            return Futures.immediateFuture(new SplitSource.SplitBatch(Collections.emptyList(), false));
        }
        return Futures.transform(
                prepareNextBatch(connectorPartitionHandle, lifespan, i),
                batch -> finishDataBatch(batch, lifespan),
                MoreExecutors.directExecutor());
    }

    /** Returns true when the wrapped source is exhausted and every buffered split was handed out. */
    private boolean isFullyConsumed() {
        return this.sourceExhausted && this.bufferPosition == this.splitBuffer.size();
    }

    /** Consumes the pending resume request and builds the single-split resume marker batch. */
    private SplitSource.SplitBatch nextResumeMarkerBatch(Lifespan lifespan) {
        long snapshotId = this.resumingSnapshotId.getAsLong();
        this.resumingSnapshotId = OptionalLong.empty();
        List<Split> markerOnly = Collections.singletonList(
                new Split(getCatalogName(), MarkerSplit.resumeSplit(getCatalogName(), snapshotId), lifespan));
        // If nothing is left to replay, the resume marker doubles as the last batch.
        boolean lastBatch = isFullyConsumed();
        if (lastBatch) {
            this.sentFinalMarker = true;
        }
        LOG.debug("Sending out resuming marker %d after %d splits for source: %s (%s)", snapshotId, this.bufferPosition, getCatalogName(), toString());
        return new SplitSource.SplitBatch(markerOnly, lastBatch);
    }

    /**
     * Post-processes a data batch: reports its size to the announcer and makes sure the batch
     * flagged as last is always a marker batch, never a data batch.
     */
    private SplitSource.SplitBatch finishDataBatch(SplitSource.SplitBatch batch, Lifespan lifespan) {
        if (batch == null) {
            return null;
        }
        List<Split> splits = batch.getSplits();
        incrementSplitCount(splits.size());
        if (!batch.isLastBatch()) {
            return batch;
        }
        if (splits.isEmpty()) {
            // Nothing left to deliver: emit the final marker right away.
            return recordSnapshot(lifespan, this.announcer.forceGenerateMarker(this), true);
        }
        // Deliver the data now; the final marker follows on the next call.
        return new SplitSource.SplitBatch(splits, false);
    }

    /**
     * Produces up to {@code maxSize} data splits, serving from the buffer first and fetching
     * from the wrapped source only when the buffer cannot satisfy the request.
     */
    private ListenableFuture<SplitSource.SplitBatch> prepareNextBatch(ConnectorPartitionHandle partitionHandle, Lifespan lifespan, int maxSize) {
        int buffered = this.splitBuffer.size() - this.bufferPosition;
        if (buffered >= maxSize || this.sourceExhausted) {
            int batchSize = Math.min(buffered, maxSize);
            // Copy instead of exposing a subList view: a view is invalidated by any later
            // structural change to splitBuffer, which happens whenever more splits are fetched.
            List<Split> replayed = new ArrayList<>(this.splitBuffer.subList(this.bufferPosition, this.bufferPosition + batchSize));
            this.bufferPosition += batchSize;
            boolean lastBatch = this.sourceExhausted && batchSize == buffered;
            return Futures.immediateFuture(new SplitSource.SplitBatch(replayed, lastBatch));
        }

        // The buffer holds fewer than maxSize splits: take what is there and fetch the rest.
        List<Split> bufferedPrefix = null;
        int toFetch = maxSize;
        if (buffered > 0) {
            bufferedPrefix = new ArrayList<>(this.splitBuffer.subList(this.bufferPosition, this.bufferPosition + buffered));
            this.bufferPosition += buffered;
            toFetch -= buffered;
        }
        List<Split> prefix = bufferedPrefix;
        return Futures.transform(
                this.source.getNextBatch(partitionHandle, lifespan, toFetch),
                fetched -> appendFetched(prefix, fetched),
                MoreExecutors.directExecutor());
    }

    /**
     * Records a batch freshly fetched from the wrapped source in the buffer and combines it
     * with splits already taken from the buffer for the same request.
     *
     * @param prefix splits already taken from the buffer, or null when there were none
     * @param fetched the batch delivered by the wrapped source, possibly null
     */
    private SplitSource.SplitBatch appendFetched(List<Split> prefix, SplitSource.SplitBatch fetched) {
        if (fetched == null) {
            return prefix == null ? null : new SplitSource.SplitBatch(prefix, false);
        }
        List<Split> splits = fetched.getSplits();
        this.splitBuffer.addAll(splits);
        this.bufferPosition += splits.size();
        // Fetching only happens once the buffer was drained, so the position must be at the end.
        Preconditions.checkState(this.bufferPosition == this.splitBuffer.size());
        if (fetched.isLastBatch()) {
            this.sourceExhausted = true;
        }
        if (prefix == null) {
            return fetched;
        }
        prefix.addAll(splits);
        return new SplitSource.SplitBatch(prefix, fetched.isLastBatch());
    }

    /**
     * Remembers the current buffer position for a snapshot and builds the marker batch for it.
     *
     * @param lifespan lifespan attached to the marker split
     * @param snapshotId id of the snapshot being generated
     * @param lastBatch whether this marker is the final batch; if so the source is deactivated
     */
    private SplitSource.SplitBatch recordSnapshot(Lifespan lifespan, long snapshotId, boolean lastBatch) {
        if (!this.firstSnapshot.isPresent()) {
            this.firstSnapshot = OptionalLong.of(snapshotId);
        }
        this.snapshotBufferPositions.put(snapshotId, this.bufferPosition);
        LOG.debug("Generating snapshot %d after %d splits for source: %s (%s)", snapshotId, this.bufferPosition, this.source.getCatalogName(), this.source.toString());
        Split marker = new Split(getCatalogName(), MarkerSplit.snapshotSplit(getCatalogName(), snapshotId), lifespan);
        SplitSource.SplitBatch markerBatch = new SplitSource.SplitBatch(Collections.singletonList(marker), lastBatch);
        if (lastBatch) {
            this.sentFinalMarker = true;
            deactivate();
        }
        return markerBatch;
    }

    /**
     * Rewinds this source so that subsequent batches replay the splits handed out after the
     * given snapshot. Unless {@code j} is 0, the next batch is a resume marker for that id.
     * All registered dependencies are considered unfinished again.
     *
     * @param j id of the snapshot to resume from; 0 restarts from the beginning
     * @throws IllegalStateException if the snapshot is unknown, is not older than the first
     *     recorded snapshot, and this source has not been fully consumed
     */
    public void resumeSnapshot(long j) {
        Integer recordedPosition = this.snapshotBufferPositions.get(j);
        if (recordedPosition != null) {
            this.bufferPosition = recordedPosition;
        } else if (!this.firstSnapshot.isPresent()) {
            // No snapshot was ever recorded here: replay everything.
            this.bufferPosition = 0;
        } else if (j < this.firstSnapshot.getAsLong()) {
            // The snapshot predates the first one recorded here: replay everything.
            this.bufferPosition = 0;
            this.firstSnapshot = OptionalLong.empty();
        } else {
            // Unknown snapshot that is not older than the first: only valid when nothing is left.
            Preconditions.checkState(isFullyConsumed());
        }
        if (j != 0) {
            this.resumingSnapshotId = OptionalLong.of(j);
        }
        this.sentFinalMarker = false;
        this.remainingDependencies.clear();
        this.remainingDependencies.addAll(this.allDependencies);
    }

    /** Reports the number of data splits just handed out to the announcer. */
    private void incrementSplitCount(int count) {
        this.announcer.incrementSplitCount(count);
    }

    /** Tells the announcer that this source is no longer active. */
    private void deactivate() {
        this.announcer.deactivateSplitSource(this);
    }

    /** Deactivates this source with the announcer, then closes the wrapped source. */
    @Override
    public void close() {
        deactivate();
        this.source.close();
    }

    /** Returns true once a batch flagged as the last batch has been returned. */
    @Override
    public boolean isFinished() {
        return this.sentFinalMarker;
    }
}
