/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer;

import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarConsumerKafkaConfig;
import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarKafkaConsumer<K, V>
implements Consumer<K, V>,
MessageListener<byte[]> {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(PulsarKafkaConsumer.class);
    // NOTE(review): the class does not visibly implement Serializable, so this constant looks vestigial -- confirm.
    private static final long serialVersionUID = 1L;
    /** Pulsar client built in the private constructor and closed by {@code close(long, TimeUnit)}. */
    private final PulsarClient client;
    /** Deserializer used by {@code getKey} for message keys. */
    private final Deserializer<K> keyDeserializer;
    /** Deserializer applied to the message payload in {@code poll}. */
    private final Deserializer<V> valueDeserializer;
    /** Value of {@code group.id}; used as the Pulsar subscription name in {@code subscribe}. */
    private final String groupId;
    /** Value of {@code enable.auto.commit}; when true, {@code poll} and {@code close} trigger a commit. */
    private final boolean isAutoCommit;
    /** Pulsar consumer per Kafka-style topic partition, registered by {@code subscribe}. */
    private final ConcurrentMap<TopicPartition, org.apache.pulsar.client.api.Consumer<byte[]>> consumers = new ConcurrentHashMap<TopicPartition, org.apache.pulsar.client.api.Consumer<byte[]>>();
    /** Offset of the most recent record handed out by {@code poll}, per partition. */
    private final Map<TopicPartition, Long> lastReceivedOffset = new ConcurrentHashMap<TopicPartition, Long>();
    /** Offset most recently passed to {@code doCommitOffsets}, per partition. */
    private final Map<TopicPartition, OffsetAndMetadata> lastCommittedOffset = new ConcurrentHashMap<TopicPartition, OffsetAndMetadata>();
    /**
     * Partitions added by {@code subscribe} for which {@code poll} has not yet returned a record.
     * NOTE(review): plain HashSet, unlike the concurrent maps above -- presumably only touched from the caller's thread; confirm.
     */
    private final Set<TopicPartition> unpolledPartitions = new HashSet<TopicPartition>();
    /** Initial position derived from {@code auto.offset.reset}; used by {@code resetOffsets}. */
    private final SubscriptionInitialPosition strategy;
    /** Set to true by {@code close}; read by {@code received} to decide whether an interrupt is an error. */
    private volatile boolean closed = false;
    /** Copy of the original configuration entries, handed to the Pulsar client/consumer builders. */
    private final Properties properties;
    /** Bounded hand-off queue (capacity 1000) filled by {@code received} and drained by {@code poll}. */
    private final BlockingQueue<QueueItem> receivedMessages = new ArrayBlockingQueue<QueueItem>(1000);
    /** Upper bound on the number of records returned by a single {@code poll} call. */
    private static final int MAX_RECORDS_IN_SINGLE_POLL = 1000;

    /**
     * Creates a consumer from a configuration map; both deserializers are passed as {@code null},
     * so they are instantiated from the configuration.
     *
     * @param configs consumer configuration entries
     */
    public PulsarKafkaConsumer(Map<String, Object> configs) {
        this(configs, null, null);
    }

    /**
     * Creates a consumer from a configuration map and optional deserializer instances.
     *
     * @param configs consumer configuration entries
     * @param keyDeserializer key deserializer to use, or {@code null} to instantiate the one named in the configuration
     * @param valueDeserializer value deserializer to use, or {@code null} to instantiate the one named in the configuration
     */
    public PulsarKafkaConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(configs, keyDeserializer, valueDeserializer)), keyDeserializer, valueDeserializer);
    }

    /**
     * Creates a consumer from {@link Properties}; both deserializers are passed as {@code null},
     * so they are instantiated from the configuration.
     *
     * @param properties consumer configuration entries
     */
    public PulsarKafkaConsumer(Properties properties) {
        // The casts on null select the (Properties, Deserializer, Deserializer) overload.
        this(properties, (Deserializer<K>)null, (Deserializer<V>)null);
    }

    /**
     * Creates a consumer from {@link Properties} and optional deserializer instances.
     *
     * @param properties consumer configuration entries
     * @param keyDeserializer key deserializer to use, or {@code null} to instantiate the one named in the configuration
     * @param valueDeserializer value deserializer to use, or {@code null} to instantiate the one named in the configuration
     */
    public PulsarKafkaConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        this(new ConsumerConfig((Map)ConsumerConfig.addDeserializerToConfig((Properties)properties, keyDeserializer, valueDeserializer)), keyDeserializer, valueDeserializer);
    }

    /**
     * Shared constructor: resolves the deserializers, reads the consumer settings and builds the
     * Pulsar client.
     *
     * <p>A deserializer passed as {@code null} is instantiated from the configuration and then
     * configured with the original entries; the key deserializer is configured with
     * {@code isKey = true} and the value deserializer with {@code isKey = false}.
     *
     * @param config parsed consumer configuration
     * @param keyDeserializer key deserializer, or {@code null} to create one from {@code key.deserializer}
     * @param valueDeserializer value deserializer, or {@code null} to create one from {@code value.deserializer}
     * @throws RuntimeException wrapping the {@link PulsarClientException} if the client cannot be built
     */
    @SuppressWarnings("unchecked")
    private PulsarKafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        if (keyDeserializer == null) {
            this.keyDeserializer = (Deserializer<K>) config.getConfiguredInstance("key.deserializer", Deserializer.class);
            this.keyDeserializer.configure(config.originals(), true);
        } else {
            this.keyDeserializer = keyDeserializer;
            config.ignore("key.deserializer");
        }
        if (valueDeserializer == null) {
            this.valueDeserializer = (Deserializer<V>) config.getConfiguredInstance("value.deserializer", Deserializer.class);
            // isKey must be false here: deserializers use the flag to pick key- vs value-specific settings.
            this.valueDeserializer.configure(config.originals(), false);
        } else {
            this.valueDeserializer = valueDeserializer;
            config.ignore("value.deserializer");
        }
        this.groupId = config.getString("group.id");
        this.isAutoCommit = config.getBoolean("enable.auto.commit");
        this.strategy = this.getStrategy(config.getString("auto.offset.reset"));
        log.info("Offset reset strategy has been assigned value {}", this.strategy);
        // Only the first entry of bootstrap.servers is used as the Pulsar service URL.
        String serviceUrl = config.getList("bootstrap.servers").get(0);
        this.properties = new Properties();
        this.properties.putAll(config.originals());
        ClientBuilder clientBuilder = PulsarClientKafkaConfig.getClientBuilder(this.properties);
        clientBuilder.enableTcpNoDelay(false);
        try {
            this.client = clientBuilder.serviceUrl(serviceUrl).build();
        } catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Maps an {@code auto.offset.reset} value to a Pulsar initial position.
     *
     * @param strategy configured reset value
     * @return {@code Earliest} for exactly {@code "earliest"}, {@code Latest} for anything else
     */
    private SubscriptionInitialPosition getStrategy(String strategy) {
        return strategy.equals("earliest")
                ? SubscriptionInitialPosition.Earliest
                : SubscriptionInitialPosition.Latest;
    }

    /**
     * Message-listener callback: hands the message to the queue drained by {@code poll},
     * waiting while the queue is full.
     *
     * @param consumer Pulsar consumer that delivered the message
     * @param msg the delivered message
     * @throws RuntimeException if the wait is interrupted while this consumer is not closed
     */
    public void received(org.apache.pulsar.client.api.Consumer<byte[]> consumer, Message<byte[]> msg) {
        QueueItem queued = new QueueItem(consumer, msg);
        try {
            this.receivedMessages.put(queued);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            // An interrupt after close() is treated as normal shutdown; otherwise it is reported.
            if (!this.closed) {
                throw new RuntimeException(interrupted);
            }
        }
    }

    /**
     * Not supported by this adapter.
     *
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    public Set<TopicPartition> assignment() {
        throw new UnsupportedOperationException("Cannot access the partitions assignments");
    }

    /**
     * Returns the distinct topic names of all partitions that currently have a registered consumer.
     *
     * @return a new set of topic names
     */
    public Set<String> subscription() {
        Set<String> topicNames = new HashSet<String>();
        for (TopicPartition tp : this.consumers.keySet()) {
            topicNames.add(tp.topic());
        }
        return topicNames;
    }

    /**
     * Subscribes to the given topics without a rebalance listener.
     *
     * @param topics topic names to subscribe to
     */
    public void subscribe(Collection<String> topics) {
        final ConsumerRebalanceListener noListener = null;
        this.subscribe(topics, noListener);
    }

    /**
     * Subscribes to the given topics, creating one Pulsar consumer per partition (or a single
     * consumer registered as partition 0 when the topic reports at most one partition).
     *
     * <p>All consumers use the Failover subscription type, this object as message listener and
     * {@code group.id} as subscription name. The call waits until every subscription has completed,
     * then notifies {@code callback} of the assigned partitions.
     *
     * <p>On any failure the consumers created by this call are closed and unregistered again
     * before the failure is rethrown.
     *
     * @param topics topic names to subscribe to
     * @param callback listener notified via {@code onPartitionsAssigned}; may be {@code null}
     * @throws RuntimeException wrapping whatever caused the subscription to fail
     */
    public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
        // futures.get(i) always belongs to topicPartitions.get(i); both are appended together.
        List<CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>>> futures = new ArrayList<>();
        List<TopicPartition> topicPartitions = new ArrayList<>();
        try {
            for (String topic : topics) {
                int numberOfPartitions = (Integer) ((PulsarClientImpl) this.client).getNumberOfPartitions(topic).get();
                ConsumerBuilder<byte[]> consumerBuilder = PulsarConsumerKafkaConfig.getConsumerBuilder(this.client, this.properties);
                consumerBuilder.subscriptionType(SubscriptionType.Failover);
                consumerBuilder.messageListener(this);
                consumerBuilder.subscriptionName(this.groupId);
                TopicName topicName = TopicName.get(topic);
                String kafkaTopic = topicName.getPartitionedTopicName();
                if (numberOfPartitions > 1) {
                    // One shared consumer name across all partition consumers of this topic.
                    consumerBuilder.consumerName(ConsumerName.generateRandomName());
                    for (int i = 0; i < numberOfPartitions; i++) {
                        String partitionName = topicName.getPartition(i).toString();
                        TopicPartition tp = new TopicPartition(kafkaTopic, i);
                        futures.add(this.registerOnSubscribe(tp, consumerBuilder.clone().topic(partitionName).subscribeAsync()));
                        topicPartitions.add(tp);
                    }
                } else {
                    TopicPartition tp = new TopicPartition(kafkaTopic, 0);
                    futures.add(this.registerOnSubscribe(tp, consumerBuilder.topic(topic).subscribeAsync()));
                    topicPartitions.add(tp);
                }
            }
            this.unpolledPartitions.addAll(topicPartitions);
            futures.forEach(CompletableFuture::join);
            if (callback != null) {
                callback.onPartitionsAssigned(topicPartitions);
            }
        } catch (Exception e) {
            this.releaseFailedSubscription(futures, topicPartitions);
            // Restore the flag only after cleanup, otherwise the cleanup waits would abort immediately.
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(e);
        }
    }

    /** Registers the consumer under {@code tp} once its subscription completes. */
    private CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> registerOnSubscribe(TopicPartition tp, CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> subscription) {
        return subscription.thenApply(consumer -> {
            log.info("Add consumer {} for partition {}", consumer, tp);
            this.consumers.putIfAbsent(tp, consumer);
            return consumer;
        });
    }

    /** Best-effort rollback of a failed subscribe: closes and unregisters every consumer it created. */
    private void releaseFailedSubscription(List<CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>>> futures, List<TopicPartition> topicPartitions) {
        boolean interrupted = false;
        for (int i = 0; i < futures.size(); i++) {
            TopicPartition tp = topicPartitions.get(i);
            try {
                org.apache.pulsar.client.api.Consumer<byte[]> consumer = futures.get(i).get();
                // Conditional remove: never evict a consumer registered by an earlier, successful subscribe.
                this.consumers.remove(tp, consumer);
                consumer.close();
            } catch (InterruptedException e) {
                interrupted = true;
            } catch (Exception e) {
                // Expected for subscriptions that themselves failed; nothing to release for those.
                log.debug("Ignoring failure while releasing consumer for partition {}", tp, e);
            }
            if (!this.consumers.containsKey(tp)) {
                this.unpolledPartitions.remove(tp);
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Not supported by this adapter.
     *
     * @param partitions ignored
     * @throws UnsupportedOperationException always
     */
    public void assign(Collection<TopicPartition> partitions) {
        final String reason = "Cannot manually assign partitions";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Not supported by this adapter.
     *
     * @param pattern ignored
     * @param callback ignored
     * @throws UnsupportedOperationException always
     */
    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
        final String reason = "Cannot subscribe with topic name pattern";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Unsubscribes every registered Pulsar consumer and forgets all state kept for its partition.
     *
     * <p>Each consumer is removed from the registry as soon as it has been unsubscribed, together
     * with its offset bookkeeping and any of its messages still waiting in the queue. Without this,
     * a later {@code subscribe} to the same topic would keep the stale consumer because it
     * registers with {@code putIfAbsent}.
     *
     * @throws RuntimeException wrapping the {@link PulsarClientException} of the first consumer
     *         that fails to unsubscribe; consumers processed before it stay unsubscribed
     */
    public void unsubscribe() {
        // Iterate over a snapshot so entries can be removed while walking.
        for (TopicPartition tp : new ArrayList<TopicPartition>(this.consumers.keySet())) {
            org.apache.pulsar.client.api.Consumer<byte[]> consumer = this.consumers.get(tp);
            if (consumer == null) {
                continue;
            }
            try {
                consumer.unsubscribe();
            } catch (PulsarClientException e) {
                throw new RuntimeException(e);
            }
            this.consumers.remove(tp, consumer);
            this.lastReceivedOffset.remove(tp);
            this.lastCommittedOffset.remove(tp);
            this.unpolledPartitions.remove(tp);
            // Drop messages already queued by this consumer; their partition is no longer tracked.
            this.receivedMessages.removeIf(item -> item.consumer == consumer);
        }
    }

    /**
     * Waits up to {@code timeoutMillis} for a first message, then drains whatever else is already
     * queued (up to {@code MAX_RECORDS_IN_SINGLE_POLL} records) and returns it grouped by partition.
     *
     * <p>For every record the partition's last received offset is updated. The record timestamp is
     * the message event time when it is positive, otherwise the publish time. When auto-commit is
     * enabled, an asynchronous commit is triggered before returning.
     *
     * @param timeoutMillis maximum time to wait for the first message, in milliseconds
     * @return the polled records, or the empty instance if nothing arrived in time
     * @throws RuntimeException if interrupted while waiting for the first message; the thread's
     *         interrupt status is restored first
     */
    @SuppressWarnings("unchecked")
    public ConsumerRecords<K, V> poll(long timeoutMillis) {
        QueueItem item;
        try {
            item = this.receivedMessages.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        if (item == null) {
            return (ConsumerRecords<K, V>) ConsumerRecords.EMPTY;
        }
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
        int numberOfRecords = 0;
        while (item != null) {
            TopicName topicName = TopicName.get(item.consumer.getTopic());
            String topic = topicName.getPartitionedTopicName();
            int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
            Message<byte[]> msg = item.message;
            MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
            long offset = MessageIdUtils.getOffset(msgId);
            TopicPartition tp = new TopicPartition(topic, partition);
            // No recorded offset on a partition that has been polled before: re-seek per the reset strategy.
            if (this.lastReceivedOffset.get(tp) == null && !this.unpolledPartitions.contains(tp)) {
                log.info("When polling offsets, invalid offsets were detected. Resetting topic partition {}", tp);
                this.resetOffsets(tp);
            }
            K key = this.getKey(topic, msg);
            V value = this.valueDeserializer.deserialize(topic, msg.getData());
            TimestampType timestampType = TimestampType.LOG_APPEND_TIME;
            long timestamp = msg.getPublishTime();
            if (msg.getEventTime() > 0L) {
                timestamp = msg.getEventTime();
                timestampType = TimestampType.CREATE_TIME;
            }
            int keySize = msg.hasKey() ? msg.getKey().length() : 0;
            ConsumerRecord<K, V> consumerRecord = new ConsumerRecord<>(topic, partition, offset, timestamp, timestampType, -1L, keySize, msg.getData().length, key, value);
            records.computeIfAbsent(tp, k -> new ArrayList<>()).add(consumerRecord);
            this.lastReceivedOffset.put(tp, offset);
            this.unpolledPartitions.remove(tp);
            if (++numberOfRecords >= MAX_RECORDS_IN_SINGLE_POLL) {
                break;
            }
            // Non-blocking, non-interruptible drain: an interrupt here must not discard records
            // already dequeued, since their offsets are recorded and would be committed.
            item = this.receivedMessages.poll();
        }
        if (this.isAutoCommit && !records.isEmpty()) {
            this.commitAsync();
        }
        return new ConsumerRecords<>(records);
    }

    /**
     * Extracts the record key from a message.
     *
     * @param topic topic name passed to the key deserializer
     * @param msg message to read the key from
     * @return {@code null} when the message has no key; the raw key string when the key
     *         deserializer is a {@link StringDeserializer}; otherwise the Base64-decoded key bytes
     *         run through the key deserializer
     */
    @SuppressWarnings("unchecked")
    private K getKey(String topic, Message<byte[]> msg) {
        K key = null;
        if (msg.hasKey()) {
            if (this.keyDeserializer instanceof StringDeserializer) {
                key = (K) msg.getKey();
            } else {
                byte[] decoded = Base64.getDecoder().decode(msg.getKey());
                key = this.keyDeserializer.deserialize(topic, decoded);
            }
        }
        return key;
    }

    /**
     * Commits the last received offset of every partition and waits for the result.
     *
     * @throws RuntimeException if the commit fails or the wait is interrupted; on interruption the
     *         thread's interrupt status is restored before throwing
     */
    public void commitSync() {
        try {
            this.doCommitOffsets(this.getCurrentOffsetsMap()).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Commits the given offsets and waits for the result.
     *
     * @param offsets offsets to commit, per partition
     * @throws RuntimeException if the commit fails or the wait is interrupted; on interruption the
     *         thread's interrupt status is restored before throwing
     */
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
        try {
            this.doCommitOffsets(offsets).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Starts a commit of the last received offset of every partition without waiting for or
     * inspecting the result.
     */
    public void commitAsync() {
        Map<TopicPartition, OffsetAndMetadata> current = this.getCurrentOffsetsMap();
        this.doCommitOffsets(current);
    }

    /**
     * Starts a commit of the last received offset of every partition and reports the outcome to
     * {@code callback}.
     *
     * @param callback invoked with the committed offsets and either {@code null} or an
     *        {@link Exception} wrapping the failure; may be {@code null}, in which case the outcome
     *        is not reported
     */
    public void commitAsync(OffsetCommitCallback callback) {
        Map<TopicPartition, OffsetAndMetadata> offsets = this.getCurrentOffsetsMap();
        CompletableFuture<Void> commit = this.doCommitOffsets(offsets);
        if (callback == null) {
            return;
        }
        commit.handle((v, throwable) -> {
            callback.onComplete(offsets, throwable != null ? new Exception(throwable) : null);
            return null;
        });
    }

    /**
     * Starts a commit of the given offsets and reports the outcome to {@code callback}.
     *
     * @param offsets offsets to commit, per partition
     * @param callback invoked with {@code offsets} and either {@code null} or an {@link Exception}
     *        wrapping the failure; may be {@code null}, in which case the outcome is not reported
     */
    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
        CompletableFuture<Void> commit = this.doCommitOffsets(offsets);
        if (callback == null) {
            return;
        }
        commit.handle((v, throwable) -> {
            callback.onComplete(offsets, throwable != null ? new Exception(throwable) : null);
            return null;
        });
    }

    /**
     * Cumulatively acknowledges, on each partition's Pulsar consumer, the message id that
     * corresponds to the given offset, and records the offset as the partition's last committed one.
     *
     * <p>A partition without a registered consumer does not abort the whole commit: it contributes
     * a failed future (the same way the seek methods report it) and its offset is not recorded.
     *
     * @param offsets offsets to commit, per partition
     * @return a future combining all acknowledgements
     */
    private CompletableFuture<Void> doCommitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        offsets.forEach((topicPartition, offsetAndMetadata) -> {
            org.apache.pulsar.client.api.Consumer<byte[]> consumer = this.consumers.get(topicPartition);
            if (consumer == null) {
                futures.add(FutureUtil.failedFuture(new IllegalArgumentException("Cannot commit offsets on a partition where we are not subscribed: " + topicPartition)));
                return;
            }
            this.lastCommittedOffset.put(topicPartition, offsetAndMetadata);
            futures.add(consumer.acknowledgeCumulativeAsync(MessageIdUtils.getMessageId(offsetAndMetadata.offset())));
        });
        return FutureUtil.waitForAll(futures);
    }

    /**
     * Builds a fresh map holding, for every partition with a recorded last received offset, that
     * offset wrapped in an {@link OffsetAndMetadata}.
     *
     * @return a new map of partition to offset
     */
    private Map<TopicPartition, OffsetAndMetadata> getCurrentOffsetsMap() {
        Map<TopicPartition, OffsetAndMetadata> snapshot = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (Map.Entry<TopicPartition, Long> entry : this.lastReceivedOffset.entrySet()) {
            long lastSeen = entry.getValue();
            snapshot.put(entry.getKey(), new OffsetAndMetadata(lastSeen));
        }
        return snapshot;
    }

    /**
     * Seeks the consumer of {@code partition} to the message id that corresponds to {@code offset}.
     *
     * @param partition partition to reposition
     * @param offset offset to seek to
     * @throws IllegalArgumentException if no consumer is registered for {@code partition}
     * @throws RuntimeException wrapping the {@link PulsarClientException} if the seek fails
     */
    public void seek(TopicPartition partition, long offset) {
        MessageId target = MessageIdUtils.getMessageId(offset);
        org.apache.pulsar.client.api.Consumer<byte[]> partitionConsumer = this.consumers.get(partition);
        if (partitionConsumer != null) {
            try {
                partitionConsumer.seek(target);
            } catch (PulsarClientException seekFailure) {
                throw new RuntimeException(seekFailure);
            }
            return;
        }
        throw new IllegalArgumentException("Cannot seek on a partition where we are not subscribed");
    }

    /**
     * Seeks the given partitions to the earliest available message and waits for completion.
     *
     * @param partitions partitions to reposition; an empty collection means every subscribed partition
     * @throws java.util.concurrent.CompletionException if any seek fails, including partitions
     *         that have no registered consumer
     */
    public void seekToBeginning(Collection<TopicPartition> partitions) {
        this.seekPartitions(partitions, MessageId.earliest);
    }

    /**
     * Seeks the given partitions to the latest message and waits for completion.
     *
     * @param partitions partitions to reposition; an empty collection means every subscribed partition
     * @throws java.util.concurrent.CompletionException if any seek fails, including partitions
     *         that have no registered consumer
     */
    public void seekToEnd(Collection<TopicPartition> partitions) {
        this.seekPartitions(partitions, MessageId.latest);
    }

    /** Forgets the recorded offsets of the targeted partitions, seeks each of them to {@code target} and waits. */
    private void seekPartitions(Collection<TopicPartition> partitions, MessageId target) {
        Collection<TopicPartition> targets = partitions;
        if (targets.isEmpty()) {
            targets = this.consumers.keySet();
            this.lastCommittedOffset.clear();
            this.lastReceivedOffset.clear();
        } else {
            // Only forget the partitions being moved: wiping the others would make the next poll
            // treat them as having invalid offsets and reset them as well.
            for (TopicPartition tp : targets) {
                this.lastCommittedOffset.remove(tp);
                this.lastReceivedOffset.remove(tp);
            }
        }
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (TopicPartition tp : targets) {
            org.apache.pulsar.client.api.Consumer<byte[]> c = this.consumers.get(tp);
            if (c == null) {
                futures.add(FutureUtil.failedFuture(new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
                continue;
            }
            futures.add(c.seekAsync(target));
        }
        FutureUtil.waitForAll(futures).join();
    }

    /**
     * Reports the position of {@code partition}.
     *
     * @param partition partition to query
     * @return {@code 0} while the partition has not yet been polled; otherwise the last received
     *         offset; if no offset is recorded, the offsets are reset and the numeric value of the
     *         reset strategy is returned
     */
    public long position(TopicPartition partition) {
        Long lastSeen = this.lastReceivedOffset.get(partition);
        boolean neverPolled = this.unpolledPartitions.contains(partition);
        if (neverPolled) {
            return 0L;
        }
        if (lastSeen == null) {
            return this.resetOffsets(partition).getValue();
        }
        return lastSeen;
    }

    /**
     * Repositions {@code partition} according to the configured reset strategy: to the beginning
     * for {@code Earliest}, to the end otherwise.
     *
     * @param partition partition to reset
     * @return the strategy that was applied
     */
    private SubscriptionInitialPosition resetOffsets(TopicPartition partition) {
        log.info("Resetting partition {} and seeking to {} position", partition, this.strategy);
        Set<TopicPartition> onlyThis = Collections.singleton(partition);
        if (this.strategy != SubscriptionInitialPosition.Earliest) {
            this.seekToEnd(onlyThis);
        } else {
            this.seekToBeginning(onlyThis);
        }
        return this.strategy;
    }

    /**
     * Looks up the offset most recently submitted for commit on {@code partition}.
     *
     * @param partition partition to query
     * @return the recorded offset, or {@code null} if none is recorded
     */
    public OffsetAndMetadata committed(TopicPartition partition) {
        final OffsetAndMetadata lastCommit = this.lastCommittedOffset.get(partition);
        return lastCommit;
    }

    /**
     * Not supported by this adapter.
     *
     * @throws UnsupportedOperationException always
     */
    public Map<MetricName, ? extends Metric> metrics() {
        throw new UnsupportedOperationException("Cannot access consumer metrics");
    }

    /**
     * Not supported by this adapter.
     *
     * @param topic ignored
     * @throws UnsupportedOperationException always
     */
    public List<PartitionInfo> partitionsFor(String topic) {
        throw new UnsupportedOperationException("Cannot list the partitions of a topic");
    }

    /**
     * Not supported by this adapter.
     *
     * @throws UnsupportedOperationException always
     */
    public Map<String, List<PartitionInfo>> listTopics() {
        throw new UnsupportedOperationException("Cannot list topics");
    }

    /**
     * Not supported by this adapter.
     *
     * @throws UnsupportedOperationException always
     */
    public Set<TopicPartition> paused() {
        throw new UnsupportedOperationException("Cannot access the paused partitions");
    }

    /**
     * Not supported by this adapter.
     *
     * @param partitions ignored
     * @throws UnsupportedOperationException always
     */
    public void pause(Collection<TopicPartition> partitions) {
        throw new UnsupportedOperationException("Cannot pause partitions");
    }

    /**
     * Not supported by this adapter.
     *
     * @param partitions ignored
     * @throws UnsupportedOperationException always
     */
    public void resume(Collection<TopicPartition> partitions) {
        throw new UnsupportedOperationException("Cannot resume partitions");
    }

    /**
     * Not supported by this adapter.
     *
     * @param timestampsToSearch ignored
     * @throws UnsupportedOperationException always
     */
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
        throw new UnsupportedOperationException("Cannot look up offsets by timestamp");
    }

    /**
     * Not supported by this adapter.
     *
     * @param partitions ignored
     * @throws UnsupportedOperationException always
     */
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
        throw new UnsupportedOperationException("Cannot access the beginning offsets");
    }

    /**
     * Not supported by this adapter.
     *
     * @param partitions ignored
     * @throws UnsupportedOperationException always
     */
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
        throw new UnsupportedOperationException("Cannot access the end offsets");
    }

    /**
     * Closes this consumer, waiting up to {@code Long.MAX_VALUE} milliseconds.
     */
    public void close() {
        final long noDeadline = Long.MAX_VALUE;
        this.close(noDeadline, TimeUnit.MILLISECONDS);
    }

    /**
     * Marks this consumer closed, performs a final commit when auto-commit is enabled, and closes
     * the Pulsar client.
     *
     * <p>The final commit is awaited (within the same overall timeout) before the client is closed,
     * so the acknowledgements are not racing the shutdown of the connection. A failed or timed-out
     * commit is logged and does not prevent the client from being closed.
     *
     * @param timeout maximum total time to wait
     * @param unit unit of {@code timeout}
     * @throws RuntimeException if closing the client fails or times out, or if the wait is
     *         interrupted; on interruption the thread's interrupt status is restored first
     */
    public void close(long timeout, TimeUnit unit) {
        this.closed = true;
        long timeoutNanos = unit.toNanos(timeout);
        long startNanos = System.nanoTime();
        try {
            if (this.isAutoCommit) {
                try {
                    this.doCommitOffsets(this.getCurrentOffsetsMap()).get(timeoutNanos, TimeUnit.NANOSECONDS);
                } catch (ExecutionException | TimeoutException e) {
                    // Best effort, as before: a failed final commit must not keep the client open.
                    log.warn("Failed to commit offsets while closing consumer for group {}", this.groupId, e);
                }
            }
            // Subtract elapsed time rather than computing a deadline, which would overflow for Long.MAX_VALUE.
            long remainingNanos = Math.max(0L, timeoutNanos - (System.nanoTime() - startNanos));
            this.client.closeAsync().get(remainingNanos, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Not supported by this adapter.
     *
     * @throws UnsupportedOperationException always
     */
    public void wakeup() {
        throw new UnsupportedOperationException("Cannot wake up the consumer");
    }

    /**
     * Immutable pair of a received message and the Pulsar consumer that delivered it; the element
     * type of the hand-off queue between {@code received} and {@code poll}.
     */
    private static final class QueueItem {
        /** Consumer the message arrived on. */
        final org.apache.pulsar.client.api.Consumer<byte[]> consumer;
        /** The received message. */
        final Message<byte[]> message;

        QueueItem(org.apache.pulsar.client.api.Consumer<byte[]> source, Message<byte[]> payload) {
            this.consumer = source;
            this.message = payload;
        }
    }
}

