/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.queryablestate.network;

import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.queryablestate.FutureUtils;
import org.apache.flink.queryablestate.network.Client;
import org.apache.flink.queryablestate.network.ClientHandler;
import org.apache.flink.queryablestate.network.ClientHandlerCallback;
import org.apache.flink.queryablestate.network.NettyBufferPool;
import org.apache.flink.queryablestate.network.messages.MessageBody;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class Client<REQ extends MessageBody, RESP extends MessageBody> {
    private static final Logger LOG = LoggerFactory.getLogger(Client.class);
    private final String clientName;
    private final Bootstrap bootstrap;
    private final MessageSerializer<REQ, RESP> messageSerializer;
    private final KvStateRequestStats stats;
    private final Map<InetSocketAddress, EstablishedConnection> establishedConnections = new ConcurrentHashMap<InetSocketAddress, EstablishedConnection>();
    private final Map<InetSocketAddress, PendingConnection> pendingConnections = new ConcurrentHashMap<InetSocketAddress, PendingConnection>();
    private final AtomicReference<CompletableFuture<Void>> clientShutdownFuture = new AtomicReference<Object>(null);

    public Client(String clientName, int numEventLoopThreads, MessageSerializer<REQ, RESP> serializer, KvStateRequestStats stats) {
        Preconditions.checkArgument((numEventLoopThreads >= 1 ? 1 : 0) != 0, (Object)"Non-positive number of event loop threads.");
        this.clientName = (String)Preconditions.checkNotNull((Object)clientName);
        this.messageSerializer = (MessageSerializer)Preconditions.checkNotNull(serializer);
        this.stats = (KvStateRequestStats)Preconditions.checkNotNull((Object)stats);
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flink " + clientName + " Event Loop Thread %d").build();
        NioEventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory);
        NettyBufferPool bufferPool = new NettyBufferPool(numEventLoopThreads);
        this.bootstrap = (Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)new Bootstrap().group((EventLoopGroup)nioGroup)).channel(NioSocketChannel.class)).option(ChannelOption.ALLOCATOR, (Object)bufferPool)).handler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            protected void initChannel(SocketChannel channel) throws Exception {
                channel.pipeline().addLast(new ChannelHandler[]{new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)}).addLast(new ChannelHandler[]{new ChunkedWriteHandler()});
            }
        });
    }

    public String getClientName() {
        return this.clientName;
    }

    public CompletableFuture<RESP> sendRequest(InetSocketAddress serverAddress, REQ request) {
        if (this.clientShutdownFuture.get() != null) {
            return FutureUtils.getFailedFuture(new IllegalStateException(this.clientName + " is already shut down."));
        }
        EstablishedConnection connection = this.establishedConnections.get(serverAddress);
        if (connection != null) {
            return connection.sendRequest(request);
        }
        PendingConnection pendingConnection = this.pendingConnections.get(serverAddress);
        if (pendingConnection != null) {
            return pendingConnection.sendRequest(request);
        }
        PendingConnection pending = new PendingConnection(serverAddress, this.messageSerializer);
        PendingConnection previous = this.pendingConnections.putIfAbsent(serverAddress, pending);
        if (previous == null) {
            this.bootstrap.connect(serverAddress.getAddress(), serverAddress.getPort()).addListener((GenericFutureListener)pending);
            return pending.sendRequest(request);
        }
        return previous.sendRequest(request);
    }

    public CompletableFuture<Void> shutdown() {
        CompletableFuture<Void> newShutdownFuture = new CompletableFuture<Void>();
        if (this.clientShutdownFuture.compareAndSet(null, newShutdownFuture)) {
            ArrayList<CompletableFuture> connectionFutures = new ArrayList<CompletableFuture>();
            for (Map.Entry<InetSocketAddress, EstablishedConnection> entry : this.establishedConnections.entrySet()) {
                if (!this.establishedConnections.remove(entry.getKey(), entry.getValue())) continue;
                connectionFutures.add(entry.getValue().close());
            }
            for (Map.Entry<InetSocketAddress, Object> entry : this.pendingConnections.entrySet()) {
                if (this.pendingConnections.remove(entry.getKey()) == null) continue;
                connectionFutures.add(((PendingConnection)entry.getValue()).close());
            }
            CompletableFuture.allOf(connectionFutures.toArray(new CompletableFuture[connectionFutures.size()])).whenComplete((result, throwable) -> {
                if (throwable != null) {
                    LOG.warn("Problem while shutting down the connections at the {}: {}", (Object)this.clientName, throwable);
                }
                if (this.bootstrap != null) {
                    EventLoopGroup group = this.bootstrap.group();
                    if (group != null && !group.isShutdown()) {
                        group.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS).addListener(finished -> {
                            if (finished.isSuccess()) {
                                newShutdownFuture.complete(null);
                            } else {
                                newShutdownFuture.completeExceptionally(finished.cause());
                            }
                        });
                    } else {
                        newShutdownFuture.complete(null);
                    }
                } else {
                    newShutdownFuture.complete(null);
                }
            });
            return newShutdownFuture;
        }
        return this.clientShutdownFuture.get();
    }

    @VisibleForTesting
    public boolean isEventGroupShutdown() {
        return this.bootstrap == null || this.bootstrap.group().isTerminated();
    }

    private class EstablishedConnection
    implements ClientHandlerCallback<RESP> {
        private final InetSocketAddress serverAddress;
        private final Channel channel;
        private final ConcurrentHashMap<Long, org.apache.flink.queryablestate.network.Client$EstablishedConnection.TimestampedCompletableFuture> pendingRequests = new ConcurrentHashMap();
        private final AtomicLong requestCount = new AtomicLong();
        private final AtomicReference<CompletableFuture<Void>> connectionShutdownFuture = new AtomicReference<Object>(null);

        EstablishedConnection(InetSocketAddress serverAddress, MessageSerializer<REQ, RESP> serializer, Channel channel) {
            this.serverAddress = (InetSocketAddress)Preconditions.checkNotNull((Object)serverAddress);
            this.channel = (Channel)Preconditions.checkNotNull((Object)channel);
            channel.pipeline().addLast(Client.this.getClientName() + " Handler", new ClientHandler(Client.this.clientName, serializer, this));
            Client.this.stats.reportActiveConnection();
        }

        CompletableFuture<Void> close() {
            return this.close(new ClosedChannelException());
        }

        private CompletableFuture<Void> close(Throwable cause) {
            CompletableFuture shutdownFuture = new CompletableFuture();
            if (this.connectionShutdownFuture.compareAndSet(null, shutdownFuture)) {
                this.channel.close().addListener(finished -> {
                    Client.this.stats.reportInactiveConnection();
                    Iterator iterator = ((ConcurrentHashMap.KeySetView)this.pendingRequests.keySet()).iterator();
                    while (iterator.hasNext()) {
                        long requestId = (Long)iterator.next();
                        TimestampedCompletableFuture pending = (TimestampedCompletableFuture)this.pendingRequests.remove(requestId);
                        if (pending == null || !pending.completeExceptionally(cause)) continue;
                        Client.this.stats.reportFailedRequest();
                    }
                    if (finished.isSuccess()) {
                        shutdownFuture.completeExceptionally(cause);
                    } else {
                        LOG.warn("Something went wrong when trying to close connection due to : ", cause);
                        shutdownFuture.completeExceptionally(finished.cause());
                    }
                });
            }
            return this.connectionShutdownFuture.get();
        }

        CompletableFuture<RESP> sendRequest(REQ request) {
            TimestampedCompletableFuture requestPromiseTs = new TimestampedCompletableFuture(System.nanoTime());
            try {
                TimestampedCompletableFuture pending;
                long requestId = this.requestCount.getAndIncrement();
                this.pendingRequests.put(requestId, (org.apache.flink.queryablestate.network.Client$EstablishedConnection.TimestampedCompletableFuture)requestPromiseTs);
                Client.this.stats.reportRequest();
                ByteBuf buf = MessageSerializer.serializeRequest(this.channel.alloc(), requestId, request);
                this.channel.writeAndFlush((Object)buf).addListener((GenericFutureListener)((ChannelFutureListener)future -> {
                    TimestampedCompletableFuture pending;
                    if (!future.isSuccess() && (pending = (TimestampedCompletableFuture)this.pendingRequests.remove(requestId)) != null && pending.completeExceptionally(future.cause())) {
                        Client.this.stats.reportFailedRequest();
                    }
                }));
                CompletableFuture clShutdownFuture = (CompletableFuture)Client.this.clientShutdownFuture.get();
                if (clShutdownFuture != null && (pending = (TimestampedCompletableFuture)this.pendingRequests.remove(requestId)) != null) {
                    clShutdownFuture.whenComplete((ignored, throwable) -> {
                        if (throwable != null && pending.completeExceptionally((Throwable)throwable)) {
                            Client.this.stats.reportFailedRequest();
                        } else {
                            pending.completeExceptionally(new ClosedChannelException());
                        }
                    });
                }
            }
            catch (Throwable t) {
                requestPromiseTs.completeExceptionally(t);
            }
            return requestPromiseTs;
        }

        @Override
        public void onRequestResult(long requestId, RESP response) {
            TimestampedCompletableFuture pending = (TimestampedCompletableFuture)this.pendingRequests.remove(requestId);
            if (pending != null && !pending.isDone()) {
                long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1000000L;
                Client.this.stats.reportSuccessfulRequest(durationMillis);
                pending.complete(response);
            }
        }

        @Override
        public void onRequestFailure(long requestId, Throwable cause) {
            TimestampedCompletableFuture pending = (TimestampedCompletableFuture)this.pendingRequests.remove(requestId);
            if (pending != null && !pending.isDone()) {
                Client.this.stats.reportFailedRequest();
                pending.completeExceptionally(cause);
            }
        }

        @Override
        public void onFailure(Throwable cause) {
            this.close(cause).handle((cancelled, ignored) -> Client.this.establishedConnections.remove(this.serverAddress, this));
        }

        public String toString() {
            return "EstablishedConnection{serverAddress=" + this.serverAddress + ", channel=" + this.channel + ", pendingRequests=" + this.pendingRequests.size() + ", requestCount=" + this.requestCount + '}';
        }

        private class TimestampedCompletableFuture
        extends CompletableFuture<RESP> {
            private final long timestampInNanos;

            TimestampedCompletableFuture(long timestampInNanos) {
                this.timestampInNanos = timestampInNanos;
            }

            public long getTimestamp() {
                return this.timestampInNanos;
            }
        }
    }

    private class PendingConnection
    implements ChannelFutureListener {
        private final Object connectLock = new Object();
        private final InetSocketAddress serverAddress;
        private final MessageSerializer<REQ, RESP> serializer;
        private final ArrayDeque<org.apache.flink.queryablestate.network.Client$PendingConnection.PendingRequest> queuedRequests = new ArrayDeque();
        private EstablishedConnection established;
        private final AtomicReference<CompletableFuture<Void>> connectionShutdownFuture = new AtomicReference<Object>(null);
        private Throwable failureCause;

        private PendingConnection(InetSocketAddress serverAddress, MessageSerializer<REQ, RESP> serializer) {
            this.serverAddress = serverAddress;
            this.serializer = serializer;
        }

        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                this.handInChannel(future.channel());
            } else {
                this.close(future.cause());
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        CompletableFuture<RESP> sendRequest(REQ request) {
            Object object = this.connectLock;
            synchronized (object) {
                if (this.failureCause != null) {
                    return FutureUtils.getFailedFuture(this.failureCause);
                }
                if (this.connectionShutdownFuture.get() != null) {
                    return FutureUtils.getFailedFuture(new ClosedChannelException());
                }
                if (this.established != null) {
                    return this.established.sendRequest(request);
                }
                PendingRequest pending = new PendingRequest(this, (MessageBody)request);
                this.queuedRequests.add((org.apache.flink.queryablestate.network.Client$PendingConnection.PendingRequest)pending);
                return pending;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void handInChannel(Channel channel) {
            Object object = this.connectLock;
            synchronized (object) {
                if (this.connectionShutdownFuture.get() != null || this.failureCause != null) {
                    channel.close();
                } else {
                    this.established = new EstablishedConnection(this.serverAddress, this.serializer, channel);
                    while (!this.queuedRequests.isEmpty()) {
                        PendingRequest pending = (PendingRequest)this.queuedRequests.poll();
                        this.established.sendRequest(pending.request).whenComplete((response, throwable) -> {
                            if (throwable != null) {
                                pending.completeExceptionally((Throwable)throwable);
                            } else {
                                pending.complete(response);
                            }
                        });
                    }
                    Client.this.establishedConnections.put(this.serverAddress, this.established);
                    Client.this.pendingConnections.remove(this.serverAddress);
                    if (Client.this.clientShutdownFuture.get() != null && Client.this.establishedConnections.remove(this.serverAddress, this.established)) {
                        this.established.close();
                    }
                }
            }
        }

        private CompletableFuture<Void> close() {
            return this.close(new ClosedChannelException());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private CompletableFuture<Void> close(Throwable cause) {
            CompletableFuture<Object> future = new CompletableFuture<Object>();
            if (this.connectionShutdownFuture.compareAndSet(null, future)) {
                Object object = this.connectLock;
                synchronized (object) {
                    if (this.failureCause == null) {
                        this.failureCause = cause;
                    }
                    if (this.established != null) {
                        this.established.close().whenComplete((result, throwable) -> {
                            if (throwable != null) {
                                future.completeExceptionally((Throwable)throwable);
                            } else {
                                future.complete(null);
                            }
                        });
                    } else {
                        PendingRequest pending;
                        while ((pending = (PendingRequest)this.queuedRequests.poll()) != null) {
                            pending.completeExceptionally(cause);
                        }
                        future.complete(null);
                    }
                }
            }
            return this.connectionShutdownFuture.get();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public String toString() {
            Object object = this.connectLock;
            synchronized (object) {
                return "PendingConnection{serverAddress=" + this.serverAddress + ", queuedRequests=" + this.queuedRequests.size() + ", established=" + (this.established != null) + ", closed=" + (this.connectionShutdownFuture.get() != null) + '}';
            }
        }

        private static final class PendingRequest
        extends CompletableFuture<RESP> {
            private final REQ request;
            final /* synthetic */ PendingConnection this$1;

            private PendingRequest(REQ request) {
                this.this$1 = var1_1;
                this.request = request;
            }
        }
    }
}

