/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.selectdb.sink.committer;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig;
import org.apache.seatunnel.connectors.selectdb.exception.SelectDBConnectorErrorCode;
import org.apache.seatunnel.connectors.selectdb.exception.SelectDBConnectorException;
import org.apache.seatunnel.connectors.selectdb.rest.BaseResponse;
import org.apache.seatunnel.connectors.selectdb.rest.CopyIntoResp;
import org.apache.seatunnel.connectors.selectdb.sink.committer.SelectDBCommitInfo;
import org.apache.seatunnel.connectors.selectdb.util.HttpPostBuilder;
import org.apache.seatunnel.connectors.selectdb.util.HttpUtil;
import org.apache.seatunnel.connectors.selectdb.util.ResponseUtil;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SelectDBCommitter
implements SinkCommitter<SelectDBCommitInfo> {
    private static final Logger log = LoggerFactory.getLogger(SelectDBCommitter.class);
    private static final String COMMIT_PATTERN = "http://%s/copy/query";
    private static final int HTTP_TEMPORARY_REDIRECT = 200;
    private final ObjectMapper objectMapper;
    private final CloseableHttpClient httpClient;
    private final SelectDBConfig selectdbConfig;
    int maxRetry;

    public SelectDBCommitter(Config pluginConfig) {
        new HttpUtil();
        this(SelectDBConfig.loadConfig(pluginConfig), SelectDBConfig.loadConfig(pluginConfig).getMaxRetries(), HttpUtil.getHttpClient());
    }

    public SelectDBCommitter(SelectDBConfig selectdbConfig, int maxRetry, CloseableHttpClient client) {
        this.objectMapper = new ObjectMapper();
        this.selectdbConfig = selectdbConfig;
        this.maxRetry = maxRetry;
        this.httpClient = client;
    }

    public List<SelectDBCommitInfo> commit(List<SelectDBCommitInfo> commitInfos) throws IOException {
        for (SelectDBCommitInfo committable : commitInfos) {
            this.commitTransaction(committable);
        }
        return Collections.emptyList();
    }

    public void abort(List<SelectDBCommitInfo> commitInfos) throws IOException {
    }

    private void commitTransaction(SelectDBCommitInfo commitInfo) throws IOException {
        long start = System.currentTimeMillis();
        String hostPort = commitInfo.getHostPort();
        String clusterName = commitInfo.getClusterName();
        String copySQL = commitInfo.getCopySQL();
        log.info("commit to cluster {} with copy sql: {}", (Object)clusterName, (Object)copySQL);
        int statusCode = -1;
        String reasonPhrase = null;
        int retry = 0;
        HashMap<String, String> params = new HashMap<String, String>();
        params.put("cluster", clusterName);
        params.put("sql", copySQL);
        boolean success = false;
        String loadResult = "";
        while (retry++ <= this.maxRetry) {
            CloseableHttpResponse response;
            HttpPostBuilder postBuilder = new HttpPostBuilder();
            postBuilder.setUrl(String.format(COMMIT_PATTERN, hostPort)).baseAuth(this.selectdbConfig.getUsername(), this.selectdbConfig.getPassword()).setEntity(new StringEntity(this.objectMapper.writeValueAsString(params)));
            try {
                response = this.httpClient.execute(postBuilder.build());
            }
            catch (IOException e) {
                log.error("commit error : ", (Throwable)e);
                continue;
            }
            statusCode = response.getStatusLine().getStatusCode();
            reasonPhrase = response.getStatusLine().getReasonPhrase();
            if (statusCode != 200) {
                log.warn("commit failed with status {} {}, reason {}", new Object[]{statusCode, hostPort, reasonPhrase});
                continue;
            }
            if (response.getEntity() == null) continue;
            loadResult = EntityUtils.toString(response.getEntity());
            success = this.handleCommitResponse(loadResult);
            if (success) {
                log.info("commit success cost {}ms, response is {}", (Object)(System.currentTimeMillis() - start), (Object)loadResult);
                break;
            }
            log.warn("commit failed, retry again");
        }
        if (!success) {
            throw new SelectDBConnectorException((SeaTunnelErrorCode)SelectDBConnectorErrorCode.COMMIT_FAILED, "commit failed with SQL: " + commitInfo.getCopySQL() + " Commit error with status: " + statusCode + ", Reason: " + reasonPhrase + ", Response: " + loadResult);
        }
    }

    public boolean handleCommitResponse(String loadResult) throws IOException {
        BaseResponse baseResponse = (BaseResponse)this.objectMapper.readValue(loadResult, (TypeReference)new TypeReference<BaseResponse<CopyIntoResp>>(){});
        if (baseResponse.getCode() == 0) {
            CopyIntoResp dataResp = (CopyIntoResp)baseResponse.getData();
            if ("1".equals(dataResp.getDataCode())) {
                log.error("copy into execute failed, reason:{}", (Object)loadResult);
                return false;
            }
            Map<String, String> result = dataResp.getResult();
            if (!result.get("state").equals("FINISHED") && !ResponseUtil.isCommitted(result.get("msg"))) {
                log.error("copy into load failed, reason:{}", (Object)loadResult);
                return false;
            }
            return true;
        }
        log.error("commit failed, reason:{}", (Object)loadResult);
        return false;
    }
}

