/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.function;

import java.lang.reflect.Type;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.cloud.function.core.FunctionInvocationHelper;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binding.BindingService;
import org.springframework.cloud.stream.binding.DefaultPartitioningInterceptor;
import org.springframework.cloud.stream.binding.NewDestinationBindingCallback;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper;
import org.springframework.cloud.stream.function.StreamOperations;
import org.springframework.cloud.stream.messaging.DirectWithAttributesChannel;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.ResolvableType;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.config.GlobalChannelInterceptorProcessor;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

public final class StreamBridge
implements StreamOperations,
SmartInitializingSingleton {
    private static final String STREAM_BRIDGE_FUNC_NAME = "streamBridge";
    private final Log logger = LogFactory.getLog(this.getClass());
    private final Map<String, MessageChannel> channelCache;
    private final FunctionCatalog functionCatalog;
    private final NewDestinationBindingCallback destinationBindingCallback;
    private final BindingServiceProperties bindingServiceProperties;
    private final ConfigurableApplicationContext applicationContext;
    private boolean initialized;
    private final BindingService bindingService;
    private final Map<String, SimpleFunctionRegistry.FunctionInvocationWrapper> streamBridgeFunctionCache;
    private final FunctionInvocationHelper<?> functionInvocationHelper;

    StreamBridge(FunctionCatalog functionCatalog, final BindingServiceProperties bindingServiceProperties, ConfigurableApplicationContext applicationContext, @Nullable NewDestinationBindingCallback destinationBindingCallback) {
        Assert.notNull((Object)functionCatalog, (String)"'functionCatalog' must not be null");
        Assert.notNull((Object)applicationContext, (String)"'applicationContext' must not be null");
        Assert.notNull((Object)bindingServiceProperties, (String)"'bindingServiceProperties' must not be null");
        this.bindingService = (BindingService)applicationContext.getBean(BindingService.class);
        this.functionCatalog = functionCatalog;
        this.applicationContext = applicationContext;
        this.bindingServiceProperties = bindingServiceProperties;
        this.destinationBindingCallback = destinationBindingCallback;
        this.channelCache = new LinkedHashMap<String, MessageChannel>(){

            @Override
            protected boolean removeEldestEntry(Map.Entry<String, MessageChannel> eldest) {
                boolean remove;
                boolean bl = remove = this.size() > bindingServiceProperties.getDynamicDestinationCacheSize();
                if (remove) {
                    if (StreamBridge.this.logger.isDebugEnabled()) {
                        StreamBridge.this.logger.debug((Object)("Removing message channel from cache " + eldest.getKey()));
                    }
                    StreamBridge.this.bindingService.unbindProducers(eldest.getKey());
                }
                return remove;
            }
        };
        this.functionInvocationHelper = (FunctionInvocationHelper)applicationContext.getBean(FunctionInvocationHelper.class);
        this.streamBridgeFunctionCache = new HashMap<String, SimpleFunctionRegistry.FunctionInvocationWrapper>();
    }

    @Override
    public boolean send(String bindingName, Object data) {
        BindingProperties bindingProperties = this.bindingServiceProperties.getBindingProperties(bindingName);
        MimeType contentType = StringUtils.hasText((String)bindingProperties.getContentType()) ? MimeType.valueOf((String)bindingProperties.getContentType()) : MimeTypeUtils.APPLICATION_JSON;
        return this.send(bindingName, data, contentType);
    }

    @Override
    public boolean send(String bindingName, Object data, MimeType outputContentType) {
        return this.send(bindingName, null, data, outputContentType);
    }

    @Override
    public boolean send(String bindingName, @Nullable String binderName, Object data) {
        return this.send(bindingName, binderName, data, MimeTypeUtils.APPLICATION_JSON);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean send(String bindingName, @Nullable String binderName, Object data, MimeType outputContentType) {
        Message resultMessage;
        GenericMessage genericMessage;
        if (!this.initialized) {
            this.afterSingletonsInstantiated();
        }
        ProducerProperties producerProperties = this.bindingServiceProperties.getProducerProperties(bindingName);
        MessageChannel messageChannel = this.resolveDestination(bindingName, producerProperties, binderName);
        Object functionToInvoke = this.getStreamBridgeFunction(outputContentType.toString(), producerProperties);
        if (producerProperties != null && producerProperties.isPartitioned()) {
            functionToInvoke = new PartitionAwareFunctionWrapper((Function<?, ?>)functionToInvoke, this.applicationContext, producerProperties);
        }
        String targetType = this.resolveBinderTargetType(bindingName, binderName, MessageChannel.class, (BinderFactory)this.applicationContext.getBean(BinderFactory.class));
        if (data instanceof Message) {
            Message messageData = (Message)data;
            genericMessage = MessageBuilder.fromMessage((Message)messageData).setHeaderIfAbsent(MessageUtils.TARGET_PROTOCOL, (Object)targetType).build();
        } else {
            genericMessage = new GenericMessage(data, Collections.singletonMap(MessageUtils.TARGET_PROTOCOL, targetType));
        }
        GenericMessage messageToSend = genericMessage;
        StreamBridge streamBridge = this;
        synchronized (streamBridge) {
            resultMessage = (Message)functionToInvoke.apply(messageToSend);
        }
        if (resultMessage == null && messageToSend.getPayload().getClass().getName().equals("org.springframework.kafka.support.KafkaNull")) {
            resultMessage = messageToSend;
        }
        resultMessage = (Message)this.functionInvocationHelper.postProcessResult((Object)resultMessage, null);
        return messageChannel.send(resultMessage);
    }

    private synchronized SimpleFunctionRegistry.FunctionInvocationWrapper getStreamBridgeFunction(String outputContentType, ProducerProperties producerProperties) {
        if (StringUtils.hasText((String)outputContentType) && this.streamBridgeFunctionCache.containsKey(outputContentType)) {
            return this.streamBridgeFunctionCache.get(outputContentType);
        }
        SimpleFunctionRegistry.FunctionInvocationWrapper functionToInvoke = (SimpleFunctionRegistry.FunctionInvocationWrapper)this.functionCatalog.lookup(STREAM_BRIDGE_FUNC_NAME, new String[]{outputContentType.toString()});
        this.streamBridgeFunctionCache.put(outputContentType, functionToInvoke);
        functionToInvoke.setSkipOutputConversion(producerProperties.isUseNativeEncoding());
        return functionToInvoke;
    }

    public void afterSingletonsInstantiated() {
        if (this.initialized) {
            return;
        }
        FunctionRegistration fr = new FunctionRegistration((Object)new SimpleFunctionRegistry.PassThruFunction(), new String[]{STREAM_BRIDGE_FUNC_NAME});
        fr.getProperties().put("singleton", "false");
        Type functionType = ResolvableType.forClassWithGenerics(Function.class, (Class[])new Class[]{Object.class, Object.class}).getType();
        ((FunctionRegistry)this.functionCatalog).register(fr.type(functionType));
        this.initialized = true;
    }

    synchronized MessageChannel resolveDestination(String destinationName, ProducerProperties producerProperties, String binderName) {
        Object messageChannel = null;
        messageChannel = StringUtils.hasText((String)binderName) ? this.channelCache.get(binderName + ":" + destinationName) : this.channelCache.get(destinationName);
        if (messageChannel == null) {
            if (this.applicationContext.containsBean(destinationName)) {
                messageChannel = (MessageChannel)this.applicationContext.getBean(destinationName, MessageChannel.class);
                Object[] consumerBindingNames = this.bindingService.getConsumerBindingNames();
                if (ObjectUtils.containsElement((Object[])consumerBindingNames, (Object)destinationName)) {
                    this.logger.warn((Object)"You seem to be sending data to the input binding.  It is not recommended, since you are bypassing the binder and this the messaging system exposed by the binder.");
                }
            } else {
                messageChannel = new DirectWithAttributesChannel();
                if (this.destinationBindingCallback != null) {
                    Object extendedProducerProperties = this.bindingService.getExtendedProducerProperties(messageChannel, destinationName);
                    this.destinationBindingCallback.configure(destinationName, (MessageChannel)messageChannel, producerProperties, extendedProducerProperties);
                }
                Binder<?, ConsumerProperties, ProducerProperties> binder = null;
                if (StringUtils.hasText((String)binderName)) {
                    BinderFactory binderFactory = (BinderFactory)this.applicationContext.getBean(BinderFactory.class);
                    binder = binderFactory.getBinder(binderName, messageChannel.getClass());
                }
                if (producerProperties != null && producerProperties.isPartitioned()) {
                    BindingProperties bindingProperties = this.bindingServiceProperties.getBindingProperties(destinationName);
                    ((AbstractMessageChannel)messageChannel).addInterceptor((ChannelInterceptor)new DefaultPartitioningInterceptor(bindingProperties, this.applicationContext.getBeanFactory()));
                }
                this.addInterceptors((AbstractMessageChannel)messageChannel, destinationName);
                this.bindingService.bindProducer(messageChannel, destinationName, true, binder);
                if (StringUtils.hasText((String)binderName)) {
                    this.channelCache.put(binderName + ":" + destinationName, (MessageChannel)messageChannel);
                } else {
                    this.channelCache.put(destinationName, (MessageChannel)messageChannel);
                }
            }
        }
        return messageChannel;
    }

    private String resolveBinderTargetType(String channelName, String binderName, Class<?> bindableType, BinderFactory binderFactory) {
        String binderConfigurationName = binderName != null ? binderName : this.bindingServiceProperties.getBinder(channelName);
        Binder<?, ConsumerProperties, ProducerProperties> binder = binderFactory.getBinder(binderConfigurationName, bindableType);
        String targetProtocol = binder.getClass().getSimpleName().startsWith("Rabbit") ? "amqp" : "kafka";
        return targetProtocol;
    }

    private void addInterceptors(AbstractMessageChannel messageChannel, String destinationName) {
        GlobalChannelInterceptorProcessor globalChannelInterceptorProcessor = (GlobalChannelInterceptorProcessor)this.applicationContext.getBean(GlobalChannelInterceptorProcessor.class);
        globalChannelInterceptorProcessor.postProcessAfterInitialization((Object)messageChannel, destinationName);
    }
}

