package org.apache.flink.runtime.state.ttl.hotcold;

import java.io.IOException;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.LockProvider;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.TtlCatcher;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.SupplierWithException;

/* loaded from: input_file:org/apache/flink/runtime/state/ttl/hotcold/HotColdStateFactory.class */
/**
 * Factory for "hot/cold" state wrappers.
 *
 * <p>For the state described by the supplied descriptor, the factory obtains a "hot" state from
 * one keyed state backend and a companion "cold" state from a second keyed state backend. The
 * cold state is registered under the hot state's name followed by {@link #COLD_STATE_SUFFIX}.
 * Both are wrapped into the {@link AbstractHotColdState} implementation that matches the
 * descriptor's {@link StateDescriptor.Type}.
 *
 * @param <K> key type of the keyed state backends
 * @param <N> namespace type
 * @param <SV> value type of the state descriptor
 * @param <TTLCK> first type argument of the {@link TtlCatcher} produced by the catcher supplier
 * @param <TTLCV> second type argument of the {@link TtlCatcher} produced by the catcher supplier
 * @param <S> state type of the state descriptor
 * @param <IS> state type returned by {@link #createState()}
 */
public class HotColdStateFactory<K, N, SV, TTLCK, TTLCV, S extends State, IS extends S> {
    /** Suffix appended to the hot state's name to build the name of its cold companion state. */
    public static final String COLD_STATE_SUFFIX = "_cold";

    /** Creation routine per supported state type; see {@link #createStateFactories()}. */
    private final Map<StateDescriptor.Type, SupplierWithException<AbstractHotColdState<K, N, SV, ?>, Exception>> stateFactories;
    private final StateDescriptor<S, SV> stateDesc;
    private final KeyedStateBackend<K> hotKeyedStateBackend;
    private final KeyedStateBackend<K> coldKeyedStateBackend;
    private final FunctionWithException<StateDescriptor<S, ?>, TtlCatcher<TTLCK, TTLCV>, IOException> ttlCatcherSupplier;
    private final N namespace;
    private final TypeSerializer<N> namespaceSerializer;

    /**
     * Creates a factory for the given (hot) state descriptor.
     *
     * <p>Note that the passed descriptor is modified: its TTL configuration is replaced by the
     * one produced by {@link #recreateHotTtlConfig(StateTtlConfig)}.
     *
     * @param stateDescriptor descriptor of the hot state; its TTL config must not use
     *     {@code DISABLED_COLD_STATE_STRATEGY}, otherwise the {@code Preconditions.checkState}
     *     check fails
     * @param hotKeyedStateBackend backend the hot state is obtained from
     * @param coldKeyedStateBackend backend the cold state is obtained from
     * @param ttlCatcherSupplier produces the {@link TtlCatcher} for a cold state descriptor
     * @param namespace namespace passed to both backends when obtaining the states
     * @param namespaceSerializer serializer for the namespace
     */
    public HotColdStateFactory(
            StateDescriptor<S, SV> stateDescriptor,
            KeyedStateBackend<K> hotKeyedStateBackend,
            KeyedStateBackend<K> coldKeyedStateBackend,
            FunctionWithException<StateDescriptor<S, ?>, TtlCatcher<TTLCK, TTLCV>, IOException> ttlCatcherSupplier,
            N namespace,
            TypeSerializer<N> namespaceSerializer) {
        Preconditions.checkState(
                stateDescriptor.getTtlConfig().getUseColdStateStrategy()
                        != StateTtlConfig.UseColdStateStrategy.DISABLED_COLD_STATE_STRATEGY,
                String.format("%s can't be used with disabled cold state strategy", HotColdStateFactory.class));
        stateDescriptor.enableTimeToLive(recreateHotTtlConfig(stateDescriptor.getTtlConfig()));

        this.namespace = namespace;
        this.namespaceSerializer = namespaceSerializer;
        this.stateDesc = stateDescriptor;
        this.hotKeyedStateBackend = hotKeyedStateBackend;
        this.coldKeyedStateBackend = coldKeyedStateBackend;
        this.ttlCatcherSupplier = ttlCatcherSupplier;
        this.stateFactories = createStateFactories();
    }

    /**
     * Builds the TTL configuration used for the hot state.
     *
     * <p>TTL, update type, cleanup strategies and the cold state strategy are copied from
     * {@code sourceConfig}; {@code returnExpiredIfNotCleanedUpAndNotDelete()} is always applied,
     * and background cleanup is disabled when it is disabled in {@code sourceConfig}. No other
     * builder option is set here.
     *
     * @param sourceConfig TTL configuration originally attached to the hot state descriptor
     * @return the rebuilt configuration
     */
    private StateTtlConfig recreateHotTtlConfig(StateTtlConfig sourceConfig) {
        final StateTtlConfig.Builder builder =
                StateTtlConfig.newBuilder(sourceConfig.getTtl())
                        .setUpdateType(sourceConfig.getUpdateType())
                        .returnExpiredIfNotCleanedUpAndNotDelete()
                        .setCleanUpStrategies(sourceConfig.getCleanupStrategies().getStrategies())
                        .setUseColdStateStrategy(sourceConfig.getUseColdStateStrategy());
        if (!sourceConfig.getCleanupStrategies().isCleanupInBackground()) {
            builder.disableCleanupInBackground();
        }
        return builder.build();
    }

    /**
     * Maps every supported {@link StateDescriptor.Type} to the method that creates the
     * corresponding hot/cold state.
     */
    private Map<StateDescriptor.Type, SupplierWithException<AbstractHotColdState<K, N, SV, ?>, Exception>> createStateFactories() {
        return Stream.of(
                        factoryEntry(StateDescriptor.Type.VALUE, this::createHotColdValueState),
                        factoryEntry(StateDescriptor.Type.LIST, this::createHotColdListState),
                        factoryEntry(StateDescriptor.Type.MAP, this::createHotColdMapState),
                        factoryEntry(StateDescriptor.Type.REDUCING, this::createHotColdReducingState),
                        factoryEntry(StateDescriptor.Type.AGGREGATING, this::createHotColdAggregatingState))
                .collect(Collectors.toMap(entry -> entry.f0, entry -> entry.f1));
    }

    /**
     * Pairs a state type with its creation routine. The explicitly typed parameter gives the
     * method references in {@link #createStateFactories()} a functional-interface target type.
     */
    private Tuple2<StateDescriptor.Type, SupplierWithException<AbstractHotColdState<K, N, SV, ?>, Exception>> factoryEntry(
            StateDescriptor.Type type,
            SupplierWithException<AbstractHotColdState<K, N, SV, ?>, Exception> factory) {
        return Tuple2.of(type, factory);
    }

    /**
     * Creates the hot/cold state matching the type of the state descriptor.
     *
     * @return the newly created state
     * @throws FlinkRuntimeException if the descriptor's type has no registered creation routine
     * @throws Exception if the selected creation routine fails
     */
    @SuppressWarnings("unchecked")
    public IS createState() throws Exception {
        final StateDescriptor.Type type = stateDesc.getType();
        final SupplierWithException<AbstractHotColdState<K, N, SV, ?>, Exception> factory = stateFactories.get(type);
        if (factory == null) {
            throw new FlinkRuntimeException(
                    String.format("State type: %s is not supported by %s", type, HotColdStateFactory.class));
        }
        // Unchecked: the wrapper selected by descriptor type is handed out as the caller's IS.
        return (IS) factory.get();
    }

    /**
     * Enables TTL on the given cold state descriptor, using the TTL carried by the hot
     * descriptor's cold state strategy, unless that strategy is
     * {@code INFINITY_COLD_STATE_STRATEGY}. The descriptor is modified in place.
     *
     * @param coldStateDesc descriptor of the cold state
     * @return the same descriptor instance, viewed as {@code StateDescriptor<S, SV>}
     */
    @SuppressWarnings("unchecked")
    private StateDescriptor<S, SV> getColdStateWithTtlDesc(StateDescriptor<?, ?> coldStateDesc) {
        final StateTtlConfig hotTtlConfig = stateDesc.getTtlConfig();
        if (hotTtlConfig.getUseColdStateStrategy() != StateTtlConfig.UseColdStateStrategy.INFINITY_COLD_STATE_STRATEGY) {
            coldStateDesc.enableTimeToLive(
                    StateTtlConfig.newBuilder(hotTtlConfig.getUseColdStateStrategy().getTtl()).build());
        }
        return (StateDescriptor<S, SV>) coldStateDesc;
    }

    /**
     * Returns the catcher as a {@link LockProvider}.
     *
     * @param ttlCatcher catcher created for the cold state
     * @param stateClass hot/cold state class named in the error message
     * @throws IllegalArgumentException if the catcher does not implement {@link LockProvider}
     */
    @SuppressWarnings("rawtypes")
    private static LockProvider requireLockProvider(TtlCatcher<?, ?> ttlCatcher, Class<?> stateClass) {
        if (!(ttlCatcher instanceof LockProvider)) {
            throw new IllegalArgumentException(stateClass.getName() + " requires the lock to be provided");
        }
        return (LockProvider) ttlCatcher;
    }

    /** Creates the hot/cold wrapper for a value state. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private AbstractHotColdState<K, N, SV, ?> createHotColdValueState() throws Exception {
        final ValueStateDescriptor<SV> coldStateDesc =
                new ValueStateDescriptor<>(stateDesc.getName() + COLD_STATE_SUFFIX, stateDesc.getSerializer());
        // NOTE(review): unlike the other state kinds, the hot state is obtained BEFORE the cold
        // one here. The original order is kept on purpose; confirm whether it matters.
        final TtlCatcher<?, ?> ttlCatcher = ttlCatcherSupplier.apply(getColdStateWithTtlDesc(coldStateDesc));
        final InternalValueState hotState =
                (InternalValueState) hotKeyedStateBackend.getPartitionedState(namespace, namespaceSerializer, stateDesc, ttlCatcher);
        final InternalValueState coldState =
                (InternalValueState) coldKeyedStateBackend.getPartitionedState(namespace, namespaceSerializer, coldStateDesc);
        return new HotColdValueState(hotState, coldState);
    }

    /**
     * Creates the hot/cold wrapper for a list state.
     *
     * @throws IllegalArgumentException if the TTL catcher does not implement {@link LockProvider}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private <T> AbstractHotColdState<K, N, SV, ?> createHotColdListState() throws Exception {
        final ListStateDescriptor<T> hotStateDesc = (ListStateDescriptor<T>) stateDesc;
        final StateDescriptor<S, SV> coldStateDesc =
                getColdStateWithTtlDesc(
                        new ListStateDescriptor<>(
                                stateDesc.getName() + COLD_STATE_SUFFIX, hotStateDesc.getElementSerializer()));
        final InternalListState coldState =
                (InternalListState) coldKeyedStateBackend.getPartitionedState(namespace, namespaceSerializer, coldStateDesc);
        final TtlCatcher<?, ?> ttlCatcher = ttlCatcherSupplier.apply(coldStateDesc);
        final InternalListState hotState =
                (InternalListState) hotKeyedStateBackend.getPartitionedState(namespace, namespaceSerializer, stateDesc, ttlCatcher);
        return new HotColdListState(
                hotState, coldState, requireLockProvider(ttlCatcher, HotColdListState.class).getLock());
    }

    /**
     * Creates the hot/cold wrapper for a map state.
     *
     * @throws IllegalArgumentException if the TTL catcher does not implement {@link LockProvider}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private <UK, UV> AbstractHotColdState<K, N, SV, ?> createHotColdMapState() throws Exception {
        final MapStateDescriptor<UK, UV> hotStateDesc = (MapStateDescriptor<UK, UV>) stateDesc;
        final StateDescriptor<S, SV> coldStateDesc =
                getColdStateWithTtlDesc(
                        new MapStateDescriptor<>(
                                stateDesc.getName() + COLD_STATE_SUFFIX,
                                hotStateDesc.getKeySerializer(),
                                hotStateDesc.getValueSerializer()));
        final InternalMapState coldState =
                (InternalMapState) coldKeyedStateBackend.getPartitionedState(namespace, namespaceSerializer, coldStateDesc);
        final TtlCatcher<?, ?> ttlCatcher = ttlCatcherSupplier.apply(coldStateDesc);
        final InternalMapState hotState =
                (InternalMapState) hotKeyedStateBackend.getPartitionedState(namespace, namespaceSerializer, stateDesc, ttlCatcher);
        // NOTE(review): the lock is requested but its result is not used; HotColdMapState is
        // constructed from the two states only. Call kept to preserve the original behavior.
        requireLockProvider(ttlCatcher, HotColdMapState.class).getLock();
        return new HotColdMapState(hotState, coldState);
    }

    /**
     * Creates the hot/cold wrapper for a reducing state.
     *
     * @throws IllegalArgumentException if the TTL catcher does not implement {@link LockProvider}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private AbstractHotColdState<K, N, SV, ?> createHotColdReducingState() throws Exception {
        final ReducingStateDescriptor<SV> hotStateDesc = (ReducingStateDescriptor<SV>) stateDesc;
        final StateDescriptor<S, SV> coldStateDesc =
                getColdStateWithTtlDesc(
                        new ReducingStateDescriptor<>(
                                stateDesc.getName() + COLD_STATE_SUFFIX,
                                hotStateDesc.getReduceFunction(),
                                hotStateDesc.getSerializer()));
        final InternalReducingState coldState =
                (InternalReducingState) coldKeyedStateBackend.getPartitionedState(namespace, namespaceSerializer, coldStateDesc);
        final TtlCatcher<?, ?> ttlCatcher = ttlCatcherSupplier.apply(coldStateDesc);
        final InternalReducingState hotState =
                (InternalReducingState) hotKeyedStateBackend.getPartitionedState(namespace, namespaceSerializer, stateDesc, ttlCatcher);
        return new HotColdReducingState(
                hotState, coldState, requireLockProvider(ttlCatcher, HotColdReducingState.class).getLock());
    }

    /**
     * Creates the hot/cold wrapper for an aggregating state.
     *
     * @throws IllegalArgumentException if the TTL catcher does not implement {@link LockProvider}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private <IN, OUT> AbstractHotColdState<K, N, SV, ?> createHotColdAggregatingState() throws Exception {
        final AggregatingStateDescriptor<IN, SV, OUT> hotStateDesc =
                (AggregatingStateDescriptor<IN, SV, OUT>) stateDesc;
        final StateDescriptor<S, SV> coldStateDesc =
                getColdStateWithTtlDesc(
                        new AggregatingStateDescriptor<>(
                                stateDesc.getName() + COLD_STATE_SUFFIX,
                                hotStateDesc.getAggregateFunction(),
                                hotStateDesc.getSerializer()));
        final InternalAggregatingState coldState =
                (InternalAggregatingState) coldKeyedStateBackend.getPartitionedState(namespace, namespaceSerializer, coldStateDesc);
        final TtlCatcher<?, ?> ttlCatcher = ttlCatcherSupplier.apply(coldStateDesc);
        final InternalAggregatingState hotState =
                (InternalAggregatingState) hotKeyedStateBackend.getPartitionedState(namespace, namespaceSerializer, stateDesc, ttlCatcher);
        return new HotColdAggregatingState(
                hotState, coldState, requireLockProvider(ttlCatcher, HotColdAggregatingState.class).getLock());
    }
}
