/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.mutiny.operators.multi.replay;

import io.smallrye.mutiny.Context;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.operators.AbstractMulti;
import io.smallrye.mutiny.operators.multi.replay.AppendOnlyReplayList;
import io.smallrye.mutiny.subscription.ContextSupport;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * A {@link Multi} that subscribes to its upstream at most once and replays the recorded
 * events to every downstream subscriber.
 *
 * <p>The upstream subscription is triggered by the first call to
 * {@link #subscribe(MultiSubscriber)} and requests {@code Long.MAX_VALUE} items. Every item,
 * failure and completion signal received from upstream is pushed into an
 * {@link AppendOnlyReplayList}. Each downstream subscriber gets its own
 * {@code ReplaySubscription}, which owns a cursor over that list and its own demand counter.
 *
 * @param <T> the type of the items
 */
public class ReplayOperator<T> extends AbstractMulti<T> {
    private final Multi<T> upstream;
    private final AppendOnlyReplayList replayList;
    /** Flipped to {@code true} by the first subscriber, so upstream is subscribed to only once. */
    private final AtomicBoolean upstreamSubscriptionRequested = new AtomicBoolean();
    /** Set when upstream calls {@code onSubscribe}; replaced by {@code Subscriptions.CANCELLED} on termination. */
    private volatile Flow.Subscription upstreamSubscription = null;
    /** Active downstream subscriptions whose drain loops are triggered on every upstream signal. */
    private final CopyOnWriteArrayList<ReplaySubscription> subscriptions = new CopyOnWriteArrayList<>();

    /**
     * Creates a replay operator without seed items.
     *
     * @param upstream the upstream to subscribe to (once) and record
     * @param numberOfItemsToReplay forwarded to {@link AppendOnlyReplayList}; presumably the
     *        maximum number of past items replayed to a late subscriber (see that class)
     */
    public ReplayOperator(Multi<T> upstream, long numberOfItemsToReplay) {
        this.upstream = upstream;
        this.replayList = new AppendOnlyReplayList(numberOfItemsToReplay);
    }

    /**
     * Creates a replay operator whose replay list is initialised from {@code seed}.
     *
     * @param upstream the upstream to subscribe to (once) and record
     * @param numberOfItemsToReplay forwarded to {@link AppendOnlyReplayList}; presumably the
     *        maximum number of past items replayed to a late subscriber (see that class)
     * @param seed items handed to the {@link AppendOnlyReplayList} constructor
     */
    public ReplayOperator(Multi<T> upstream, long numberOfItemsToReplay, Iterable<T> seed) {
        this.upstream = upstream;
        this.replayList = new AppendOnlyReplayList(numberOfItemsToReplay, seed);
    }

    /**
     * Registers a downstream subscriber, subscribing to upstream first if this is the very
     * first subscriber.
     *
     * @param subscriber the downstream subscriber
     */
    @Override
    public void subscribe(MultiSubscriber<? super T> subscriber) {
        if (upstreamSubscriptionRequested.compareAndSet(false, true)) {
            upstream.subscribe(new UpstreamSubscriber(subscriber));
        }
        ReplaySubscription replaySubscription = new ReplaySubscription(subscriber);
        subscriber.onSubscribe(replaySubscription);
        subscriptions.add(replaySubscription);
        if (replaySubscription.done) {
            // The subscriber cancelled, or was driven to a terminal event, while inside
            // onSubscribe: cancel() ran before the add above and found nothing to remove.
            // Remove it now so a dead subscription does not stay registered forever.
            subscriptions.remove(replaySubscription);
        } else {
            // Upstream signals that arrived between onSubscribe and the add above were not
            // dispatched to this subscription; drain once so outstanding demand is served.
            // This is a no-op when there is no demand or nothing new to read.
            replaySubscription.drain();
        }
    }

    /**
     * The single subscriber attached to upstream: records every signal into the replay list
     * and then wakes up all registered downstream subscriptions.
     */
    private class UpstreamSubscriber implements MultiSubscriber<T>, ContextSupport {
        /** The subscriber whose {@code subscribe} call triggered the upstream subscription. */
        private final MultiSubscriber<? super T> initialSubscriber;

        public UpstreamSubscriber(MultiSubscriber<? super T> initialSubscriber) {
            this.initialSubscriber = initialSubscriber;
        }

        @Override
        public void onItem(T item) {
            replayList.push(item);
            triggerDrainLoops();
        }

        @Override
        public void onFailure(Throwable failure) {
            replayList.pushFailure(failure);
            markAsDone();
            triggerDrainLoops();
        }

        @Override
        public void onCompletion() {
            replayList.pushCompletion();
            markAsDone();
            triggerDrainLoops();
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            upstreamSubscription = subscription;
            // Upstream is consumed without back-pressure; per-subscriber demand is applied
            // when replaying from the list.
            subscription.request(Long.MAX_VALUE);
        }

        /**
         * Returns the context of the initial subscriber when it supports contexts, otherwise
         * an empty context.
         */
        @Override
        public Context context() {
            if (initialSubscriber instanceof ContextSupport) {
                return ((ContextSupport) initialSubscriber).context();
            }
            return Context.empty();
        }

        /** Runs the drain loop of every currently registered downstream subscription. */
        private void triggerDrainLoops() {
            subscriptions.forEach(ReplaySubscription::drain);
        }

        /** Marks the upstream subscription as terminated. */
        private void markAsDone() {
            upstreamSubscription = Subscriptions.CANCELLED;
        }
    }

    /**
     * The subscription handed to one downstream subscriber. It tracks that subscriber's
     * demand and its read position (cursor) in the replay list.
     */
    private class ReplaySubscription implements Flow.Subscription {
        private final MultiSubscriber<? super T> downstream;
        /** Outstanding downstream demand. */
        private final AtomicLong demand = new AtomicLong();
        /** Set once the subscription is cancelled or a terminal event is about to be delivered. */
        private volatile boolean done = false;
        private final AppendOnlyReplayList.Cursor cursor;
        /** Work-in-progress counter guarding the drain loop against concurrent execution. */
        private final AtomicInteger wip = new AtomicInteger();

        private ReplaySubscription(MultiSubscriber<? super T> downstream) {
            this.downstream = downstream;
            this.cursor = replayList.newCursor();
            // Result intentionally ignored.
            // NOTE(review): presumably this positions the cursor on the current head of the
            // list at subscription time - confirm against AppendOnlyReplayList.Cursor.
            this.cursor.hasNext();
        }

        /**
         * Adds {@code n} to the demand and drains if the cursor has something to read.
         * A non-positive {@code n} cancels the subscription and fails the downstream with
         * the exception from {@code Subscriptions.getInvalidRequestException()}.
         *
         * @param n the number of additional items requested
         */
        @Override
        public void request(long n) {
            if (done) {
                return;
            }
            if (n <= 0L) {
                cancel();
                downstream.onFailure(Subscriptions.getInvalidRequestException());
                return;
            }
            Subscriptions.add(demand, n);
            if (cursor.hasNext()) {
                drain();
            }
        }

        /** Marks this subscription as done and unregisters it from the operator. */
        @Override
        public void cancel() {
            done = true;
            subscriptions.remove(this);
        }

        /**
         * Delivers recorded events to the downstream while there is demand and the cursor
         * has more to read.
         *
         * <p>Only the caller that moves {@code wip} away from zero runs the loop; any other
         * caller just increments the counter, which makes the running loop go around again.
         */
        private void drain() {
            if (done) {
                return;
            }
            if (wip.getAndIncrement() > 0) {
                // Another caller is already draining; it will observe the incremented counter.
                return;
            }
            do {
                if (done) {
                    return;
                }
                long max = demand.get();
                long emitted = 0L;
                while (emitted < max && cursor.hasNext()) {
                    if (done) {
                        return;
                    }
                    cursor.moveToNext();
                    if (cursor.hasReachedCompletion()) {
                        cancel();
                        cursor.readCompletion();
                        downstream.onComplete();
                        return;
                    }
                    if (cursor.hasReachedFailure()) {
                        cancel();
                        downstream.onFailure(cursor.readFailure());
                        return;
                    }
                    // The list only ever receives T items through UpstreamSubscriber.onItem
                    // (and the Iterable<T> seed), so the cast is safe.
                    @SuppressWarnings("unchecked")
                    T item = (T) cursor.read();
                    assert item != null;
                    downstream.onItem(item);
                    emitted++;
                }
                demand.addAndGet(-emitted);
            } while (wip.decrementAndGet() != 0);
        }
    }
}

