/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.ClientServerInputMultiplexer;
import io.rsocket.core.ClientSetup;
import io.rsocket.core.DefaultClientSetup;
import io.rsocket.core.DefaultConnectionSetupPayload;
import io.rsocket.core.FragmentationUtils;
import io.rsocket.core.Invalidatable;
import io.rsocket.core.LeaseSpec;
import io.rsocket.core.LoggingDuplexConnection;
import io.rsocket.core.PayloadValidationUtils;
import io.rsocket.core.RSocketRequester;
import io.rsocket.core.RSocketResponder;
import io.rsocket.core.ReassemblyUtils;
import io.rsocket.core.ReconnectMono;
import io.rsocket.core.RequesterLeaseTracker;
import io.rsocket.core.ResponderLeaseTracker;
import io.rsocket.core.ResumableClientSetup;
import io.rsocket.core.Resume;
import io.rsocket.core.StreamIdSupplier;
import io.rsocket.frame.SetupFrameCodec;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.keepalive.KeepAliveHandler;
import io.rsocket.lease.TrackingLeaseSender;
import io.rsocket.plugins.DuplexConnectionInterceptor;
import io.rsocket.plugins.InitializingInterceptorRegistry;
import io.rsocket.plugins.InterceptorRegistry;
import io.rsocket.plugins.RequestInterceptor;
import io.rsocket.resume.ClientRSocketSession;
import io.rsocket.resume.ResumableDuplexConnection;
import io.rsocket.resume.ResumableFramesStore;
import io.rsocket.transport.ClientTransport;
import io.rsocket.util.DefaultPayload;
import io.rsocket.util.EmptyPayload;
import java.time.Duration;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.annotation.Nullable;
import reactor.util.function.Tuples;
import reactor.util.retry.Retry;

/**
 * Fluent builder that collects client-side settings (setup payload, mime types, keep-alive,
 * interceptors, acceptor, reconnect, resume, lease, fragmentation, payload decoding) and then
 * establishes a connection through a {@link ClientTransport}, yielding the requester-side
 * {@link RSocket}.
 *
 * <p>Instances are obtained through {@link #create()}; every configuration method returns
 * {@code this} so calls can be chained. The configuration fields are plain mutable fields that are
 * read again on each subscription to the {@code Mono} returned by {@code connect(..)}.
 */
public class RSocketConnector {
    /** Tag handed to the resume and lease components created by this connector. */
    private static final String CLIENT_TAG = "client";

    /**
     * Subscribes to {@code onClose()} of a connected {@link RSocket} and invalidates the given
     * {@link Invalidatable} both when {@code onClose()} fails and when it completes.
     */
    private static final BiConsumer<RSocket, Invalidatable> INVALIDATE_FUNCTION =
        (rsocket, invalidatable) ->
            rsocket.onClose().subscribe(null, error -> invalidatable.invalidate(), invalidatable::invalidate);

    /** Source of the SETUP payload; an empty source results in {@link EmptyPayload#INSTANCE}. */
    private Mono<Payload> setupPayloadMono = Mono.empty();
    private String metadataMimeType = "application/binary";
    private String dataMimeType = "application/binary";
    private Duration keepAliveInterval = Duration.ofSeconds(20L);
    private Duration keepAliveMaxLifeTime = Duration.ofSeconds(90L);
    /** Optional responder-side acceptor; a no-op {@link RSocket} is used when absent. */
    @Nullable
    private SocketAcceptor acceptor;
    private final InitializingInterceptorRegistry interceptors = new InitializingInterceptorRegistry();
    /** Reconnect strategy; {@code null} means the connect {@code Mono} is returned unwrapped. */
    @Nullable
    private Retry retrySpec;
    /** Resume configuration; {@code null} means resumption is not set up. */
    @Nullable
    private Resume resume;
    /** Lease configuration callback; {@code null} means leasing is not enabled. */
    @Nullable
    private Consumer<LeaseSpec> leaseConfigurer;
    private int mtu = 0;
    private int maxInboundPayloadSize = Integer.MAX_VALUE;
    private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT;

    /** Not instantiable directly; use {@link #create()}. */
    private RSocketConnector() {
    }

    /**
     * Creates a connector with default settings.
     *
     * @return a new, independently configurable connector
     */
    public static RSocketConnector create() {
        return new RSocketConnector();
    }

    /**
     * Shortcut for {@code RSocketConnector.create().connect(transport)} with default settings.
     *
     * @param transport the transport to connect with
     * @return a {@code Mono} that connects when subscribed and emits the requester {@link RSocket}
     */
    public static Mono<RSocket> connectWith(ClientTransport transport) {
        return RSocketConnector.create().connect(() -> transport);
    }

    /**
     * Sets a deferred source for the payload carried in the SETUP frame. The source is subscribed
     * on every connection attempt, after the transport connection has been established.
     *
     * @param setupPayloadMono the payload source, must not be {@code null}
     * @return this connector
     * @throws NullPointerException if {@code setupPayloadMono} is {@code null}
     */
    public RSocketConnector setupPayload(Mono<Payload> setupPayloadMono) {
        // Fail fast: a null source would otherwise only surface as an error on every connect attempt.
        this.setupPayloadMono = Objects.requireNonNull(setupPayloadMono, "setupPayloadMono");
        return this;
    }

    /**
     * Sets a fixed payload for the SETUP frame. A {@link DefaultPayload} is stored as is; any other
     * implementation is copied via {@link DefaultPayload#create(Payload)} and the given instance is
     * then released.
     *
     * @param payload the setup payload, must not be {@code null}
     * @return this connector
     * @throws NullPointerException if {@code payload} is {@code null}
     */
    public RSocketConnector setupPayload(Payload payload) {
        Objects.requireNonNull(payload);
        if (payload instanceof DefaultPayload) {
            this.setupPayloadMono = Mono.just(payload);
        } else {
            this.setupPayloadMono = Mono.just(DefaultPayload.create(payload));
            // The copy above is what gets stored, so the caller's instance is released here.
            payload.release();
        }
        return this;
    }

    /**
     * Sets the data mime type encoded into the SETUP frame (default {@code "application/binary"}).
     *
     * @param dataMimeType the mime type, must not be {@code null}
     * @return this connector
     */
    public RSocketConnector dataMimeType(String dataMimeType) {
        this.dataMimeType = Objects.requireNonNull(dataMimeType);
        return this;
    }

    /**
     * Sets the metadata mime type encoded into the SETUP frame (default
     * {@code "application/binary"}).
     *
     * @param metadataMimeType the mime type, must not be {@code null}
     * @return this connector
     */
    public RSocketConnector metadataMimeType(String metadataMimeType) {
        this.metadataMimeType = Objects.requireNonNull(metadataMimeType);
        return this;
    }

    /**
     * Sets the keep-alive interval and max lifetime (defaults: 20 and 90 seconds). Both values are
     * converted to milliseconds, encoded into the SETUP frame and handed to the requester.
     *
     * @param interval the keep-alive interval, must be strictly positive
     * @param maxLifeTime the keep-alive max lifetime, must be strictly positive
     * @return this connector
     * @throws IllegalArgumentException if either duration is zero or negative
     */
    public RSocketConnector keepAlive(Duration interval, Duration maxLifeTime) {
        // Validate both values before touching any field so a failure leaves the connector unchanged.
        Duration checkedInterval = requirePositive(interval, "interval");
        Duration checkedMaxLifeTime = requirePositive(maxLifeTime, "maxLifeTime");
        this.keepAliveInterval = checkedInterval;
        this.keepAliveMaxLifeTime = checkedMaxLifeTime;
        return this;
    }

    /**
     * Lets the caller register interceptors on this connector's registry.
     *
     * @param configurer callback that receives the registry
     * @return this connector
     */
    public RSocketConnector interceptors(Consumer<InterceptorRegistry> configurer) {
        configurer.accept(this.interceptors);
        return this;
    }

    /**
     * Sets the acceptor that produces the responder-side {@link RSocket}. When none is set, an
     * acceptor around an empty anonymous {@link RSocket} is used.
     *
     * @param acceptor the acceptor to use
     * @return this connector
     */
    public RSocketConnector acceptor(SocketAcceptor acceptor) {
        this.acceptor = acceptor;
        return this;
    }

    /**
     * Enables reconnect handling: the connect {@code Mono} is retried with the given spec and
     * wrapped in a {@code ReconnectMono}.
     *
     * @param retry the retry spec, must not be {@code null}
     * @return this connector
     */
    public RSocketConnector reconnect(Retry retry) {
        this.retrySpec = Objects.requireNonNull(retry);
        return this;
    }

    /**
     * Sets the resume configuration. A non-null value makes {@code connect(..)} obtain a resume
     * token and wrap the connection in a resumable connection and session.
     *
     * @param resume the resume configuration
     * @return this connector
     */
    public RSocketConnector resume(Resume resume) {
        this.resume = resume;
        return this;
    }

    /**
     * Enables leasing with an untouched {@link LeaseSpec}.
     *
     * @return this connector
     */
    public RSocketConnector lease() {
        return this.lease(config -> {});
    }

    /**
     * Enables leasing; the callback is invoked with a fresh {@link LeaseSpec} on each connection.
     *
     * @param leaseConfigurer callback that customizes the {@link LeaseSpec}
     * @return this connector
     */
    public RSocketConnector lease(Consumer<LeaseSpec> leaseConfigurer) {
        this.leaseConfigurer = leaseConfigurer;
        return this;
    }

    /**
     * Sets the maximum inbound payload size (default {@link Integer#MAX_VALUE}) after validating it
     * with {@code ReassemblyUtils.assertInboundPayloadSize}.
     *
     * @param maxInboundPayloadSize the limit to apply
     * @return this connector
     */
    public RSocketConnector maxInboundPayloadSize(int maxInboundPayloadSize) {
        this.maxInboundPayloadSize = ReassemblyUtils.assertInboundPayloadSize(maxInboundPayloadSize);
        return this;
    }

    /**
     * Sets the fragmentation MTU (default {@code 0}) after validating it with
     * {@code FragmentationUtils.assertMtu}.
     *
     * @param mtu the MTU to apply
     * @return this connector
     */
    public RSocketConnector fragment(int mtu) {
        this.mtu = FragmentationUtils.assertMtu(mtu);
        return this;
    }

    /**
     * Sets the decoder used by requester and responder (default {@link PayloadDecoder#DEFAULT}).
     *
     * @param decoder the decoder, must not be {@code null}
     * @return this connector
     */
    public RSocketConnector payloadDecoder(PayloadDecoder decoder) {
        this.payloadDecoder = Objects.requireNonNull(decoder);
        return this;
    }

    /**
     * Connects using the given transport instance.
     *
     * @param transport the transport to connect with
     * @return a {@code Mono} that connects when subscribed and emits the requester {@link RSocket}
     */
    public Mono<RSocket> connect(ClientTransport transport) {
        return this.connect(() -> transport);
    }

    /**
     * Connects using a transport obtained from the supplier on each subscription.
     *
     * <p>Per subscription the pipeline validates the frame/payload size settings, opens the
     * transport connection, applies connection interceptors and optional logging, resolves the
     * setup payload, sends the SETUP frame on stream {@code 0}, and then builds the requester and
     * responder on top of the (optionally resumable) connection. If a reconnect spec is configured,
     * the resulting {@code Mono} is retried with it and wrapped in a {@code ReconnectMono}.
     *
     * @param transportSupplier supplies the transport for each connection attempt
     * @return a {@code Mono} emitting the (interceptor-wrapped) requester {@link RSocket}
     */
    public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
        return Mono.fromSupplier(transportSupplier)
            .flatMap(ct -> {
                int maxFrameLength = ct.maxFrameLength();

                // Re-subscribable connection factory; also reused by the resumable session below.
                Mono<DuplexConnection> connectionMono = Mono.fromCallable(() -> {
                        PayloadValidationUtils.assertValidateSetup(maxFrameLength, this.maxInboundPayloadSize, this.mtu);
                        return ct;
                    })
                    .flatMap(transport -> transport.connect())
                    .map(sourceConnection -> this.interceptors.initConnection(DuplexConnectionInterceptor.Type.SOURCE, sourceConnection))
                    .map(source -> LoggingDuplexConnection.wrapIfEnabled(source));

                return connectionMono
                    .flatMap(connection ->
                        this.setupPayloadMono
                            .defaultIfEmpty(EmptyPayload.INSTANCE)
                            .map(setupPayload -> Tuples.of(connection, setupPayload))
                            // The connection is already open here, so dispose it if the payload
                            // source fails or the subscriber cancels.
                            .doOnError(ex -> connection.dispose())
                            .doOnCancel(() -> connection.dispose()))
                    .flatMap(tuple2 -> {
                        DuplexConnection sourceConnection = tuple2.getT1();
                        Payload setupPayload = tuple2.getT2();
                        boolean leaseEnabled = this.leaseConfigurer != null;
                        boolean resumeEnabled = this.resume != null;
                        ClientSetup clientSetup = new DefaultClientSetup();
                        ByteBuf resumeToken = resumeEnabled ? this.resume.getTokenSupplier().get() : Unpooled.EMPTY_BUFFER;

                        // NOTE(review): toMillis() is narrowed to int, so durations beyond
                        // Integer.MAX_VALUE milliseconds would wrap; keepAlive(..) does not bound them.
                        ByteBuf setupFrame = SetupFrameCodec.encode(
                            sourceConnection.alloc(),
                            leaseEnabled,
                            (int) this.keepAliveInterval.toMillis(),
                            (int) this.keepAliveMaxLifeTime.toMillis(),
                            resumeToken,
                            this.metadataMimeType,
                            this.dataMimeType,
                            setupPayload);

                        // A retained slice is sent; setupFrame itself is handed to the
                        // ConnectionSetupPayload below and released through it.
                        sourceConnection.sendFrame(0, setupFrame.retainedSlice());

                        return clientSetup.init(sourceConnection).flatMap(tuple -> {
                            // tuple.getT1() (the ByteBuf produced by the client setup) is currently unused.
                            DuplexConnection clientServerConnection = tuple.getT2();
                            InitializingInterceptorRegistry interceptors = this.interceptors;

                            KeepAliveHandler keepAliveHandler;
                            DuplexConnection wrappedConnection;
                            if (resumeEnabled) {
                                ResumableFramesStore resumableFramesStore = this.resume.getStoreFactory(CLIENT_TAG).apply(resumeToken);
                                ResumableDuplexConnection resumableDuplexConnection =
                                    new ResumableDuplexConnection(CLIENT_TAG, resumeToken, clientServerConnection, resumableFramesStore);
                                ResumableClientSetup resumableClientSetup = new ResumableClientSetup();
                                ClientRSocketSession session = new ClientRSocketSession(
                                    resumeToken,
                                    resumableDuplexConnection,
                                    connectionMono,
                                    resumableClientSetup::init,
                                    resumableFramesStore,
                                    this.resume.getSessionDuration(),
                                    this.resume.getRetry(),
                                    this.resume.isCleanupStoreOnKeepAlive());
                                keepAliveHandler = new KeepAliveHandler.ResumableKeepAliveHandler(resumableDuplexConnection, session, session);
                                wrappedConnection = resumableDuplexConnection;
                            } else {
                                keepAliveHandler = new KeepAliveHandler.DefaultKeepAliveHandler();
                                wrappedConnection = clientServerConnection;
                            }

                            ClientServerInputMultiplexer multiplexer = new ClientServerInputMultiplexer(wrappedConnection, interceptors, true);

                            LeaseSpec leases;
                            RequesterLeaseTracker requesterLeaseTracker;
                            if (leaseEnabled) {
                                leases = new LeaseSpec();
                                this.leaseConfigurer.accept(leases);
                                requesterLeaseTracker = new RequesterLeaseTracker(CLIENT_TAG, leases.maxPendingRequests);
                            } else {
                                leases = null;
                                requesterLeaseTracker = null;
                            }

                            Sinks.Empty<Void> requesterOnAllClosedSink = Sinks.unsafe().empty();
                            Sinks.Empty<Void> responderOnAllClosedSink = Sinks.unsafe().empty();

                            RSocket rSocketRequester = new RSocketRequester(
                                multiplexer.asClientConnection(),
                                this.payloadDecoder,
                                StreamIdSupplier.clientSupplier(),
                                this.mtu,
                                maxFrameLength,
                                this.maxInboundPayloadSize,
                                (int) this.keepAliveInterval.toMillis(),
                                (int) this.keepAliveMaxLifeTime.toMillis(),
                                keepAliveHandler,
                                interceptors::initRequesterRequestInterceptor,
                                requesterLeaseTracker,
                                requesterOnAllClosedSink,
                                Mono.whenDelayError(responderOnAllClosedSink.asMono(), requesterOnAllClosedSink.asMono()));

                            RSocket wrappedRSocketRequester = interceptors.initRequester(rSocketRequester);
                            SocketAcceptor acceptor = this.acceptor != null ? this.acceptor : SocketAcceptor.with(new RSocket(){});
                            DefaultConnectionSetupPayload setup = new DefaultConnectionSetupPayload(setupFrame);

                            return interceptors.initSocketAcceptor(acceptor)
                                .accept(setup, wrappedRSocketRequester)
                                .map(rSocketHandler -> {
                                    RSocket wrappedRSocketHandler = interceptors.initResponder(rSocketHandler);
                                    ResponderLeaseTracker responderLeaseTracker = leaseEnabled
                                        ? new ResponderLeaseTracker(CLIENT_TAG, wrappedConnection, leases.sender)
                                        : null;

                                    // The responder instance is never referenced afterwards; presumably
                                    // it attaches itself to the server-side connection on construction.
                                    new RSocketResponder(
                                        multiplexer.asServerConnection(),
                                        wrappedRSocketHandler,
                                        this.payloadDecoder,
                                        responderLeaseTracker,
                                        this.mtu,
                                        maxFrameLength,
                                        this.maxInboundPayloadSize,
                                        leaseEnabled && leases.sender instanceof TrackingLeaseSender
                                            ? rSocket -> interceptors.initResponderRequestInterceptor(rSocket, (RequestInterceptor) leases.sender)
                                            : rSocket -> interceptors.initResponderRequestInterceptor(rSocket),
                                        responderOnAllClosedSink);

                                    return wrappedRSocketRequester;
                                })
                                // Release the setup payload (and the frame behind it) however accept(..) ends.
                                .doFinally(signalType -> setup.release());
                        });
                    });
            })
            .as(source -> {
                if (this.retrySpec == null) {
                    return source;
                }
                return new ReconnectMono<RSocket>(source.retryWhen(this.retrySpec), Disposable::dispose, INVALIDATE_FUNCTION);
            });
    }

    /**
     * Ensures a keep-alive duration is strictly positive.
     *
     * @param value the duration to check
     * @param name the parameter name used in the error message
     * @return {@code value} when it is greater than zero
     * @throws IllegalArgumentException if {@code value} is zero or negative
     */
    private static Duration requirePositive(Duration value, String name) {
        // isZero/isNegative avoids Duration.negated(), which can overflow for extreme negative values.
        if (value.isZero() || value.isNegative()) {
            throw new IllegalArgumentException("`" + name + "` for keepAlive must be > 0");
        }
        return value;
    }
}

