package org.apache.flink.contrib.streaming.state;

import java.util.Collection;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.util.FlinkRuntimeException;
import org.rocksdb.ColumnFamilyHandle;

/* loaded from: input_file:org/apache/flink/contrib/streaming/state/RocksDBReducingState.class */
class RocksDBReducingState<K, N, V> extends AbstractRocksDBAppendingState<K, N, V, V, V> implements InternalReducingState<K, N, V> {
    private final ReduceFunction<V> reduceFunction;

    private RocksDBReducingState(ColumnFamilyHandle columnFamilyHandle, TypeSerializer<N> typeSerializer, TypeSerializer<V> typeSerializer2, V v, ReduceFunction<V> reduceFunction, RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend) {
        super(columnFamilyHandle, typeSerializer, typeSerializer2, v, rocksDBKeyedStateBackend);
        this.reduceFunction = reduceFunction;
    }

    @Override // org.apache.flink.runtime.state.internal.InternalKvState
    public TypeSerializer<K> getKeySerializer() {
        return this.backend.getKeySerializer();
    }

    @Override // org.apache.flink.runtime.state.internal.InternalKvState
    public TypeSerializer<N> getNamespaceSerializer() {
        return this.namespaceSerializer;
    }

    @Override // org.apache.flink.runtime.state.internal.InternalKvState
    public TypeSerializer<V> getValueSerializer() {
        return this.valueSerializer;
    }

    @Override // org.apache.flink.api.common.state.AppendingState
    public V get() {
        return getInternal();
    }

    @Override // org.apache.flink.api.common.state.AppendingState
    public void add(V v) throws Exception {
        byte[] keyBytes = getKeyBytes();
        V internal = getInternal(keyBytes);
        updateInternal(keyBytes, internal == null ? v : this.reduceFunction.reduce(internal, v));
    }

    @Override // org.apache.flink.runtime.state.internal.InternalMergingState
    public void mergeNamespaces(N n, Collection<N> collection) {
        if (collection == null || collection.isEmpty()) {
            return;
        }
        try {
            V v = null;
            for (N n2 : collection) {
                if (n2 != null) {
                    setCurrentNamespace(n2);
                    byte[] serializeCurrentKeyWithGroupAndNamespace = serializeCurrentKeyWithGroupAndNamespace();
                    byte[] bArr = this.backend.db.get(this.columnFamily, serializeCurrentKeyWithGroupAndNamespace);
                    if (bArr != null) {
                        this.backend.db.delete(this.columnFamily, this.writeOptions, serializeCurrentKeyWithGroupAndNamespace);
                        this.dataInputView.setBuffer(bArr);
                        V mo6077deserialize = this.valueSerializer.mo6077deserialize(this.dataInputView);
                        v = v != null ? this.reduceFunction.reduce(v, mo6077deserialize) : mo6077deserialize;
                    }
                }
            }
            if (v != null) {
                setCurrentNamespace(n);
                byte[] serializeCurrentKeyWithGroupAndNamespace2 = serializeCurrentKeyWithGroupAndNamespace();
                byte[] bArr2 = this.backend.db.get(this.columnFamily, serializeCurrentKeyWithGroupAndNamespace2);
                if (bArr2 != null) {
                    this.dataInputView.setBuffer(bArr2);
                    v = this.reduceFunction.reduce(v, this.valueSerializer.mo6077deserialize(this.dataInputView));
                }
                this.dataOutputView.clear();
                this.valueSerializer.serialize(v, this.dataOutputView);
                this.backend.db.put(this.columnFamily, this.writeOptions, serializeCurrentKeyWithGroupAndNamespace2, this.dataOutputView.getCopyOfBuffer());
            }
        } catch (Exception e) {
            throw new FlinkRuntimeException("Error while merging state in RocksDB", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Incorrect return type in method signature: <K:Ljava/lang/Object;N:Ljava/lang/Object;SV:Ljava/lang/Object;S::Lorg/apache/flink/api/common/state/State;IS:TS;>(Lorg/apache/flink/api/common/state/StateDescriptor<TS;TSV;>;Lorg/apache/flink/api/java/tuple/Tuple2<Lorg/rocksdb/ColumnFamilyHandle;Lorg/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo<TN;TSV;>;>;Lorg/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend<TK;>;)TIS; */
    /* JADX WARN: Multi-variable type inference failed */
    public static State create(StateDescriptor stateDescriptor, Tuple2 tuple2, RocksDBKeyedStateBackend rocksDBKeyedStateBackend) {
        return new RocksDBReducingState((ColumnFamilyHandle) tuple2.f0, ((RegisteredKeyValueStateBackendMetaInfo) tuple2.f1).getNamespaceSerializer(), ((RegisteredKeyValueStateBackendMetaInfo) tuple2.f1).getStateSerializer(), stateDescriptor.getDefaultValue(), ((ReducingStateDescriptor) stateDescriptor).getReduceFunction(), rocksDBKeyedStateBackend);
    }
}
