/*
 * Decompiled with CFR 0.152.
 */
package com.github.shyiko.mysql.binlog;

import com.github.shyiko.mysql.binlog.GtidSet;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventHeader;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.GtidEventData;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.QueryEventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.RotateEventDataDeserializer;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
import com.github.shyiko.mysql.binlog.jmx.BinaryLogClientMXBean;
import com.github.shyiko.mysql.binlog.network.AuthenticationException;
import com.github.shyiko.mysql.binlog.network.DefaultSSLSocketFactory;
import com.github.shyiko.mysql.binlog.network.SSLMode;
import com.github.shyiko.mysql.binlog.network.SSLSocketFactory;
import com.github.shyiko.mysql.binlog.network.ServerException;
import com.github.shyiko.mysql.binlog.network.SocketFactory;
import com.github.shyiko.mysql.binlog.network.TLSHostnameVerifier;
import com.github.shyiko.mysql.binlog.network.protocol.ErrorPacket;
import com.github.shyiko.mysql.binlog.network.protocol.GreetingPacket;
import com.github.shyiko.mysql.binlog.network.protocol.PacketChannel;
import com.github.shyiko.mysql.binlog.network.protocol.ResultSetRowPacket;
import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateNativePasswordCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.Command;
import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogGtidCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.PingCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.QueryCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.SSLRequestCommand;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.security.GeneralSecurityException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;

public class BinaryLogClient
implements BinaryLogClientMXBean {
    /**
     * SSL socket factory whose trust manager accepts every client and server
     * certificate without validation (both check methods below are empty), so the
     * server's identity is NOT verified. {@code authenticate} selects it for the
     * REQUIRED and PREFERRED SSL modes when no custom SSL socket factory was set.
     */
    private static final SSLSocketFactory DEFAULT_REQUIRED_SSL_MODE_SOCKET_FACTORY = new DefaultSSLSocketFactory(){

        @Override
        protected void initSSLContext(SSLContext sc) throws GeneralSecurityException {
            // No key managers, a single trust-all trust manager, default SecureRandom.
            sc.init(null, new TrustManager[]{new X509TrustManager(){

                @Override
                public void checkClientTrusted(X509Certificate[] x509Certificates, String s) {
                    // Intentionally empty: never rejects a client certificate.
                }

                @Override
                public void checkServerTrusted(X509Certificate[] x509Certificates, String s) {
                    // Intentionally empty: never rejects a server certificate.
                }

                @Override
                public X509Certificate[] getAcceptedIssuers() {
                    return new X509Certificate[0];
                }
            }}, null);
        }
    };
    /**
     * SSL socket factory that {@code authenticate} selects for the remaining SSL
     * modes when no custom SSL socket factory was set. It uses whatever SSL context
     * initialisation {@code DefaultSSLSocketFactory} provides (not visible here).
     */
    private static final SSLSocketFactory DEFAULT_VERIFY_CA_SSL_MODE_SOCKET_FACTORY = new DefaultSSLSocketFactory();
    /** Largest packet length expressible in the 3-byte length field (0xFFFFFF). */
    private static final int MAX_PACKET_LENGTH = 0xFFFFFF;
    // Logger named after the runtime class, so subclasses log under their own name.
    private final Logger logger = Logger.getLogger(this.getClass().getName());
    // Connection coordinates and credentials, fixed at construction time.
    private final String hostname;
    private final int port;
    private final String schema;
    private final String username;
    private final String password;
    // When false, the binlog dump is requested with server id 0 instead of serverId.
    private boolean blocking = true;
    private long serverId = 65535L;
    // Position to start streaming from; null filename means "ask the server".
    private volatile String binlogFilename;
    // Values below 4 are adjusted up to 4 when the channel is opened.
    private volatile long binlogPosition = 4L;
    // Thread id reported by the server greeting of the current connection.
    private volatile long connectionId;
    private SSLMode sslMode = SSLMode.DISABLED;
    // Guarded by gtidSetAccessLock wherever it is read or replaced in this file.
    private GtidSet gtidSet;
    private final Object gtidSetAccessLock = new Object();
    private boolean gtidSetFallbackToPurged;
    private boolean useBinlogFilenamePositionInGtidMode;
    // Reset each time a binlog stream is requested; presumably track the GTID /
    // transaction state of the event currently being processed (TODO confirm).
    private String gtid;
    private boolean tx;
    private EventDeserializer eventDeserializer = new EventDeserializer();
    private final List<EventListener> eventListeners = new CopyOnWriteArrayList<EventListener>();
    private final List<LifecycleListener> lifecycleListeners = new CopyOnWriteArrayList<LifecycleListener>();
    // Optional factories; when null, a plain Socket / a default SSL factory is used.
    private SocketFactory socketFactory;
    private SSLSocketFactory sslSocketFactory;
    private volatile PacketChannel channel;
    private volatile boolean connected;
    // Optional; when null, threads are created with new Thread(runnable).
    private ThreadFactory threadFactory;
    private boolean keepAlive = true;
    // Milliseconds (initialised via TimeUnit.MINUTES.toMillis).
    private long keepAliveInterval = TimeUnit.MINUTES.toMillis(1L);
    // Greater than 0 enables server heartbeats and switches the keep-alive check
    // from pinging to comparing eventLastSeen against keepAliveInterval.
    private long heartbeatInterval;
    private volatile long eventLastSeen;
    // Milliseconds (initialised via TimeUnit.SECONDS.toMillis).
    private long connectTimeout = TimeUnit.SECONDS.toMillis(3L);
    private volatile ExecutorService keepAliveThreadExecutor;
    // Serialises connection setup/teardown bookkeeping around connectLatch.
    private final Lock connectLock = new ReentrantLock();
    // Non-null while a connectWithTimeout call is in progress.
    private volatile CountDownLatch connectLatch;

    /**
     * Creates a client for {@code localhost:3306} without a schema.
     *
     * @param username login name
     * @param password password
     */
    public BinaryLogClient(String username, String password) {
        // The host/port overload supplies the null schema itself.
        this("localhost", 3306, username, password);
    }

    /**
     * Creates a client for {@code localhost:3306} bound to the given schema.
     *
     * @param schema   schema passed to the authentication command (may be null)
     * @param username login name
     * @param password password
     */
    public BinaryLogClient(String schema, String username, String password) {
        this("localhost", 3306, schema, username, password);
    }

    /**
     * Creates a client for the given host and port without a schema.
     *
     * @param hostname server host
     * @param port     server port
     * @param username login name
     * @param password password
     */
    public BinaryLogClient(String hostname, int port, String username, String password) {
        this(hostname, port, null, username, password);
    }

    /**
     * Creates a client; all other constructors delegate here. No validation is
     * performed and no connection is opened.
     *
     * @param hostname server host
     * @param port     server port
     * @param schema   schema passed to the authentication command (may be null)
     * @param username login name
     * @param password password
     */
    public BinaryLogClient(String hostname, int port, String schema, String username, String password) {
        // Credentials
        this.username = username;
        this.password = password;
        this.schema = schema;
        // Endpoint
        this.hostname = hostname;
        this.port = port;
    }

    /**
     * @return the blocking flag ({@code true} unless changed via {@link #setBlocking(boolean)})
     */
    public boolean isBlocking() {
        return blocking;
    }

    /**
     * Sets the blocking flag. When {@code false}, the binlog dump command is sent
     * with server id 0 rather than the configured server id.
     *
     * @param blocking new flag value
     */
    public void setBlocking(boolean blocking) {
        this.blocking = blocking;
    }

    /**
     * @return the configured SSL mode (never null; {@code SSLMode.DISABLED} by default)
     */
    public SSLMode getSSLMode() {
        return sslMode;
    }

    /**
     * Sets the SSL mode consulted during authentication.
     *
     * @param sslMode new mode, must not be null
     * @throws IllegalArgumentException if {@code sslMode} is null
     */
    public void setSSLMode(SSLMode sslMode) {
        if (sslMode == null) {
            throw new IllegalArgumentException("SSL mode cannot be NULL");
        }
        this.sslMode = sslMode;
    }

    /**
     * @return the server id sent with the binlog dump command in blocking mode
     *         (65535 by default)
     */
    public long getServerId() {
        return serverId;
    }

    /**
     * Sets the server id sent with the binlog dump command in blocking mode.
     *
     * @param serverId new server id (not validated)
     */
    public void setServerId(long serverId) {
        this.serverId = serverId;
    }

    /**
     * @return the current binlog filename; null until set or fetched from the
     *         server, and the empty string after a GTID set was supplied without one
     */
    @Override
    public String getBinlogFilename() {
        return binlogFilename;
    }

    /**
     * Sets the binlog filename to stream from. When it is null at connect time,
     * the filename and position are fetched from the server.
     *
     * @param binlogFilename binlog filename, may be null
     */
    @Override
    public void setBinlogFilename(String binlogFilename) {
        this.binlogFilename = binlogFilename;
    }

    /**
     * @return the current binlog position (4 by default)
     */
    @Override
    public long getBinlogPosition() {
        return binlogPosition;
    }

    /**
     * Sets the binlog position to stream from. Values below 4 are accepted here
     * but adjusted to 4 (with a warning) when the channel is opened.
     *
     * @param binlogPosition binlog position
     */
    @Override
    public void setBinlogPosition(long binlogPosition) {
        this.binlogPosition = binlogPosition;
    }

    /**
     * @return the thread id taken from the server greeting of the most recent
     *         successful authentication (0 before the first one)
     */
    public long getConnectionId() {
        return connectionId;
    }

    /**
     * Returns the string form of the current GTID set.
     *
     * @return {@code gtidSet.toString()}, or null when no GTID set is configured
     */
    public String getGtidSet() {
        synchronized (gtidSetAccessLock) {
            if (gtidSet == null) {
                return null;
            }
            return gtidSet.toString();
        }
    }

    /**
     * Replaces the GTID set. Supplying a non-null value while no binlog filename
     * is set also sets the binlog filename to the empty string.
     *
     * @param gtidSet textual GTID set parsed by {@code GtidSet}, or null to clear it
     */
    public void setGtidSet(String gtidSet) {
        boolean gtidSetGiven = gtidSet != null;
        if (gtidSetGiven && this.binlogFilename == null) {
            this.binlogFilename = "";
        }
        synchronized (this.gtidSetAccessLock) {
            if (gtidSetGiven) {
                this.gtidSet = new GtidSet(gtidSet);
            } else {
                this.gtidSet = null;
            }
        }
    }

    /**
     * @return whether an empty GTID set is replaced by the server's
     *         {@code gtid_purged} value when connecting
     */
    public boolean isGtidSetFallbackToPurged() {
        return gtidSetFallbackToPurged;
    }

    /**
     * Controls whether an empty GTID set is replaced by the server's
     * {@code gtid_purged} value when connecting with an empty binlog filename.
     *
     * @param gtidSetFallbackToPurged new flag value
     */
    public void setGtidSetFallbackToPurged(boolean gtidSetFallbackToPurged) {
        this.gtidSetFallbackToPurged = gtidSetFallbackToPurged;
    }

    /**
     * @return whether the GTID dump command carries the configured binlog
     *         filename/position instead of an empty filename and position 4
     */
    public boolean isUseBinlogFilenamePositionInGtidMode() {
        return useBinlogFilenamePositionInGtidMode;
    }

    /**
     * Controls whether the GTID dump command carries the configured binlog
     * filename/position ({@code true}) or an empty filename and position 4
     * ({@code false}).
     *
     * @param useBinlogFilenamePositionInGtidMode new flag value
     */
    public void setUseBinlogFilenamePositionInGtidMode(boolean useBinlogFilenamePositionInGtidMode) {
        this.useBinlogFilenamePositionInGtidMode = useBinlogFilenamePositionInGtidMode;
    }

    /**
     * @return whether a keep-alive thread is started on connect ({@code true} by default)
     */
    public boolean isKeepAlive() {
        return keepAlive;
    }

    /**
     * Controls whether a keep-alive thread is started when connecting.
     *
     * @param keepAlive new flag value
     */
    public void setKeepAlive(boolean keepAlive) {
        this.keepAlive = keepAlive;
    }

    /**
     * @return the keep-alive check interval in milliseconds (one minute by default)
     */
    public long getKeepAliveInterval() {
        return keepAliveInterval;
    }

    /**
     * Sets how long the keep-alive thread sleeps between connection checks.
     *
     * @param keepAliveInterval interval in milliseconds (passed to {@code Thread.sleep})
     */
    public void setKeepAliveInterval(long keepAliveInterval) {
        this.keepAliveInterval = keepAliveInterval;
    }

    /**
     * Alias of {@link #getConnectTimeout()}: both read the same field.
     *
     * @return the connect timeout in milliseconds
     */
    public long getKeepAliveConnectTimeout() {
        return connectTimeout;
    }

    /**
     * Alias of {@link #setConnectTimeout(long)}: both write the same field.
     *
     * @param connectTimeout connect timeout in milliseconds
     */
    public void setKeepAliveConnectTimeout(long connectTimeout) {
        this.connectTimeout = connectTimeout;
    }

    /**
     * @return the heartbeat interval; 0 (the default) means heartbeats are not requested
     */
    public long getHeartbeatInterval() {
        return heartbeatInterval;
    }

    /**
     * Sets the heartbeat interval. A value greater than 0 makes the client send
     * {@code set @master_heartbeat_period} on connect (with the value multiplied by
     * 1,000,000) and makes the keep-alive thread judge liveness by the time of the
     * last seen event instead of sending pings.
     *
     * @param heartbeatInterval interval, presumably in milliseconds (TODO confirm)
     */
    public void setHeartbeatInterval(long heartbeatInterval) {
        this.heartbeatInterval = heartbeatInterval;
    }

    /**
     * @return the connect timeout in milliseconds (three seconds by default)
     */
    public long getConnectTimeout() {
        return connectTimeout;
    }

    /**
     * Sets the timeout used by {@link #connect()} for establishing the connection.
     *
     * @param connectTimeout timeout in milliseconds
     */
    public void setConnectTimeout(long connectTimeout) {
        this.connectTimeout = connectTimeout;
    }

    /**
     * Replaces the deserializer used to turn binlog packets into events.
     *
     * @param eventDeserializer new deserializer, must not be null
     * @throws IllegalArgumentException if {@code eventDeserializer} is null
     */
    public void setEventDeserializer(EventDeserializer eventDeserializer) {
        if (eventDeserializer == null) {
            throw new IllegalArgumentException("Event deserializer cannot be NULL");
        }
        this.eventDeserializer = eventDeserializer;
    }

    /**
     * Sets the factory used to create the connection socket.
     *
     * @param socketFactory factory, or null to use a plain {@code new Socket()}
     */
    public void setSocketFactory(SocketFactory socketFactory) {
        this.socketFactory = socketFactory;
    }

    /**
     * Sets the factory used when upgrading the connection to SSL.
     *
     * @param sslSocketFactory factory, or null to use the built-in default for the
     *                         configured SSL mode
     */
    public void setSslSocketFactory(SSLSocketFactory sslSocketFactory) {
        this.sslSocketFactory = sslSocketFactory;
    }

    /**
     * Sets the factory used to create this client's helper threads.
     *
     * @param threadFactory factory, or null to create threads with {@code new Thread(runnable)}
     */
    public void setThreadFactory(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }

    /**
     * Connects using the configured connect timeout and then reads event packets
     * on the calling thread; the call returns only once that read loop ends.
     *
     * @throws IOException if the connection cannot be established or fails while streaming
     * @throws IllegalStateException if a connection is already in progress
     */
    public void connect() throws IOException {
        connectWithTimeout(connectTimeout);
    }

    /**
     * Opens the binlog stream, optionally starts the keep-alive thread, notifies
     * lifecycle listeners and then reads event packets until the stream ends.
     * Runs on the calling thread for the whole lifetime of the connection.
     *
     * @param connectTimeout timeout in milliseconds handed to the channel-opening code
     *                       and to the keep-alive thread
     * @throws IOException if opening the channel or reading events fails
     * @throws IllegalStateException if another connect is already in progress
     */
    private void connectWithTimeout(long connectTimeout) throws IOException {
        CountDownLatch latch = new CountDownLatch(1);
        // Local flag (distinct from the field of the same name): true once the
        // channel is open, so onDisconnect is only fired if onConnect could be.
        boolean connected = false;
        try {
            PacketChannel localChannel;
            this.connectLock.lock();
            try {
                // A non-null latch marks a connect that has not finished yet.
                if (this.connectLatch != null) {
                    throw new IllegalStateException("BinaryLogClient is already connected");
                }
                this.connectLatch = latch;
                this.channel = localChannel = this.openChannelToBinaryLogStream(connectTimeout);
                if (this.keepAlive && !this.isKeepAliveThreadRunning()) {
                    this.keepAliveThreadExecutor = this.spawnKeepAliveThread(connectTimeout);
                }
            }
            finally {
                this.connectLock.unlock();
            }
            connected = true;
            for (LifecycleListener lifecycleListener : this.lifecycleListeners) {
                lifecycleListener.onConnect(this);
            }
            this.ensureEventDeserializerHasRequiredEDDs();
            // Blocks until the stream ends or fails.
            this.listenForEventPackets(localChannel);
        }
        finally {
            this.connectLock.lock();
            try {
                latch.countDown();
                // Only clear the field if it still refers to this call's latch.
                if (latch == this.connectLatch) {
                    this.connectLatch = null;
                }
            }
            finally {
                this.connectLock.unlock();
            }
            if (connected) {
                for (LifecycleListener lifecycleListener : this.lifecycleListeners) {
                    lifecycleListener.onDisconnect(this);
                }
            }
        }
    }

    /**
     * Opens a socket to the server, performs the handshake and session setup, and
     * sends the binlog dump command. On success the {@code connected} field is set
     * and the ready-to-read channel is returned.
     *
     * @param connectTimeout timeout in milliseconds; when greater than 0 and no
     *                       keep-alive thread is running, a watchdog closes the
     *                       channel if setup takes longer than this
     * @return the channel on which binlog event packets will arrive
     * @throws IOException if the connection or any setup step fails; the channel is
     *                     closed before the exception propagates
     */
    private PacketChannel openChannelToBinaryLogStream(long connectTimeout) throws IOException {
        Object object;
        PacketChannel channel = null;
        Callable<Void> cancelCloseChannel = null;
        try {
            ChecksumType checksumType;
            try {
                long start = System.currentTimeMillis();
                channel = this.openChannel(connectTimeout);
                if (connectTimeout > 0L && !this.isKeepAliveThreadRunning()) {
                    // Give the watchdog only the time left after the socket connect.
                    cancelCloseChannel = this.scheduleCloseChannel(channel, connectTimeout - (System.currentTimeMillis() - start));
                }
                // -1 from peek means the stream ended before the greeting arrived.
                if (channel.getInputStream().peek() == -1) {
                    throw new EOFException();
                }
            }
            catch (IOException e) {
                throw new IOException("Failed to connect to MySQL on " + this.hostname + ":" + this.port + ". Please make sure it's running.", e);
            }
            GreetingPacket greetingPacket = this.receiveGreeting(channel);
            this.authenticate(channel, greetingPacket);
            this.connectionId = greetingPacket.getThreadId();
            // An empty filename is what setGtidSet stores when a GTID set is supplied
            // without a filename.
            if ("".equals(this.binlogFilename)) {
                object = this.gtidSetAccessLock;
                synchronized (object) {
                    if (this.gtidSet != null && "".equals(this.gtidSet.toString()) && this.gtidSetFallbackToPurged) {
                        this.gtidSet = new GtidSet(this.fetchGtidPurged(channel));
                    }
                }
            }
            if (this.binlogFilename == null) {
                this.fetchBinlogFilenameAndPosition(channel);
            }
            if (this.binlogPosition < 4L) {
                if (this.logger.isLoggable(Level.WARNING)) {
                    this.logger.warning("Binary log position adjusted from " + this.binlogPosition + " to " + 4);
                }
                this.binlogPosition = 4L;
            }
            if ((checksumType = this.fetchBinlogChecksum(channel)) != ChecksumType.NONE) {
                this.confirmSupportOfChecksum(channel, checksumType);
            }
            if (this.heartbeatInterval > 0L) {
                this.enableHeartbeat(channel);
            }
            // Reset per-stream state before the dump is requested.
            this.gtid = null;
            this.tx = false;
            this.requestBinaryLogStream(channel);
        }
        catch (IOException e) {
            // Only IOExceptions are handled here; other exception types propagate
            // without this close.
            this.closeChannel(channel);
            throw e;
        }
        finally {
            block29: {
                if (cancelCloseChannel != null) {
                    try {
                        // Stops the watchdog and waits for its thread to finish.
                        cancelCloseChannel.call();
                    }
                    catch (Exception e) {
                        if (!this.logger.isLoggable(Level.WARNING)) break block29;
                        this.logger.warning("\"" + e.getMessage() + "\" was thrown while canceling scheduled disconnect call");
                    }
                }
            }
        }
        this.connected = true;
        if (this.logger.isLoggable(Level.INFO)) {
            String position;
            object = this.gtidSetAccessLock;
            synchronized (object) {
                // Report the GTID set when one is in use, otherwise filename/position.
                position = this.gtidSet != null ? this.gtidSet.toString() : this.binlogFilename + "/" + this.binlogPosition;
            }
            this.logger.info("Connected to " + this.hostname + ":" + this.port + " at " + position + " (" + (this.blocking ? "sid:" + this.serverId + ", " : "") + "cid:" + this.connectionId + ")");
        }
        return channel;
    }

    /**
     * Makes sure the event deserializer can decode the event types this client
     * inspects itself: ROTATE always, and GTID plus QUERY when a GTID set is in use.
     */
    private void ensureEventDeserializerHasRequiredEDDs() {
        ensureEventDataDeserializerIfPresent(EventType.ROTATE, RotateEventDataDeserializer.class);
        synchronized (gtidSetAccessLock) {
            if (gtidSet == null) {
                return;
            }
            ensureEventDataDeserializerIfPresent(EventType.GTID, GtidEventDataDeserializer.class);
            ensureEventDataDeserializerIfPresent(EventType.QUERY, QueryEventDataDeserializer.class);
        }
    }

    /**
     * Creates a socket (via the configured socket factory, if any), connects it to
     * {@code hostname:port} and wraps it in a {@code PacketChannel}.
     *
     * @param connectTimeout connect timeout in milliseconds; 0 means no timeout.
     *                       Values outside the {@code int} range are saturated rather
     *                       than wrapped, so a very large timeout no longer turns
     *                       into an arbitrary (possibly negative) one.
     * @return a channel over the connected socket
     * @throws IOException if the connection attempt or channel creation fails; the
     *                     socket is closed before the exception propagates
     * @throws IllegalArgumentException if the timeout is negative (from {@code Socket.connect})
     */
    private PacketChannel openChannel(long connectTimeout) throws IOException {
        Socket socket = this.socketFactory != null ? this.socketFactory.createSocket() : new Socket();
        // Saturating long -> int conversion; negative values stay negative so the
        // socket still rejects them exactly as before.
        int timeoutMillis = (int) Math.max((long) Integer.MIN_VALUE, Math.min((long) Integer.MAX_VALUE, connectTimeout));
        boolean opened = false;
        try {
            socket.connect(new InetSocketAddress(this.hostname, this.port), timeoutMillis);
            PacketChannel packetChannel = new PacketChannel(socket);
            opened = true;
            return packetChannel;
        } finally {
            if (!opened) {
                // Nobody else holds a reference to the socket yet, so release it here.
                try {
                    socket.close();
                } catch (IOException ignored) {
                    // Keep the original failure; a close error adds nothing useful.
                }
            }
        }
    }

    /**
     * Starts a watchdog thread that closes {@code channel} unless it is cancelled
     * within {@code timeout} milliseconds.
     *
     * @param channel channel to close when the timeout elapses
     * @param timeout time to wait in milliseconds
     * @return a callable that cancels the watchdog and waits for its thread to end
     */
    private Callable<Void> scheduleCloseChannel(final PacketChannel channel, final long timeout) {
        final CountDownLatch cancelLatch = new CountDownLatch(1);
        Runnable watchdog = new Runnable() {

            @Override
            public void run() {
                try {
                    cancelLatch.await(timeout, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (logger.isLoggable(Level.WARNING)) {
                        logger.log(Level.WARNING, e.getMessage());
                    }
                }
                // Count 0 means the returned callable cancelled us in time.
                if (cancelLatch.getCount() == 0L) {
                    return;
                }
                if (logger.isLoggable(Level.WARNING)) {
                    logger.warning("Failed to establish connection in " + timeout + "ms. Forcing disconnect.");
                }
                try {
                    closeChannel(channel);
                } catch (IOException e) {
                    if (logger.isLoggable(Level.WARNING)) {
                        logger.log(Level.WARNING, e.getMessage());
                    }
                }
            }
        };
        final Thread watchdogThread = newNamedThread(watchdog, "blc-disconnect-" + hostname + ":" + port);
        watchdogThread.start();
        return new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                cancelLatch.countDown();
                watchdogThread.join();
                return null;
            }
        };
    }

    /**
     * Reads the first packet from the server and parses it as a greeting.
     *
     * @param channel freshly opened channel
     * @return the parsed greeting packet
     * @throws ServerException if the first byte marks the packet as an error (0xFF)
     * @throws IOException if reading from the channel fails
     */
    private GreetingPacket receiveGreeting(PacketChannel channel) throws IOException {
        byte[] packet = channel.read();
        if (packet[0] != (byte) 0xFF) {
            return new GreetingPacket(packet);
        }
        // Error packet: everything after the marker byte is the error payload.
        ErrorPacket errorPacket = new ErrorPacket(Arrays.copyOfRange(packet, 1, packet.length));
        throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(), errorPacket.getSqlState());
    }

    /**
     * Asks the server to send heartbeats by setting {@code @master_heartbeat_period}
     * to the heartbeat interval multiplied by 1,000,000.
     *
     * @param channel authenticated channel
     * @throws ServerException if the server answers with an error packet
     * @throws IOException if writing to or reading from the channel fails
     */
    private void enableHeartbeat(PacketChannel channel) throws IOException {
        long heartbeatPeriod = heartbeatInterval * 1000000L;
        channel.write(new QueryCommand("set @master_heartbeat_period=" + heartbeatPeriod));
        byte[] response = channel.read();
        if (response[0] != (byte) 0xFF) {
            return;
        }
        ErrorPacket errorPacket = new ErrorPacket(Arrays.copyOfRange(response, 1, response.length));
        throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(), errorPacket.getSqlState());
    }

    /**
     * Sends the command that starts the binlog stream: a GTID-based dump when a
     * GTID set is configured, a filename/position-based dump otherwise. In
     * non-blocking mode the command carries server id 0.
     *
     * @param channel authenticated channel
     * @throws IOException if writing the command fails
     */
    private void requestBinaryLogStream(PacketChannel channel) throws IOException {
        long effectiveServerId = blocking ? serverId : 0L;
        Command dumpCommand;
        synchronized (gtidSetAccessLock) {
            if (gtidSet == null) {
                dumpCommand = new DumpBinaryLogCommand(effectiveServerId, binlogFilename, binlogPosition);
            } else if (useBinlogFilenamePositionInGtidMode) {
                dumpCommand = new DumpBinaryLogGtidCommand(effectiveServerId, binlogFilename, binlogPosition, gtidSet);
            } else {
                // GTID mode without an explicit position: empty filename, position 4.
                dumpCommand = new DumpBinaryLogGtidCommand(effectiveServerId, "", 4L, gtidSet);
            }
        }
        channel.write(dumpCommand);
    }

    /**
     * Ensures events of {@code eventType} are decoded by
     * {@code eventDataDeserializerClass}. If the registered deserializer is neither
     * that class nor already a wrapper, it is replaced by a wrapper combining a new
     * instance of the required class with the previously registered deserializer.
     *
     * @param eventType                 event type to check
     * @param eventDataDeserializerClass deserializer class this client relies on
     * @throws RuntimeException if the required deserializer cannot be instantiated
     */
    private void ensureEventDataDeserializerIfPresent(EventType eventType, Class<? extends EventDataDeserializer<?>> eventDataDeserializerClass) {
        EventDataDeserializer registered = eventDeserializer.getEventDataDeserializer(eventType);
        boolean alreadySuitable = registered.getClass() == eventDataDeserializerClass
                || registered.getClass() == EventDeserializer.EventDataWrapper.Deserializer.class;
        if (alreadySuitable) {
            return;
        }
        EventDataDeserializer<?> required;
        try {
            required = eventDataDeserializerClass.newInstance();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        eventDeserializer.setEventDataDeserializer(eventType, new EventDeserializer.EventDataWrapper.Deserializer(required, registered));
    }

    /**
     * Authenticates against the server, upgrading the channel to SSL first when
     * the SSL mode asks for it and the server advertises SSL support.
     *
     * @param channel        channel on which the greeting was received
     * @param greetingPacket greeting supplying collation, capabilities and scramble
     * @throws AuthenticationException if the server rejects the credentials or
     *                                 answers with an unexpected status byte
     * @throws IOException if SSL is mandatory for the configured mode but not
     *                     supported by the server, or on channel I/O failure
     */
    private void authenticate(PacketChannel channel, GreetingPacket greetingPacket) throws IOException {
        int collation = greetingPacket.getServerCollation();
        int packetNumber = 1;
        boolean usingSSLSocket = false;
        if (sslMode != SSLMode.DISABLED) {
            // Capability bit 0x800 is what this code treats as "server supports SSL".
            boolean serverSupportsSSL = (greetingPacket.getServerCapabilities() & 0x800) != 0;
            boolean sslMandatory = sslMode == SSLMode.REQUIRED || sslMode == SSLMode.VERIFY_CA || sslMode == SSLMode.VERIFY_IDENTITY;
            if (!serverSupportsSSL && sslMandatory) {
                throw new IOException("MySQL server does not support SSL");
            }
            if (serverSupportsSSL) {
                SSLRequestCommand sslRequestCommand = new SSLRequestCommand();
                sslRequestCommand.setCollation(collation);
                channel.write(sslRequestCommand, packetNumber);
                packetNumber++;
                SSLSocketFactory effectiveFactory;
                if (sslSocketFactory != null) {
                    effectiveFactory = sslSocketFactory;
                } else if (sslMode == SSLMode.REQUIRED || sslMode == SSLMode.PREFERRED) {
                    effectiveFactory = DEFAULT_REQUIRED_SSL_MODE_SOCKET_FACTORY;
                } else {
                    effectiveFactory = DEFAULT_VERIFY_CA_SSL_MODE_SOCKET_FACTORY;
                }
                // Hostname verification only in VERIFY_IDENTITY mode.
                channel.upgradeToSSL(effectiveFactory, sslMode == SSLMode.VERIFY_IDENTITY ? new TLSHostnameVerifier() : null);
                usingSSLSocket = true;
            }
        }
        AuthenticateCommand authenticateCommand = new AuthenticateCommand(schema, username, password, greetingPacket.getScramble());
        authenticateCommand.setCollation(collation);
        channel.write(authenticateCommand, packetNumber);
        byte[] authenticationResult = channel.read();
        switch (authenticationResult[0]) {
            case 0:
                // Authenticated.
                return;
            case -1: {
                // 0xFF: error packet.
                byte[] bytes = Arrays.copyOfRange(authenticationResult, 1, authenticationResult.length);
                ErrorPacket errorPacket = new ErrorPacket(bytes);
                throw new AuthenticationException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(), errorPacket.getSqlState());
            }
            case -2:
                // 0xFE: server asks to switch authentication method.
                switchAuthentication(channel, authenticationResult, usingSSLSocket);
                return;
            default:
                throw new AuthenticationException("Unexpected authentication result (" + authenticationResult[0] + ")");
        }
    }

    /**
     * Handles a server request to switch the authentication method. Only
     * {@code mysql_native_password} is supported.
     *
     * @param channel              channel being authenticated
     * @param authenticationResult the switch-request packet read from the server
     * @param usingSSLSocket       whether the channel was upgraded to SSL; selects the
     *                             packet sequence number (4 with SSL, 3 without)
     * @throws AuthenticationException if the requested method is unsupported or the
     *                                 server does not answer with status byte 0
     * @throws IOException on channel I/O failure
     */
    private void switchAuthentication(PacketChannel channel, byte[] authenticationResult, boolean usingSSLSocket) throws IOException {
        ByteArrayInputStream buffer = new ByteArrayInputStream(authenticationResult);
        // Skip the status byte, then read the requested method name.
        buffer.read(1);
        String authName = buffer.readZeroTerminatedString();
        if (!"mysql_native_password".equals(authName)) {
            throw new AuthenticationException("Unsupported authentication type: " + authName);
        }
        String scramble = buffer.readZeroTerminatedString();
        int sequenceNumber = usingSSLSocket ? 4 : 3;
        channel.write(new AuthenticateNativePasswordCommand(scramble, password), sequenceNumber);
        byte[] authResult = channel.read();
        if (authResult[0] == 0) {
            return;
        }
        ErrorPacket errorPacket = new ErrorPacket(Arrays.copyOfRange(authResult, 1, authResult.length));
        throw new AuthenticationException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(), errorPacket.getSqlState());
    }

    /**
     * Starts a single-thread executor running the keep-alive loop. Every
     * {@code keepAliveInterval} milliseconds the loop checks the connection (by
     * event age when heartbeats are enabled, by sending a ping otherwise) and, if
     * it looks lost, terminates it and reconnects.
     *
     * @param connectTimeout timeout in milliseconds used for reconnect attempts
     * @return the executor owning the keep-alive thread; shutting it down ends the loop
     */
    private ExecutorService spawnKeepAliveThread(final long connectTimeout) {
        ThreadFactory keepAliveThreadFactory = new ThreadFactory() {

            @Override
            public Thread newThread(Runnable runnable) {
                return BinaryLogClient.this.newNamedThread(runnable, "blc-keepalive-" + hostname + ":" + port);
            }
        };
        final ExecutorService threadExecutor = Executors.newSingleThreadExecutor(keepAliveThreadFactory);
        Runnable keepAliveLoop = new Runnable() {

            @Override
            public void run() {
                // Wait for the connect that spawned us to release the lock.
                connectLock.lock();
                connectLock.unlock();
                while (!threadExecutor.isShutdown()) {
                    try {
                        Thread.sleep(keepAliveInterval);
                    } catch (InterruptedException interruptedException) {
                        // Fall through: the shutdown check below decides what happens next.
                    }
                    if (threadExecutor.isShutdown()) {
                        return;
                    }
                    boolean connectionLost = false;
                    if (heartbeatInterval > 0L) {
                        connectionLost = System.currentTimeMillis() - eventLastSeen > keepAliveInterval;
                    } else {
                        try {
                            channel.write(new PingCommand());
                        } catch (IOException e) {
                            connectionLost = true;
                        }
                    }
                    if (connectionLost) {
                        if (logger.isLoggable(Level.INFO)) {
                            logger.info("Trying to restore lost connection to " + hostname + ":" + port);
                        }
                        try {
                            BinaryLogClient.this.terminateConnect();
                            BinaryLogClient.this.connect(connectTimeout);
                        } catch (Exception ce) {
                            if (logger.isLoggable(Level.WARNING)) {
                                logger.warning("Failed to restore connection to " + hostname + ":" + port + ". Next attempt in " + keepAliveInterval + "ms");
                            }
                        }
                    }
                }
            }
        };
        threadExecutor.submit(keepAliveLoop);
        return threadExecutor;
    }

    /**
     * Creates (but does not start) a thread for {@code runnable}, using the
     * configured thread factory when there is one, and names it.
     *
     * @param runnable   task the thread will run
     * @param threadName name to assign
     * @return the new, not yet started thread
     */
    private Thread newNamedThread(Runnable runnable, String threadName) {
        Thread thread;
        if (threadFactory != null) {
            thread = threadFactory.newThread(runnable);
        } else {
            thread = new Thread(runnable);
        }
        thread.setName(threadName);
        return thread;
    }

    /**
     * @return true if a keep-alive executor exists and has not been shut down
     */
    boolean isKeepAliveThreadRunning() {
        if (keepAliveThreadExecutor == null) {
            return false;
        }
        return !keepAliveThreadExecutor.isShutdown();
    }

    /**
     * Connects on a separate thread and waits up to {@code timeout} milliseconds
     * for the connection to be established.
     *
     * <p>A failure in the connector thread is reported to the caller right away:
     * an {@code IOException} is rethrown as is, and a {@code RuntimeException}
     * (for example the {@code IllegalStateException} raised when the client is
     * already connected) is rethrown wrapped in an {@code IOException}. Without
     * that, such a failure would only surface as a {@code TimeoutException} after
     * the full wait and after {@code terminateConnect()} had been called.
     *
     * <p>If the waiting thread is interrupted, the wait is abandoned, the attempt
     * is handled like a timeout, and the thread's interrupt flag is restored
     * before this method exits.
     *
     * @param timeout maximum time to wait, in milliseconds; also used as the connect timeout
     * @throws IOException if the connector thread failed before the connection was established
     * @throws TimeoutException if the connection was not established in time
     */
    @Override
    public void connect(final long timeout) throws IOException, TimeoutException {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        AbstractLifecycleListener connectListener = new AbstractLifecycleListener(){

            @Override
            public void onConnect(BinaryLogClient client) {
                countDownLatch.countDown();
            }
        };
        this.registerLifecycleListener(connectListener);
        final AtomicReference<IOException> exceptionReference = new AtomicReference<IOException>();
        Runnable runnable = new Runnable(){

            @Override
            public void run() {
                try {
                    BinaryLogClient.this.connectWithTimeout(timeout);
                }
                catch (IOException e) {
                    exceptionReference.set(e);
                    // Wake the caller instead of letting it wait out the timeout.
                    countDownLatch.countDown();
                }
                catch (RuntimeException e) {
                    exceptionReference.set(new IOException(e));
                    countDownLatch.countDown();
                    // Still propagate so the thread's uncaught-exception handling
                    // sees it exactly as before.
                    throw e;
                }
            }
        };
        this.newNamedThread(runnable, "blc-" + this.hostname + ":" + this.port).start();
        boolean started = false;
        boolean interrupted = false;
        try {
            started = countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            interrupted = true;
            if (this.logger.isLoggable(Level.WARNING)) {
                this.logger.log(Level.WARNING, e.getMessage());
            }
        }
        try {
            this.unregisterLifecycleListener(connectListener);
            IOException failure = exceptionReference.get();
            if (failure != null) {
                throw failure;
            }
            if (!started) {
                try {
                    this.terminateConnect();
                }
                catch (IOException e) {
                    if (this.logger.isLoggable(Level.WARNING)) {
                        this.logger.warning("\"" + e.getMessage() + "\" was thrown while terminating connection due to timeout");
                    }
                }
                throw new TimeoutException("BinaryLogClient was unable to connect in " + timeout + "ms");
            }
        }
        finally {
            // Restore the flag last so the cleanup above is not cut short by it.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * @return true while a {@code connectWithTimeout} call is in progress, i.e.
     *         while its latch is registered in {@code connectLatch}
     */
    @Override
    public boolean isConnected() {
        return connectLatch != null;
    }

    /**
     * Queries the server's {@code gtid_purged} global variable.
     *
     * @param channel authenticated channel
     * @return the upper-cased value, or the empty string if the query returned no rows
     * @throws IOException on channel I/O failure
     */
    private String fetchGtidPurged(PacketChannel channel) throws IOException {
        channel.write(new QueryCommand("show global variables like 'gtid_purged'"));
        ResultSetRowPacket[] rows = readResultSet(channel);
        if (rows.length == 0) {
            return "";
        }
        // Column 0 is the variable name, column 1 its value.
        return rows[0].getValue(1).toUpperCase();
    }

    /**
     * Runs {@code show master status} and stores the reported filename and
     * position in {@code binlogFilename} / {@code binlogPosition}.
     *
     * @param channel authenticated channel
     * @throws IOException if the query returns no rows or on channel I/O failure
     * @throws NumberFormatException if the reported position is not a valid long
     */
    private void fetchBinlogFilenameAndPosition(PacketChannel channel) throws IOException {
        channel.write(new QueryCommand("show master status"));
        ResultSetRowPacket[] rows = readResultSet(channel);
        if (rows.length == 0) {
            throw new IOException("Failed to determine binlog filename/position");
        }
        ResultSetRowPacket status = rows[0];
        binlogFilename = status.getValue(0);
        binlogPosition = Long.parseLong(status.getValue(1));
    }

    /**
     * Queries the server's {@code binlog_checksum} global variable.
     *
     * @param channel authenticated channel
     * @return the checksum type named by the variable, or {@code ChecksumType.NONE}
     *         if the query returned no rows
     * @throws IOException on channel I/O failure
     * @throws IllegalArgumentException if the value names no {@code ChecksumType} constant
     */
    private ChecksumType fetchBinlogChecksum(PacketChannel channel) throws IOException {
        channel.write(new QueryCommand("show global variables like 'binlog_checksum'"));
        ResultSetRowPacket[] rows = readResultSet(channel);
        return rows.length == 0
                ? ChecksumType.NONE
                : ChecksumType.valueOf(rows[0].getValue(1).toUpperCase());
    }

    /**
     * Sets the session variable {@code @master_binlog_checksum} on the server and, if the
     * server accepts the statement, configures the event deserializer with the given checksum type.
     *
     * @param channel      channel the statement is written to and the response is read from
     * @param checksumType checksum type handed to the event deserializer on success
     * @throws ServerException if the first byte of the response is 0xFF (error response)
     * @throws IOException     if the channel cannot be written to or read from
     */
    private void confirmSupportOfChecksum(PacketChannel channel, ChecksumType checksumType) throws IOException {
        channel.write(new QueryCommand("set @master_binlog_checksum= @@global.binlog_checksum"));
        byte[] response = channel.read();
        boolean serverReportedError = response[0] == (byte) 0xFF;
        if (!serverReportedError) {
            this.eventDeserializer.setChecksumType(checksumType);
            return;
        }
        // Everything after the marker byte is the error payload.
        ErrorPacket errorPacket = new ErrorPacket(Arrays.copyOfRange(response, 1, response.length));
        throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(), errorPacket.getSqlState());
    }

    /**
     * Main receive loop: reads packets from the channel's input stream until the stream is
     * exhausted, deserializes each one into an {@link Event} and dispatches it.
     *
     * <p>Per packet: a 3-byte length is read, one byte is skipped, then a marker byte is read.
     * Marker 255 is turned into a {@link ServerException}; marker 254 ends the loop when the
     * client is not in blocking mode. Any other packet is handed to the event deserializer.
     *
     * <p>Failures raised inside the loop are not propagated to the caller: they are reported to
     * the lifecycle listeners via {@code onCommunicationFailure} (only while {@code connected}
     * is true). On exit, while still connected, the method either performs a full
     * {@link #disconnect()} (after a marker-254 shutdown) or just closes the channel.
     *
     * <p>NOTE(review): the decompiler (CFR) flagged this method with "Removed try catching
     * itself - possible behaviour change"; compare against the original sources if in doubt.
     *
     * @param channel channel whose input stream is consumed and which is closed on exit
     * @throws IOException if closing the channel or disconnecting in the {@code finally} block fails
     */
    private void listenForEventPackets(PacketChannel channel) throws IOException {
        ByteArrayInputStream inputStream = channel.getInputStream();
        // Set when the server signals end-of-stream (marker 254) in non-blocking mode.
        boolean completeShutdown = false;
        try {
            while (inputStream.peek() != -1) {
                Event event;
                int packetLength = inputStream.readInteger(3);
                // One byte after the length is ignored (presumably the packet sequence number - TODO confirm).
                inputStream.skip(1L);
                int marker = inputStream.read();
                if (marker == 255) {
                    // Error response: the remaining packetLength - 1 bytes are the error payload.
                    ErrorPacket errorPacket = new ErrorPacket(inputStream.read(packetLength - 1));
                    throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(), errorPacket.getSqlState());
                }
                if (marker == 254 && !this.blocking) {
                    completeShutdown = true;
                    break;
                }
                try {
                    // A length of 0xFFFFFF means the payload continues in follow-up chunks, which are
                    // reassembled into one buffer first; otherwise the event is parsed straight off the stream.
                    event = this.eventDeserializer.nextEvent(packetLength == 0xFFFFFF ? new ByteArrayInputStream(this.readPacketSplitInChunks(inputStream, packetLength - 1)) : inputStream);
                    if (event == null) {
                        throw new EOFException();
                    }
                }
                catch (Exception e) {
                    // The extra "throwable" local is unused (looks like a decompiler artifact).
                    Throwable cause;
                    Throwable throwable = cause = e instanceof EventDataDeserializationException ? e.getCause() : e;
                    // End-of-stream and socket failures are connection-level problems: rethrow so the
                    // outer catch reports them as a communication failure.
                    if (cause instanceof EOFException || cause instanceof SocketException) {
                        throw e;
                    }
                    // Any other deserialization problem is reported and the loop moves on to the next packet.
                    if (!this.connected) continue;
                    for (LifecycleListener lifecycleListener : this.lifecycleListeners) {
                        lifecycleListener.onEventDeserializationFailure(this, e);
                    }
                    continue;
                }
                // Events read after the client was marked disconnected are dropped.
                if (!this.connected) continue;
                this.eventLastSeen = System.currentTimeMillis();
                // GTID bookkeeping runs before listeners are notified; the binlog position is advanced afterwards.
                this.updateGtidSet(event);
                this.notifyEventListeners(event);
                this.updateClientBinlogFilenameAndPosition(event);
            }
        }
        catch (Exception e) {
            // The exception is handed to the listeners and not rethrown.
            if (this.connected) {
                for (LifecycleListener lifecycleListener : this.lifecycleListeners) {
                    lifecycleListener.onCommunicationFailure(this, e);
                }
            }
        }
        finally {
            if (this.connected) {
                if (completeShutdown) {
                    // Full shutdown path (also terminates the keep-alive executor, see disconnect()).
                    this.disconnect();
                } else {
                    this.closeChannel(channel);
                }
            }
        }
    }

    /**
     * Reassembles a payload that was split across several packets.
     *
     * <p>Reads the first {@code packetLength} bytes, then keeps appending follow-up chunks.
     * Each chunk is preceded by a 3-byte length and one skipped byte; the first chunk whose
     * length differs from 0xFFFFFF is the last one.
     *
     * @param inputStream stream positioned at the start of the first chunk's payload
     * @param packetLength number of bytes to read for the first chunk
     * @return the concatenated payload of all chunks
     * @throws IOException if reading from the stream fails
     */
    private byte[] readPacketSplitInChunks(ByteArrayInputStream inputStream, int packetLength) throws IOException {
        byte[] assembled = inputStream.read(packetLength);
        while (true) {
            int nextChunkLength = inputStream.readInteger(3);
            inputStream.skip(1L);
            // Grow the buffer and append the chunk right after the bytes collected so far.
            int writeOffset = assembled.length;
            assembled = Arrays.copyOf(assembled, writeOffset + nextChunkLength);
            inputStream.fill(assembled, writeOffset, nextChunkLength);
            if (nextChunkLength != 0xFFFFFF) {
                return assembled;
            }
        }
    }

    /**
     * Advances the client's notion of where it is in the binlog based on the given event.
     *
     * <ul>
     *   <li>{@code ROTATE}: file name and position are taken from the rotate event data.</li>
     *   <li>{@code TABLE_MAP}: the position is deliberately left untouched.</li>
     *   <li>Any other event with an {@link EventHeaderV4} header: the position becomes the
     *       header's next position, provided it is greater than zero.</li>
     * </ul>
     *
     * @param event the event that was just processed
     */
    private void updateClientBinlogFilenameAndPosition(Event event) {
        // Typed as EventHeader (not Object) so that getEventType() resolves.
        EventHeader eventHeader = event.getHeader();
        EventType eventType = eventHeader.getEventType();
        if (eventType == EventType.ROTATE) {
            RotateEventData rotateEventData = (RotateEventData)EventDeserializer.EventDataWrapper.internal(event.getData());
            this.binlogFilename = rotateEventData.getBinlogFilename();
            this.binlogPosition = rotateEventData.getBinlogPosition();
            return;
        }
        // NOTE(review): TABLE_MAP is skipped presumably so that a reconnect resumes at or before
        // the table map that the following row events depend on - confirm against upstream.
        if (eventType == EventType.TABLE_MAP || !(eventHeader instanceof EventHeaderV4)) {
            return;
        }
        long nextBinlogPosition = ((EventHeaderV4)eventHeader).getNextPosition();
        if (nextBinlogPosition > 0L) {
            this.binlogPosition = nextBinlogPosition;
        }
    }

    /**
     * Tracks GTID / transaction state from the event stream and commits the current GTID to the
     * GTID set at transaction boundaries. Does nothing when no GTID set is configured.
     *
     * <ul>
     *   <li>{@code GTID}: remembers the event's GTID as the current one.</li>
     *   <li>{@code XID}: commits the current GTID and leaves the transaction.</li>
     *   <li>{@code QUERY}: {@code BEGIN} enters a transaction; {@code COMMIT} / {@code ROLLBACK}
     *       commit the GTID and leave it; any other statement seen outside a transaction
     *       commits the GTID immediately.</li>
     * </ul>
     *
     * @param event the event that was just received
     */
    private void updateGtidSet(Event event) {
        synchronized (this.gtidSetAccessLock) {
            if (this.gtidSet == null) {
                return;
            }
        }
        // Typed as EventHeader (not Object) so that getEventType() resolves.
        EventHeader eventHeader = event.getHeader();
        switch (eventHeader.getEventType()) {
            case GTID: {
                GtidEventData gtidEventData = (GtidEventData)EventDeserializer.EventDataWrapper.internal(event.getData());
                this.gtid = gtidEventData.getGtid();
                break;
            }
            case XID: {
                this.commitGtid();
                this.tx = false;
                break;
            }
            case QUERY: {
                QueryEventData queryEventData = (QueryEventData)EventDeserializer.EventDataWrapper.internal(event.getData());
                String sql = queryEventData.getSql();
                if (sql == null) {
                    break;
                }
                if ("BEGIN".equals(sql)) {
                    this.tx = true;
                } else if ("COMMIT".equals(sql) || "ROLLBACK".equals(sql)) {
                    this.commitGtid();
                    this.tx = false;
                } else if (!this.tx) {
                    // Statement outside an explicit transaction: commit right away.
                    this.commitGtid();
                }
                break;
            }
            default: {
                break;
            }
        }
    }

    /**
     * Adds the current GTID, if any, to the GTID set while holding {@code gtidSetAccessLock}.
     */
    private void commitGtid() {
        if (this.gtid == null) {
            return;
        }
        synchronized (this.gtidSetAccessLock) {
            this.gtidSet.add(this.gtid);
        }
    }

    /**
     * Reads a complete query result set from the channel.
     *
     * <p>The first packet is checked for an error marker (0xFF). Packets are then discarded up to
     * and including the first one starting with 0xFE, after which every packet up to the next
     * 0xFE-prefixed packet is collected as a row.
     *
     * @param channel channel to read the response packets from
     * @return the rows of the result set, possibly empty
     * @throws ServerException if the server answered with an error response
     * @throws IOException     if reading from the channel fails
     */
    private ResultSetRowPacket[] readResultSet(PacketChannel channel) throws IOException {
        byte[] firstPacket = channel.read();
        if (firstPacket[0] == (byte) 0xFF) {
            ErrorPacket errorPacket = new ErrorPacket(Arrays.copyOfRange(firstPacket, 1, firstPacket.length));
            throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(), errorPacket.getSqlState());
        }
        byte[] packet;
        // Skip the leading section (presumably column definitions - TODO confirm) up to its terminator.
        do {
            packet = channel.read();
        } while (packet[0] != (byte) 0xFE);
        LinkedList<ResultSetRowPacket> rows = new LinkedList<ResultSetRowPacket>();
        for (packet = channel.read(); packet[0] != (byte) 0xFE; packet = channel.read()) {
            rows.add(new ResultSetRowPacket(packet));
        }
        return rows.toArray(new ResultSetRowPacket[0]);
    }

    /**
     * @return a read-only view of the registered event listeners
     */
    public List<EventListener> getEventListeners() {
        List<EventListener> readOnlyView = Collections.unmodifiableList(this.eventListeners);
        return readOnlyView;
    }

    /**
     * Appends the given listener to the list of event listeners.
     *
     * @param eventListener listener to add
     */
    public void registerEventListener(EventListener eventListener) {
        eventListeners.add(eventListener);
    }

    /**
     * Removes every registered event listener that is an instance of the given class.
     *
     * <p>Matches are collected first and removed afterwards, so the listener list is never
     * modified while it is being iterated. That keeps the method safe whatever list
     * implementation backs {@code eventListeners}.
     *
     * @param listenerClass class (or supertype) of the listeners to remove
     */
    public void unregisterEventListener(Class<? extends EventListener> listenerClass) {
        List<EventListener> matching = new LinkedList<EventListener>();
        for (EventListener eventListener : this.eventListeners) {
            if (listenerClass.isInstance(eventListener)) {
                matching.add(eventListener);
            }
        }
        for (EventListener eventListener : matching) {
            this.eventListeners.remove(eventListener);
        }
    }

    /**
     * Removes the given listener from the list of event listeners.
     *
     * @param eventListener listener to remove
     */
    public void unregisterEventListener(EventListener eventListener) {
        eventListeners.remove(eventListener);
    }

    /**
     * Delivers an event to every registered event listener.
     *
     * <p>If the event data is an {@code EventDataWrapper}, listeners receive a new event that
     * carries the wrapper's external data instead. An exception thrown by one listener is
     * logged at WARNING level (when enabled) and does not stop delivery to the others.
     *
     * @param event the event to deliver
     */
    private void notifyEventListeners(Event event) {
        Event outgoing = event;
        if (event.getData() instanceof EventDeserializer.EventDataWrapper) {
            outgoing = new Event((EventHeader)event.getHeader(), ((EventDeserializer.EventDataWrapper)event.getData()).getExternal());
        }
        for (EventListener listener : this.eventListeners) {
            try {
                listener.onEvent(outgoing);
            } catch (Exception e) {
                if (this.logger.isLoggable(Level.WARNING)) {
                    this.logger.log(Level.WARNING, listener + " choked on " + outgoing, e);
                }
            }
        }
    }

    /**
     * @return a read-only view of the registered lifecycle listeners
     */
    public List<LifecycleListener> getLifecycleListeners() {
        List<LifecycleListener> readOnlyView = Collections.unmodifiableList(this.lifecycleListeners);
        return readOnlyView;
    }

    /**
     * Appends the given listener to the list of lifecycle listeners.
     *
     * @param lifecycleListener listener to add
     */
    public void registerLifecycleListener(LifecycleListener lifecycleListener) {
        lifecycleListeners.add(lifecycleListener);
    }

    /**
     * Removes every registered lifecycle listener that is an instance of the given class.
     *
     * <p>Matches are collected first and removed afterwards, so the listener list is never
     * modified while it is being iterated. That keeps the method safe whatever list
     * implementation backs {@code lifecycleListeners}.
     *
     * @param listenerClass class (or supertype) of the listeners to remove
     */
    public void unregisterLifecycleListener(Class<? extends LifecycleListener> listenerClass) {
        List<LifecycleListener> matching = new LinkedList<LifecycleListener>();
        for (LifecycleListener lifecycleListener : this.lifecycleListeners) {
            if (listenerClass.isInstance(lifecycleListener)) {
                matching.add(lifecycleListener);
            }
        }
        for (LifecycleListener lifecycleListener : matching) {
            this.lifecycleListeners.remove(lifecycleListener);
        }
    }

    /**
     * Removes the given listener from the list of lifecycle listeners.
     *
     * @param eventListener lifecycle listener to remove
     */
    public void unregisterLifecycleListener(LifecycleListener eventListener) {
        lifecycleListeners.remove(eventListener);
    }

    /**
     * Shuts the client down: stops the keep-alive executor, closes the channel and waits for
     * the connect latch to be released.
     *
     * <p>The executor, channel and latch are captured under {@code connectLock}; the shutdown
     * steps themselves run outside the lock.
     *
     * @throws IOException if closing the channel fails
     */
    @Override
    public void disconnect() throws IOException {
        ExecutorService executorSnapshot;
        PacketChannel channelSnapshot;
        CountDownLatch latchSnapshot;
        this.connectLock.lock();
        try {
            executorSnapshot = this.keepAliveThreadExecutor;
            channelSnapshot = this.channel;
            latchSnapshot = this.connectLatch;
        } finally {
            this.connectLock.unlock();
        }
        this.terminateKeepAliveThread(executorSnapshot);
        this.closeChannel(channelSnapshot);
        this.waitForConnectToTerminate(latchSnapshot);
    }

    /**
     * Shuts the given executor down via {@code shutdownNow()} and blocks until it has
     * terminated, retrying the wait whenever it is cut short.
     *
     * @param threadExecutor executor to stop; {@code null} is ignored
     */
    private void terminateKeepAliveThread(ExecutorService threadExecutor) {
        if (threadExecutor != null) {
            threadExecutor.shutdownNow();
            boolean terminated;
            do {
                terminated = BinaryLogClient.awaitTerminationInterruptibly(threadExecutor, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
            } while (!terminated);
        }
    }

    /**
     * Waits for the executor to terminate, converting an interruption into a {@code false} result.
     *
     * <p>The thread's interrupt status is not restored when the wait is interrupted.
     *
     * @param executorService executor to wait for
     * @param timeout         maximum time to wait
     * @param unit            unit of {@code timeout}
     * @return the result of {@code awaitTermination}, or {@code false} if the wait was interrupted
     */
    private static boolean awaitTerminationInterruptibly(ExecutorService executorService, long timeout, TimeUnit unit) {
        boolean terminated = false;
        try {
            terminated = executorService.awaitTermination(timeout, unit);
        } catch (InterruptedException ignored) {
            // Interrupted while waiting: report "not terminated" so the caller can retry.
        }
        return terminated;
    }

    /**
     * Closes the current channel and waits for the connect latch to be released.
     *
     * <p>The channel and latch are captured under {@code connectLock}; closing and waiting run
     * outside the lock.
     *
     * @throws IOException if closing the channel fails
     */
    private void terminateConnect() throws IOException {
        PacketChannel channelSnapshot;
        CountDownLatch latchSnapshot;
        this.connectLock.lock();
        try {
            channelSnapshot = this.channel;
            latchSnapshot = this.connectLatch;
        } finally {
            this.connectLock.unlock();
        }
        this.closeChannel(channelSnapshot);
        this.waitForConnectToTerminate(latchSnapshot);
    }

    /**
     * Blocks until the given latch reaches zero, retrying the wait whenever it is cut short.
     *
     * @param connectLatch latch to wait on; {@code null} is ignored
     */
    private void waitForConnectToTerminate(CountDownLatch connectLatch) {
        if (connectLatch == null) {
            return;
        }
        boolean released;
        do {
            released = BinaryLogClient.awaitInterruptibly(connectLatch, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } while (!released);
    }

    /**
     * Waits on the latch, converting an interruption into a {@code false} result.
     *
     * <p>The thread's interrupt status is not restored when the wait is interrupted.
     *
     * @param countDownLatch latch to wait on
     * @param time           maximum time to wait
     * @param unit           unit of {@code time}
     * @return the result of {@code await}, or {@code false} if the wait was interrupted
     */
    private static boolean awaitInterruptibly(CountDownLatch countDownLatch, long time, TimeUnit unit) {
        boolean released = false;
        try {
            released = countDownLatch.await(time, unit);
        } catch (InterruptedException ignored) {
            // Interrupted while waiting: report "not released" so the caller can retry.
        }
        return released;
    }

    /**
     * Marks the client as not connected and closes the given channel if it is still open.
     *
     * @param channel channel to close; {@code null} or an already closed channel is left alone
     * @throws IOException if closing the channel fails
     */
    private void closeChannel(PacketChannel channel) throws IOException {
        this.connected = false;
        if (channel == null || !channel.isOpen()) {
            return;
        }
        channel.close();
    }

    /**
     * Convenience base class with empty implementations of every {@link LifecycleListener}
     * callback, so subclasses only override the callbacks they need.
     */
    public abstract static class AbstractLifecycleListener implements LifecycleListener {

        @Override
        public void onConnect(BinaryLogClient client) {
            // no-op by default
        }

        @Override
        public void onCommunicationFailure(BinaryLogClient client, Exception ex) {
            // no-op by default
        }

        @Override
        public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
            // no-op by default
        }

        @Override
        public void onDisconnect(BinaryLogClient client) {
            // no-op by default
        }
    }

    /**
     * Callbacks describing the lifecycle of a {@link BinaryLogClient} connection.
     */
    public static interface LifecycleListener {
        /**
         * Connection-established callback.
         *
         * @param client the client the callback refers to
         */
        void onConnect(BinaryLogClient client);

        /**
         * Called when reading from the connection fails.
         *
         * @param client the client the failure occurred on
         * @param ex     the exception that caused the failure
         */
        void onCommunicationFailure(BinaryLogClient client, Exception ex);

        /**
         * Called when an event could not be deserialized; the client keeps reading afterwards.
         *
         * @param client the client the failure occurred on
         * @param ex     the deserialization exception
         */
        void onEventDeserializationFailure(BinaryLogClient client, Exception ex);

        /**
         * Disconnection callback.
         *
         * @param client the client the callback refers to
         */
        void onDisconnect(BinaryLogClient client);
    }

    /**
     * Receiver of binlog events delivered by a {@link BinaryLogClient}.
     */
    public static interface EventListener {
        /**
         * Called once for every event the client dispatches.
         *
         * @param event the received event
         */
        void onEvent(Event event);
    }
}

