/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.api.sink.multitablesink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
import org.apache.seatunnel.api.sink.multitablesink.MultiTableCommitInfo;
import org.apache.seatunnel.api.sink.multitablesink.MultiTableState;
import org.apache.seatunnel.api.sink.multitablesink.MultiTableWriterRunnable;
import org.apache.seatunnel.api.sink.multitablesink.SinkIdentifier;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.tracing.MDCTracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link SinkWriter} that fans rows out to a set of per-table sub-writers.
 *
 * <p>The sub-writers are split into {@code queueSize} groups by
 * {@code SinkIdentifier.getIndex() % queueSize}. Every group owns one bounded queue and one
 * {@link MultiTableWriterRunnable} built from that queue and the group's writers; the runnables are
 * submitted to an internal thread pool on the first call to {@link #write(SeaTunnelRow)}.
 *
 * <p>Operations that touch a sub-writer directly (schema change, snapshot, prepare-commit, abort,
 * close) synchronize on the group's {@link MultiTableWriterRunnable}.
 * NOTE(review): this presumably excludes the runnable's own write loop, which is not visible in
 * this file - confirm against {@code MultiTableWriterRunnable}.
 */
public class MultiTableSinkWriter
implements SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState> {
    private static final Logger log = LoggerFactory.getLogger(MultiTableSinkWriter.class);

    /** Capacity of each per-group row queue. */
    private static final int QUEUE_CAPACITY = 1024;

    /** How long a single {@code offer} waits before sub-writer errors are re-checked. */
    private static final long OFFER_TIMEOUT_MILLIS = 500L;

    /** Poll interval used while waiting for the queues to drain. */
    private static final long DRAIN_POLL_MILLIS = 100L;

    /** All sub-writers, as handed to the constructor. */
    private final Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> sinkWriters;
    /** Context of every sub-writer; used to publish the close event. */
    private final Map<SinkIdentifier, SinkWriter.Context> sinkWritersContext;
    /** Table identifier to the primary key reported by that table's sub-writer. */
    private final Map<String, Optional<Integer>> sinkPrimaryKeys = new HashMap<>();
    /** Sub-writers grouped by queue index; position {@code i} belongs to queue {@code i}. */
    private final List<ConcurrentMap<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>>> sinkWritersWithIndex;
    /** One runnable per queue; also used as the monitor guarding that queue's sub-writers. */
    private final List<MultiTableWriterRunnable> runnable = new ArrayList<>();
    private final Random random = new Random();
    /** One bounded queue per group, in the same order as {@link #sinkWritersWithIndex}. */
    private final List<BlockingQueue<SeaTunnelRow>> blockingQueues = new ArrayList<>();
    private final ExecutorService executorService;
    private MultiTableResourceManager resourceManager;
    /** Set once the runnables have been handed to the executor. */
    private volatile boolean submitted = false;

    /**
     * Creates the writer, groups the sub-writers into {@code queueSize} queues and initializes the
     * shared resource manager.
     *
     * @param sinkWriters all sub-writers keyed by their identifier
     * @param queueSize number of queues / writer groups; the thread pool is sized
     *     {@code queueSize * 2} because it runs both the writer runnables and the prepare-commit
     *     tasks
     * @param sinkWritersContext context of every sub-writer, keyed like {@code sinkWriters}
     * @throws IllegalArgumentException if {@code queueSize} is not positive (raised by
     *     {@link Executors#newFixedThreadPool(int, java.util.concurrent.ThreadFactory)})
     */
    public MultiTableSinkWriter(Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> sinkWriters, int queueSize, Map<SinkIdentifier, SinkWriter.Context> sinkWritersContext) {
        this.sinkWriters = sinkWriters;
        this.sinkWritersContext = sinkWritersContext;
        AtomicInteger cnt = new AtomicInteger(0);
        this.executorService = MDCTracer.tracing(Executors.newFixedThreadPool(queueSize * 2, task -> {
            Thread thread = new Thread(task);
            thread.setDaemon(true);
            thread.setName("st-multi-table-sink-writer-" + cnt.incrementAndGet());
            return thread;
        }));
        this.sinkWritersWithIndex = new ArrayList<>();
        for (int queueIndex = 0; queueIndex < queueSize; ++queueIndex) {
            BlockingQueue<SeaTunnelRow> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
            Map<String, SinkWriter<SeaTunnelRow, ?, ?>> tableIdWriterMap = new HashMap<>();
            ConcurrentMap<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> sinkIdentifierMap = new ConcurrentHashMap<>();
            for (Map.Entry<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> entry : sinkWriters.entrySet()) {
                SinkIdentifier identifier = entry.getKey();
                if (identifier.getIndex() % queueSize != queueIndex) {
                    continue;
                }
                tableIdWriterMap.put(identifier.getTableIdentifier(), entry.getValue());
                sinkIdentifierMap.put(identifier, entry.getValue());
            }
            this.sinkWritersWithIndex.add(sinkIdentifierMap);
            this.blockingQueues.add(queue);
            this.runnable.add(new MultiTableWriterRunnable(tableIdWriterMap, queue));
        }
        log.info("init multi table sink writer, queue size: {}", queueSize);
        this.initResourceManager(queueSize);
    }

    /**
     * Asks the first sub-writer to create the shared resource manager, then hands that manager to
     * every sub-writer together with its queue index and records each table's primary key.
     *
     * <p>When there are no sub-writers the resource manager stays {@code null}.
     *
     * @param queueSize number of queues, forwarded to {@code initMultiTableResourceManager}
     */
    // SupportMultiTableSinkWriter is used as a raw type because its declaration is not visible here.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void initResourceManager(int queueSize) {
        Iterator<SinkWriter<SeaTunnelRow, ?, ?>> writers = this.sinkWriters.values().iterator();
        if (writers.hasNext()) {
            SupportMultiTableSinkWriter first = (SupportMultiTableSinkWriter) writers.next();
            this.resourceManager = first.initMultiTableResourceManager(this.sinkWriters.size(), queueSize);
        }
        for (int i = 0; i < this.sinkWritersWithIndex.size(); ++i) {
            for (Map.Entry<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> entry : this.sinkWritersWithIndex.get(i).entrySet()) {
                SupportMultiTableSinkWriter sink = (SupportMultiTableSinkWriter) entry.getValue();
                sink.setMultiTableResourceManager(this.resourceManager, i);
                this.sinkPrimaryKeys.put(entry.getKey().getTableIdentifier(), sink.primaryKey());
            }
        }
    }

    /**
     * Rethrows, wrapped in a {@link RuntimeException}, the first failure recorded by any writer
     * runnable.
     */
    private void subSinkErrorCheck() {
        for (MultiTableWriterRunnable writerRunnable : this.runnable) {
            // Read once so the value that is checked is the value that is thrown.
            Throwable failure = writerRunnable.getThrowable();
            if (failure != null) {
                throw new RuntimeException(failure);
            }
        }
    }

    /**
     * Applies the schema change to every sub-writer whose table identifier equals the full name of
     * the event's table path, holding the owning runnable's monitor while doing so.
     *
     * @param event the schema change to apply
     * @throws IOException if a sub-writer fails to apply the change
     */
    @Override
    public void applySchemaChange(SchemaChangeEvent event) throws IOException {
        this.subSinkErrorCheck();
        for (int i = 0; i < this.sinkWritersWithIndex.size(); ++i) {
            MultiTableWriterRunnable writerLock = this.runnable.get(i);
            for (Map.Entry<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> sinkWriterEntry : this.sinkWritersWithIndex.get(i).entrySet()) {
                SinkIdentifier identifier = sinkWriterEntry.getKey();
                if (!identifier.getTableIdentifier().equals(event.tablePath().getFullName())) {
                    continue;
                }
                log.info("Start apply schema change for table {} sub-writer {}", identifier.getTableIdentifier(), identifier.getIndex());
                synchronized (writerLock) {
                    sinkWriterEntry.getValue().applySchemaChange(event);
                }
                log.info("Finish apply schema change for table {} sub-writer {}", identifier.getTableIdentifier(), identifier.getIndex());
            }
        }
    }

    /**
     * Enqueues a row for asynchronous writing.
     *
     * <p>Queue selection:
     * <ul>
     *   <li>the row's table has no primary key, or the table is unknown but exactly one table is
     *       registered: a random queue;</li>
     *   <li>the table has a primary key: the queue derived from the hash of the key field, so equal
     *       keys always land on the same queue (a {@code null} key value goes to queue 0);</li>
     *   <li>the table is unknown and several tables are registered: a {@link RuntimeException}.</li>
     * </ul>
     *
     * @param element the row to write
     * @throws IOException if the calling thread is interrupted while waiting for queue space
     */
    @Override
    public void write(SeaTunnelRow element) throws IOException {
        if (!this.submitted) {
            this.submitted = true;
            this.runnable.forEach(this.executorService::submit);
        }
        this.subSinkErrorCheck();
        Optional<Integer> primaryKey = this.sinkPrimaryKeys.get(element.getTableId());
        int queueCount = this.blockingQueues.size();
        int index;
        if (primaryKey == null) {
            if (this.sinkPrimaryKeys.size() != 1) {
                throw new RuntimeException("multi table sink can not write table: " + element.getTableId());
            }
            index = this.random.nextInt(queueCount);
        } else if (!primaryKey.isPresent()) {
            index = this.random.nextInt(queueCount);
        } else {
            Object keyValue = element.getField(primaryKey.get());
            // Take the remainder BEFORE the absolute value: Math.abs(Integer.MIN_VALUE) is still
            // negative, which used to yield a negative queue index. For every other hash the
            // result is identical to Math.abs(hash) % queueCount.
            index = keyValue == null ? 0 : Math.abs(keyValue.hashCode() % queueCount);
        }
        this.offerUntilAccepted(this.blockingQueues.get(index), element);
    }

    /**
     * Blocks until the queue accepts the row, re-checking for sub-writer failures after every
     * timed-out attempt so a dead consumer cannot block the caller forever.
     */
    private void offerUntilAccepted(BlockingQueue<SeaTunnelRow> queue, SeaTunnelRow element) throws IOException {
        try {
            while (!queue.offer(element, OFFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                this.subSinkErrorCheck();
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller; the checked exception alone would hide it.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    /**
     * Waits for all queues to drain, then snapshots every sub-writer under its runnable's monitor.
     *
     * @param checkpointId the checkpoint being taken
     * @return a single-element list holding the states of all sub-writers keyed by identifier
     * @throws IOException if a sub-writer fails to snapshot
     */
    // The element type of the per-writer state list is not visible here, so it stays raw.
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"})
    public List<MultiTableState> snapshotState(long checkpointId) throws IOException {
        this.checkQueueRemain();
        this.subSinkErrorCheck();
        MultiTableState multiTableState = new MultiTableState(new HashMap<>());
        for (int i = 0; i < this.sinkWritersWithIndex.size(); ++i) {
            MultiTableWriterRunnable writerLock = this.runnable.get(i);
            for (Map.Entry<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> sinkWriterEntry : this.sinkWritersWithIndex.get(i).entrySet()) {
                synchronized (writerLock) {
                    List states = sinkWriterEntry.getValue().snapshotState(checkpointId);
                    multiTableState.getStates().put(sinkWriterEntry.getKey(), states);
                }
            }
        }
        List<MultiTableState> multiTableStates = new ArrayList<>();
        multiTableStates.add(multiTableState);
        return multiTableStates;
    }

    /**
     * Waits for all queues to drain, then runs {@code prepareCommit} of every writer group as one
     * task per group on the internal pool and collects the commit infos.
     *
     * @return the collected commit infos, or an empty optional when no sub-writer produced one
     * @throws IOException declared by the interface; sub-writer failures surface as
     *     {@link RuntimeException}
     */
    @Override
    public Optional<MultiTableCommitInfo> prepareCommit() throws IOException {
        this.checkQueueRemain();
        this.subSinkErrorCheck();
        MultiTableCommitInfo multiTableCommitInfo = new MultiTableCommitInfo(new ConcurrentHashMap<SinkIdentifier, Object>());
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < this.sinkWritersWithIndex.size(); ++i) {
            // Effectively-final copy for capture by the lambda.
            final int subWriterIndex = i;
            futures.add(this.executorService.submit(() -> {
                synchronized (this.runnable.get(subWriterIndex)) {
                    for (Map.Entry<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> sinkWriterEntry : this.sinkWritersWithIndex.get(subWriterIndex).entrySet()) {
                        Optional<?> commit;
                        try {
                            commit = sinkWriterEntry.getValue().prepareCommit();
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                        commit.ifPresent(o -> multiTableCommitInfo.getCommitInfo().put(sinkWriterEntry.getKey(), o));
                    }
                }
            }));
        }
        for (Future<?> future : futures) {
            try {
                future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        if (multiTableCommitInfo.getCommitInfo().isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(multiTableCommitInfo);
    }

    /**
     * Aborts the pending prepare on every sub-writer.
     *
     * <p>Best effort: a failure while draining the queues or in one sub-writer does not stop the
     * remaining sub-writers from being aborted. The first failure is rethrown, wrapped in a
     * {@link RuntimeException}, once all sub-writers were visited.
     */
    @Override
    public void abortPrepare() {
        Throwable firstE = null;
        boolean interrupted = false;
        try {
            this.checkQueueRemain();
        } catch (Exception e) {
            firstE = e;
            // Clear a pending interrupt so the cleanup below is not cut short; restored afterwards.
            interrupted = Thread.interrupted();
        }
        try {
            for (int i = 0; i < this.sinkWritersWithIndex.size(); ++i) {
                synchronized (this.runnable.get(i)) {
                    for (SinkWriter<SeaTunnelRow, ?, ?> sinkWriter : this.sinkWritersWithIndex.get(i).values()) {
                        try {
                            sinkWriter.abortPrepare();
                        } catch (Throwable e) {
                            if (firstE == null) {
                                firstE = e;
                            }
                            log.error("abortPrepare error", e);
                        }
                    }
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        if (firstE != null) {
            throw new RuntimeException(firstE);
        }
    }

    /**
     * Drains the queues, shuts the thread pool down, closes every sub-writer (publishing a
     * {@link WriterCloseEvent} for each one that closed successfully) and finally closes the shared
     * resource manager.
     *
     * <p>Best effort: every sub-writer is closed even if an earlier step failed. The first failure
     * from draining or from a sub-writer is rethrown, wrapped in a {@link RuntimeException}; a
     * failure of the resource manager is only logged.
     *
     * @throws IOException declared by the interface
     */
    @Override
    public void close() throws IOException {
        Throwable firstE = null;
        boolean interrupted = false;
        try {
            this.checkQueueRemain();
        } catch (Exception e) {
            firstE = e;
            // Clear a pending interrupt so the sub-writers can still close cleanly; restored afterwards.
            interrupted = Thread.interrupted();
        }
        try {
            this.executorService.shutdownNow();
            for (int i = 0; i < this.sinkWritersWithIndex.size(); ++i) {
                synchronized (this.runnable.get(i)) {
                    for (Map.Entry<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> entry : this.sinkWritersWithIndex.get(i).entrySet()) {
                        try {
                            entry.getValue().close();
                            this.sinkWritersContext.get(entry.getKey()).getEventListener().onEvent(new WriterCloseEvent());
                        } catch (Throwable e) {
                            if (firstE == null) {
                                firstE = e;
                            }
                            log.error("close error", e);
                        }
                    }
                }
            }
            try {
                if (this.resourceManager != null) {
                    this.resourceManager.close();
                }
            } catch (Throwable e) {
                log.error("close resourceManager error", e);
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        if (firstE != null) {
            throw new RuntimeException(firstE);
        }
    }

    /**
     * Blocks until every queue is empty, polling periodically and failing fast when a writer
     * runnable has recorded an error.
     *
     * <p>If the calling thread is interrupted, its interrupt status is restored and a
     * {@link RuntimeException} wrapping the {@link InterruptedException} is thrown.
     */
    private void checkQueueRemain() {
        try {
            for (BlockingQueue<SeaTunnelRow> blockingQueue : this.blockingQueues) {
                while (!blockingQueue.isEmpty()) {
                    Thread.sleep(DRAIN_POLL_MILLIS);
                    this.subSinkErrorCheck();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}

