/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.iotdb.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.session.util.Version;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig;
import org.apache.seatunnel.connectors.seatunnel.iotdb.exception.IotdbConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.iotdb.exception.IotdbConnectorException;
import org.apache.seatunnel.connectors.seatunnel.iotdb.serialize.DefaultSeaTunnelRowDeserializer;
import org.apache.seatunnel.connectors.seatunnel.iotdb.serialize.SeaTunnelRowDeserializer;
import org.apache.seatunnel.connectors.seatunnel.iotdb.source.IoTDBSourceSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IoTDBSourceReader
implements SourceReader<SeaTunnelRow, IoTDBSourceSplit> {
    private static final Logger log = LoggerFactory.getLogger(IoTDBSourceReader.class);
    private final Map<String, Object> conf;
    private final Queue<IoTDBSourceSplit> pendingSplits;
    private final SourceReader.Context context;
    private final SeaTunnelRowDeserializer deserializer;
    private Session session;
    private volatile boolean noMoreSplitsAssignment;

    public IoTDBSourceReader(Map<String, Object> conf, SourceReader.Context readerContext, SeaTunnelRowType rowType) {
        this.conf = conf;
        this.pendingSplits = new LinkedList<IoTDBSourceSplit>();
        this.context = readerContext;
        this.deserializer = new DefaultSeaTunnelRowDeserializer(rowType);
    }

    public void open() throws IoTDBConnectionException {
        this.session = this.buildSession(this.conf);
        this.session.open();
    }

    public void close() throws IOException {
        try {
            if (this.session != null) {
                this.session.close();
            }
        }
        catch (IoTDBConnectionException e) {
            throw new IotdbConnectorException(IotdbConnectorErrorCode.CLOSE_SESSION_FAILED, "Close IoTDB session failed", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
        while (!this.pendingSplits.isEmpty()) {
            Object object = output.getCheckpointLock();
            synchronized (object) {
                IoTDBSourceSplit split = this.pendingSplits.poll();
                this.read(split, output);
            }
        }
        if (Boundedness.BOUNDED.equals((Object)this.context.getBoundedness()) && this.noMoreSplitsAssignment && this.pendingSplits.isEmpty()) {
            log.info("Closed the bounded iotdb source");
            this.context.signalNoMoreElement();
        }
    }

    private void read(IoTDBSourceSplit split, Collector<SeaTunnelRow> output) throws Exception {
        try (SessionDataSet dataSet = this.session.executeQueryStatement(split.getQuery());){
            while (dataSet.hasNext()) {
                RowRecord rowRecord = dataSet.next();
                SeaTunnelRow seaTunnelRow = this.deserializer.deserialize(rowRecord);
                output.collect((Object)seaTunnelRow);
            }
        }
    }

    private Session buildSession(Map<String, Object> conf) {
        Session.Builder sessionBuilder = new Session.Builder();
        String nodeUrlsString = (String)conf.get(SourceConfig.NODE_URLS.key());
        List<String> nodes = Stream.of(nodeUrlsString.split(",")).collect(Collectors.toList());
        sessionBuilder.nodeUrls(nodes);
        if (null != conf.get(SourceConfig.FETCH_SIZE.key())) {
            sessionBuilder.fetchSize(Integer.parseInt(conf.get(SourceConfig.FETCH_SIZE.key()).toString()));
        }
        if (null != conf.get(SourceConfig.USERNAME.key())) {
            sessionBuilder.username((String)conf.get(SourceConfig.USERNAME.key()));
        }
        if (null != conf.get(SourceConfig.PASSWORD.key())) {
            sessionBuilder.password((String)conf.get(SourceConfig.PASSWORD.key()));
        }
        if (null != conf.get(SourceConfig.THRIFT_DEFAULT_BUFFER_SIZE.key())) {
            sessionBuilder.thriftDefaultBufferSize(Integer.parseInt(conf.get(SourceConfig.THRIFT_DEFAULT_BUFFER_SIZE.key()).toString()));
        }
        if (null != conf.get(SourceConfig.THRIFT_MAX_FRAME_SIZE.key())) {
            sessionBuilder.thriftMaxFrameSize(Integer.parseInt(conf.get(SourceConfig.THRIFT_MAX_FRAME_SIZE.key()).toString()));
        }
        if (null != conf.get(SourceConfig.ENABLE_CACHE_LEADER.key())) {
            sessionBuilder.enableCacheLeader(Boolean.parseBoolean(conf.get(SourceConfig.ENABLE_CACHE_LEADER.key()).toString()));
        }
        if (null != conf.get(SourceConfig.VERSION.key())) {
            Version version = Version.valueOf(conf.get(SourceConfig.VERSION.key()).toString());
            sessionBuilder.version(version);
        }
        return sessionBuilder.build();
    }

    public List<IoTDBSourceSplit> snapshotState(long checkpointId) {
        return new ArrayList<IoTDBSourceSplit>(this.pendingSplits);
    }

    public void addSplits(List<IoTDBSourceSplit> splits) {
        this.pendingSplits.addAll(splits);
    }

    public void handleNoMoreSplits() {
        log.info("Reader received NoMoreSplits event.");
        this.noMoreSplitsAssignment = true;
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
    }
}

