/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.selectdb.sink.writer;

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig;
import org.apache.seatunnel.connectors.selectdb.exception.SelectDBConnectorErrorCode;
import org.apache.seatunnel.connectors.selectdb.exception.SelectDBConnectorException;
import org.apache.seatunnel.connectors.selectdb.rest.BaseResponse;
import org.apache.seatunnel.connectors.selectdb.rest.CopySQLUtil;
import org.apache.seatunnel.connectors.selectdb.sink.writer.CopySQLBuilder;
import org.apache.seatunnel.connectors.selectdb.sink.writer.LabelGenerator;
import org.apache.seatunnel.connectors.selectdb.sink.writer.RecordBuffer;
import org.apache.seatunnel.connectors.selectdb.util.HttpPutBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SelectDBStageLoad
implements Serializable {
    private static final Logger log = LoggerFactory.getLogger(SelectDBStageLoad.class);
    // Produces the stage file name assigned to each flushed buffer.
    private final LabelGenerator labelGenerator;
    // Record separator; taken from the "file.line_delimiter" stage-load property in the constructor.
    private final String lineDelimiter;
    // Template of the upload endpoint; %s is filled with the configured load URL (host:port).
    private static final String UPLOAD_URL_PATTERN = "http://%s/copy/upload";
    private final SelectDBConfig selectdbConfig;
    // UPLOAD_URL_PATTERN formatted with hostPort.
    private String uploadUrl;
    private String hostPort;
    private final String username;
    private final String password;
    private final Properties stageLoadProps;
    // Names of files uploaded so far; appended to by the upload worker and read by callers.
    private List<String> fileList = new CopyOnWriteArrayList<String>();
    // Buffer currently receiving records; replaced with a fresh one on every flush.
    private RecordBuffer buffer;
    private long currentCheckpointID;
    // Sequence number handed to the label generator; reset by clearFileList().
    private AtomicInteger fileNum;
    private ExecutorService loadExecutorService;
    private StageLoadAsyncExecutor loadAsyncExecutor;
    // Hand-off queue between the writing thread and the upload worker.
    private ArrayBlockingQueue<RecordBuffer> queue;
    // While true the upload worker keeps polling the queue; cleared by close().
    private final AtomicBoolean started;
    // First failure raised by the upload worker, or null while it is healthy.
    // The type argument must be Throwable: AtomicReference<Object> is not assignable
    // to AtomicReference<Throwable> and does not compile.
    private AtomicReference<Throwable> exception = new AtomicReference<Throwable>(null);
    // Redirects are not followed automatically: getUploadAddress() reads the 307 "location" header itself.
    private HttpClientBuilder httpClientBuilder = HttpClients.custom().disableRedirectHandling();

    /**
     * Creates the loader and immediately starts its single background upload worker.
     *
     * <p>Connection settings, credentials and stage-load properties are read from
     * {@code selectdbConfig}; the hand-off queue is sized by its flush queue size.
     *
     * @param selectdbConfig connector configuration
     * @param labelGenerator generator for the file name of each flushed buffer
     */
    public SelectDBStageLoad(SelectDBConfig selectdbConfig, LabelGenerator labelGenerator) {
        this.selectdbConfig = selectdbConfig;
        this.hostPort = selectdbConfig.getLoadUrl();
        this.username = selectdbConfig.getUsername();
        this.password = selectdbConfig.getPassword();
        this.labelGenerator = labelGenerator;
        this.uploadUrl = String.format(UPLOAD_URL_PATTERN, this.hostPort);
        this.stageLoadProps = selectdbConfig.getStageLoadProps();
        this.lineDelimiter = this.stageLoadProps.getProperty("file.line_delimiter", "\n");
        this.fileNum = new AtomicInteger();
        this.buffer = new RecordBuffer(this.lineDelimiter);
        // Parameterized instead of the raw type so the queue stays type-checked.
        this.queue = new ArrayBlockingQueue<RecordBuffer>(selectdbConfig.getFlushQueueSize());
        this.loadAsyncExecutor = new StageLoadAsyncExecutor();
        // Exactly one worker thread; a second submission beyond the one queued slot is rejected.
        this.loadExecutorService =
                new ThreadPoolExecutor(
                        1,
                        1,
                        0L,
                        TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<Runnable>(1),
                        new DefaultThreadFactory("upload-executor"),
                        new ThreadPoolExecutor.AbortPolicy());
        // Must be set before the worker starts, since its loop condition reads this flag.
        this.started = new AtomicBoolean(true);
        this.loadExecutorService.execute(this.loadAsyncExecutor);
    }

    /**
     * @return the load URL taken from the connector configuration at construction time
     */
    public String getHostPort() {
        return hostPort;
    }

    /**
     * Exposes the list of uploaded file names.
     *
     * @return the internal list itself (not a copy); the upload worker appends to it
     */
    public List<String> getFileList() {
        return fileList;
    }

    /** Restarts the file sequence number at zero and forgets all recorded file names. */
    public void clearFileList() {
        fileNum.set(0);
        fileList.clear();
    }

    /**
     * Appends one record to the current buffer and triggers a non-blocking flush once
     * the buffer reaches the configured byte size or, when a record limit is
     * configured (non-zero), the configured record count.
     *
     * @param record UTF-8 encoded record
     * @throws InterruptedException if interrupted while handing the buffer to the upload queue
     */
    public void writeRecord(byte[] record) throws InterruptedException {
        String row = new String(record, StandardCharsets.UTF_8);
        buffer.insert(row);

        boolean thresholdReached =
                buffer.getBufferSizeBytes() >= (long) selectdbConfig.getBufferSize().intValue();
        if (!thresholdReached) {
            // The record-count limit is consulted only when the size limit has not been hit.
            thresholdReached =
                    selectdbConfig.getBufferCount() != 0
                            && buffer.getNumOfRecords() >= selectdbConfig.getBufferCount();
        }
        if (thresholdReached) {
            flush(false);
        }
    }

    /**
     * Hands the current buffer to the upload worker and starts a fresh one.
     *
     * @param waitUtilDone when {@code true}, block until the worker has finished
     *     processing everything queued so far
     * @throws InterruptedException if interrupted while waiting for queue space
     * @throws SelectDBConnectorException if the upload worker has already failed, or
     *     the queue does not accept the buffer in time
     */
    public void flush(boolean waitUtilDone) throws InterruptedException {
        this.checkFlushException();
        if (this.buffer == null) {
            return;
        }
        RecordBuffer pending = this.buffer;
        pending.setFileName(
                this.labelGenerator.generateLabel(
                        this.currentCheckpointID, this.fileNum.getAndIncrement()));
        log.info("flush buffer to queue, actual queue size {}", this.queue.size());
        this.offer(pending);
        // The worker owns `pending` from here on. Swap in a new buffer BEFORE waiting:
        // if the wait below throws, later writes must not land in (or re-queue) a
        // buffer that is already queued for upload.
        this.buffer = new RecordBuffer(this.lineDelimiter);
        if (waitUtilDone) {
            this.waitAsyncLoadFinish();
        }
    }

    /**
     * Puts a buffer on the upload queue, waiting up to 10 minutes for free space.
     *
     * <p>The wait is split into short slices so a failure of the upload worker is
     * reported promptly instead of surfacing as a queue timeout after the full wait.
     *
     * @param buffer buffer to enqueue
     * @throws InterruptedException if interrupted while waiting
     * @throws SelectDBConnectorException if the worker has failed or the wait times out
     */
    private void offer(RecordBuffer buffer) throws InterruptedException {
        final long timeoutMs = 600000L;
        final long sliceMs = 1000L;
        final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            // Re-checked on every slice: a dead worker will never drain the queue.
            this.checkFlushException();
            long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime());
            if (remainingMs <= 0L) {
                break;
            }
            if (this.queue.offer(buffer, Math.min(sliceMs, remainingMs), TimeUnit.MILLISECONDS)) {
                return;
            }
        }
        throw new SelectDBConnectorException(
                (SeaTunnelErrorCode) SelectDBConnectorErrorCode.STAGE_LOAD_FAILED,
                "offer data to queue timeout, exceed " + timeoutMs + " ms");
    }

    /**
     * Rethrows a failure recorded by the upload worker, wrapped as a stage-load error.
     * Does nothing while no failure has been recorded.
     */
    private void checkFlushException() {
        if (exception.get() == null) {
            return;
        }
        Throwable workerFailure = exception.get();
        throw new SelectDBConnectorException(
                (SeaTunnelErrorCode) SelectDBConnectorErrorCode.STAGE_LOAD_FAILED, workerFailure);
    }

    /**
     * Blocks until the worker has consumed everything queued before this call.
     *
     * <p>Enqueues one more placeholder buffer than the queue can hold. The worker
     * skips placeholders (they carry no file name), and the last one only fits once
     * the worker has come back to poll after the preceding real buffers.
     *
     * @throws InterruptedException if interrupted while offering a placeholder
     */
    private void waitAsyncLoadFinish() throws InterruptedException {
        int placeholdersSent = 0;
        while (placeholdersSent < selectdbConfig.getFlushQueueSize() + 1) {
            offer(new RecordBuffer());
            placeholdersSent++;
        }
    }

    /**
     * Tells the worker loop to stop and shuts the executor down.
     * This call does not wait for the worker thread to exit.
     */
    public void close() {
        started.set(false);
        loadExecutorService.shutdown();
    }

    /**
     * @param currentCheckpointID checkpoint id passed to the label generator for later flushes
     */
    public void setCurrentCheckpointID(long currentCheckpointID) {
        this.currentCheckpointID = currentCheckpointID;
    }

    /**
     * Thread factory producing non-daemon threads named
     * {@code pool-<poolId>-<name>-<threadId>}; both ids start at 1.
     */
    static class DefaultThreadFactory
    implements ThreadFactory {
        /** Shared across all factories so each one gets its own pool id. */
        private static final AtomicInteger POOL_SEQUENCE = new AtomicInteger(1);
        private final AtomicInteger threadSequence = new AtomicInteger(1);
        private final String prefix;

        DefaultThreadFactory(String name) {
            int poolId = POOL_SEQUENCE.getAndIncrement();
            StringBuilder sb = new StringBuilder("pool-");
            sb.append(poolId).append("-").append(name).append("-");
            this.prefix = sb.toString();
        }

        @Override
        public Thread newThread(Runnable r) {
            String threadName = prefix + threadSequence.getAndIncrement();
            Thread worker = new Thread(r, threadName);
            worker.setDaemon(false);
            return worker;
        }
    }

    class StageLoadAsyncExecutor
    implements Runnable {
        /** Creates the worker; all state it uses is read from the enclosing loader instance. */
        StageLoadAsyncExecutor() {
        }

        /**
         * Worker loop: polls the queue, uploads each named buffer to the stage and
         * records its file name. When two-phase commit is disabled it also issues the
         * copy statement for the recorded files and clears the list.
         *
         * <p>Buffers without a file name are placeholders and are skipped. The first
         * failure is stored for the writing thread to rethrow, and the loop ends.
         */
        @Override
        public void run() {
            log.info("StageLoadAsyncExecutor start");
            while (SelectDBStageLoad.this.started.get()) {
                try {
                    RecordBuffer next = SelectDBStageLoad.this.queue.poll(2000L, TimeUnit.MILLISECONDS);
                    if (next == null || next.getFileName() == null) {
                        continue;
                    }
                    this.uploadToStorage(next.getFileName(), next);
                    SelectDBStageLoad.this.fileList.add(next.getFileName());
                    if (!SelectDBStageLoad.this.selectdbConfig.isEnable2PC()) {
                        CopySQLBuilder copySQLBuilder = new CopySQLBuilder(SelectDBStageLoad.this.selectdbConfig, SelectDBStageLoad.this.fileList);
                        String copySql = copySQLBuilder.buildCopySQL();
                        CopySQLUtil.copyFileToDatabase(SelectDBStageLoad.this.selectdbConfig, SelectDBStageLoad.this.selectdbConfig.getClusterName(), copySql, SelectDBStageLoad.this.hostPort);
                        log.info("clear the file list {}", SelectDBStageLoad.this.fileList);
                        SelectDBStageLoad.this.clearFileList();
                    }
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        // Catching the exception clears the flag; restore it for the executor.
                        Thread.currentThread().interrupt();
                    }
                    log.error("worker running error", e);
                    SelectDBStageLoad.this.exception.set(e);
                    break;
                }
            }
            log.info("StageLoadAsyncExecutor stop");
        }

        /**
         * Uploads one buffer: resolves the upload address for {@code fileName}, then
         * sends the buffer content there as UTF-8 bytes. Logs progress and elapsed time.
         *
         * @param fileName name under which the data is stored
         * @param buffer buffer whose data is uploaded
         */
        public void uploadToStorage(String fileName, RecordBuffer buffer) {
            final long startedAt = System.currentTimeMillis();
            log.info("file write started for {}", fileName);

            final String stageAddress = getUploadAddress(fileName);
            log.info("redirect to internalStage address:{}", stageAddress);

            byte[] payload = buffer.getData().getBytes(StandardCharsets.UTF_8);
            uploadToInternalStage(stageAddress, payload);

            log.info(
                    "upload file {} finished, record {} size {}, cost {}ms ",
                    fileName,
                    buffer.getNumOfRecords(),
                    buffer.getBufferSizeBytes(),
                    System.currentTimeMillis() - startedAt);
        }

        /**
         * PUTs {@code data} to the given stage address.
         *
         * <p>The upload counts as successful only when the response status is 200, an
         * entity is present, and the entity body is empty.
         *
         * @param address target address, as returned by {@link #getUploadAddress(String)}
         * @param data bytes to upload
         * @return always {@code null} on success
         * @throws SelectDBConnectorException on a non-200 status, a missing entity,
         *     a non-empty response body, or an I/O error
         */
        public BaseResponse uploadToInternalStage(String address, byte[] data) throws SelectDBConnectorException {
            ByteArrayEntity entity = new ByteArrayEntity(data);
            HttpPutBuilder putBuilder = new HttpPutBuilder();
            putBuilder.setUrl(address).addCommonHeader().setEntity(entity);
            HttpPut httpPut = putBuilder.build();
            // A client is built per call, so it must be closed along with the response;
            // otherwise its connection manager is leaked.
            try (CloseableHttpClient client = SelectDBStageLoad.this.httpClientBuilder.build();
                    CloseableHttpResponse response = client.execute(httpPut)) {
                int statusCode = response.getStatusLine().getStatusCode();
                if (statusCode != 200 || response.getEntity() == null) {
                    throw new SelectDBConnectorException((SeaTunnelErrorCode)SelectDBConnectorErrorCode.STAGE_LOAD_FAILED, "upload file error: " + response.getStatusLine().toString());
                }
                String loadResult = EntityUtils.toString(response.getEntity());
                if (loadResult != null && !loadResult.isEmpty()) {
                    // Any body content on a 200 response is treated as a failure.
                    throw new SelectDBConnectorException((SeaTunnelErrorCode)SelectDBConnectorErrorCode.STAGE_LOAD_FAILED, "upload file failed: " + response.getStatusLine().toString());
                }
                return null;
            } catch (IOException ex) {
                throw new SelectDBConnectorException((SeaTunnelErrorCode)SelectDBConnectorErrorCode.STAGE_LOAD_FAILED, "Failed to upload data to internal stage", ex);
            }
        }

        /**
         * Asks the upload endpoint where {@code fileName} should be stored.
         *
         * <p>Sends an authenticated PUT with an empty body and expects a 307 response
         * whose {@code location} header carries the address. Redirects are not followed
         * automatically, so the header is read here.
         *
         * @param fileName name of the file about to be uploaded
         * @return the value of the {@code location} header
         * @throws SelectDBConnectorException if the status is not 307, the header is
         *     missing, or an I/O error occurs
         */
        public String getUploadAddress(String fileName) throws SelectDBConnectorException {
            HttpPutBuilder putBuilder = new HttpPutBuilder();
            putBuilder.setUrl(SelectDBStageLoad.this.uploadUrl).addFileName(fileName).addCommonHeader().setEmptyEntity().baseAuth(SelectDBStageLoad.this.username, SelectDBStageLoad.this.password);
            // A client is built per call, so it must be closed along with the response.
            try (CloseableHttpClient client = SelectDBStageLoad.this.httpClientBuilder.build();
                    CloseableHttpResponse execute = client.execute(putBuilder.build())) {
                int statusCode = execute.getStatusLine().getStatusCode();
                String reason = execute.getStatusLine().getReasonPhrase();
                if (statusCode == 307) {
                    Header location = execute.getFirstHeader("location");
                    if (location == null || location.getValue() == null) {
                        // Fail with a descriptive error instead of a NullPointerException.
                        throw new SelectDBConnectorException((SeaTunnelErrorCode)SelectDBConnectorErrorCode.STAGE_LOAD_FAILED, "Failed to get internalStage address, status 307 without location header");
                    }
                    return location.getValue();
                }
                HttpEntity entity = execute.getEntity();
                String result = entity == null ? null : EntityUtils.toString(entity);
                // String.format needs %s specifiers; "{}" is SLF4J syntax and would drop the arguments.
                String errMsg = String.format("Failed to get internalStage address, status %s, reason %s, response %s", statusCode, reason, result);
                throw new SelectDBConnectorException((SeaTunnelErrorCode)SelectDBConnectorErrorCode.STAGE_LOAD_FAILED, errMsg);
            } catch (IOException e) {
                throw new SelectDBConnectorException((SeaTunnelErrorCode)SelectDBConnectorErrorCode.STAGE_LOAD_FAILED, "get internalStage address error", e);
            }
        }
    }
}

