/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.interceptors.distribution;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.DataCommand;
import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.functional.ReadWriteKeyCommand;
import org.infinispan.commands.functional.ReadWriteKeyValueCommand;
import org.infinispan.commands.functional.ReadWriteManyCommand;
import org.infinispan.commands.functional.ReadWriteManyEntriesCommand;
import org.infinispan.commands.functional.WriteOnlyKeyCommand;
import org.infinispan.commands.functional.WriteOnlyKeyValueCommand;
import org.infinispan.commands.functional.WriteOnlyManyCommand;
import org.infinispan.commands.functional.WriteOnlyManyEntriesCommand;
import org.infinispan.commands.read.AbstractDataCommand;
import org.infinispan.commands.read.GetCacheEntryCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.write.ComputeCommand;
import org.infinispan.commands.write.ComputeIfAbsentCommand;
import org.infinispan.commands.write.DataWriteCommand;
import org.infinispan.commands.write.InvalidateL1Command;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.configuration.cache.ClusteringConfiguration;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.impl.EntryFactory;
import org.infinispan.container.impl.InternalDataContainer;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.distribution.L1Manager;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.interceptors.distribution.L1WriteSynchronizer;
import org.infinispan.interceptors.impl.BaseRpcInterceptor;
import org.infinispan.interceptors.impl.MultiSubCommandInvoker;
import org.infinispan.interceptors.locking.ClusteringDependentLogic;
import org.infinispan.statetransfer.StateTransferLock;
import org.infinispan.util.concurrent.AggregateCompletionStage;
import org.infinispan.util.concurrent.CompletionStages;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

public class L1NonTxInterceptor
extends BaseRpcInterceptor {
    private static final Log log = LogFactory.getLog(L1NonTxInterceptor.class);
    @Inject
    protected L1Manager l1Manager;
    @Inject
    protected ClusteringDependentLogic cdl;
    @Inject
    protected EntryFactory entryFactory;
    @Inject
    protected CommandsFactory commandsFactory;
    @Inject
    protected InternalDataContainer dataContainer;
    @Inject
    protected StateTransferLock stateTransferLock;
    @Inject
    protected KeyPartitioner keyPartitioner;
    private long l1Lifespan;
    private long replicationTimeout;
    private final ConcurrentMap<Object, L1WriteSynchronizer> concurrentWrites = new ConcurrentHashMap<Object, L1WriteSynchronizer>();

    @Start
    public void start() {
        this.l1Lifespan = this.cacheConfiguration.clustering().l1().lifespan();
        this.replicationTimeout = this.cacheConfiguration.clustering().remoteTimeout();
        this.cacheConfiguration.clustering().attributes().attribute(ClusteringConfiguration.REMOTE_TIMEOUT).addListener((a, ignored) -> {
            this.replicationTimeout = (Long)a.get();
        });
    }

    @Override
    public final Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable {
        return this.visitDataReadCommand(ctx, command, false);
    }

    @Override
    public final Object visitGetCacheEntryCommand(InvocationContext ctx, GetCacheEntryCommand command) throws Throwable {
        return this.visitDataReadCommand(ctx, command, true);
    }

    private Object visitDataReadCommand(InvocationContext ctx, AbstractDataCommand command, boolean isEntry) throws Throwable {
        return this.performCommandWithL1WriteIfAble(ctx, command, isEntry, false, true);
    }

    protected Object performCommandWithL1WriteIfAble(InvocationContext ctx, DataCommand command, boolean isEntry, boolean shouldAlwaysRunNextInterceptor, boolean registerL1) throws Throwable {
        if (ctx.isOriginLocal()) {
            Object key = command.getKey();
            if (!this.skipL1Lookup(command, key)) {
                return this.performL1Lookup(ctx, command, shouldAlwaysRunNextInterceptor, key, isEntry);
            }
        } else if (registerL1) {
            this.l1Manager.addRequestor(command.getKey(), ctx.getOrigin());
        }
        return this.invokeNext(ctx, command);
    }

    private Object performL1Lookup(InvocationContext ctx, VisitableCommand command, boolean runInterceptorOnConflict, Object key, boolean isEntry) throws Throwable {
        Object returnValue;
        L1WriteSynchronizer l1WriteSync = new L1WriteSynchronizer(this.dataContainer, this.l1Lifespan, this.stateTransferLock, this.cdl);
        L1WriteSynchronizer presentSync = this.concurrentWrites.putIfAbsent(key, l1WriteSync);
        if (presentSync == null) {
            this.l1Manager.registerL1WriteSynchronizer(key, l1WriteSync);
            return this.invokeNextAndFinally(ctx, command, (rCtx, rCommand, rv, t) -> {
                if (t != null) {
                    l1WriteSync.retrievalEncounteredException(t);
                }
                this.l1Manager.unregisterL1WriteSynchronizer(key, l1WriteSync);
                this.concurrentWrites.remove(key);
            });
        }
        if (log.isTraceEnabled()) {
            log.tracef("Found current request for key %s, waiting for their invocation's response", key);
        }
        try {
            returnValue = presentSync.get(this.replicationTimeout, TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException e) {
            log.warnf("Synchronizer didn't return in %s milliseconds - running command normally!", this.replicationTimeout);
            return this.invokeNext(ctx, command);
        }
        catch (ExecutionException e) {
            throw e.getCause();
        }
        if (runInterceptorOnConflict) {
            return this.invokeNext(ctx, command);
        }
        if (!isEntry && returnValue instanceof InternalCacheEntry) {
            returnValue = ((InternalCacheEntry)returnValue).getValue();
        }
        return returnValue;
    }

    protected boolean skipL1Lookup(FlagAffectedCommand command, Object key) {
        return command.hasAnyFlag(FlagBitSets.CACHE_MODE_LOCAL) || command.hasAnyFlag(FlagBitSets.SKIP_REMOTE_LOOKUP) || command.hasAnyFlag(FlagBitSets.IGNORE_RETURN_VALUES) || this.cdl.getCacheTopology().isWriteOwner(key) || this.dataContainer.containsKey(key);
    }

    @Override
    public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
        return this.handleDataWriteCommand(ctx, command, true);
    }

    @Override
    public Object visitWriteOnlyKeyCommand(InvocationContext ctx, WriteOnlyKeyCommand command) throws Throwable {
        return this.handleDataWriteCommand(ctx, command, false);
    }

    @Override
    public Object visitReadWriteKeyValueCommand(InvocationContext ctx, ReadWriteKeyValueCommand command) throws Throwable {
        return this.handleDataWriteCommand(ctx, command, false);
    }

    @Override
    public Object visitReadWriteKeyCommand(InvocationContext ctx, ReadWriteKeyCommand command) throws Throwable {
        return this.handleDataWriteCommand(ctx, command, false);
    }

    @Override
    public Object visitWriteOnlyManyEntriesCommand(InvocationContext ctx, WriteOnlyManyEntriesCommand command) throws Throwable {
        return this.handleWriteManyCommand(ctx, command);
    }

    @Override
    public Object visitWriteOnlyKeyValueCommand(InvocationContext ctx, WriteOnlyKeyValueCommand command) throws Throwable {
        return this.handleDataWriteCommand(ctx, command, false);
    }

    @Override
    public Object visitWriteOnlyManyCommand(InvocationContext ctx, WriteOnlyManyCommand command) throws Throwable {
        return this.handleWriteManyCommand(ctx, command);
    }

    @Override
    public Object visitReadWriteManyCommand(InvocationContext ctx, ReadWriteManyCommand command) throws Throwable {
        return this.handleWriteManyCommand(ctx, command);
    }

    @Override
    public Object visitReadWriteManyEntriesCommand(InvocationContext ctx, ReadWriteManyEntriesCommand command) throws Throwable {
        return this.handleWriteManyCommand(ctx, command);
    }

    @Override
    public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
        return this.handleDataWriteCommand(ctx, command, false);
    }

    @Override
    public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
        return this.handleDataWriteCommand(ctx, command, true);
    }

    @Override
    public Object visitComputeCommand(InvocationContext ctx, ComputeCommand command) throws Throwable {
        return this.handleDataWriteCommand(ctx, command, true);
    }

    @Override
    public Object visitComputeIfAbsentCommand(InvocationContext ctx, ComputeIfAbsentCommand command) throws Throwable {
        return this.handleDataWriteCommand(ctx, command, true);
    }

    @Override
    public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
        return this.handleWriteManyCommand(ctx, command);
    }

    private Object handleWriteManyCommand(InvocationContext ctx, WriteCommand command) {
        Collection<?> keys = command.getAffectedKeys();
        HashSet<Object> toInvalidate = new HashSet<Object>(keys.size());
        for (Object k2 : keys) {
            if (!this.cdl.getCacheTopology().isWriteOwner(k2)) continue;
            toInvalidate.add(k2);
        }
        CompletableFuture<?> invalidationFuture = !toInvalidate.isEmpty() ? this.l1Manager.flushCache(toInvalidate, ctx.getOrigin(), true) : null;
        Iterator subCommands = keys.stream().filter(k -> !this.cdl.getCacheTopology().isWriteOwner(k)).map(k -> CompletionStages.join(this.removeFromL1Command(ctx, k, this.keyPartitioner.getSegment(k)))).iterator();
        return this.invokeNextAndHandle(ctx, command, (rCtx, writeCommand, rv, ex) -> {
            if (ex != null) {
                if (this.mustSyncInvalidation(invalidationFuture, (WriteCommand)writeCommand)) {
                    return L1NonTxInterceptor.asyncValue(invalidationFuture).thenApply(rCtx, writeCommand, (rCtx1, rCommand1, rv1) -> {
                        throw ex;
                    });
                }
                throw ex;
            }
            if (this.mustSyncInvalidation(invalidationFuture, (WriteCommand)writeCommand)) {
                return L1NonTxInterceptor.asyncValue(invalidationFuture).thenApply(null, null, (rCtx2, rCommand2, rv2) -> MultiSubCommandInvoker.invokeEach(rCtx, subCommands, this, rv));
            }
            return MultiSubCommandInvoker.invokeEach(rCtx, subCommands, this, rv);
        });
    }

    @Override
    public Object visitInvalidateL1Command(InvocationContext ctx, InvalidateL1Command invalidateL1Command) throws Throwable {
        AggregateCompletionStage<Void> aggregateCompletionStage = null;
        for (Object key : invalidateL1Command.getKeys()) {
            CompletionStage<Void> stage;
            this.abortL1UpdateOrWait(key);
            if (ctx.lookupEntry(key) != null || CompletionStages.isCompletedSuccessfully(stage = this.entryFactory.wrapEntryForWriting(ctx, key, this.keyPartitioner.getSegment(key), true, false))) continue;
            if (aggregateCompletionStage == null) {
                aggregateCompletionStage = CompletionStages.aggregateCompletionStage();
            }
            aggregateCompletionStage.dependsOn(stage);
        }
        return aggregateCompletionStage != null ? this.asyncInvokeNext(ctx, (VisitableCommand)invalidateL1Command, aggregateCompletionStage.freeze()) : this.invokeNext(ctx, invalidateL1Command);
    }

    private void abortL1UpdateOrWait(Object key) {
        L1WriteSynchronizer sync = (L1WriteSynchronizer)this.concurrentWrites.remove(key);
        if (sync != null) {
            if (sync.trySkipL1Update()) {
                if (log.isTraceEnabled()) {
                    log.tracef("Aborted possible L1 update due to concurrent invalidation for key %s", key);
                }
            } else {
                boolean success;
                if (log.isTraceEnabled()) {
                    log.tracef("L1 invalidation found a pending update for key %s - need to block until finished", key);
                }
                try {
                    sync.get();
                    success = true;
                }
                catch (InterruptedException e) {
                    success = false;
                    Thread.currentThread().interrupt();
                }
                catch (ExecutionException e) {
                    success = false;
                }
                if (log.isTraceEnabled()) {
                    log.tracef("Pending L1 update completed successfully: %b - L1 invalidation can occur for key %s", success, key);
                }
            }
        }
    }

    private Object handleDataWriteCommand(InvocationContext ctx, DataWriteCommand command, boolean assumeOriginKeptEntryInL1) {
        if (command.hasAnyFlag(FlagBitSets.CACHE_MODE_LOCAL)) {
            if (log.isTraceEnabled()) {
                log.tracef("local mode forced, suppressing L1 calls.", new Object[0]);
            }
            return this.invokeNext(ctx, command);
        }
        CompletableFuture<?> l1InvalidationFuture = this.invalidateL1InCluster(ctx, command, assumeOriginKeptEntryInL1);
        return this.invokeNextAndHandle(ctx, command, (rCtx, dataWriteCommand, rv, ex) -> {
            if (ex != null) {
                if (this.mustSyncInvalidation(l1InvalidationFuture, (WriteCommand)dataWriteCommand)) {
                    return L1NonTxInterceptor.asyncValue(l1InvalidationFuture).thenApply(rCtx, dataWriteCommand, (rCtx1, rCommand1, rv1) -> {
                        throw ex;
                    });
                }
                throw ex;
            }
            if (this.mustSyncInvalidation(l1InvalidationFuture, (WriteCommand)dataWriteCommand)) {
                if (this.shouldRemoveFromLocalL1(rCtx, (DataWriteCommand)dataWriteCommand)) {
                    CompletionStage<VisitableCommand> removeFromL1CommandStage = this.removeFromL1Command(rCtx, dataWriteCommand.getKey(), dataWriteCommand.getSegment());
                    VisitableCommand removeFromL1Command = CompletionStages.join(removeFromL1CommandStage);
                    return L1NonTxInterceptor.makeStage(this.asyncInvokeNext(rCtx, removeFromL1Command, l1InvalidationFuture)).thenApply(null, null, (rCtx2, rCommand2, rv2) -> rv);
                }
                return L1NonTxInterceptor.asyncValue(l1InvalidationFuture).thenApply(rCtx, dataWriteCommand, (rCtx1, rCommand1, rv1) -> rv);
            }
            if (this.shouldRemoveFromLocalL1(rCtx, (DataWriteCommand)dataWriteCommand)) {
                CompletionStage<VisitableCommand> removeFromL1CommandStage = this.removeFromL1Command(rCtx, dataWriteCommand.getKey(), dataWriteCommand.getSegment());
                VisitableCommand removeFromL1Command = CompletionStages.join(removeFromL1CommandStage);
                return this.invokeNextThenApply(rCtx, removeFromL1Command, (rCtx2, rCommand2, rv2) -> rv);
            }
            if (log.isTraceEnabled()) {
                log.trace("Allowing entry to commit as local node is owner");
            }
            return rv;
        });
    }

    private boolean mustSyncInvalidation(CompletableFuture<?> invalidationFuture, WriteCommand writeCommand) {
        return invalidationFuture != null && !invalidationFuture.isDone() && this.isSynchronous(writeCommand);
    }

    private boolean shouldRemoveFromLocalL1(InvocationContext ctx, DataWriteCommand command) {
        return ctx.isOriginLocal() && !this.cdl.getCacheTopology().isWriteOwner(command.getKey());
    }

    private CompletionStage<VisitableCommand> removeFromL1Command(InvocationContext ctx, Object key, int segment) {
        if (log.isTraceEnabled()) {
            log.tracef("Removing entry from L1 for key %s", key);
        }
        this.abortL1UpdateOrWait(key);
        ctx.removeLookedUpEntry(key);
        CompletionStage<Void> stage = this.entryFactory.wrapEntryForWriting(ctx, key, segment, true, false);
        return stage.thenApply(ignore -> this.commandsFactory.buildInvalidateFromL1Command(0L, Collections.singleton(key)));
    }

    private CompletableFuture<?> invalidateL1InCluster(InvocationContext ctx, DataWriteCommand command, boolean assumeOriginKeptEntryInL1) {
        CompletableFuture<?> l1InvalidationFuture = null;
        if (this.cdl.getCacheTopology().isWriteOwner(command.getKey())) {
            l1InvalidationFuture = this.l1Manager.flushCache(Collections.singletonList(command.getKey()), ctx.getOrigin(), assumeOriginKeptEntryInL1);
        } else if (log.isTraceEnabled()) {
            log.tracef("Not invalidating key '%s' as local node(%s) is not owner", command.getKey(), this.rpcManager.getAddress());
        }
        return l1InvalidationFuture;
    }

    @Override
    protected Log getLog() {
        return log;
    }
}

