/*
 * Decompiled with CFR 0.152.
 */
package io.modelcontextprotocol.client.transport;

import io.modelcontextprotocol.spec.McpError;
import java.net.http.HttpResponse;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.FluxSink;

class ResponseSubscribers {
    ResponseSubscribers() {
    }

    static HttpResponse.BodySubscriber<Void> sseToBodySubscriber(HttpResponse.ResponseInfo responseInfo, FluxSink<ResponseEvent> sink) {
        return HttpResponse.BodySubscribers.fromLineSubscriber(FlowAdapters.toFlowSubscriber((Subscriber)new SseLineSubscriber(responseInfo, sink)));
    }

    static HttpResponse.BodySubscriber<Void> aggregateBodySubscriber(HttpResponse.ResponseInfo responseInfo, FluxSink<ResponseEvent> sink) {
        return HttpResponse.BodySubscribers.fromLineSubscriber(FlowAdapters.toFlowSubscriber((Subscriber)new AggregateSubscriber(responseInfo, sink)));
    }

    static HttpResponse.BodySubscriber<Void> bodilessBodySubscriber(HttpResponse.ResponseInfo responseInfo, FluxSink<ResponseEvent> sink) {
        return HttpResponse.BodySubscribers.fromLineSubscriber(FlowAdapters.toFlowSubscriber((Subscriber)new BodilessResponseLineSubscriber(responseInfo, sink)));
    }

    static class SseLineSubscriber
    extends BaseSubscriber<String> {
        private static final Pattern EVENT_DATA_PATTERN = Pattern.compile("^data:(.+)$", 8);
        private static final Pattern EVENT_ID_PATTERN = Pattern.compile("^id:(.+)$", 8);
        private static final Pattern EVENT_TYPE_PATTERN = Pattern.compile("^event:(.+)$", 8);
        private final FluxSink<ResponseEvent> sink;
        private final StringBuilder eventBuilder;
        private final AtomicReference<String> currentEventId;
        private final AtomicReference<String> currentEventType;
        private HttpResponse.ResponseInfo responseInfo;

        public SseLineSubscriber(HttpResponse.ResponseInfo responseInfo, FluxSink<ResponseEvent> sink) {
            this.sink = sink;
            this.eventBuilder = new StringBuilder();
            this.currentEventId = new AtomicReference();
            this.currentEventType = new AtomicReference();
            this.responseInfo = responseInfo;
        }

        protected void hookOnSubscribe(Subscription subscription) {
            this.sink.onRequest(n -> subscription.request(n));
            this.sink.onDispose(() -> subscription.cancel());
        }

        protected void hookOnNext(String line) {
            if (line.isEmpty()) {
                if (this.eventBuilder.length() > 0) {
                    String eventData = this.eventBuilder.toString();
                    SseEvent sseEvent = new SseEvent(this.currentEventId.get(), this.currentEventType.get(), eventData.trim());
                    this.sink.next((Object)new SseResponseEvent(this.responseInfo, sseEvent));
                    this.eventBuilder.setLength(0);
                }
            } else if (line.startsWith("data:")) {
                Matcher matcher = EVENT_DATA_PATTERN.matcher(line);
                if (matcher.find()) {
                    this.eventBuilder.append(matcher.group(1).trim()).append("\n");
                }
            } else if (line.startsWith("id:")) {
                Matcher matcher = EVENT_ID_PATTERN.matcher(line);
                if (matcher.find()) {
                    this.currentEventId.set(matcher.group(1).trim());
                }
            } else if (line.startsWith("event:")) {
                Matcher matcher = EVENT_TYPE_PATTERN.matcher(line);
                if (matcher.find()) {
                    this.currentEventType.set(matcher.group(1).trim());
                }
            } else {
                this.sink.error((Throwable)new McpError((Object)("Invalid SSE response. Status code: " + this.responseInfo.statusCode() + " Line: " + line)));
            }
        }

        protected void hookOnComplete() {
            if (this.eventBuilder.length() > 0) {
                String eventData = this.eventBuilder.toString();
                SseEvent sseEvent = new SseEvent(this.currentEventId.get(), this.currentEventType.get(), eventData.trim());
                this.sink.next((Object)new SseResponseEvent(this.responseInfo, sseEvent));
            }
            this.sink.complete();
        }

        protected void hookOnError(Throwable throwable) {
            this.sink.error(throwable);
        }
    }

    static class AggregateSubscriber
    extends BaseSubscriber<String> {
        private final FluxSink<ResponseEvent> sink;
        private final StringBuilder eventBuilder;
        private HttpResponse.ResponseInfo responseInfo;

        public AggregateSubscriber(HttpResponse.ResponseInfo responseInfo, FluxSink<ResponseEvent> sink) {
            this.sink = sink;
            this.eventBuilder = new StringBuilder();
            this.responseInfo = responseInfo;
        }

        protected void hookOnSubscribe(Subscription subscription) {
            this.sink.onRequest(arg_0 -> ((Subscription)subscription).request(arg_0));
            this.sink.onDispose(() -> ((Subscription)subscription).cancel());
        }

        protected void hookOnNext(String line) {
            this.eventBuilder.append(line).append("\n");
        }

        protected void hookOnComplete() {
            if (this.eventBuilder.length() > 0) {
                String data = this.eventBuilder.toString();
                this.sink.next((Object)new AggregateResponseEvent(this.responseInfo, data));
            }
            this.sink.complete();
        }

        protected void hookOnError(Throwable throwable) {
            this.sink.error(throwable);
        }
    }

    static class BodilessResponseLineSubscriber
    extends BaseSubscriber<String> {
        private final FluxSink<ResponseEvent> sink;
        private final HttpResponse.ResponseInfo responseInfo;

        public BodilessResponseLineSubscriber(HttpResponse.ResponseInfo responseInfo, FluxSink<ResponseEvent> sink) {
            this.sink = sink;
            this.responseInfo = responseInfo;
        }

        protected void hookOnSubscribe(Subscription subscription) {
            this.sink.onRequest(n -> subscription.request(n));
            this.sink.onDispose(() -> subscription.cancel());
        }

        protected void hookOnComplete() {
            this.sink.next((Object)new DummyEvent(this.responseInfo));
            this.sink.complete();
        }

        protected void hookOnError(Throwable throwable) {
            this.sink.error(throwable);
        }
    }

    record AggregateResponseEvent(HttpResponse.ResponseInfo responseInfo, String data) implements ResponseEvent
    {
    }

    record SseResponseEvent(HttpResponse.ResponseInfo responseInfo, SseEvent sseEvent) implements ResponseEvent
    {
    }

    record DummyEvent(HttpResponse.ResponseInfo responseInfo) implements ResponseEvent
    {
    }

    /*
     * Uses 'sealed' constructs - enablewith --sealed true
     */
    static interface ResponseEvent {
        public HttpResponse.ResponseInfo responseInfo();
    }

    record SseEvent(String id, String event, String data) {
    }
}

