/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.Serializable;
import java.sql.SQLException;
import java.util.function.Supplier;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Buffers records into a {@link JdbcBatchStatementExecutor} and writes them to a JDBC
 * connection in batches.
 *
 * <p>Typical life cycle: {@link #open()}, any number of {@link #writeRecord(Object)} /
 * {@link #flush()} calls, then {@link #close()}. {@code writeRecord}, {@code flush} and
 * {@code close} are {@code synchronized} on this instance.
 *
 * <p>The statement executor, the batch counter, the closed flag and the remembered flush
 * exception are {@code transient}; after Java deserialization they hold their default values
 * ({@code null}, {@code 0}, {@code false}, {@code null}) until {@link #open()} is called.
 *
 * @param <I> type of the records handed to {@link #writeRecord(Object)}
 * @param <E> concrete statement executor type produced by the factory
 */
public class JdbcOutputFormat<I, E extends JdbcBatchStatementExecutor<I>>
implements Serializable {
    protected final JdbcConnectionProvider connectionProvider;
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(JdbcOutputFormat.class);
    /** Base delay between flush retries; the actual delay is this value times the attempt index. */
    private static final long RETRY_BACKOFF_MS = 1000L;
    private final JdbcConnectionConfig jdbcConnectionConfig;
    private final StatementExecutorFactory<E> statementExecutorFactory;
    private transient E jdbcStatementExecutor;
    private transient int batchCount = 0;
    private volatile transient boolean closed = false;
    private volatile transient Exception flushException;

    /**
     * Creates an output format. No connection is opened here; see {@link #open()}.
     *
     * @param connectionProvider supplies and re-establishes the JDBC connection; must not be null
     * @param jdbcConnectionConfig source of the batch size and max retry settings; must not be null
     * @param statementExecutorFactory creates the statement executor on {@link #open()}; must not
     *     be null
     */
    public JdbcOutputFormat(JdbcConnectionProvider connectionProvider, JdbcConnectionConfig jdbcConnectionConfig, StatementExecutorFactory<E> statementExecutorFactory) {
        this.connectionProvider = Preconditions.checkNotNull(connectionProvider);
        this.jdbcConnectionConfig = Preconditions.checkNotNull(jdbcConnectionConfig);
        this.statementExecutorFactory = Preconditions.checkNotNull(statementExecutorFactory);
    }

    /**
     * Obtains (or establishes) the connection, then creates the statement executor and prepares
     * its statements on that connection.
     *
     * @throws JdbcConnectorException if the connection cannot be obtained or the statements
     *     cannot be prepared
     * @throws IOException declared for callers; not thrown by this implementation
     */
    public void open() throws IOException {
        try {
            this.connectionProvider.getOrEstablishConnection();
        }
        catch (Exception e) {
            throw new JdbcConnectorException(JdbcConnectorErrorCode.CONNECT_DATABASE_FAILED, "unable to open JDBC writer", e);
        }
        this.jdbcStatementExecutor = this.createAndOpenStatementExecutor(this.statementExecutorFactory);
    }

    /**
     * Asks the factory for a new executor and prepares its statements on the current connection.
     *
     * @throws JdbcConnectorException wrapping any {@link SQLException} raised while preparing
     */
    private E createAndOpenStatementExecutor(StatementExecutorFactory<E> statementExecutorFactory) {
        E executor = statementExecutorFactory.get();
        try {
            executor.prepareStatements(this.connectionProvider.getConnection());
        }
        catch (SQLException e) {
            throw new JdbcConnectorException(CommonErrorCodeDeprecated.SQL_OPERATION_FAILED, "unable to open JDBC writer", e);
        }
        return executor;
    }

    /**
     * Rethrows a flush failure remembered by {@link #close()}, if there is one.
     *
     * @throws JdbcConnectorException if a flush exception has been recorded
     */
    public void checkFlushException() {
        if (this.flushException != null) {
            throw new JdbcConnectorException(CommonErrorCodeDeprecated.FLUSH_DATA_FAILED, "Writing records to JDBC failed.", this.flushException);
        }
    }

    /**
     * Adds one record to the current batch and flushes when the configured batch size is reached.
     * A batch size of zero or less disables this automatic flush.
     *
     * @param record the record to buffer
     * @throws JdbcConnectorException if a previous flush failure was recorded, or if buffering or
     *     flushing fails (any exception, including one thrown by {@link #flush()}, is wrapped)
     */
    public final synchronized void writeRecord(I record) {
        this.checkFlushException();
        try {
            this.addToBatch(record);
            ++this.batchCount;
            int batchSize = this.jdbcConnectionConfig.getBatchSize();
            if (batchSize > 0 && this.batchCount >= batchSize) {
                this.flush();
            }
        }
        catch (Exception e) {
            throw new JdbcConnectorException(CommonErrorCodeDeprecated.SQL_OPERATION_FAILED, "Writing records to JDBC failed.", e);
        }
    }

    /**
     * Hands the record to the statement executor. Overridable hook used by
     * {@link #writeRecord(Object)}.
     *
     * @param record the record to buffer
     * @throws SQLException if the executor rejects the record
     */
    protected void addToBatch(I record) throws SQLException {
        this.jdbcStatementExecutor.addToBatch(record);
    }

    /**
     * Executes the buffered batch, retrying on {@link SQLException}.
     *
     * <p>Does nothing (apart from logging) when a flush exception has already been recorded or
     * when no records are buffered. Otherwise up to {@code getMaxRetries() + 1} attempts are
     * made. After a failed attempt that is not the last one, the connection is re-established
     * if it is no longer valid, and the thread sleeps {@code RETRY_BACKOFF_MS * attemptIndex}
     * milliseconds, so there is no delay before the first retry.
     *
     * @throws JdbcConnectorException if the last attempt fails, if re-establishing the connection
     *     fails, or if the thread is interrupted while waiting to retry
     * @throws IOException declared for callers; not thrown by this implementation
     */
    public synchronized void flush() throws IOException {
        if (this.flushException != null) {
            LOG.warn(String.format("An exception occurred during the previous flush process %s, skipping this flush", ExceptionUtils.getMessage(this.flushException)));
            return;
        }
        if (this.batchCount == 0) {
            LOG.debug("No data to flush.");
            return;
        }
        for (int i = 0; i <= this.jdbcConnectionConfig.getMaxRetries(); ++i) {
            try {
                this.attemptFlush();
                this.batchCount = 0;
                break;
            }
            catch (SQLException e) {
                LOG.error("JDBC executeBatch error, retry times = {}", i, e);
                if (i >= this.jdbcConnectionConfig.getMaxRetries()) {
                    throw new JdbcConnectorException(CommonErrorCodeDeprecated.FLUSH_DATA_FAILED, e);
                }
                try {
                    // Only rebuild the executor when the connection itself has gone bad.
                    if (!this.connectionProvider.isConnectionValid()) {
                        this.updateExecutor(true);
                    }
                }
                catch (Exception exception) {
                    LOG.error("JDBC connection is not valid, and reestablish connection failed.", exception);
                    throw new JdbcConnectorException(JdbcConnectorErrorCode.CONNECT_DATABASE_FAILED, "Reestablish JDBC connection failed", exception);
                }
                try {
                    // long arithmetic so a large retry index cannot overflow the delay.
                    Thread.sleep(RETRY_BACKOFF_MS * i);
                }
                catch (InterruptedException ex) {
                    // Restore the interrupt flag for callers further up the stack.
                    Thread.currentThread().interrupt();
                    // The SQL failure stays the cause; keep the interruption visible as suppressed.
                    e.addSuppressed(ex);
                    throw new JdbcConnectorException(CommonErrorCodeDeprecated.FLUSH_DATA_FAILED, "unable to flush; interrupted while doing another attempt", e);
                }
            }
        }
    }

    /**
     * Executes the executor's buffered batch once. Overridable hook used by {@link #flush()}.
     *
     * @throws SQLException if batch execution fails
     */
    protected void attemptFlush() throws SQLException {
        this.jdbcStatementExecutor.executeBatch();
    }

    /**
     * Flushes any buffered records, closes the statements and closes the connection.
     *
     * <p>The flush and the statement close run only on the first call. A failure of that final
     * flush is logged and remembered rather than thrown immediately, so that the statements and
     * the connection are still closed. The connection close and the final
     * {@link #checkFlushException()} run on every call.
     *
     * @throws JdbcConnectorException if a flush failure has been recorded
     */
    public synchronized void close() {
        if (!this.closed) {
            this.closed = true;
            if (this.batchCount > 0) {
                try {
                    this.flush();
                }
                catch (Exception e) {
                    LOG.warn("Writing records to JDBC failed.", e);
                    this.flushException = new JdbcConnectorException(CommonErrorCodeDeprecated.FLUSH_DATA_FAILED, "Writing records to JDBC failed.", e);
                }
            }
            try {
                if (this.jdbcStatementExecutor != null) {
                    this.jdbcStatementExecutor.closeStatements();
                }
            }
            catch (SQLException e) {
                // Best effort: a failed statement close must not prevent closing the connection.
                LOG.warn("Close JDBC writer failed.", e);
            }
        }
        this.connectionProvider.closeConnection();
        this.checkFlushException();
    }

    /**
     * Closes the executor's statements and prepares them again, either on a freshly
     * re-established connection or on the current one.
     *
     * @param reconnect when {@code true}, a failure to close the old statements is only logged
     *     and the statements are prepared on a re-established connection; when {@code false},
     *     such a failure is rethrown and the current connection is reused
     * @throws SQLException if closing (with {@code reconnect == false}) or preparing fails
     * @throws ClassNotFoundException declared by this method; presumably raised while
     *     re-establishing the connection (not verifiable from this file)
     */
    public void updateExecutor(boolean reconnect) throws SQLException, ClassNotFoundException {
        try {
            this.jdbcStatementExecutor.closeStatements();
        }
        catch (SQLException e) {
            if (!reconnect) {
                throw e;
            }
            LOG.error("Close JDBC statement failed on reconnect.", e);
        }
        this.jdbcStatementExecutor.prepareStatements(reconnect ? this.connectionProvider.reestablishConnection() : this.connectionProvider.getConnection());
    }

    /**
     * Serializable supplier of statement executors.
     *
     * @param <T> executor type produced by {@link #get()}
     */
    public interface StatementExecutorFactory<T extends JdbcBatchStatementExecutor<?>>
    extends Supplier<T>,
    Serializable {
    }
}

