/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.influxdb.source;

import java.net.ConnectException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
import org.apache.seatunnel.connectors.seatunnel.influxdb.converter.InfluxDBRowConverter;
import org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
import org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxDBSourceSplit;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InfluxdbSourceReader
implements SourceReader<SeaTunnelRow, InfluxDBSourceSplit> {
    private static final Logger log = LoggerFactory.getLogger(InfluxdbSourceReader.class);
    private InfluxDB influxdb;
    InfluxDBConfig config;
    private final SourceReader.Context context;
    private final SeaTunnelRowType seaTunnelRowType;
    List<Integer> columnsIndexList;
    private final Queue<InfluxDBSourceSplit> pendingSplits;
    private volatile boolean noMoreSplitsAssignment;

    InfluxdbSourceReader(InfluxDBConfig config, SourceReader.Context readerContext, SeaTunnelRowType seaTunnelRowType, List<Integer> columnsIndexList) {
        this.config = config;
        this.pendingSplits = new LinkedList<InfluxDBSourceSplit>();
        this.context = readerContext;
        this.seaTunnelRowType = seaTunnelRowType;
        this.columnsIndexList = columnsIndexList;
    }

    public void connect() throws ConnectException {
        if (this.influxdb == null) {
            this.influxdb = InfluxDBClient.getInfluxDB(this.config);
            String version = this.influxdb.version();
            if (!this.influxdb.ping().isGood()) {
                throw new InfluxdbConnectorException((SeaTunnelErrorCode)InfluxdbConnectorErrorCode.CONNECT_FAILED, String.format("connect influxdb failed, due to influxdb version info is unknown, the url is: {%s}", this.config.getUrl()));
            }
            log.info("connect influxdb successful. sever version :{}.", (Object)version);
        }
    }

    public void open() throws Exception {
        this.connect();
    }

    public void close() {
        if (this.influxdb != null) {
            this.influxdb.close();
            this.influxdb = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void pollNext(Collector<SeaTunnelRow> output) {
        while (!this.pendingSplits.isEmpty()) {
            Object object = output.getCheckpointLock();
            synchronized (object) {
                InfluxDBSourceSplit split = this.pendingSplits.poll();
                this.read(split, output);
            }
        }
        if (Boundedness.BOUNDED.equals((Object)this.context.getBoundedness()) && this.noMoreSplitsAssignment && this.pendingSplits.isEmpty()) {
            log.info("Closed the bounded influxDB source");
            this.context.signalNoMoreElement();
        }
    }

    public List<InfluxDBSourceSplit> snapshotState(long checkpointId) {
        return new ArrayList<InfluxDBSourceSplit>(this.pendingSplits);
    }

    public void addSplits(List<InfluxDBSourceSplit> splits) {
        this.pendingSplits.addAll(splits);
    }

    public void handleNoMoreSplits() {
        log.info("Reader received NoMoreSplits event.");
        this.noMoreSplitsAssignment = true;
    }

    public void notifyCheckpointComplete(long checkpointId) {
    }

    private void read(InfluxDBSourceSplit split, Collector<SeaTunnelRow> output) {
        QueryResult queryResult = this.influxdb.query(new Query(split.getQuery(), this.config.getDatabase()));
        for (QueryResult.Result result : queryResult.getResults()) {
            List<QueryResult.Series> serieList = result.getSeries();
            if (CollectionUtils.isNotEmpty(serieList)) {
                for (QueryResult.Series series : serieList) {
                    for (List<Object> values2 : series.getValues()) {
                        SeaTunnelRow row = InfluxDBRowConverter.convert(values2, this.seaTunnelRowType, this.columnsIndexList);
                        output.collect((Object)row);
                    }
                }
                continue;
            }
            log.debug("split[{}] reader influxDB series is empty.", (Object)split.splitId());
        }
    }
}

