/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.producer;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.kafka.compat.KafkaMessageRouter;
import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;

public class PulsarKafkaProducer<K, V>
implements Producer<K, V> {
    private final PulsarClient client;
    private final ProducerBuilder<byte[]> pulsarProducerBuilder;
    private final ConcurrentMap<String, org.apache.pulsar.client.api.Producer<byte[]>> producers = new ConcurrentHashMap<String, org.apache.pulsar.client.api.Producer<byte[]>>();
    private final Serializer<K> keySerializer;
    private final Serializer<V> valueSerializer;
    private final Partitioner partitioner;
    private volatile Cluster cluster = Cluster.empty();

    public PulsarKafkaProducer(Map<String, Object> configs) {
        this(configs, null, null);
    }

    public PulsarKafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this(configs, new Properties(), keySerializer, valueSerializer);
    }

    public PulsarKafkaProducer(Properties properties) {
        this(properties, (Serializer<K>)null, (Serializer<V>)null);
    }

    public PulsarKafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this(new HashMap<String, Object>(), properties, keySerializer, valueSerializer);
    }

    private PulsarKafkaProducer(Map<String, Object> conf, Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        properties.forEach((BiConsumer<? super Object, ? super Object>)((BiConsumer<Object, Object>)(k, v) -> conf.put((String)k, v)));
        ProducerConfig producerConfig = new ProducerConfig(conf);
        if (keySerializer == null) {
            this.keySerializer = (Serializer)producerConfig.getConfiguredInstance("key.serializer", Serializer.class);
            this.keySerializer.configure(producerConfig.originals(), true);
        } else {
            this.keySerializer = keySerializer;
            producerConfig.ignore("key.serializer");
        }
        if (valueSerializer == null) {
            this.valueSerializer = (Serializer)producerConfig.getConfiguredInstance("value.serializer", Serializer.class);
            this.valueSerializer.configure(producerConfig.originals(), true);
        } else {
            this.valueSerializer = valueSerializer;
            producerConfig.ignore("value.serializer");
        }
        this.partitioner = (Partitioner)producerConfig.getConfiguredInstance("partitioner.class", Partitioner.class);
        this.partitioner.configure(producerConfig.originals());
        String serviceUrl = (String)producerConfig.getList("bootstrap.servers").get(0);
        try {
            this.client = PulsarClientKafkaConfig.getClientBuilder(properties).serviceUrl(serviceUrl).build();
        }
        catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
        this.pulsarProducerBuilder = PulsarProducerKafkaConfig.getProducerBuilder(this.client, properties);
        long lingerMs = Long.parseLong(properties.getProperty("linger.ms", "1"));
        this.pulsarProducerBuilder.batchingMaxPublishDelay(lingerMs, TimeUnit.MILLISECONDS);
        String compressionType = properties.getProperty("compression.type");
        if ("gzip".equals(compressionType)) {
            this.pulsarProducerBuilder.compressionType(CompressionType.ZLIB);
        } else if ("lz4".equals(compressionType)) {
            this.pulsarProducerBuilder.compressionType(CompressionType.LZ4);
        }
        this.pulsarProducerBuilder.messageRouter((MessageRouter)new KafkaMessageRouter(lingerMs));
        int sendTimeoutMillis = Integer.parseInt(properties.getProperty("max.block.ms", "60000"));
        this.pulsarProducerBuilder.sendTimeout(sendTimeoutMillis, TimeUnit.MILLISECONDS);
        boolean blockOnBufferFull = Boolean.parseBoolean(properties.getProperty("block.on.buffer.full", "false"));
        boolean shouldBlockPulsarProducer = sendTimeoutMillis > 0 || blockOnBufferFull;
        this.pulsarProducerBuilder.blockIfQueueFull(shouldBlockPulsarProducer);
    }

    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        return this.send(record, null);
    }

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        org.apache.pulsar.client.api.Producer producer;
        try {
            producer = this.producers.computeIfAbsent(record.topic(), topic -> this.createNewProducer((String)topic));
        }
        catch (Exception e) {
            if (callback != null) {
                callback.onCompletion(null, e);
            }
            CompletableFuture<RecordMetadata> future = new CompletableFuture<RecordMetadata>();
            future.completeExceptionally(e);
            return future;
        }
        TypedMessageBuilder messageBuilder = producer.newMessage();
        int messageSize = this.buildMessage((TypedMessageBuilder<byte[]>)messageBuilder, record);
        CompletableFuture<RecordMetadata> future = new CompletableFuture<RecordMetadata>();
        ((CompletableFuture)messageBuilder.sendAsync().thenAccept(messageId -> future.complete(this.getRecordMetadata(record.topic(), (TypedMessageBuilder<byte[]>)messageBuilder, (MessageId)messageId, messageSize)))).exceptionally(ex -> {
            future.completeExceptionally((Throwable)ex);
            return null;
        });
        future.handle((recordMetadata, throwable) -> {
            if (callback != null) {
                Exception exception = throwable != null ? new Exception((Throwable)throwable) : null;
                callback.onCompletion(recordMetadata, exception);
            }
            return null;
        });
        return future;
    }

    public void flush() {
        this.producers.values().stream().map(p -> p.flushAsync()).collect(Collectors.toList()).forEach(CompletableFuture::join);
    }

    public List<PartitionInfo> partitionsFor(String topic) {
        throw new UnsupportedOperationException();
    }

    public Map<MetricName, ? extends Metric> metrics() {
        return Collections.emptyMap();
    }

    public void close() {
        this.close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        this.partitioner.close();
    }

    public void close(long timeout, TimeUnit unit) {
        try {
            this.client.closeAsync().get(timeout, unit);
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    private org.apache.pulsar.client.api.Producer<byte[]> createNewProducer(String topic) {
        try {
            this.cluster = this.cluster.withPartitions(this.readPartitionsInfo(topic));
            return this.pulsarProducerBuilder.clone().topic(topic).create();
        }
        catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
    }

    private Map<TopicPartition, PartitionInfo> readPartitionsInfo(String topic) {
        List partitions = (List)this.client.getPartitionsForTopic(topic).join();
        HashMap<TopicPartition, PartitionInfo> partitionsInfo = new HashMap<TopicPartition, PartitionInfo>();
        for (int i = 0; i < partitions.size(); ++i) {
            TopicPartition tp = new TopicPartition(topic, i);
            PartitionInfo pi = new PartitionInfo(topic, i, null, null, null);
            partitionsInfo.put(tp, pi);
        }
        return partitionsInfo;
    }

    private int buildMessage(TypedMessageBuilder<byte[]> builder, ProducerRecord<K, V> record) {
        byte[] keyBytes = null;
        if (record.key() != null) {
            String key = this.getKey(record.topic(), record.key());
            keyBytes = key.getBytes(StandardCharsets.UTF_8);
            builder.key(key);
        }
        if (record.timestamp() != null) {
            builder.eventTime(record.timestamp().longValue());
        }
        byte[] value = this.valueSerializer.serialize(record.topic(), record.value());
        builder.value((Object)value);
        if (record.partition() != null) {
            builder.property("pulsar.partition.id", record.partition().toString());
        } else {
            int partition = this.partitioner.partition(record.topic(), record.key(), keyBytes, record.value(), value, this.cluster);
            builder.property("pulsar.partition.id", Integer.toString(partition));
        }
        return value.length;
    }

    private String getKey(String topic, K key) {
        if (this.keySerializer instanceof StringSerializer) {
            return (String)key;
        }
        byte[] keyBytes = this.keySerializer.serialize(topic, key);
        return Base64.getEncoder().encodeToString(keyBytes);
    }

    private RecordMetadata getRecordMetadata(String topic, TypedMessageBuilder<byte[]> msgBuilder, MessageId messageId, int size) {
        MessageIdImpl msgId = (MessageIdImpl)messageId;
        long offset = MessageIdUtils.getOffset((MessageId)msgId);
        int partition = msgId.getPartitionIndex();
        TopicPartition tp = new TopicPartition(topic, partition);
        TypedMessageBuilderImpl mb = (TypedMessageBuilderImpl)msgBuilder;
        return new RecordMetadata(tp, offset, 0L, mb.getPublishTime(), 0L, mb.hasKey() ? mb.getKey().length() : 0, size);
    }
}

