/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.tdengine.source;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.tdengine.source.StableMetadata;
import org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.tdengine.state.TDengineSourceState;

public class TDengineSourceSplitEnumerator
implements SourceSplitEnumerator<TDengineSourceSplit, TDengineSourceState> {
    private final SourceSplitEnumerator.Context<TDengineSourceSplit> context;
    private final TDengineSourceConfig config;
    private final StableMetadata stableMetadata;
    private Set<TDengineSourceSplit> pendingSplit = new HashSet<TDengineSourceSplit>();
    private Set<TDengineSourceSplit> assignedSplit = new HashSet<TDengineSourceSplit>();

    public TDengineSourceSplitEnumerator(StableMetadata stableMetadata, TDengineSourceConfig config, SourceSplitEnumerator.Context<TDengineSourceSplit> context) {
        this(stableMetadata, config, null, context);
    }

    public TDengineSourceSplitEnumerator(StableMetadata stableMetadata, TDengineSourceConfig config, TDengineSourceState sourceState, SourceSplitEnumerator.Context<TDengineSourceSplit> context) {
        this.config = config;
        this.context = context;
        this.stableMetadata = stableMetadata;
        if (sourceState != null) {
            this.assignedSplit = sourceState.getAssignedSplit();
        }
    }

    private static int getSplitOwner(String tp, int numReaders) {
        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
    }

    public void open() {
    }

    public void run() {
        this.pendingSplit = this.getAllSplits();
        this.assignSplit(this.context.registeredReaders());
    }

    private Set<TDengineSourceSplit> getAllSplits() {
        String timestampFieldName = this.stableMetadata.getTimestampFieldName();
        HashSet<TDengineSourceSplit> splits = new HashSet<TDengineSourceSplit>();
        for (String subTableName : this.stableMetadata.getSubTableNames()) {
            TDengineSourceSplit splitBySubTable = this.createSplitBySubTable(subTableName, timestampFieldName);
            splits.add(splitBySubTable);
        }
        return splits;
    }

    private TDengineSourceSplit createSplitBySubTable(String subTableName, String timestampFieldName) {
        String selectFields = Arrays.stream(this.stableMetadata.getRowType().getFieldNames()).skip(1L).collect(Collectors.joining(","));
        String subTableSQL = "select " + selectFields + " from " + this.config.getDatabase() + "." + subTableName;
        String start = this.config.getLowerBound();
        String end = this.config.getUpperBound();
        if (start != null || end != null) {
            String startCondition = null;
            String endCondition = null;
            if (start != null) {
                startCondition = timestampFieldName + " >= '" + start + "'";
            }
            if (end != null) {
                endCondition = timestampFieldName + " < '" + end + "'";
            }
            String query = String.join((CharSequence)" and ", startCondition, endCondition);
            subTableSQL = subTableSQL + " where " + query;
        }
        return new TDengineSourceSplit(subTableName, subTableSQL);
    }

    public void addSplitsBack(List<TDengineSourceSplit> splits, int subtaskId) {
        if (!splits.isEmpty()) {
            this.pendingSplit.addAll(splits);
            this.assignSplit(Collections.singletonList(subtaskId));
        }
    }

    public int currentUnassignedSplitSize() {
        return this.pendingSplit.size();
    }

    public void registerReader(int subtaskId) {
        if (!this.pendingSplit.isEmpty()) {
            this.assignSplit(Collections.singletonList(subtaskId));
        }
    }

    private void assignSplit(Collection<Integer> taskIDList) {
        this.assignedSplit = this.pendingSplit.stream().map(split -> {
            int splitOwner = TDengineSourceSplitEnumerator.getSplitOwner(split.splitId(), this.context.currentParallelism());
            if (taskIDList.contains(splitOwner)) {
                this.context.assignSplit(splitOwner, (SourceSplit)split);
                return split;
            }
            return null;
        }).filter(Objects::nonNull).collect(Collectors.toSet());
        this.pendingSplit.clear();
    }

    public TDengineSourceState snapshotState(long checkpointId) {
        return new TDengineSourceState(this.assignedSplit);
    }

    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        super.handleSourceEvent(subtaskId, sourceEvent);
    }

    public void notifyCheckpointComplete(long checkpointId) {
    }

    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        super.notifyCheckpointAborted(checkpointId);
    }

    public void close() {
    }

    public void handleSplitRequest(int subtaskId) {
    }
}

