package org.apache.flink.connector.file.sink.compactor.operator;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.connector.file.sink.FileSinkCommittable;
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;

@Internal
/**
 * Operator that collects {@link FileSinkCommittable}s per bucket into {@link CompactorRequest}s
 * and emits a request downstream once the bucket's {@link CompactTrigger} fires.
 *
 * <p>A request is emitted when one of the following happens:
 *
 * <ul>
 *   <li>{@link CompactTrigger#onElement} reports {@link CompactTriggerResult#FIRE_AND_PURGE} for
 *       a newly packed committable,
 *   <li>{@link CompactTrigger#onCheckpoint} reports {@link CompactTriggerResult#FIRE_AND_PURGE}
 *       in {@link #prepareSnapshotPreBarrier(long)},
 *   <li>the input ends ({@link #endInput()}), in which case every pending request is emitted.
 * </ul>
 *
 * <p>Committables that are still packed but not yet emitted are written to operator list state on
 * snapshot and re-packed on restore; triggers themselves are not stored and are rebuilt while
 * re-packing.
 */
public class CompactCoordinator extends AbstractStreamOperator<CompactorRequest> implements OneInputStreamOperator<CommittableMessage<FileSinkCommittable>, CompactorRequest>, BoundedOneInput {
    static final ListStateDescriptor<byte[]> REMAINING_COMMITTABLE_RAW_STATES_DESC = new ListStateDescriptor<>("remaining_compact_commit_raw_state", BytePrimitiveArraySerializer.INSTANCE);
    private final FileCompactStrategy strategy;
    private final SimpleVersionedSerializer<FileSinkCommittable> committableSerializer;
    /** Requests currently being assembled, keyed by bucket id. */
    private final Map<String, CompactorRequest> packingRequests = new HashMap<>();
    /** Per-bucket triggers, keyed by bucket id; created lazily and dropped when the bucket fires. */
    private final Map<String, CompactTrigger> triggers = new HashMap<>();
    private ListState<FileSinkCommittable> remainingCommittableState;

    /**
     * Decides, for a single bucket, whether an incoming committable should be compacted, passed
     * through untouched, or should cause the pending request to be emitted.
     */
    public static class CompactTrigger {
        private final long threshold;
        private final int numCheckpointsBeforeCompaction;
        /** Accumulated size of the files accepted for compaction so far. */
        private long size;
        /** Baseline checkpoint id for the checkpoint-count condition; {@code -1} until first use. */
        private long triggeredCpId = -1;

        /**
         * Creates a trigger configured from the given strategy.
         *
         * @param fileCompactStrategy source of the size threshold and the checkpoint count
         */
        public CompactTrigger(FileCompactStrategy fileCompactStrategy) {
            this.threshold = fileCompactStrategy.getSizeThreshold();
            this.numCheckpointsBeforeCompaction = fileCompactStrategy.getNumCheckpointsBeforeCompaction();
        }

        /**
         * Evaluates the trigger for a newly arrived committable.
         *
         * @param fileSinkCommittable the committable to inspect
         * @return {@link CompactTriggerResult#PASS_THROUGH} if the committable has no pending
         *     file, the pending file has no path, its file name does not start with {@code "."},
         *     or its reported size is negative; {@link CompactTriggerResult#CONTINUE} if the size
         *     threshold is negative or has not been reached yet; {@link
         *     CompactTriggerResult#FIRE_AND_PURGE} once the accumulated size reaches the threshold
         */
        public CompactTriggerResult onElement(FileSinkCommittable fileSinkCommittable) {
            InProgressFileWriter.PendingFileRecoverable pendingFile = fileSinkCommittable.getPendingFile();
            if (pendingFile == null) {
                return CompactTriggerResult.PASS_THROUGH;
            }
            // NOTE(review): only files whose name starts with "." are compacted; presumably
            // these are the hidden files written for compaction - confirm against the writer.
            if (pendingFile.getPath() == null || !pendingFile.getPath().getName().startsWith(".")) {
                return CompactTriggerResult.PASS_THROUGH;
            }
            long fileSize = pendingFile.getSize();
            if (fileSize < 0) {
                return CompactTriggerResult.PASS_THROUGH;
            }
            // A negative threshold disables the size condition; the file is still compacted.
            if (this.threshold < 0) {
                return CompactTriggerResult.CONTINUE;
            }
            this.size += fileSize;
            return this.size >= this.threshold ? CompactTriggerResult.FIRE_AND_PURGE : CompactTriggerResult.CONTINUE;
        }

        /**
         * Evaluates the trigger for a checkpoint.
         *
         * @param j id of the checkpoint about to be taken
         * @return {@link CompactTriggerResult#CONTINUE} if the checkpoint-count condition is
         *     disabled (negative) or not yet met; {@link CompactTriggerResult#FIRE_AND_PURGE} once
         *     the distance from the baseline checkpoint reaches the configured count
         */
        public CompactTriggerResult onCheckpoint(long j) {
            if (this.numCheckpointsBeforeCompaction < 0) {
                return CompactTriggerResult.CONTINUE;
            }
            if (this.triggeredCpId < 0) {
                // First checkpoint seen by this trigger: count it as the first one.
                this.triggeredCpId = j - 1;
            }
            return j - this.triggeredCpId >= ((long) this.numCheckpointsBeforeCompaction) ? CompactTriggerResult.FIRE_AND_PURGE : CompactTriggerResult.CONTINUE;
        }
    }

    /** Outcome of evaluating a {@link CompactTrigger}. */
    public enum CompactTriggerResult {
        CONTINUE,
        FIRE_AND_PURGE,
        PASS_THROUGH
    }

    /**
     * Creates the coordinator.
     *
     * @param fileCompactStrategy strategy handed to every per-bucket {@link CompactTrigger}
     * @param simpleVersionedSerializer serializer used for the committables kept in state; must
     *     not be {@code null}
     */
    public CompactCoordinator(FileCompactStrategy fileCompactStrategy, SimpleVersionedSerializer<FileSinkCommittable> simpleVersionedSerializer) {
        this.strategy = fileCompactStrategy;
        this.committableSerializer = Preconditions.checkNotNull(simpleVersionedSerializer);
    }

    /**
     * Packs the committable carried by a {@link CommittableWithLineage} message into its bucket's
     * request and emits the request if the trigger fires. Any other message type is ignored.
     *
     * @param streamRecord the incoming record
     */
    public void processElement(StreamRecord<CommittableMessage<FileSinkCommittable>> streamRecord) throws Exception {
        CommittableMessage<FileSinkCommittable> message = streamRecord.getValue();
        if (message instanceof CommittableWithLineage) {
            FileSinkCommittable committable = ((CommittableWithLineage<FileSinkCommittable>) message).getCommittable();
            if (packAndTrigger(committable)) {
                fireAndPurge(committable.getBucketId());
            }
        }
    }

    /**
     * Adds the committable to the pending request of its bucket.
     *
     * @param committable the committable to pack
     * @return {@code true} if the bucket's request should be emitted now
     * @throws IllegalStateException (via {@link Preconditions#checkState(boolean)}) if a cleanup
     *     committable also carries a pending file
     * @throws RuntimeException if the committable is neither a cleanup request nor has a pending
     *     file
     */
    private boolean packAndTrigger(FileSinkCommittable committable) {
        String bucketId = committable.getBucketId();
        CompactorRequest bucketRequest = this.packingRequests.computeIfAbsent(bucketId, CompactorRequest::new);
        if (committable.hasInProgressFileToCleanup() || committable.hasCompactedFileToCleanup()) {
            // Cleanup committables are never compacted; they are forwarded with the request.
            Preconditions.checkState(!committable.hasPendingFile());
            bucketRequest.addToPassthrough(committable);
            return false;
        }
        if (!committable.hasPendingFile()) {
            throw new RuntimeException("Committable to compact has no content.");
        }
        CompactTrigger trigger = this.triggers.computeIfAbsent(bucketId, id -> new CompactTrigger(this.strategy));
        CompactTriggerResult triggerResult = trigger.onElement(committable);
        switch (triggerResult) {
            case PASS_THROUGH:
                bucketRequest.addToPassthrough(committable);
                return false;
            case CONTINUE:
                bucketRequest.addToCompact(committable);
                return false;
            case FIRE_AND_PURGE:
                bucketRequest.addToCompact(committable);
                return true;
            default:
                throw new RuntimeException("Unexpected trigger result:" + triggerResult);
        }
    }

    /**
     * Drops the trigger of the given bucket and emits its pending request, if there is one.
     *
     * @param bucketId id of the bucket to fire
     */
    private void fireAndPurge(String bucketId) {
        this.triggers.remove(bucketId);
        CompactorRequest request = this.packingRequests.remove(bucketId);
        if (request != null) {
            this.output.collect(new StreamRecord<>(request));
        }
    }

    /** Emits every pending request and clears all per-bucket bookkeeping. */
    public void endInput() throws Exception {
        for (CompactorRequest request : this.packingRequests.values()) {
            this.output.collect(new StreamRecord<>(request));
        }
        this.packingRequests.clear();
        this.triggers.clear();
    }

    /**
     * Evaluates every bucket's checkpoint condition and emits the requests of the buckets that
     * fire.
     *
     * @param j id of the checkpoint about to be taken
     */
    public void prepareSnapshotPreBarrier(long j) throws Exception {
        super.prepareSnapshotPreBarrier(j);
        // Collect first, fire afterwards: fireAndPurge removes entries from the map being iterated.
        List<String> bucketsToFire = new ArrayList<>(this.triggers.size());
        for (Map.Entry<String, CompactTrigger> entry : this.triggers.entrySet()) {
            if (entry.getValue().onCheckpoint(j) == CompactTriggerResult.FIRE_AND_PURGE) {
                bucketsToFire.add(entry.getKey());
            }
        }
        bucketsToFire.forEach(this::fireAndPurge);
    }

    /**
     * Stores every committable that is still packed in a pending request: first all committables
     * queued for compaction, then all pass-through committables.
     *
     * @param stateSnapshotContext the snapshot context, forwarded to the superclass
     */
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        List<FileSinkCommittable> remaining =
                Stream.concat(
                                this.packingRequests.values().stream()
                                        .flatMap(request -> request.getCommittableToCompact().stream()),
                                this.packingRequests.values().stream()
                                        .flatMap(request -> request.getCommittableToPassthrough().stream()))
                        .collect(Collectors.toList());
        this.remainingCommittableState.update(remaining);
    }

    /**
     * Sets up the list state and re-packs every committable found in it, emitting requests for
     * buckets whose trigger fires while doing so.
     *
     * @param stateInitializationContext context providing the operator state store
     */
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        this.remainingCommittableState = new SimpleVersionedListState<>(stateInitializationContext.getOperatorStateStore().getListState(REMAINING_COMMITTABLE_RAW_STATES_DESC), this.committableSerializer);
        Iterable<FileSinkCommittable> restored = this.remainingCommittableState.get();
        if (restored != null) {
            for (FileSinkCommittable committable : restored) {
                if (packAndTrigger(committable)) {
                    fireAndPurge(committable.getBucketId());
                }
            }
        }
    }
}
