/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.metadata.migration;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.loader.LogDeltaManifest;
import org.apache.kafka.image.loader.SnapshotManifest;
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.metadata.migration.LegacyPropagator;
import org.apache.kafka.metadata.migration.MigrationClient;
import org.apache.kafka.metadata.migration.MigrationState;
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState;
import org.apache.kafka.metadata.migration.ZkRecordConsumer;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KRaftMigrationDriver
implements MetadataPublisher {
    private final Time time;
    private final Logger log;
    private final int nodeId;
    private final MigrationClient zkMigrationClient;
    private final LegacyPropagator propagator;
    private final ZkRecordConsumer zkRecordConsumer;
    private final KafkaEventQueue eventQueue;
    private final FaultHandler faultHandler;
    private final Consumer<KRaftMigrationDriver> initialZkLoadHandler;
    private volatile LeaderAndEpoch leaderAndEpoch;
    private volatile MigrationState migrationState;
    private volatile ZkMigrationLeadershipState migrationLeadershipState;
    private volatile MetadataImage image;

    public KRaftMigrationDriver(int nodeId, ZkRecordConsumer zkRecordConsumer, MigrationClient zkMigrationClient, LegacyPropagator propagator, Consumer<KRaftMigrationDriver> initialZkLoadHandler, FaultHandler faultHandler) {
        this.nodeId = nodeId;
        this.zkRecordConsumer = zkRecordConsumer;
        this.zkMigrationClient = zkMigrationClient;
        this.propagator = propagator;
        this.time = Time.SYSTEM;
        this.log = LoggerFactory.getLogger(KRaftMigrationDriver.class);
        this.migrationState = MigrationState.UNINITIALIZED;
        this.migrationLeadershipState = ZkMigrationLeadershipState.EMPTY;
        this.eventQueue = new KafkaEventQueue(Time.SYSTEM, new LogContext("KRaftMigrationDriver"), "kraft-migration");
        this.image = MetadataImage.EMPTY;
        this.leaderAndEpoch = LeaderAndEpoch.UNKNOWN;
        this.initialZkLoadHandler = initialZkLoadHandler;
        this.faultHandler = faultHandler;
    }

    public void start() {
        this.eventQueue.prepend((EventQueue.Event)new PollEvent());
    }

    public void shutdown() throws InterruptedException {
        this.eventQueue.beginShutdown("KRaftMigrationDriver#shutdown");
        this.log.debug("Shutting down KRaftMigrationDriver");
        this.eventQueue.close();
    }

    private void initializeMigrationState() {
        this.log.info("Recovering migration state");
        this.apply("Recovery", this.zkMigrationClient::getOrCreateMigrationRecoveryState);
        String maybeDone = this.migrationLeadershipState.zkMigrationComplete() ? "done" : "not done";
        this.log.info("Recovered migration state {}. ZK migration is {}.", (Object)this.migrationLeadershipState, (Object)maybeDone);
        this.initialZkLoadHandler.accept(this);
        this.transitionTo(MigrationState.INACTIVE);
    }

    private boolean isControllerQuorumReadyForMigration() {
        return true;
    }

    private boolean areZkBrokersReadyForMigration() {
        if (this.image == MetadataImage.EMPTY) {
            this.log.info("Waiting for initial metadata publish before checking if Zk brokers are registered.");
            return false;
        }
        Set<Integer> kraftRegisteredZkBrokers = this.image.cluster().zkBrokers().keySet();
        Set<Integer> zkRegisteredZkBrokers = this.zkMigrationClient.readBrokerIdsFromTopicAssignments();
        zkRegisteredZkBrokers.removeAll(kraftRegisteredZkBrokers);
        if (zkRegisteredZkBrokers.isEmpty()) {
            return true;
        }
        this.log.info("Still waiting for ZK brokers {} to register with KRaft.", zkRegisteredZkBrokers);
        return false;
    }

    private void apply(String name, Function<ZkMigrationLeadershipState, ZkMigrationLeadershipState> stateMutator) {
        ZkMigrationLeadershipState beforeState = this.migrationLeadershipState;
        ZkMigrationLeadershipState afterState = stateMutator.apply(beforeState);
        this.log.trace("{} transitioned from {} to {}", new Object[]{name, beforeState, afterState});
        this.migrationLeadershipState = afterState;
    }

    private boolean isValidStateChange(MigrationState newState) {
        if (this.migrationState == newState) {
            return true;
        }
        switch (this.migrationState) {
            case UNINITIALIZED: 
            case DUAL_WRITE: {
                return newState == MigrationState.INACTIVE;
            }
            case INACTIVE: {
                return newState == MigrationState.WAIT_FOR_CONTROLLER_QUORUM;
            }
            case WAIT_FOR_CONTROLLER_QUORUM: {
                return newState == MigrationState.INACTIVE || newState == MigrationState.WAIT_FOR_BROKERS;
            }
            case WAIT_FOR_BROKERS: {
                return newState == MigrationState.INACTIVE || newState == MigrationState.BECOME_CONTROLLER;
            }
            case BECOME_CONTROLLER: {
                return newState == MigrationState.INACTIVE || newState == MigrationState.ZK_MIGRATION || newState == MigrationState.KRAFT_CONTROLLER_TO_BROKER_COMM;
            }
            case ZK_MIGRATION: {
                return newState == MigrationState.INACTIVE || newState == MigrationState.KRAFT_CONTROLLER_TO_BROKER_COMM;
            }
            case KRAFT_CONTROLLER_TO_BROKER_COMM: {
                return newState == MigrationState.INACTIVE || newState == MigrationState.DUAL_WRITE;
            }
        }
        this.log.error("Migration driver trying to transition from an unknown state {}", (Object)this.migrationState);
        return false;
    }

    private void transitionTo(MigrationState newState) {
        if (!this.isValidStateChange(newState)) {
            this.log.error("Error transition in migration driver from {} to {}", (Object)this.migrationState, (Object)newState);
            return;
        }
        if (newState != this.migrationState) {
            this.log.debug("{} transitioning from {} to {} state", new Object[]{this.nodeId, this.migrationState, newState});
        } else {
            this.log.trace("{} transitioning from {} to {} state", new Object[]{this.nodeId, this.migrationState, newState});
        }
        switch (newState) {
            case UNINITIALIZED: {
                throw new IllegalStateException("Illegal transition from " + (Object)((Object)this.migrationState) + " to " + (Object)((Object)newState) + " state in Zk to KRaft migration");
            }
        }
        this.migrationState = newState;
    }

    @Override
    public String name() {
        return "KRaftMigrationDriver";
    }

    @Override
    public void publishSnapshot(MetadataDelta delta, MetadataImage newImage, SnapshotManifest manifest) {
        this.eventQueue.append((EventQueue.Event)new MetadataChangeEvent(delta, newImage, manifest.provenance(), true));
    }

    @Override
    public void publishLogDelta(MetadataDelta delta, MetadataImage newImage, LogDeltaManifest manifest) {
        if (!this.leaderAndEpoch.equals((Object)manifest.leaderAndEpoch())) {
            this.eventQueue.append((EventQueue.Event)new KRaftLeaderEvent(manifest.leaderAndEpoch()));
        }
        this.eventQueue.append((EventQueue.Event)new MetadataChangeEvent(delta, newImage, manifest.provenance(), false));
    }

    @Override
    public void close() throws Exception {
        this.eventQueue.close();
    }

    static String recordBatchToString(Collection<ApiMessageAndVersion> batch) {
        String batchString = batch.stream().map(apiMessageAndVersion -> {
            if (apiMessageAndVersion.message().apiKey() == MetadataRecordType.CONFIG_RECORD.id()) {
                StringBuilder sb = new StringBuilder();
                sb.append("ApiMessageAndVersion(");
                ConfigRecord record = (ConfigRecord)apiMessageAndVersion.message();
                sb.append("ConfigRecord(");
                sb.append("resourceType=");
                sb.append(record.resourceType());
                sb.append(", resourceName=");
                sb.append(record.resourceName());
                sb.append(", name=");
                sb.append(record.name());
                sb.append(")");
                sb.append(" at version ");
                sb.append(apiMessageAndVersion.version());
                sb.append(")");
                return sb.toString();
            }
            return apiMessageAndVersion.toString();
        }).collect(Collectors.joining(","));
        return "[" + batchString + "]";
    }

    class MetadataChangeEvent
    extends MigrationEvent {
        private final MetadataDelta delta;
        private final MetadataImage image;
        private final MetadataProvenance provenance;
        private final boolean isSnapshot;

        MetadataChangeEvent(MetadataDelta delta, MetadataImage image, MetadataProvenance provenance, boolean isSnapshot) {
            this.delta = delta;
            this.image = image;
            this.provenance = provenance;
            this.isSnapshot = isSnapshot;
        }

        public void run() throws Exception {
            String metadataType;
            KRaftMigrationDriver.this.image = this.image;
            String string = metadataType = this.isSnapshot ? "snapshot" : "delta";
            if (KRaftMigrationDriver.this.migrationState != MigrationState.DUAL_WRITE) {
                KRaftMigrationDriver.this.log.trace("Received metadata {}, but the controller is not in dual-write mode. Ignoring the change to be replicated to Zookeeper", (Object)metadataType);
                return;
            }
            if (this.delta.featuresDelta() != null) {
                KRaftMigrationDriver.this.propagator.setMetadataVersion(this.image.features().metadataVersion());
            }
            if (this.image.highestOffsetAndEpoch().compareTo(KRaftMigrationDriver.this.migrationLeadershipState.offsetAndEpoch()) >= 0) {
                if (this.delta.topicsDelta() != null) {
                    this.delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> {
                        if (this.delta.topicsDelta().createdTopicIds().contains(topicId)) {
                            KRaftMigrationDriver.this.apply("Create topic " + topicDelta.name(), migrationState -> KRaftMigrationDriver.this.zkMigrationClient.createTopic(topicDelta.name(), (Uuid)topicId, topicDelta.partitionChanges(), (ZkMigrationLeadershipState)migrationState));
                        } else {
                            KRaftMigrationDriver.this.apply("Updating topic " + topicDelta.name(), migrationState -> KRaftMigrationDriver.this.zkMigrationClient.updateTopicPartitions(Collections.singletonMap(topicDelta.name(), topicDelta.partitionChanges()), (ZkMigrationLeadershipState)migrationState));
                        }
                    });
                }
                if (this.delta.configsDelta() != null) {
                    this.delta.configsDelta().changes().forEach((configResource, configDelta) -> KRaftMigrationDriver.this.apply("Updating config resource " + configResource, migrationState -> KRaftMigrationDriver.this.zkMigrationClient.writeConfigs((ConfigResource)configResource, this.image.configs().configMapForResource((ConfigResource)configResource), (ZkMigrationLeadershipState)migrationState)));
                }
                if (this.delta.clientQuotasDelta() != null) {
                    this.delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> {
                        Map<String, Double> quotaMap = this.image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
                        KRaftMigrationDriver.this.apply("Updating client quota " + clientQuotaEntity, migrationState -> KRaftMigrationDriver.this.zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, (ZkMigrationLeadershipState)migrationState));
                    });
                }
                if (this.delta.producerIdsDelta() != null) {
                    KRaftMigrationDriver.this.apply("Updating next producer ID", migrationState -> KRaftMigrationDriver.this.zkMigrationClient.writeProducerId(this.delta.producerIdsDelta().nextProducerId(), (ZkMigrationLeadershipState)migrationState));
                }
                KRaftMigrationDriver.this.log.trace("Sending RPCs to brokers for metadata {}.", (Object)metadataType);
                KRaftMigrationDriver.this.propagator.sendRPCsToBrokersFromMetadataDelta(this.delta, this.image, KRaftMigrationDriver.this.migrationLeadershipState.zkControllerEpoch());
            } else {
                KRaftMigrationDriver.this.log.info("Ignoring {} {} which contains metadata that has already been written to ZK.", (Object)metadataType, (Object)this.provenance);
            }
        }
    }

    class SendRPCsToBrokersEvent
    extends MigrationEvent {
        SendRPCsToBrokersEvent() {
        }

        public void run() throws Exception {
            if (KRaftMigrationDriver.this.migrationState == MigrationState.KRAFT_CONTROLLER_TO_BROKER_COMM) {
                if (KRaftMigrationDriver.this.image.highestOffsetAndEpoch().compareTo(KRaftMigrationDriver.this.migrationLeadershipState.offsetAndEpoch()) >= 0) {
                    KRaftMigrationDriver.this.log.trace("Sending RPCs to broker before moving to dual-write mode using at offset and epoch {}", (Object)KRaftMigrationDriver.this.image.highestOffsetAndEpoch());
                    KRaftMigrationDriver.this.propagator.sendRPCsToBrokersFromMetadataImage(KRaftMigrationDriver.this.image, KRaftMigrationDriver.this.migrationLeadershipState.zkControllerEpoch());
                    KRaftMigrationDriver.this.transitionTo(MigrationState.DUAL_WRITE);
                } else {
                    KRaftMigrationDriver.this.log.trace("Ignoring using metadata image since migration leadership state is at a greater offset and epoch {}", (Object)KRaftMigrationDriver.this.migrationLeadershipState.offsetAndEpoch());
                }
            }
        }
    }

    class MigrateMetadataEvent
    extends MigrationEvent {
        MigrateMetadataEvent() {
        }

        public void run() throws Exception {
            HashSet brokersInMetadata = new HashSet();
            KRaftMigrationDriver.this.log.info("Starting ZK migration");
            KRaftMigrationDriver.this.zkRecordConsumer.beginMigration();
            try {
                AtomicInteger count = new AtomicInteger(0);
                KRaftMigrationDriver.this.zkMigrationClient.readAllMetadata(batch -> {
                    try {
                        if (KRaftMigrationDriver.this.log.isTraceEnabled()) {
                            KRaftMigrationDriver.this.log.trace("Migrating {} records from ZK: {}", (Object)batch.size(), (Object)KRaftMigrationDriver.recordBatchToString(batch));
                        } else {
                            KRaftMigrationDriver.this.log.info("Migrating {} records from ZK", (Object)batch.size());
                        }
                        CompletableFuture<?> future = KRaftMigrationDriver.this.zkRecordConsumer.acceptBatch((List<ApiMessageAndVersion>)batch);
                        count.addAndGet(batch.size());
                        future.get();
                    }
                    catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    catch (ExecutionException e) {
                        throw new RuntimeException(e.getCause());
                    }
                }, brokersInMetadata::add);
                OffsetAndEpoch offsetAndEpochAfterMigration = KRaftMigrationDriver.this.zkRecordConsumer.completeMigration();
                KRaftMigrationDriver.this.log.info("Completed migration of metadata from Zookeeper to KRaft. A total of {} metadata records were generated. The current metadata offset is now {} with an epoch of {}. Saw {} brokers in the migrated metadata {}.", new Object[]{count.get(), offsetAndEpochAfterMigration.offset(), offsetAndEpochAfterMigration.epoch(), brokersInMetadata.size(), brokersInMetadata});
                ZkMigrationLeadershipState newState = KRaftMigrationDriver.this.migrationLeadershipState.withKRaftMetadataOffsetAndEpoch(offsetAndEpochAfterMigration.offset(), offsetAndEpochAfterMigration.epoch());
                KRaftMigrationDriver.this.apply("Migrate metadata from Zk", state -> KRaftMigrationDriver.this.zkMigrationClient.setMigrationRecoveryState(newState));
                KRaftMigrationDriver.this.transitionTo(MigrationState.KRAFT_CONTROLLER_TO_BROKER_COMM);
            }
            catch (Throwable t) {
                KRaftMigrationDriver.this.zkRecordConsumer.abortMigration();
            }
        }

        @Override
        public void handleException(Throwable e) {
            KRaftMigrationDriver.this.log.error("Had an exception in " + this.getClass().getSimpleName(), e);
        }
    }

    class WaitForZkBrokersEvent
    extends MigrationEvent {
        WaitForZkBrokersEvent() {
        }

        public void run() throws Exception {
            switch (KRaftMigrationDriver.this.migrationState) {
                case WAIT_FOR_BROKERS: {
                    if (!KRaftMigrationDriver.this.areZkBrokersReadyForMigration()) break;
                    KRaftMigrationDriver.this.log.debug("Zk brokers are registered and ready for migration");
                    KRaftMigrationDriver.this.transitionTo(MigrationState.BECOME_CONTROLLER);
                    break;
                }
            }
        }

        @Override
        public void handleException(Throwable e) {
            KRaftMigrationDriver.this.log.error("Had an exception in " + this.getClass().getSimpleName(), e);
        }
    }

    class BecomeZkControllerEvent
    extends MigrationEvent {
        BecomeZkControllerEvent() {
        }

        public void run() throws Exception {
            switch (KRaftMigrationDriver.this.migrationState) {
                case BECOME_CONTROLLER: {
                    KRaftMigrationDriver.this.apply("BecomeZkLeaderEvent", KRaftMigrationDriver.this.zkMigrationClient::claimControllerLeadership);
                    if (KRaftMigrationDriver.this.migrationLeadershipState.zkControllerEpochZkVersion() == -1) break;
                    if (!KRaftMigrationDriver.this.migrationLeadershipState.zkMigrationComplete()) {
                        KRaftMigrationDriver.this.transitionTo(MigrationState.ZK_MIGRATION);
                        break;
                    }
                    KRaftMigrationDriver.this.transitionTo(MigrationState.KRAFT_CONTROLLER_TO_BROKER_COMM);
                    break;
                }
            }
        }

        @Override
        public void handleException(Throwable e) {
            KRaftMigrationDriver.this.log.error("Had an exception in " + this.getClass().getSimpleName(), e);
        }
    }

    class WaitForControllerQuorumEvent
    extends MigrationEvent {
        WaitForControllerQuorumEvent() {
        }

        public void run() throws Exception {
            switch (KRaftMigrationDriver.this.migrationState) {
                case WAIT_FOR_CONTROLLER_QUORUM: {
                    if (!KRaftMigrationDriver.this.isControllerQuorumReadyForMigration()) break;
                    KRaftMigrationDriver.this.log.debug("Controller Quorum is ready for Zk to KRaft migration");
                    KRaftMigrationDriver.this.transitionTo(MigrationState.WAIT_FOR_BROKERS);
                    break;
                }
            }
        }

        @Override
        public void handleException(Throwable e) {
            KRaftMigrationDriver.this.log.error("Had an exception in " + this.getClass().getSimpleName(), e);
        }
    }

    class KRaftLeaderEvent
    extends MigrationEvent {
        private final LeaderAndEpoch leaderAndEpoch;

        KRaftLeaderEvent(LeaderAndEpoch leaderAndEpoch) {
            this.leaderAndEpoch = leaderAndEpoch;
        }

        public void run() throws Exception {
            KRaftMigrationDriver.this.leaderAndEpoch = this.leaderAndEpoch;
            boolean isActive = this.leaderAndEpoch.isLeader(KRaftMigrationDriver.this.nodeId);
            switch (KRaftMigrationDriver.this.migrationState) {
                case UNINITIALIZED: {
                    long deadline = KRaftMigrationDriver.this.time.nanoseconds() + TimeUnit.NANOSECONDS.convert(10L, TimeUnit.SECONDS);
                    KRaftMigrationDriver.this.eventQueue.scheduleDeferred("poll", (Function)new EventQueue.DeadlineFunction(deadline), (EventQueue.Event)this);
                    break;
                }
                default: {
                    if (!isActive) {
                        KRaftMigrationDriver.this.apply("KRaftLeaderEvent is not active", state -> ZkMigrationLeadershipState.EMPTY);
                        KRaftMigrationDriver.this.transitionTo(MigrationState.INACTIVE);
                        break;
                    }
                    KRaftMigrationDriver.this.apply("KRaftLeaderEvent is active", state -> state.withNewKRaftController(KRaftMigrationDriver.this.nodeId, this.leaderAndEpoch.epoch()));
                    KRaftMigrationDriver.this.transitionTo(MigrationState.WAIT_FOR_CONTROLLER_QUORUM);
                }
            }
        }

        @Override
        public void handleException(Throwable e) {
            KRaftMigrationDriver.this.log.error("Had an exception in " + this.getClass().getSimpleName(), e);
        }
    }

    class PollEvent
    extends MigrationEvent {
        PollEvent() {
        }

        public void run() throws Exception {
            switch (KRaftMigrationDriver.this.migrationState) {
                case UNINITIALIZED: {
                    KRaftMigrationDriver.this.initializeMigrationState();
                    break;
                }
                case INACTIVE: {
                    break;
                }
                case WAIT_FOR_CONTROLLER_QUORUM: {
                    KRaftMigrationDriver.this.eventQueue.append((EventQueue.Event)new WaitForControllerQuorumEvent());
                    break;
                }
                case BECOME_CONTROLLER: {
                    KRaftMigrationDriver.this.eventQueue.append((EventQueue.Event)new BecomeZkControllerEvent());
                    break;
                }
                case WAIT_FOR_BROKERS: {
                    KRaftMigrationDriver.this.eventQueue.append((EventQueue.Event)new WaitForZkBrokersEvent());
                    break;
                }
                case ZK_MIGRATION: {
                    KRaftMigrationDriver.this.eventQueue.append((EventQueue.Event)new MigrateMetadataEvent());
                    break;
                }
                case KRAFT_CONTROLLER_TO_BROKER_COMM: {
                    KRaftMigrationDriver.this.eventQueue.append((EventQueue.Event)new SendRPCsToBrokersEvent());
                    break;
                }
            }
            long deadline = KRaftMigrationDriver.this.time.nanoseconds() + TimeUnit.NANOSECONDS.convert(1L, TimeUnit.SECONDS);
            KRaftMigrationDriver.this.eventQueue.scheduleDeferred("poll", (Function)new EventQueue.DeadlineFunction(deadline), (EventQueue.Event)new PollEvent());
        }

        @Override
        public void handleException(Throwable e) {
            KRaftMigrationDriver.this.log.error("Had an exception in " + this.getClass().getSimpleName(), e);
        }
    }

    abstract class MigrationEvent
    implements EventQueue.Event {
        MigrationEvent() {
        }

        public void handleException(Throwable e) {
            KRaftMigrationDriver.this.faultHandler.handleFault("Error during ZK migration", e);
        }
    }
}

