/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.rocketmq.sink;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqAdminUtil;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.sink.RocketMqProducerSender;

/**
 * {@link RocketMqProducerSender} implementation that publishes messages through a RocketMQ
 * {@link TransactionMQProducer}.
 *
 * <p>The producer is created and started in the constructor and released by {@link #close()}.
 * The {@link TransactionListener} registered here returns
 * {@link LocalTransactionState#COMMIT_MESSAGE} from both of its callbacks and ignores their
 * arguments.
 */
public class RocketMqTransactionSender
implements RocketMqProducerSender {
    /** Transaction argument used when the outgoing message carries no keys. */
    private static final String TXN_PARAM = "SeaTunnel-RocketMq";

    private final TransactionMQProducer transactionMQProducer;

    /**
     * Builds the transactional producer from the given configuration and starts it.
     *
     * @param configuration connector configuration handed to
     *     {@code RocketMqAdminUtil.initTransactionMqProducer}
     * @throws RocketMqConnectorException with {@code PRODUCER_START_ERROR} if the producer
     *     fails to start
     */
    public RocketMqTransactionSender(RocketMqBaseConfiguration configuration) {
        this.transactionMQProducer = RocketMqAdminUtil.initTransactionMqProducer(configuration, new TransactionListener(){

            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // There is no local work tied to the message, so it is always committed.
                return LocalTransactionState.COMMIT_MESSAGE;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        try {
            this.transactionMQProducer.start();
        }
        catch (MQClientException e) {
            throw new RocketMqConnectorException(RocketMqConnectorErrorCode.PRODUCER_START_ERROR, e);
        }
    }

    /**
     * Sends the message in a RocketMQ transaction.
     *
     * <p>The message keys are passed as the transaction argument; {@link #TXN_PARAM} is used
     * instead when the keys are empty.
     *
     * <p>The RocketMQ client can return normally even though the message was not committed
     * (it reports a non-commit local transaction state when the half message was not stored
     * successfully). That outcome is surfaced as an exception here so that the message is not
     * lost silently.
     *
     * @param message the message to publish
     * @throws RocketMqConnectorException with {@code PRODUCER_SEND_MESSAGE_ERROR} if the
     *     client fails to send, or if the transaction does not end in
     *     {@link LocalTransactionState#COMMIT_MESSAGE}
     */
    @Override
    public void send(Message message) {
        String keys = message.getKeys();
        String transactionArg = StringUtils.isEmpty(keys) ? TXN_PARAM : keys;

        LocalTransactionState state;
        try {
            state = this.transactionMQProducer
                    .sendMessageInTransaction(message, transactionArg)
                    .getLocalTransactionState();
        }
        catch (MQClientException e) {
            throw new RocketMqConnectorException(RocketMqConnectorErrorCode.PRODUCER_SEND_MESSAGE_ERROR, e);
        }

        // The listener above always commits, so any other state means the send itself did not
        // succeed and the message was not delivered.
        if (state != LocalTransactionState.COMMIT_MESSAGE) {
            throw new RocketMqConnectorException(
                    RocketMqConnectorErrorCode.PRODUCER_SEND_MESSAGE_ERROR,
                    new IllegalStateException(
                            "Transactional message for topic [" + message.getTopic()
                                    + "] was not committed, local transaction state: " + state));
        }
    }

    /**
     * Shuts down the underlying transactional producer.
     *
     * @throws Exception declared by the overridden method; nothing in this method throws a
     *     checked exception
     */
    @Override
    public void close() throws Exception {
        if (this.transactionMQProducer != null) {
            this.transactionMQProducer.shutdown();
        }
    }
}

