/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.queue;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.queue.EventQueue;
import org.slf4j.Logger;

public final class KafkaEventQueue
implements EventQueue {
    private final Time time;
    private final ReentrantLock lock;
    private final Logger log;
    private final EventHandler eventHandler;
    private final Thread eventHandlerThread;
    private long closingTimeNs;
    private EventQueue.Event cleanupEvent;

    public KafkaEventQueue(Time time, LogContext logContext, String threadNamePrefix) {
        this.time = time;
        this.lock = new ReentrantLock();
        this.log = logContext.logger(KafkaEventQueue.class);
        this.eventHandler = new EventHandler();
        this.eventHandlerThread = new KafkaThread(threadNamePrefix + "EventHandler", (Runnable)this.eventHandler, false);
        this.closingTimeNs = Long.MAX_VALUE;
        this.cleanupEvent = null;
        this.eventHandlerThread.start();
    }

    public Time time() {
        return this.time;
    }

    @Override
    public void enqueue(EventQueue.EventInsertionType insertionType, String tag, Function<OptionalLong, OptionalLong> deadlineNsCalculator, EventQueue.Event event) {
        EventContext eventContext = new EventContext(event, insertionType, tag);
        Exception e = this.eventHandler.enqueue(eventContext, deadlineNsCalculator);
        if (e != null) {
            eventContext.completeWithException(e);
        }
    }

    @Override
    public void cancelDeferred(String tag) {
        this.eventHandler.cancelDeferred(tag);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void beginShutdown(String source, EventQueue.Event newCleanupEvent, long timeSpan, TimeUnit timeUnit) {
        if (timeSpan < 0L) {
            throw new IllegalArgumentException("beginShutdown must be called with a non-negative timeout.");
        }
        Objects.requireNonNull(newCleanupEvent);
        this.lock.lock();
        try {
            if (this.cleanupEvent != null) {
                this.log.debug("{}: Event queue is already shutting down.", (Object)source);
                return;
            }
            this.log.info("{}: shutting down event queue.", (Object)source);
            this.cleanupEvent = newCleanupEvent;
            long newClosingTimeNs = this.time.nanoseconds() + timeUnit.toNanos(timeSpan);
            if (this.closingTimeNs >= newClosingTimeNs) {
                this.closingTimeNs = newClosingTimeNs;
            }
            this.eventHandler.cond.signal();
        }
        finally {
            this.lock.unlock();
        }
    }

    @Override
    public int size() {
        return this.eventHandler.size();
    }

    @Override
    public void wakeup() {
        this.eventHandler.wakeUp();
    }

    @Override
    public void close() throws InterruptedException {
        this.beginShutdown("KafkaEventQueue#close");
        this.eventHandlerThread.join();
        this.log.info("closed event queue.");
    }

    private class EventHandler
    implements Runnable {
        private int size = 0;
        private final Map<String, EventContext> tagToEventContext = new HashMap<String, EventContext>();
        private final EventContext head = new EventContext(null, null, null);
        private final TreeMap<Long, EventContext> deadlineMap = new TreeMap();
        private final Condition cond = KafkaEventQueue.access$000(KafkaEventQueue.this).newCondition();

        private EventHandler() {
        }

        @Override
        public void run() {
            try {
                this.handleEvents();
                KafkaEventQueue.this.cleanupEvent.run();
            }
            catch (Throwable e) {
                KafkaEventQueue.this.log.warn("event handler thread exiting with exception", e);
            }
        }

        private void remove(EventContext eventContext) {
            eventContext.remove();
            if (eventContext.deadlineNs.isPresent()) {
                this.deadlineMap.remove(eventContext.deadlineNs.getAsLong());
                eventContext.deadlineNs = OptionalLong.empty();
            }
            if (eventContext.tag != null) {
                this.tagToEventContext.remove(eventContext.tag, eventContext);
                eventContext.tag = null;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void handleEvents() throws InterruptedException {
            EventContext toTimeout = null;
            EventContext toRun = null;
            while (true) {
                if (toTimeout != null) {
                    toTimeout.completeWithTimeout();
                } else if (toRun != null) {
                    toRun.run(KafkaEventQueue.this.log);
                }
                KafkaEventQueue.this.lock.lock();
                try {
                    long now;
                    if (toTimeout != null) {
                        --this.size;
                        toTimeout = null;
                    }
                    if (toRun != null) {
                        --this.size;
                        toRun = null;
                    }
                    long awaitNs = Long.MAX_VALUE;
                    Map.Entry<Long, EventContext> entry = this.deadlineMap.firstEntry();
                    if (entry != null) {
                        now = KafkaEventQueue.this.time.nanoseconds();
                        long timeoutNs = entry.getKey();
                        EventContext eventContext = entry.getValue();
                        if (timeoutNs <= now) {
                            if (eventContext.insertionType == EventQueue.EventInsertionType.DEFERRED) {
                                this.remove(eventContext);
                                toRun = eventContext;
                                continue;
                            }
                            this.remove(eventContext);
                            toTimeout = eventContext;
                            continue;
                        }
                        if (KafkaEventQueue.this.closingTimeNs <= now) {
                            this.remove(eventContext);
                            toTimeout = eventContext;
                            continue;
                        }
                        awaitNs = timeoutNs - now;
                    }
                    if (this.head.next == this.head) {
                        if (KafkaEventQueue.this.closingTimeNs != Long.MAX_VALUE && this.deadlineMap.isEmpty()) {
                            return;
                        }
                    } else {
                        toRun = this.head.next;
                        this.remove(toRun);
                        continue;
                    }
                    if (KafkaEventQueue.this.closingTimeNs != Long.MAX_VALUE) {
                        now = KafkaEventQueue.this.time.nanoseconds();
                        if (awaitNs > KafkaEventQueue.this.closingTimeNs - now) {
                            awaitNs = KafkaEventQueue.this.closingTimeNs - now;
                        }
                    }
                    if (awaitNs == Long.MAX_VALUE) {
                        this.cond.await();
                        continue;
                    }
                    this.cond.awaitNanos(awaitNs);
                    continue;
                }
                finally {
                    KafkaEventQueue.this.lock.unlock();
                    continue;
                }
                break;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        Exception enqueue(EventContext eventContext, Function<OptionalLong, OptionalLong> deadlineNsCalculator) {
            KafkaEventQueue.this.lock.lock();
            try {
                EventContext toRemove;
                if (KafkaEventQueue.this.closingTimeNs != Long.MAX_VALUE) {
                    RejectedExecutionException rejectedExecutionException = new RejectedExecutionException();
                    return rejectedExecutionException;
                }
                OptionalLong existingDeadlineNs = OptionalLong.empty();
                if (eventContext.tag != null && (toRemove = this.tagToEventContext.put(eventContext.tag, eventContext)) != null) {
                    existingDeadlineNs = toRemove.deadlineNs;
                    this.remove(toRemove);
                    --this.size;
                }
                OptionalLong deadlineNs = deadlineNsCalculator.apply(existingDeadlineNs);
                boolean queueWasEmpty = this.head.isSingleton();
                boolean shouldSignal = false;
                switch (eventContext.insertionType) {
                    case APPEND: {
                        this.head.insertBefore(eventContext);
                        if (!queueWasEmpty) break;
                        shouldSignal = true;
                        break;
                    }
                    case PREPEND: {
                        this.head.insertAfter(eventContext);
                        if (!queueWasEmpty) break;
                        shouldSignal = true;
                        break;
                    }
                    case DEFERRED: {
                        if (deadlineNs.isPresent()) break;
                        RuntimeException runtimeException = new RuntimeException("You must specify a deadline for deferred events.");
                        return runtimeException;
                    }
                }
                if (deadlineNs.isPresent()) {
                    long prevStartNs;
                    long insertNs = deadlineNs.getAsLong();
                    long l = prevStartNs = this.deadlineMap.isEmpty() ? Long.MAX_VALUE : this.deadlineMap.firstKey();
                    while (this.deadlineMap.putIfAbsent(insertNs, eventContext) != null) {
                        ++insertNs;
                    }
                    eventContext.deadlineNs = OptionalLong.of(insertNs);
                    if (insertNs <= prevStartNs) {
                        shouldSignal = true;
                    }
                }
                ++this.size;
                if (shouldSignal) {
                    this.cond.signal();
                }
            }
            finally {
                KafkaEventQueue.this.lock.unlock();
            }
            return null;
        }

        void cancelDeferred(String tag) {
            KafkaEventQueue.this.lock.lock();
            try {
                EventContext eventContext = this.tagToEventContext.get(tag);
                if (eventContext != null) {
                    this.remove(eventContext);
                    --this.size;
                }
            }
            finally {
                KafkaEventQueue.this.lock.unlock();
            }
        }

        void wakeUp() {
            KafkaEventQueue.this.lock.lock();
            try {
                ((KafkaEventQueue)KafkaEventQueue.this).eventHandler.cond.signal();
            }
            finally {
                KafkaEventQueue.this.lock.unlock();
            }
        }

        int size() {
            KafkaEventQueue.this.lock.lock();
            try {
                int n = this.size;
                return n;
            }
            finally {
                KafkaEventQueue.this.lock.unlock();
            }
        }
    }

    private static class EventContext {
        private final EventQueue.Event event;
        private final EventQueue.EventInsertionType insertionType;
        private EventContext prev = this;
        private EventContext next = this;
        private OptionalLong deadlineNs = OptionalLong.empty();
        private String tag;

        EventContext(EventQueue.Event event, EventQueue.EventInsertionType insertionType, String tag) {
            this.event = event;
            this.insertionType = insertionType;
            this.tag = tag;
        }

        void insertAfter(EventContext other) {
            this.next.prev = other;
            other.next = this.next;
            other.prev = this;
            this.next = other;
        }

        void insertBefore(EventContext other) {
            this.prev.next = other;
            other.prev = this.prev;
            other.next = this;
            this.prev = other;
        }

        void remove() {
            this.prev.next = this.next;
            this.next.prev = this.prev;
            this.prev = this;
            this.next = this;
        }

        boolean isSingleton() {
            return this.prev == this && this.next == this;
        }

        void run(Logger log) throws InterruptedException {
            try {
                this.event.run();
            }
            catch (InterruptedException e) {
                throw e;
            }
            catch (Exception e) {
                try {
                    this.event.handleException(e);
                }
                catch (Throwable t) {
                    log.error("Unexpected exception in handleException", t);
                }
            }
        }

        void completeWithTimeout() {
            this.completeWithException((Throwable)new TimeoutException());
        }

        void completeWithException(Throwable t) {
            this.event.handleException(t);
        }
    }
}

