/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;

import com.google.auto.service.AutoService;
import java.util.Arrays;
import java.util.HashMap;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.SchemaFormat;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.StartMode;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.config.ConsumerConfig;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.source.ConsumerMetadata;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.source.RocketMqSourceReader;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.source.RocketMqSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.source.RocketMqSourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.source.RocketMqSourceState;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import org.apache.seatunnel.format.text.TextDeserializationSchema;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;

@AutoService(value={SeaTunnelSource.class})
public class RocketMqSource
implements SeaTunnelSource<SeaTunnelRow, RocketMqSourceSplit, RocketMqSourceState>,
SupportParallelism {
    private static final String DEFAULT_CONSUMER_GROUP = "SeaTunnel-Consumer-Group";
    private final ConsumerMetadata metadata = new ConsumerMetadata();
    private JobContext jobContext;
    private SeaTunnelRowType typeInfo;
    private CatalogTable catalogTable;
    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
    private long discoveryIntervalMillis = (Long)ConsumerConfig.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.defaultValue();

    public String getPluginName() {
        return "Rocketmq";
    }

    public Boundedness getBoundedness() {
        return JobMode.BATCH.equals((Object)this.jobContext.getJobMode()) ? Boundedness.BOUNDED : Boundedness.UNBOUNDED;
    }

    public void prepare(Config config) throws PrepareFailException {
        CheckResult result = CheckConfigUtil.checkAllExists((Config)config, (String[])new String[]{ConsumerConfig.TOPICS.key(), org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.NAME_SRV_ADDR.key()});
        if (!result.isSuccess()) {
            throw new RocketMqConnectorException((SeaTunnelErrorCode)SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, String.format("PluginName: %s, PluginType: %s, Message: %s", this.getPluginName(), PluginType.SOURCE, result.getMsg()));
        }
        this.metadata.setTopics(Arrays.asList(config.getString(ConsumerConfig.TOPICS.key()).split(",")));
        RocketMqBaseConfiguration.Builder baseConfigBuilder = RocketMqBaseConfiguration.newBuilder().consumer().namesrvAddr(config.getString(org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.NAME_SRV_ADDR.key()));
        boolean aclEnabled = (Boolean)org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.ACL_ENABLED.defaultValue();
        if (config.hasPath(org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.ACL_ENABLED.key())) {
            aclEnabled = config.getBoolean(org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.ACL_ENABLED.key());
            if (!(!aclEnabled || config.hasPath(org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.ACCESS_KEY.key()) && config.hasPath(org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.SECRET_KEY.key()))) {
                throw new RocketMqConnectorException((SeaTunnelErrorCode)SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, "When ACL_ENABLED true , ACCESS_KEY and SECRET_KEY must be configured");
            }
            if (config.hasPath(org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.ACCESS_KEY.key())) {
                baseConfigBuilder.accessKey(config.getString(org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.ACCESS_KEY.key()));
            }
            if (config.hasPath(org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.SECRET_KEY.key())) {
                baseConfigBuilder.secretKey(config.getString(org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.SECRET_KEY.key()));
            }
        }
        baseConfigBuilder.aclEnable(aclEnabled);
        if (config.hasPath(ConsumerConfig.CONSUMER_GROUP.key())) {
            baseConfigBuilder.groupId(config.getString(ConsumerConfig.CONSUMER_GROUP.key()));
        } else {
            baseConfigBuilder.groupId(DEFAULT_CONSUMER_GROUP);
        }
        if (config.hasPath(ConsumerConfig.BATCH_SIZE.key())) {
            baseConfigBuilder.batchSize(config.getInt(ConsumerConfig.BATCH_SIZE.key()));
        } else {
            baseConfigBuilder.batchSize((Integer)ConsumerConfig.BATCH_SIZE.defaultValue());
        }
        if (config.hasPath(ConsumerConfig.POLL_TIMEOUT_MILLIS.key())) {
            baseConfigBuilder.pollTimeoutMillis(config.getInt(ConsumerConfig.POLL_TIMEOUT_MILLIS.key()));
        } else {
            baseConfigBuilder.pollTimeoutMillis((Long)ConsumerConfig.POLL_TIMEOUT_MILLIS.defaultValue());
        }
        this.metadata.setBaseConfig(baseConfigBuilder.build());
        if (config.hasPath(ConsumerConfig.COMMIT_ON_CHECKPOINT.key())) {
            this.metadata.setEnabledCommitCheckpoint(config.getBoolean(ConsumerConfig.COMMIT_ON_CHECKPOINT.key()));
        } else {
            this.metadata.setEnabledCommitCheckpoint((Boolean)ConsumerConfig.COMMIT_ON_CHECKPOINT.defaultValue());
        }
        StartMode startMode = (StartMode)((Object)ConsumerConfig.START_MODE.defaultValue());
        if (config.hasPath(ConsumerConfig.START_MODE.key())) {
            startMode = StartMode.valueOf(config.getString(ConsumerConfig.START_MODE.key()).toUpperCase());
            switch (startMode) {
                case CONSUME_FROM_TIMESTAMP: {
                    long startOffsetsTimestamp = config.getLong(ConsumerConfig.START_MODE_TIMESTAMP.key());
                    long currentTimestamp = System.currentTimeMillis();
                    if (startOffsetsTimestamp < 0L || startOffsetsTimestamp > currentTimestamp) {
                        throw new IllegalArgumentException("The offsets timestamp value is smaller than 0 or smaller than the current time");
                    }
                    this.metadata.setStartOffsetsTimestamp(startOffsetsTimestamp);
                    break;
                }
                case CONSUME_FROM_SPECIFIC_OFFSETS: {
                    Config offsets = config.getConfig(ConsumerConfig.START_MODE_OFFSETS.key());
                    ConfigRenderOptions options = ConfigRenderOptions.concise();
                    String offsetsJson = offsets.root().render(options);
                    if (offsetsJson == null) {
                        throw new IllegalArgumentException("start mode is " + (Object)((Object)StartMode.CONSUME_FROM_SPECIFIC_OFFSETS) + "but no specific offsets were specified.");
                    }
                    HashMap<MessageQueue, Long> specificStartOffsets = new HashMap<MessageQueue, Long>();
                    ObjectNode jsonNodes = JsonUtils.parseObject((String)offsetsJson);
                    jsonNodes.fieldNames().forEachRemaining(key -> {
                        int splitIndex = key.lastIndexOf("-");
                        String topic = key.substring(0, splitIndex);
                        String partition = key.substring(splitIndex + 1);
                        long offset = jsonNodes.get(key).asLong();
                        MessageQueue messageQueue = new MessageQueue(topic, null, Integer.valueOf(partition));
                        specificStartOffsets.put(messageQueue, offset);
                    });
                    this.metadata.setSpecificStartOffsets(specificStartOffsets);
                    break;
                }
            }
        }
        this.metadata.setStartMode(startMode);
        if (config.hasPath(ConsumerConfig.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key())) {
            this.discoveryIntervalMillis = config.getLong(ConsumerConfig.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key());
        }
        this.catalogTable = CatalogTableUtil.buildWithConfig((Config)config);
        this.typeInfo = this.catalogTable.getSeaTunnelRowType();
        this.setDeserialization(config);
    }

    public void setJobContext(JobContext jobContext) {
        this.jobContext = jobContext;
    }

    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
        return this.typeInfo;
    }

    public SourceReader<SeaTunnelRow, RocketMqSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
        return new RocketMqSourceReader(this.metadata, this.deserializationSchema, readerContext);
    }

    public SourceSplitEnumerator<RocketMqSourceSplit, RocketMqSourceState> createEnumerator(SourceSplitEnumerator.Context<RocketMqSourceSplit> context) throws Exception {
        return new RocketMqSourceSplitEnumerator(this.metadata, context, this.discoveryIntervalMillis);
    }

    public SourceSplitEnumerator<RocketMqSourceSplit, RocketMqSourceState> restoreEnumerator(SourceSplitEnumerator.Context<RocketMqSourceSplit> context, RocketMqSourceState sourceState) throws Exception {
        return new RocketMqSourceSplitEnumerator(this.metadata, context, this.discoveryIntervalMillis);
    }

    private void setDeserialization(Config config) {
        if (config.hasPath(ConsumerConfig.SCHEMA.key())) {
            SchemaFormat format = SchemaFormat.JSON;
            if (config.hasPath(org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.FORMAT.key())) {
                format = SchemaFormat.find(config.getString(org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.FORMAT.key()));
            }
            switch (format) {
                case JSON: {
                    this.deserializationSchema = new JsonDeserializationSchema(this.catalogTable, false, false);
                    break;
                }
                case TEXT: {
                    String delimiter = ",";
                    if (config.hasPath(org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.FIELD_DELIMITER.key())) {
                        delimiter = config.getString(org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.FIELD_DELIMITER.key());
                    }
                    this.deserializationSchema = TextDeserializationSchema.builder().seaTunnelRowType(this.typeInfo).delimiter(delimiter).build();
                    break;
                }
                default: {
                    throw new SeaTunnelJsonFormatException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + (Object)((Object)format));
                }
            }
        } else {
            this.typeInfo = CatalogTableUtil.buildSimpleTextSchema();
            this.deserializationSchema = TextDeserializationSchema.builder().seaTunnelRowType(this.typeInfo).delimiter(String.valueOf('\u0002')).build();
        }
    }
}

