/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;

import com.google.common.base.Throwables;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import javax.transaction.xa.Xid;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormatBuilder;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaFacade;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOps;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XidGenerator;
import org.apache.seatunnel.connectors.seatunnel.jdbc.sink.AbstractJdbcSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcExactlyOnceSinkWriter
extends AbstractJdbcSinkWriter<Void> {
    private static final Logger LOG = LoggerFactory.getLogger(JdbcExactlyOnceSinkWriter.class);
    private final SinkWriter.Context sinkcontext;
    private final JobContext context;
    private final List<JdbcSinkState> recoverStates;
    private final XaFacade xaFacade;
    private final XaGroupOps xaGroupOps;
    private final XidGenerator xidGenerator;
    private transient Xid currentXid;
    private transient Xid prepareXid;

    public JdbcExactlyOnceSinkWriter(TablePath sinkTablePath, SinkWriter.Context sinkcontext, JobContext context, JdbcDialect dialect, JdbcSinkConfig jdbcSinkConfig, TableSchema tableSchema, List<JdbcSinkState> states) {
        Preconditions.checkArgument((jdbcSinkConfig.getJdbcConnectionConfig().getMaxRetries() == 0 ? 1 : 0) != 0, (Object)"JDBC XA sink requires maxRetries equal to 0, otherwise it could cause duplicates.");
        this.sinkTablePath = sinkTablePath;
        this.dialect = dialect;
        this.tableSchema = tableSchema;
        this.jdbcSinkConfig = jdbcSinkConfig;
        this.context = context;
        this.sinkcontext = sinkcontext;
        this.recoverStates = states;
        this.xidGenerator = XidGenerator.semanticXidGenerator();
        Preconditions.checkState((boolean)jdbcSinkConfig.isExactlyOnce(), (Object)"is_exactly_once config error");
        this.connectionProvider = XaFacade.fromJdbcConnectionOptions(jdbcSinkConfig.getJdbcConnectionConfig());
        this.xaFacade = (XaFacade)this.connectionProvider;
        this.outputFormat = new JdbcOutputFormatBuilder(dialect, this.xaFacade, jdbcSinkConfig, tableSchema).build();
        this.xaGroupOps = new XaGroupOpsImpl(this.xaFacade);
    }

    private void tryOpen() {
        if (!this.isOpen) {
            this.isOpen = true;
            try {
                this.xidGenerator.open();
                this.xaFacade.open();
                this.outputFormat.open();
                if (!this.recoverStates.isEmpty()) {
                    Xid xid = this.recoverStates.get(0).getXid();
                    this.xaGroupOps.recoverAndRollback(this.context, this.sinkcontext, this.xidGenerator, xid);
                }
                this.beginTx();
            }
            catch (Exception e) {
                throw new JdbcConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, "unable to open JDBC exactly one writer", e);
            }
        }
    }

    public List<JdbcSinkState> snapshotState(long checkpointId) {
        Preconditions.checkState((this.prepareXid != null ? 1 : 0) != 0, (Object)"prepare xid must not be null");
        return Collections.singletonList(new JdbcSinkState(this.prepareXid));
    }

    public void write(SeaTunnelRow element) {
        this.tryOpen();
        Preconditions.checkState((this.currentXid != null ? 1 : 0) != 0, (Object)"current xid must not be null");
        SeaTunnelRow copy = (SeaTunnelRow)SerializationUtils.clone((Serializable)element);
        this.outputFormat.writeRecord(copy);
    }

    public Optional<XidInfo> prepareCommit() throws IOException {
        this.tryOpen();
        boolean emptyXaTransaction = false;
        try {
            this.prepareCurrentTx();
        }
        catch (Exception e) {
            if (Throwables.getRootCause((Throwable)e) instanceof XaFacade.EmptyXaTransactionException) {
                emptyXaTransaction = true;
                LOG.info("skip prepare empty xa transaction, xid={}", (Object)this.currentXid);
            }
            throw e;
        }
        this.currentXid = null;
        this.beginTx();
        Preconditions.checkState((this.prepareXid != null ? 1 : 0) != 0, (Object)"prepare xid must not be null");
        return emptyXaTransaction ? Optional.empty() : Optional.of(new XidInfo(this.prepareXid, 0));
    }

    public void abortPrepare() {
    }

    public void close() throws IOException {
        if (this.currentXid != null && this.xaFacade.isOpen()) {
            try {
                LOG.debug("remove current transaction before closing, xid={}", (Object)this.currentXid);
                this.xaFacade.failAndRollback(this.currentXid);
            }
            catch (Exception e) {
                LOG.warn("unable to fail/rollback current transaction, xid={}", (Object)this.currentXid, (Object)e);
            }
        }
        try {
            this.xaFacade.close();
        }
        catch (Exception e) {
            throw new JdbcConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, "unable to close JDBC exactly one writer", e);
        }
        finally {
            this.outputFormat.close();
            this.xidGenerator.close();
            this.currentXid = null;
            this.prepareXid = null;
        }
    }

    private void beginTx() throws IOException {
        Preconditions.checkState((this.currentXid == null ? 1 : 0) != 0, (Object)"currentXid not null");
        this.currentXid = this.xidGenerator.generateXid(this.context, this.sinkcontext, System.currentTimeMillis());
        try {
            this.xaFacade.start(this.currentXid);
        }
        catch (Exception e) {
            throw new JdbcConnectorException(JdbcConnectorErrorCode.XA_OPERATION_FAILED, "unable to start xa transaction", e);
        }
    }

    private void prepareCurrentTx() throws IOException {
        Preconditions.checkState((this.currentXid != null ? 1 : 0) != 0, (Object)"no current xid");
        this.outputFormat.flush();
        Exception endAndPrepareException = null;
        try {
            this.xaFacade.endAndPrepare(this.currentXid);
        }
        catch (Exception e) {
            endAndPrepareException = e;
            throw new JdbcConnectorException(JdbcConnectorErrorCode.XA_OPERATION_FAILED, "unable to prepare current xa transaction", e);
        }
        finally {
            if (endAndPrepareException == null || Throwables.getRootCause((Throwable)endAndPrepareException) instanceof XaFacade.EmptyXaTransactionException) {
                this.prepareXid = this.currentXid;
            }
        }
    }
}

