package org.apache.flink.connector.file.table.batch.compact;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.table.stream.compact.CompactMessages;
import org.apache.flink.connector.file.table.stream.compact.CompactReader;
import org.apache.flink.connector.file.table.stream.compact.CompactWriter;
import org.apache.flink.connector.file.table.utils.CompactFileUtils;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;

/* loaded from: input_file:org/apache/flink/connector/file/table/batch/compact/BatchCompactOperator.class */
public class BatchCompactOperator<T> extends AbstractStreamOperator<CompactMessages.CompactOutput> implements OneInputStreamOperator<CompactMessages.CoordinatorOutput, CompactMessages.CompactOutput>, BoundedOneInput {
    private static final long serialVersionUID = 1;
    public static final String UNCOMPACTED_PREFIX = "uncompacted-";
    public static final String COMPACTED_PREFIX = "compacted-";
    public static final String ATTEMPT_PREFIX = "attempt-";
    private final SupplierWithException<FileSystem, IOException> fsFactory;
    private final CompactReader.Factory<T> readerFactory;
    private final CompactWriter.Factory<T> writerFactory;
    private transient FileSystem fileSystem;
    private transient Map<String, List<Path>> compactedFiles;

    public BatchCompactOperator(SupplierWithException<FileSystem, IOException> supplierWithException, CompactReader.Factory<T> factory, CompactWriter.Factory<T> factory2) {
        this.fsFactory = supplierWithException;
        this.readerFactory = factory;
        this.writerFactory = factory2;
    }

    public void open() throws Exception {
        this.fileSystem = (FileSystem) this.fsFactory.get();
        this.compactedFiles = new HashMap();
    }

    public void processElement(StreamRecord<CompactMessages.CoordinatorOutput> streamRecord) throws Exception {
        CompactMessages.CoordinatorOutput coordinatorOutput = (CompactMessages.CoordinatorOutput) streamRecord.getValue();
        if (!(coordinatorOutput instanceof CompactMessages.CompactionUnit)) {
            throw new UnsupportedOperationException("Unsupported input message: " + coordinatorOutput);
        }
        CompactMessages.CompactionUnit compactionUnit = (CompactMessages.CompactionUnit) coordinatorOutput;
        String partition = compactionUnit.getPartition();
        List<Path> paths = compactionUnit.getPaths();
        Configuration configuration = getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
        Path path = null;
        if (paths.size() == 1) {
            path = paths.get(0);
        } else if (paths.size() > 1) {
            path = CompactFileUtils.doCompact(this.fileSystem, partition, paths, createCompactedFile(paths, getRuntimeContext().getAttemptNumber()), configuration, this.readerFactory, this.writerFactory);
        }
        if (path != null) {
            this.compactedFiles.computeIfAbsent(partition, str -> {
                return new ArrayList();
            }).add(path);
        }
    }

    public void endInput() throws Exception {
        this.output.collect(new StreamRecord(new CompactMessages.CompactOutput(this.compactedFiles)));
    }

    public void close() throws Exception {
        this.compactedFiles.clear();
    }

    private static Path createCompactedFile(List<Path> list, int i) {
        Path convertFromUncompacted = convertFromUncompacted(list.get(0));
        return new Path(convertFromUncompacted.getParent(), convertToCompactWithAttempt(i, convertFromUncompacted.getName()));
    }

    public static Path convertFromUncompacted(Path path) {
        Preconditions.checkArgument(path.getName().startsWith(UNCOMPACTED_PREFIX), "This should be uncompacted file: " + path);
        return new Path(path.getParent(), path.getName().substring(UNCOMPACTED_PREFIX.length()));
    }

    private static String convertToCompactWithAttempt(int i, String str) {
        return String.format("%s%s%d-%s", "compacted-", ATTEMPT_PREFIX, Integer.valueOf(i), str);
    }
}
