/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.selectdb.sink.writer;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig;
import org.apache.seatunnel.connectors.selectdb.serialize.SeaTunnelRowSerializer;
import org.apache.seatunnel.connectors.selectdb.serialize.SelectDBSerializer;
import org.apache.seatunnel.connectors.selectdb.sink.committer.SelectDBCommitInfo;
import org.apache.seatunnel.connectors.selectdb.sink.writer.CopySQLBuilder;
import org.apache.seatunnel.connectors.selectdb.sink.writer.LabelGenerator;
import org.apache.seatunnel.connectors.selectdb.sink.writer.SelectDBSinkState;
import org.apache.seatunnel.connectors.selectdb.sink.writer.SelectDBStageLoad;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SelectDBSinkWriter
implements SinkWriter<SeaTunnelRow, SelectDBCommitInfo, SelectDBSinkState> {
    private static final Logger log = LoggerFactory.getLogger(SelectDBSinkWriter.class);
    private final SelectDBConfig selectdbConfig;
    private final long lastCheckpointId;
    private SelectDBStageLoad selectDBStageLoad;
    volatile boolean loading;
    private final String labelPrefix;
    private final byte[] lineDelimiter;
    private final LabelGenerator labelGenerator;
    private final SelectDBSinkState selectdbSinkState;
    private final SelectDBSerializer serializer;

    public SelectDBSinkWriter(SinkWriter.Context context, List<SelectDBSinkState> state, SeaTunnelRowType seaTunnelRowType, Config pluginConfig, String jobId) {
        this.selectdbConfig = SelectDBConfig.loadConfig(pluginConfig);
        this.lastCheckpointId = state.size() != 0 ? state.get(0).getCheckpointId() : 0L;
        log.info("restore checkpointId {}", (Object)this.lastCheckpointId);
        log.info("labelPrefix " + this.selectdbConfig.getLabelPrefix());
        this.selectdbSinkState = new SelectDBSinkState(this.selectdbConfig.getLabelPrefix(), this.lastCheckpointId);
        this.labelPrefix = this.selectdbConfig.getLabelPrefix() + "_" + jobId + "_" + context.getIndexOfSubtask();
        this.lineDelimiter = this.selectdbConfig.getStageLoadProps().getProperty("file.line_delimiter", "\n").getBytes();
        this.labelGenerator = new LabelGenerator(this.labelPrefix);
        this.serializer = SelectDBSinkWriter.createSerializer(this.selectdbConfig, seaTunnelRowType);
        this.loading = false;
    }

    public void initializeLoad(List<SelectDBSinkState> state) throws IOException {
        this.selectDBStageLoad = new SelectDBStageLoad(this.selectdbConfig, this.labelGenerator);
        this.selectDBStageLoad.setCurrentCheckpointID(this.lastCheckpointId + 1L);
        this.serializer.open();
    }

    public synchronized void write(SeaTunnelRow element) throws IOException {
        byte[] serialize = this.serializer.serialize(element);
        if (Objects.isNull(serialize)) {
            return;
        }
        try {
            this.selectDBStageLoad.writeRecord(serialize);
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public synchronized Optional<SelectDBCommitInfo> prepareCommit() throws IOException {
        Preconditions.checkState((this.selectDBStageLoad != null ? 1 : 0) != 0);
        log.info("checkpoint arrived, upload buffer to storage");
        try {
            this.selectDBStageLoad.flush(true);
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        CopySQLBuilder copySQLBuilder = new CopySQLBuilder(this.selectdbConfig, this.selectDBStageLoad.getFileList());
        String copySql = copySQLBuilder.buildCopySQL();
        return Optional.of(new SelectDBCommitInfo(this.selectDBStageLoad.getHostPort(), this.selectdbConfig.getClusterName(), copySql));
    }

    public synchronized List<SelectDBSinkState> snapshotState(long checkpointId) throws IOException {
        Preconditions.checkState((this.selectDBStageLoad != null ? 1 : 0) != 0);
        log.info("clear the file list {}", this.selectDBStageLoad.getFileList());
        this.selectDBStageLoad.clearFileList();
        this.selectDBStageLoad.setCurrentCheckpointID(checkpointId + 1L);
        return Collections.singletonList(this.selectdbSinkState);
    }

    public void abortPrepare() {
    }

    public void close() throws IOException {
        if (this.selectDBStageLoad != null) {
            this.selectDBStageLoad.close();
        }
        this.serializer.close();
    }

    public static SelectDBSerializer createSerializer(SelectDBConfig selectdbConfig, SeaTunnelRowType seaTunnelRowType) {
        return new SeaTunnelRowSerializer(selectdbConfig.getStageLoadProps().getProperty("file.type").toLowerCase(), seaTunnelRowType, selectdbConfig.getStageLoadProps().getProperty("file.column_separator"), selectdbConfig.getEnableDelete());
    }
}

