/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.transaction.pendingack.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ComparisonChain;
import io.netty.buffer.ByteBuf;
import io.netty.util.Timer;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckReplyCallBack;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckReplyCallBack;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.broker.transaction.pendingack.proto.BatchedPendingAckMetadataEntry;
import org.apache.pulsar.broker.transaction.pendingack.proto.PendingAckMetadata;
import org.apache.pulsar.broker.transaction.pendingack.proto.PendingAckMetadataEntry;
import org.apache.pulsar.broker.transaction.pendingack.proto.PendingAckOp;
import org.apache.pulsar.broker.transaction.util.LogIndexLagBackoff;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.transaction.coordinator.impl.TxnBatchedPositionImpl;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriter;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.SpscArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MLPendingAckStore
implements PendingAckStore {
    private final ManagedLedger managedLedger;
    private final ManagedCursor cursor;
    private final SpscArrayQueue<Entry> entryQueue;
    private final PositionImpl lastConfirmedEntry;
    private PositionImpl currentLoadPosition;
    private final AtomicLong currentIndexLag = new AtomicLong(0L);
    private volatile long maxIndexLag;
    protected PositionImpl maxAckPosition = PositionImpl.EARLIEST;
    private final LogIndexLagBackoff logIndexBackoff;
    private final ArrayList<PendingAckMetadataEntry> batchedPendingAckLogsWaitingForHandle;
    final ConcurrentSkipListMap<PositionImpl, PositionImpl> pendingAckLogIndex;
    private final ManagedCursor subManagedCursor;
    private TxnLogBufferedWriter<PendingAckMetadataEntry> bufferedWriter;
    private final Predicate<PendingAckMetadataEntry> bothNotAbortAndCommitPredicate = pendingAckLog -> pendingAckLog.getPendingAckOp() != PendingAckOp.ABORT && pendingAckLog.getPendingAckOp() != PendingAckOp.COMMIT;
    private static final Logger log = LoggerFactory.getLogger(MLPendingAckStore.class);
    private static final FastThreadLocal<BatchedPendingAckMetadataEntry> batchedMetaThreadLocalForBufferedWriter = new FastThreadLocal<BatchedPendingAckMetadataEntry>(){

        protected BatchedPendingAckMetadataEntry initialValue() throws Exception {
            return new BatchedPendingAckMetadataEntry();
        }
    };

    public MLPendingAckStore(ManagedLedger managedLedger, ManagedCursor cursor, ManagedCursor subManagedCursor, long transactionPendingAckLogIndexMinLag, TxnLogBufferedWriterConfig bufferedWriterConfig, Timer timer) {
        this.managedLedger = managedLedger;
        this.cursor = cursor;
        this.currentLoadPosition = (PositionImpl)this.cursor.getMarkDeletedPosition();
        this.entryQueue = new SpscArrayQueue(2000);
        this.lastConfirmedEntry = (PositionImpl)managedLedger.getLastConfirmedEntry();
        this.pendingAckLogIndex = new ConcurrentSkipListMap();
        this.subManagedCursor = subManagedCursor;
        this.logIndexBackoff = new LogIndexLagBackoff(transactionPendingAckLogIndexMinLag, Long.MAX_VALUE, 1.0);
        this.maxIndexLag = this.logIndexBackoff.next(0);
        this.bufferedWriter = new TxnLogBufferedWriter(managedLedger, ((ManagedLedgerImpl)managedLedger).getExecutor(), timer, (TxnLogBufferedWriter.DataSerializer)PendingAckLogSerializer.INSTANCE, bufferedWriterConfig.getBatchedWriteMaxRecords(), bufferedWriterConfig.getBatchedWriteMaxSize(), bufferedWriterConfig.getBatchedWriteMaxDelayInMillis(), bufferedWriterConfig.isBatchEnabled());
        this.batchedPendingAckLogsWaitingForHandle = new ArrayList();
    }

    @Override
    public void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService transactionReplayExecutor) {
        transactionReplayExecutor.execute(new PendingAckReplay(new MLPendingAckReplyCallBack(pendingAckHandle)));
    }

    private void readAsync(int numberOfEntriesToRead, AsyncCallbacks.ReadEntriesCallback readEntriesCallback) {
        this.cursor.asyncReadEntries(numberOfEntriesToRead, readEntriesCallback, (Object)System.nanoTime(), PositionImpl.LATEST);
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        final CompletableFuture<Void> completableFuture = new CompletableFuture<Void>();
        this.cursor.asyncClose(new AsyncCallbacks.CloseCallback(){

            public void closeComplete(Object ctx) {
                MLPendingAckStore.this.managedLedger.asyncClose(new AsyncCallbacks.CloseCallback(){

                    public void closeComplete(Object ctx) {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}][{}] MLPendingAckStore closed successfully\uff01", (Object)MLPendingAckStore.this.managedLedger.getName(), ctx);
                        }
                        MLPendingAckStore.this.bufferedWriter.close();
                        completableFuture.complete(null);
                    }

                    public void closeFailed(ManagedLedgerException exception, Object ctx) {
                        log.error("[{}][{}] MLPendingAckStore closed failed,exception={}", new Object[]{MLPendingAckStore.this.managedLedger.getName(), ctx, exception});
                        completableFuture.completeExceptionally(exception);
                    }
                }, ctx);
            }

            public void closeFailed(ManagedLedgerException exception, Object ctx) {
                completableFuture.completeExceptionally(exception);
            }
        }, null);
        return completableFuture;
    }

    @Override
    public CompletableFuture<Void> appendIndividualAck(TxnID txnID, List<MutablePair<PositionImpl, Integer>> positions) {
        PendingAckMetadataEntry pendingAckMetadataEntry = new PendingAckMetadataEntry();
        pendingAckMetadataEntry.setPendingAckOp(PendingAckOp.ACK);
        pendingAckMetadataEntry.setAckType(CommandAck.AckType.Individual);
        ArrayList<PendingAckMetadata> pendingAckMetadataList = new ArrayList<PendingAckMetadata>();
        positions.forEach(positionIntegerMutablePair -> {
            PendingAckMetadata pendingAckMetadata = new PendingAckMetadata();
            PositionImpl position = (PositionImpl)positionIntegerMutablePair.getLeft();
            int batchSize = (Integer)positionIntegerMutablePair.getRight();
            if (((PositionImpl)positionIntegerMutablePair.getLeft()).getAckSet() != null) {
                for (long l : position.getAckSet()) {
                    pendingAckMetadata.addAckSet(l);
                }
            }
            pendingAckMetadata.setLedgerId(position.getLedgerId());
            pendingAckMetadata.setEntryId(position.getEntryId());
            pendingAckMetadata.setBatchSize(batchSize);
            pendingAckMetadataList.add(pendingAckMetadata);
        });
        pendingAckMetadataEntry.addAllPendingAckMetadatas(pendingAckMetadataList);
        return this.appendCommon(pendingAckMetadataEntry, txnID);
    }

    @Override
    public CompletableFuture<Void> appendCumulativeAck(TxnID txnID, PositionImpl position) {
        PendingAckMetadataEntry pendingAckMetadataEntry = new PendingAckMetadataEntry();
        pendingAckMetadataEntry.setPendingAckOp(PendingAckOp.ACK);
        pendingAckMetadataEntry.setAckType(CommandAck.AckType.Cumulative);
        PendingAckMetadata pendingAckMetadata = new PendingAckMetadata();
        if (position.getAckSet() != null) {
            for (long l : position.getAckSet()) {
                pendingAckMetadata.addAckSet(l);
            }
        }
        pendingAckMetadata.setLedgerId(position.getLedgerId());
        pendingAckMetadata.setEntryId(position.getEntryId());
        pendingAckMetadataEntry.addAllPendingAckMetadatas(Collections.singleton(pendingAckMetadata));
        return this.appendCommon(pendingAckMetadataEntry, txnID);
    }

    @Override
    public CompletableFuture<Void> appendCommitMark(TxnID txnID, CommandAck.AckType ackType) {
        PendingAckMetadataEntry pendingAckMetadataEntry = new PendingAckMetadataEntry();
        pendingAckMetadataEntry.setPendingAckOp(PendingAckOp.COMMIT);
        pendingAckMetadataEntry.setAckType(ackType);
        return this.appendCommon(pendingAckMetadataEntry, txnID);
    }

    @Override
    public CompletableFuture<Void> appendAbortMark(TxnID txnID, CommandAck.AckType ackType) {
        PendingAckMetadataEntry pendingAckMetadataEntry = new PendingAckMetadataEntry();
        pendingAckMetadataEntry.setPendingAckOp(PendingAckOp.ABORT);
        pendingAckMetadataEntry.setAckType(ackType);
        return this.appendCommon(pendingAckMetadataEntry, txnID);
    }

    private CompletableFuture<Void> appendCommon(final PendingAckMetadataEntry pendingAckMetadataEntry, final TxnID txnID) {
        final CompletableFuture<Void> completableFuture = new CompletableFuture<Void>();
        pendingAckMetadataEntry.setTxnidLeastBits(txnID.getLeastSigBits());
        pendingAckMetadataEntry.setTxnidMostBits(txnID.getMostSigBits());
        this.bufferedWriter.asyncAddData((Object)pendingAckMetadataEntry, new TxnLogBufferedWriter.AddDataCallback(){

            public void addComplete(Position position, Object ctx) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{}] MLPendingAckStore message append success at {} txnId: {}, operation : {}", new Object[]{MLPendingAckStore.this.managedLedger.getName(), ctx, position, txnID, pendingAckMetadataEntry.getPendingAckOp()});
                }
                MLPendingAckStore.this.currentIndexLag.incrementAndGet();
                if (position instanceof TxnBatchedPositionImpl) {
                    TxnBatchedPositionImpl batchedPosition = (TxnBatchedPositionImpl)position;
                    MLPendingAckStore.this.batchedPendingAckLogsWaitingForHandle.add(pendingAckMetadataEntry);
                    if (batchedPosition.getBatchIndex() == batchedPosition.getBatchSize() - 1) {
                        MLPendingAckStore.this.handleMetadataEntry((PositionImpl)position, MLPendingAckStore.this.batchedPendingAckLogsWaitingForHandle);
                        MLPendingAckStore.this.batchedPendingAckLogsWaitingForHandle.clear();
                    }
                } else {
                    MLPendingAckStore.this.handleMetadataEntry((PositionImpl)position, pendingAckMetadataEntry);
                }
                completableFuture.complete(null);
                MLPendingAckStore.this.clearUselessLogData();
            }

            public void addFailed(ManagedLedgerException exception, Object ctx) {
                log.error("[{}][{}] MLPendingAckStore message append fail exception : {}, operation : {}", new Object[]{MLPendingAckStore.this.managedLedger.getName(), ctx, exception, pendingAckMetadataEntry.getPendingAckOp()});
                if (exception instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException) {
                    MLPendingAckStore.this.managedLedger.readyToCreateNewLedger();
                }
                completableFuture.completeExceptionally(new BrokerServiceException.PersistenceException(exception));
            }
        }, null);
        return completableFuture;
    }

    private void handleMetadataEntry(PositionImpl logPosition, List<PendingAckMetadataEntry> logList) {
        Stream<PendingAckMetadata> pendingAckMetaStream = logList.stream().filter(log -> this.bothNotAbortAndCommitPredicate.test((PendingAckMetadataEntry)log)).flatMap(log -> log.getPendingAckMetadatasList().stream());
        this.handleMetadataEntry(logPosition, pendingAckMetaStream);
    }

    private void handleMetadataEntry(PositionImpl logPosition, PendingAckMetadataEntry pendingAckMetadataEntry) {
        if (this.bothNotAbortAndCommitPredicate.test(pendingAckMetadataEntry)) {
            this.handleMetadataEntry(logPosition, pendingAckMetadataEntry.getPendingAckMetadatasList().stream());
        }
    }

    private void handleMetadataEntry(PositionImpl logPosition, Stream<PendingAckMetadata> pendingAckListStream) {
        Optional<PendingAckMetadata> optional = pendingAckListStream.max((o1, o2) -> ComparisonChain.start().compare(o1.getLedgerId(), o2.getLedgerId()).compare(o1.getEntryId(), o2.getEntryId()).result());
        optional.ifPresent(pendingAckMetadata -> {
            PositionImpl nowPosition = PositionImpl.get((long)pendingAckMetadata.getLedgerId(), (long)pendingAckMetadata.getEntryId());
            if (nowPosition.compareTo(this.maxAckPosition) > 0) {
                this.maxAckPosition = nowPosition;
            }
            if (this.currentIndexLag.get() >= this.maxIndexLag) {
                this.pendingAckLogIndex.compute(this.maxAckPosition, (thisPosition, otherPosition) -> logPosition);
                this.maxIndexLag = this.logIndexBackoff.next(this.pendingAckLogIndex.size());
                this.currentIndexLag.set(0L);
            }
        });
    }

    @VisibleForTesting
    void clearUselessLogData() {
        if (!this.pendingAckLogIndex.isEmpty()) {
            PositionImpl deletePosition = null;
            while (!this.pendingAckLogIndex.isEmpty() && this.pendingAckLogIndex.firstKey() != null && this.subManagedCursor.getPersistentMarkDeletedPosition() != null && this.pendingAckLogIndex.firstEntry().getKey().compareTo((PositionImpl)this.subManagedCursor.getPersistentMarkDeletedPosition()) <= 0) {
                deletePosition = this.pendingAckLogIndex.remove(this.pendingAckLogIndex.firstKey());
            }
            if (deletePosition != null) {
                this.maxIndexLag = this.logIndexBackoff.next(this.pendingAckLogIndex.size());
                final PositionImpl finalDeletePosition = deletePosition;
                this.cursor.asyncMarkDelete(deletePosition, new AsyncCallbacks.MarkDeleteCallback(){

                    public void markDeleteComplete(Object ctx) {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Transaction pending ack store mark delete position : [{}] success", (Object)MLPendingAckStore.this.managedLedger.getName(), (Object)finalDeletePosition);
                        }
                    }

                    public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
                        log.error("[{}] Transaction pending ack store mark delete position : [{}] fail!", new Object[]{MLPendingAckStore.this.managedLedger.getName(), finalDeletePosition, exception});
                    }
                }, null);
            }
        }
    }

    private List<PendingAckMetadataEntry> deserializeEntry(Entry entry) {
        ByteBuf buffer = entry.getDataBuffer();
        buffer.markReaderIndex();
        short magicNum = buffer.readShort();
        buffer.resetReaderIndex();
        if (magicNum == 3585) {
            buffer.skipBytes(4);
            BatchedPendingAckMetadataEntry batchedPendingAckMetadataEntry = new BatchedPendingAckMetadataEntry();
            batchedPendingAckMetadataEntry.parseFrom(buffer, buffer.readableBytes());
            return batchedPendingAckMetadataEntry.getPendingAckLogsList();
        }
        PendingAckMetadataEntry pendingAckMetadataEntry = new PendingAckMetadataEntry();
        pendingAckMetadataEntry.parseFrom(buffer, buffer.readableBytes());
        return Collections.singletonList(pendingAckMetadataEntry);
    }

    public CompletableFuture<ManagedLedger> getManagedLedger() {
        return CompletableFuture.completedFuture(this.managedLedger);
    }

    public static String getTransactionPendingAckStoreSuffix(String originTopicName, String subName) {
        return TopicName.get((String)originTopicName) + "-" + subName + "__transaction_pending_ack";
    }

    public static String getTransactionPendingAckStoreCursorName() {
        return "__pending_ack_state";
    }

    private static class PendingAckLogSerializer
    implements TxnLogBufferedWriter.DataSerializer<PendingAckMetadataEntry> {
        private static final PendingAckLogSerializer INSTANCE = new PendingAckLogSerializer();

        private PendingAckLogSerializer() {
        }

        public int getSerializedSize(PendingAckMetadataEntry data) {
            return data.getSerializedSize();
        }

        public ByteBuf serialize(PendingAckMetadataEntry data) {
            int batchSize = data.getSerializedSize();
            ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(batchSize, batchSize);
            data.writeTo(buf);
            return buf;
        }

        public ByteBuf serialize(ArrayList<PendingAckMetadataEntry> dataArray) {
            BatchedPendingAckMetadataEntry batch = (BatchedPendingAckMetadataEntry)batchedMetaThreadLocalForBufferedWriter.get();
            batch.clear();
            batch.addAllPendingAckLogs(dataArray);
            int batchSize = batch.getSerializedSize();
            ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(batchSize, batchSize);
            batch.writeTo(buf);
            return buf;
        }
    }

    class PendingAckReplay
    implements Runnable {
        private final FillEntryQueueCallback fillEntryQueueCallback;
        private final PendingAckReplyCallBack pendingAckReplyCallBack;

        PendingAckReplay(PendingAckReplyCallBack pendingAckReplyCallBack) {
            this.fillEntryQueueCallback = new FillEntryQueueCallback();
            this.pendingAckReplyCallBack = pendingAckReplyCallBack;
        }

        @Override
        public void run() {
            try {
                if (MLPendingAckStore.this.cursor.isClosed()) {
                    this.pendingAckReplyCallBack.replayFailed((Throwable)new ManagedLedgerException.CursorAlreadyClosedException("MLPendingAckStore cursor have been closed."));
                    log.warn("[{}] MLPendingAckStore cursor have been closed, close replay thread.", (Object)MLPendingAckStore.this.cursor.getManagedLedger().getName());
                    return;
                }
                while (MLPendingAckStore.this.lastConfirmedEntry.compareTo(MLPendingAckStore.this.currentLoadPosition) > 0 && this.fillEntryQueueCallback.fillQueue()) {
                    Entry entry = (Entry)MLPendingAckStore.this.entryQueue.poll();
                    if (entry != null) {
                        MLPendingAckStore.this.currentLoadPosition = PositionImpl.get((long)entry.getLedgerId(), (long)entry.getEntryId());
                        List<PendingAckMetadataEntry> logs = MLPendingAckStore.this.deserializeEntry(entry);
                        if (logs.isEmpty()) continue;
                        if (logs.size() == 1) {
                            MLPendingAckStore.this.currentIndexLag.incrementAndGet();
                            PendingAckMetadataEntry log = logs.get(0);
                            MLPendingAckStore.this.handleMetadataEntry(new PositionImpl(entry.getLedgerId(), entry.getEntryId()), log);
                            this.pendingAckReplyCallBack.handleMetadataEntry(log);
                        } else {
                            int batchSize = logs.size();
                            for (int batchIndex = 0; batchIndex < batchSize; ++batchIndex) {
                                PendingAckMetadataEntry log = logs.get(batchIndex);
                                this.pendingAckReplyCallBack.handleMetadataEntry(log);
                            }
                            MLPendingAckStore.this.currentIndexLag.addAndGet(batchSize);
                            MLPendingAckStore.this.handleMetadataEntry(new PositionImpl(entry.getLedgerId(), entry.getEntryId()), logs);
                        }
                        entry.release();
                        MLPendingAckStore.this.clearUselessLogData();
                        continue;
                    }
                    try {
                        Thread.sleep(1L);
                    }
                    catch (InterruptedException e) {
                        if (!Thread.interrupted()) continue;
                        log.error("[{}]Transaction pending replay thread interrupt!", (Object)MLPendingAckStore.this.managedLedger.getName(), (Object)e);
                    }
                }
            }
            catch (Exception e) {
                this.pendingAckReplyCallBack.replayFailed(e);
                log.error("[{}] Pending ack recover fail!", (Object)MLPendingAckStore.this.subManagedCursor.getManagedLedger().getName(), (Object)e);
                return;
            }
            this.pendingAckReplyCallBack.replayComplete();
        }
    }

    class FillEntryQueueCallback
    implements AsyncCallbacks.ReadEntriesCallback {
        private volatile boolean isReadable = true;
        private final AtomicLong outstandingReadsRequests = new AtomicLong(0L);
        private static final int NUMBER_OF_PER_READ_ENTRY = 100;

        FillEntryQueueCallback() {
        }

        boolean fillQueue() {
            if (MLPendingAckStore.this.entryQueue.size() + 100 < MLPendingAckStore.this.entryQueue.capacity() && this.outstandingReadsRequests.get() == 0L && MLPendingAckStore.this.cursor.hasMoreEntries()) {
                this.outstandingReadsRequests.incrementAndGet();
                MLPendingAckStore.this.readAsync(100, this);
            }
            return this.isReadable;
        }

        public void readEntriesComplete(final List<Entry> entries, Object ctx) {
            MLPendingAckStore.this.entryQueue.fill((MessagePassingQueue.Supplier)new MessagePassingQueue.Supplier<Entry>(){
                private int i = 0;

                public Entry get() {
                    Entry entry = (Entry)entries.get(this.i);
                    ++this.i;
                    return entry;
                }
            }, entries.size());
            this.outstandingReadsRequests.decrementAndGet();
        }

        public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
            if (MLPendingAckStore.this.managedLedger.getConfig().isAutoSkipNonRecoverableData() && exception instanceof ManagedLedgerException.NonRecoverableLedgerException || exception instanceof ManagedLedgerException.ManagedLedgerFencedException || exception instanceof ManagedLedgerException.CursorAlreadyClosedException) {
                this.isReadable = false;
            }
            log.error("MLPendingAckStore of topic [{}] stat reply fail!", (Object)MLPendingAckStore.this.managedLedger.getName(), (Object)exception);
            this.outstandingReadsRequests.decrementAndGet();
        }
    }
}

