/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.engine.server.task.group.queue;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.transform.Collector;
import org.apache.seatunnel.common.utils.function.ConsumerWithException;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
import org.apache.seatunnel.engine.server.task.group.queue.AbstractIntermediateQueue;
import org.apache.seatunnel.engine.server.task.record.Barrier;

/**
 * Intermediate queue implementation backed by a {@link BlockingQueue} of {@link Record}s.
 *
 * <p>Records handed to {@link #received(Record)} are put on the queue, and
 * {@link #collect(Collector)} drains them into a {@link Collector}. In both directions each
 * record goes through the same barrier-aware handling, see
 * {@code handleRecord(Record, ConsumerWithException)}.
 */
public class IntermediateBlockingQueue
extends AbstractIntermediateQueue<BlockingQueue<Record<?>>> {
    /** How long a single poll in {@link #collect(Collector)} waits for a record, in milliseconds. */
    private static final long POLL_TIMEOUT_MILLIS = 100L;

    /**
     * Creates the intermediate queue on top of the given blocking queue.
     *
     * @param queue the queue used to hand records over; passed to the superclass
     */
    public IntermediateBlockingQueue(BlockingQueue<Record<?>> queue) {
        super(queue);
    }

    /**
     * Handles an incoming record and, unless it is dropped, puts it on the underlying queue.
     *
     * <p>{@link BlockingQueue#put(Object)} is a blocking call, so this method may wait for
     * space in the queue.
     *
     * @param record the record to enqueue
     * @throws RuntimeException wrapping any exception raised while handling the record; if the
     *     cause is an {@link InterruptedException} the interrupt status of the current thread
     *     is restored before the wrapper is thrown
     */
    @Override
    public void received(Record<?> record) {
        try {
            this.handleRecord(record, this.getIntermediateQueue()::put);
        }
        catch (InterruptedException e) {
            // Keep the interruption visible to code further up the stack; wrapping alone
            // would silently clear it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Drains the underlying queue into the given collector.
     *
     * <p>Records are polled one at a time with a timeout of {@code POLL_TIMEOUT_MILLIS}
     * milliseconds; the method returns as soon as one poll times out without yielding a record.
     *
     * @param collector receiver of the records that are not dropped
     * @throws Exception if polling is interrupted or handling a record fails
     */
    @Override
    public void collect(Collector<Record<?>> collector) throws Exception {
        Record<?> polled;
        while ((polled = this.getIntermediateQueue().poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) != null) {
            this.handleRecord(polled, collector::collect);
        }
    }

    /**
     * Removes every record still sitting in the underlying queue.
     *
     * @throws IOException declared by the overridden signature; not thrown by this implementation
     */
    @Override
    public void close() throws IOException {
        this.getIntermediateQueue().clear();
    }

    /**
     * Applies the barrier-aware handling shared by {@link #received(Record)} and
     * {@link #collect(Collector)}.
     *
     * <ul>
     *   <li>If the record carries a {@link Barrier}, it is acknowledged to the running task,
     *       the flow life cycle is switched to "prepare close" when the barrier reports so for
     *       this task's location, and the record is always forwarded to {@code consumer}.
     *   <li>Any other record is forwarded to {@code consumer} unless the flow life cycle is
     *       already in "prepare close", in which case it is silently dropped.
     * </ul>
     *
     * @param record the record to inspect
     * @param consumer the sink the record is forwarded to when it is not dropped
     * @throws Exception whatever the running task or {@code consumer} throws
     */
    private void handleRecord(Record<?> record, ConsumerWithException<Record<?>> consumer) throws Exception {
        Object data = record.getData();
        if (!(data instanceof Barrier)) {
            // Plain data: discard it once the flow has been told to prepare for close.
            if (this.getIntermediateQueueFlowLifeCycle().getPrepareClose()) {
                return;
            }
            consumer.accept(record);
            return;
        }

        // NOTE(review): the check is against Barrier but the cast is to CheckpointBarrier, so a
        // Barrier of any other type would fail here with ClassCastException - confirm that
        // CheckpointBarrier is the only Barrier that can reach this queue.
        CheckpointBarrier barrier = (CheckpointBarrier)data;
        this.getRunningTask().ack(barrier);
        if (barrier.prepareClose(this.getRunningTask().getTaskLocation())) {
            this.getIntermediateQueueFlowLifeCycle().setPrepareClose(true);
        }
        // Barriers are always passed on, even after prepare-close has been set.
        consumer.accept(record);
    }
}

