/*
 * Decompiled with CFR 0.152.
 */
package org.mariadb.r2dbc.client;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.r2dbc.spi.R2dbcException;
import io.r2dbc.spi.R2dbcNonTransientResourceException;
import io.r2dbc.spi.R2dbcTransientResourceException;
import io.r2dbc.spi.TransactionDefinition;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URLDecoder;
import java.sql.SQLSyntaxErrorException;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.SSLParameters;
import org.mariadb.r2dbc.ExceptionFactory;
import org.mariadb.r2dbc.HaMode;
import org.mariadb.r2dbc.MariadbConnectionConfiguration;
import org.mariadb.r2dbc.MariadbConnectionFactory;
import org.mariadb.r2dbc.MariadbTransactionDefinition;
import org.mariadb.r2dbc.SslMode;
import org.mariadb.r2dbc.client.Client;
import org.mariadb.r2dbc.client.DecoderState;
import org.mariadb.r2dbc.client.Exchange;
import org.mariadb.r2dbc.client.MariadbFrameDecoder;
import org.mariadb.r2dbc.client.MariadbPacketEncoder;
import org.mariadb.r2dbc.client.MariadbResult;
import org.mariadb.r2dbc.client.RedoContext;
import org.mariadb.r2dbc.client.ServerVersion;
import org.mariadb.r2dbc.client.SimpleContext;
import org.mariadb.r2dbc.message.ClientMessage;
import org.mariadb.r2dbc.message.Context;
import org.mariadb.r2dbc.message.Protocol;
import org.mariadb.r2dbc.message.ServerMessage;
import org.mariadb.r2dbc.message.client.ExecutePacket;
import org.mariadb.r2dbc.message.client.PreparePacket;
import org.mariadb.r2dbc.message.client.QueryPacket;
import org.mariadb.r2dbc.message.client.QuitPacket;
import org.mariadb.r2dbc.message.client.SslRequestPacket;
import org.mariadb.r2dbc.message.flow.AuthenticationFlow;
import org.mariadb.r2dbc.message.server.CompletePrepareResult;
import org.mariadb.r2dbc.message.server.ErrorPacket;
import org.mariadb.r2dbc.message.server.InitialHandshakePacket;
import org.mariadb.r2dbc.util.HostAddress;
import org.mariadb.r2dbc.util.PrepareCache;
import org.mariadb.r2dbc.util.ServerPrepareResult;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.netty.Connection;
import reactor.netty.channel.AbortedException;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.tcp.TcpClient;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.concurrent.Queues;

public class SimpleClient
implements Client {
    private static final Logger logger = Loggers.getLogger(SimpleClient.class);
    private static final Pattern REDIRECT_URL_FORMAT = Pattern.compile("(mariadb|mysql)://(([^/@:]+)?(:([^/]+))?@)?(([^/:]+)(:([0-9]+))?)(/([^?]+)(/?(.*))?)?$", 34);
    protected MariadbConnectionConfiguration configuration;
    protected final ReentrantLock lock;
    protected Connection connection;
    protected HostAddress hostAddress;
    private ServerMessageSubscriber messageSubscriber;
    private Sinks.Many<ClientMessage> requestSink = Sinks.many().unicast().onBackpressureBuffer();
    private Queue<Exchange> exchangeQueue = (Queue)Queues.get((int)Queues.SMALL_BUFFER_SIZE).get();
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    private MariadbFrameDecoder decoder;
    private MariadbPacketEncoder encoder;
    private final PrepareCache prepareCache;
    private final Scheduler scheduler;
    private ByteBufAllocator byteBufAllocator;
    protected volatile Context context;
    private volatile boolean closeRequested = false;

    protected SimpleClient(Connection connection, MariadbConnectionConfiguration configuration, HostAddress hostAddress, ReentrantLock lock) {
        this.connection = connection;
        this.configuration = configuration;
        this.hostAddress = hostAddress;
        this.lock = lock;
        this.prepareCache = new PrepareCache(this.configuration.useServerPrepStmts() ? this.configuration.getPrepareCacheSize() : 0, this);
        EventLoop eventLoop = connection.channel().eventLoop();
        this.scheduler = Schedulers.fromExecutorService((ExecutorService)eventLoop, (String)eventLoop.toString());
        this.decoder = new MariadbFrameDecoder(this.exchangeQueue, this, configuration);
        this.encoder = new MariadbPacketEncoder();
        this.byteBufAllocator = connection.outbound().alloc();
        Queue receiverQueue = (Queue)Queues.get((int)Queues.SMALL_BUFFER_SIZE).get();
        this.messageSubscriber = new ServerMessageSubscriber(this.lock, this.exchangeQueue, receiverQueue);
        connection.addHandlerFirst((ChannelHandler)this.decoder);
        if (configuration.getSslConfig().getSslMode() == SslMode.TUNNEL) {
            try {
                SSLEngine engine;
                SslContext sslContext = configuration.getSslConfig().getSslContext();
                if (hostAddress != null) {
                    engine = sslContext.newEngine(connection.channel().alloc(), hostAddress.getHost(), hostAddress.getPort());
                    SSLParameters sslParameters = engine.getSSLParameters();
                    if (!configuration.getSslConfig().sslTunnelDisableHostVerification()) {
                        sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
                    }
                    engine.setSSLParameters(sslParameters);
                } else {
                    engine = sslContext.newEngine(connection.channel().alloc());
                }
                connection.addHandlerFirst((ChannelHandler)new SslHandler(engine));
            }
            catch (SSLException e) {
                this.handleConnectionError(e);
            }
        }
        if (logger.isTraceEnabled()) {
            connection.addHandlerFirst(LoggingHandler.class.getSimpleName(), (ChannelHandler)new LoggingHandler(SimpleClient.class, LogLevel.TRACE));
        }
        connection.inbound().receiveObject().cast(ServerMessage.class).onErrorResume(this::receiveResumeError).subscribe((CoreSubscriber)this.messageSubscriber);
        this.requestSink.asFlux().map(this.encoder::encodeFlux).flatMap(b -> connection.outbound().send((Publisher)b), 1).onErrorResume(this::sendResumeError).doAfterTerminate(this::closeChannelIfNeeded).subscribe();
    }

    @Override
    public Mono<Void> redirect() {
        if (this.configuration.permitRedirect() && this.context.getRedirectValue() != null && this.exchangeQueue.size() <= 1 && (this.context.getServerStatus() & 1) == 0) {
            String redirectValue = this.context.getRedirectValue();
            this.context.setRedirect(null);
            Matcher matcher = REDIRECT_URL_FORMAT.matcher(redirectValue);
            if (!matcher.matches()) {
                return Mono.error((Throwable)new SQLSyntaxErrorException("error parsing redirection string '" + redirectValue + "'. format must be 'mariadb/mysql://[<user>[:<password>]@]<host>[:<port>]/[<db>[?<opt1>=<value1>[&<opt2>=<value2>]]]'"));
            }
            try {
                String host = matcher.group(7) != null ? URLDecoder.decode(matcher.group(7), "UTF-8") : matcher.group(6);
                int port = matcher.group(9) != null ? Integer.parseInt(matcher.group(9)) : 3306;
                HostAddress hostAddress = new HostAddress(host, port);
                String user = matcher.group(3) != null ? matcher.group(3) : this.configuration.getUsername();
                CharSequence password = matcher.group(5) != null ? matcher.group(5) : this.configuration.getPassword();
                MariadbConnectionConfiguration redirectConf = this.configuration.redirectConf(hostAddress, user, password);
                return SimpleClient.connect(ConnectionProvider.newConnection(), InetSocketAddress.createUnresolved(hostAddress.getHost(), hostAddress.getPort()), hostAddress, redirectConf, this.lock).delayUntil(client -> AuthenticationFlow.exchange(client, redirectConf, hostAddress)).doOnError(e -> HaMode.failHost(hostAddress)).onErrorComplete().cast(SimpleClient.class).flatMap(client -> MariadbConnectionFactory.setSessionVariables(redirectConf, client).then(Mono.just((Object)client))).flatMap(this::refreshClient).then();
            }
            catch (UnsupportedEncodingException e2) {
                return Mono.error((Throwable)e2);
            }
        }
        return Mono.empty();
    }

    public Mono<Void> refreshClient(SimpleClient client) {
        return this.quitOrClose().then(Mono.fromCallable(() -> {
            this.isClosed.set(false);
            this.closeRequested = false;
            this.connection = client.connection;
            this.context = client.context;
            this.configuration = client.configuration;
            this.hostAddress = client.hostAddress;
            this.prepareCache.clear();
            this.requestSink = client.requestSink;
            this.decoder = client.decoder;
            this.encoder = client.encoder;
            this.byteBufAllocator = client.byteBufAllocator;
            this.messageSubscriber = client.messageSubscriber;
            this.exchangeQueue = client.exchangeQueue;
            return Mono.empty();
        })).then();
    }

    public static Mono<SimpleClient> connect(ConnectionProvider connectionProvider, SocketAddress socketAddress, HostAddress hostAddress, MariadbConnectionConfiguration configuration, ReentrantLock lock) {
        TcpClient tcpClient = TcpClient.create((ConnectionProvider)connectionProvider).remoteAddress(() -> socketAddress).runOn(configuration.loopResources());
        tcpClient = SimpleClient.setSocketOption(configuration, tcpClient);
        return tcpClient.connect().flatMap(it -> Mono.just((Object)new SimpleClient((Connection)it, configuration, hostAddress, lock)));
    }

    public static TcpClient setSocketOption(MariadbConnectionConfiguration configuration, TcpClient tcpClient) {
        if (configuration.getConnectTimeout() != null) {
            tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)Math.toIntExact(configuration.getConnectTimeout().toMillis()));
        }
        if (configuration.isTcpKeepAlive()) {
            tcpClient = tcpClient.option(ChannelOption.SO_KEEPALIVE, (Object)true);
        }
        if (configuration.isTcpAbortiveClose()) {
            tcpClient = tcpClient.option(ChannelOption.SO_LINGER, (Object)0);
        }
        return tcpClient;
    }

    @Override
    public Context getContext() {
        return this.context;
    }

    @Override
    public void handleConnectionError(Throwable throwable) {
        if (AbortedException.isConnectionReset((Throwable)throwable) && !this.isConnected()) {
            this.messageSubscriber.close((R2dbcException)new R2dbcNonTransientResourceException("Cannot execute command since connection is already closed", "08000", throwable));
        } else {
            R2dbcNonTransientResourceException error = throwable instanceof SSLHandshakeException ? new R2dbcNonTransientResourceException(throwable.getMessage(), "08000", throwable) : new R2dbcNonTransientResourceException("Connection error", "08000", throwable);
            this.messageSubscriber.close((R2dbcException)error);
            this.closeChannelIfNeeded();
        }
    }

    private Mono<Void> sendResumeError(Throwable throwable) {
        this.handleConnectionError(throwable);
        this.requestSink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
        return this.quitOrClose();
    }

    private Mono<ServerMessage> receiveResumeError(Throwable throwable) {
        Mono empty = Mono.empty();
        return this.sendResumeError(throwable).then(empty);
    }

    @Override
    public boolean closeChannelIfNeeded() {
        if (this.isClosed.compareAndSet(false, true)) {
            Channel channel = this.connection.channel();
            this.messageSubscriber.close((R2dbcException)new R2dbcNonTransientResourceException("Connection unexpectedly closed", "08000"));
            if (channel.isOpen()) {
                this.connection.dispose();
            }
            return true;
        }
        return false;
    }

    @Override
    public Mono<Void> close() {
        this.closeRequested = true;
        return this.quitOrClose();
    }

    private Mono<Void> quitOrClose() {
        return Mono.defer(() -> {
            this.messageSubscriber.close((R2dbcException)new R2dbcNonTransientResourceException(this.closeRequested ? "Connection has been closed" : "Connection closed", "08000"));
            if (this.isClosed.compareAndSet(false, true)) {
                Channel channel = this.connection.channel();
                if (!channel.isOpen()) {
                    this.connection.dispose();
                    return this.connection.onDispose();
                }
                return this.connection.outbound().send(this.encoder.encodeFlux(QuitPacket.INSTANCE)).then().doOnSuccess(v -> this.connection.dispose()).then(this.connection.onDispose());
            }
            return Mono.empty();
        });
    }

    @Override
    public Mono<Void> sendSslRequest(SslRequestPacket sslRequest, MariadbConnectionConfiguration configuration) {
        try {
            SslHandler sslHandler;
            SslContext sslContext = configuration.getSslConfig().getSslContext();
            if (this.hostAddress != null) {
                sslHandler = sslContext.newHandler(this.connection.channel().alloc(), this.hostAddress.getHost(), this.hostAddress.getPort());
                if (configuration.getSslConfig().getSslMode() == SslMode.VERIFY_FULL) {
                    SSLEngine sslEngine = sslHandler.engine();
                    SSLParameters sslParameters = sslEngine.getSSLParameters();
                    sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
                    sslEngine.setSSLParameters(sslParameters);
                }
            } else {
                sslHandler = sslContext.newHandler(this.connection.channel().alloc());
            }
            this.requestSink.emitNext((Object)sslRequest, Sinks.EmitFailureHandler.FAIL_FAST);
            this.connection.addHandlerFirst((ChannelHandler)sslHandler);
        }
        catch (Throwable e) {
            this.closeChannelIfNeeded();
            return Mono.error((Throwable)e);
        }
        return Mono.empty();
    }

    @Override
    public long getThreadId() {
        return this.context.getThreadId();
    }

    @Override
    public Mono<Void> beginTransaction() {
        return this.executeWhenNotInTransaction("BEGIN");
    }

    @Override
    public Mono<Void> beginTransaction(TransactionDefinition definition) {
        StringBuilder sb = new StringBuilder("START TRANSACTION");
        boolean first = true;
        if (Boolean.TRUE.equals(definition.getAttribute(TransactionDefinition.READ_ONLY))) {
            sb.append(" READ ONLY");
            first = false;
        }
        if (Boolean.TRUE.equals(definition.getAttribute(MariadbTransactionDefinition.WITH_CONSISTENT_SNAPSHOT))) {
            if (!first) {
                sb.append(",");
            }
            sb.append(" WITH CONSISTENT SNAPSHOT");
        }
        return this.executeWhenNotInTransaction(sb.toString());
    }

    @Override
    public Mono<Void> commitTransaction() {
        return this.executeWhenTransaction("COMMIT");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Mono<Void> executeWhenTransaction(String sql) {
        try {
            this.lock.lock();
            if (!this.exchangeQueue.isEmpty() || (this.context.getServerStatus() & 1) > 0) {
                Flux<ServerMessage> messages = this.sendCommand(new QueryPacket(sql), false);
                Mono mono = messages.doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release).handle(ExceptionFactory.withSql(sql)::handleErrorResponse).flatMap(m -> this.redirect().then(Mono.just((Object)m))).then();
                return mono;
            }
            Mono mono = Mono.empty();
            return mono;
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Mono<Void> executeWhenNotInTransaction(String sql) {
        try {
            this.lock.lock();
            if (!this.exchangeQueue.isEmpty() || (this.context.getServerStatus() & 1) == 0) {
                Flux<ServerMessage> messages = this.sendCommand(new QueryPacket(sql), false);
                Mono mono = messages.doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release).handle(ExceptionFactory.withSql(sql)::handleErrorResponse).flatMap(m -> this.redirect().then(Mono.just((Object)m))).then();
                return mono;
            }
            Mono mono = Mono.empty();
            return mono;
        }
        finally {
            this.lock.unlock();
        }
    }

    @Override
    public Mono<Void> rollbackTransaction() {
        return this.executeWhenTransaction("ROLLBACK");
    }

    @Override
    public Mono<Void> rollbackTransactionToSavepoint(String name) {
        String sql = String.format("ROLLBACK TO SAVEPOINT `%s`", name.replace("`", "``"));
        return this.executeWhenTransaction(sql);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Mono<Void> setAutoCommit(boolean autoCommit) {
        String sql = "SET autocommit=" + (autoCommit ? (char)'1' : '0');
        try {
            this.lock.lock();
            if (!this.exchangeQueue.isEmpty() || autoCommit != this.isAutoCommit()) {
                Flux<ServerMessage> messages = this.sendCommand(new QueryPacket(sql), false);
                Mono mono = messages.doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release).windowUntil(ServerMessage::resultSetEnd).map(dataRow -> new MariadbResult(Protocol.TEXT, null, (Flux<ServerMessage>)dataRow, ExceptionFactory.withSql(sql), null, true, this.configuration)).flatMap(m -> this.redirect().then(Mono.just((Object)m))).cast(org.mariadb.r2dbc.api.MariadbResult.class).then();
                return mono;
            }
            Mono mono = Mono.empty();
            return mono;
        }
        finally {
            this.lock.unlock();
        }
    }

    public Flux<ServerMessage> receive(DecoderState initialState) {
        return Flux.create(sink -> {
            try {
                this.lock.lock();
                Exchange exchange = new Exchange((FluxSink<ServerMessage>)sink, initialState);
                sink.onRequest(value -> this.messageSubscriber.onRequest(exchange, value));
                if (!this.exchangeQueue.offer(exchange)) {
                    sink.error((Throwable)new R2dbcTransientResourceException("Request queue limit reached during handshake"));
                }
            }
            catch (Throwable t) {
                sink.error(t);
            }
            finally {
                this.lock.unlock();
            }
        });
    }

    @Override
    public void setContext(InitialHandshakePacket handshake, long clientCapabilities) {
        this.context = !HaMode.NONE.equals((Object)this.configuration.getHaMode()) && this.configuration.isTransactionReplay() ? new RedoContext(handshake.getServerVersion(), handshake.getThreadId(), handshake.getCapabilities(), handshake.getServerStatus(), handshake.isMariaDBServer(), clientCapabilities, this.configuration.getDatabase(), this.byteBufAllocator, this.configuration.getIsolationLevel()) : new SimpleContext(handshake.getServerVersion(), handshake.getThreadId(), handshake.getCapabilities(), handshake.getServerStatus(), handshake.isMariaDBServer(), clientCapabilities, this.configuration.getDatabase(), this.byteBufAllocator, this.configuration.getIsolationLevel());
        this.decoder.setContext(this.context);
        this.encoder.setContext(this.context);
    }

    @Override
    public boolean isAutoCommit() {
        return (this.context.getServerStatus() & 2) > 0;
    }

    @Override
    public boolean isInTransaction() {
        return (this.context.getServerStatus() & 1) > 0;
    }

    @Override
    public boolean noBackslashEscapes() {
        return (this.context.getServerStatus() & 0x200) > 0;
    }

    @Override
    public ServerVersion getVersion() {
        return this.context != null ? this.context.getVersion() : ServerVersion.UNKNOWN_VERSION;
    }

    @Override
    public boolean isConnected() {
        if (this.isClosed.get()) {
            return false;
        }
        return this.connection.channel().isOpen();
    }

    @Override
    public boolean isCloseRequested() {
        return this.closeRequested;
    }

    @Override
    public void sendCommandWithoutResult(ClientMessage message) {
        try {
            this.lock.lock();
            this.requestSink.emitNext((Object)message, Sinks.EmitFailureHandler.FAIL_FAST);
        }
        finally {
            this.lock.unlock();
        }
    }

    @Override
    public Flux<ServerMessage> sendCommand(ClientMessage message, boolean canSafelyBeReExecuted) {
        return this.sendCommand(message, DecoderState.QUERY_RESPONSE, null, canSafelyBeReExecuted);
    }

    @Override
    public Flux<ServerMessage> sendCommand(ClientMessage message, DecoderState initialState, boolean canSafelyBeReExecuted) {
        return this.sendCommand(message, initialState, null, canSafelyBeReExecuted);
    }

    @Override
    public Flux<ServerMessage> sendCommand(ClientMessage message, DecoderState initialState, String sql, boolean canSafelyBeReExecuted) {
        return Flux.create(sink -> {
            if (!this.isConnected() || this.messageSubscriber.isClose()) {
                sink.error((Throwable)new R2dbcNonTransientResourceException("The connection is closed. Unable to send anything"));
                return;
            }
            try {
                this.lock.lock();
                Exchange exchange = new Exchange((FluxSink<ServerMessage>)sink, initialState, sql);
                if (this.exchangeQueue.offer(exchange)) {
                    if (message instanceof PreparePacket) {
                        this.decoder.addPrepare(((PreparePacket)message).getSql());
                    }
                    sink.onRequest(value -> this.messageSubscriber.onRequest(exchange, value));
                    this.requestSink.emitNext((Object)message, Sinks.EmitFailureHandler.FAIL_FAST);
                } else {
                    sink.error((Throwable)new R2dbcTransientResourceException("Request queue limit reached"));
                }
            }
            catch (Throwable t) {
                sink.error(t);
            }
            finally {
                this.lock.unlock();
            }
        });
    }

    @Override
    public Mono<ServerPrepareResult> sendPrepare(ClientMessage requests, ExceptionFactory factory, String sql) {
        return this.sendCommand(requests, DecoderState.PREPARE_RESPONSE, sql, true).handle((it, sink) -> {
            if (it instanceof ErrorPacket) {
                sink.error((Throwable)factory.from((ErrorPacket)it));
                return;
            }
            if (it instanceof CompletePrepareResult) {
                sink.next((Object)((CompletePrepareResult)it).getPrepare());
            }
            if (it.ending()) {
                sink.complete();
            }
        }).cast(ServerPrepareResult.class).singleOrEmpty();
    }

    @Override
    public Flux<ServerMessage> sendCommand(PreparePacket preparePacket, ExecutePacket executePacket, boolean canSafelyBeReExecuted) {
        return Flux.create(sink -> {
            if (!this.isConnected() || this.messageSubscriber.isClose()) {
                sink.error((Throwable)new R2dbcNonTransientResourceException("The connection is closed. Unable to send anything"));
                return;
            }
            try {
                this.lock.lock();
                Exchange exchange = new Exchange((FluxSink<ServerMessage>)sink, DecoderState.PREPARE_AND_EXECUTE_RESPONSE, preparePacket.getSql());
                if (this.exchangeQueue.offer(exchange)) {
                    sink.onRequest(value -> this.messageSubscriber.onRequest(exchange, value));
                    this.decoder.addPrepare(preparePacket.getSql());
                    this.requestSink.emitNext((Object)preparePacket, Sinks.EmitFailureHandler.FAIL_FAST);
                    this.requestSink.emitNext((Object)executePacket, Sinks.EmitFailureHandler.FAIL_FAST);
                } else {
                    sink.error((Throwable)new R2dbcTransientResourceException("Request queue limit reached"));
                }
            }
            catch (Throwable t) {
                t.printStackTrace();
                sink.error(t);
            }
            finally {
                this.lock.unlock();
            }
        });
    }

    @Override
    public HostAddress getHostAddress() {
        return this.hostAddress;
    }

    @Override
    public PrepareCache getPrepareCache() {
        return this.prepareCache;
    }

    public String toString() {
        return "Client{isClosed=" + this.isClosed + ", context=" + this.context + '}';
    }

    @Override
    public Scheduler getScheduler() {
        return this.scheduler;
    }

    protected class ServerMessageSubscriber
    implements CoreSubscriber<ServerMessage> {
        private final AtomicLong receiverDemands = new AtomicLong(0L);
        private final ReentrantLock lock;
        private final Queue<Exchange> exchangeQueue;
        private final Queue<ServerMessage> receiverQueue;
        private Subscription upstream;
        private volatile boolean close;

        public ServerMessageSubscriber(ReentrantLock lock, Queue<Exchange> exchangeQueue, Queue<ServerMessage> receiverQueue) {
            this.lock = lock;
            this.receiverQueue = receiverQueue;
            this.exchangeQueue = exchangeQueue;
        }

        public void onSubscribe(Subscription subscription) {
            this.upstream = subscription;
        }

        public void onError(Throwable t) {
            if (this.close) {
                Operators.onErrorDropped((Throwable)t, (reactor.util.context.Context)this.currentContext());
                return;
            }
            SimpleClient.this.requestSink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
            SimpleClient.this.handleConnectionError(t);
            SimpleClient.this.quitOrClose().subscribe();
        }

        public void onComplete() {
            this.close((R2dbcException)new R2dbcNonTransientResourceException(String.format("Connection %s", SimpleClient.this.closeChannelIfNeeded() ? "unexpected error" : "error"), "08000"));
            if (!SimpleClient.this.isClosed.get()) {
                SimpleClient.this.quitOrClose().subscribe();
            }
        }

        public void onNext(ServerMessage message) {
            if (this.close) {
                Operators.onNextDropped((Object)message, (reactor.util.context.Context)this.currentContext());
                return;
            }
            this.receiverDemands.decrementAndGet();
            Exchange exchange = this.exchangeQueue.peek();
            ReferenceCountUtil.retain((Object)message);
            if (this.receiverQueue.isEmpty() && exchange != null && (exchange.hasDemand() || exchange.isCancelled())) {
                if (exchange.emit(message)) {
                    this.exchangeQueue.poll();
                }
                if (exchange.hasDemand() || exchange.isCancelled()) {
                    this.requestQueueFilling();
                }
                return;
            }
            if (!this.receiverQueue.offer(message)) {
                message.release();
                Operators.onNextDropped((Object)message, (reactor.util.context.Context)this.currentContext());
                this.onError((Throwable)new R2dbcNonTransientResourceException("unexpected : server message queue is full"));
                return;
            }
            this.tryDrainQueue();
        }

        public void onRequest(Exchange exchange, long n) {
            exchange.incrementDemand(n);
            this.requestQueueFilling();
            this.tryDrainQueue();
        }

        private void requestQueueFilling() {
            if (this.receiverQueue.isEmpty() && this.receiverDemands.compareAndSet(0L, Queues.SMALL_BUFFER_SIZE)) {
                this.upstream.request((long)Queues.SMALL_BUFFER_SIZE);
            }
        }

        private void tryDrainQueue() {
            while (!this.receiverQueue.isEmpty()) {
                Exchange exchange;
                if (!this.lock.tryLock()) {
                    return;
                }
                try {
                    while (!this.receiverQueue.isEmpty()) {
                        exchange = this.exchangeQueue.peek();
                        if (exchange == null || !exchange.hasDemand() && !exchange.isCancelled()) {
                            return;
                        }
                        ServerMessage srvMsg = this.receiverQueue.poll();
                        if (srvMsg == null) {
                            return;
                        }
                        if (!exchange.emit(srvMsg)) continue;
                        this.exchangeQueue.poll();
                    }
                }
                finally {
                    this.lock.unlock();
                }
                exchange = this.exchangeQueue.peek();
                if (exchange != null && !exchange.hasDemand() && !exchange.isCancelled()) continue;
                this.requestQueueFilling();
            }
        }

        public void close(R2dbcException error) {
            Exchange exchange;
            this.close = true;
            while ((exchange = this.exchangeQueue.poll()) != null) {
                exchange.onError((Throwable)error);
            }
            while (!this.receiverQueue.isEmpty()) {
                this.receiverQueue.poll().release();
            }
        }

        public boolean isClose() {
            return this.close;
        }
    }
}

