/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.kafka;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import org.apache.storm.kafka.DynamicPartitionConnections;
import org.apache.storm.kafka.FailedFetchException;
import org.apache.storm.kafka.KafkaConfig;
import org.apache.storm.kafka.KafkaError;
import org.apache.storm.kafka.KeyValueSchemeAsMultiScheme;
import org.apache.storm.kafka.MessageMetadataSchemeAsMultiScheme;
import org.apache.storm.kafka.Partition;
import org.apache.storm.kafka.PartitionManager;
import org.apache.storm.kafka.StaticHosts;
import org.apache.storm.kafka.StringMultiSchemeWithTopic;
import org.apache.storm.kafka.TopicOffsetOutOfRangeException;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.trident.GlobalPartitionInformation;
import org.apache.storm.kafka.trident.IBrokerReader;
import org.apache.storm.kafka.trident.StaticBrokerReader;
import org.apache.storm.kafka.trident.ZkBrokerReader;
import org.apache.storm.metric.api.IMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaUtils {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);
    private static final int NO_OFFSET = -5;

    /**
     * Builds the broker reader that corresponds to the kind of hosts configured in {@code conf}.
     *
     * @param stormConf configuration map; only forwarded to {@link ZkBrokerReader}
     * @param conf      Kafka configuration supplying {@code hosts} and {@code topic}
     * @return a {@link StaticBrokerReader} when {@code conf.hosts} is a {@link StaticHosts},
     *         otherwise a {@link ZkBrokerReader} built from {@code conf.hosts} cast to {@link ZkHosts}
     */
    public static IBrokerReader makeBrokerReader(Map stormConf, KafkaConfig conf) {
        if (!(conf.hosts instanceof StaticHosts)) {
            // Every non-static host description is handed to the ZooKeeper-backed reader.
            ZkHosts zkHosts = (ZkHosts)conf.hosts;
            return new ZkBrokerReader(stormConf, conf.topic, zkHosts);
        }
        StaticHosts staticHosts = (StaticHosts)conf.hosts;
        return new StaticBrokerReader(conf.topic, staticHosts.getPartitionInformation());
    }

    /**
     * Looks up an offset for a partition using the start offset time stored in {@code config}.
     *
     * @param consumer  consumer used to query the broker
     * @param topic     topic name
     * @param partition partition number within {@code topic}
     * @param config    configuration whose {@code startOffsetTime} is used as the lookup time
     * @return the result of {@link #getOffset(SimpleConsumer, String, int, long)} for that time
     */
    public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
        return KafkaUtils.getOffset(consumer, topic, partition, config.startOffsetTime);
    }

    /**
     * Issues an offset request for a single topic partition and returns the first offset in the reply.
     *
     * @param consumer        consumer used to send the request; its client id is put on the request
     * @param topic           topic name
     * @param partition       partition number within {@code topic}
     * @param startOffsetTime time value placed in the {@link PartitionOffsetRequestInfo}
     * @return the first offset returned for the partition, or {@code -5} (the value of
     *         {@code NO_OFFSET}) when the reply contains no offsets
     */
    public static long getOffset(SimpleConsumer consumer, String topic, int partition, long startOffsetTime) {
        Map<TopicAndPartition, PartitionOffsetRequestInfo> offsetQuery = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        // NOTE(review): the second constructor argument (1) presumably caps the number of offsets returned - confirm against the Kafka API.
        offsetQuery.put(new TopicAndPartition(topic, partition), new PartitionOffsetRequestInfo(startOffsetTime, 1));
        OffsetRequest offsetRequest = new OffsetRequest(offsetQuery, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
        long[] found = consumer.getOffsetsBefore(offsetRequest).offsets(topic, partition);
        // -5 is the NO_OFFSET sentinel that callers compare against.
        return found.length == 0 ? -5L : found[0];
    }

    /**
     * Fetches a message set for one partition starting at {@code offset}.
     *
     * @param config    supplies fetch size, client id, max wait, min bytes and the
     *                  {@code useStartOffsetTimeIfOffsetOutOfRange} flag
     * @param consumer  consumer used to perform the fetch
     * @param partition partition to read; its {@code topic} and {@code partition} fields address the fetch
     * @param offset    offset to start fetching from
     * @return the message set the response holds for the topic partition
     * @throws TopicOffsetOutOfRangeException if the response reports {@code OFFSET_OUT_OF_RANGE} and
     *                                        {@code config.useStartOffsetTimeIfOffsetOutOfRange} is set
     * @throws FailedFetchException           if the fetch fails with an {@link IOException} or
     *                                        {@link UnresolvedAddressException}, or the response reports
     *                                        any other error
     * @throws RuntimeException               wrapping any other exception thrown by the fetch call
     */
    public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset) throws TopicOffsetOutOfRangeException, FailedFetchException, RuntimeException {
        String topic = partition.topic;
        int partitionId = partition.partition;
        FetchRequest fetchRequest = new FetchRequestBuilder()
                .addFetch(topic, partitionId, offset, config.fetchSizeBytes)
                .clientId(config.clientId)
                .maxWait(config.fetchMaxWait)
                .minBytes(config.minFetchByte)
                .build();
        FetchResponse fetchResponse;
        try {
            fetchResponse = consumer.fetch(fetchRequest);
        }
        catch (Exception e) {
            // ConnectException and SocketTimeoutException are IOExceptions, so this covers them as well.
            boolean networkFailure = e instanceof IOException || e instanceof UnresolvedAddressException;
            if (!networkFailure) {
                throw new RuntimeException(e);
            }
            LOG.warn("Network error when fetching messages:", e);
            throw new FailedFetchException(e);
        }
        if (!fetchResponse.hasError()) {
            ByteBufferMessageSet msgs = fetchResponse.messageSet(topic, partitionId);
            LOG.debug("Messages fetched. [config = {}], [consumer = {}], [partition = {}], [offset = {}], [msgs = {}]", new Object[]{config, consumer, partition, offset, msgs});
            return msgs;
        }
        KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
        if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) {
            // Reported with its own exception type so callers can tell it apart from other fetch failures.
            String outOfRange = partition + " Got fetch request with offset out of range: [" + offset + "]";
            LOG.warn(outOfRange);
            throw new TopicOffsetOutOfRangeException(outOfRange);
        }
        String failure = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
        LOG.error(failure);
        throw new FailedFetchException(failure);
    }

    /**
     * Deserializes a Kafka message into tuples using the scheme configured in {@code kafkaConfig}.
     *
     * <p>Scheme selection, in order: a {@link KeyValueSchemeAsMultiScheme} receives key and payload
     * when the message has a key; a {@link StringMultiSchemeWithTopic} receives topic and payload;
     * any other scheme receives the payload only.
     *
     * @param kafkaConfig configuration holding the scheme
     * @param msg         message to deserialize
     * @param topic       topic name, used only by {@link StringMultiSchemeWithTopic}
     * @return the deserialized tuples, or {@code null} when the message has no payload
     */
    public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg, String topic) {
        ByteBuffer payload = msg.payload();
        if (payload == null) {
            return null;
        }
        ByteBuffer key = msg.key();
        if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) {
            return ((KeyValueSchemeAsMultiScheme)kafkaConfig.scheme).deserializeKeyAndValue(key, payload);
        }
        if (kafkaConfig.scheme instanceof StringMultiSchemeWithTopic) {
            return ((StringMultiSchemeWithTopic)kafkaConfig.scheme).deserializeWithTopic(topic, payload);
        }
        return kafkaConfig.scheme.deserialize(payload);
    }

    /**
     * Deserializes a Kafka message together with its partition and offset.
     *
     * @param scheme    scheme that performs the deserialization
     * @param msg       message to deserialize
     * @param partition partition passed through to the scheme
     * @param offset    offset passed through to the scheme
     * @return the tuples produced by {@code scheme}, or {@code null} when the message has no payload
     */
    public static Iterable<List<Object>> generateTuples(MessageMetadataSchemeAsMultiScheme scheme, Message msg, Partition partition, long offset) {
        ByteBuffer body = msg.payload();
        return body == null ? null : scheme.deserializeMessageWithMetadata(body, partition, offset);
    }

    /**
     * Selects the partitions a task is responsible for by striding over all known partitions.
     *
     * <p>The ordered partitions of every entry in {@code partitons} are concatenated, and the task
     * receives the elements at indexes {@code taskIndex}, {@code taskIndex + totalTasks},
     * {@code taskIndex + 2 * totalTasks}, and so on. The resulting assignment is logged.
     *
     * @param partitons  partition information whose ordered partitions are concatenated
     * @param totalTasks total number of tasks sharing the partitions
     * @param taskIndex  zero-based index of this task; must satisfy {@code 0 <= taskIndex < totalTasks}
     * @return the partitions assigned to the task; empty when there are fewer partitions than
     *         {@code taskIndex + 1}
     * @throws IllegalArgumentException if {@code taskIndex} is negative or not less than {@code totalTasks}
     */
    public static List<Partition> calculatePartitionsForTask(List<GlobalPartitionInformation> partitons, int totalTasks, int taskIndex) {
        // A negative index would otherwise surface later as an IndexOutOfBoundsException from the list lookup.
        Preconditions.checkArgument(taskIndex >= 0, "task index must not be negative");
        // Together with the check above this also guarantees totalTasks >= 1, so the stride below always advances.
        Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less that total tasks");
        List<Partition> allPartitions = new ArrayList<Partition>();
        for (GlobalPartitionInformation partitionInformation : partitons) {
            allPartitions.addAll(partitionInformation.getOrderedPartitions());
        }
        int numPartitions = allPartitions.size();
        if (numPartitions < totalTasks) {
            LOG.warn("there are more tasks than partitions (tasks: {}; partitions: {}), some tasks will be idle", totalTasks, numPartitions);
        }
        List<Partition> taskPartitions = new ArrayList<Partition>();
        for (int i = taskIndex; i < numPartitions; i += totalTasks) {
            taskPartitions.add(allPartitions.get(i));
        }
        KafkaUtils.logPartitionMapping(totalTasks, taskIndex, taskPartitions);
        return taskPartitions;
    }

    /**
     * Logs which partitions were assigned to a task: at INFO level when there are any, at WARN
     * level when the task received none.
     *
     * @param totalTasks     total number of tasks, used for the log prefix
     * @param taskIndex      zero-based index of the task, used for the log prefix
     * @param taskPartitions partitions assigned to the task
     */
    private static void logPartitionMapping(int totalTasks, int taskIndex, List<Partition> taskPartitions) {
        String prefix = KafkaUtils.taskId(taskIndex, totalTasks);
        if (!taskPartitions.isEmpty()) {
            LOG.info(prefix + "assigned " + taskPartitions);
            return;
        }
        LOG.warn(prefix + "no partitions assigned");
    }

    /**
     * Formats a log prefix of the form {@code "Task [i/n] "} for a task.
     *
     * @param taskIndex  zero-based task index; shown one-based in the result
     * @param totalTasks total number of tasks
     * @return the prefix, including the trailing space
     */
    public static String taskId(int taskIndex, int totalTasks) {
        StringBuilder label = new StringBuilder("Task [");
        label.append(taskIndex + 1);
        label.append('/');
        label.append(totalTasks);
        label.append("] ");
        return label.toString();
    }

    /**
     * Metric that reports Kafka offsets and spout lag for the partitions it has offset data for,
     * both per partition and summed per topic.
     */
    public static class KafkaOffsetMetric implements IMetric {
        /** Latest offset data recorded for each partition via {@link #setOffsetData}. */
        Map<Partition, PartitionManager.OffsetData> _partitionToOffset = new HashMap<Partition, PartitionManager.OffsetData>();
        /** Partition set installed by {@link #refreshPartitions}; {@code null} until the first refresh. */
        Set<Partition> _partitions;
        /** Source of the consumers used to query broker offsets. */
        DynamicPartitionConnections _connections;

        /**
         * @param connections connections from which a consumer is obtained for each partition
         */
        public KafkaOffsetMetric(DynamicPartitionConnections connections) {
            this._connections = connections;
        }

        /**
         * Records (or replaces) the offset data for a partition.
         *
         * @param partition  partition the data belongs to
         * @param offsetData offsets to report for that partition
         */
        public void setOffsetData(Partition partition, PartitionManager.OffsetData offsetData) {
            this._partitionToOffset.put(partition, offsetData);
        }

        /**
         * Computes the offset metrics for all tracked partitions.
         *
         * <p>For each partition the keys {@code <path>/spoutLag}, {@code <path>/earliestTimeOffset},
         * {@code <path>/latestTimeOffset}, {@code <path>/latestEmittedOffset} and
         * {@code <path>/latestCompletedOffset} are produced, where {@code <path>} is the partition id,
         * prefixed with {@code <topic>/} unless it already starts with it. Spout lag is the latest
         * broker offset minus the latest completed offset. For each topic the sums of those values
         * are produced under {@code <topic>/total...} keys.
         *
         * @return a {@code HashMap<String, Long>} of metric values, or {@code null} when no partition
         *         set has been installed yet, the number of partitions with offset data differs from
         *         the size of the partition set, a partition has no connection, a partition has no
         *         latest offset, or any exception occurs while computing the values
         */
        public Object getValueAndReset() {
            try {
                if (this._partitions == null || this._partitions.size() != this._partitionToOffset.size()) {
                    LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
                    return null;
                }
                HashMap<String, Long> ret = new HashMap<String, Long>();
                // TreeMap keeps the per-topic totals ordered by topic name.
                TreeMap<String, TopicMetrics> topicMetricsMap = new TreeMap<String, TopicMetrics>();
                for (Map.Entry<Partition, PartitionManager.OffsetData> entry : this._partitionToOffset.entrySet()) {
                    Partition partition = entry.getKey();
                    SimpleConsumer consumer = this._connections.getConnection(partition);
                    if (consumer == null) {
                        LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?");
                        return null;
                    }
                    long latestTimeOffset = KafkaUtils.getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
                    // -5 is the NO_OFFSET sentinel KafkaUtils.getOffset returns when the broker reports no offsets.
                    if (latestTimeOffset == -5L) {
                        LOG.warn("No data found in Kafka Partition " + partition.getId());
                        return null;
                    }
                    // Queried only after the latest offset is known to be valid, to avoid a wasted broker round trip.
                    long earliestTimeOffset = KafkaUtils.getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
                    PartitionManager.OffsetData offsetData = entry.getValue();
                    long latestEmittedOffset = offsetData.latestEmittedOffset;
                    long latestCompletedOffset = offsetData.latestCompletedOffset;
                    long spoutLag = latestTimeOffset - latestCompletedOffset;
                    String topic = partition.topic;
                    String metricPath = partition.getId();
                    if (!metricPath.startsWith(topic + "/")) {
                        metricPath = topic + "/" + metricPath;
                    }
                    ret.put(metricPath + "/spoutLag", spoutLag);
                    ret.put(metricPath + "/earliestTimeOffset", earliestTimeOffset);
                    ret.put(metricPath + "/latestTimeOffset", latestTimeOffset);
                    ret.put(metricPath + "/latestEmittedOffset", latestEmittedOffset);
                    ret.put(metricPath + "/latestCompletedOffset", latestCompletedOffset);
                    TopicMetrics topicMetrics = topicMetricsMap.get(topic);
                    if (topicMetrics == null) {
                        topicMetrics = new TopicMetrics();
                        topicMetricsMap.put(topic, topicMetrics);
                    }
                    topicMetrics.totalSpoutLag += spoutLag;
                    topicMetrics.totalEarliestTimeOffset += earliestTimeOffset;
                    topicMetrics.totalLatestTimeOffset += latestTimeOffset;
                    topicMetrics.totalLatestEmittedOffset += latestEmittedOffset;
                    topicMetrics.totalLatestCompletedOffset += latestCompletedOffset;
                }
                for (Map.Entry<String, TopicMetrics> topicEntry : topicMetricsMap.entrySet()) {
                    String topic = topicEntry.getKey();
                    TopicMetrics topicMetrics = topicEntry.getValue();
                    ret.put(topic + "/totalSpoutLag", topicMetrics.totalSpoutLag);
                    ret.put(topic + "/totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset);
                    ret.put(topic + "/totalLatestTimeOffset", topicMetrics.totalLatestTimeOffset);
                    ret.put(topic + "/totalLatestEmittedOffset", topicMetrics.totalLatestEmittedOffset);
                    ret.put(topic + "/totalLatestCompletedOffset", topicMetrics.totalLatestCompletedOffset);
                }
                return ret;
            }
            catch (Throwable t) {
                // Best effort: a failing metrics tick is logged and reported as "no value".
                LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t);
            }
            return null;
        }

        /**
         * Installs a new partition set and discards offset data for partitions that are not in it.
         *
         * @param partitions the partitions to keep tracking
         */
        public void refreshPartitions(Set<Partition> partitions) {
            this._partitions = partitions;
            Iterator<Partition> tracked = this._partitionToOffset.keySet().iterator();
            while (tracked.hasNext()) {
                if (!partitions.contains(tracked.next())) {
                    tracked.remove();
                }
            }
        }

        /** Running per-topic sums of the per-partition metric values. */
        private static class TopicMetrics {
            long totalSpoutLag = 0L;
            long totalEarliestTimeOffset = 0L;
            long totalLatestTimeOffset = 0L;
            long totalLatestEmittedOffset = 0L;
            long totalLatestCompletedOffset = 0L;

            private TopicMetrics() {
            }
        }
    }
}

