/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.cloud.stream.binder.rocketmq.integration.inbound;

import com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.RocketMQConsumerFactory;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQMessageConverterSupport;
import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils;
import java.util.List;
import java.util.function.Supplier;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.context.Lifecycle;
import org.springframework.integration.context.OrderlyShutdownCapable;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

public class RocketMQInboundChannelAdapter
extends MessageProducerSupport
implements OrderlyShutdownCapable {
    private static final Logger log = LoggerFactory.getLogger(RocketMQInboundChannelAdapter.class);
    private RetryTemplate retryTemplate;
    private RecoveryCallback<Object> recoveryCallback;
    private DefaultMQPushConsumer pushConsumer;
    private final String topic;
    private final ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties;

    public RocketMQInboundChannelAdapter(String topic, ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties) {
        this.topic = topic;
        this.extendedConsumerProperties = extendedConsumerProperties;
    }

    protected void onInit() {
        if (this.extendedConsumerProperties.getExtension() == null || !((RocketMQConsumerProperties)this.extendedConsumerProperties.getExtension()).getEnabled()) {
            return;
        }
        try {
            super.onInit();
            if (this.retryTemplate != null) {
                Assert.state((this.getErrorChannel() == null ? 1 : 0) != 0, (String)"Cannot have an 'errorChannel' property when a 'RetryTemplate' is provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to send an error message when retries are exhausted");
                this.retryTemplate.registerListener(new RetryListener(){

                    public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
                        return true;
                    }

                    public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
                    }

                    public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
                    }
                });
            }
            this.pushConsumer = RocketMQConsumerFactory.initPushConsumer(this.extendedConsumerProperties);
            if (((RocketMQConsumerProperties)this.extendedConsumerProperties.getExtension()).getPush().getOrderly()) {
                this.pushConsumer.registerMessageListener((msgs, context) -> this.consumeMessage(msgs, () -> {
                    context.setSuspendCurrentQueueTimeMillis((long)((RocketMQConsumerProperties)this.extendedConsumerProperties.getExtension()).getPush().getSuspendCurrentQueueTimeMillis());
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }, () -> ConsumeOrderlyStatus.SUCCESS));
            } else {
                this.pushConsumer.registerMessageListener((msgs, context) -> this.consumeMessage(msgs, () -> {
                    context.setDelayLevelWhenNextConsume(((RocketMQConsumerProperties)this.extendedConsumerProperties.getExtension()).getPush().getDelayLevelWhenNextConsume());
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }, () -> ConsumeConcurrentlyStatus.CONSUME_SUCCESS));
            }
        }
        catch (Exception e) {
            log.error("DefaultMQPushConsumer init failed, Caused by " + e.getMessage());
            throw new MessagingException(MessageBuilder.withPayload((Object)("DefaultMQPushConsumer init failed, Caused by " + e.getMessage())).build(), (Throwable)e);
        }
    }

    private <R> R consumeMessage(List<MessageExt> messageExtList, Supplier<R> failSupplier, Supplier<R> sucSupplier) {
        if (CollectionUtils.isEmpty(messageExtList)) {
            throw new MessagingException("DefaultMQPushConsumer consuming failed, Caused by messageExtList is empty");
        }
        for (MessageExt messageExt : messageExtList) {
            try {
                Message message = RocketMQMessageConverterSupport.convertMessage2Spring(messageExt);
                if (this.retryTemplate != null) {
                    this.retryTemplate.execute(context -> {
                        this.sendMessage(message);
                        return message;
                    }, this.recoveryCallback);
                    continue;
                }
                this.sendMessage(message);
            }
            catch (Exception e) {
                log.warn("consume message failed. messageExt:{}", (Object)messageExt, (Object)e);
                return failSupplier.get();
            }
        }
        return sucSupplier.get();
    }

    protected void doStart() {
        if (this.extendedConsumerProperties.getExtension() == null || !((RocketMQConsumerProperties)this.extendedConsumerProperties.getExtension()).getEnabled()) {
            return;
        }
        Instrumentation instrumentation = new Instrumentation(this.topic, (Lifecycle)this);
        try {
            this.pushConsumer.subscribe(this.topic, RocketMQUtils.getMessageSelector(((RocketMQConsumerProperties)this.extendedConsumerProperties.getExtension()).getSubscription()));
            this.pushConsumer.start();
            instrumentation.markStartedSuccessfully();
        }
        catch (Exception e) {
            instrumentation.markStartFailed(e);
            log.error("DefaultMQPushConsumer init failed, Caused by " + e.getMessage());
            throw new MessagingException(MessageBuilder.withPayload((Object)("DefaultMQPushConsumer init failed, Caused by " + e.getMessage())).build(), (Throwable)e);
        }
        finally {
            InstrumentationManager.addHealthInstrumentation(instrumentation);
        }
    }

    protected void doStop() {
        if (this.pushConsumer != null) {
            this.pushConsumer.shutdown();
        }
    }

    public void setRetryTemplate(RetryTemplate retryTemplate) {
        this.retryTemplate = retryTemplate;
    }

    public void setRecoveryCallback(RecoveryCallback<Object> recoveryCallback) {
        this.recoveryCallback = recoveryCallback;
    }

    public int beforeShutdown() {
        this.stop();
        return 0;
    }

    public int afterShutdown() {
        return 0;
    }
}

