/*
 * Decompiled with CFR 0.152.
 */
package io.modelcontextprotocol.server.transport;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpServerSession;
import io.modelcontextprotocol.spec.McpServerTransport;
import io.modelcontextprotocol.spec.McpServerTransportProvider;
import io.modelcontextprotocol.util.Assert;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class StdioServerTransportProvider
implements McpServerTransportProvider {
    private static final Logger logger = LoggerFactory.getLogger(StdioServerTransportProvider.class);
    private final ObjectMapper objectMapper;
    private final InputStream inputStream;
    private final OutputStream outputStream;
    private McpServerSession session;
    private final AtomicBoolean isClosing = new AtomicBoolean(false);
    private final Sinks.One<Void> inboundReady = Sinks.one();

    public StdioServerTransportProvider() {
        this(new ObjectMapper());
    }

    public StdioServerTransportProvider(ObjectMapper objectMapper) {
        this(objectMapper, System.in, System.out);
    }

    public StdioServerTransportProvider(ObjectMapper objectMapper, InputStream inputStream, OutputStream outputStream) {
        Assert.notNull(objectMapper, "The ObjectMapper can not be null");
        Assert.notNull(inputStream, "The InputStream can not be null");
        Assert.notNull(outputStream, "The OutputStream can not be null");
        this.objectMapper = objectMapper;
        this.inputStream = inputStream;
        this.outputStream = outputStream;
    }

    @Override
    public String protocolVersion() {
        return "2024-11-05";
    }

    @Override
    public void setSessionFactory(McpServerSession.Factory sessionFactory) {
        StdioMcpSessionTransport transport = new StdioMcpSessionTransport();
        this.session = sessionFactory.create(transport);
        transport.initProcessing();
    }

    @Override
    public Mono<Void> notifyClients(String method, Object params) {
        if (this.session == null) {
            return Mono.error((Throwable)new McpError((Object)"No session to close"));
        }
        return this.session.sendNotification(method, params).doOnError(e -> logger.error("Failed to send notification: {}", (Object)e.getMessage()));
    }

    @Override
    public Mono<Void> closeGracefully() {
        if (this.session == null) {
            return Mono.empty();
        }
        return this.session.closeGracefully();
    }

    private class StdioMcpSessionTransport
    implements McpServerTransport {
        private final Sinks.Many<McpSchema.JSONRPCMessage> inboundSink;
        private final Sinks.Many<McpSchema.JSONRPCMessage> outboundSink;
        private final AtomicBoolean isStarted = new AtomicBoolean(false);
        private Scheduler inboundScheduler;
        private Scheduler outboundScheduler;
        private final Sinks.One<Void> outboundReady = Sinks.one();

        public StdioMcpSessionTransport() {
            this.inboundSink = Sinks.many().unicast().onBackpressureBuffer();
            this.outboundSink = Sinks.many().unicast().onBackpressureBuffer();
            this.inboundScheduler = Schedulers.fromExecutorService((ExecutorService)Executors.newSingleThreadExecutor(), (String)"stdio-inbound");
            this.outboundScheduler = Schedulers.fromExecutorService((ExecutorService)Executors.newSingleThreadExecutor(), (String)"stdio-outbound");
        }

        @Override
        public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
            return Mono.zip((Mono)StdioServerTransportProvider.this.inboundReady.asMono(), (Mono)this.outboundReady.asMono()).then(Mono.defer(() -> {
                if (this.outboundSink.tryEmitNext((Object)message).isSuccess()) {
                    return Mono.empty();
                }
                return Mono.error((Throwable)new RuntimeException("Failed to enqueue message"));
            }));
        }

        @Override
        public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
            return (T)StdioServerTransportProvider.this.objectMapper.convertValue(data, typeRef);
        }

        @Override
        public Mono<Void> closeGracefully() {
            return Mono.fromRunnable(() -> {
                StdioServerTransportProvider.this.isClosing.set(true);
                logger.debug("Session transport closing gracefully");
                this.inboundSink.tryEmitComplete();
            });
        }

        @Override
        public void close() {
            StdioServerTransportProvider.this.isClosing.set(true);
            logger.debug("Session transport closed");
        }

        private void initProcessing() {
            this.handleIncomingMessages();
            this.startInboundProcessing();
            this.startOutboundProcessing();
        }

        private void handleIncomingMessages() {
            this.inboundSink.asFlux().flatMap(message -> StdioServerTransportProvider.this.session.handle((McpSchema.JSONRPCMessage)message)).doOnTerminate(() -> {
                this.outboundSink.tryEmitComplete();
                this.inboundScheduler.dispose();
            }).subscribe();
        }

        private void startInboundProcessing() {
            if (this.isStarted.compareAndSet(false, true)) {
                this.inboundScheduler.schedule(() -> {
                    StdioServerTransportProvider.this.inboundReady.tryEmitValue(null);
                    BufferedReader reader = null;
                    try {
                        reader = new BufferedReader(new InputStreamReader(StdioServerTransportProvider.this.inputStream));
                        while (!StdioServerTransportProvider.this.isClosing.get()) {
                            try {
                                String line = reader.readLine();
                                if (line == null) break;
                                if (StdioServerTransportProvider.this.isClosing.get()) {
                                    break;
                                }
                                logger.debug("Received JSON message: {}", (Object)line);
                                try {
                                    McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(StdioServerTransportProvider.this.objectMapper, line);
                                    if (this.inboundSink.tryEmitNext((Object)message).isSuccess()) continue;
                                }
                                catch (Exception e) {
                                    this.logIfNotClosing("Error processing inbound message", e);
                                }
                            }
                            catch (IOException e) {
                                this.logIfNotClosing("Error reading from stdin", e);
                            }
                            break;
                        }
                    }
                    catch (Exception e) {
                        this.logIfNotClosing("Error in inbound processing", e);
                    }
                    finally {
                        StdioServerTransportProvider.this.isClosing.set(true);
                        if (StdioServerTransportProvider.this.session != null) {
                            StdioServerTransportProvider.this.session.close();
                        }
                        this.inboundSink.tryEmitComplete();
                    }
                });
            }
        }

        private void startOutboundProcessing() {
            Function<Flux, Flux> outboundConsumer = messages -> messages.doOnSubscribe(subscription -> this.outboundReady.tryEmitValue(null)).publishOn(this.outboundScheduler).handle((message, sink) -> {
                block9: {
                    if (message != null && !StdioServerTransportProvider.this.isClosing.get()) {
                        try {
                            String jsonMessage = StdioServerTransportProvider.this.objectMapper.writeValueAsString(message);
                            jsonMessage = jsonMessage.replace("\r\n", "\\n").replace("\n", "\\n").replace("\r", "\\n");
                            OutputStream outputStream = StdioServerTransportProvider.this.outputStream;
                            synchronized (outputStream) {
                                StdioServerTransportProvider.this.outputStream.write(jsonMessage.getBytes(StandardCharsets.UTF_8));
                                StdioServerTransportProvider.this.outputStream.write("\n".getBytes(StandardCharsets.UTF_8));
                                StdioServerTransportProvider.this.outputStream.flush();
                            }
                            sink.next(message);
                        }
                        catch (IOException e) {
                            if (!StdioServerTransportProvider.this.isClosing.get()) {
                                logger.error("Error writing message", (Throwable)e);
                                sink.error((Throwable)new RuntimeException(e));
                                break block9;
                            }
                            logger.debug("Stream closed during shutdown", (Throwable)e);
                        }
                    } else if (StdioServerTransportProvider.this.isClosing.get()) {
                        sink.complete();
                    }
                }
            }).doOnComplete(() -> {
                StdioServerTransportProvider.this.isClosing.set(true);
                this.outboundScheduler.dispose();
            }).doOnError(e -> {
                if (!StdioServerTransportProvider.this.isClosing.get()) {
                    logger.error("Error in outbound processing", e);
                    StdioServerTransportProvider.this.isClosing.set(true);
                    this.outboundScheduler.dispose();
                }
            }).map(msg -> (McpSchema.JSONRPCMessage)msg);
            outboundConsumer.apply(this.outboundSink.asFlux()).subscribe();
        }

        private void logIfNotClosing(String message, Exception e) {
            if (!StdioServerTransportProvider.this.isClosing.get()) {
                logger.error(message, (Throwable)e);
            }
        }
    }
}

