/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;

import java.io.IOException;
import java.io.Serializable;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormatBuilder;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;

/**
 * Sink writer that hands {@link SeaTunnelRow} records to a {@link JdbcOutputFormat}.
 *
 * <p>The output format is opened lazily, the first time any operation needs it. The writer keeps
 * no checkpoint state ({@link #snapshotState(long)} returns an empty list) and produces no commit
 * info ({@link #prepareCommit()} returns {@link Optional#empty()}); instead it flushes the output
 * format and commits the underlying connection directly whenever auto-commit is disabled.
 */
public class JdbcSinkWriter
implements SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState> {
    /** Destination for every written row; built once in the constructor. */
    private final JdbcOutputFormat<SeaTunnelRow, JdbcBatchStatementExecutor<SeaTunnelRow>> outputFormat;
    /** Writer context supplied by the caller; stored but not otherwise used by this class. */
    private final SinkWriter.Context context;
    /** Provides the connection that is committed in {@link #prepareCommit()} and {@link #close()}. */
    private final JdbcConnectionProvider connectionProvider;
    /** {@code true} once {@link JdbcOutputFormat#open()} has completed successfully. */
    private transient boolean isOpen;

    /**
     * Creates the writer and builds its output format. No connection-related call is made here;
     * the output format is opened on first use.
     *
     * @param context writer context supplied by the caller
     * @param dialect JDBC dialect passed to the output format builder
     * @param jdbcSinkConfig sink configuration; its connection config is used to create the
     *     connection provider
     * @param rowType row type passed to the output format builder
     */
    public JdbcSinkWriter(SinkWriter.Context context, JdbcDialect dialect, JdbcSinkConfig jdbcSinkConfig, SeaTunnelRowType rowType) {
        this.context = context;
        this.connectionProvider = new SimpleJdbcConnectionProvider(jdbcSinkConfig.getJdbcConnectionConfig());
        this.outputFormat = new JdbcOutputFormatBuilder(dialect, this.connectionProvider, jdbcSinkConfig, rowType).build();
    }

    /**
     * Opens the output format if it has not been opened successfully yet.
     *
     * @throws IOException if opening the output format fails
     */
    private void tryOpen() throws IOException {
        if (this.isOpen) {
            return;
        }
        this.outputFormat.open();
        // Flip the flag only after open() returned normally, so that a failed open is not
        // remembered as success and later calls do not operate on an unopened format.
        this.isOpen = true;
    }

    /**
     * Returns the writer state for a checkpoint. This writer is stateless.
     *
     * @param checkpointId id of the checkpoint being taken (unused)
     * @return always an empty list
     */
    public List<JdbcSinkState> snapshotState(long checkpointId) {
        return Collections.emptyList();
    }

    /**
     * Writes one row to the output format, opening it first if necessary.
     *
     * @param element row to write
     * @throws IOException if opening the output format or writing the record fails
     */
    public void write(SeaTunnelRow element) throws IOException {
        this.tryOpen();
        // A deep copy is handed to the output format instead of the caller's instance.
        // NOTE(review): presumably because the caller may reuse/mutate the row - confirm.
        SeaTunnelRow copy = (SeaTunnelRow)SerializationUtils.clone((Serializable)element);
        this.outputFormat.writeRecord(copy);
    }

    /**
     * Flushes the output format and commits the connection when auto-commit is disabled.
     *
     * @return always {@link Optional#empty()}; no commit info is produced
     * @throws IOException if opening or flushing the output format fails
     * @throws JdbcConnectorException with {@code TRANSACTION_OPERATION_FAILED} if querying the
     *     auto-commit mode or committing raises a {@link SQLException}
     */
    public Optional<XidInfo> prepareCommit() throws IOException {
        this.tryOpen();
        this.outputFormat.checkFlushException();
        this.outputFormat.flush();
        try {
            this.commitIfAutoCommitDisabled();
        }
        catch (SQLException e) {
            throw new JdbcConnectorException(JdbcConnectorErrorCode.TRANSACTION_OPERATION_FAILED, "commit failed," + e.getMessage(), e);
        }
        return Optional.empty();
    }

    /** Nothing to abort: {@link #prepareCommit()} has already committed directly. */
    public void abortPrepare() {
    }

    /**
     * Flushes the output format, commits the connection when auto-commit is disabled, and closes
     * the output format.
     *
     * <p>The output format is closed even when opening, flushing or committing fails; a failure
     * raised by that cleanup close is attached to the primary exception as suppressed.
     *
     * @throws IOException if opening, flushing or closing the output format fails
     * @throws JdbcConnectorException with {@code WRITER_OPERATION_FAILED} if querying the
     *     auto-commit mode or committing raises a {@link SQLException}
     */
    public void close() throws IOException {
        try {
            this.tryOpen();
            this.outputFormat.flush();
            this.commitIfAutoCommitDisabled();
        }
        catch (SQLException e) {
            JdbcConnectorException failure = new JdbcConnectorException((SeaTunnelErrorCode)CommonErrorCode.WRITER_OPERATION_FAILED, "unable to close JDBC sink write", e);
            this.closeOutputFormatAfterFailure(failure);
            throw failure;
        }
        catch (IOException | RuntimeException e) {
            this.closeOutputFormatAfterFailure(e);
            throw e;
        }
        this.outputFormat.close();
    }

    /**
     * Commits the provider's connection unless it is in auto-commit mode.
     *
     * @throws SQLException if reading the auto-commit mode or committing fails
     */
    private void commitIfAutoCommitDisabled() throws SQLException {
        if (!this.connectionProvider.getConnection().getAutoCommit()) {
            this.connectionProvider.getConnection().commit();
        }
    }

    /**
     * Closes the output format on a failure path without masking the primary exception.
     *
     * @param primary the exception that is about to be propagated to the caller
     */
    private void closeOutputFormatAfterFailure(Throwable primary) {
        try {
            this.outputFormat.close();
        }
        catch (Exception secondary) {
            // Broad catch is deliberate: this is best-effort cleanup and the primary failure
            // must win. Self-suppression is rejected by Throwable, hence the identity check.
            if (secondary != primary) {
                primary.addSuppressed(secondary);
            }
        }
    }
}

