/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.kafka.source;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.ConsumerMetadata;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaConsumerThread;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplit;
import org.apache.seatunnel.format.compatible.kafka.connect.json.CompatibleKafkaConnectDeserializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaSourceReader
implements SourceReader<SeaTunnelRow, KafkaSourceSplit> {
    private static final Logger log = LoggerFactory.getLogger(KafkaSourceReader.class);
    private static final long THREAD_WAIT_TIME = 500L;
    private static final long POLL_TIMEOUT = 10000L;
    private final SourceReader.Context context;
    private final ConsumerMetadata metadata;
    private final Set<KafkaSourceSplit> sourceSplits;
    private final Map<Long, Map<TopicPartition, Long>> checkpointOffsetMap;
    private final Map<TopicPartition, KafkaConsumerThread> consumerThreadMap;
    private final ExecutorService executorService;
    private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
    private final MessageFormatErrorHandleWay messageFormatErrorHandleWay;
    private final LinkedBlockingQueue<KafkaSourceSplit> pendingPartitionsQueue;
    private volatile boolean running = false;

    KafkaSourceReader(ConsumerMetadata metadata, DeserializationSchema<SeaTunnelRow> deserializationSchema, SourceReader.Context context, MessageFormatErrorHandleWay messageFormatErrorHandleWay) {
        this.metadata = metadata;
        this.context = context;
        this.messageFormatErrorHandleWay = messageFormatErrorHandleWay;
        this.sourceSplits = new HashSet<KafkaSourceSplit>();
        this.deserializationSchema = deserializationSchema;
        this.consumerThreadMap = new ConcurrentHashMap<TopicPartition, KafkaConsumerThread>();
        this.checkpointOffsetMap = new ConcurrentHashMap<Long, Map<TopicPartition, Long>>();
        this.executorService = Executors.newCachedThreadPool(r -> new Thread(r, "Kafka Source Data Consumer"));
        this.pendingPartitionsQueue = new LinkedBlockingQueue();
    }

    public void open() {
    }

    public void close() throws IOException {
        if (this.executorService != null) {
            this.executorService.shutdownNow();
        }
    }

    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
        if (!this.running) {
            Thread.sleep(500L);
            return;
        }
        while (this.pendingPartitionsQueue.size() != 0) {
            this.sourceSplits.add(this.pendingPartitionsQueue.poll());
        }
        this.sourceSplits.forEach(sourceSplit -> this.consumerThreadMap.computeIfAbsent(sourceSplit.getTopicPartition(), s -> {
            KafkaConsumerThread thread = new KafkaConsumerThread(this.metadata);
            this.executorService.submit(thread);
            return thread;
        }));
        this.sourceSplits.forEach(sourceSplit -> {
            CompletableFuture completableFuture = new CompletableFuture();
            try {
                this.consumerThreadMap.get(sourceSplit.getTopicPartition()).getTasks().put(consumer -> {
                    try {
                        HashSet partitions = Sets.newHashSet((Object[])new TopicPartition[]{sourceSplit.getTopicPartition()});
                        StringDeserializer stringDeserializer = new StringDeserializer();
                        stringDeserializer.configure((Map<String, ?>)Maps.fromProperties((Properties)this.metadata.getProperties()), false);
                        consumer.assign(partitions);
                        if (sourceSplit.getStartOffset() >= 0L) {
                            consumer.seek(sourceSplit.getTopicPartition(), sourceSplit.getStartOffset());
                        }
                        ConsumerRecords records = consumer.poll(Duration.ofMillis(10000L));
                        for (TopicPartition partition : partitions) {
                            List recordList = records.records(partition);
                            for (ConsumerRecord<byte[], byte[]> consumerRecord : recordList) {
                                try {
                                    if (this.deserializationSchema instanceof CompatibleKafkaConnectDeserializationSchema) {
                                        ((CompatibleKafkaConnectDeserializationSchema)this.deserializationSchema).deserialize(consumerRecord, output);
                                    } else {
                                        this.deserializationSchema.deserialize((byte[])consumerRecord.value(), output);
                                    }
                                }
                                catch (IOException e) {
                                    if (this.messageFormatErrorHandleWay == MessageFormatErrorHandleWay.SKIP) {
                                        log.warn("Deserialize message failed, skip this message, message: {}", (Object)new String(consumerRecord.value()));
                                        continue;
                                    }
                                    throw e;
                                }
                                if (!Boundedness.BOUNDED.equals((Object)this.context.getBoundedness()) || consumerRecord.offset() < sourceSplit.getEndOffset()) continue;
                                break;
                            }
                            long lastOffset = -1L;
                            if (!recordList.isEmpty()) {
                                lastOffset = recordList.get(recordList.size() - 1).offset();
                                sourceSplit.setStartOffset(lastOffset + 1L);
                            }
                            if (lastOffset < sourceSplit.getEndOffset()) continue;
                            sourceSplit.setEndOffset(lastOffset);
                        }
                    }
                    catch (Exception e) {
                        completableFuture.completeExceptionally(e);
                    }
                    completableFuture.complete(null);
                });
            }
            catch (InterruptedException e) {
                throw new KafkaConnectorException((SeaTunnelErrorCode)KafkaConnectorErrorCode.CONSUME_DATA_FAILED, e);
            }
            completableFuture.join();
        });
        if (Boundedness.BOUNDED.equals((Object)this.context.getBoundedness())) {
            this.context.signalNoMoreElement();
        }
    }

    public List<KafkaSourceSplit> snapshotState(long checkpointId) {
        this.checkpointOffsetMap.put(checkpointId, this.sourceSplits.stream().collect(Collectors.toMap(KafkaSourceSplit::getTopicPartition, KafkaSourceSplit::getStartOffset)));
        return this.sourceSplits.stream().map(KafkaSourceSplit::copy).collect(Collectors.toList());
    }

    public void addSplits(List<KafkaSourceSplit> splits) {
        this.running = true;
        splits.forEach(s -> {
            try {
                this.pendingPartitionsQueue.put((KafkaSourceSplit)s);
            }
            catch (InterruptedException e) {
                throw new KafkaConnectorException((SeaTunnelErrorCode)KafkaConnectorErrorCode.ADD_SPLIT_CHECKPOINT_FAILED, e);
            }
        });
    }

    public void handleNoMoreSplits() {
        log.info("receive no more splits message, this reader will not add new split.");
    }

    public void notifyCheckpointComplete(long checkpointId) {
        if (!this.checkpointOffsetMap.containsKey(checkpointId)) {
            log.warn("checkpoint {} do not exist or have already been committed.", (Object)checkpointId);
        } else {
            this.checkpointOffsetMap.remove(checkpointId).forEach((topicPartition, offset) -> {
                try {
                    this.consumerThreadMap.get(topicPartition).getTasks().put(consumer -> {
                        if (this.metadata.isCommitOnCheckpoint()) {
                            HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
                            if (offset >= 0L) {
                                offsets.put((TopicPartition)topicPartition, new OffsetAndMetadata((long)offset));
                                consumer.commitSync(offsets);
                            }
                        }
                    });
                }
                catch (InterruptedException e) {
                    log.error("commit offset to kafka failed", (Throwable)e);
                }
            });
        }
    }
}

