/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.runtime.source.event.RequestSplitEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.LatencyMarkerEmitter;
import org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.function.FunctionWithException;

/**
 * Stream operator that hosts a {@link SourceReader}.
 *
 * <p>The operator creates the reader through the supplied factory, hands it the splits found in
 * operator list state when it is opened, forwards coordinator events to it, and polls it for
 * records via {@link #emitNext(PushingAsyncDataInput.DataOutput)}.
 *
 * @param <OUT> type of the records produced by the reader
 * @param <SplitT> type of the splits handled by the reader
 */
@Internal
public class SourceOperator<OUT, SplitT extends SourceSplit>
        extends AbstractStreamOperator<OUT>
        implements OperatorEventHandler, PushingAsyncDataInput<OUT> {
    private static final long serialVersionUID = 1405537676017904695L;

    /** Descriptor of the raw {@code byte[]} list state in which the serialized splits are kept. */
    static final ListStateDescriptor<byte[]> SPLITS_STATE_DESC =
            new ListStateDescriptor<>("SourceReaderState", BytePrimitiveArraySerializer.INSTANCE);

    /** Factory used by {@link #initReader()} to create the source reader. */
    private final FunctionWithException<SourceReaderContext, SourceReader<OUT, SplitT>, Exception>
            readerFactory;

    /** Serializer for the splits, used for the reader state and for incoming split events. */
    private final SimpleVersionedSerializer<SplitT> splitSerializer;

    /** Gateway through which events are sent to the coordinator. */
    private final OperatorEventGateway operatorEventGateway;

    /** Strategy passed to the event-time logic created in {@link #open()}. */
    private final WatermarkStrategy<OUT> watermarkStrategy;

    /** Configuration exposed to the reader through its {@link SourceReaderContext}. */
    private final Configuration configuration;

    /** Host name reported to the coordinator on registration and on split requests. */
    private final String localHostname;

    /** Selects between the progressive and the no-op event-time logic in {@link #open()}. */
    private final boolean emitProgressiveWatermarks;

    /** The hosted reader; {@code null} until {@link #initReader()} has run. */
    private SourceReader<OUT, SplitT> sourceReader;

    /** Reader output wrapping the task's data output; created on the first {@code emitNext}. */
    private ReaderOutput<OUT> currentMainOutput;

    /** The data output seen on the first {@code emitNext} call; only used by an assertion. */
    private PushingAsyncDataInput.DataOutput<OUT> lastInvokedOutput;

    /** List state holding the reader's splits; set in {@code initializeState}. */
    private ListState<SplitT> readerState;

    /** Timestamp/watermark logic created in {@link #open()}. */
    private TimestampsAndWatermarks<OUT> eventTimeLogic;

    /** Set once {@link #close()} has closed the reader, so {@link #dispose()} skips it. */
    private boolean closed;

    /** Latency marker emitter; stays {@code null} when the tracking interval is not positive. */
    @Nullable
    private LatencyMarkerEmitter<OUT> latencyMarkerEmitter;

    /**
     * Creates the operator.
     *
     * @param readerFactory factory creating the source reader; must not be null
     * @param operatorEventGateway gateway to the coordinator; must not be null
     * @param splitSerializer serializer for the splits; must not be null
     * @param watermarkStrategy watermark strategy; must not be null
     * @param timeService processing time service stored on the operator (not null-checked)
     * @param configuration configuration exposed to the reader; must not be null
     * @param localHostname host name reported to the coordinator; must not be null
     * @param emitProgressiveWatermarks whether to use the progressive event-time logic
     */
    public SourceOperator(
            FunctionWithException<SourceReaderContext, SourceReader<OUT, SplitT>, Exception>
                    readerFactory,
            OperatorEventGateway operatorEventGateway,
            SimpleVersionedSerializer<SplitT> splitSerializer,
            WatermarkStrategy<OUT> watermarkStrategy,
            ProcessingTimeService timeService,
            Configuration configuration,
            String localHostname,
            boolean emitProgressiveWatermarks) {
        this.readerFactory = Preconditions.checkNotNull(readerFactory);
        this.operatorEventGateway = Preconditions.checkNotNull(operatorEventGateway);
        this.splitSerializer = Preconditions.checkNotNull(splitSerializer);
        this.watermarkStrategy = Preconditions.checkNotNull(watermarkStrategy);
        this.processingTimeService = timeService;
        this.configuration = Preconditions.checkNotNull(configuration);
        this.localHostname = Preconditions.checkNotNull(localHostname);
        this.emitProgressiveWatermarks = emitProgressiveWatermarks;
    }

    /**
     * Creates the source reader if it does not exist yet; subsequent calls are no-ops.
     *
     * @throws Exception if the reader factory fails
     */
    public void initReader() throws Exception {
        if (sourceReader != null) {
            return;
        }

        final MetricGroup metricGroup = getMetricGroup();
        assert metricGroup != null;

        final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();

        final SourceReaderContext context =
                new SourceReaderContext() {
                    @Override
                    public MetricGroup metricGroup() {
                        return metricGroup;
                    }

                    @Override
                    public Configuration getConfiguration() {
                        return configuration;
                    }

                    @Override
                    public String getLocalHostName() {
                        return localHostname;
                    }

                    @Override
                    public int getIndexOfSubtask() {
                        return subtaskIndex;
                    }

                    @Override
                    public void sendSplitRequest() {
                        operatorEventGateway.sendEventToCoordinator(
                                new RequestSplitEvent(getLocalHostName()));
                    }

                    @Override
                    public void sendSourceEventToCoordinator(SourceEvent event) {
                        operatorEventGateway.sendEventToCoordinator(new SourceEventWrapper(event));
                    }

                    @Override
                    public UserCodeClassLoader getUserCodeClassLoader() {
                        // Thin adapter over the runtime context; a new one is built per call.
                        return new UserCodeClassLoader() {
                            @Override
                            public ClassLoader asClassLoader() {
                                return getRuntimeContext().getUserCodeClassLoader();
                            }

                            @Override
                            public void registerReleaseHookIfAbsent(
                                    String releaseHookName, Runnable releaseHook) {
                                getRuntimeContext()
                                        .registerUserCodeClassLoaderReleaseHookIfAbsent(
                                                releaseHookName, releaseHook);
                            }
                        };
                    }
                };

        sourceReader = readerFactory.apply(context);
    }

    /**
     * Initializes the reader and the event-time logic, hands the splits found in the reader state
     * to the reader, registers the reader with the coordinator, and starts the reader and the
     * periodic watermark emission.
     */
    @Override
    public void open() throws Exception {
        initReader();

        if (emitProgressiveWatermarks) {
            eventTimeLogic =
                    TimestampsAndWatermarks.createProgressiveEventTimeLogic(
                            watermarkStrategy,
                            getMetricGroup(),
                            getProcessingTimeService(),
                            getExecutionConfig().getAutoWatermarkInterval());
        } else {
            eventTimeLogic =
                    TimestampsAndWatermarks.createNoOpEventTimeLogic(
                            watermarkStrategy, getMetricGroup());
        }

        final List<SplitT> splits = CollectionUtil.iterableToList(readerState.get());
        if (!splits.isEmpty()) {
            sourceReader.addSplits(splits);
        }

        registerReader();
        sourceReader.start();
        eventTimeLogic.startPeriodicWatermarkEmits();
    }

    /**
     * Stops periodic watermark emission, closes the latency marker emitter and the reader, marks
     * the operator as closed and then delegates to {@code super.close()}.
     */
    @Override
    public void close() throws Exception {
        if (eventTimeLogic != null) {
            eventTimeLogic.stopPeriodicWatermarkEmits();
        }
        if (latencyMarkerEmitter != null) {
            latencyMarkerEmitter.close();
        }
        if (sourceReader != null) {
            sourceReader.close();
        }
        closed = true;
        super.close();
    }

    /**
     * Releases resources when the operator is disposed.
     *
     * <p>The reader is closed here only if {@link #close()} has not already done so. The latency
     * marker emitter is closed and {@code super.dispose()} is invoked even when an earlier step
     * throws, so one failing resource does not prevent the remaining cleanup.
     */
    @Override
    public void dispose() throws Exception {
        try {
            if (!closed && sourceReader != null) {
                sourceReader.close();
            }
        } finally {
            try {
                if (latencyMarkerEmitter != null) {
                    latencyMarkerEmitter.close();
                }
            } finally {
                // close() delegates to super.close(); dispose() must likewise let the base
                // class run its own cleanup.
                super.dispose();
            }
        }
    }

    /**
     * Polls the reader for the next record.
     *
     * <p>On the first call the main reader output is created from {@code output} and the latency
     * marker emitter is set up; later calls reuse that main output.
     *
     * @param output the data output records are pushed to
     * @return the status reported by the reader's {@code pollNext}
     */
    @Override
    public InputStatus emitNext(PushingAsyncDataInput.DataOutput<OUT> output) throws Exception {
        assert lastInvokedOutput == output || lastInvokedOutput == null;

        if (currentMainOutput == null) {
            // First invocation: wire the reader output and the latency markers to this output.
            currentMainOutput = eventTimeLogic.createMainOutput(output);
            initializeLatencyMarkerEmitter(output);
            lastInvokedOutput = output;
        }
        return sourceReader.pollNext(currentMainOutput);
    }

    /**
     * Creates the latency marker emitter when the effective tracking interval is positive.
     *
     * <p>The interval comes from the execution config when latency tracking is configured there,
     * otherwise from {@link MetricOptions#LATENCY_INTERVAL} in the task manager configuration.
     */
    private void initializeLatencyMarkerEmitter(PushingAsyncDataInput.DataOutput<OUT> output) {
        final long latencyTrackingInterval =
                getExecutionConfig().isLatencyTrackingConfigured()
                        ? getExecutionConfig().getLatencyTrackingInterval()
                        : getContainingTask()
                                .getEnvironment()
                                .getTaskManagerInfo()
                                .getConfiguration()
                                .getLong(MetricOptions.LATENCY_INTERVAL);

        if (latencyTrackingInterval > 0L) {
            latencyMarkerEmitter =
                    new LatencyMarkerEmitter<>(
                            getProcessingTimeService(),
                            output::emitLatencyMarker,
                            latencyTrackingInterval,
                            getOperatorID(),
                            getRuntimeContext().getIndexOfThisSubtask());
        }
    }

    /** Replaces the reader state with the splits the reader reports for this checkpoint. */
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        final long checkpointId = context.getCheckpointId();
        LOG.debug("Taking a snapshot for checkpoint {}", checkpointId);
        readerState.update(sourceReader.snapshotState(checkpointId));
    }

    /** Returns the availability future of the reader. */
    @Override
    public CompletableFuture<?> getAvailableFuture() {
        return sourceReader.isAvailable();
    }

    /**
     * Obtains the raw {@code byte[]} list state and wraps it so that it can be accessed as a list
     * of splits using the split serializer.
     */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        final ListState<byte[]> rawState =
                context.getOperatorStateStore().getListState(SPLITS_STATE_DESC);
        readerState = new SimpleVersionedListState<>(rawState, splitSerializer);
    }

    /** Forwards the checkpoint-complete notification to the base class and to the reader. */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        sourceReader.notifyCheckpointComplete(checkpointId);
    }

    /** Forwards the checkpoint-aborted notification to the base class and to the reader. */
    @Override
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        super.notifyCheckpointAborted(checkpointId);
        sourceReader.notifyCheckpointAborted(checkpointId);
    }

    /**
     * Dispatches an event to the reader: new splits, a wrapped source event, or the
     * no-more-splits signal.
     *
     * @throws FlinkRuntimeException if the splits of an {@link AddSplitEvent} cannot be
     *     deserialized
     * @throws IllegalStateException if the event is of any other type
     */
    @Override
    public void handleOperatorEvent(OperatorEvent event) {
        if (event instanceof AddSplitEvent) {
            // The split type is erased at runtime; the event is deserialized with this
            // operator's own split serializer.
            @SuppressWarnings("unchecked")
            final AddSplitEvent<SplitT> addSplitEvent = (AddSplitEvent<SplitT>) event;
            try {
                sourceReader.addSplits(addSplitEvent.splits(splitSerializer));
            } catch (IOException e) {
                throw new FlinkRuntimeException("Failed to deserialize the splits.", e);
            }
        } else if (event instanceof SourceEventWrapper) {
            sourceReader.handleSourceEvents(((SourceEventWrapper) event).getSourceEvent());
        } else if (event instanceof NoMoreSplitsEvent) {
            sourceReader.notifyNoMoreSplits();
        } else {
            throw new IllegalStateException("Received unexpected operator event " + event);
        }
    }

    /** Sends a registration event with this subtask's index and host name to the coordinator. */
    private void registerReader() {
        operatorEventGateway.sendEventToCoordinator(
                new ReaderRegistrationEvent(
                        getRuntimeContext().getIndexOfThisSubtask(), localHostname));
    }

    /** Returns the hosted reader, or {@code null} if it has not been initialized yet. */
    @VisibleForTesting
    public SourceReader<OUT, SplitT> getSourceReader() {
        return sourceReader;
    }

    /** Returns the reader's list state, or {@code null} before state initialization. */
    @VisibleForTesting
    ListState<SplitT> getReaderState() {
        return readerState;
    }
}

