package org.apache.flink.runtime.state;

import java.util.List;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.TtlCatcher;
import org.apache.flink.api.common.state.TtlOveredRecordsConsumer;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.ttl.hotcold.HotColdStateFactory;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/DefaultKeyedStateStore.class */
/**
 * {@link KeyedStateStore} implementation that hands out keyed state handles backed by a
 * {@link KeyedStateBackend}.
 *
 * <p>Besides the regular ("hot") backend, an optional "cold" backend may be supplied. The
 * {@code getCold*} accessors create state on the cold backend; when no cold backend was
 * supplied they log a warning and fall back to the regular backend.
 *
 * <p>Every public accessor initializes the descriptor's serializer from the
 * {@link ExecutionConfig} before creating the state, and rethrows any failure as a
 * {@link RuntimeException} that carries the original cause.
 */
public class DefaultKeyedStateStore implements KeyedStateStore {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultKeyedStateStore.class);

    /** Backend used for all regular state; never {@code null}. */
    protected final KeyedStateBackend<?> keyedStateBackend;

    /** Optional backend used for cold state; {@code null} when none was supplied. */
    @Nullable
    protected final KeyedStateBackend<?> coldKeyedStateBackend;

    /** Configuration used to initialize descriptor serializers; never {@code null}. */
    protected final ExecutionConfig executionConfig;

    /**
     * Creates a store without a cold backend.
     *
     * @param keyedStateBackend backend for regular state, must not be {@code null}
     * @param executionConfig configuration used to initialize serializers, must not be {@code null}
     */
    public DefaultKeyedStateStore(KeyedStateBackend<?> keyedStateBackend, ExecutionConfig executionConfig) {
        this(keyedStateBackend, null, executionConfig);
    }

    /**
     * Creates a store with an optional cold backend.
     *
     * @param keyedStateBackend backend for regular state, must not be {@code null}
     * @param coldKeyedStateBackend backend for cold state, may be {@code null}
     * @param executionConfig configuration used to initialize serializers, must not be {@code null}
     */
    public DefaultKeyedStateStore(KeyedStateBackend<?> keyedStateBackend, @Nullable KeyedStateBackend<?> coldKeyedStateBackend, ExecutionConfig executionConfig) {
        this.keyedStateBackend = Preconditions.checkNotNull(keyedStateBackend);
        this.coldKeyedStateBackend = coldKeyedStateBackend;
        this.executionConfig = Preconditions.checkNotNull(executionConfig);
    }

    /**
     * Returns the value state described by the given descriptor.
     *
     * @param valueStateDescriptor descriptor of the requested state, must not be {@code null}
     * @return the state handle
     * @throws NullPointerException if the descriptor is {@code null}
     * @throws RuntimeException if the state cannot be created
     */
    @Override
    public <T> ValueState<T> getState(ValueStateDescriptor<T> valueStateDescriptor) {
        Objects.requireNonNull(valueStateDescriptor, "The state properties must not be null");
        try {
            valueStateDescriptor.initializeSerializerUnlessSet(this.executionConfig);
            return getPartitionedState(valueStateDescriptor);
        } catch (Exception e) {
            throw new RuntimeException("Error while getting state", e);
        }
    }

    /**
     * Returns the value state described by the given descriptor, passing the supplied
     * {@link TtlCatcher} on to the regular backend.
     *
     * @param valueStateDescriptor descriptor of the requested state, must not be {@code null}
     * @param ttlCatcher catcher forwarded to the backend; when {@code null} the backend is
     *     called without a catcher
     * @return the state handle
     * @throws NullPointerException if the descriptor is {@code null}
     * @throws RuntimeException if the state cannot be created
     */
    public <K, V> ValueState<V> getState(ValueStateDescriptor<V> valueStateDescriptor, TtlCatcher<K, V> ttlCatcher) {
        Objects.requireNonNull(valueStateDescriptor, "The state properties must not be null");
        try {
            valueStateDescriptor.initializeSerializerUnlessSet(this.executionConfig);
            return getPartitionedState(valueStateDescriptor, ttlCatcher);
        } catch (Exception e) {
            throw new RuntimeException("Error while getting state", e);
        }
    }

    /**
     * Returns the value state described by the given descriptor, created on the cold backend
     * (or on the regular backend when no cold backend was supplied).
     *
     * @param valueStateDescriptor descriptor of the requested state, must not be {@code null}
     * @return the state handle
     * @throws NullPointerException if the descriptor is {@code null}
     * @throws RuntimeException if the state cannot be created
     */
    public <T> ValueState<T> getColdState(ValueStateDescriptor<T> valueStateDescriptor) {
        Objects.requireNonNull(valueStateDescriptor, "The state properties must not be null");
        try {
            valueStateDescriptor.initializeSerializerUnlessSet(this.executionConfig);
            return getColdPartitionedState(valueStateDescriptor);
        } catch (Exception e) {
            throw new RuntimeException("Error while getting cold state", e);
        }
    }

    /**
     * Returns the list state described by the given descriptor, wrapped in a
     * {@link UserFacingListState}.
     *
     * @param listStateDescriptor descriptor of the requested state, must not be {@code null}
     * @return the wrapped state handle
     * @throws NullPointerException if the descriptor is {@code null}
     * @throws RuntimeException if the state cannot be created
     */
    @Override
    public <T> ListState<T> getListState(ListStateDescriptor<T> listStateDescriptor) {
        Objects.requireNonNull(listStateDescriptor, "The state properties must not be null");
        try {
            listStateDescriptor.initializeSerializerUnlessSet(this.executionConfig);
            ListState<T> originalState = getPartitionedState(listStateDescriptor);
            return new UserFacingListState<>(originalState);
        } catch (Exception e) {
            throw new RuntimeException("Error while getting state", e);
        }
    }

    /**
     * Returns the list state described by the given descriptor, wrapped in a
     * {@link UserFacingListState}, passing the supplied {@link TtlCatcher} on to the regular
     * backend.
     *
     * @param listStateDescriptor descriptor of the requested state, must not be {@code null}
     * @param ttlCatcher catcher forwarded to the backend; when {@code null} the backend is
     *     called without a catcher
     * @return the wrapped state handle
     * @throws NullPointerException if the descriptor is {@code null}
     * @throws RuntimeException if the state cannot be created
     */
    public <T, K> ListState<T> getListState(ListStateDescriptor<T> listStateDescriptor, TtlCatcher<K, List<T>> ttlCatcher) {
        Objects.requireNonNull(listStateDescriptor, "The state properties must not be null");
        try {
            listStateDescriptor.initializeSerializerUnlessSet(this.executionConfig);
            ListState<T> originalState = getPartitionedState(listStateDescriptor, ttlCatcher);
            return new UserFacingListState<>(originalState);
        } catch (Exception e) {
            throw new RuntimeException("Error while getting state", e);
        }
    }

    /**
     * Returns the list state described by the given descriptor, created on the cold backend
     * (or on the regular backend when no cold backend was supplied) and wrapped in a
     * {@link UserFacingListState}.
     *
     * @param listStateDescriptor descriptor of the requested state, must not be {@code null}
     * @return the wrapped state handle
     * @throws NullPointerException if the descriptor is {@code null}
     * @throws RuntimeException if the state cannot be created
     */
    public <T> ListState<T> getColdListState(ListStateDescriptor<T> listStateDescriptor) {
        Objects.requireNonNull(listStateDescriptor, "The state properties must not be null");
        try {
            listStateDescriptor.initializeSerializerUnlessSet(this.executionConfig);
            ListState<T> originalState = getColdPartitionedState(listStateDescriptor);
            return new UserFacingListState<>(originalState);
        } catch (Exception e) {
            throw new RuntimeException("Error while getting cold state", e);
        }
    }

    /**
     * Returns the reducing state described by the given descriptor.
     *
     * @param reducingStateDescriptor descriptor of the requested state, must not be {@code null}
     * @return the state handle
     * @throws NullPointerException if the descriptor is {@code null}
     * @throws RuntimeException if the state cannot be created
     */
    @Override
    public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> reducingStateDescriptor) {
        Objects.requireNonNull(reducingStateDescriptor, "The state properties must not be null");
        try {
            reducingStateDescriptor.initializeSerializerUnlessSet(this.executionConfig);
            return getPartitionedState(reducingStateDescriptor);
        } catch (Exception e) {
            throw new RuntimeException("Error while getting state", e);
        }
    }

    /**
     * Returns the reducing state described by the given descriptor, passing the supplied
     * {@link TtlCatcher} on to the regular backend.
     *
     * @param reducingStateDescriptor descriptor of the requested state, must not be {@code null}
     * @param ttlCatcher catcher forwarded to the backend; when {@code null} the backend is
     *     called without a catcher
     * @return the state handle
     * @throws NullPointerException if the descriptor is {@code null}
     * @throws RuntimeException if the state cannot be created
     */
    public <T, K> ReducingState<T> getReducingState(ReducingStateDescriptor<T> reducingStateDescriptor, TtlCatcher<K, T> ttlCatcher) {
        Objects.requireNonNull(reducingStateDescriptor, "The state properties must not be null");
        try {
            reducingStateDescriptor.initializeSerializerUnlessSet(this.executionConfig);
            return getPartitionedState(reducingStateDescriptor, ttlCatcher);
        } catch (Exception e) {
            throw new RuntimeException("Error while getting state", e);
        }
    }

    /**
     * Returns the reducing state described by the given descriptor, created on the cold
     * backend (or on the regular backend when no cold backend was supplied).
     *
     * @param reducingStateDescriptor descriptor of the requested state, must not be {@code null}
     * @return the state handle
     * @throws NullPointerException if the descriptor is {@code null}
     * @throws RuntimeException if the state cannot be created
     */
    public <T> ReducingState<T> getColdReducingState(ReducingStateDescriptor<T> reducingStateDescriptor) {
        Objects.requireNonNull(reducingStateDescriptor, "The state properties must not be null");
        try {
            reducingStateDescriptor.initializeSerializerUnlessSet(this.executionConfig);
            return getColdPartitionedState(reducingStateDescriptor);
        } catch (Exception e) {
            throw new RuntimeException("Error while getting cold state", e);
        }
    }

    /**
     * Returns the aggregating state described by the given descriptor.
     *
     * @param aggregatingStateDescriptor descriptor of the requested state, must not be {@code null}
     * @return the state handle
     * @throws NullPointerException if the descriptor is {@code null}
     * @throws RuntimeException if the state cannot be created
     */
    @Override
    public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> aggregatingStateDescriptor) {
        Objects.requireNonNull(aggregatingStateDescriptor, "The state properties must not be null");
        try {
            aggregatingStateDescriptor.initializeSerializerUnlessSet(this.executionConfig);
            return getPartitionedState(aggregatingStateDescriptor);
        } catch (Exception e) {
            throw new RuntimeException("Error while getting state", e);
        }
    }

    /**
     * Returns the aggregating state described by the given descriptor, passing the supplied
     * {@link TtlCatcher} on to the regular backend.
     *
     * @param aggregatingStateDescriptor descriptor of the requested state, must not be {@code null}
     * @param ttlCatcher catcher forwarded to the backend; when {@code null} the backend is
     *     called without a catcher
     * @return the state handle
     * @throws NullPointerException if the descriptor is {@code null}
     * @throws RuntimeException if the state cannot be created
     */
    public <IN, ACC, OUT, K> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> aggregatingStateDescriptor, TtlCatcher<K, ACC> ttlCatcher) {
        Objects.requireNonNull(aggregatingStateDescriptor, "The state properties must not be null");
        try {
            aggregatingStateDescriptor.initializeSerializerUnlessSet(this.executionConfig);
            return getPartitionedState(aggregatingStateDescriptor, ttlCatcher);
        } catch (Exception e) {
            throw new RuntimeException("Error while getting state", e);
        }
    }

    /**
     * Returns the aggregating state described by the given descriptor, created on the cold
     * backend (or on the regular backend when no cold backend was supplied).
     *
     * @param aggregatingStateDescriptor descriptor of the requested state, must not be {@code null}
     * @return the state handle
     * @throws NullPointerException if the descriptor is {@code null}
     * @throws RuntimeException if the state cannot be created
     */
    public <IN, ACC, OUT> AggregatingState<IN, OUT> getColdAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> aggregatingStateDescriptor) {
        Objects.requireNonNull(aggregatingStateDescriptor, "The state properties must not be null");
        try {
            aggregatingStateDescriptor.initializeSerializerUnlessSet(this.executionConfig);
            return getColdPartitionedState(aggregatingStateDescriptor);
        } catch (Exception e) {
            throw new RuntimeException("Error while getting cold state", e);
        }
    }

    /**
     * Returns the map state described by the given descriptor, wrapped in a
     * {@link UserFacingMapState}, passing the supplied {@link TtlCatcher} on to the regular
     * backend.
     *
     * @param mapStateDescriptor descriptor of the requested state, must not be {@code null}
     * @param ttlCatcher catcher forwarded to the backend; when {@code null} the backend is
     *     called without a catcher
     * @return the wrapped state handle
     * @throws NullPointerException if the descriptor is {@code null}
     * @throws RuntimeException if the state cannot be created
     */
    public <UK, UV, K> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> mapStateDescriptor, TtlCatcher<K, Tuple2<UK, UV>> ttlCatcher) {
        Objects.requireNonNull(mapStateDescriptor, "The state properties must not be null");
        try {
            mapStateDescriptor.initializeSerializerUnlessSet(this.executionConfig);
            MapState<UK, UV> originalState = getPartitionedState(mapStateDescriptor, ttlCatcher);
            return new UserFacingMapState<>(originalState);
        } catch (Exception e) {
            throw new RuntimeException("Error while getting state", e);
        }
    }

    /**
     * Returns the map state described by the given descriptor, created on the cold backend
     * (or on the regular backend when no cold backend was supplied) and wrapped in a
     * {@link UserFacingMapState}.
     *
     * @param mapStateDescriptor descriptor of the requested state, must not be {@code null}
     * @return the wrapped state handle
     * @throws NullPointerException if the descriptor is {@code null}
     * @throws RuntimeException if the state cannot be created
     */
    public <UK, UV> MapState<UK, UV> getColdMapState(MapStateDescriptor<UK, UV> mapStateDescriptor) {
        Objects.requireNonNull(mapStateDescriptor, "The state properties must not be null");
        try {
            mapStateDescriptor.initializeSerializerUnlessSet(this.executionConfig);
            MapState<UK, UV> originalState = getColdPartitionedState(mapStateDescriptor);
            return new UserFacingMapState<>(originalState);
        } catch (Exception e) {
            throw new RuntimeException("Error while getting cold state", e);
        }
    }

    /**
     * Returns the map state described by the given descriptor, wrapped in a
     * {@link UserFacingMapState}.
     *
     * @param mapStateDescriptor descriptor of the requested state, must not be {@code null}
     * @return the wrapped state handle
     * @throws NullPointerException if the descriptor is {@code null}
     * @throws RuntimeException if the state cannot be created
     */
    @Override
    public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> mapStateDescriptor) {
        Objects.requireNonNull(mapStateDescriptor, "The state properties must not be null");
        try {
            mapStateDescriptor.initializeSerializerUnlessSet(this.executionConfig);
            MapState<UK, UV> originalState = getPartitionedState(mapStateDescriptor);
            return new UserFacingMapState<>(originalState);
        } catch (Exception e) {
            throw new RuntimeException("Error while getting state", e);
        }
    }

    /**
     * Creates the state for the given descriptor in the {@link VoidNamespace}.
     *
     * @param stateDescriptor descriptor of the requested state
     * @return the state handle
     * @throws Exception if state creation fails
     */
    protected <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
        return getPartitionedState(stateDescriptor, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
    }

    /**
     * Creates the state for the given descriptor and namespace.
     *
     * <p>When the descriptor's TTL config does not enable cold state, the state is created on
     * the regular backend. Otherwise the cold backend is resolved via
     * {@link #getColdStateBackend(StateDescriptor)}; if it implements
     * {@link TtlOveredRecordsConsumer}, the state is built through a
     * {@link HotColdStateFactory}. If it does not, a warning is logged and the state is
     * created on the regular backend only.
     *
     * @param stateDescriptor descriptor of the requested state
     * @param n namespace the state is created in
     * @param typeSerializer serializer for the namespace
     * @return the state handle
     * @throws Exception if state creation fails
     */
    @VisibleForTesting
    protected <N, S extends State, TTLK, TTLV> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor, N n, TypeSerializer<N> typeSerializer) throws Exception {
        if (!stateDescriptor.getTtlConfig().isColdStateEnabled()) {
            return getPartitionedState(stateDescriptor, n, typeSerializer, null);
        }
        KeyedStateBackend<?> coldStateBackend = getColdStateBackend(stateDescriptor);
        if (coldStateBackend instanceof TtlOveredRecordsConsumer) {
            // Raw types and the cast are kept: the generic signatures of HotColdStateFactory
            // and TtlOveredRecordsConsumer are not visible from this file.
            return (S) new HotColdStateFactory(
                    stateDescriptor,
                    this.keyedStateBackend,
                    coldStateBackend,
                    descriptor -> ((TtlOveredRecordsConsumer) coldStateBackend).getTtlCatcher(descriptor, typeSerializer, this.keyedStateBackend.getClass()),
                    n,
                    typeSerializer).createState();
        }
        LOG.warn("The {} can't be used as ttl overed records consumer, means cold instance for {} state will not be created", coldStateBackend.getClass().getName(), stateDescriptor.getName());
        return getPartitionedState(stateDescriptor, n, typeSerializer, null);
    }

    /**
     * Creates the state for the given descriptor in the {@link VoidNamespace} on the regular
     * backend, forwarding the given {@link TtlCatcher}.
     *
     * @param stateDescriptor descriptor of the requested state
     * @param ttlCatcher catcher forwarded to the backend, may be {@code null}
     * @return the state handle
     * @throws Exception if state creation fails
     */
    protected <S extends State, K> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor, TtlCatcher<K, ?> ttlCatcher) throws Exception {
        return getPartitionedState(stateDescriptor, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, ttlCatcher);
    }

    /**
     * Creates the state for the given descriptor and namespace on the regular backend.
     *
     * <p>A {@code null} catcher selects the backend's three-argument
     * {@code getPartitionedState}; otherwise the catcher is passed as a fourth argument.
     *
     * @param stateDescriptor descriptor of the requested state
     * @param n namespace the state is created in
     * @param typeSerializer serializer for the namespace
     * @param ttlCatcher catcher forwarded to the backend, may be {@code null}
     * @return the state handle
     * @throws Exception if state creation fails
     */
    protected <N, S extends State, K> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor, N n, TypeSerializer<N> typeSerializer, TtlCatcher<K, ?> ttlCatcher) throws Exception {
        if (ttlCatcher == null) {
            return (S) this.keyedStateBackend.getPartitionedState(n, typeSerializer, stateDescriptor);
        }
        return (S) this.keyedStateBackend.getPartitionedState(n, typeSerializer, stateDescriptor, ttlCatcher);
    }

    /**
     * Creates the state for the given descriptor in the {@link VoidNamespace} on the backend
     * returned by {@link #getColdStateBackend(StateDescriptor)}.
     *
     * @param stateDescriptor descriptor of the requested state
     * @return the state handle
     * @throws Exception if state creation fails
     */
    protected <S extends State> S getColdPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
        return getColdPartitionedState(stateDescriptor, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
    }

    /**
     * Creates the state for the given descriptor and namespace on the backend returned by
     * {@link #getColdStateBackend(StateDescriptor)}.
     *
     * @param stateDescriptor descriptor of the requested state
     * @param n namespace the state is created in
     * @param typeSerializer serializer for the namespace
     * @return the state handle
     * @throws Exception if state creation fails
     */
    protected <N, S extends State> S getColdPartitionedState(StateDescriptor<S, ?> stateDescriptor, N n, TypeSerializer<N> typeSerializer) throws Exception {
        return (S) getColdStateBackend(stateDescriptor).getPartitionedState(n, typeSerializer, stateDescriptor);
    }

    /**
     * Resolves the backend to use for cold state.
     *
     * @param stateDescriptor descriptor of the state being created; only its name is used,
     *     in the fallback warning
     * @return the cold backend if one was supplied, otherwise the regular backend (after
     *     logging a warning)
     */
    protected KeyedStateBackend<?> getColdStateBackend(StateDescriptor<?, ?> stateDescriptor) {
        if (this.coldKeyedStateBackend != null) {
            return this.coldKeyedStateBackend;
        }
        LOG.warn("The cold state backend was not configured. Using standard keyed state backend for state {}", stateDescriptor.getName());
        return this.keyedStateBackend;
    }
}
