/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.oss.driver.internal.core.pool;

import com.datastax.oss.driver.api.core.AsyncAutoCloseable;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.InvalidKeyspaceException;
import com.datastax.oss.driver.api.core.UnsupportedProtocolVersionException;
import com.datastax.oss.driver.api.core.auth.AuthenticationException;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfig;
import com.datastax.oss.driver.api.core.connection.ReconnectionPolicy;
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metrics.DefaultNodeMetric;
import com.datastax.oss.driver.internal.core.channel.ChannelEvent;
import com.datastax.oss.driver.internal.core.channel.ChannelFactory;
import com.datastax.oss.driver.internal.core.channel.ClusterNameMismatchException;
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
import com.datastax.oss.driver.internal.core.channel.DriverChannelOptions;
import com.datastax.oss.driver.internal.core.config.ConfigChangeEvent;
import com.datastax.oss.driver.internal.core.context.EventBus;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
import com.datastax.oss.driver.internal.core.metadata.TopologyEvent;
import com.datastax.oss.driver.internal.core.pool.ChannelSet;
import com.datastax.oss.driver.internal.core.util.Loggers;
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
import com.datastax.oss.driver.internal.core.util.concurrent.Reconnection;
import com.datastax.oss.driver.internal.core.util.concurrent.RunOrSchedule;
import com.datastax.oss.driver.internal.core.util.concurrent.UncaughtExceptions;
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
import com.datastax.oss.driver.shaded.guava.common.collect.Sets;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The set of channels (connections) kept open to a single {@link Node}.
 *
 * <p>Read accessors ({@link #next()}, {@link #size()}, {@link #getInFlight()}, ...) delegate
 * directly to the underlying {@link ChannelSet}. Every operation that changes the pool (initial
 * connection, resize, keyspace switch, reconnection, close) is handed to the admin executor through
 * {@link RunOrSchedule} and carried out by the private {@code SingleThreaded} helper, whose methods
 * all assert that they run on that executor.
 */
@ThreadSafe
public class ChannelPool implements AsyncAutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(ChannelPool.class);

    /** The channels currently usable by {@link #next()}; package-visible for tests. */
    @VisibleForTesting
    final ChannelSet channels = new ChannelSet();
    private final Node node;
    private final CqlIdentifier initialKeyspaceName;
    private final EventExecutor adminExecutor;
    private final String sessionLogPrefix;
    private final String logPrefix;
    private final SingleThreaded singleThreaded;

    /**
     * Written on the admin executor at the end of each connection round, read from any thread
     * through {@link #isInvalidKeyspace()}; hence volatile.
     */
    private volatile boolean invalidKeyspace;

    /**
     * Creates a pool for {@code node} and schedules its first round of connection attempts on the
     * admin executor.
     *
     * @param node the node to connect to.
     * @param keyspaceName the keyspace passed to every channel opened by this pool.
     * @param distance determines the configured pool size (local size for {@link
     *     NodeDistance#LOCAL}, remote size for anything else).
     * @param context source of the admin executor, configuration, channel factory, event bus and
     *     reconnection policy.
     * @param sessionLogPrefix prefix used in log messages; the pool's own prefix is this value
     *     followed by {@code |} and the node's end point.
     * @return a stage fed from the first connection round. On success it yields the pool itself,
     *     even if some (or all) channels could not be opened; in that case a reconnection is
     *     started in the background.
     */
    public static CompletionStage<ChannelPool> init(Node node, CqlIdentifier keyspaceName, NodeDistance distance, InternalDriverContext context, String sessionLogPrefix) {
        ChannelPool pool = new ChannelPool(node, keyspaceName, distance, context, sessionLogPrefix);
        return pool.connect();
    }

    private ChannelPool(Node node, CqlIdentifier keyspaceName, NodeDistance distance, InternalDriverContext context, String sessionLogPrefix) {
        this.node = node;
        this.initialKeyspaceName = keyspaceName;
        this.adminExecutor = context.getNettyOptions().adminEventExecutorGroup().next();
        this.sessionLogPrefix = sessionLogPrefix;
        this.logPrefix = sessionLogPrefix + "|" + node.getEndPoint();
        // Must come last: the SingleThreaded constructor reads logPrefix, adminExecutor and node.
        this.singleThreaded = new SingleThreaded(keyspaceName, distance, context);
    }

    /** Schedules the initial connection and returns the future that tracks it. */
    private CompletionStage<ChannelPool> connect() {
        RunOrSchedule.on(adminExecutor, () -> singleThreaded.connect());
        return singleThreaded.connectFuture;
    }

    /** @return the node this pool connects to. */
    public Node getNode() {
        return node;
    }

    /**
     * @return the keyspace this pool was created with. This value is never updated by {@link
     *     #setKeyspace(CqlIdentifier)}.
     */
    public CqlIdentifier getInitialKeyspaceName() {
        return initialKeyspaceName;
    }

    /**
     * @return {@code true} if, in the most recent connection round, at least one attempt was made
     *     and every attempt failed with an {@link InvalidKeyspaceException}.
     */
    public boolean isInvalidKeyspace() {
        return invalidKeyspace;
    }

    /**
     * @return the channel selected by {@link ChannelSet#next()}. NOTE(review): what is returned
     *     when the pool holds no channel is decided by {@code ChannelSet}; confirm there.
     */
    public DriverChannel next() {
        return channels.next();
    }

    /** @return the number of channels currently in the pool. */
    public int size() {
        return channels.size();
    }

    /** @return the value reported by {@link ChannelSet#getAvailableIds()}. */
    public int getAvailableIds() {
        return channels.getAvailableIds();
    }

    /** @return the value reported by {@link ChannelSet#getInFlight()}. */
    public int getInFlight() {
        return channels.getInFlight();
    }

    /** @return the value reported by {@link ChannelSet#getOrphanedIds()}. */
    public int getOrphanedIds() {
        return channels.getOrphanedIds();
    }

    /**
     * Asks the pool to adopt the configured size for {@code newDistance}. The change is applied
     * asynchronously on the admin executor.
     *
     * @param newDistance the new distance of the node.
     */
    public void resize(NodeDistance newDistance) {
        RunOrSchedule.on(adminExecutor, () -> singleThreaded.resize(newDistance));
    }

    /**
     * Switches every channel of the pool to {@code newKeyspaceName}; channels opened afterwards use
     * the new keyspace as well.
     *
     * @param newKeyspaceName the keyspace to switch to.
     * @return a stage that completes once every channel that was in the pool at call time has
     *     finished its switch, or a failed stage ({@link IllegalStateException}) if a previous
     *     switch is still in progress.
     */
    public CompletionStage<Void> setKeyspace(CqlIdentifier newKeyspaceName) {
        return RunOrSchedule.on(adminExecutor, () -> singleThreaded.setKeyspace(newKeyspaceName));
    }

    /** Forwards to {@link Reconnection#reconnectNow(boolean)} (with {@code false}) on the admin executor. */
    public void reconnectNow() {
        RunOrSchedule.on(adminExecutor, () -> singleThreaded.reconnectNow());
    }

    /** @return the stage that completes once a close has been requested and all channels are closed. */
    @Override
    @NonNull
    public CompletionStage<Void> closeFuture() {
        return singleThreaded.closeFuture;
    }

    /**
     * Requests a graceful close of every channel ({@link DriverChannel#close()}).
     *
     * @return the same stage as {@link #closeFuture()}.
     */
    @Override
    @NonNull
    public CompletionStage<Void> closeAsync() {
        RunOrSchedule.on(adminExecutor, () -> singleThreaded.close());
        return singleThreaded.closeFuture;
    }

    /**
     * Same as {@link #closeAsync()}, then additionally calls {@link DriverChannel#forceClose()} on
     * every channel, including those already shutting down gracefully.
     *
     * @return the same stage as {@link #closeFuture()}.
     */
    @Override
    @NonNull
    public CompletionStage<Void> forceCloseAsync() {
        RunOrSchedule.on(adminExecutor, () -> singleThreaded.forceClose());
        return singleThreaded.closeFuture;
    }

    /**
     * Holds the mutable state of the pool. Every method asserts that it is invoked on the admin
     * executor, so the non-volatile, non-final fields below need no further synchronization as long
     * as that contract holds.
     */
    private class SingleThreaded {
        private final DriverConfig config;
        private final ChannelFactory channelFactory;
        private final EventBus eventBus;
        /** Connection attempts of the round in progress; emptied at the end of onAllConnected. */
        private final List<CompletionStage<DriverChannel>> pendingChannels = new ArrayList<>();
        /** Channels that left {@code channels} because they started a graceful shutdown. */
        private final Set<DriverChannel> closingChannels = new HashSet<>();
        private final Reconnection reconnection;
        private final Object configListenerKey;
        private NodeDistance distance;
        private int wantedCount;
        private final CompletableFuture<ChannelPool> connectFuture = new CompletableFuture<>();
        private boolean isConnecting;
        private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
        private boolean isClosing;
        private CompletableFuture<Void> setKeyspaceFuture;
        private CqlIdentifier keyspaceName;

        private SingleThreaded(CqlIdentifier keyspaceName, NodeDistance distance, InternalDriverContext context) {
            this.keyspaceName = keyspaceName;
            this.config = context.getConfig();
            this.distance = distance;
            // getConfiguredSize reads this.config, which is why config is assigned first.
            this.wantedCount = getConfiguredSize(distance);
            this.channelFactory = context.getChannelFactory();
            this.eventBus = context.getEventBus();
            ReconnectionPolicy reconnectionPolicy = context.getReconnectionPolicy();
            this.reconnection = new Reconnection(logPrefix, adminExecutor, () -> reconnectionPolicy.newNodeSchedule(node), this::addMissingChannels, () -> this.eventBus.fire(ChannelEvent.reconnectionStarted(node)), () -> this.eventBus.fire(ChannelEvent.reconnectionStopped(node)));
            // Re-evaluate the pool size whenever the configuration changes (see onConfigChanged).
            this.configListenerKey = this.eventBus.register(ConfigChangeEvent.class, RunOrSchedule.on(adminExecutor, this::onConfigChanged));
        }

        /**
         * Runs the first connection round. Only the first call has an effect; when the round did
         * not reach the wanted size, a reconnection is started.
         */
        private void connect() {
            assert adminExecutor.inEventLoop();
            if (isConnecting) {
                return;
            }
            isConnecting = true;
            CompletionStage<ChannelPool> initialChannels = addMissingChannels().thenApply(allConnected -> {
                if (!allConnected) {
                    reconnection.start();
                }
                return ChannelPool.this;
            });
            CompletableFutures.completeFrom(initialChannels, connectFuture);
        }

        /**
         * Starts one connection attempt per missing channel and processes the outcome on the admin
         * executor once all of them are done.
         *
         * @return a stage yielding the result of {@link #onAllConnected(Void)}.
         */
        private CompletionStage<Boolean> addMissingChannels() {
            assert adminExecutor.inEventLoop();
            // A new round must not start before the previous one has been processed.
            assert pendingChannels.isEmpty();
            int missing = wantedCount - channels.size();
            LOG.debug("[{}] Trying to create {} missing channels", logPrefix, missing);
            DriverChannelOptions options = DriverChannelOptions.builder().withKeyspace(keyspaceName).withOwnerLogPrefix(sessionLogPrefix).build();
            for (int i = 0; i < missing; ++i) {
                CompletionStage<DriverChannel> channelFuture = channelFactory.connect(node, options);
                pendingChannels.add(channelFuture);
            }
            return CompletableFutures.allDone(pendingChannels).thenApplyAsync(this::onAllConnected, adminExecutor);
        }

        /**
         * Processes the outcome of a connection round: records metrics and logs for failures, adds
         * successful channels to the pool (or force-closes them if the pool is closing), updates the
         * invalid-keyspace flag, and trims the pool if it ended up larger than wanted.
         *
         * @param v unused; present so the method can be used as a {@code Function<Void, Boolean>}.
         * @return {@code true} if the pool now holds at least the wanted number of channels, or if a
         *     fatal error (cluster name mismatch, unsupported protocol version) was seen; {@code
         *     false} otherwise.
         */
        private boolean onAllConnected(Void v) {
            assert adminExecutor.inEventLoop();
            Throwable fatalError = null;
            int invalidKeyspaceErrors = 0;
            for (CompletionStage<DriverChannel> pendingChannel : pendingChannels) {
                CompletableFuture<DriverChannel> future = pendingChannel.toCompletableFuture();
                assert future.isDone();
                if (future.isCompletedExceptionally()) {
                    Throwable error = CompletableFutures.getFailed(future);
                    ((DefaultNode) node).getMetricUpdater().incrementCounter(error instanceof AuthenticationException ? DefaultNodeMetric.AUTHENTICATION_ERRORS : DefaultNodeMetric.CONNECTION_INIT_ERRORS, null);
                    if (error instanceof ClusterNameMismatchException || error instanceof UnsupportedProtocolVersionException) {
                        // Remember it but keep going so that every pending attempt is processed.
                        fatalError = error;
                    } else if (error instanceof AuthenticationException) {
                        // Always logged at WARN, regardless of CONNECTION_WARN_INIT_ERROR.
                        Loggers.warnWithException(LOG, "[{}] Authentication error", logPrefix, error);
                    } else if (error instanceof InvalidKeyspaceException) {
                        ++invalidKeyspaceErrors;
                    } else if (config.getDefaultProfile().getBoolean(DefaultDriverOption.CONNECTION_WARN_INIT_ERROR)) {
                        Loggers.warnWithException(LOG, "[{}]  Error while opening new channel", logPrefix, error);
                    } else {
                        LOG.debug("[{}]  Error while opening new channel", logPrefix, error);
                    }
                } else {
                    DriverChannel channel = CompletableFutures.getCompleted(future);
                    if (isClosing) {
                        // close() ran while this attempt was in flight: do not keep the channel.
                        LOG.debug("[{}] New channel added ({}) but the pool was closed, closing it", logPrefix, channel);
                        channel.forceClose();
                    } else {
                        LOG.debug("[{}] New channel added {}", logPrefix, channel);
                        channels.add(channel);
                        eventBus.fire(ChannelEvent.channelOpened(node));
                        // The channel's futures may complete on another thread, so hop back to the
                        // admin executor before touching the pool's state.
                        channel.closeStartedFuture().addListener(f -> adminExecutor.submit(() -> onChannelCloseStarted(channel)).addListener(UncaughtExceptions::log));
                        channel.closeFuture().addListener(f -> adminExecutor.submit(() -> onChannelClosed(channel)).addListener(UncaughtExceptions::log));
                    }
                }
            }
            // Only flag the keyspace as invalid when every attempt of this round failed that way.
            invalidKeyspace = invalidKeyspaceErrors > 0 && invalidKeyspaceErrors == pendingChannels.size();
            pendingChannels.clear();
            if (fatalError != null) {
                Loggers.warnWithException(LOG, "[{}] Fatal error while initializing pool, forcing the node down", logPrefix, fatalError);
                node.getBroadcastRpcAddress().ifPresent(address -> eventBus.fire(TopologyEvent.forceDown(address)));
                // Report "done" so that connect() does not start a reconnection for this node.
                return true;
            }
            // wantedCount may have been lowered by resize() while the round was in flight.
            shrinkIfTooManyChannels();
            int currentCount = channels.size();
            LOG.debug("[{}] Reconnection attempt complete, {}/{} channels", logPrefix, currentCount, wantedCount);
            return currentCount >= wantedCount;
        }

        /**
         * Invoked when a channel begins a graceful shutdown: moves it from the active set to {@code
         * closingChannels}, fires a channel-closed event and starts a reconnection. Ignored once
         * the pool itself is closing.
         */
        private void onChannelCloseStarted(DriverChannel channel) {
            assert adminExecutor.inEventLoop();
            if (!isClosing) {
                LOG.debug("[{}] Channel {} started graceful shutdown", logPrefix, channel);
                channels.remove(channel);
                closingChannels.add(channel);
                eventBus.fire(ChannelEvent.channelClosed(node));
                reconnection.start();
            }
        }

        /**
         * Invoked when a channel is fully closed. If it was still in the active set, it is treated
         * as lost (event fired, reconnection started); otherwise it is just dropped from {@code
         * closingChannels}. Ignored once the pool itself is closing.
         */
        private void onChannelClosed(DriverChannel channel) {
            assert adminExecutor.inEventLoop();
            if (!isClosing) {
                if (channels.remove(channel)) {
                    LOG.debug("[{}] Lost channel {}", logPrefix, channel);
                    eventBus.fire(ChannelEvent.channelClosed(node));
                    reconnection.start();
                } else {
                    // The event was already fired when the graceful shutdown started.
                    LOG.debug("[{}] Channel {} completed graceful shutdown", logPrefix, channel);
                    closingChannels.remove(channel);
                }
            }
        }

        /**
         * Records the new distance and adjusts the wanted channel count to the size configured for
         * it: growing starts a reconnection, shrinking closes the extra channels right away unless
         * a reconnection is running (in which case {@link #onAllConnected(Void)} trims the pool).
         */
        private void resize(NodeDistance newDistance) {
            assert adminExecutor.inEventLoop();
            distance = newDistance;
            int newChannelCount = getConfiguredSize(newDistance);
            if (newChannelCount > wantedCount) {
                LOG.debug("[{}] Growing ({} => {} channels)", logPrefix, wantedCount, newChannelCount);
                wantedCount = newChannelCount;
                reconnection.start();
            } else if (newChannelCount < wantedCount) {
                LOG.debug("[{}] Shrinking ({} => {} channels)", logPrefix, wantedCount, newChannelCount);
                wantedCount = newChannelCount;
                if (!reconnection.isRunning()) {
                    shrinkIfTooManyChannels();
                }
            }
        }

        /**
         * Gracefully closes as many channels as the pool holds above {@code wantedCount}, firing a
         * channel-closed event for each one.
         */
        private void shrinkIfTooManyChannels() {
            assert adminExecutor.inEventLoop();
            int extraCount = channels.size() - wantedCount;
            if (extraCount > 0) {
                LOG.debug("[{}] Closing {} extra channels", logPrefix, extraCount);
                // Collect first, remove afterwards, so the set is not modified while iterating it.
                Set<DriverChannel> toRemove = Sets.newHashSetWithExpectedSize(extraCount);
                for (DriverChannel channel : channels) {
                    toRemove.add(channel);
                    if (--extraCount == 0) {
                        break;
                    }
                }
                for (DriverChannel channel : toRemove) {
                    channels.remove(channel);
                    channel.close();
                    eventBus.fire(ChannelEvent.channelClosed(node));
                }
            }
        }

        /** Re-applies the current distance so that a changed pool size setting is picked up. */
        private void onConfigChanged(ConfigChangeEvent event) {
            assert adminExecutor.inEventLoop();
            this.resize(distance);
        }

        /**
         * Switches the keyspace on every active channel and on every channel still being opened,
         * and remembers it for channels opened later.
         *
         * @return a stage completed once every channel that was active at call time has finished
         *     its switch (the per-channel outcome is not inspected), or a failed stage if a previous
         *     switch has not completed yet.
         */
        private CompletionStage<Void> setKeyspace(CqlIdentifier newKeyspaceName) {
            assert adminExecutor.inEventLoop();
            if (setKeyspaceFuture != null && !setKeyspaceFuture.isDone()) {
                return CompletableFutures.failedFuture(new IllegalStateException("Can't call setKeyspace while a keyspace switch is already in progress"));
            }
            keyspaceName = newKeyspaceName;
            setKeyspaceFuture = new CompletableFuture<>();
            int toSwitch = channels.size();
            if (toSwitch == 0) {
                setKeyspaceFuture.complete(null);
            } else {
                // Listeners may fire on different threads, hence the atomic countdown.
                AtomicInteger remaining = new AtomicInteger(toSwitch);
                for (DriverChannel channel : channels) {
                    channel.setKeyspace(newKeyspaceName).addListener(f -> {
                        if (remaining.decrementAndGet() == 0) {
                            setKeyspaceFuture.complete(null);
                        }
                    });
                }
            }
            // Attempts already in flight were started with the old keyspace: switch them as soon
            // as they connect. The returned future does not wait for these.
            for (CompletionStage<DriverChannel> channelFuture : pendingChannels) {
                channelFuture.thenAccept(channel -> channel.setKeyspace(newKeyspaceName));
            }
            return setKeyspaceFuture;
        }

        /** Delegates to {@link Reconnection#reconnectNow(boolean)} with {@code false}. */
        private void reconnectNow() {
            assert adminExecutor.inEventLoop();
            reconnection.reconnectNow(false);
        }

        /**
         * Starts closing the pool (first call only): stops the reconnection, unregisters the
         * configuration listener and gracefully closes every active channel. {@code closeFuture}
         * completes when all active and already-closing channels are closed; close errors are
         * logged but do not fail the future.
         */
        private void close() {
            assert adminExecutor.inEventLoop();
            if (isClosing) {
                return;
            }
            isClosing = true;
            // Channels from an attempt still in flight are dealt with in onAllConnected.
            reconnection.stop();
            eventBus.unregister(configListenerKey, ConfigChangeEvent.class);
            int toClose = closingChannels.size() + channels.size();
            if (toClose == 0) {
                closeFuture.complete(null);
            } else {
                AtomicInteger remaining = new AtomicInteger(toClose);
                GenericFutureListener<Future<Void>> channelCloseListener = f -> {
                    if (!f.isSuccess()) {
                        Loggers.warnWithException(LOG, "[{}] Error closing channel", logPrefix, f.cause());
                    }
                    if (remaining.decrementAndGet() == 0) {
                        closeFuture.complete(null);
                    }
                };
                for (DriverChannel channel : channels) {
                    eventBus.fire(ChannelEvent.channelClosed(node));
                    channel.close().addListener(channelCloseListener);
                }
                for (DriverChannel channel : closingChannels) {
                    // No event here: onChannelCloseStarted already fired it for these channels.
                    channel.closeFuture().addListener(channelCloseListener);
                }
            }
        }

        /**
         * Ensures {@link #close()} has run, then force-closes every active and closing channel.
         */
        private void forceClose() {
            assert adminExecutor.inEventLoop();
            if (!isClosing) {
                this.close();
            }
            for (DriverChannel channel : channels) {
                channel.forceClose();
            }
            for (DriverChannel channel : closingChannels) {
                channel.forceClose();
            }
        }

        /**
         * @return the pool size from the default profile: the local size for {@link
         *     NodeDistance#LOCAL}, the remote size for any other distance.
         */
        private int getConfiguredSize(NodeDistance distance) {
            return config.getDefaultProfile().getInt(distance == NodeDistance.LOCAL ? DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE : DefaultDriverOption.CONNECTION_POOL_REMOTE_SIZE);
        }
    }
}

