/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.mongodb.core.messaging;

import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.messaging.Cancelable;
import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;
import org.springframework.data.mongodb.core.messaging.Subscription;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest;
import org.springframework.data.mongodb.core.messaging.Task;
import org.springframework.data.mongodb.core.messaging.TaskFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ErrorHandler;
import org.springframework.util.ObjectUtils;

public class DefaultMessageListenerContainer
implements MessageListenerContainer {
    private final Executor taskExecutor;
    private final TaskFactory taskFactory;
    private final Optional<ErrorHandler> errorHandler;
    private final Map<SubscriptionRequest, Subscription> subscriptions = new LinkedHashMap<SubscriptionRequest, Subscription>();
    private final ReadWriteLock lifecycleMonitor = new ReentrantReadWriteLock();
    private final org.springframework.data.util.Lock lifecycleRead = org.springframework.data.util.Lock.of((Lock)this.lifecycleMonitor.readLock());
    private final org.springframework.data.util.Lock lifecycleWrite = org.springframework.data.util.Lock.of((Lock)this.lifecycleMonitor.writeLock());
    private final ReadWriteLock subscriptionMonitor = new ReentrantReadWriteLock();
    private final org.springframework.data.util.Lock subscriptionRead = org.springframework.data.util.Lock.of((Lock)this.subscriptionMonitor.readLock());
    private final org.springframework.data.util.Lock subscriptionWrite = org.springframework.data.util.Lock.of((Lock)this.subscriptionMonitor.writeLock());
    private boolean running = false;

    public DefaultMessageListenerContainer(MongoTemplate template) {
        this(template, (Executor)new SimpleAsyncTaskExecutor());
    }

    public DefaultMessageListenerContainer(MongoTemplate template, Executor taskExecutor) {
        this(template, taskExecutor, null);
    }

    public DefaultMessageListenerContainer(MongoTemplate template, Executor taskExecutor, @Nullable ErrorHandler errorHandler) {
        Assert.notNull((Object)template, (String)"Template must not be null");
        Assert.notNull((Object)taskExecutor, (String)"TaskExecutor must not be null");
        this.taskExecutor = taskExecutor;
        this.taskFactory = new TaskFactory(template);
        this.errorHandler = Optional.ofNullable(errorHandler);
    }

    public boolean isAutoStartup() {
        return false;
    }

    public void stop(Runnable callback) {
        this.stop();
        callback.run();
    }

    public void start() {
        this.lifecycleWrite.executeWithoutResult(() -> {
            if (!this.running) {
                this.subscriptions.values().stream().filter(it -> !it.isActive()).filter(TaskSubscription.class::isInstance).map(TaskSubscription.class::cast).map(TaskSubscription::getTask).forEach(this.taskExecutor::execute);
                this.running = true;
            }
        });
    }

    public void stop() {
        this.lifecycleWrite.executeWithoutResult(() -> {
            if (this.running) {
                this.subscriptions.values().forEach(Cancelable::cancel);
                this.running = false;
            }
        });
    }

    public boolean isRunning() {
        return (Boolean)this.lifecycleRead.execute(() -> this.running);
    }

    public int getPhase() {
        return Integer.MAX_VALUE;
    }

    @Override
    public <S, T> Subscription register(SubscriptionRequest<S, ? super T, ? extends SubscriptionRequest.RequestOptions> request, Class<T> bodyType) {
        return this.register(request, bodyType, this.errorHandler.orElseGet(() -> new DecoratingLoggingErrorHandler(exception -> this.lookup(request).ifPresent(Cancelable::cancel))));
    }

    @Override
    public <S, T> Subscription register(SubscriptionRequest<S, ? super T, ? extends SubscriptionRequest.RequestOptions> request, Class<T> bodyType, ErrorHandler errorHandler) {
        return this.register(request, this.taskFactory.forRequest(request, bodyType, errorHandler));
    }

    @Override
    public Optional<Subscription> lookup(SubscriptionRequest<?, ?, ?> request) {
        return (Optional)this.subscriptionRead.execute(() -> Optional.ofNullable(this.subscriptions.get(request)));
    }

    public Subscription register(SubscriptionRequest request, Task task) {
        return (Subscription)this.subscriptionWrite.execute(() -> {
            if (this.subscriptions.containsKey(request)) {
                return this.subscriptions.get(request);
            }
            TaskSubscription subscription = new TaskSubscription(task);
            this.subscriptions.put(request, subscription);
            if (this.isRunning()) {
                this.taskExecutor.execute((Runnable)((Object)task));
            }
            return subscription;
        });
    }

    @Override
    public void remove(Subscription subscription) {
        this.subscriptionWrite.executeWithoutResult(() -> {
            if (this.subscriptions.containsValue(subscription)) {
                if (subscription.isActive()) {
                    subscription.cancel();
                }
                this.subscriptions.values().remove(subscription);
            }
        });
    }

    static class TaskSubscription
    implements Subscription {
        private final Task task;

        TaskSubscription(Task task) {
            this.task = task;
        }

        Task getTask() {
            return this.task;
        }

        @Override
        public boolean isActive() {
            return this.task.isActive();
        }

        @Override
        public boolean await(Duration timeout) throws InterruptedException {
            return this.task.awaitStart(timeout);
        }

        @Override
        public void cancel() throws DataAccessResourceFailureException {
            this.task.cancel();
        }

        public boolean equals(@Nullable Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            TaskSubscription that = (TaskSubscription)o;
            return ObjectUtils.nullSafeEquals((Object)this.task, (Object)that.task);
        }

        public int hashCode() {
            return ObjectUtils.nullSafeHashCode((Object)this.task);
        }
    }

    private static class DecoratingLoggingErrorHandler
    implements ErrorHandler {
        private final Log logger = LogFactory.getLog(DecoratingLoggingErrorHandler.class);
        private final ErrorHandler delegate;

        DecoratingLoggingErrorHandler(ErrorHandler delegate) {
            this.delegate = delegate;
        }

        public void handleError(Throwable t) {
            if (this.logger.isErrorEnabled()) {
                this.logger.error((Object)"Unexpected error occurred while listening to MongoDB", t);
            }
            this.delegate.handleError(t);
        }
    }
}

