/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.micrometer.observation;

import io.micrometer.common.util.StringUtils;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationConvention;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.observation.docs.ObservationDocumentation;
import io.netty.buffer.ByteBuf;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.frame.FrameType;
import io.rsocket.metadata.RoutingMetadata;
import io.rsocket.metadata.WellKnownMimeType;
import io.rsocket.micrometer.observation.CompositeMetadataUtils;
import io.rsocket.micrometer.observation.DefaultRSocketRequesterObservationConvention;
import io.rsocket.micrometer.observation.RSocketContext;
import io.rsocket.micrometer.observation.RSocketObservationDocumentation;
import io.rsocket.micrometer.observation.RSocketRequesterObservationConvention;
import io.rsocket.util.RSocketProxy;
import java.util.Iterator;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;
import reactor.util.context.ContextView;

/**
 * {@link RSocketProxy} that wraps each outgoing interaction (fire-and-forget, request-response,
 * request-stream and request-channel) of the delegate {@link RSocket} in a Micrometer
 * {@link Observation}.
 *
 * <p>For every request an {@link RSocketContext} with {@link RSocketContext.Side#REQUESTER} is
 * created, the observation is started, and it is stopped when the returned publisher terminates
 * or is cancelled. Errors emitted by the delegate are recorded on the observation. The started
 * observation is written into the Reactor context under the {@code "micrometer.observation"}
 * key; an observation already present under that key is used as the parent.
 */
public class ObservationRequesterRSocketProxy extends RSocketProxy {
    /** Reactor context key under which the current observation is stored and looked up. */
    private static final String MICROMETER_OBSERVATION_KEY = "micrometer.observation";
    private final ObservationRegistry observationRegistry;
    /** Optional custom convention; when {@code null} only the default convention is supplied. */
    @Nullable
    private final RSocketRequesterObservationConvention observationConvention;

    /**
     * Creates a proxy without a custom observation convention.
     *
     * @param source the RSocket to delegate to
     * @param observationRegistry registry used to create observations
     */
    public ObservationRequesterRSocketProxy(RSocket source, ObservationRegistry observationRegistry) {
        this(source, observationRegistry, null);
    }

    /**
     * Creates a proxy with an optional custom observation convention.
     *
     * @param source the RSocket to delegate to
     * @param observationRegistry registry used to create observations
     * @param observationConvention custom convention, or {@code null} for none
     */
    public ObservationRequesterRSocketProxy(RSocket source, ObservationRegistry observationRegistry, @Nullable RSocketRequesterObservationConvention observationConvention) {
        super(source);
        this.observationRegistry = observationRegistry;
        this.observationConvention = observationConvention;
    }

    /** Sends a fire-and-forget request through the delegate inside an observation. */
    @Override
    public Mono<Void> fireAndForget(Payload payload) {
        return this.setObservation(super::fireAndForget, payload, FrameType.REQUEST_FNF, RSocketObservationDocumentation.RSOCKET_REQUESTER_FNF);
    }

    /** Sends a request-response request through the delegate inside an observation. */
    @Override
    public Mono<Payload> requestResponse(Payload payload) {
        return this.setObservation(super::requestResponse, payload, FrameType.REQUEST_RESPONSE, RSocketObservationDocumentation.RSOCKET_REQUESTER_REQUEST_RESPONSE);
    }

    /**
     * Defers the observed invocation until subscription so that the subscriber's Reactor context
     * is available for the parent-observation lookup.
     *
     * @param input the delegate call to observe
     * @param payload the request payload
     * @param frameType the frame type of the interaction
     * @param observation the documented observation to create
     * @return a Mono that performs the observed call when subscribed
     */
    <T> Mono<T> setObservation(Function<Payload, Mono<T>> input, Payload payload, FrameType frameType, ObservationDocumentation observation) {
        return Mono.deferContextual(contextView -> this.observe(input, payload, frameType, observation, contextView));
    }

    /**
     * Extracts the first route from the payload's routing metadata.
     *
     * @param payload the request payload
     * @return the first route, or {@code null} when the payload has no metadata or the route
     *     cannot be extracted
     */
    private String route(Payload payload) {
        if (payload.hasMetadata()) {
            try {
                ByteBuf extracted = CompositeMetadataUtils.extract(payload.sliceMetadata(), WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString());
                RoutingMetadata routingMetadata = new RoutingMetadata(extracted);
                Iterator<String> iterator = routingMetadata.iterator();
                return iterator.next();
            }
            catch (Exception ignored) {
                // Best effort: the route is only used for naming the observation, so any failure
                // to extract it (no routing entry, malformed metadata, empty route list) simply
                // yields "no route" instead of failing the request.
            }
        }
        return null;
    }

    /**
     * Starts an observation for a single-result interaction and invokes the delegate.
     */
    private <T> Mono<T> observe(Function<Payload, Mono<T>> input, Payload payload, FrameType frameType, ObservationDocumentation obs, ContextView contextView) {
        String route = this.route(payload);
        RSocketContext rSocketContext = this.newRequesterContext(payload, frameType, route);
        Observation observation = this.startObservation(rSocketContext, frameType, route, obs, contextView);
        return input.apply(ObservationRequesterRSocketProxy.effectivePayload(rSocketContext, payload))
            .doOnError(observation::error)
            .doFinally(signalType -> observation.stop())
            .contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, observation));
    }

    /** Opens a request-stream through the delegate inside an observation. */
    @Override
    public Flux<Payload> requestStream(Payload payload) {
        return this.observationFlux(super::requestStream, payload, FrameType.REQUEST_STREAM, RSocketObservationDocumentation.RSOCKET_REQUESTER_REQUEST_STREAM);
    }

    /**
     * Opens a request-channel through the delegate inside an observation.
     *
     * <p>The observation is built from the first inbound payload. When the first signal carries
     * no payload (the inbound publisher is empty or fails immediately) the inbound flux is
     * returned as-is and no observation is created.
     */
    @Override
    public Flux<Payload> requestChannel(Publisher<Payload> inbound) {
        return Flux.from(inbound).switchOnFirst((firstSignal, flux) -> {
            Payload firstPayload = firstSignal.get();
            if (firstPayload != null) {
                // Replace the first element with the payload chosen by the observation path
                // (possibly a modified one) and forward the remainder of the stream unchanged.
                return this.observationFlux(p -> super.requestChannel(flux.skip(1L).startWith(p)), firstPayload, FrameType.REQUEST_CHANNEL, RSocketObservationDocumentation.RSOCKET_REQUESTER_REQUEST_CHANNEL);
            }
            return flux;
        });
    }

    /**
     * Starts an observation for a streaming interaction and invokes the delegate. Everything is
     * deferred until subscription so the subscriber's Reactor context can be consulted.
     */
    private Flux<Payload> observationFlux(Function<Payload, Flux<Payload>> input, Payload payload, FrameType frameType, ObservationDocumentation obs) {
        return Flux.deferContextual(contextView -> {
            String route = this.route(payload);
            RSocketContext rSocketContext = this.newRequesterContext(payload, frameType, route);
            Observation newObservation = this.startObservation(rSocketContext, frameType, route, obs, contextView);
            // Fall back to the original payload when no modified payload was produced, exactly
            // as observe(...) does; passing a null payload to the delegate would fail the request.
            return input.apply(ObservationRequesterRSocketProxy.effectivePayload(rSocketContext, payload))
                .doOnError(newObservation::error)
                .doFinally(signalType -> newObservation.stop())
                .contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, newObservation));
        });
    }

    /** Builds the requester-side observation context for the given request. */
    private RSocketContext newRequesterContext(Payload payload, FrameType frameType, String route) {
        return new RSocketContext(payload, payload.sliceMetadata(), frameType, route, RSocketContext.Side.REQUESTER);
    }

    /**
     * Creates the observation (parented to the one found in the Reactor context, if any), assigns
     * its contextual name and starts it.
     */
    private Observation startObservation(RSocketContext rSocketContext, FrameType frameType, String route, ObservationDocumentation obs, ContextView contextView) {
        Observation parentObservation = contextView.getOrDefault(MICROMETER_OBSERVATION_KEY, null);
        Observation observation = obs.observation(this.observationConvention, new DefaultRSocketRequesterObservationConvention(rSocketContext), () -> rSocketContext, this.observationRegistry).parentObservation(parentObservation);
        this.setContextualName(frameType, route, observation);
        observation.start();
        return observation;
    }

    /**
     * Returns the payload that should actually be sent: the context's modified payload when one
     * is set, otherwise the original payload. Must be called after the observation has started.
     * NOTE(review): the modified payload is presumably set by an observation handler during
     * start (e.g. to propagate tracing metadata) - confirm against RSocketContext's writers.
     */
    private static Payload effectivePayload(RSocketContext rSocketContext, Payload original) {
        Payload modified = rSocketContext.modifiedPayload;
        return modified != null ? modified : original;
    }

    /**
     * Names the observation after the frame type, followed by the route when one is present.
     */
    private void setContextualName(FrameType frameType, String route, Observation newObservation) {
        if (StringUtils.isNotBlank(route)) {
            newObservation.contextualName(frameType.name() + " " + route);
        } else {
            newObservation.contextualName(frameType.name());
        }
    }
}

