package org.apache.flink.runtime.checkpoint.channel;

import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.class */
/**
 * Routes {@link ChannelStateWriteRequest}s to the {@link ChannelStateCheckpointWriter} of the
 * checkpoint they belong to.
 *
 * <p>At most one writer is held at a time. The dispatcher remembers the id of the checkpoint whose
 * writer was started last ({@code ongoingCheckpointId}) and the highest checkpoint id for which an
 * abort request was seen ({@code maxAbortedCheckpointId}); requests for older or aborted
 * checkpoints are cancelled instead of being executed.
 *
 * <p>NOTE(review): this class performs no synchronization of its own; presumably it is only used
 * from a single thread — confirm against the caller.
 */
final class ChannelStateWriteRequestDispatcherImpl implements ChannelStateWriteRequestDispatcher {
    private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriteRequestDispatcherImpl.class);

    private final CheckpointStorageWorkerView streamFactoryResolver;
    private final ChannelStateSerializer serializer;
    private final int subtaskIndex;
    private final String taskName;

    /** Id of the checkpoint whose writer was started last; -1 until the first start request. */
    private long ongoingCheckpointId = -1;

    /** Highest checkpoint id for which an abort request was seen; -1 if none. */
    private long maxAbortedCheckpointId = -1;

    /** Cause carried by the abort request that set {@link #maxAbortedCheckpointId}. */
    private Throwable abortedCause;

    /** Writer of the ongoing checkpoint, or {@code null} if there is none. */
    private ChannelStateCheckpointWriter writer;

    /**
     * Creates a dispatcher.
     *
     * @param taskName name of the task, used in error messages and handed to created writers
     * @param subtaskIndex index of the subtask, used in error messages and handed to created writers
     * @param checkpointStorageWorkerView resolves the storage location for a started checkpoint;
     *     must not be {@code null}
     * @param channelStateSerializer serializer handed to created writers; must not be {@code null}
     */
    public ChannelStateWriteRequestDispatcherImpl(String taskName, int subtaskIndex, CheckpointStorageWorkerView checkpointStorageWorkerView, ChannelStateSerializer channelStateSerializer) {
        this.taskName = taskName;
        this.subtaskIndex = subtaskIndex;
        this.streamFactoryResolver = (CheckpointStorageWorkerView) Preconditions.checkNotNull(checkpointStorageWorkerView);
        this.serializer = (ChannelStateSerializer) Preconditions.checkNotNull(channelStateSerializer);
    }

    /**
     * Processes a single request. If processing throws, the request is cancelled with that
     * exception (a failure of the cancellation itself is attached as suppressed) and the
     * exception is rethrown.
     *
     * @param request the request to process
     * @throws Exception whatever the processing of the request threw
     */
    @Override // org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcher
    public void dispatch(ChannelStateWriteRequest request) throws Exception {
        LOG.trace("process {}", request);
        try {
            dispatchInternal(request);
        } catch (Exception e) {
            try {
                request.cancel(e);
            } catch (Exception cancelFailure) {
                // Keep the primary failure; do not let the cancellation failure mask it.
                e.addSuppressed(cancelFailure);
            }
            throw e;
        }
    }

    /**
     * Selects the handling for a request. The aborted/subsumed check comes first, so it applies
     * to every request type, including abort requests for checkpoints that are already aborted.
     */
    private void dispatchInternal(ChannelStateWriteRequest request) throws Exception {
        if (isAbortedCheckpoint(request.getCheckpointId())) {
            cancelAbortedRequest(request);
        } else if (request instanceof CheckpointStartRequest) {
            handleStartRequest((CheckpointStartRequest) request);
        } else if (request instanceof CheckpointInProgressRequest) {
            handleInProgressRequest((CheckpointInProgressRequest) request);
        } else if (request instanceof CheckpointAbortRequest) {
            handleAbortRequest((CheckpointAbortRequest) request);
        } else {
            throw new IllegalArgumentException("unknown request type: " + request);
        }
    }

    /** Cancels a request that belongs to an aborted or outdated checkpoint. */
    private void cancelAbortedRequest(ChannelStateWriteRequest request) throws Exception {
        if (request.getCheckpointId() == this.maxAbortedCheckpointId) {
            // The recorded cause belongs to exactly this checkpoint, so report it.
            request.cancel(this.abortedCause);
        } else {
            // Any other id is older than the ongoing or the latest aborted checkpoint.
            request.cancel(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED));
        }
    }

    /** Starts a new checkpoint: fails any leftover writer and creates the writer for the new id. */
    private void handleStartRequest(CheckpointStartRequest request) throws Exception {
        Preconditions.checkState(request.getCheckpointId() > this.ongoingCheckpointId, String.format("Checkpoint must be incremented, ongoingCheckpointId is %s, but the request is %s.", this.ongoingCheckpointId, request));
        if (this.writer != null) {
            // Only build the (formatted) exception when there actually is a writer to fail.
            failAndClearWriter(new IllegalStateException(String.format("Task[name=%s, subtaskIndex=%s] has uncompleted channelState writer of checkpointId=%s, but it received a new checkpoint start request of checkpointId=%s, it maybe a bug due to currently not supported concurrent unaligned checkpoint.", this.taskName, this.subtaskIndex, this.ongoingCheckpointId, request.getCheckpointId())));
        }
        this.writer = buildWriter(request);
        this.ongoingCheckpointId = request.getCheckpointId();
    }

    /** Executes an in-progress request against the writer of the ongoing checkpoint. */
    private void handleInProgressRequest(CheckpointInProgressRequest request) throws Exception {
        Preconditions.checkArgument(this.ongoingCheckpointId == request.getCheckpointId() && this.writer != null, "writer not found while processing request: " + request);
        request.execute(this.writer);
    }

    /** Records an abort and fails the current writer if the abort concerns it or a later checkpoint. */
    private void handleAbortRequest(CheckpointAbortRequest request) {
        if (request.getCheckpointId() > this.maxAbortedCheckpointId) {
            // Remember the cause so later requests for this checkpoint are cancelled with it.
            this.maxAbortedCheckpointId = request.getCheckpointId();
            this.abortedCause = request.getThrowable();
        }
        if (request.getCheckpointId() == this.ongoingCheckpointId) {
            failAndClearWriter(request.getThrowable());
        } else if (request.getCheckpointId() > this.ongoingCheckpointId) {
            // A newer checkpoint was aborted: the current writer's checkpoint is treated as subsumed.
            failAndClearWriter(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED));
        }
    }

    /**
     * Returns whether requests for the given checkpoint must be cancelled rather than executed:
     * the id is older than the ongoing checkpoint, or not newer than the latest aborted one.
     */
    private boolean isAbortedCheckpoint(long checkpointId) {
        return checkpointId < this.ongoingCheckpointId || checkpointId <= this.maxAbortedCheckpointId;
    }

    /** Fails the current writer with the given cause and drops it; no-op if there is no writer. */
    private void failAndClearWriter(Throwable cause) {
        if (this.writer == null) {
            return;
        }
        this.writer.fail(cause);
        this.writer = null;
    }

    /**
     * Creates the writer for a started checkpoint, resolving its storage location through the
     * {@link CheckpointStorageWorkerView}. The last constructor argument is a callback that
     * clears {@link #writer}, after verifying that the ongoing checkpoint is still the one this
     * writer was created for.
     */
    private ChannelStateCheckpointWriter buildWriter(CheckpointStartRequest request) throws Exception {
        return new ChannelStateCheckpointWriter(this.taskName, this.subtaskIndex, request, this.streamFactoryResolver.resolveCheckpointStorageLocation(request.getCheckpointId(), request.getLocationReference()), this.serializer, () -> {
            Preconditions.checkState(request.getCheckpointId() == this.ongoingCheckpointId, "The ongoingCheckpointId[%s] was changed when clear writer of checkpoint[%s], it might be a bug.", this.ongoingCheckpointId, request.getCheckpointId());
            this.writer = null;
        });
    }

    /**
     * Fails the current writer, if any, with the given cause and drops it. This is best effort:
     * an exception thrown while failing the writer is logged, not propagated.
     *
     * @param th the cause to fail the writer with
     */
    @Override // org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcher
    public void fail(Throwable th) {
        if (this.writer == null) {
            return;
        }
        try {
            this.writer.fail(th);
        } catch (Exception e) {
            // Log the exception that prevented failing the writer (as the throwable, so its stack
            // trace is kept) together with the cause we tried to fail the writer with.
            LOG.warn("unable to fail write channel state writer (original failure cause: {})", th, e);
        }
        this.writer = null;
    }
}
