package org.apache.flink.runtime.state.ttl.hotcold;

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.locks.Lock;
import javax.annotation.Nonnull;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;

/* loaded from: input_file:org/apache/flink/runtime/state/ttl/hotcold/HotColdAggregatingState.class */
public class HotColdAggregatingState<K, N, IN, ACC, OUT> extends AbstractHotColdState<K, N, ACC, InternalAggregatingState<K, N, IN, ACC, OUT>> implements InternalAggregatingState<K, N, IN, ACC, OUT> {
    private final Lock lock;

    /* JADX INFO: Access modifiers changed from: package-private */
    public HotColdAggregatingState(@Nonnull InternalAggregatingState<K, N, IN, ACC, OUT> internalAggregatingState, @Nonnull InternalAggregatingState<K, N, IN, ACC, OUT> internalAggregatingState2, @Nonnull Lock lock) {
        super(internalAggregatingState, internalAggregatingState2);
        this.lock = lock;
    }

    public OUT get() throws Exception {
        OUT out = (OUT) ((InternalAggregatingState) this.hot).get();
        return out != null ? out : (OUT) ((InternalAggregatingState) this.cold).get();
    }

    public void add(IN in) throws Exception {
        this.lock.lock();
        if (((InternalAggregatingState) this.hot).get() != null) {
            ((InternalAggregatingState) this.hot).add(in);
            this.lock.unlock();
            return;
        }
        SV internal = ((InternalAggregatingState) this.cold).getInternal();
        if (internal != 0) {
            ((InternalAggregatingState) this.cold).clear();
            ((InternalAggregatingState) this.hot).updateInternal(internal);
            ((InternalAggregatingState) this.hot).add(in);
        } else {
            ((InternalAggregatingState) this.hot).add(in);
        }
        this.lock.unlock();
    }

    @Override // org.apache.flink.runtime.state.internal.InternalAppendingState
    public ACC getInternal() throws Exception {
        ACC acc = (ACC) ((InternalAggregatingState) this.hot).getInternal();
        return acc != null ? acc : (ACC) ((InternalAggregatingState) this.cold).getInternal();
    }

    @Override // org.apache.flink.runtime.state.internal.InternalAppendingState
    public void updateInternal(ACC acc) throws Exception {
        ((InternalAggregatingState) this.hot).updateInternal(acc);
        ((InternalAggregatingState) this.cold).clear();
    }

    @Override // org.apache.flink.runtime.state.internal.InternalMergingState
    public void mergeNamespaces(N n, Collection<N> collection) throws Exception {
        this.lock.lock();
        Iterator<N> it = collection.iterator();
        while (it.hasNext()) {
            transferStateFromColdToHot(it.next());
        }
        transferStateFromColdToHot(n);
        this.lock.unlock();
        ((InternalAggregatingState) this.hot).mergeNamespaces(n, collection);
    }

    private void transferStateFromColdToHot(N n) throws Exception {
        SV internal;
        if (n != null) {
            ((InternalAggregatingState) this.cold).setCurrentNamespace(n);
            ((InternalAggregatingState) this.hot).setCurrentNamespace(n);
            if (((InternalAggregatingState) this.hot).getInternal() == 0 && (internal = ((InternalAggregatingState) this.cold).getInternal()) != 0) {
                ((InternalAggregatingState) this.hot).updateInternal(internal);
            }
        }
    }
}
