/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.engine.server;

import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.NodeEngineImpl;
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.seatunnel.api.event.Event;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.server.event.JobEventReportOperation;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import org.apache.seatunnel.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventService {
    private static final Logger log = LoggerFactory.getLogger(EventService.class);
    private final BlockingQueue<Event> eventBuffer = new ArrayBlockingQueue<Event>(2048);
    private ExecutorService eventForwardService;
    private final NodeEngineImpl nodeEngine;

    public EventService(NodeEngineImpl nodeEngine) {
        this.initEventForwardService();
        this.nodeEngine = nodeEngine;
    }

    private void initEventForwardService() {
        this.eventForwardService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("event-forwarder-%d").build());
        this.eventForwardService.submit(() -> {
            ArrayList<Event> events = new ArrayList<Event>();
            RetryUtils.RetryMaterial retryMaterial = new RetryUtils.RetryMaterial(2, true, e -> true);
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    events.clear();
                    Event first = this.eventBuffer.take();
                    events.add(first);
                    this.eventBuffer.drainTo(events, 500);
                    JobEventReportOperation operation = new JobEventReportOperation(events);
                    RetryUtils.retryWithException(() -> NodeEngineUtil.sendOperationToMasterNode((NodeEngine)this.nodeEngine, operation).join(), (RetryUtils.RetryMaterial)retryMaterial);
                    log.debug("Event forward success, events " + events.size());
                }
                catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                    log.info("Event forward thread interrupted");
                }
                catch (Throwable t) {
                    log.warn("Event forward failed, discard events " + events.size(), t);
                }
            }
        });
    }

    public void reportEvent(Event e) {
        while (!this.eventBuffer.offer(e)) {
            this.eventBuffer.poll();
            log.warn("Event buffer is full, discard the oldest event");
        }
    }

    public void shutdownNow() {
        if (this.eventForwardService != null) {
            this.eventForwardService.shutdownNow();
        }
    }
}

