package org.apache.flink.changelog.fs;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/changelog/fs/StateChangeFormat.class */
public class StateChangeFormat implements StateChangelogHandleStreamHandleReader.StateChangeIterator {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) StateChangeFormat.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<StateChangeSet, Long> write(OutputStreamWithPos outputStreamWithPos, Collection<StateChangeSet> collection) throws IOException {
        ArrayList<StateChangeSet> arrayList = new ArrayList(collection);
        arrayList.sort(Comparator.comparing((v0) -> {
            return v0.getLogId();
        }).thenComparing((v0) -> {
            return v0.getSequenceNumber();
        }));
        DataOutputViewStreamWrapper dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper(outputStreamWithPos);
        HashMap hashMap = new HashMap();
        for (StateChangeSet stateChangeSet : arrayList) {
            hashMap.put(stateChangeSet, Long.valueOf(outputStreamWithPos.getPos()));
            writeChangeSet(dataOutputViewStreamWrapper, stateChangeSet.getChanges());
        }
        return hashMap;
    }

    private void writeChangeSet(DataOutputViewStreamWrapper dataOutputViewStreamWrapper, List<StateChange> list) throws IOException {
        TreeMap treeMap = new TreeMap((Map) list.stream().collect(Collectors.groupingBy((v0) -> {
            return v0.getKeyGroup();
        })));
        dataOutputViewStreamWrapper.writeInt(treeMap.size());
        for (Map.Entry entry : treeMap.entrySet()) {
            dataOutputViewStreamWrapper.writeInt(((List) entry.getValue()).size());
            dataOutputViewStreamWrapper.writeInt(((Integer) entry.getKey()).intValue());
            for (StateChange stateChange : (List) entry.getValue()) {
                dataOutputViewStreamWrapper.writeInt(stateChange.getChange().length);
                dataOutputViewStreamWrapper.write(stateChange.getChange());
            }
        }
    }

    @Override // org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader.StateChangeIterator
    public CloseableIterator<StateChange> read(StreamStateHandle streamStateHandle, long j) throws IOException {
        final FSDataInputStream openInputStream = streamStateHandle.openInputStream();
        final DataInputViewStreamWrapper wrap = wrap(openInputStream);
        if (j != 0) {
            LOG.debug("seek from {} to {}", Long.valueOf(openInputStream.getPos()), Long.valueOf(j));
            wrap.skipBytesToRead((int) j);
        }
        return new CloseableIterator<StateChange>() { // from class: org.apache.flink.changelog.fs.StateChangeFormat.1
            int numUnreadGroups;
            int numLeftInGroup;
            int keyGroup;

            {
                this.numUnreadGroups = wrap.readInt();
                int i = this.numUnreadGroups;
                this.numUnreadGroups = i - 1;
                this.numLeftInGroup = i == 0 ? 0 : wrap.readInt();
                this.keyGroup = this.numLeftInGroup == 0 ? 0 : wrap.readInt();
            }

            @Override // java.util.Iterator
            public boolean hasNext() {
                advance();
                return this.numLeftInGroup > 0;
            }

            private void advance() {
                if (this.numLeftInGroup != 0 || this.numUnreadGroups <= 0) {
                    return;
                }
                this.numUnreadGroups--;
                try {
                    this.numLeftInGroup = wrap.readInt();
                    this.keyGroup = wrap.readInt();
                } catch (IOException e) {
                    ExceptionUtils.rethrow(e);
                }
            }

            @Override // java.util.Iterator
            public StateChange next() {
                advance();
                if (this.numLeftInGroup == 0) {
                    throw new NoSuchElementException();
                }
                this.numLeftInGroup--;
                try {
                    return readChange();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }

            private StateChange readChange() throws IOException {
                int readInt = wrap.readInt();
                byte[] bArr = new byte[readInt];
                IOUtils.readFully(wrap, bArr, 0, readInt);
                return new StateChange(this.keyGroup, bArr);
            }

            @Override // java.lang.AutoCloseable
            public void close() throws Exception {
                StateChangeFormat.LOG.trace("close {}", openInputStream);
                openInputStream.close();
            }
        };
    }

    private DataInputViewStreamWrapper wrap(InputStream inputStream) throws IOException {
        BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream);
        return new DataInputViewStreamWrapper(bufferedInputStream.read() == 1 ? SnappyStreamCompressionDecorator.INSTANCE.decorateWithCompression(bufferedInputStream) : bufferedInputStream);
    }
}
