/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.kafka.source;

import com.google.auto.service.AutoService;
import java.util.HashMap;
import java.util.Properties;
import org.apache.kafka.common.TopicPartition;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.ConsumerMetadata;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceReader;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
import org.apache.seatunnel.format.compatible.kafka.connect.json.CompatibleKafkaConnectDeserializationSchema;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
import org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import org.apache.seatunnel.format.text.TextDeserializationSchema;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;

@AutoService(value={SeaTunnelSource.class})
public class KafkaSource
implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSplit, KafkaSourceState>,
SupportParallelism {
    private final ConsumerMetadata metadata = new ConsumerMetadata();
    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
    private SeaTunnelRowType typeInfo;
    private JobContext jobContext;
    private long discoveryIntervalMillis = (Long)org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.defaultValue();
    private MessageFormatErrorHandleWay messageFormatErrorHandleWay = MessageFormatErrorHandleWay.FAIL;

    public Boundedness getBoundedness() {
        return JobMode.BATCH.equals((Object)this.jobContext.getJobMode()) ? Boundedness.BOUNDED : Boundedness.UNBOUNDED;
    }

    public String getPluginName() {
        return "Kafka";
    }

    public void prepare(Config config) throws PrepareFailException {
        CheckResult result = CheckConfigUtil.checkAllExists((Config)config, (String[])new String[]{org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC.key(), org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS.key()});
        if (!result.isSuccess()) {
            throw new KafkaConnectorException((SeaTunnelErrorCode)SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, String.format("PluginName: %s, PluginType: %s, Message: %s", this.getPluginName(), PluginType.SOURCE, result.getMsg()));
        }
        this.metadata.setTopic(config.getString(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC.key()));
        if (config.hasPath(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN.key())) {
            this.metadata.setPattern(config.getBoolean(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN.key()));
        } else {
            this.metadata.setPattern((Boolean)org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN.defaultValue());
        }
        this.metadata.setBootstrapServers(config.getString(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS.key()));
        this.metadata.setProperties(new Properties());
        if (config.hasPath(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP.key())) {
            this.metadata.setConsumerGroup(config.getString(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP.key()));
        } else {
            this.metadata.setConsumerGroup((String)org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP.defaultValue());
        }
        if (config.hasPath(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMMIT_ON_CHECKPOINT.key())) {
            this.metadata.setCommitOnCheckpoint(config.getBoolean(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMMIT_ON_CHECKPOINT.key()));
        } else {
            this.metadata.setCommitOnCheckpoint((Boolean)org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMMIT_ON_CHECKPOINT.defaultValue());
        }
        if (config.hasPath(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE.key())) {
            StartMode startMode = StartMode.valueOf(config.getString(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE.key()).toUpperCase());
            this.metadata.setStartMode(startMode);
            switch (startMode) {
                case TIMESTAMP: {
                    long startOffsetsTimestamp = config.getLong(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE_TIMESTAMP.key());
                    long currentTimestamp = System.currentTimeMillis();
                    if (startOffsetsTimestamp < 0L || startOffsetsTimestamp > currentTimestamp) {
                        throw new IllegalArgumentException("start_mode.timestamp The value is smaller than 0 or smaller than the current time");
                    }
                    this.metadata.setStartOffsetsTimestamp(startOffsetsTimestamp);
                    break;
                }
                case SPECIFIC_OFFSETS: {
                    Config offsets = config.getConfig(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE_OFFSETS.key());
                    ConfigRenderOptions options = ConfigRenderOptions.concise();
                    String offsetsJson = offsets.root().render(options);
                    if (offsetsJson == null) {
                        throw new IllegalArgumentException("start mode is " + (Object)((Object)StartMode.SPECIFIC_OFFSETS) + "but no specific offsets were specified.");
                    }
                    HashMap<TopicPartition, Long> specificStartOffsets = new HashMap<TopicPartition, Long>();
                    ObjectNode jsonNodes = JsonUtils.parseObject((String)offsetsJson);
                    jsonNodes.fieldNames().forEachRemaining(key -> {
                        int splitIndex = key.lastIndexOf("-");
                        String topic = key.substring(0, splitIndex);
                        String partition = key.substring(splitIndex + 1);
                        long offset = jsonNodes.get(key).asLong();
                        TopicPartition topicPartition = new TopicPartition(topic, Integer.valueOf(partition));
                        specificStartOffsets.put(topicPartition, offset);
                    });
                    this.metadata.setSpecificStartOffsets(specificStartOffsets);
                    break;
                }
            }
        }
        if (config.hasPath(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key())) {
            this.discoveryIntervalMillis = config.getLong(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key());
        }
        if (CheckConfigUtil.isValidParam((Config)config, (String)org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG.key())) {
            config.getObject(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG.key()).forEach((key, value) -> this.metadata.getProperties().put(key, value.unwrapped()));
        }
        if (config.hasPath(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION.key())) {
            MessageFormatErrorHandleWay formatErrorWayOption = (MessageFormatErrorHandleWay)((Object)ReadonlyConfig.fromConfig((Config)config).get(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION));
            switch (formatErrorWayOption) {
                case FAIL: 
                case SKIP: {
                    this.messageFormatErrorHandleWay = formatErrorWayOption;
                    break;
                }
            }
        }
        this.setDeserialization(config);
    }

    public SeaTunnelRowType getProducedType() {
        return this.typeInfo;
    }

    public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
        return new KafkaSourceReader(this.metadata, this.deserializationSchema, readerContext, this.messageFormatErrorHandleWay);
    }

    public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> createEnumerator(SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext) throws Exception {
        return new KafkaSourceSplitEnumerator(this.metadata, enumeratorContext, this.discoveryIntervalMillis);
    }

    public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> restoreEnumerator(SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext, KafkaSourceState checkpointState) throws Exception {
        return new KafkaSourceSplitEnumerator(this.metadata, enumeratorContext, checkpointState, this.discoveryIntervalMillis);
    }

    public void setJobContext(JobContext jobContext) {
        this.jobContext = jobContext;
    }

    private void setDeserialization(Config config) {
        if (config.hasPath(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.SCHEMA.key())) {
            Config schema = config.getConfig(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.SCHEMA.key());
            this.typeInfo = CatalogTableUtil.buildWithConfig((Config)config).getSeaTunnelRowType();
            MessageFormat format = (MessageFormat)((Object)ReadonlyConfig.fromConfig((Config)config).get(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT));
            switch (format) {
                case JSON: {
                    this.deserializationSchema = new JsonDeserializationSchema(false, false, this.typeInfo);
                    break;
                }
                case TEXT: {
                    String delimiter = ",";
                    if (config.hasPath(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER.key())) {
                        delimiter = config.getString(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER.key());
                    }
                    this.deserializationSchema = TextDeserializationSchema.builder().seaTunnelRowType(this.typeInfo).delimiter(delimiter).build();
                    break;
                }
                case CANAL_JSON: {
                    this.deserializationSchema = CanalJsonDeserializationSchema.builder(this.typeInfo).setIgnoreParseErrors(true).build();
                    break;
                }
                case COMPATIBLE_KAFKA_CONNECT_JSON: {
                    this.deserializationSchema = new CompatibleKafkaConnectDeserializationSchema(this.typeInfo, config, false, false);
                    break;
                }
                case DEBEZIUM_JSON: {
                    boolean includeSchema = (Boolean)org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEBEZIUM_RECORD_INCLUDE_SCHEMA.defaultValue();
                    if (config.hasPath(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEBEZIUM_RECORD_INCLUDE_SCHEMA.key())) {
                        includeSchema = config.getBoolean(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEBEZIUM_RECORD_INCLUDE_SCHEMA.key());
                    }
                    this.deserializationSchema = new DebeziumJsonDeserializationSchema(this.typeInfo, true, includeSchema);
                    break;
                }
                default: {
                    throw new SeaTunnelJsonFormatException((SeaTunnelErrorCode)CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + (Object)((Object)format));
                }
            }
        } else {
            this.typeInfo = CatalogTableUtil.buildSimpleTextSchema();
            this.deserializationSchema = TextDeserializationSchema.builder().seaTunnelRowType(this.typeInfo).delimiter("\b").build();
        }
    }
}

