/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.kafka.channel;

import java.util.ArrayDeque;
import java.util.List;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.ExecutorChannelInterceptorAware;
import org.springframework.integration.kafka.channel.AbstractKafkaChannel;
import org.springframework.integration.kafka.inbound.KafkaMessageSource;
import org.springframework.integration.support.management.metrics.CounterFacade;
import org.springframework.integration.support.management.metrics.MetricsCaptor;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.ExecutorChannelInterceptor;
import org.springframework.util.Assert;

public class PollableKafkaChannel
extends AbstractKafkaChannel
implements PollableChannel,
ExecutorChannelInterceptorAware {
    private final KafkaMessageSource<?, ?> source;
    private CounterFacade receiveCounter;
    private volatile int executorInterceptorsSize;

    public PollableKafkaChannel(KafkaOperations<?, ?> template, KafkaMessageSource<?, ?> source) {
        super(template, PollableKafkaChannel.topic(source));
        this.source = source;
        if (source.getConsumerProperties().getGroupId() == null) {
            String groupId = this.getGroupId();
            source.getConsumerProperties().setGroupId(groupId != null ? groupId : this.getBeanName());
        }
    }

    @Nullable
    public Message<?> receive() {
        return this.doReceive();
    }

    @Nullable
    public Message<?> receive(long timeout) {
        return this.doReceive();
    }

    @Nullable
    protected Message<?> doReceive() {
        AbstractMessageChannel.ChannelInterceptorList interceptorList = this.getIChannelInterceptorList();
        ArrayDeque interceptorStack = null;
        boolean counted = false;
        try {
            if (this.isLoggingEnabled()) {
                this.logger.trace(() -> "preReceive on channel '" + this + "'");
            }
            if (interceptorList.getInterceptors().size() > 0 && !interceptorList.preReceive((MessageChannel)this, interceptorStack = new ArrayDeque())) {
                return null;
            }
            Message message = this.source.receive();
            if (message != null) {
                this.incrementReceiveCounter();
                counted = true;
                message = interceptorList.postReceive(message, (MessageChannel)this);
            }
            interceptorList.afterReceiveCompletion(message, (MessageChannel)this, null, interceptorStack);
            return message;
        }
        catch (RuntimeException ex) {
            if (!counted) {
                this.incrementReceiveErrorCounter(ex);
            }
            interceptorList.afterReceiveCompletion(null, (MessageChannel)this, (Exception)ex, interceptorStack);
            throw ex;
        }
    }

    private void incrementReceiveCounter() {
        MetricsCaptor metricsCaptor = this.getMetricsCaptor();
        if (metricsCaptor != null) {
            if (this.receiveCounter == null) {
                this.receiveCounter = this.buildReceiveCounter(metricsCaptor, null);
            }
            this.receiveCounter.increment();
        }
    }

    private void incrementReceiveErrorCounter(Exception ex) {
        MetricsCaptor metricsCaptor = this.getMetricsCaptor();
        if (metricsCaptor != null) {
            this.buildReceiveCounter(metricsCaptor, ex).increment();
        }
    }

    private CounterFacade buildReceiveCounter(MetricsCaptor metricsCaptor, @Nullable Exception ex) {
        CounterFacade counterFacade = metricsCaptor.counterBuilder("spring.integration.receive").tag("name", this.getComponentName() == null ? "unknown" : this.getComponentName()).tag("type", "channel").tag("result", ex == null ? "success" : "failure").tag("exception", ex == null ? "none" : ex.getClass().getSimpleName()).description("Messages received").build();
        this.meters.add(counterFacade);
        return counterFacade;
    }

    public void setInterceptors(List<ChannelInterceptor> interceptors) {
        super.setInterceptors(interceptors);
        for (ChannelInterceptor interceptor : interceptors) {
            if (!(interceptor instanceof ExecutorChannelInterceptor)) continue;
            ++this.executorInterceptorsSize;
        }
    }

    public void addInterceptor(ChannelInterceptor interceptor) {
        super.addInterceptor(interceptor);
        if (interceptor instanceof ExecutorChannelInterceptor) {
            ++this.executorInterceptorsSize;
        }
    }

    public void addInterceptor(int index, ChannelInterceptor interceptor) {
        super.addInterceptor(index, interceptor);
        if (interceptor instanceof ExecutorChannelInterceptor) {
            ++this.executorInterceptorsSize;
        }
    }

    public boolean removeInterceptor(ChannelInterceptor interceptor) {
        boolean removed = super.removeInterceptor(interceptor);
        if (removed && interceptor instanceof ExecutorChannelInterceptor) {
            --this.executorInterceptorsSize;
        }
        return removed;
    }

    @Nullable
    public ChannelInterceptor removeInterceptor(int index) {
        ChannelInterceptor interceptor = super.removeInterceptor(index);
        if (interceptor instanceof ExecutorChannelInterceptor) {
            --this.executorInterceptorsSize;
        }
        return interceptor;
    }

    public boolean hasExecutorInterceptors() {
        return this.executorInterceptorsSize > 0;
    }

    private static String topic(KafkaMessageSource<?, ?> source) {
        Assert.notNull(source, (String)"'source' cannot be null");
        String[] topics = source.getConsumerProperties().getTopics();
        Assert.isTrue((topics != null && topics.length == 1 ? 1 : 0) != 0, (String)"Only one topic is allowed");
        return topics[0];
    }
}

