package org.apache.kafka.metadata.migration;

import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.QuorumFeatures;
import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.loader.LoaderManifest;
import org.apache.kafka.image.loader.LoaderManifestType;
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.migration.MigrationManifest;
import org.apache.kafka.metadata.migration.TopicMigrationClient;
import org.apache.kafka.metadata.util.RecordRedactor;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.util.Deadline;
import org.apache.kafka.server.util.FutureUtils;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/metadata/migration/KRaftMigrationDriver.class */
public class KRaftMigrationDriver implements MetadataPublisher {
    // Completion handler that deliberately ignores the Throwable it is handed.
    private static final Consumer<Throwable> NO_OP_HANDLER = th -> {
    };
    // 300000 ms (5 minutes). NOTE(review): the waits on the metadata layer in MigrateMetadataEvent pass
    // the literal 300000L, presumably this constant inlined by the compiler -- confirm.
    private static final int METADATA_COMMIT_MAX_WAIT_MS = 300000;
    // Clock used for elapsed-time metrics, wait deadlines and poll scheduling.
    private final Time time;
    private final Logger log;
    // Compared against the Raft leader id to decide whether this driver is the active one.
    private final int nodeId;
    // Used to claim ZK controller leadership, read all ZK metadata and persist the migration recovery state.
    private final MigrationClient zkMigrationClient;
    // Writes KRaft snapshots and deltas back to ZooKeeper (handleSnapshot / handleDelta).
    private final KRaftMigrationZkWriter zkMetadataWriter;
    // Sends metadata RPCs to brokers from a full image or from a delta.
    private final LegacyPropagator propagator;
    // Receives the migrated ZK records: beginMigration, acceptBatch, completeMigration, abortMigration.
    private final ZkRecordConsumer zkRecordConsumer;
    // Queue on which the migration events are appended and the periodic "poll" event is scheduled.
    private final KafkaEventQueue eventQueue;
    // Supplies the delay, in milliseconds, before the next PollEvent.
    private final PollTimeSupplier pollTimeSupplier;
    // Receives ZK write timings and the dual-write offset.
    private final QuorumControllerMetrics controllerMetrics;
    // Invoked for ZooKeeper authentication errors and for otherwise unhandled event errors.
    private final FaultHandler faultHandler;
    private final QuorumFeatures quorumFeatures;
    // Produces loggable strings for migrated records when trace logging is enabled.
    private final RecordRedactor recordRedactor;
    private final Consumer<MetadataPublisher> initialZkLoadHandler;
    // Current state of the migration state machine; PollEvent dispatches on it.
    private volatile MigrationDriverState migrationState;
    // Leadership/recovery state; updated through applyMigrationOperation.
    private volatile ZkMigrationLeadershipState migrationLeadershipState;
    // Most recent metadata image accepted by MetadataChangeEvent.
    private volatile MetadataImage image;
    // Set to true by MetadataChangeEvent once an image has been accepted; empty images are skipped until then.
    private volatile boolean firstPublish;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.kafka.metadata.migration.KRaftMigrationDriver$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/kafka/metadata/migration/KRaftMigrationDriver$1.class */
    /*
     * Compiler-synthesized holder of the "switch map" arrays that back the enum switch statements in
     * this file (PollEvent.run switches on MigrationDriverState, WaitForControllerQuorumEvent.run on
     * ZkMigrationState). Each array is indexed by an enum constant's ordinal() and holds the
     * 1-based case label used by the corresponding switch.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$metadata$migration$MigrationDriverState;
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$metadata$migration$ZkMigrationState = new int[ZkMigrationState.values().length];

        static {
            // Each assignment is wrapped in its own try/catch so that an enum constant missing at
            // runtime (NoSuchFieldError) only leaves that one entry unset.
            // ZkMigrationState: NONE=1, PRE_MIGRATION=2, MIGRATION=3, POST_MIGRATION=4.
            try {
                $SwitchMap$org$apache$kafka$metadata$migration$ZkMigrationState[ZkMigrationState.NONE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$metadata$migration$ZkMigrationState[ZkMigrationState.PRE_MIGRATION.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$kafka$metadata$migration$ZkMigrationState[ZkMigrationState.MIGRATION.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$kafka$metadata$migration$ZkMigrationState[ZkMigrationState.POST_MIGRATION.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            // MigrationDriverState: UNINITIALIZED=1, DUAL_WRITE=2, INACTIVE=3, WAIT_FOR_CONTROLLER_QUORUM=4,
            // WAIT_FOR_BROKERS=5, BECOME_CONTROLLER=6, ZK_MIGRATION=7, SYNC_KRAFT_TO_ZK=8,
            // KRAFT_CONTROLLER_TO_BROKER_COMM=9.
            $SwitchMap$org$apache$kafka$metadata$migration$MigrationDriverState = new int[MigrationDriverState.values().length];
            try {
                $SwitchMap$org$apache$kafka$metadata$migration$MigrationDriverState[MigrationDriverState.UNINITIALIZED.ordinal()] = 1;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$kafka$metadata$migration$MigrationDriverState[MigrationDriverState.DUAL_WRITE.ordinal()] = 2;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$kafka$metadata$migration$MigrationDriverState[MigrationDriverState.INACTIVE.ordinal()] = 3;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$kafka$metadata$migration$MigrationDriverState[MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM.ordinal()] = 4;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$kafka$metadata$migration$MigrationDriverState[MigrationDriverState.WAIT_FOR_BROKERS.ordinal()] = 5;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$apache$kafka$metadata$migration$MigrationDriverState[MigrationDriverState.BECOME_CONTROLLER.ordinal()] = 6;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$org$apache$kafka$metadata$migration$MigrationDriverState[MigrationDriverState.ZK_MIGRATION.ordinal()] = 7;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$org$apache$kafka$metadata$migration$MigrationDriverState[MigrationDriverState.SYNC_KRAFT_TO_ZK.ordinal()] = 8;
            } catch (NoSuchFieldError e12) {
            }
            try {
                $SwitchMap$org$apache$kafka$metadata$migration$MigrationDriverState[MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM.ordinal()] = 9;
            } catch (NoSuchFieldError e13) {
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/metadata/migration/KRaftMigrationDriver$BecomeZkControllerEvent.class */
    /**
     * Event that attempts to claim ZooKeeper controller leadership for this driver.
     *
     * <p>It only acts while the driver is in {@code BECOME_CONTROLLER}. After the claim attempt, a
     * ZK controller epoch version of -1 is treated as "not claimed" and the state is left unchanged;
     * otherwise the driver moves to {@code SYNC_KRAFT_TO_ZK} when the initial ZK migration is already
     * complete, or to {@code ZK_MIGRATION} when it is not.
     */
    class BecomeZkControllerEvent extends MigrationEvent {
        BecomeZkControllerEvent() {
            super();
        }

        public void run() throws Exception {
            final KRaftMigrationDriver driver = KRaftMigrationDriver.this;
            if (!driver.checkDriverState(MigrationDriverState.BECOME_CONTROLLER)) {
                return;
            }

            // Creating the method reference null-checks the client before the operation is applied.
            driver.applyMigrationOperation(
                "Claiming ZK controller leadership",
                driver.zkMigrationClient::claimControllerLeadership);

            if (driver.migrationLeadershipState.zkControllerEpochZkVersion() == -1) {
                driver.log.info("Unable to claim leadership, will retry until we learn of a different KRaft leader");
                return;
            }

            final MigrationDriverState nextState = driver.migrationLeadershipState.initialZkMigrationComplete()
                ? MigrationDriverState.SYNC_KRAFT_TO_ZK
                : MigrationDriverState.ZK_MIGRATION;
            driver.transitionTo(nextState);
        }
    }

    /* loaded from: input_file:org/apache/kafka/metadata/migration/KRaftMigrationDriver$Builder.class */
    /**
     * Fluent builder for {@link KRaftMigrationDriver}.
     *
     * <p>{@link #build()} throws {@link IllegalStateException} when any of the node id, record
     * consumer, migration client, propagator, initial ZK load handler, fault handler, config schema,
     * controller metrics or time has not been set. The quorum features are passed through unchecked.
     */
    public static class Builder {
        private Integer nodeId;
        private ZkRecordConsumer zkRecordConsumer;
        private MigrationClient zkMigrationClient;
        private LegacyPropagator propagator;
        private Consumer<MetadataPublisher> initialZkLoadHandler;
        private FaultHandler faultHandler;
        private QuorumFeatures quorumFeatures;
        private KafkaConfigSchema configSchema;
        private QuorumControllerMetrics controllerMetrics;
        private Time time;

        public Builder setNodeId(int i) {
            nodeId = i;
            return this;
        }

        public Builder setZkRecordConsumer(ZkRecordConsumer zkRecordConsumer) {
            this.zkRecordConsumer = zkRecordConsumer;
            return this;
        }

        public Builder setZkMigrationClient(MigrationClient migrationClient) {
            zkMigrationClient = migrationClient;
            return this;
        }

        public Builder setPropagator(LegacyPropagator legacyPropagator) {
            propagator = legacyPropagator;
            return this;
        }

        public Builder setInitialZkLoadHandler(Consumer<MetadataPublisher> consumer) {
            initialZkLoadHandler = consumer;
            return this;
        }

        public Builder setFaultHandler(FaultHandler faultHandler) {
            this.faultHandler = faultHandler;
            return this;
        }

        public Builder setQuorumFeatures(QuorumFeatures quorumFeatures) {
            this.quorumFeatures = quorumFeatures;
            return this;
        }

        public Builder setConfigSchema(KafkaConfigSchema kafkaConfigSchema) {
            configSchema = kafkaConfigSchema;
            return this;
        }

        public Builder setControllerMetrics(QuorumControllerMetrics quorumControllerMetrics) {
            controllerMetrics = quorumControllerMetrics;
            return this;
        }

        public Builder setTime(Time time) {
            this.time = time;
            return this;
        }

        /**
         * Validates the required settings, in a fixed order, and creates the driver.
         *
         * @return a new driver configured from this builder
         * @throws IllegalStateException naming the first required setting that is missing
         */
        public KRaftMigrationDriver build() {
            requireSet(nodeId, "You must specify the node ID of this controller.");
            requireSet(zkRecordConsumer, "You must specify the ZkRecordConsumer.");
            requireSet(zkMigrationClient, "You must specify the MigrationClient.");
            requireSet(propagator, "You must specify the MetadataPropagator.");
            requireSet(initialZkLoadHandler, "You must specify the initial ZK load callback.");
            requireSet(faultHandler, "You must specify the FaultHandler.");
            requireSet(configSchema, "You must specify the KafkaConfigSchema.");
            requireSet(controllerMetrics, "You must specify the QuorumControllerMetrics.");
            requireSet(time, "You must specify the Time.");
            return new KRaftMigrationDriver(
                nodeId,
                zkRecordConsumer,
                zkMigrationClient,
                propagator,
                initialZkLoadHandler,
                faultHandler,
                quorumFeatures,
                configSchema,
                controllerMetrics,
                time);
        }

        /** Throws {@link IllegalStateException} with {@code message} when {@code value} is null. */
        private static void requireSet(Object value, String message) {
            if (value == null) {
                throw new IllegalStateException(message);
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/metadata/migration/KRaftMigrationDriver$KRaftLeaderEvent.class */
    /**
     * Event raised when the KRaft leader changes.
     *
     * <p>If this node is the new leader, the migration recovery state is fetched (or created) through
     * the ZK migration client, stamped with this node and the new epoch, and the driver moves to
     * {@code WAIT_FOR_CONTROLLER_QUORUM}. Otherwise the in-memory leadership state is stamped with the
     * new leader (or the empty state's controller id when no leader is known) and the driver becomes
     * {@code INACTIVE}.
     */
    class KRaftLeaderEvent extends MigrationEvent {
        private final LeaderAndEpoch leaderAndEpoch;

        KRaftLeaderEvent(LeaderAndEpoch leaderAndEpoch) {
            super();
            this.leaderAndEpoch = leaderAndEpoch;
        }

        public void run() throws Exception {
            final KRaftMigrationDriver driver = KRaftMigrationDriver.this;

            if (!this.leaderAndEpoch.isLeader(driver.nodeId)) {
                driver.applyMigrationOperation(
                    "Became inactive migration driver",
                    currentState -> currentState.withNewKRaftController(
                        this.leaderAndEpoch.leaderId().orElse(ZkMigrationLeadershipState.EMPTY.kraftControllerId()),
                        this.leaderAndEpoch.epoch()));
                driver.transitionTo(MigrationDriverState.INACTIVE);
                return;
            }

            driver.applyMigrationOperation(
                "Became active migration driver",
                currentState -> driver.zkMigrationClient
                    .getOrCreateMigrationRecoveryState(currentState)
                    .withNewKRaftController(driver.nodeId, this.leaderAndEpoch.epoch()));
            driver.transitionTo(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/metadata/migration/KRaftMigrationDriver$MetadataChangeEvent.class */
    public class MetadataChangeEvent extends MigrationEvent {
        private final MetadataDelta delta;
        private final MetadataImage image;
        private final MetadataProvenance provenance;
        private final boolean isSnapshot;
        private final Consumer<Throwable> completionHandler;

        MetadataChangeEvent(MetadataDelta metadataDelta, MetadataImage metadataImage, MetadataProvenance metadataProvenance, boolean z, Consumer<Throwable> consumer) {
            super();
            this.delta = metadataDelta;
            this.image = metadataImage;
            this.provenance = metadataProvenance;
            this.isSnapshot = z;
            this.completionHandler = consumer;
        }

        public void run() throws Exception {
            if (!KRaftMigrationDriver.this.firstPublish && this.image.isEmpty()) {
                KRaftMigrationDriver.this.log.debug("Encountered an empty MetadataImage while waiting for the first image to be published. Ignoring this image since it either does not include bootstrap records or it is a valid image for an older unsupported metadata version.");
                this.completionHandler.accept(null);
                return;
            }
            KRaftMigrationDriver.this.firstPublish = true;
            MetadataImage metadataImage = KRaftMigrationDriver.this.image;
            KRaftMigrationDriver.this.image = this.image;
            String str = this.isSnapshot ? "snapshot" : "delta";
            if (KRaftMigrationDriver.this.migrationState.equals(MigrationDriverState.INACTIVE)) {
                this.completionHandler.accept(null);
                return;
            }
            if (!KRaftMigrationDriver.this.migrationState.allowDualWrite()) {
                KRaftMigrationDriver.this.log.trace("Received metadata {}, but the controller is not in dual-write mode. Ignoring the change to be replicated to Zookeeper", str);
                this.completionHandler.accept(null);
                if (this.delta.clusterDelta() != null) {
                    KRaftMigrationDriver.this.wakeup();
                    return;
                }
                return;
            }
            if (!KRaftMigrationDriver.this.migrationLeadershipState.initialZkMigrationComplete()) {
                KRaftMigrationDriver.this.log.info("Ignoring {} {} since the migration has not finished.", str, this.provenance);
                this.completionHandler.accept(null);
                return;
            }
            if (this.image.highestOffsetAndEpoch().compareTo(KRaftMigrationDriver.this.migrationLeadershipState.offsetAndEpoch()) < 0) {
                KRaftMigrationDriver.this.log.info("Ignoring {} {} which contains metadata that has already been written to ZK.", str, this.provenance);
                this.completionHandler.accept(null);
                return;
            }
            TreeMap treeMap = new TreeMap();
            long nanoseconds = KRaftMigrationDriver.this.time.nanoseconds();
            if (this.isSnapshot) {
                KRaftMigrationZkWriter kRaftMigrationZkWriter = KRaftMigrationDriver.this.zkMetadataWriter;
                MetadataImage metadataImage2 = this.image;
                KRaftMigrationDriver kRaftMigrationDriver = KRaftMigrationDriver.this;
                kRaftMigrationZkWriter.handleSnapshot(metadataImage2, KRaftMigrationDriver.countingOperationConsumer(treeMap, (str2, kRaftMigrationOperation) -> {
                    kRaftMigrationDriver.applyMigrationOperation(str2, kRaftMigrationOperation);
                }));
                KRaftMigrationDriver.this.controllerMetrics.updateZkWriteSnapshotTimeMs(TimeUnit.NANOSECONDS.toMillis(KRaftMigrationDriver.this.time.nanoseconds() - nanoseconds));
            } else {
                KRaftMigrationZkWriter kRaftMigrationZkWriter2 = KRaftMigrationDriver.this.zkMetadataWriter;
                MetadataImage metadataImage3 = this.image;
                MetadataDelta metadataDelta = this.delta;
                KRaftMigrationDriver kRaftMigrationDriver2 = KRaftMigrationDriver.this;
                if (kRaftMigrationZkWriter2.handleDelta(metadataImage, metadataImage3, metadataDelta, KRaftMigrationDriver.countingOperationConsumer(treeMap, (str3, kRaftMigrationOperation2) -> {
                    kRaftMigrationDriver2.applyMigrationOperation(str3, kRaftMigrationOperation2);
                }))) {
                    KRaftMigrationDriver.this.controllerMetrics.updateZkWriteDeltaTimeMs(TimeUnit.NANOSECONDS.toMillis(KRaftMigrationDriver.this.time.nanoseconds() - nanoseconds));
                }
            }
            if (treeMap.isEmpty()) {
                KRaftMigrationDriver.this.log.trace("Did not make any ZK writes when handling KRaft {}", this.isSnapshot ? "snapshot" : "delta");
            } else {
                KRaftMigrationDriver.this.log.debug("Made the following ZK writes when handling KRaft {}: {}", this.isSnapshot ? "snapshot" : "delta", treeMap);
            }
            ZkMigrationLeadershipState withKRaftMetadataOffsetAndEpoch = KRaftMigrationDriver.this.migrationLeadershipState.withKRaftMetadataOffsetAndEpoch(this.image.highestOffsetAndEpoch().offset(), this.image.highestOffsetAndEpoch().epoch());
            KRaftMigrationDriver.this.controllerMetrics.updateDualWriteOffset(this.image.highestOffsetAndEpoch().offset());
            KRaftMigrationDriver.this.applyMigrationOperation("Updating ZK migration state after " + str, zkMigrationLeadershipState -> {
                return KRaftMigrationDriver.this.zkMigrationClient.setMigrationRecoveryState(withKRaftMetadataOffsetAndEpoch);
            });
            if (this.isSnapshot) {
                KRaftMigrationDriver.this.log.debug("Sending full metadata RPCs to brokers for snapshot.");
                KRaftMigrationDriver.this.propagator.sendRPCsToBrokersFromMetadataImage(this.image, KRaftMigrationDriver.this.migrationLeadershipState.zkControllerEpoch());
            } else if (this.delta.topicsDelta() == null && this.delta.clusterDelta() == null) {
                KRaftMigrationDriver.this.log.trace("Not sending RPCs to brokers for metadata {} since no relevant metadata has changed", str);
            } else {
                KRaftMigrationDriver.this.log.trace("Sending incremental metadata RPCs to brokers for delta.");
                KRaftMigrationDriver.this.propagator.sendRPCsToBrokersFromMetadataDelta(this.delta, this.image, KRaftMigrationDriver.this.migrationLeadershipState.zkControllerEpoch());
            }
            this.completionHandler.accept(null);
        }

        @Override // org.apache.kafka.metadata.migration.KRaftMigrationDriver.MigrationEvent
        public void handleException(Throwable th) {
            this.completionHandler.accept(th);
            super.handleException(th);
        }
    }

    /* loaded from: input_file:org/apache/kafka/metadata/migration/KRaftMigrationDriver$MigrateMetadataEvent.class */
    /**
     * Event that performs the initial copy of metadata from ZooKeeper into KRaft.
     *
     * <p>It only acts while the driver is in {@code ZK_MIGRATION}. It asks the record consumer to
     * begin the migration, streams every batch read from ZooKeeper to the record consumer (waiting up
     * to {@code METADATA_COMMIT_MAX_WAIT_MS} for each step), completes the migration, stores the
     * resulting offset and epoch in the migration recovery state and moves to
     * {@code SYNC_KRAFT_TO_ZK}. Any failure in the read/complete phase aborts the migration on the
     * record consumer and is passed to the shared exception handling.
     */
    class MigrateMetadataEvent extends MigrationEvent {
        MigrateMetadataEvent() {
            super();
        }

        public void run() throws Exception {
            final KRaftMigrationDriver driver = KRaftMigrationDriver.this;
            if (!driver.checkDriverState(MigrationDriverState.ZK_MIGRATION)) {
                return;
            }

            // Broker ids reported by the migration client while reading ZK; only used for logging.
            final Set<Integer> brokersInMetadata = new HashSet<>();
            driver.log.info("Starting ZK migration");
            final MigrationManifest.Builder manifestBuilder = MigrationManifest.newBuilder(driver.time);

            try {
                FutureUtils.waitWithLogging(
                    driver.log,
                    "",
                    "the metadata layer to begin the migration transaction",
                    driver.zkRecordConsumer.beginMigration(),
                    Deadline.fromDelay(driver.time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS),
                    driver.time);
            } catch (Throwable beginFailure) {
                driver.log.error("Could not start the migration", beginFailure);
                super.handleException(beginFailure);
                // NOTE(review): execution deliberately continues into the read phase below, as in the
                // original code. Whether it should return here instead depends on the ZkRecordConsumer
                // contract, which is not visible from this file -- confirm.
            }

            try {
                final Consumer<List<ApiMessageAndVersion>> batchConsumer = batch -> {
                    try {
                        driver.log.info("Migrating {} records from ZK", Integer.valueOf(batch.size()));
                        if (driver.log.isTraceEnabled()) {
                            batch.forEach(record ->
                                driver.log.trace(driver.recordRedactor.toLoggableString(record.message())));
                        }
                        FutureUtils.waitWithLogging(
                            driver.log,
                            "",
                            "the metadata layer to commit migration record batch",
                            driver.zkRecordConsumer.acceptBatch(batch),
                            Deadline.fromDelay(driver.time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS),
                            driver.time);
                        manifestBuilder.acceptBatch(batch);
                    } catch (Throwable batchFailure) {
                        // Wrapped so the failure can propagate out of the Consumer to the abort path below.
                        throw new RuntimeException(batchFailure);
                    }
                };
                driver.zkMigrationClient.readAllMetadata(batchConsumer, brokersInMetadata::add);

                final OffsetAndEpoch offsetAndEpoch = (OffsetAndEpoch) FutureUtils.waitWithLogging(
                    driver.log,
                    "",
                    "the metadata layer to complete the migration",
                    driver.zkRecordConsumer.completeMigration(),
                    Deadline.fromDelay(driver.time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS),
                    driver.time);
                driver.log.info(
                    "Completed migration of metadata from ZooKeeper to KRaft. {}. The current metadata offset is now {} with an epoch of {}. Saw {} brokers in the migrated metadata {}.",
                    new Object[]{
                        manifestBuilder.build(),
                        Long.valueOf(offsetAndEpoch.offset()),
                        Integer.valueOf(offsetAndEpoch.epoch()),
                        Integer.valueOf(brokersInMetadata.size()),
                        brokersInMetadata});

                final ZkMigrationLeadershipState migratedState = driver.migrationLeadershipState
                    .withKRaftMetadataOffsetAndEpoch(offsetAndEpoch.offset(), offsetAndEpoch.epoch());
                driver.applyMigrationOperation(
                    "Finished initial migration of ZK metadata to KRaft",
                    ignoredState -> driver.zkMigrationClient.setMigrationRecoveryState(migratedState));
                driver.transitionTo(MigrationDriverState.SYNC_KRAFT_TO_ZK);
            } catch (Throwable migrationFailure) {
                driver.log.error("Aborting the metadata migration from ZooKeeper to KRaft. {}.", manifestBuilder.build(), migrationFailure);
                driver.zkRecordConsumer.abortMigration();
                super.handleException(migrationFailure);
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/metadata/migration/KRaftMigrationDriver$MigrationEvent.class */
    /**
     * Base class for all events run on the migration driver's event queue; it supplies the shared
     * error handling and a {@code toString()} that yields the event's simple class name.
     */
    abstract class MigrationEvent implements EventQueue.Event {
        MigrationEvent() {
        }

        /**
         * Classifies a failure raised while running this event.
         *
         * <p>ZooKeeper authentication errors and unrecognized errors go to the fault handler;
         * other ZooKeeper client errors are logged at info level with their cause; a rejected
         * execution is logged at debug level.
         *
         * @param th the failure raised by the event
         */
        public void handleException(Throwable th) {
            final KRaftMigrationDriver driver = KRaftMigrationDriver.this;
            // The authentication check must stay ahead of the general client-error check.
            if (th instanceof MigrationClientAuthException) {
                driver.faultHandler.handleFault("Encountered ZooKeeper authentication in " + this, th);
            } else if (th instanceof MigrationClientException) {
                final String message = String.format("Encountered ZooKeeper error during event %s. Will retry.", this);
                driver.log.info(message, th.getCause());
            } else if (th instanceof RejectedExecutionException) {
                driver.log.debug("Not processing {} because the event queue is closed.", this);
            } else {
                driver.faultHandler.handleFault("Unhandled error in " + this, th);
            }
        }

        public String toString() {
            final Class<?> eventType = getClass();
            return eventType.getSimpleName();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/metadata/migration/KRaftMigrationDriver$PollEvent.class */
    /**
     * Periodic event that drives the migration state machine.
     *
     * <p>Depending on the driver's current state it either recovers the migration state from
     * ZooKeeper or appends the event responsible for that state to the event queue; the
     * {@code DUAL_WRITE} and {@code INACTIVE} states need no action. It then reschedules itself under
     * the "poll" tag using the delay supplied by the poll time supplier.
     */
    public class PollEvent extends MigrationEvent {
        PollEvent() {
            super();
        }

        public void run() throws Exception {
            final KRaftMigrationDriver driver = KRaftMigrationDriver.this;

            // Switch on the enum itself rather than on hand-maintained ordinal labels, so each case
            // names the state it handles.
            switch (driver.migrationState) {
                case UNINITIALIZED:
                    driver.recoverMigrationStateFromZK();
                    break;
                case WAIT_FOR_CONTROLLER_QUORUM:
                    driver.eventQueue.append(new WaitForControllerQuorumEvent());
                    break;
                case WAIT_FOR_BROKERS:
                    driver.eventQueue.append(new WaitForZkBrokersEvent());
                    break;
                case BECOME_CONTROLLER:
                    driver.eventQueue.append(new BecomeZkControllerEvent());
                    break;
                case ZK_MIGRATION:
                    driver.eventQueue.append(new MigrateMetadataEvent());
                    break;
                case SYNC_KRAFT_TO_ZK:
                    driver.eventQueue.append(new SyncKRaftMetadataEvent());
                    break;
                case KRAFT_CONTROLLER_TO_BROKER_COMM:
                    driver.eventQueue.append(new SendRPCsToBrokersEvent());
                    break;
                default:
                    // DUAL_WRITE and INACTIVE: nothing to do on a poll.
                    break;
            }

            // Read the clock before asking for the delay, matching the original evaluation order.
            final long nowNs = driver.time.nanoseconds();
            final long delayNs = TimeUnit.NANOSECONDS.convert(driver.pollTimeSupplier.nextPollTimeMs(), TimeUnit.MILLISECONDS);
            driver.eventQueue.scheduleDeferred("poll", new EventQueue.DeadlineFunction(nowNs + delayNs), new PollEvent());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/metadata/migration/KRaftMigrationDriver$PollTimeSupplier.class */
    /**
     * Supplies the delay before the next poll, derived from an {@link ExponentialBackoff} and the
     * number of polls since the last {@link #reset()}.
     */
    public static class PollTimeSupplier {
        private final ExponentialBackoff pollBackoff = new ExponentialBackoff(100, 2, 60000, 0.02d);
        private long pollCount = 0;

        PollTimeSupplier() {
        }

        /** Restarts the backoff sequence from its first attempt. */
        void reset() {
            pollCount = 0L;
        }

        /**
         * Returns the backoff for the current poll count, then advances the count.
         *
         * @return the next poll delay in milliseconds
         */
        public long nextPollTimeMs() {
            final long delayMs = pollBackoff.backoff(pollCount);
            pollCount += 1;
            return delayMs;
        }
    }

    /* loaded from: input_file:org/apache/kafka/metadata/migration/KRaftMigrationDriver$SendRPCsToBrokersEvent.class */
    /**
     * Event that sends full metadata RPCs to the brokers before the driver enters dual-write mode.
     *
     * <p>It only acts while the driver is in {@code KRAFT_CONTROLLER_TO_BROKER_COMM}, and only once
     * the current image has reached the offset recorded in the migration leadership state; it then
     * moves the driver to {@code DUAL_WRITE}.
     */
    class SendRPCsToBrokersEvent extends MigrationEvent {
        SendRPCsToBrokersEvent() {
            super();
        }

        public void run() throws Exception {
            final KRaftMigrationDriver driver = KRaftMigrationDriver.this;
            if (!driver.checkDriverState(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM)) {
                return;
            }

            final boolean imageIsBehind =
                driver.image.highestOffsetAndEpoch().compareTo(driver.migrationLeadershipState.offsetAndEpoch()) < 0;
            if (imageIsBehind) {
                driver.log.info(
                    "Not sending metadata RPCs with current metadata image since does not contain the offset that was last written to ZK during the migration. Image offset {} is less than migration leadership state offset {}",
                    driver.image.highestOffsetAndEpoch(),
                    driver.migrationLeadershipState.offsetAndEpoch());
                return;
            }

            driver.log.info("Sending RPCs to broker before moving to dual-write mode using at offset and epoch {}", driver.image.highestOffsetAndEpoch());
            driver.propagator.sendRPCsToBrokersFromMetadataImage(driver.image, driver.migrationLeadershipState.zkControllerEpoch());
            driver.transitionTo(MigrationDriverState.DUAL_WRITE);
        }
    }

    /* loaded from: input_file:org/apache/kafka/metadata/migration/KRaftMigrationDriver$SyncKRaftMetadataEvent.class */
    /**
     * Event that writes the full current KRaft metadata image to ZooKeeper.
     *
     * <p>It only acts while the driver is in {@code SYNC_KRAFT_TO_ZK}, and only once the current
     * image has reached the offset recorded in the migration leadership state. After the snapshot
     * has been handled it records the elapsed write time and moves the driver to
     * {@code KRAFT_CONTROLLER_TO_BROKER_COMM}.
     */
    class SyncKRaftMetadataEvent extends MigrationEvent {
        SyncKRaftMetadataEvent() {
            super();
        }

        public void run() throws Exception {
            final KRaftMigrationDriver driver = KRaftMigrationDriver.this;
            if (!driver.checkDriverState(MigrationDriverState.SYNC_KRAFT_TO_ZK)) {
                return;
            }

            if (driver.image.highestOffsetAndEpoch().compareTo(driver.migrationLeadershipState.offsetAndEpoch()) < 0) {
                driver.log.info("Ignoring image {} which does not contain a superset of the metadata in ZK. Staying in SYNC_KRAFT_TO_ZK until a newer image is loaded", driver.image.provenance());
                return;
            }

            driver.log.info("Performing a full metadata sync from KRaft to ZK.");
            // Per-operation counts of the ZK writes made during the sync.
            final TreeMap zkWriteCounts = new TreeMap();
            final long startNs = driver.time.nanoseconds();
            driver.zkMetadataWriter.handleSnapshot(
                driver.image,
                KRaftMigrationDriver.countingOperationConsumer(
                    zkWriteCounts,
                    (opName, operation) -> driver.applyMigrationOperation(opName, operation)));
            // Elapsed time is end minus start; the previous start-minus-end form reported a
            // non-positive duration to the metric.
            final long elapsedNs = driver.time.nanoseconds() - startNs;
            driver.controllerMetrics.updateZkWriteSnapshotTimeMs(TimeUnit.NANOSECONDS.toMillis(elapsedNs));
            driver.log.info("Made the following ZK writes when reconciling with KRaft state: {}", zkWriteCounts);
            driver.transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM);
        }
    }

    /* loaded from: input_file:org/apache/kafka/metadata/migration/KRaftMigrationDriver$WaitForControllerQuorumEvent.class */
    /**
     * Event that decides how to leave {@code WAIT_FOR_CONTROLLER_QUORUM} based on the
     * {@link ZkMigrationState} found in the current metadata image.
     */
    class WaitForControllerQuorumEvent extends MigrationEvent {
        WaitForControllerQuorumEvent() {
            super();
        }

        /**
         * Runs only while the driver is in {@code WAIT_FOR_CONTROLLER_QUORUM} and after the first metadata
         * publish. Depending on the image's migration state the driver goes inactive, starts waiting for
         * ZK brokers, proceeds to become the controller, or stays where it is.
         *
         * @throws IllegalStateException if the migration state is not one of the handled values
         */
        public void run() throws Exception {
            final KRaftMigrationDriver driver = KRaftMigrationDriver.this;
            if (!driver.checkDriverState(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM)) {
                return;
            }
            if (!driver.firstPublish) {
                driver.log.trace("Waiting until we have received metadata before proceeding with migration");
                return;
            }

            ZkMigrationState controllerState = driver.image.features().zkMigrationState();
            // The case labels index a compiler-generated switch map that is defined outside this block.
            // NOTE(review): the first label is a decompiler-substituted constant annotated as the value 1.
            switch (AnonymousClass1.$SwitchMap$org$apache$kafka$metadata$migration$ZkMigrationState[controllerState.ordinal()]) {
                case BrokerRegistrationChangeRecord.HIGHEST_SUPPORTED_VERSION /* 1 */:
                    driver.log.error("The controller's ZkMigrationState is NONE which means this cluster should not be migrated from ZooKeeper. This controller should not be configured with 'zookeeper.metadata.migration.enable' set to true. Will not proceed with a migration.");
                    driver.transitionTo(MigrationDriverState.INACTIVE);
                    break;
                case 2:
                    // Stay in the current state until the whole quorum reports ready.
                    if (driver.isControllerQuorumReadyForMigration()) {
                        driver.log.info("Controller Quorum is ready for Zk to KRaft migration. Now waiting for ZK brokers.");
                        driver.transitionTo(MigrationDriverState.WAIT_FOR_BROKERS);
                    }
                    break;
                case 3:
                    if (!driver.migrationLeadershipState.initialZkMigrationComplete()) {
                        driver.log.error("KRaft controller indicates an active migration, but the ZK state does not.");
                        driver.transitionTo(MigrationDriverState.INACTIVE);
                    } else {
                        driver.log.info("Migration is in already progress, not waiting on ZK brokers.");
                        driver.transitionTo(MigrationDriverState.BECOME_CONTROLLER);
                    }
                    break;
                case 4:
                    driver.log.error("KRaft controller indicates a completed migration, but the migration driver is somehow active.");
                    driver.transitionTo(MigrationDriverState.INACTIVE);
                    break;
                default:
                    throw new IllegalStateException("Unsupported ZkMigrationState " + controllerState);
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/metadata/migration/KRaftMigrationDriver$WaitForZkBrokersEvent.class */
    /**
     * Event that moves the driver from {@code WAIT_FOR_BROKERS} to {@code BECOME_CONTROLLER} once the
     * ZK brokers are reported ready for migration.
     */
    class WaitForZkBrokersEvent extends MigrationEvent {
        WaitForZkBrokersEvent() {
            super();
        }

        /**
         * Checks the driver state first and only then the broker readiness, so the readiness check
         * is skipped when the driver is in any other state.
         */
        public void run() throws Exception {
            final KRaftMigrationDriver driver = KRaftMigrationDriver.this;
            if (!driver.checkDriverState(MigrationDriverState.WAIT_FOR_BROKERS)) {
                return;
            }
            if (!driver.areZkBrokersReadyForMigration()) {
                return;
            }
            driver.log.info("Zk brokers are registered and ready for migration");
            driver.transitionTo(MigrationDriverState.BECOME_CONTROLLER);
        }
    }

    /**
     * Creates a driver in the {@code UNINITIALIZED} state with an empty metadata image and an empty
     * migration leadership state.
     *
     * @param i                       node id; used in the log prefix and the event queue thread prefix
     * @param zkRecordConsumer        stored as the consumer of ZK records
     * @param migrationClient         ZK migration client; also backs the ZK metadata writer
     * @param legacyPropagator        stored as the propagator
     * @param consumer                handler that is given this driver as a {@link MetadataPublisher}
     * @param faultHandler            stored fault handler
     * @param quorumFeatures          source of controller quorum readiness information
     * @param kafkaConfigSchema       schema used to build the record redactor
     * @param quorumControllerMetrics controller metrics sink
     * @param time                    clock stored on the driver
     */
    KRaftMigrationDriver(int i, ZkRecordConsumer zkRecordConsumer, MigrationClient migrationClient, LegacyPropagator legacyPropagator, Consumer<MetadataPublisher> consumer, FaultHandler faultHandler, QuorumFeatures quorumFeatures, KafkaConfigSchema kafkaConfigSchema, QuorumControllerMetrics quorumControllerMetrics, Time time) {
        // Collaborators handed in by the caller.
        this.nodeId = i;
        this.zkRecordConsumer = zkRecordConsumer;
        this.zkMigrationClient = migrationClient;
        this.propagator = legacyPropagator;
        this.time = time;
        this.controllerMetrics = quorumControllerMetrics;
        this.initialZkLoadHandler = consumer;
        this.faultHandler = faultHandler;
        this.quorumFeatures = quorumFeatures;

        // Initial driver state.
        this.migrationState = MigrationDriverState.UNINITIALIZED;
        this.migrationLeadershipState = ZkMigrationLeadershipState.EMPTY;
        this.image = MetadataImage.EMPTY;
        this.firstPublish = false;

        // Objects built here. Note that the event queue is given Time.SYSTEM, not the 'time' argument.
        LogContext driverLogContext = new LogContext("[KRaftMigrationDriver id=" + i + "] ");
        this.log = driverLogContext.logger(KRaftMigrationDriver.class);
        this.eventQueue = new KafkaEventQueue(Time.SYSTEM, driverLogContext, "controller-" + i + "-migration-driver-");
        this.pollTimeSupplier = new PollTimeSupplier();
        this.zkMetadataWriter = new KRaftMigrationZkWriter(migrationClient);
        this.recordRedactor = new RecordRedactor(kafkaConfigSchema);
    }

    /**
     * Creates a fresh {@link Builder}.
     *
     * @return a new builder instance
     */
    public static Builder newBuilder() {
        Builder builder = new Builder();
        return builder;
    }

    /**
     * Starts the driver by placing a {@code PollEvent} at the front of the event queue.
     */
    public void start() {
        PollEvent firstPoll = new PollEvent();
        this.eventQueue.prepend(firstPoll);
    }

    /**
     * Reads the driver state through the event queue.
     *
     * @return a future completed with the value of {@code migrationState} at the time the appended
     *         event runs
     */
    public CompletableFuture<MigrationDriverState> migrationState() {
        final CompletableFuture<MigrationDriverState> stateFuture = new CompletableFuture<>();
        this.eventQueue.append(() -> stateFuture.complete(this.migrationState));
        return stateFuture;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Loads (or creates) the migration recovery state via the ZK migration client, logs whether the
     * initial ZK migration is complete, hands this driver to the initial-load handler, and moves the
     * driver to {@code INACTIVE}.
     */
    public void recoverMigrationStateFromZK() {
        // The bound method reference null-checks its receiver when it is created.
        applyMigrationOperation("Recovering migration state from ZK", this.zkMigrationClient::getOrCreateMigrationRecoveryState);

        String initialMigrationStatus;
        if (this.migrationLeadershipState.initialZkMigrationComplete()) {
            initialMigrationStatus = "done";
        } else {
            initialMigrationStatus = "not done";
        }
        this.log.info("Initial migration of ZK metadata is {}.", initialMigrationStatus);

        this.initialZkLoadHandler.accept(this);
        transitionTo(MigrationDriverState.INACTIVE);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Asks the quorum features whether any controller is not yet ready for the ZK migration.
     *
     * @return {@code true} when no "not ready" reason is reported; {@code false} (after logging the
     *         reason at WARN) otherwise
     */
    public boolean isControllerQuorumReadyForMigration() {
        Optional<String> notReadyReason = this.quorumFeatures.reasonAllControllersZkMigrationNotReady();
        if (notReadyReason.isPresent()) {
            this.log.warn("Still waiting for all controller nodes ready to begin the migration. Not ready due to:" + notReadyReason.get());
            return false;
        }
        return true;
    }

    /**
     * Removes from {@code set} the id of every broker in the image that is registered as a migrating
     * ZK broker, then reports whether any ids are left.
     *
     * <p>The passed set is mutated: on return it holds only the ids not matched by the image.
     *
     * @param metadataImage image whose cluster broker registrations are inspected
     * @param set           broker ids expected to be present; modified in place
     * @return {@code true} if at least one id remains in {@code set}
     */
    private boolean imageDoesNotContainAllBrokers(MetadataImage metadataImage, Set<Integer> set) {
        metadataImage.cluster().brokers().values().stream()
            .filter(BrokerRegistration::isMigratingZkBroker)
            .forEach(registration -> set.remove(Integer.valueOf(registration.id())));
        boolean allAccountedFor = set.isEmpty();
        return !allAccountedFor;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Checks whether the ZK brokers are ready for the migration to proceed.
     *
     * <p>Readiness requires, in order: a first metadata publish has happened, the image's cluster is
     * not empty, ZK reports at least one broker id, every broker id read from ZK is present in the
     * image as a migrating ZK broker, and every broker id collected from the ZK topic assignments is
     * likewise present in the image. Each failed check is logged at INFO.
     *
     * @return {@code true} only when all of the checks above pass
     */
    public boolean areZkBrokersReadyForMigration() {
        if (!this.firstPublish) {
            this.log.info("Waiting for initial metadata publish before checking if Zk brokers are registered.");
            return false;
        }
        if (this.image.cluster().isEmpty()) {
            this.log.info("No brokers are known to KRaft, waiting for brokers to register.");
            return false;
        }
        Set<Integer> zkRegisteredBrokers = this.zkMigrationClient.readBrokerIds();
        if (zkRegisteredBrokers.isEmpty()) {
            this.log.info("No brokers are registered in ZK, waiting for brokers to register.");
            return false;
        }
        // imageDoesNotContainAllBrokers strips matched ids, so the set logged below holds only the missing ones.
        if (imageDoesNotContainAllBrokers(this.image, zkRegisteredBrokers)) {
            this.log.info("Still waiting for ZK brokers {} to register with KRaft.", zkRegisteredBrokers);
            return false;
        }
        // Gather every broker id that appears in the values of the per-topic map passed to the visitor.
        // NOTE(review): assumes those values are collections of Integer broker ids - confirm against the
        // topic visitor interface.
        Set<Integer> zkBrokersWithAssignments = new HashSet<>();
        this.zkMigrationClient.topicClient().iterateTopics(
            EnumSet.of(TopicMigrationClient.TopicVisitorInterest.TOPICS),
            (topicName, topicId, assignments) -> assignments.values().forEach(zkBrokersWithAssignments::addAll));
        if (imageDoesNotContainAllBrokers(this.image, zkBrokersWithAssignments)) {
            this.log.info("Still waiting for ZK brokers {} found in metadata to register with KRaft.", zkBrokersWithAssignments);
            return false;
        }
        return true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Applies an operation to the current migration leadership state and stores the result.
     *
     * <p>The transition is logged at INFO when the new state reports a loggable change relative to
     * the previous one, and at TRACE otherwise (with a separate message when the state is unchanged).
     *
     * @param str                     description used as the prefix of the log message
     * @param kRaftMigrationOperation operation that maps the current state to the next state
     */
    public void applyMigrationOperation(String str, KRaftMigrationOperation kRaftMigrationOperation) {
        final ZkMigrationLeadershipState before = this.migrationLeadershipState;
        final ZkMigrationLeadershipState after = kRaftMigrationOperation.apply(before);

        if (after.loggableChangeSinceState(before)) {
            this.log.info("{}. Transitioned migration state from {} to {}", str, before, after);
        } else if (!after.equals(before)) {
            this.log.trace("{}. Transitioned migration state from {} to {}", str, before, after);
        } else {
            this.log.trace("{}. Kept migration state as {}", str, after);
        }

        this.migrationLeadershipState = after;
    }

    /**
     * Decides whether the driver may move from its current state to the requested one.
     *
     * <p>Staying in the same state is always allowed and moving to {@code UNINITIALIZED} never is.
     * Every other answer depends on the current state; an unrecognized current state is logged at
     * ERROR and rejected.
     *
     * @param migrationDriverState the requested next state
     * @return {@code true} if the transition is permitted
     */
    private boolean isValidStateChange(MigrationDriverState migrationDriverState) {
        if (this.migrationState == migrationDriverState) {
            return true;
        }
        if (migrationDriverState == MigrationDriverState.UNINITIALIZED) {
            return false;
        }

        final boolean toInactive = migrationDriverState == MigrationDriverState.INACTIVE;
        // The case labels index a compiler-generated switch map that is defined outside this block.
        // NOTE(review): the first label is a decompiler-substituted constant annotated as the value 1.
        switch (AnonymousClass1.$SwitchMap$org$apache$kafka$metadata$migration$MigrationDriverState[this.migrationState.ordinal()]) {
            case BrokerRegistrationChangeRecord.HIGHEST_SUPPORTED_VERSION /* 1 */:
            case 2:
                return toInactive;
            case 3:
                return migrationDriverState == MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM;
            case 4:
                return toInactive
                    || migrationDriverState == MigrationDriverState.BECOME_CONTROLLER
                    || migrationDriverState == MigrationDriverState.WAIT_FOR_BROKERS;
            case 5:
                return toInactive || migrationDriverState == MigrationDriverState.BECOME_CONTROLLER;
            case 6:
                return toInactive
                    || migrationDriverState == MigrationDriverState.ZK_MIGRATION
                    || migrationDriverState == MigrationDriverState.SYNC_KRAFT_TO_ZK;
            case 7:
                return toInactive || migrationDriverState == MigrationDriverState.SYNC_KRAFT_TO_ZK;
            case 8:
                return toInactive || migrationDriverState == MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM;
            case 9:
                return toInactive || migrationDriverState == MigrationDriverState.DUAL_WRITE;
            default:
                this.log.error("Migration driver trying to transition from an unknown state {}", this.migrationState);
                return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Tells an event whether the driver is in the state that event expects.
     *
     * @param migrationDriverState the state the caller requires
     * @return {@code true} if the driver is in that state; otherwise {@code false}, after logging
     *         the mismatch at INFO
     */
    public boolean checkDriverState(MigrationDriverState migrationDriverState) {
        final boolean inExpectedState = this.migrationState.equals(migrationDriverState);
        if (!inExpectedState) {
            // NOTE(review): getClass() is evaluated on the driver, so the last placeholder shows the
            // driver's class name, not the name of the event that was skipped.
            this.log.info("Expected driver state {} but found {}. Not running this event {}.", migrationDriverState, this.migrationState, getClass().getSimpleName());
        }
        return inExpectedState;
    }

    /**
     * Moves the driver to the given state.
     *
     * <p>On a real state change the poll time supplier is reset and a new poll event is queued before
     * the new state is stored. A transition to the current state is only logged at TRACE.
     *
     * @param migrationDriverState the state to move to
     * @throws IllegalStateException if the transition is not allowed by {@code isValidStateChange}
     */
    void transitionTo(MigrationDriverState migrationDriverState) {
        if (!isValidStateChange(migrationDriverState)) {
            String message = String.format("Invalid transition in migration driver from %s to %s", this.migrationState, migrationDriverState);
            throw new IllegalStateException(message);
        }

        if (migrationDriverState == this.migrationState) {
            this.log.trace("{} transitioning from {} to {} state", this.nodeId, this.migrationState, migrationDriverState);
        } else {
            this.log.debug("{} transitioning from {} to {} state", this.nodeId, this.migrationState, migrationDriverState);
            this.pollTimeSupplier.reset();
            wakeup();
        }

        this.migrationState = migrationDriverState;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Queues a new {@code PollEvent} at the end of the event queue.
     */
    public void wakeup() {
        PollEvent nextPoll = new PollEvent();
        this.eventQueue.append(nextPoll);
    }

    /**
     * @return the fixed publisher name of this driver
     */
    @Override // org.apache.kafka.image.publisher.MetadataPublisher
    public String name() {
        final String publisherName = "KRaftMigrationDriver";
        return publisherName;
    }

    /**
     * Queues a {@code KRaftLeaderEvent} carrying the new leader and epoch.
     *
     * @param leaderAndEpoch the leader and epoch reported by the caller
     */
    @Override // org.apache.kafka.image.publisher.MetadataPublisher
    public void onControllerChange(LeaderAndEpoch leaderAndEpoch) {
        KRaftLeaderEvent leaderEvent = new KRaftLeaderEvent(leaderAndEpoch);
        this.eventQueue.append(leaderEvent);
    }

    /**
     * Queues a metadata change event for the published delta and image, using the no-op
     * completion handler.
     *
     * @param metadataDelta  the published delta
     * @param metadataImage  the published image
     * @param loaderManifest manifest supplying the provenance and whether the load was a snapshot
     */
    @Override // org.apache.kafka.image.publisher.MetadataPublisher
    public void onMetadataUpdate(MetadataDelta metadataDelta, MetadataImage metadataImage, LoaderManifest loaderManifest) {
        MetadataProvenance provenance = loaderManifest.provenance();
        boolean loadedFromSnapshot = loaderManifest.type() == LoaderManifestType.SNAPSHOT;
        enqueueMetadataChangeEvent(metadataDelta, metadataImage, provenance, loadedFromSnapshot, NO_OP_HANDLER);
    }

    /**
     * Shuts the driver down: begins the event queue shutdown, logs at DEBUG, then closes the queue.
     *
     * @throws InterruptedException declared by the signature; presumably raised if closing the queue
     *                              is interrupted
     */
    @Override // org.apache.kafka.image.publisher.MetadataPublisher, java.lang.AutoCloseable
    public void close() throws InterruptedException {
        final String shutdownSource = "KRaftMigrationDriver#shutdown";
        this.eventQueue.beginShutdown(shutdownSource);
        this.log.debug("Shutting down KRaftMigrationDriver");
        this.eventQueue.close();
    }

    /**
     * Wraps the arguments in a {@code MetadataChangeEvent} and appends it to the event queue.
     *
     * @param metadataDelta      the metadata delta
     * @param metadataImage      the metadata image
     * @param metadataProvenance provenance of the image
     * @param z                  flag passed through to the event; {@code onMetadataUpdate} sets it
     *                           when the manifest type is a snapshot
     * @param consumer           throwable handler passed through to the event
     */
    void enqueueMetadataChangeEvent(MetadataDelta metadataDelta, MetadataImage metadataImage, MetadataProvenance metadataProvenance, boolean z, Consumer<Throwable> consumer) {
        MetadataChangeEvent changeEvent = new MetadataChangeEvent(metadataDelta, metadataImage, metadataProvenance, z, consumer);
        this.eventQueue.append(changeEvent);
    }

    /**
     * Wraps an operation consumer so that each accepted operation is also counted.
     *
     * <p>For every call, the counter stored under the first string argument is incremented in
     * {@code map} (starting at 1 when absent), and then the second string argument and the operation
     * are forwarded to {@code biConsumer}.
     *
     * @param map        mutable map receiving the per-key call counts
     * @param biConsumer delegate that receives the second string and the operation
     * @return a consumer that counts and then delegates
     */
    static KRaftMigrationOperationConsumer countingOperationConsumer(Map<String, Integer> map, BiConsumer<String, KRaftMigrationOperation> biConsumer) {
        return (counterKey, logMsg, operation) -> {
            // merge() stores 1 for a new key and adds 1 to an existing count.
            map.merge(counterKey, 1, Integer::sum);
            biConsumer.accept(logMsg, operation);
        };
    }
}
