/*
 * Decompiled with CFR 0.152.
 */
package com.lambdaworks.redis.cluster;

import com.lambdaworks.redis.AbstractRedisReactiveCommands;
import com.lambdaworks.redis.KeyScanCursor;
import com.lambdaworks.redis.KeyValue;
import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.ScanArgs;
import com.lambdaworks.redis.ScanCursor;
import com.lambdaworks.redis.StreamScanCursor;
import com.lambdaworks.redis.api.StatefulRedisConnection;
import com.lambdaworks.redis.api.reactive.RedisKeyReactiveCommands;
import com.lambdaworks.redis.cluster.ClusterScanSupport;
import com.lambdaworks.redis.cluster.SlotHash;
import com.lambdaworks.redis.cluster.StatefulRedisClusterConnectionImpl;
import com.lambdaworks.redis.cluster.api.StatefulRedisClusterConnection;
import com.lambdaworks.redis.cluster.api.reactive.RedisAdvancedClusterReactiveCommands;
import com.lambdaworks.redis.cluster.api.reactive.RedisClusterReactiveCommands;
import com.lambdaworks.redis.cluster.models.partitions.Partitions;
import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.internal.LettuceLists;
import com.lambdaworks.redis.output.KeyStreamingChannel;
import com.lambdaworks.redis.output.KeyValueStreamingChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Reactive command implementation for a Redis Cluster connection.
 * <p>
 * Multi-key commands are split by hash slot (via {@link SlotHash#partition}) and issued once per slot; node-wide
 * commands are issued on every matching node whose connection is open. The partial results are merged into a single
 * publisher.
 *
 * @param <K> key type
 * @param <V> value type
 */
public class RedisAdvancedClusterReactiveCommandsImpl<K, V>
extends AbstractRedisReactiveCommands<K, V>
implements RedisAdvancedClusterReactiveCommands<K, V> {
    /** Source of randomness used by {@link #randomkey()} to pick a node. */
    private final Random random = new Random();

    /**
     * Creates the command facade.
     *
     * @param connection the cluster connection commands are dispatched through
     * @param codec the codec used to encode keys and values
     */
    public RedisAdvancedClusterReactiveCommandsImpl(StatefulRedisClusterConnectionImpl<K, V> connection, RedisCodec<K, V> codec) {
        super(connection, codec);
    }

    /**
     * Deletes the given keys; delegates to {@link #del(Iterable)}.
     */
    @Override
    public Mono<Long> del(K ... keys) {
        return this.del((Iterable<K>) Arrays.asList(keys));
    }

    /**
     * Deletes the given keys, issuing one {@code DEL} per hash slot when the keys span several slots.
     *
     * @param keys the keys to delete
     * @return the sum of the per-slot results
     */
    @Override
    public Mono<Long> del(Iterable<K> keys) {
        return this.sumOverSlots(keys, slotKeys -> super.del(slotKeys));
    }

    /**
     * Unlinks the given keys; delegates to {@link #unlink(Iterable)}.
     */
    @Override
    public Mono<Long> unlink(K ... keys) {
        return this.unlink((Iterable<K>) Arrays.asList(keys));
    }

    /**
     * Unlinks the given keys, issuing one {@code UNLINK} per hash slot when the keys span several slots.
     *
     * @param keys the keys to unlink
     * @return the sum of the per-slot results
     */
    @Override
    public Mono<Long> unlink(Iterable<K> keys) {
        return this.sumOverSlots(keys, slotKeys -> super.unlink(slotKeys));
    }

    /**
     * Counts existing keys; delegates to {@link #exists(Iterable)}.
     */
    @Override
    public Mono<Long> exists(K ... keys) {
        return this.exists((Iterable<K>) Arrays.asList(keys));
    }

    /**
     * Counts how many of the given keys exist, issuing one {@code EXISTS} per hash slot when the keys span several
     * slots.
     *
     * @param keys the keys to check
     * @return the sum of the per-slot results
     */
    @Override
    public Mono<Long> exists(Iterable<K> keys) {
        return this.sumOverSlots(keys, slotKeys -> super.exists(slotKeys));
    }

    /**
     * Fetches the values of the given keys; delegates to {@link #mget(Iterable)}.
     */
    @Override
    public Flux<KeyValue<K, V>> mget(K ... keys) {
        return this.mget((Iterable<K>) Arrays.asList(keys));
    }

    /**
     * Fetches the values of the given keys. When the keys span several hash slots, one {@code MGET} is issued per
     * slot, the per-slot results are concatenated and then rearranged into the order in which the keys were requested.
     *
     * @param keys the keys to fetch
     * @return the key/value pairs in request order
     */
    @Override
    public Flux<KeyValue<K, V>> mget(Iterable<K> keys) {
        List<K> keyList = LettuceLists.newList(keys);
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, keyList);
        if (partitioned.size() < 2) {
            return super.mget(keyList);
        }

        List<Flux<KeyValue<K, V>>> publishers = new ArrayList<>(partitioned.size());
        for (List<K> slotKeys : partitioned.values()) {
            publishers.add(super.mget(slotKeys));
        }

        // concat (not merge) keeps the per-slot results in partition iteration order, which the reordering relies on.
        Flux<KeyValue<K, V>> concatenated = Flux.concat(publishers);
        Mono<List<KeyValue<K, V>>> ordered = concatenated.collectList()
                .map(results -> this.restoreRequestOrder(keyList, partitioned, results));
        return ordered.flatMap(Flux::fromIterable);
    }

    /**
     * Streams the values of the given keys to {@code channel}; delegates to
     * {@link #mget(KeyValueStreamingChannel, Iterable)}.
     */
    @Override
    public Mono<Long> mget(KeyValueStreamingChannel<K, V> channel, K ... keys) {
        return this.mget(channel, (Iterable<K>) Arrays.asList(keys));
    }

    /**
     * Streams the values of the given keys to {@code channel}, issuing one {@code MGET} per hash slot when the keys
     * span several slots.
     *
     * @param channel the channel receiving the key/value pairs
     * @param keys the keys to fetch
     * @return the sum of the per-slot results
     */
    @Override
    public Mono<Long> mget(KeyValueStreamingChannel<K, V> channel, Iterable<K> keys) {
        return this.sumOverSlots(keys, slotKeys -> super.mget(channel, slotKeys));
    }

    /**
     * Sets the given entries if none of the keys exist, issuing one {@code MSETNX} per hash slot.
     *
     * @param map the entries to set
     * @return the logical AND of all per-slot results
     */
    @Override
    public Mono<Boolean> msetnx(Map<K, V> map) {
        return this.pipeliningWithMap(map, kvMap -> super.msetnx(kvMap).flux(), results -> results)
                .reduce((accu, next) -> accu && next);
    }

    /**
     * Sets the given entries, issuing one {@code MSET} per hash slot.
     *
     * @param map the entries to set
     * @return the last response emitted by the merged per-slot commands
     */
    @Override
    public Mono<String> mset(Map<K, V> map) {
        return this.pipeliningWithMap(map, kvMap -> super.mset(kvMap).flux(), results -> results).last();
    }

    /**
     * Returns keys stored in {@code slot}, asking the node that serves the slot when the partitions know one and
     * falling back to the default connection otherwise.
     */
    @Override
    public Flux<K> clusterGetKeysInSlot(int slot, int count) {
        RedisClusterReactiveCommands<K, V> connectionBySlot = this.findConnectionBySlot(slot);
        if (connectionBySlot == null) {
            return super.clusterGetKeysInSlot(slot, count);
        }
        return connectionBySlot.clusterGetKeysInSlot(slot, count);
    }

    /**
     * Counts the keys stored in {@code slot}, asking the node that serves the slot when the partitions know one and
     * falling back to the default connection otherwise.
     */
    @Override
    public Mono<Long> clusterCountKeysInSlot(int slot) {
        RedisClusterReactiveCommands<K, V> connectionBySlot = this.findConnectionBySlot(slot);
        if (connectionBySlot == null) {
            return super.clusterCountKeysInSlot(slot);
        }
        return connectionBySlot.clusterCountKeysInSlot(slot);
    }

    /**
     * Sets the client name on every open node connection. For each partition both the connection obtained by node id
     * and the one obtained by host/port are addressed.
     *
     * @param name the client name
     * @return the last response emitted by the merged commands
     */
    @Override
    public Mono<String> clientSetname(K name) {
        List<Mono<String>> publishers = new ArrayList<>();
        for (RedisClusterNode node : this.getStatefulConnection().getPartitions()) {
            StatefulRedisConnection<K, V> byNodeId = this.getStatefulConnection().getConnection(node.getNodeId());
            if (byNodeId.isOpen()) {
                publishers.add(byNodeId.reactive().clientSetname(name));
            }

            RedisURI uri = node.getUri();
            StatefulRedisConnection<K, V> byHost = this.getStatefulConnection().getConnection(uri.getHost(), uri.getPort());
            if (byHost.isOpen()) {
                publishers.add(byHost.reactive().clientSetname(name));
            }
        }
        return Flux.merge(publishers).last();
    }

    /**
     * Returns the sum of {@code DBSIZE} over all master nodes.
     */
    @Override
    public Mono<Long> dbsize() {
        Map<String, Flux<Long>> publishers = this.executeOnMasters(commands -> commands.dbsize().flux());
        return Flux.merge(publishers.values()).reduce(Long::sum);
    }

    /**
     * Runs {@code FLUSHALL} on all master nodes and returns the last response.
     */
    @Override
    public Mono<String> flushall() {
        Map<String, Flux<String>> publishers = this.executeOnMasters(commands -> commands.flushall().flux());
        return Flux.merge(publishers.values()).last();
    }

    /**
     * Runs {@code FLUSHDB} on all master nodes and returns the last response.
     */
    @Override
    public Mono<String> flushdb() {
        Map<String, Flux<String>> publishers = this.executeOnMasters(commands -> commands.flushdb().flux());
        return Flux.merge(publishers.values()).last();
    }

    /**
     * Runs {@code KEYS} on all master nodes and merges the results.
     */
    @Override
    public Flux<K> keys(K pattern) {
        Map<String, Flux<K>> publishers = this.executeOnMasters(commands -> commands.keys(pattern));
        return Flux.merge(publishers.values());
    }

    /**
     * Runs {@code KEYS} on all master nodes, streaming keys to {@code channel}, and returns the sum of the per-node
     * results.
     */
    @Override
    public Mono<Long> keys(KeyStreamingChannel<K> channel, K pattern) {
        Map<String, Flux<Long>> publishers = this.executeOnMasters(commands -> commands.keys(channel, pattern).flux());
        return Flux.merge(publishers.values()).reduce(Long::sum);
    }

    /**
     * Runs {@code RANDOMKEY} on a randomly chosen partition.
     */
    @Override
    public Mono<V> randomkey() {
        Partitions partitions = this.getStatefulConnection().getPartitions();
        int index = this.random.nextInt(partitions.size());
        RedisClusterReactiveCommands<K, V> connection = this.getConnection(partitions.getPartition(index).getNodeId());
        return connection.randomkey();
    }

    /**
     * Runs {@code SCRIPT FLUSH} on all nodes and returns the last response.
     */
    @Override
    public Mono<String> scriptFlush() {
        Map<String, Flux<String>> publishers = this.executeOnNodes(commands -> commands.scriptFlush().flux(), node -> true);
        return Flux.merge(publishers.values()).last();
    }

    /**
     * Runs {@code SCRIPT KILL} on all nodes. An error signalled by the merged commands is replaced with {@code "OK"}.
     *
     * @return the last response emitted by the merged commands
     */
    @Override
    public Mono<String> scriptKill() {
        // Must dispatch scriptKill(): this previously invoked scriptFlush(), wiping the script cache on every node.
        Map<String, Flux<String>> publishers = this.executeOnNodes(commands -> commands.scriptKill().flux(), node -> true);
        return Flux.merge(publishers.values()).onErrorReturn("OK").last();
    }

    /**
     * Runs {@code SHUTDOWN} on all nodes.
     *
     * @param save forwarded to each node's {@code shutdown} call
     */
    @Override
    public Mono<Void> shutdown(boolean save) {
        Map<String, Flux<Void>> publishers = this.executeOnNodes(commands -> commands.shutdown(save).flux(), node -> true);
        return Flux.merge(publishers.values()).then();
    }

    /**
     * Touches the given keys; delegates to {@link #touch(Iterable)}.
     */
    @Override
    public Mono<Long> touch(K ... keys) {
        return this.touch((Iterable<K>) Arrays.asList(keys));
    }

    /**
     * Touches the given keys, issuing one {@code TOUCH} per hash slot when the keys span several slots.
     *
     * @param keys the keys to touch
     * @return the sum of the per-slot results
     */
    @Override
    public Mono<Long> touch(Iterable<K> keys) {
        return this.sumOverSlots(keys, slotKeys -> super.touch(slotKeys));
    }

    /**
     * Applies {@code function} to the reactive API of every master node with an open connection.
     *
     * @param function the command to run per node
     * @param <T> result element type
     * @return the resulting publishers keyed by node id
     */
    protected <T> Map<String, Flux<T>> executeOnMasters(Function<RedisClusterReactiveCommands<K, V>, Flux<T>> function) {
        return this.executeOnNodes(function, node -> node.is(RedisClusterNode.NodeFlag.MASTER));
    }

    /**
     * Applies {@code function} to the reactive API of every node accepted by {@code filter} whose host/port connection
     * is open.
     *
     * @param function the command to run per node
     * @param filter selects the nodes to address
     * @param <T> result element type
     * @return the resulting publishers keyed by node id
     */
    protected <T> Map<String, Flux<T>> executeOnNodes(Function<RedisClusterReactiveCommands<K, V>, Flux<T>> function, Function<RedisClusterNode, Boolean> filter) {
        Map<String, Flux<T>> executions = new HashMap<>();
        for (RedisClusterNode node : this.getStatefulConnection().getPartitions()) {
            if (!filter.apply(node)) {
                continue;
            }
            RedisURI uri = node.getUri();
            StatefulRedisConnection<K, V> nodeConnection = this.getStatefulConnection().getConnection(uri.getHost(), uri.getPort());
            if (nodeConnection.isOpen()) {
                executions.put(node.getNodeId(), function.apply(nodeConnection.reactive()));
            }
        }
        return executions;
    }

    /**
     * Looks up the reactive API of the node serving {@code slot}.
     *
     * @return the node's commands, or {@code null} when the partitions hold no node for the slot
     */
    private RedisClusterReactiveCommands<K, V> findConnectionBySlot(int slot) {
        RedisClusterNode node = this.getStatefulConnection().getPartitions().getPartitionBySlot(slot);
        if (node == null) {
            return null;
        }
        return this.getConnection(node.getUri().getHost(), node.getUri().getPort());
    }

    /**
     * Returns the underlying connection viewed as a cluster connection.
     */
    @Override
    public StatefulRedisClusterConnection<K, V> getStatefulConnection() {
        return (StatefulRedisClusterConnection<K, V>) this.connection;
    }

    /**
     * Returns the reactive API of the node identified by {@code nodeId}.
     */
    @Override
    public RedisClusterReactiveCommands<K, V> getConnection(String nodeId) {
        return this.getStatefulConnection().getConnection(nodeId).reactive();
    }

    /**
     * Returns the reactive API of the node reachable at {@code host}:{@code port}.
     */
    @Override
    public RedisClusterReactiveCommands<K, V> getConnection(String host, int port) {
        return this.getStatefulConnection().getConnection(host, port).reactive();
    }

    /**
     * Starts a cluster-wide key scan.
     */
    @Override
    public Mono<KeyScanCursor<K>> scan() {
        return this.clusterScan(ScanCursor.INITIAL, (connection, cursor) -> connection.scan(), ClusterScanSupport.reactiveClusterKeyScanCursorMapper());
    }

    /**
     * Starts a cluster-wide key scan using {@code scanArgs}.
     */
    @Override
    public Mono<KeyScanCursor<K>> scan(ScanArgs scanArgs) {
        return this.clusterScan(ScanCursor.INITIAL, (connection, cursor) -> connection.scan(scanArgs), ClusterScanSupport.reactiveClusterKeyScanCursorMapper());
    }

    /**
     * Continues a cluster-wide key scan from {@code scanCursor} using {@code scanArgs}.
     */
    @Override
    public Mono<KeyScanCursor<K>> scan(ScanCursor scanCursor, ScanArgs scanArgs) {
        return this.clusterScan(scanCursor, (connection, cursor) -> connection.scan(cursor, scanArgs), ClusterScanSupport.reactiveClusterKeyScanCursorMapper());
    }

    /**
     * Continues a cluster-wide key scan from {@code scanCursor}.
     */
    @Override
    public Mono<KeyScanCursor<K>> scan(ScanCursor scanCursor) {
        return this.clusterScan(scanCursor, (connection, cursor) -> connection.scan(cursor), ClusterScanSupport.reactiveClusterKeyScanCursorMapper());
    }

    /**
     * Starts a cluster-wide key scan that streams keys to {@code channel}.
     */
    @Override
    public Mono<StreamScanCursor> scan(KeyStreamingChannel<K> channel) {
        return this.clusterScan(ScanCursor.INITIAL, (connection, cursor) -> connection.scan(channel), ClusterScanSupport.reactiveClusterStreamScanCursorMapper());
    }

    /**
     * Starts a cluster-wide key scan using {@code scanArgs} that streams keys to {@code channel}.
     */
    @Override
    public Mono<StreamScanCursor> scan(KeyStreamingChannel<K> channel, ScanArgs scanArgs) {
        return this.clusterScan(ScanCursor.INITIAL, (connection, cursor) -> connection.scan(channel, scanArgs), ClusterScanSupport.reactiveClusterStreamScanCursorMapper());
    }

    /**
     * Continues a cluster-wide key scan from {@code scanCursor} using {@code scanArgs}, streaming keys to
     * {@code channel}.
     */
    @Override
    public Mono<StreamScanCursor> scan(KeyStreamingChannel<K> channel, ScanCursor scanCursor, ScanArgs scanArgs) {
        return this.clusterScan(scanCursor, (connection, cursor) -> connection.scan(channel, cursor, scanArgs), ClusterScanSupport.reactiveClusterStreamScanCursorMapper());
    }

    /**
     * Continues a cluster-wide key scan from {@code scanCursor}, streaming keys to {@code channel}.
     */
    @Override
    public Mono<StreamScanCursor> scan(KeyStreamingChannel<K> channel, ScanCursor scanCursor) {
        return this.clusterScan(scanCursor, (connection, cursor) -> connection.scan(channel, cursor), ClusterScanSupport.reactiveClusterStreamScanCursorMapper());
    }

    /**
     * Runs one scan step against this instance's cluster connection; see the static overload.
     */
    private <T extends ScanCursor> Mono<T> clusterScan(ScanCursor cursor, BiFunction<RedisKeyReactiveCommands<K, V>, ScanCursor, Mono<T>> scanFunction, ClusterScanSupport.ScanCursorMapper<Mono<T>> resultMapper) {
        return RedisAdvancedClusterReactiveCommandsImpl.clusterScan(this.getStatefulConnection(), cursor, scanFunction, resultMapper);
    }

    /**
     * Runs one scan step: resolves the node ids and the current node from {@code cursor} through
     * {@link ClusterScanSupport}, invokes {@code scanFunction} on that node with the continuation cursor and passes
     * the outcome through {@code mapper}.
     *
     * @param connection the cluster connection used to reach the current node
     * @param cursor the cursor describing the scan position
     * @param scanFunction the scan command to invoke on the current node
     * @param mapper converts the node-level result, given the node ids and the current node id
     * @param <T> cursor type produced by the scan
     * @param <K> key type
     * @param <V> value type
     * @return the mapped scan result
     */
    static <T extends ScanCursor, K, V> Mono<T> clusterScan(StatefulRedisClusterConnection<K, V> connection, ScanCursor cursor, BiFunction<RedisKeyReactiveCommands<K, V>, ScanCursor, Mono<T>> scanFunction, ClusterScanSupport.ScanCursorMapper<Mono<T>> mapper) {
        List<String> nodeIds = ClusterScanSupport.getNodeIds(connection, cursor);
        String currentNodeId = ClusterScanSupport.getCurrentNodeId(cursor, nodeIds);
        ScanCursor continuationCursor = ClusterScanSupport.getContinuationCursor(cursor);
        Mono<T> scanCursor = scanFunction.apply(connection.getConnection(currentNodeId).reactive(), continuationCursor);
        return mapper.map(nodeIds, currentNodeId, scanCursor);
    }

    /**
     * Splits {@code map} by the hash slot of its keys, applies {@code function} to each per-slot sub-map and hands the
     * merged results to {@code resultFunction}. When all keys share one slot (or the map is empty) {@code function} is
     * applied to {@code map} directly and {@code resultFunction} is not involved.
     *
     * @param map the entries to distribute
     * @param function the command to run per slot
     * @param resultFunction post-processes the merged per-slot results
     * @param <T> result element type
     * @return the resulting publisher
     */
    private <T> Flux<T> pipeliningWithMap(Map<K, V> map, Function<Map<K, V>, Flux<T>> function, Function<Flux<T>, Flux<T>> resultFunction) {
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, map.keySet());
        if (partitioned.size() < 2) {
            return function.apply(map);
        }

        List<Flux<T>> publishers = new ArrayList<>(partitioned.size());
        for (List<K> slotKeys : partitioned.values()) {
            Map<K, V> slotEntries = new HashMap<>();
            for (K key : slotKeys) {
                slotEntries.put(key, map.get(key));
            }
            publishers.add(function.apply(slotEntries));
        }
        return resultFunction.apply(Flux.merge(publishers));
    }

    /**
     * Shared implementation of the key-counting commands ({@code DEL}, {@code UNLINK}, {@code EXISTS}, {@code TOUCH},
     * streaming {@code MGET}). The keys are copied into a list once, so the supplied {@link Iterable} is traversed a
     * single time, then partitioned by hash slot. With fewer than two slots {@code command} receives the whole list;
     * otherwise it is applied per slot and the results are summed.
     *
     * @param keys the keys the command operates on
     * @param command the single-slot command to run
     * @return the sum of the per-slot results
     */
    private Mono<Long> sumOverSlots(Iterable<K> keys, Function<List<K>, Mono<Long>> command) {
        List<K> keyList = LettuceLists.newList(keys);
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, keyList);
        if (partitioned.size() < 2) {
            return command.apply(keyList);
        }

        List<Mono<Long>> publishers = new ArrayList<>(partitioned.size());
        for (List<K> slotKeys : partitioned.values()) {
            publishers.add(command.apply(slotKeys));
        }
        return Flux.merge(publishers).reduce(Long::sum);
    }

    /**
     * Rearranges concatenated per-slot {@code MGET} results into the order of the originally requested keys.
     * {@code results} must hold the per-slot results back to back, in the iteration order of {@code partitioned}.
     *
     * @param keyList the keys in request order
     * @param partitioned the keys grouped by slot
     * @param results the concatenated per-slot results
     * @return a new list with one entry per result, positioned by the index of its key in {@code keyList}
     */
    private List<KeyValue<K, V>> restoreRequestOrder(List<K> keyList, Map<Integer, List<K>> partitioned, List<KeyValue<K, V>> results) {
        @SuppressWarnings("unchecked")
        KeyValue<K, V>[] values = new KeyValue[results.size()];
        int offset = 0;
        for (List<K> slotKeys : partitioned.values()) {
            for (int i = 0; i < keyList.size(); ++i) {
                int index = slotKeys.indexOf(keyList.get(i));
                if (index == -1) {
                    // Key i belongs to a different slot.
                    continue;
                }
                values[i] = results.get(offset + index);
            }
            // The next slot's results start right after this slot's block.
            offset += slotKeys.size();
        }
        return new ArrayList<>(Arrays.asList(values));
    }
}

