/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.consumer.internals.DelayedTask;
import org.apache.kafka.clients.consumer.internals.DelayedTaskQueue;
import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.clients.consumer.internals.SendFailedException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.utils.Time;

/**
 * Consumer-side wrapper around a {@link KafkaClient}.
 *
 * <p>Requests handed to {@link #send(Node, ApiKeys, AbstractRequest)} are queued per node in
 * {@code unsent} and are only handed to the underlying client from inside one of the
 * {@code poll} variants. Results are reported through the returned {@link RequestFuture}.
 * The class also runs {@link DelayedTask}s scheduled through
 * {@link #schedule(DelayedTask, long)} as part of each poll.
 *
 * <p>NOTE(review): apart from the wakeup flags, no state in this class is synchronized, so it
 * is presumably meant to be driven by a single thread with {@link #wakeup()} being the only
 * cross-thread entry point - confirm against callers.
 */
public class ConsumerNetworkClient implements Closeable {
    private final KafkaClient client;
    /** Set by {@link #wakeup()}; consumed (and reset) by the next client poll. */
    private final AtomicBoolean wakeup = new AtomicBoolean(false);
    private final DelayedTaskQueue delayedTasks = new DelayedTaskQueue();
    /** Requests accepted by {@code send} but not yet handed to the underlying client. */
    private final Map<Node, List<ClientRequest>> unsent = new HashMap<Node, List<ClientRequest>>();
    private final Metadata metadata;
    private final Time time;
    private final long retryBackoffMs;
    /**
     * Gate for {@link #wakeup()}. Declared {@code volatile} because {@code wakeup()} is designed
     * to be invoked from a thread other than the polling one, and a plain field gives that
     * thread no guarantee of ever observing {@link #disableWakeups()} / {@link #enableWakeups()}.
     */
    private volatile boolean wakeupsEnabled = true;

    /**
     * @param client         underlying network client that performs the actual I/O
     * @param metadata       cluster metadata used by the metadata-refresh helpers
     * @param time           clock used for all timestamps taken by this class
     * @param retryBackoffMs poll interval used by {@link #awaitPendingRequests(Node)}
     */
    public ConsumerNetworkClient(KafkaClient client, Metadata metadata, Time time, long retryBackoffMs) {
        this.client = client;
        this.metadata = metadata;
        this.time = time;
        this.retryBackoffMs = retryBackoffMs;
    }

    /**
     * Adds a task to the delayed-task queue.
     *
     * @param task the task to add
     * @param at   the time value passed through to the queue (compared against
     *             {@code time.milliseconds()} values during polls)
     */
    public void schedule(DelayedTask task, long at) {
        this.delayedTasks.add(task, at);
    }

    /**
     * Removes a task from the delayed-task queue.
     *
     * @param task the task to remove
     */
    public void unschedule(DelayedTask task) {
        this.delayedTasks.remove(task);
    }

    /**
     * Queues a request for the given node. Nothing is transmitted here; the request is handed to
     * the underlying client during a later {@code poll} call.
     *
     * @param node    destination node
     * @param api     API key used to build the request header
     * @param request request body
     * @return a future that is completed or failed once the outcome of the request is known
     */
    public RequestFuture<ClientResponse> send(Node node, ApiKeys api, AbstractRequest request) {
        long now = this.time.milliseconds();
        RequestFutureCompletionHandler future = new RequestFutureCompletionHandler();
        RequestHeader header = this.client.nextRequestHeader(api);
        RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
        // The boolean is presumably ClientRequest's "expect response" flag - TODO confirm.
        this.put(node, new ClientRequest(now, true, send, future));
        return future;
    }

    /** Appends {@code request} to the unsent list of {@code node}, creating the list on demand. */
    private void put(Node node, ClientRequest request) {
        List<ClientRequest> nodeUnsent = this.unsent.get(node);
        if (nodeUnsent == null) {
            nodeUnsent = new ArrayList<ClientRequest>();
            this.unsent.put(node, nodeUnsent);
        }
        nodeUnsent.add(request);
    }

    /**
     * @return whatever the underlying client reports as its least loaded node at the current time
     */
    public Node leastLoadedNode() {
        return this.client.leastLoadedNode(this.time.milliseconds());
    }

    /**
     * Requests a metadata update and polls until the metadata version differs from the value
     * returned by that request.
     */
    public void awaitMetadataUpdate() {
        int version = this.metadata.requestUpdate();
        do {
            this.poll(Long.MAX_VALUE);
        } while (this.metadata.version() == version);
    }

    /**
     * Blocks in {@link #awaitMetadataUpdate()} only when the metadata reports that its next
     * update is due now (time to next update is zero).
     */
    public void ensureFreshMetadata() {
        if (this.metadata.timeToNextUpdate(this.time.milliseconds()) == 0L) {
            this.awaitMetadataUpdate();
        }
    }

    /**
     * Flags a pending wakeup and wakes the underlying client. The next client poll then fails
     * all unsent requests and throws {@link WakeupException}. Does nothing while wakeups are
     * disabled.
     */
    public void wakeup() {
        if (this.wakeupsEnabled) {
            this.wakeup.set(true);
            this.client.wakeup();
        }
    }

    /**
     * Polls without a time limit until {@code future} is done.
     *
     * @param future the future to wait for
     */
    public void poll(RequestFuture<?> future) {
        while (!future.isDone()) {
            this.poll(Long.MAX_VALUE);
        }
    }

    /**
     * Polls until {@code future} is done or {@code timeout} milliseconds have elapsed. At least
     * one poll is always performed, even for a non-positive timeout.
     *
     * @param future  the future to wait for
     * @param timeout maximum time to wait, in milliseconds
     * @return {@code true} if the future is done when this method returns
     */
    public boolean poll(RequestFuture<?> future, long timeout) {
        long begin = this.time.milliseconds();
        long remaining = timeout;
        long now = begin;
        do {
            this.poll(remaining, now);
            now = this.time.milliseconds();
            long elapsed = now - begin;
            remaining = timeout - elapsed;
        } while (!future.isDone() && remaining > 0L);
        return future.isDone();
    }

    /**
     * Performs a single poll cycle using the current time.
     *
     * @param timeout upper bound, in milliseconds, passed to the underlying client poll
     */
    public void poll(long timeout) {
        this.poll(timeout, this.time.milliseconds());
    }

    /**
     * One poll cycle: send what can be sent, poll the client (bounded by the next delayed-task
     * timeout), process disconnects, run due delayed tasks, try sending again, and finally fail
     * every request that is still unsent with {@link SendFailedException}.
     */
    private void poll(long timeout, long now) {
        this.trySend(now);
        // Never block in the client past the point at which the next delayed task is due.
        timeout = Math.min(timeout, this.delayedTasks.nextTimeout(now));
        this.clientPoll(timeout, now);
        now = this.time.milliseconds();
        // NOTE(review): disconnects are checked before the second trySend, presumably because
        // client.ready() can alter the connection-failed state - confirm before reordering.
        this.checkDisconnects(now);
        this.delayedTasks.poll(now);
        this.trySend(now);
        this.failUnsentRequests();
    }

    /**
     * Polls in {@code retryBackoffMs} steps until {@link #pendingRequestCount(Node)} reaches zero.
     *
     * @param node the node whose requests to wait for
     */
    public void awaitPendingRequests(Node node) {
        while (this.pendingRequestCount(node) > 0) {
            this.poll(this.retryBackoffMs);
        }
    }

    /**
     * @param node the node to inspect
     * @return the number of unsent requests queued here for {@code node} plus the underlying
     *         client's in-flight request count for that node
     */
    public int pendingRequestCount(Node node) {
        List<ClientRequest> pending = this.unsent.get(node);
        int unsentCount = pending == null ? 0 : pending.size();
        return unsentCount + this.client.inFlightRequestCount(node.idString());
    }

    /**
     * @return the number of unsent requests across all nodes plus the underlying client's total
     *         in-flight request count
     */
    public int pendingRequestCount() {
        int total = 0;
        for (List<ClientRequest> requests : this.unsent.values()) {
            total += requests.size();
        }
        return total + this.client.inFlightRequestCount();
    }

    /**
     * Completes, with a disconnected {@link ClientResponse}, every unsent request whose node the
     * client reports as connection-failed, and drops those nodes from {@code unsent}.
     *
     * <p>The affected requests are detached from {@code unsent} before any future is completed.
     * Completing a future may run caller-supplied callbacks, and a callback that re-enters this
     * object (for example via {@code send}) would otherwise modify the map or list while it is
     * being iterated and trigger a {@code ConcurrentModificationException}.
     */
    private void checkDisconnects(long now) {
        List<ClientRequest> disconnected = new ArrayList<ClientRequest>();
        Iterator<Map.Entry<Node, List<ClientRequest>>> iterator = this.unsent.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<Node, List<ClientRequest>> requestEntry = iterator.next();
            if (!this.client.connectionFailed(requestEntry.getKey())) {
                continue;
            }
            disconnected.addAll(requestEntry.getValue());
            iterator.remove();
        }
        // Iteration is finished; callbacks may now touch 'unsent' freely.
        for (ClientRequest request : disconnected) {
            RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
            handler.complete(new ClientResponse(request, now, true, null));
        }
    }

    /**
     * Empties {@code unsent} and fails the future of every request that was in it with
     * {@link SendFailedException#INSTANCE}.
     *
     * <p>The queue is snapshotted and cleared before any future is failed, so a failure callback
     * that re-enters this object cannot corrupt the iteration, and a request enqueued by such a
     * callback stays queued for the next poll instead of being discarded without its future ever
     * completing.
     */
    private void failUnsentRequests() {
        List<ClientRequest> abandoned = new ArrayList<ClientRequest>();
        for (List<ClientRequest> requests : this.unsent.values()) {
            abandoned.addAll(requests);
        }
        this.unsent.clear();
        for (ClientRequest request : abandoned) {
            RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
            handler.raise(SendFailedException.INSTANCE);
        }
    }

    /**
     * Hands every unsent request whose node is currently ready to the underlying client.
     * Readiness is re-evaluated for each request rather than once per node.
     *
     * @return {@code true} if at least one request was handed to the client
     */
    private boolean trySend(long now) {
        boolean requestsSent = false;
        for (Map.Entry<Node, List<ClientRequest>> requestEntry : this.unsent.entrySet()) {
            Node node = requestEntry.getKey();
            Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
            while (iterator.hasNext()) {
                ClientRequest request = iterator.next();
                if (!this.client.ready(node, now)) {
                    continue;
                }
                this.client.send(request, now);
                iterator.remove();
                requestsSent = true;
            }
        }
        return requestsSent;
    }

    /**
     * Polls the underlying client, then honours a pending wakeup by failing all unsent requests,
     * resetting the flag and throwing.
     *
     * @throws WakeupException if {@link #wakeup()} was called since the flag was last reset
     */
    private void clientPoll(long timeout, long now) {
        this.client.poll(timeout, now);
        if (this.wakeup.get()) {
            this.failUnsentRequests();
            this.wakeup.set(false);
            throw new WakeupException();
        }
    }

    /**
     * Disables {@link #wakeup()} and discards any wakeup that is already pending.
     *
     * <p>The gate is closed before the pending flag is cleared, so a {@code wakeup()} call that
     * starts after this method has closed the gate cannot leave a stale flag behind. A call that
     * read the gate just before it closed can still set the flag afterwards; this ordering only
     * narrows that window.
     */
    public void disableWakeups() {
        this.wakeupsEnabled = false;
        this.wakeup.set(false);
    }

    /** Re-enables {@link #wakeup()}. */
    public void enableWakeups() {
        this.wakeupsEnabled = true;
    }

    /**
     * Closes the underlying client.
     *
     * @throws IOException declared by {@link Closeable#close()}
     */
    @Override
    public void close() throws IOException {
        this.client.close();
    }

    /**
     * @param node the node to check
     * @return the underlying client's connection-failed status for {@code node}
     */
    public boolean connectionFailed(Node node) {
        return this.client.connectionFailed(node);
    }

    /**
     * Calls {@code ready} on the underlying client for {@code node} at the current time and
     * ignores the result; presumably this initiates a connection when none exists - confirm
     * against the {@link KafkaClient} contract.
     *
     * @param node the node to connect to
     */
    public void tryConnect(Node node) {
        this.client.ready(node, this.time.milliseconds());
    }

    /**
     * A {@link RequestFuture} that doubles as the client's completion callback: the response
     * delivered to {@link #onComplete(ClientResponse)} becomes the future's value.
     */
    public static class RequestFutureCompletionHandler
            extends RequestFuture<ClientResponse>
            implements RequestCompletionHandler {
        @Override
        public void onComplete(ClientResponse response) {
            this.complete(response);
        }
    }
}

