/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.engine.server.task;

import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.server.dag.physical.config.SourceConfig;
import org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlow;
import org.apache.seatunnel.engine.server.execution.ProgressState;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.TaskRuntimeException;
import org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.record.Barrier;

public class SourceSeaTunnelTask<T, SplitT extends SourceSplit>
extends SeaTunnelTask {
    private static final ILogger LOGGER = Logger.getLogger(SourceSeaTunnelTask.class);
    private transient SeaTunnelSourceCollector<T> collector;
    private transient Object checkpointLock;
    private transient Serializer<SplitT> splitSerializer;
    private final Map<String, Object> envOption;
    private final PhysicalExecutionFlow<SourceAction, SourceConfig> sourceFlow;

    public SourceSeaTunnelTask(long jobID, TaskLocation taskID, int indexID, PhysicalExecutionFlow<SourceAction, SourceConfig> executionFlow, Map<String, Object> envOption) {
        super(jobID, taskID, indexID, executionFlow);
        this.sourceFlow = executionFlow;
        this.envOption = envOption;
    }

    @Override
    public void init() throws Exception {
        SeaTunnelDataType sourceProducedType;
        super.init();
        this.checkpointLock = new Object();
        this.splitSerializer = this.sourceFlow.getAction().getSource().getSplitSerializer();
        LOGGER.info("starting seatunnel source task, index " + this.indexID);
        if (!(this.startFlowLifeCycle instanceof SourceFlowLifeCycle)) {
            throw new TaskRuntimeException("SourceSeaTunnelTask only support SourceFlowLifeCycle, but get " + this.startFlowLifeCycle.getClass().getName());
        }
        ArrayList<TablePath> tablePaths = new ArrayList();
        try {
            List producedCatalogTables = this.sourceFlow.getAction().getSource().getProducedCatalogTables();
            sourceProducedType = CatalogTableUtil.convertToDataType((List)producedCatalogTables);
            tablePaths = producedCatalogTables.stream().map(CatalogTable::getTableId).map(TableIdentifier::toTablePath).collect(Collectors.toList());
        }
        catch (UnsupportedOperationException e) {
            sourceProducedType = this.sourceFlow.getAction().getSource().getProducedType();
        }
        this.collector = new SeaTunnelSourceCollector(this.checkpointLock, this.outputs, (MetricsContext)this.getMetricsContext(), FlowControlStrategy.fromMap(this.envOption), sourceProducedType, tablePaths);
        ((SourceFlowLifeCycle)this.startFlowLifeCycle).setCollector(this.collector);
    }

    @Override
    protected SourceFlowLifeCycle<?, ?> createSourceFlowLifeCycle(SourceAction<?, ?, ?> sourceAction, SourceConfig config, CompletableFuture<Void> completableFuture, MetricsContext metricsContext) {
        return new SourceFlowLifeCycle(sourceAction, this.indexID, config.getEnumeratorTask(), this, this.taskLocation, completableFuture, metricsContext);
    }

    @Override
    protected void collect() throws Exception {
        ((SourceFlowLifeCycle)this.startFlowLifeCycle).collect();
    }

    @Override
    @NonNull
    public ProgressState call() throws Exception {
        this.stateProcess();
        return this.progress.toState();
    }

    public void receivedSourceSplit(List<SplitT> splits) {
        ((SourceFlowLifeCycle)this.startFlowLifeCycle).receivedSplits(splits);
    }

    @Override
    public void triggerBarrier(Barrier barrier) throws Exception {
        SourceFlowLifeCycle sourceFlow = (SourceFlowLifeCycle)this.startFlowLifeCycle;
        sourceFlow.triggerBarrier(barrier);
    }

    public Serializer<SplitT> getSplitSerializer() {
        return this.splitSerializer;
    }
}

