/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.source.ConsumerMetadata;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.source.RocketMqConsumerThread;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.source.RocketMqSourceSplit;
import org.apache.seatunnel.shade.com.google.common.collect.Maps;
import org.apache.seatunnel.shade.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocketMqSourceReader
implements SourceReader<SeaTunnelRow, RocketMqSourceSplit> {
    private static final Logger log = LoggerFactory.getLogger(RocketMqSourceReader.class);
    private static final long THREAD_WAIT_TIME = 500L;
    private final SourceReader.Context context;
    private final ConsumerMetadata metadata;
    private final Set<RocketMqSourceSplit> sourceSplits;
    private final Map<Long, Map<MessageQueue, Long>> checkpointOffsets;
    private final Map<MessageQueue, RocketMqConsumerThread> consumerThreads;
    private final ExecutorService executorService;
    private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
    private final LinkedBlockingQueue<RocketMqSourceSplit> pendingPartitionsQueue;
    private volatile boolean running = false;

    public RocketMqSourceReader(ConsumerMetadata metadata, DeserializationSchema<SeaTunnelRow> deserializationSchema, SourceReader.Context context) {
        this.metadata = metadata;
        this.context = context;
        this.sourceSplits = new HashSet<RocketMqSourceSplit>();
        this.deserializationSchema = deserializationSchema;
        this.consumerThreads = new ConcurrentHashMap<MessageQueue, RocketMqConsumerThread>();
        this.checkpointOffsets = new ConcurrentHashMap<Long, Map<MessageQueue, Long>>();
        this.executorService = Executors.newCachedThreadPool(r -> new Thread(r, "RocketMq Source Data Consumer"));
        this.pendingPartitionsQueue = new LinkedBlockingQueue();
    }

    public void open() throws Exception {
    }

    public void close() throws IOException {
        if (this.executorService != null) {
            this.executorService.shutdownNow();
        }
    }

    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
        if (!this.running) {
            Thread.sleep(500L);
            return;
        }
        while (this.pendingPartitionsQueue.size() != 0) {
            this.sourceSplits.add(this.pendingPartitionsQueue.poll());
        }
        this.sourceSplits.forEach(sourceSplit -> this.consumerThreads.computeIfAbsent(sourceSplit.getMessageQueue(), s -> {
            RocketMqConsumerThread thread = new RocketMqConsumerThread(this.metadata);
            this.executorService.submit(thread);
            return thread;
        }));
        this.sourceSplits.forEach(sourceSplit -> {
            CompletableFuture completableFuture = new CompletableFuture();
            try {
                this.consumerThreads.get(sourceSplit.getMessageQueue()).getTasks().put(consumer -> {
                    try {
                        List<MessageExt> records;
                        HashSet messageQueues = Sets.newHashSet((Object[])new MessageQueue[]{sourceSplit.getMessageQueue()});
                        consumer.assign(messageQueues);
                        if (sourceSplit.getStartOffset() >= 0L) {
                            consumer.seek(sourceSplit.getMessageQueue(), sourceSplit.getStartOffset());
                        }
                        if ((records = consumer.poll(this.metadata.getBaseConfig().getPollTimeoutMillis())).isEmpty()) {
                            log.warn("Rocketmq consumer can not pull data, split {}, start offset {}, end offset {}", new Object[]{sourceSplit.getMessageQueue(), sourceSplit.getStartOffset(), sourceSplit.getEndOffset()});
                        }
                        Map<MessageQueue, List<MessageExt>> groupRecords = records.stream().collect(Collectors.groupingBy(record -> new MessageQueue(record.getTopic(), record.getBrokerName(), record.getQueueId())));
                        for (MessageQueue messageQueue : messageQueues) {
                            if (!groupRecords.containsKey(messageQueue)) continue;
                            List<MessageExt> messages = groupRecords.get(messageQueue);
                            for (MessageExt record2 : messages) {
                                this.deserializationSchema.deserialize(record2.getBody(), output);
                                if (!Boundedness.BOUNDED.equals((Object)this.context.getBoundedness()) || record2.getQueueOffset() < sourceSplit.getEndOffset()) continue;
                                break;
                            }
                            long lastOffset = -1L;
                            if (!messages.isEmpty()) {
                                lastOffset = messages.get(messages.size() - 1).getQueueOffset();
                                sourceSplit.setStartOffset(lastOffset);
                            }
                            if (lastOffset < sourceSplit.getEndOffset()) continue;
                            sourceSplit.setEndOffset(lastOffset);
                        }
                    }
                    catch (Exception e) {
                        completableFuture.completeExceptionally(e);
                    }
                    completableFuture.complete(null);
                });
            }
            catch (InterruptedException e) {
                throw new RocketMqConnectorException((SeaTunnelErrorCode)RocketMqConnectorErrorCode.CONSUME_DATA_FAILED, e);
            }
            completableFuture.join();
        });
        if (Boundedness.BOUNDED.equals((Object)this.context.getBoundedness())) {
            this.context.signalNoMoreElement();
        }
    }

    public List<RocketMqSourceSplit> snapshotState(long checkpointId) throws Exception {
        List<RocketMqSourceSplit> pendingSplit = this.sourceSplits.stream().map(RocketMqSourceSplit::copy).collect(Collectors.toList());
        Map offsets = this.checkpointOffsets.computeIfAbsent(checkpointId, id -> Maps.newConcurrentMap());
        for (RocketMqSourceSplit split : pendingSplit) {
            offsets.put(split.getMessageQueue(), split.getStartOffset());
        }
        return pendingSplit;
    }

    public void addSplits(List<RocketMqSourceSplit> splits) {
        this.running = true;
        splits.forEach(s -> {
            try {
                this.pendingPartitionsQueue.put((RocketMqSourceSplit)s);
            }
            catch (InterruptedException e) {
                throw new RocketMqConnectorException((SeaTunnelErrorCode)RocketMqConnectorErrorCode.ADD_SPLIT_CHECKPOINT_FAILED, e);
            }
        });
    }

    public void handleNoMoreSplits() {
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (!this.checkpointOffsets.containsKey(checkpointId)) {
            log.warn("checkpoint {} do not exist or have already been committed.", (Object)checkpointId);
        } else {
            Map<MessageQueue, Long> messageQueueOffset = this.checkpointOffsets.remove(checkpointId);
            for (Map.Entry<MessageQueue, Long> entry : messageQueueOffset.entrySet()) {
                MessageQueue messageQueue = entry.getKey();
                Long offset = entry.getValue();
                try {
                    if (messageQueue == null || offset == null) continue;
                    this.consumerThreads.get(messageQueue).getTasks().put(consumer -> {
                        if (this.metadata.isEnabledCommitCheckpoint()) {
                            consumer.getOffsetStore().updateOffset(messageQueue, offset, false);
                            consumer.getOffsetStore().persist(messageQueue);
                        }
                    });
                }
                catch (InterruptedException e) {
                    log.error("commit offset failed", (Throwable)e);
                }
            }
        }
    }
}

