/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.test;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import kafka.log.UnifiedLog;
import kafka.network.SocketServer;
import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
import kafka.server.KafkaBroker;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfig;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.fault.FaultHandlerException;
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile;
import scala.collection.Seq;
import scala.jdk.javaapi.CollectionConverters;

/**
 * Handle on a Kafka cluster made of {@link KafkaBroker}s and {@link ControllerServer}s.
 *
 * <p>Besides the accessors that implementations must supply, the interface offers default
 * helpers that build clients (producer, consumer, admin) against the cluster and that block
 * until topic or ACL changes are visible on the servers.
 */
public interface ClusterInstance {
    /**
     * @return the {@link Type} of this cluster
     */
    Type type();

    /**
     * @return the brokers of this cluster keyed by integer id; the result is also the source for
     *         {@link #aliveBrokers()}, {@link #brokerIds()} and {@link #brokerSocketServers()}
     */
    Map<Integer, KafkaBroker> brokers();

    /**
     * Returns the subset of {@link #brokers()} whose {@code isShutdown()} is {@code false}.
     *
     * @return a newly collected map of the brokers that are not shut down
     */
    default Map<Integer, KafkaBroker> aliveBrokers() {
        return this.brokers().entrySet().stream()
            .filter(entry -> !entry.getValue().isShutdown())
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    /**
     * @return the controllers of this cluster keyed by integer id
     */
    Map<Integer, ControllerServer> controllers();

    /**
     * @return the configuration of this cluster; {@link #setClientSaslConfig(Map)} reads its
     *         broker security protocol
     */
    ClusterConfig config();

    /**
     * @return the ids of the controllers
     */
    Set<Integer> controllerIds();

    /**
     * @return the key set of {@link #brokers()}
     */
    default Set<Integer> brokerIds() {
        return this.brokers().keySet();
    }

    /**
     * @return the listener name for clients (semantics are defined by the implementation)
     */
    ListenerName clientListener();

    /**
     * @return the controller listener name; this default implementation returns an empty
     *         {@link Optional}
     */
    default Optional<ListenerName> controllerListenerName() {
        return Optional.empty();
    }

    /**
     * @return the value the client factories of this interface use for {@code bootstrap.servers}
     */
    String bootstrapServers();

    /**
     * @return the value {@link #admin(Map, boolean)} uses for {@code bootstrap.controllers}
     */
    String bootstrapControllers();

    /**
     * @return the {@link SocketServer} of every entry of {@link #brokers()}
     */
    default Collection<SocketServer> brokerSocketServers() {
        return this.brokers().values().stream()
            .map(KafkaBroker::socketServer)
            .collect(Collectors.toList());
    }

    /**
     * @return the {@link SocketServer}s of the controllers
     */
    Collection<SocketServer> controllerSocketServers();

    /**
     * @return the first element of {@link #brokerSocketServers()}
     * @throws RuntimeException if there is no broker socket server
     */
    default SocketServer anyBrokerSocketServer() {
        return this.brokerSocketServers().stream()
            .findFirst()
            .orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
    }

    /**
     * @return the first element of {@link #controllerSocketServers()}
     * @throws RuntimeException if there is no controller socket server
     */
    default SocketServer anyControllerSocketServer() {
        return this.controllerSocketServers().stream()
            .findFirst()
            .orElseThrow(() -> new RuntimeException("No controller SocketServers found"));
    }

    /**
     * @return the id of this cluster
     */
    String clusterId();

    /**
     * Creates a producer for this cluster.
     *
     * <p>The given configs are copied, never mutated. Byte-array key/value serializers and
     * {@link #bootstrapServers()} are filled in only where the caller did not supply a value,
     * then {@link #setClientSaslConfig(Map)} is applied.
     *
     * @param configs caller-supplied producer configs; they take precedence over the defaults
     * @return a new {@link KafkaProducer}; the caller is responsible for closing it
     */
    default <K, V> Producer<K, V> producer(Map<String, Object> configs) {
        Map<String, Object> props = new HashMap<>(configs);
        props.putIfAbsent("key.serializer", ByteArraySerializer.class.getName());
        props.putIfAbsent("value.serializer", ByteArraySerializer.class.getName());
        props.putIfAbsent("bootstrap.servers", this.bootstrapServers());
        return new KafkaProducer<>(this.setClientSaslConfig(props));
    }

    /**
     * Creates a producer using only the defaults of {@link #producer(Map)}.
     *
     * @return a new producer; the caller is responsible for closing it
     */
    default <K, V> Producer<K, V> producer() {
        return this.producer(Map.of());
    }

    /**
     * Creates a consumer for this cluster.
     *
     * <p>The given configs are copied, never mutated. Where the caller did not supply a value,
     * the defaults are: byte-array key/value deserializers, {@code auto.offset.reset=earliest},
     * a {@code group.id} of {@code "group_"} followed by a 5-character random string, and
     * {@link #bootstrapServers()}. {@link #setClientSaslConfig(Map)} is applied afterwards.
     *
     * @param configs caller-supplied consumer configs; they take precedence over the defaults
     * @return a new {@link KafkaConsumer}; the caller is responsible for closing it
     */
    default <K, V> Consumer<K, V> consumer(Map<String, Object> configs) {
        Map<String, Object> props = new HashMap<>(configs);
        props.putIfAbsent("key.deserializer", ByteArrayDeserializer.class.getName());
        props.putIfAbsent("value.deserializer", ByteArrayDeserializer.class.getName());
        props.putIfAbsent("auto.offset.reset", "earliest");
        props.putIfAbsent("group.id", "group_" + TestUtils.randomString(5));
        props.putIfAbsent("bootstrap.servers", this.bootstrapServers());
        return new KafkaConsumer<>(this.setClientSaslConfig(props));
    }

    /**
     * Creates a consumer using only the defaults of {@link #consumer(Map)}.
     *
     * @return a new consumer; the caller is responsible for closing it
     */
    default <K, V> Consumer<K, V> consumer() {
        return this.consumer(Map.of());
    }

    /**
     * Creates an admin client that bootstraps either from the controllers or from the brokers.
     *
     * <p>The given configs are copied, never mutated. The bootstrap property of the selected
     * mode is defaulted if absent and the property of the other mode is removed, so the client
     * never sees both. {@link #setClientSaslConfig(Map)} is applied afterwards.
     *
     * @param configs caller-supplied admin configs
     * @param usingBootstrapControllers {@code true} to use {@code bootstrap.controllers},
     *        {@code false} to use {@code bootstrap.servers}
     * @return a new {@link Admin}; the caller is responsible for closing it
     */
    default Admin admin(Map<String, Object> configs, boolean usingBootstrapControllers) {
        Map<String, Object> props = new HashMap<>(configs);
        if (usingBootstrapControllers) {
            props.putIfAbsent("bootstrap.controllers", this.bootstrapControllers());
            props.remove("bootstrap.servers");
        } else {
            props.putIfAbsent("bootstrap.servers", this.bootstrapServers());
            props.remove("bootstrap.controllers");
        }
        return Admin.create(this.setClientSaslConfig(props));
    }

    /**
     * Returns a copy of the given client configs, extended with SASL/PLAIN defaults when
     * {@code config().brokerSecurityProtocol()} is {@link SecurityProtocol#SASL_PLAINTEXT}.
     *
     * <p>Only keys the caller has not set are added: {@code security.protocol},
     * {@code sasl.mechanism} and {@code sasl.jaas.config}. For any other protocol the copy is
     * returned unchanged.
     *
     * @param configs client configs; not mutated
     * @return a new mutable map
     */
    default Map<String, Object> setClientSaslConfig(Map<String, Object> configs) {
        Map<String, Object> props = new HashMap<>(configs);
        if (this.config().brokerSecurityProtocol() == SecurityProtocol.SASL_PLAINTEXT) {
            props.putIfAbsent("security.protocol", SecurityProtocol.SASL_PLAINTEXT.name);
            props.putIfAbsent("sasl.mechanism", "PLAIN");
            props.putIfAbsent("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", "plain-admin", "plain-admin-secret"));
        }
        return props;
    }

    /**
     * Creates an admin client that bootstraps from the brokers.
     *
     * @param configs caller-supplied admin configs
     * @return a new {@link Admin}; the caller is responsible for closing it
     */
    default Admin admin(Map<String, Object> configs) {
        return this.admin(configs, false);
    }

    /**
     * Creates an admin client with default configs that bootstraps from the brokers.
     *
     * @return a new {@link Admin}; the caller is responsible for closing it
     */
    default Admin admin() {
        return this.admin(Map.of(), false);
    }

    /**
     * Reports which consumer group protocols can be used against this cluster.
     *
     * @return {@code CLASSIC} and {@code CONSUMER} when every broker's data-plane request
     *         processor reports the consumer group protocol as enabled (vacuously true for an
     *         empty broker map); otherwise only {@code CLASSIC}
     */
    default Set<GroupProtocol> supportedGroupProtocols() {
        boolean consumerProtocolEverywhere = this.brokers().values().stream()
            .allMatch(b -> b.dataPlaneRequestProcessor().isConsumerGroupProtocolEnabled());
        if (consumerProtocolEverywhere) {
            return Set.of(GroupProtocol.CLASSIC, GroupProtocol.CONSUMER);
        }
        return Collections.singleton(GroupProtocol.CLASSIC);
    }

    /**
     * @return the first fatal fault recorded for this cluster, if any (recording semantics are
     *         defined by the implementation)
     */
    Optional<FaultHandlerException> firstFatalException();

    /**
     * @return the first non-fatal fault recorded for this cluster, if any (recording semantics
     *         are defined by the implementation)
     */
    Optional<FaultHandlerException> firstNonFatalException();

    /** Starts the cluster. */
    void start();

    /** Stops the cluster. */
    void stop();

    /**
     * Shuts down a single broker.
     *
     * @param var1 presumably the broker id as used in {@link #brokers()}; confirm against the
     *        implementations
     */
    void shutdownBroker(int var1);

    /**
     * Starts a single broker.
     *
     * @param var1 presumably the broker id as used in {@link #brokers()}; confirm against the
     *        implementations
     */
    void startBroker(int var1);

    /**
     * Waits for the deletion of a topic; equivalent to {@code waitForTopic(topic, 0)}.
     *
     * @param topic name of the topic being deleted
     * @throws InterruptedException if the waiting thread is interrupted
     */
    default void waitTopicDeletion(String topic) throws InterruptedException {
        this.waitForTopic(topic, 0);
    }

    /**
     * Requests creation of a topic through a temporary admin client and then waits via
     * {@link #waitForTopic(String, int)} until it is visible.
     *
     * <p>NOTE(review): the result returned by {@code createTopics} is discarded, so a failed
     * creation is not reported here directly; it would only show up as a timeout while waiting.
     *
     * @param topicName name of the topic to create
     * @param partitions number of partitions
     * @param replicas replication factor
     * @throws InterruptedException if the waiting thread is interrupted
     */
    default void createTopic(String topicName, int partitions, short replicas) throws InterruptedException {
        try (Admin admin = this.admin()) {
            admin.createTopics(Collections.singletonList(new NewTopic(topicName, partitions, replicas)));
            this.waitForTopic(topicName, partitions);
        }
    }

    /**
     * Blocks until the brokers are ready (readiness criteria are defined by the implementation).
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    void waitForReadyBrokers() throws InterruptedException;

    /**
     * Waits until the given topic is visible with the expected partition count, or, when
     * {@code partitions == 0}, until it is gone, on every broker that was alive when the call
     * started.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Each broker's metadata cache reports {@code partitions} partitions for the topic,
     *       or no entry at all when {@code partitions == 0} (60 s limit).</li>
     *   <li>For each controller, every broker's metadata loader has applied at least the
     *       controller's log end offset minus one (60 s limit per controller).</li>
     *   <li>Only when {@code partitions == 0}: the replica manager has no online partition,
     *       the log manager has no log, the cleaner offset checkpoint has no entry, and the
     *       log directories contain neither the partition directory nor its delete-suffixed
     *       directory.</li>
     * </ol>
     *
     * <p>NOTE(review): the deletion checks of step 3 only inspect partition index 0 of the
     * topic.
     *
     * @param topic topic name
     * @param partitions expected partition count; {@code 0} means "topic deleted"
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws ClassCastException if an alive broker is not a {@link BrokerServer}
     */
    default void waitForTopic(String topic, int partitions) throws InterruptedException {
        // Snapshot of the alive brokers taken once; brokers started later are not considered.
        Collection<KafkaBroker> brokers = this.aliveBrokers().values();

        TestUtils.waitForCondition(
            () -> brokers.stream().allMatch(broker -> partitions == 0
                ? broker.metadataCache().numPartitions(topic).isEmpty()
                : broker.metadataCache().numPartitions(topic).contains((Object) partitions)),
            60000L,
            topic + " metadata not propagated after 60000 ms");

        for (ControllerServer controller : this.controllers().values()) {
            // The end offset is exclusive, hence "- 1" for the last offset in the log.
            long controllerOffset = controller.raftManager().replicatedLog().endOffset().offset() - 1L;
            TestUtils.waitForCondition(
                () -> brokers.stream().allMatch(broker ->
                    ((BrokerServer) broker).sharedServer().loader().lastAppliedOffset() >= controllerOffset),
                60000L,
                "Timeout waiting for controller metadata propagating to brokers");
        }

        if (partitions != 0) {
            return;
        }

        // Typed list (not a raw List) so the lambdas below see TopicPartition, not Object.
        List<TopicPartition> topicPartitions = IntStream.range(0, 1)
            .mapToObj(partition -> new TopicPartition(topic, partition))
            .collect(Collectors.toList());

        TestUtils.waitForCondition(
            () -> brokers.stream().allMatch(broker -> topicPartitions.stream()
                .allMatch(tp -> broker.replicaManager().onlinePartition(tp).isEmpty())),
            "Replica manager's should have deleted all of this topic's partitions");

        TestUtils.waitForCondition(
            () -> brokers.stream().allMatch(broker -> topicPartitions.stream()
                .allMatch(tp -> broker.logManager().getLog(tp, false).isEmpty())),
            "Replica logs not deleted after delete topic is complete");

        TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker -> topicPartitions.stream().allMatch(tp -> {
            // The Scala Seq is converted through a raw cast, so elements are cast explicitly.
            List liveLogDirs = CollectionConverters.asJava((Seq) broker.logManager().liveLogDirs());
            return liveLogDirs.stream().allMatch(logDir -> {
                OffsetCheckpointFile checkpointFile;
                try {
                    checkpointFile = new OffsetCheckpointFile(new File((File) logDir, "cleaner-offset-checkpoint"), null);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
                return !checkpointFile.read().containsKey(tp);
            });
        })), "Cleaner offset for deleted partition should have been removed");

        TestUtils.waitForCondition(
            () -> brokers.stream().allMatch(broker ->
                CollectionConverters.asJava((Seq) broker.config().logDirs()).stream().allMatch(logDir ->
                    topicPartitions.stream().noneMatch(tp ->
                        new File((String) logDir, tp.topic() + "-" + tp.partition()).exists()))),
            "Failed to soft-delete the data to a delete directory");

        TestUtils.waitForCondition(
            () -> brokers.stream().allMatch(broker ->
                CollectionConverters.asJava((Seq) broker.config().logDirs()).stream().allMatch(logDir ->
                    topicPartitions.stream().allMatch(tp ->
                        Arrays.stream(Objects.requireNonNull(new File((String) logDir).list()))
                            .noneMatch(partitionDirectoryName ->
                                partitionDirectoryName.startsWith(tp.topic() + "-" + tp.partition())
                                    && partitionDirectoryName.endsWith(UnifiedLog.DeleteDirSuffix()))))),
            "Failed to hard-delete the delete directory");
    }

    /**
     * Collects the authorizers defined on the brokers, followed by those defined on the
     * controllers. Servers without an authorizer are skipped.
     *
     * @return a new mutable list; empty when no server has an authorizer
     */
    default List<Authorizer> authorizers() {
        List<Authorizer> authorizers = new ArrayList<>();
        authorizers.addAll(this.brokers().values().stream()
            .filter(server -> server.authorizer().isDefined())
            .map(server -> (Authorizer) server.authorizer().get())
            .collect(Collectors.toList()));
        authorizers.addAll(this.controllers().values().stream()
            .filter(server -> server.authorizer().isDefined())
            .map(server -> (Authorizer) server.authorizer().get())
            .collect(Collectors.toList()));
        return authorizers;
    }

    /**
     * Waits, one authorizer of {@link #authorizers()} at a time, until the access control
     * entries matching {@code filter} are exactly {@code entries} (mutual {@code containsAll}).
     *
     * <p>NOTE(review): the failure message is built before waiting starts, so its
     * "actual acls" part always shows the initial empty set rather than the entries seen last.
     * Fixing this needs a lazily evaluated message; confirm whether {@code TestUtils} offers
     * such an overload.
     *
     * @param filter selects the ACL bindings to inspect
     * @param entries the expected access control entries
     * @throws InterruptedException if the waiting thread is interrupted
     */
    default void waitAcls(AclBindingFilter filter, Collection<AccessControlEntry> entries) throws InterruptedException {
        for (Authorizer authorizer : this.authorizers()) {
            AtomicReference<Set<AccessControlEntry>> actualEntries = new AtomicReference<>(new HashSet<>());
            TestUtils.waitForCondition(() -> {
                Set<AccessControlEntry> accessControlEntrySet = new HashSet<>();
                authorizer.acls(filter).forEach(aclBinding -> accessControlEntrySet.add(aclBinding.entry()));
                actualEntries.set(accessControlEntrySet);
                return accessControlEntrySet.containsAll(entries) && entries.containsAll(accessControlEntrySet);
            }, "expected acls: " + entries + ", actual acls: " + actualEntries.get());
        }
    }
}

