/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.kafka.trident;

import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import kafka.api.OffsetRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.MessageAndOffset;
import org.apache.storm.kafka.DynamicPartitionConnections;
import org.apache.storm.kafka.FailedFetchException;
import org.apache.storm.kafka.KafkaUtils;
import org.apache.storm.kafka.MessageMetadataSchemeAsMultiScheme;
import org.apache.storm.kafka.Partition;
import org.apache.storm.kafka.PartitionManager;
import org.apache.storm.kafka.TopicOffsetOutOfRangeException;
import org.apache.storm.kafka.trident.GlobalPartitionInformation;
import org.apache.storm.kafka.trident.MaxMetric;
import org.apache.storm.kafka.trident.TridentKafkaConfig;
import org.apache.storm.metric.api.CombinedMetric;
import org.apache.storm.metric.api.ICombiner;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.metric.api.IReducer;
import org.apache.storm.metric.api.MeanReducer;
import org.apache.storm.metric.api.ReducedMetric;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
import org.apache.storm.trident.spout.IPartitionedTridentSpout;
import org.apache.storm.trident.topology.TransactionAttempt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Batch-emission logic shared by the opaque and the transactional Trident Kafka emitters.
 *
 * <p>Every emitted batch is described by a metadata map. The map records the offset the batch was
 * fetched from ({@code "offset"}), the offset following the last emitted message
 * ({@code "nextOffset"}), and the partition, broker, topic and topology the batch was produced for.
 * The metadata returned for one batch is handed back as {@code lastMeta} when the next batch for the
 * same partition is emitted.
 *
 * <p>Instances are exposed to Trident through {@link #asOpaqueEmitter()} and
 * {@link #asTransactionalEmitter()}.
 */
public class TridentKafkaEmitter {
    private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaEmitter.class);

    // Top-level keys of the batch metadata map.
    private static final String META_OFFSET = "offset";
    private static final String META_NEXT_OFFSET = "nextOffset";
    private static final String META_INSTANCE_ID = "instanceId";
    private static final String META_PARTITION = "partition";
    private static final String META_BROKER = "broker";
    private static final String META_TOPIC = "topic";
    private static final String META_TOPOLOGY = "topology";
    // Key of the topology instance id inside the nested "topology" map.
    private static final String META_TOPOLOGY_ID = "id";

    private final DynamicPartitionConnections _connections;
    private final String _topologyName;
    private final KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric;
    private final ReducedMetric _kafkaMeanFetchLatencyMetric;
    private final CombinedMetric _kafkaMaxFetchLatencyMetric;
    private final TridentKafkaConfig _config;
    private final String _topologyInstanceId;

    /**
     * Creates the emitter and registers its metrics ({@code kafkaOffset}, {@code kafkaFetchAvg} and
     * {@code kafkaFetchMax}) with the topology context.
     *
     * @param conf topology configuration; {@code "topology.name"} is read from it
     * @param context context the metrics are registered with
     * @param config spout configuration (offset policy, scheme, metrics bucket size)
     * @param topologyInstanceId identifier of this topology instance; written into every batch's
     *     metadata and compared against the id stored in earlier metadata
     */
    public TridentKafkaEmitter(Map conf, TopologyContext context, TridentKafkaConfig config, String topologyInstanceId) {
        this._config = config;
        this._topologyInstanceId = topologyInstanceId;
        this._connections = new DynamicPartitionConnections(this._config, KafkaUtils.makeBrokerReader(conf, this._config));
        this._topologyName = (String) conf.get("topology.name");
        this._kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(this._connections);
        int bucketSecs = this._config.metricsTimeBucketSizeInSecs;
        context.registerMetric("kafkaOffset", (IMetric) this._kafkaOffsetMetric, bucketSecs);
        this._kafkaMeanFetchLatencyMetric = context.registerMetric("kafkaFetchAvg", (IReducer) new MeanReducer(), bucketSecs);
        this._kafkaMaxFetchLatencyMetric = context.registerMetric("kafkaFetchMax", (ICombiner) new MaxMetric(), bucketSecs);
    }

    /**
     * Emits a new batch for {@code partition} and reports the batch's offsets to the offset metric.
     * Fetch failures are not handled here and propagate to the caller.
     *
     * @return the metadata map describing the emitted batch
     */
    private Map failFastEmitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {
        SimpleConsumer consumer = this._connections.register(partition);
        Map batchMeta = this.doEmitNewPartitionBatch(consumer, partition, collector, lastMeta, attempt);
        Long startOffset = (Long) batchMeta.get(META_OFFSET);
        Long nextOffset = (Long) batchMeta.get(META_NEXT_OFFSET);
        this._kafkaOffsetMetric.setOffsetData(partition, new PartitionManager.OffsetData(nextOffset, startOffset));
        return batchMeta;
    }

    /**
     * Emits a new batch for {@code partition}, tolerating a {@link FailedFetchException}.
     *
     * <p>When the fetch fails nothing is emitted. If there is previous metadata, the returned
     * metadata describes an empty batch positioned at the previous {@code "nextOffset"}; unlike a
     * successful batch it carries no {@code "instanceId"} entry.
     *
     * @return the batch metadata, or {@code null} when the fetch failed and {@code lastMeta} is
     *     {@code null}
     */
    private Map emitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {
        try {
            return this.failFastEmitNewPartitionBatch(attempt, collector, partition, lastMeta);
        } catch (FailedFetchException e) {
            // Keep the cause in the log so the reason for the failed fetch is not lost.
            LOG.warn("Failed to fetch from partition " + partition, e);
            if (lastMeta == null) {
                return null;
            }
            Object resumeOffset = lastMeta.get(META_NEXT_OFFSET);
            Map<String, Object> emptyBatchMeta = new HashMap<String, Object>();
            emptyBatchMeta.put(META_OFFSET, resumeOffset);
            emptyBatchMeta.put(META_NEXT_OFFSET, resumeOffset);
            this.putSourceInfo(emptyBatchMeta, partition);
            return emptyBatchMeta;
        }
    }

    /**
     * Adds the entries identifying where a batch came from: partition number, broker host/port,
     * topic, and this topology's name and instance id.
     */
    private void putSourceInfo(Map<String, Object> meta, Partition partition) {
        meta.put(META_PARTITION, partition.partition);
        meta.put(META_BROKER, ImmutableMap.of("host", partition.host.host, "port", partition.host.port));
        meta.put(META_TOPIC, partition.topic);
        meta.put(META_TOPOLOGY, ImmutableMap.of("name", this._topologyName, META_TOPOLOGY_ID, this._topologyInstanceId));
    }

    /**
     * Chooses the offset a new batch starts from.
     *
     * <ul>
     *   <li>No previous metadata: the offset {@link KafkaUtils#getOffset} derives from the spout
     *       configuration.</li>
     *   <li>{@code ignoreZkOffsets} is set and the previous metadata was written by a different
     *       topology instance (or records no instance id): the offset for
     *       {@code startOffsetTime}.</li>
     *   <li>Otherwise: the previous batch's {@code "nextOffset"}.</li>
     * </ul>
     */
    private long resolveStartOffset(SimpleConsumer consumer, Partition partition, Map lastMeta) {
        if (lastMeta == null) {
            return KafkaUtils.getOffset(consumer, partition.topic, partition.partition, this._config);
        }
        String lastInstanceId = null;
        Map lastTopoMeta = (Map) lastMeta.get(META_TOPOLOGY);
        if (lastTopoMeta != null) {
            lastInstanceId = (String) lastTopoMeta.get(META_TOPOLOGY_ID);
        }
        if (this._config.ignoreZkOffsets && !this._topologyInstanceId.equals(lastInstanceId)) {
            return KafkaUtils.getOffset(consumer, partition.topic, partition.partition, this._config.startOffsetTime);
        }
        return (Long) lastMeta.get(META_NEXT_OFFSET);
    }

    /**
     * Fetches messages starting at the resolved offset, emits each of them, and builds the metadata
     * for the batch.
     *
     * <p>If the start offset is out of range, the offset is reset to the one reported for
     * {@code OffsetRequest.EarliestTime()} and the fetch is retried once.
     *
     * @return metadata for the emitted batch; {@code "nextOffset"} equals {@code "offset"} when no
     *     message was fetched
     */
    private Map doEmitNewPartitionBatch(SimpleConsumer consumer, Partition partition, TridentCollector collector, Map lastMeta, TransactionAttempt attempt) {
        LOG.debug("Emitting new partition batch - [transaction = {}], [lastMeta = {}]", attempt, lastMeta);
        long offset = this.resolveStartOffset(consumer, partition, lastMeta);
        LOG.debug("[transaction = {}], [OFFSET = {}]", attempt, offset);

        ByteBufferMessageSet msgs;
        try {
            msgs = this.fetchMessages(consumer, partition, offset);
        } catch (TopicOffsetOutOfRangeException e) {
            long newOffset = KafkaUtils.getOffset(consumer, partition.topic, partition.partition, OffsetRequest.EarliestTime());
            LOG.warn("OffsetOutOfRange: Updating offset from offset = " + offset + " to offset = " + newOffset);
            offset = newOffset;
            // The retry goes straight to KafkaUtils, so it is not counted in the latency metrics.
            msgs = KafkaUtils.fetchMessages(this._config, consumer, partition, offset);
        }

        long endOffset = offset;
        for (MessageAndOffset msg : msgs) {
            this.emit(collector, msg.message(), partition, msg.offset(), attempt);
            endOffset = msg.nextOffset();
        }

        Map<String, Object> newMeta = new HashMap<String, Object>();
        newMeta.put(META_OFFSET, offset);
        newMeta.put(META_NEXT_OFFSET, endOffset);
        newMeta.put(META_INSTANCE_ID, this._topologyInstanceId);
        this.putSourceInfo(newMeta, partition);
        LOG.debug("[transaction = {}], [newMeta = {}]", attempt, newMeta);
        return newMeta;
    }

    /**
     * Fetches messages from {@code partition} starting at {@code offset} and, when the fetch
     * returns, records its duration in milliseconds in the mean and max fetch-latency metrics.
     */
    private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {
        long startNanos = System.nanoTime();
        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(this._config, consumer, partition, offset);
        long millis = (System.nanoTime() - startNanos) / 1000000L;
        this._kafkaMeanFetchLatencyMetric.update(millis);
        this._kafkaMaxFetchLatencyMetric.update(millis);
        return msgs;
    }

    /**
     * Re-emits the batch described by {@code meta}: messages are fetched from {@code "offset"} and
     * emitted until {@code "nextOffset"} is reached.
     *
     * <p>Nothing is emitted when {@code ignoreZkOffsets} is set and the metadata's
     * {@code "instanceId"} is not this topology instance's id.
     *
     * @throws RuntimeException if the tracked offset moves past {@code "nextOffset"}
     */
    private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map meta) {
        LOG.info("re-emitting batch, attempt " + attempt);
        String instanceId = (String) meta.get(META_INSTANCE_ID);
        // equals() is invoked on this emitter's id, as in resolveStartOffset, so metadata without
        // an "instanceId" entry counts as another instance instead of causing a NullPointerException.
        if (this._config.ignoreZkOffsets && !this._topologyInstanceId.equals(instanceId)) {
            return;
        }

        SimpleConsumer consumer = this._connections.register(partition);
        long offset = (Long) meta.get(META_OFFSET);
        long nextOffset = (Long) meta.get(META_NEXT_OFFSET);
        ByteBufferMessageSet msgs = this.fetchMessages(consumer, partition, offset);
        if (msgs == null) {
            return;
        }
        for (MessageAndOffset msg : msgs) {
            if (offset == nextOffset) {
                // The whole original batch has been replayed.
                break;
            }
            if (offset > nextOffset) {
                throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
            }
            this.emit(collector, msg.message(), partition, msg.offset(), attempt);
            offset = msg.nextOffset();
        }
    }

    /**
     * Converts one Kafka message into tuples using the configured scheme and emits each tuple.
     * Nothing is emitted when the scheme yields {@code null}.
     */
    private void emit(TridentCollector collector, Message msg, Partition partition, long offset, TransactionAttempt attempt) {
        Iterable<List<Object>> values;
        if (this._config.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
            // Metadata-aware schemes also receive the partition and the message offset.
            values = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) this._config.scheme, msg, partition, offset);
        } else {
            values = KafkaUtils.generateTuples(this._config, msg, partition.topic);
        }

        if (values == null) {
            LOG.debug("NOT Emitting NULL data. [Transaction: {}]", attempt);
            return;
        }
        for (List<Object> value : values) {
            LOG.debug("Emitting: [Transaction: {}], [Data: {}]", attempt, value);
            collector.emit(value);
        }
    }

    /** Clears the partition connections; invoked when an emitter is closed. */
    private void clear() {
        this._connections.clear();
    }

    /**
     * Concatenates the ordered partitions of every element of {@code partitions}, preserving the
     * order of the input list.
     */
    private List<Partition> orderPartitions(List<GlobalPartitionInformation> partitions) {
        List<Partition> ordered = new ArrayList<Partition>();
        for (GlobalPartitionInformation info : partitions) {
            ordered.addAll(info.getOrderedPartitions());
        }
        return ordered;
    }

    /** Clears the partition connections and hands the new partition set to the offset metric. */
    private void refresh(List<Partition> list) {
        this._connections.clear();
        this._kafkaOffsetMetric.refreshPartitions(new HashSet<Partition>(list));
    }

    /**
     * Returns an opaque-spout emitter backed by this instance. Its batches are emitted through the
     * failure-tolerant {@code emitNewPartitionBatch} path.
     */
    public IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map> asOpaqueEmitter() {
        return new IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map>(){

            public Map emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
                return TridentKafkaEmitter.this.emitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
            }

            public void refreshPartitions(List<Partition> partitions) {
                TridentKafkaEmitter.this.refresh(partitions);
            }

            public List<Partition> getOrderedPartitions(List<GlobalPartitionInformation> partitionInformation) {
                return TridentKafkaEmitter.this.orderPartitions(partitionInformation);
            }

            /** Assigns every {@code numTasks}-th ordered partition, starting at index {@code taskId}. */
            public List<Partition> getPartitionsForTask(int taskId, int numTasks, List<GlobalPartitionInformation> allPartitionInfo) {
                List<Partition> orderedPartitions = this.getOrderedPartitions(allPartitionInfo);
                if (orderedPartitions == null) {
                    return new ArrayList<Partition>(0);
                }
                int total = orderedPartitions.size();
                List<Partition> taskPartitions = new ArrayList<Partition>(total);
                for (int i = taskId; i < total; i += numTasks) {
                    taskPartitions.add(orderedPartitions.get(i));
                }
                return taskPartitions;
            }

            public void close() {
                TridentKafkaEmitter.this.clear();
            }
        };
    }

    /**
     * Returns a transactional-spout emitter backed by this instance. New batches use the fail-fast
     * path, and replays go through {@code reEmitPartitionBatch}.
     */
    public IPartitionedTridentSpout.Emitter asTransactionalEmitter() {
        return new IPartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map>(){

            public Map emitPartitionBatchNew(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
                return TridentKafkaEmitter.this.failFastEmitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
            }

            public void emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
                TridentKafkaEmitter.this.reEmitPartitionBatch(transactionAttempt, tridentCollector, partition, map);
            }

            public void refreshPartitions(List<Partition> partitions) {
                TridentKafkaEmitter.this.refresh(partitions);
            }

            public List<Partition> getOrderedPartitions(List<GlobalPartitionInformation> partitionInformation) {
                return TridentKafkaEmitter.this.orderPartitions(partitionInformation);
            }

            public void close() {
                TridentKafkaEmitter.this.clear();
            }
        };
    }
}

