/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.starrocks.client.source;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.connectors.seatunnel.starrocks.client.HttpHelper;
import org.apache.seatunnel.connectors.seatunnel.starrocks.client.source.model.QueryPartition;
import org.apache.seatunnel.connectors.seatunnel.starrocks.client.source.model.QueryPlan;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SourceConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSourceTableConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StarRocksQueryPlanReadClient {
    private static final Logger log = LoggerFactory.getLogger(StarRocksQueryPlanReadClient.class);
    private RetryUtils.RetryMaterial retryMaterial;
    private SourceConfig sourceConfig;
    private final HttpHelper httpHelper = new HttpHelper();
    private final Map<String, StarRocksSourceTableConfig> tables;
    private static final long DEFAULT_SLEEP_TIME_MS = 1000L;

    public StarRocksQueryPlanReadClient(SourceConfig sourceConfig) {
        this.sourceConfig = sourceConfig;
        this.retryMaterial = new RetryUtils.RetryMaterial(sourceConfig.getMaxRetries(), true, exception -> true, 1000L);
        this.tables = sourceConfig.getTableConfigList().stream().collect(Collectors.toMap(StarRocksSourceTableConfig::getTable, Function.identity()));
    }

    public List<QueryPartition> findPartitions(String table) {
        QueryPlan queryPlan = this.getQueryPlan(this.genQuerySql(table), table);
        Map<String, List<Long>> be2Tablets = this.selectBeForTablet(queryPlan);
        return this.tabletsMapToPartition(be2Tablets, queryPlan.getQueryPlan(), this.sourceConfig.getDatabase(), table);
    }

    private List<QueryPartition> tabletsMapToPartition(Map<String, List<Long>> be2Tablets, String opaquedQueryPlan, String database, String table) throws IllegalArgumentException {
        int tabletsSize = this.sourceConfig.getRequestTabletSize();
        ArrayList<QueryPartition> partitions = new ArrayList<QueryPartition>();
        for (Map.Entry<String, List<Long>> beInfo : be2Tablets.entrySet()) {
            log.debug("Generate partition with beInfo: '{}'.", beInfo);
            HashSet tabletSet = new HashSet(beInfo.getValue());
            beInfo.getValue().clear();
            beInfo.getValue().addAll(tabletSet);
            for (int first = 0; first < beInfo.getValue().size(); first += tabletsSize) {
                HashSet<Long> partitionTablets = new HashSet<Long>(beInfo.getValue().subList(first, Math.min(beInfo.getValue().size(), first + tabletsSize)));
                QueryPartition partitionDefinition = new QueryPartition(database, table, beInfo.getKey(), partitionTablets, opaquedQueryPlan);
                log.debug("Generate one PartitionDefinition '{}'.", (Object)partitionDefinition);
                partitions.add(partitionDefinition);
            }
        }
        return partitions;
    }

    private Map<String, List<Long>> selectBeForTablet(QueryPlan queryPlan) {
        HashMap<String, List<Long>> beXTablets = new HashMap<String, List<Long>>();
        queryPlan.getPartitions().forEach((tabletId, routingList) -> {
            int tabletCount = Integer.MAX_VALUE;
            String candidateBe = "";
            for (String beNode : routingList.getRoutings()) {
                if (!beXTablets.containsKey(beNode)) {
                    beXTablets.put(beNode, new ArrayList());
                    candidateBe = beNode;
                    break;
                }
                if (((List)beXTablets.get(beNode)).size() >= tabletCount) continue;
                candidateBe = beNode;
                tabletCount = ((List)beXTablets.get(beNode)).size();
            }
            ((List)beXTablets.get(candidateBe)).add(Long.valueOf(tabletId));
        });
        return beXTablets;
    }

    private QueryPlan getQueryPlan(String querySQL, String table) {
        List<String> nodeUrls = this.sourceConfig.getNodeUrls();
        Collections.shuffle(nodeUrls);
        HashMap<String, String> bodyMap = new HashMap<String, String>();
        bodyMap.put("sql", querySQL);
        String body = JsonUtils.toJsonString(bodyMap);
        String respString = "";
        for (String feNode : nodeUrls) {
            String url = "http://" + feNode + "/api/" + this.sourceConfig.getDatabase() + "/" + table + "/_query_plan";
            try {
                respString = (String)RetryUtils.retryWithException(() -> this.httpHelper.doHttpPost(url, this.getQueryPlanHttpHeader(), body), (RetryUtils.RetryMaterial)this.retryMaterial);
                if (!StringUtils.isNoneEmpty(respString)) continue;
                return (QueryPlan)JsonUtils.parseObject((String)respString, QueryPlan.class);
            }
            catch (Exception e) {
                log.error("Request query Plan From {} failed: {}", (Object)feNode, (Object)e.getMessage());
            }
        }
        throw new StarRocksConnectorException((SeaTunnelErrorCode)StarRocksConnectorErrorCode.QUEST_QUERY_PLAN_FAILED, "query failed with empty response");
    }

    private String getBasicAuthHeader(String username, String password) {
        String auth = username + ":" + password;
        byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encodedAuth);
    }

    private Map<String, String> getQueryPlanHttpHeader() {
        HashMap<String, String> headerMap = new HashMap<String, String>();
        headerMap.put("Content-Type", "application/json;charset=UTF-8");
        headerMap.put("Authorization", this.getBasicAuthHeader(this.sourceConfig.getUsername(), this.sourceConfig.getPassword()));
        return headerMap;
    }

    private String genQuerySql(String table) {
        StarRocksSourceTableConfig starRocksSourceTableConfig = this.tables.get(table);
        SeaTunnelRowType seaTunnelRowType = starRocksSourceTableConfig.getCatalogTable().getSeaTunnelRowType();
        String columns = seaTunnelRowType.getFieldNames().length != 0 ? String.join((CharSequence)",", seaTunnelRowType.getFieldNames()) : "*";
        String scanFilter = starRocksSourceTableConfig.getScanFilter();
        String filter = scanFilter.isEmpty() ? "" : " where " + scanFilter;
        String sql = "select " + columns + " from `" + this.sourceConfig.getDatabase() + "`.`" + table + "`" + filter;
        log.debug("Generate query sql '{}'.", (Object)sql);
        return sql;
    }
}

