/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.starrocks.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.starrocks.client.source.StarRocksBeReadClient;
import org.apache.seatunnel.connectors.seatunnel.starrocks.client.source.model.QueryPartition;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SourceConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
import org.apache.seatunnel.connectors.seatunnel.starrocks.source.StarRocksSourceSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Source reader that pulls rows for its assigned {@link StarRocksSourceSplit}s through
 * {@link StarRocksBeReadClient} instances and forwards them to the downstream collector.
 *
 * <p>One client is kept per backend address and reused across splits; all clients are
 * closed in {@link #close()}. {@link #open()} must be called before any split is read,
 * because it creates the client pool.
 */
public class StarRocksSourceReader implements SourceReader<SeaTunnelRow, StarRocksSourceSplit> {
    private static final Logger log = LoggerFactory.getLogger(StarRocksSourceReader.class);

    /** Splits that have been assigned to this reader but not read yet. */
    private final Queue<StarRocksSourceSplit> pendingSplits = new LinkedList<>();
    private final SourceReader.Context context;
    private final SourceConfig sourceConfig;
    /** Read clients keyed by backend address; created in {@link #open()}. */
    private Map<String, StarRocksBeReadClient> clientsPools;
    /** Set once {@link #handleNoMoreSplits()} has been invoked. */
    private volatile boolean noMoreSplitsAssignment;
    /** Row type of every configured table, keyed by the table name from the table config. */
    private final Map<String, SeaTunnelRowType> tables;

    /**
     * Creates a reader and indexes the row type of every table in the source config.
     *
     * @param readerContext context used to query boundedness and to signal end of input
     * @param sourceConfig  connector configuration, including the list of table configs
     */
    public StarRocksSourceReader(SourceReader.Context readerContext, SourceConfig sourceConfig) {
        this.context = readerContext;
        this.sourceConfig = sourceConfig;
        Map<String, SeaTunnelRowType> rowTypesByTable = new HashMap<>();
        sourceConfig.getTableConfigList().forEach(tableConfig ->
                rowTypesByTable.put(
                        tableConfig.getTable(),
                        tableConfig.getCatalogTable().getSeaTunnelRowType()));
        this.tables = rowTypesByTable;
    }

    /**
     * Reads every pending split and emits its rows to {@code output}.
     *
     * <p>Each split is taken from the queue and read while holding the collector's
     * checkpoint lock. Once the queue is drained, and if the source is bounded and no
     * further splits will be assigned, the context is told that no more elements follow.
     *
     * @param output collector receiving the rows
     * @throws Exception if reading a split fails
     */
    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
        while (!this.pendingSplits.isEmpty()) {
            synchronized (output.getCheckpointLock()) {
                StarRocksSourceSplit split = this.pendingSplits.poll();
                this.read(split, output);
            }
        }
        if (Boundedness.BOUNDED.equals(this.context.getBoundedness())
                && this.noMoreSplitsAssignment
                && this.pendingSplits.isEmpty()) {
            log.info("Closed the bounded StarRocks source");
            this.context.signalNoMoreElement();
        }
    }

    /**
     * Returns a copy of the splits that are still waiting to be read.
     *
     * @param checkpointId id of the checkpoint being taken (not used here)
     * @return a new list holding the currently pending splits
     */
    public List<StarRocksSourceSplit> snapshotState(long checkpointId) {
        return new ArrayList<>(this.pendingSplits);
    }

    /**
     * Appends newly assigned splits to the pending queue.
     *
     * @param splits splits to read
     */
    public void addSplits(List<StarRocksSourceSplit> splits) {
        this.pendingSplits.addAll(splits);
    }

    /** Records that no further splits will be assigned to this reader. */
    public void handleNoMoreSplits() {
        log.info("Reader received NoMoreSplits event.");
        this.noMoreSplitsAssignment = true;
    }

    /**
     * Reads all rows of one split from its backend and emits them, tagging every row with
     * the table id derived from the partition's table name.
     *
     * @param split  split to read
     * @param output collector receiving the rows
     */
    private void read(StarRocksSourceSplit split, Collector<SeaTunnelRow> output) {
        QueryPartition partition = split.getPartition();
        String table = partition.getTable();
        String beAddress = partition.getBeAddress();
        // Reuse the client for this backend if one exists, otherwise create and cache it.
        StarRocksBeReadClient client = this.clientsPools.computeIfAbsent(
                beAddress, address -> new StarRocksBeReadClient(address, this.sourceConfig));
        SeaTunnelRowType seaTunnelRowType = this.tables.get(table);
        client.openScanner(partition, seaTunnelRowType);
        // The table id is the same for every row of the split, so compute it once.
        String tableId = TablePath.of(table).toString();
        while (client.hasNext()) {
            SeaTunnelRow seaTunnelRow = client.getNext();
            seaTunnelRow.setTableId(tableId);
            output.collect(seaTunnelRow);
        }
    }

    /**
     * Creates the (initially empty) pool of backend read clients.
     *
     * @throws Exception declared by the reader contract; not thrown by this implementation
     */
    public void open() throws Exception {
        this.clientsPools = new HashMap<>();
    }

    /**
     * Closes every pooled backend client and empties the pool.
     *
     * <p>A {@link StarRocksConnectorException} raised while closing one client is logged
     * and does not prevent the remaining clients from being closed. Calling this method
     * before {@link #open()} is a no-op.
     *
     * @throws IOException declared by the reader contract; not thrown by this implementation
     */
    public void close() throws IOException {
        // open() may never have run (e.g. failure during start-up); nothing to release then.
        if (this.clientsPools == null) {
            return;
        }
        this.clientsPools.values().forEach(client -> {
            if (client != null) {
                try {
                    client.close();
                } catch (StarRocksConnectorException e) {
                    log.error("Failed to close reader: ", e);
                }
            }
        });
        // Drop the closed clients so a repeated close() does not close them again.
        this.clientsPools.clear();
    }

    /**
     * Checkpoint-completion callback; this reader has nothing to do here.
     *
     * @param checkpointId id of the completed checkpoint
     * @throws Exception declared by the reader contract; not thrown by this implementation
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
    }
}

