package org.apache.flink.runtime.state;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.util.Preconditions;

/**
 * Holds the serializer(s) associated with a piece of state and hands out the right one
 * depending on whether the state is new or was restored from a serializer snapshot.
 *
 * <p>Instances are created through {@link #fromNewState(TypeSerializer)} or
 * {@link #fromRestoredState(TypeSerializerSnapshot)}; the constructor is private so the only
 * implementations are the two nested classes of this file.
 *
 * @param <T> the type of the state handled by the provided serializers.
 */
@Internal
public abstract class StateSerializerProvider<T> {

    /**
     * The serializer registered for the state. It is set at construction time for new state,
     * and stays {@code null} for restored state until
     * {@link #registerNewSerializerForRestoredState(TypeSerializer)} is called.
     */
    @Nullable
    TypeSerializer<T> registeredSerializer;

    /**
     * Creates a provider for state that was restored, backed by the snapshot of the serializer
     * that was previously used for it.
     *
     * @param typeSerializerSnapshot snapshot of the previous serializer; must not be {@code null}.
     * @param <T> the type of the state.
     * @return a provider that starts without a registered serializer.
     */
    public static <T> StateSerializerProvider<T> fromRestoredState(TypeSerializerSnapshot<T> typeSerializerSnapshot) {
        return new RestoredStateSerializerProvider<>(typeSerializerSnapshot);
    }

    /**
     * Creates a provider for state that has no restored counterpart.
     *
     * @param typeSerializer the serializer registered for the state; must not be {@code null}.
     * @param <T> the type of the state.
     * @return a provider that always hands out the given serializer.
     */
    public static <T> StateSerializerProvider<T> fromNewState(TypeSerializer<T> typeSerializer) {
        return new NewStateSerializerProvider<>(typeSerializer);
    }

    /** Private so that only the nested implementations can extend this class. */
    private StateSerializerProvider(@Nullable TypeSerializer<T> typeSerializer) {
        this.registeredSerializer = typeSerializer;
    }

    /**
     * Returns the serializer that should currently be used for the state.
     *
     * @return the registered serializer if there is one; for restored state without a
     *     registered serializer, the serializer restored from the snapshot.
     */
    @Nonnull
    public abstract TypeSerializer<T> currentSchemaSerializer();

    /**
     * Returns the serializer restored from the snapshot of the previously used serializer.
     *
     * @return the previous serializer.
     * @throws UnsupportedOperationException if this provider was created for new state.
     */
    @Nonnull
    public abstract TypeSerializer<T> previousSchemaSerializer();

    /**
     * Registers a new serializer for restored state and reports how it relates to the
     * previous serializer's snapshot.
     *
     * @param typeSerializer the new serializer to register.
     * @return the compatibility result computed by the previous serializer's snapshot.
     * @throws UnsupportedOperationException if a serializer has already been registered
     *     (which is always the case for providers created for new state).
     */
    @Nonnull
    public abstract TypeSerializerSchemaCompatibility<T> registerNewSerializerForRestoredState(TypeSerializer<T> typeSerializer);

    /**
     * Provider for new state: the serializer is fixed at construction time, there is no
     * previous serializer, and re-registration is rejected.
     */
    private static class NewStateSerializerProvider<T> extends StateSerializerProvider<T> {

        NewStateSerializerProvider(TypeSerializer<T> typeSerializer) {
            super(Preconditions.checkNotNull(typeSerializer));
        }

        @Override
        @Nonnull
        public TypeSerializer<T> currentSchemaSerializer() {
            // The constructor passes the serializer through checkNotNull, and nothing in this
            // class reassigns the field afterwards.
            return this.registeredSerializer;
        }

        @Override
        @Nonnull
        public TypeSerializerSchemaCompatibility<T> registerNewSerializerForRestoredState(TypeSerializer<T> typeSerializer) {
            throw new UnsupportedOperationException("A serializer has already been registered for the state; re-registration is not allowed.");
        }

        @Override
        @Nonnull
        public TypeSerializer<T> previousSchemaSerializer() {
            throw new UnsupportedOperationException("This is a NewStateSerializerProvider; you cannot get a restore serializer because there was no restored state.");
        }
    }

    /**
     * Provider for restored state: starts with only the snapshot of the previous serializer,
     * and accepts exactly one registration of a new serializer.
     */
    private static class RestoredStateSerializerProvider<T> extends StateSerializerProvider<T> {

        /** Snapshot of the serializer that was previously used for the state. */
        @Nonnull
        private final TypeSerializerSnapshot<T> previousSerializerSnapshot;

        /** Set once a registered serializer was reported as incompatible by the snapshot. */
        private boolean isRegisteredWithIncompatibleSerializer;

        /** Serializer restored from the snapshot; created on first use and then reused. */
        @Nullable
        private TypeSerializer<T> cachedRestoredSerializer;

        RestoredStateSerializerProvider(TypeSerializerSnapshot<T> typeSerializerSnapshot) {
            super(null);
            this.previousSerializerSnapshot = Preconditions.checkNotNull(typeSerializerSnapshot);
        }

        @Override
        @Nonnull
        public TypeSerializer<T> currentSchemaSerializer() {
            if (this.registeredSerializer == null) {
                // Nothing has been registered yet, so fall back to the restored serializer.
                return previousSchemaSerializer();
            }
            Preconditions.checkState(!this.isRegisteredWithIncompatibleSerializer, "Unable to provide a serializer with the current schema, because the restored state was registered with a new serializer that has incompatible schema.");
            return this.registeredSerializer;
        }

        @Override
        @Nonnull
        public TypeSerializerSchemaCompatibility<T> registerNewSerializerForRestoredState(TypeSerializer<T> typeSerializer) {
            Preconditions.checkNotNull(typeSerializer);
            if (this.registeredSerializer != null) {
                throw new UnsupportedOperationException("A serializer has already been registered for the state; re-registration is not allowed.");
            }
            TypeSerializerSchemaCompatibility<T> compatibility = this.previousSerializerSnapshot.resolveSchemaCompatibility(typeSerializer);
            if (compatibility.isIncompatible()) {
                this.isRegisteredWithIncompatibleSerializer = true;
            }
            // The serializer is recorded even when it is incompatible, so a later registration
            // attempt is rejected and currentSchemaSerializer() fails its state check.
            this.registeredSerializer = typeSerializer;
            return compatibility;
        }

        @Override
        @Nonnull
        public final TypeSerializer<T> previousSchemaSerializer() {
            if (this.cachedRestoredSerializer == null) {
                // Restore from the snapshot only once; later calls reuse the same instance.
                this.cachedRestoredSerializer = this.previousSerializerSnapshot.restoreSerializer();
            }
            return this.cachedRestoredSerializer;
        }
    }
}
