package org.apache.kafka.image.loader;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/image/loader/MetadataBatchLoader.class */
public class MetadataBatchLoader {
    private final Logger log;
    private final Time time;
    private final FaultHandler faultHandler;
    private final MetadataUpdater callback;
    private MetadataImage image;
    private MetadataDelta delta;
    private long lastOffset;
    private int lastEpoch;
    private long lastContainedLogTimeMs;
    private long numBytes;
    private int numBatches;
    private long totalBatchElapsedNs;
    private TransactionState transactionState;
    private boolean hasSeenRecord;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.kafka.image.loader.MetadataBatchLoader$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/kafka/image/loader/MetadataBatchLoader$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$image$loader$MetadataBatchLoader$TransactionState;
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType = new int[MetadataRecordType.values().length];

        static {
            try {
                $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.BEGIN_TRANSACTION_RECORD.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.END_TRANSACTION_RECORD.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.ABORT_TRANSACTION_RECORD.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            $SwitchMap$org$apache$kafka$image$loader$MetadataBatchLoader$TransactionState = new int[TransactionState.values().length];
            try {
                $SwitchMap$org$apache$kafka$image$loader$MetadataBatchLoader$TransactionState[TransactionState.STARTED_TRANSACTION.ordinal()] = 1;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$kafka$image$loader$MetadataBatchLoader$TransactionState[TransactionState.CONTINUED_TRANSACTION.ordinal()] = 2;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$kafka$image$loader$MetadataBatchLoader$TransactionState[TransactionState.ABORTED_TRANSACTION.ordinal()] = 3;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$kafka$image$loader$MetadataBatchLoader$TransactionState[TransactionState.ENDED_TRANSACTION.ordinal()] = 4;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$kafka$image$loader$MetadataBatchLoader$TransactionState[TransactionState.NO_TRANSACTION.ordinal()] = 5;
            } catch (NoSuchFieldError e8) {
            }
        }
    }

    @FunctionalInterface
    /* loaded from: input_file:org/apache/kafka/image/loader/MetadataBatchLoader$MetadataUpdater.class */
    public interface MetadataUpdater {
        void update(MetadataDelta metadataDelta, MetadataImage metadataImage, LogDeltaManifest logDeltaManifest);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/image/loader/MetadataBatchLoader$TransactionState.class */
    public enum TransactionState {
        NO_TRANSACTION,
        STARTED_TRANSACTION,
        CONTINUED_TRANSACTION,
        ENDED_TRANSACTION,
        ABORTED_TRANSACTION
    }

    public MetadataBatchLoader(LogContext logContext, Time time, FaultHandler faultHandler, MetadataUpdater metadataUpdater) {
        this.log = logContext.logger(MetadataBatchLoader.class);
        this.time = time;
        this.faultHandler = faultHandler;
        this.callback = metadataUpdater;
        resetToImage(MetadataImage.EMPTY);
        this.hasSeenRecord = false;
    }

    public boolean hasSeenRecord() {
        return this.hasSeenRecord;
    }

    public void resetToImage(MetadataImage metadataImage) {
        this.image = metadataImage;
        this.hasSeenRecord = true;
        this.delta = new MetadataDelta.Builder().setImage(metadataImage).build();
        this.transactionState = TransactionState.NO_TRANSACTION;
        this.lastOffset = metadataImage.provenance().lastContainedOffset();
        this.lastEpoch = metadataImage.provenance().lastContainedEpoch();
        this.lastContainedLogTimeMs = metadataImage.provenance().lastContainedLogTimeMs();
        this.numBytes = 0L;
        this.numBatches = 0;
        this.totalBatchElapsedNs = 0L;
    }

    public long loadBatch(Batch<ApiMessageAndVersion> batch, LeaderAndEpoch leaderAndEpoch) {
        long nanoseconds = this.time.nanoseconds();
        int i = 0;
        this.lastContainedLogTimeMs = batch.appendTimestamp();
        this.lastEpoch = batch.epoch();
        Iterator it = batch.records().iterator();
        while (it.hasNext()) {
            try {
                replay((ApiMessageAndVersion) it.next());
            } catch (Throwable th) {
                this.faultHandler.handleFault("Error loading metadata log record from offset " + batch.baseOffset() + i, th);
            }
            if (this.transactionState == TransactionState.STARTED_TRANSACTION && (i > 0 || this.numBatches > 0)) {
                LogDeltaManifest build = LogDeltaManifest.newBuilder().provenance(new MetadataProvenance(this.lastOffset, this.lastEpoch, this.lastContainedLogTimeMs)).leaderAndEpoch(leaderAndEpoch).numBatches(this.numBatches).elapsedNs(this.totalBatchElapsedNs).numBytes(this.numBytes).build();
                if (this.log.isDebugEnabled()) {
                    this.log.debug("handleCommit: Generated a metadata delta between {} and {} from {} batch(es) in {} us.", new Object[]{Long.valueOf(this.image.offset()), Long.valueOf(build.provenance().lastContainedOffset()), Integer.valueOf(build.numBatches()), Long.valueOf(TimeUnit.NANOSECONDS.toMicros(build.elapsedNs()))});
                }
                applyDeltaAndUpdate(this.delta, build);
                this.transactionState = TransactionState.STARTED_TRANSACTION;
            }
            this.lastOffset = batch.baseOffset() + i;
            i++;
        }
        long nanoseconds2 = this.time.nanoseconds() - nanoseconds;
        this.lastOffset = batch.lastOffset();
        this.numBytes += batch.sizeInBytes();
        this.numBatches++;
        this.totalBatchElapsedNs += nanoseconds2;
        return this.totalBatchElapsedNs;
    }

    public void maybeFlushBatches(LeaderAndEpoch leaderAndEpoch) {
        LogDeltaManifest build = LogDeltaManifest.newBuilder().provenance(new MetadataProvenance(this.lastOffset, this.lastEpoch, this.lastContainedLogTimeMs)).leaderAndEpoch(leaderAndEpoch).numBatches(this.numBatches).elapsedNs(this.totalBatchElapsedNs).numBytes(this.numBytes).build();
        switch (AnonymousClass1.$SwitchMap$org$apache$kafka$image$loader$MetadataBatchLoader$TransactionState[this.transactionState.ordinal()]) {
            case BrokerRegistrationChangeRecord.HIGHEST_SUPPORTED_VERSION /* 1 */:
            case 2:
                this.log.debug("handleCommit: not publishing since a transaction starting at {} is still in progress. {} batch(es) processed so far.", Long.valueOf(this.image.offset()), Integer.valueOf(this.numBatches));
                return;
            case 3:
                this.log.debug("handleCommit: publishing empty delta between {} and {} from {} batch(es) since a transaction was aborted", new Object[]{Long.valueOf(this.image.offset()), Long.valueOf(build.provenance().lastContainedOffset()), Integer.valueOf(build.numBatches())});
                applyDeltaAndUpdate(new MetadataDelta.Builder().setImage(this.image).build(), build);
                return;
            case 4:
            case 5:
                if (this.log.isDebugEnabled()) {
                    this.log.debug("handleCommit: Generated a metadata delta between {} and {} from {} batch(es) in {} us.", new Object[]{Long.valueOf(this.image.offset()), Long.valueOf(build.provenance().lastContainedOffset()), Integer.valueOf(build.numBatches()), Long.valueOf(TimeUnit.NANOSECONDS.toMicros(build.elapsedNs()))});
                }
                applyDeltaAndUpdate(this.delta, build);
                return;
            default:
                return;
        }
    }

    private void replay(ApiMessageAndVersion apiMessageAndVersion) {
        switch (AnonymousClass1.$SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.fromId(apiMessageAndVersion.message().apiKey()).ordinal()]) {
            case BrokerRegistrationChangeRecord.HIGHEST_SUPPORTED_VERSION /* 1 */:
                if (this.transactionState == TransactionState.STARTED_TRANSACTION || this.transactionState == TransactionState.CONTINUED_TRANSACTION) {
                    throw new RuntimeException("Encountered BeginTransactionRecord while already in a transaction");
                }
                this.transactionState = TransactionState.STARTED_TRANSACTION;
                return;
            case 2:
                if (this.transactionState != TransactionState.CONTINUED_TRANSACTION && this.transactionState != TransactionState.STARTED_TRANSACTION) {
                    throw new RuntimeException("Encountered EndTransactionRecord without having seen a BeginTransactionRecord");
                }
                this.transactionState = TransactionState.ENDED_TRANSACTION;
                return;
            case 3:
                if (this.transactionState != TransactionState.CONTINUED_TRANSACTION && this.transactionState != TransactionState.STARTED_TRANSACTION) {
                    throw new RuntimeException("Encountered AbortTransactionRecord without having seen a BeginTransactionRecord");
                }
                this.transactionState = TransactionState.ABORTED_TRANSACTION;
                return;
            default:
                switch (AnonymousClass1.$SwitchMap$org$apache$kafka$image$loader$MetadataBatchLoader$TransactionState[this.transactionState.ordinal()]) {
                    case BrokerRegistrationChangeRecord.HIGHEST_SUPPORTED_VERSION /* 1 */:
                        this.transactionState = TransactionState.CONTINUED_TRANSACTION;
                        break;
                    case 3:
                    case 4:
                        this.transactionState = TransactionState.NO_TRANSACTION;
                        break;
                }
                this.hasSeenRecord = true;
                this.delta.replay(apiMessageAndVersion.message());
                return;
        }
    }

    private void applyDeltaAndUpdate(MetadataDelta metadataDelta, LogDeltaManifest logDeltaManifest) {
        try {
            this.image = metadataDelta.apply(logDeltaManifest.provenance());
        } catch (Throwable th) {
            this.faultHandler.handleFault("Error generating new metadata image from metadata delta between offset " + this.image.offset() + " and " + logDeltaManifest.provenance().lastContainedOffset(), th);
        }
        this.callback.update(metadataDelta, this.image, logDeltaManifest);
        resetToImage(this.image);
    }
}
