/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seata.discovery.registry.redis;

import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.seata.common.exception.ShouldNeverHappenException;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.config.Configuration;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.config.exception.ConfigNotFoundException;
import org.apache.seata.discovery.registry.RegistryHeartBeats;
import org.apache.seata.discovery.registry.RegistryService;
import org.apache.seata.discovery.registry.redis.RedisListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;

public class RedisRegistryServiceImpl
implements RegistryService<RedisListener> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisRegistryServiceImpl.class);

    /** Suffix of the configuration key holding the Redis server address. */
    private static final String PRO_SERVER_ADDR_KEY = "serverAddr";
    /** Common prefix of the Redis registry configuration keys and of the Redis keys/channels built by this class. */
    private static final String REDIS_FILEKEY_PREFIX = "registry.redis.";
    /** Registry type name passed to {@link RegistryHeartBeats}. */
    private static final String REGISTRY_TYPE = "redis";
    /** Cluster name used when none is configured. */
    private static final String DEFAULT_CLUSTER = "default";
    /** Suffix of the configuration key holding the cluster name. */
    private static final String REGISTRY_CLUSTER_KEY = "cluster";
    /** Cluster this instance registers under; read from configuration in the constructor. */
    private String clusterName;
    /** Suffix of the configuration key holding the Redis database index. */
    private static final String REDIS_DB = "db";
    /** Suffix of the configuration key holding the Redis password. */
    private static final String REDIS_PASSWORD = "password";

    /** Listeners registered through {@code subscribe}, keyed by cluster name. */
    private static final ConcurrentMap<String, List<RedisListener>> LISTENER_SERVICE_MAP = new ConcurrentHashMap<>();
    /** Last known server addresses, keyed by cluster name. */
    private static final ConcurrentMap<String, Set<InetSocketAddress>> CLUSTER_ADDRESS_MAP = new ConcurrentHashMap<>();

    /** Lazily created singleton, see {@code getInstance()}. */
    private static volatile RedisRegistryServiceImpl instance;
    /** Connection pool shared by all operations; assigned in the constructor. */
    private static volatile JedisPool jedisPool;

    // NOTE(review): presumably the expiry (in seconds) of each instance key written with SETEX - confirm.
    private static final long KEY_TTL = 5L;
    /** Period, in milliseconds, of the scheduled cluster-address refresh. */
    private static final long KEY_REFRESH_PERIOD = 2000L;

    /** Key most recently passed to {@code lookup}. */
    private String transactionServiceGroup;

    /** Scheduler with one core thread that runs the pub/sub subscription tasks. */
    private final ScheduledExecutorService threadPoolExecutorForSubscribe =
        new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("RedisRegistryService-subscribe", 1));
    /** Scheduler with one core thread that runs the periodic cluster-address refresh tasks. */
    private final ScheduledExecutorService threadPoolExecutorForUpdateMap =
        new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("RedisRegistryService-updateClusterAddrMap", 1));

    /**
     * Reads the {@code registry.redis.*} settings from the file-based configuration and
     * creates the shared {@link JedisPool}.
     *
     * <p>Numeric pool options are applied only when the configured value is positive;
     * otherwise the pool's own default is kept. {@code max-total} is applied after
     * {@code max-active}, so it wins when both are positive.
     */
    private RedisRegistryServiceImpl() {
        Configuration fileConfig = ConfigurationFactory.CURRENT_FILE_INSTANCE;
        this.clusterName = fileConfig.getConfig(REDIS_FILEKEY_PREFIX + REGISTRY_CLUSTER_KEY, DEFAULT_CLUSTER);

        // Connection coordinates.
        String password = fileConfig.getConfig(getRedisPasswordFileKey());
        String serverAddr = fileConfig.getConfig(getRedisAddrFileKey());
        String[] hostAndPort = NetUtil.splitIPPortStr(serverAddr);
        String host = hostAndPort[0];
        int port = Integer.parseInt(hostAndPort[1]);
        int db = fileConfig.getInt(getRedisDbFileKey());

        // Pool behaviour flags.
        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        poolConfig.setTestOnBorrow(fileConfig.getBoolean("registry.redis.test-on-borrow", true));
        poolConfig.setTestOnReturn(fileConfig.getBoolean("registry.redis.test-on-return", false));
        poolConfig.setTestWhileIdle(fileConfig.getBoolean("registry.redis.test-while-idle", false));

        // Optional sizing / eviction settings: a value <= 0 means "not configured".
        int maxIdle = fileConfig.getInt("registry.redis.max-idle", 0);
        if (maxIdle > 0) {
            poolConfig.setMaxIdle(maxIdle);
        }

        int minIdle = fileConfig.getInt("registry.redis.min-idle", 0);
        if (minIdle > 0) {
            poolConfig.setMinIdle(minIdle);
        }

        int maxActive = fileConfig.getInt("registry.redis.max-active", 0);
        if (maxActive > 0) {
            poolConfig.setMaxTotal(maxActive);
        }

        int maxTotal = fileConfig.getInt("registry.redis.max-total", 0);
        if (maxTotal > 0) {
            poolConfig.setMaxTotal(maxTotal);
        }

        // "max-wait" falls back to "timeout" when it is not configured itself.
        int timeoutFallback = fileConfig.getInt("registry.redis.timeout", 0);
        int maxWait = fileConfig.getInt("registry.redis.max-wait", timeoutFallback);
        if (maxWait > 0) {
            poolConfig.setMaxWaitMillis(maxWait);
        }

        int numTestsPerEvictionRun = fileConfig.getInt("registry.redis.num-tests-per-eviction-run", 0);
        if (numTestsPerEvictionRun > 0) {
            poolConfig.setNumTestsPerEvictionRun(numTestsPerEvictionRun);
        }

        int timeBetweenEvictionRunsMillis = fileConfig.getInt("registry.redis.time-between-eviction-runs-millis", 0);
        if (timeBetweenEvictionRunsMillis > 0) {
            poolConfig.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
        }

        int minEvictableIdleTimeMillis = fileConfig.getInt("registry.redis.min-evictable-idle-time-millis", 0);
        if (minEvictableIdleTimeMillis > 0) {
            poolConfig.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
        }

        // A blank password is passed on as null.
        if (StringUtils.isNullOrEmpty(password)) {
            jedisPool = new JedisPool(poolConfig, host, port, 2000, null, db);
        } else {
            jedisPool = new JedisPool(poolConfig, host, port, 2000, password, db);
        }
    }

    /**
     * Returns the shared registry instance, creating it on first use.
     *
     * <p>Creation is guarded by double-checked locking on the class monitor; the
     * {@code instance} field is {@code volatile}.
     *
     * @return the singleton {@code RedisRegistryServiceImpl}
     */
    static RedisRegistryServiceImpl getInstance() {
        if (instance == null) {
            synchronized (RedisRegistryServiceImpl.class) {
                if (instance == null) {
                    instance = new RedisRegistryServiceImpl();
                }
            }
        }
        return instance;
    }

    /**
     * Registers {@code address} in Redis, publishes a register event, and installs a
     * heartbeat that keeps re-writing the instance key without publishing again.
     *
     * @param address the server address to register; validated by {@code NetUtil.validAddress}
     */
    @Override
    public void register(InetSocketAddress address) {
        NetUtil.validAddress(address);
        // First write announces the instance to subscribers.
        doRegisterOrExpire(address, true);
        // Later writes go through the single-argument overload, which does not publish.
        RegistryHeartBeats.addHeartBeat(REGISTRY_TYPE, address, KEY_REFRESH_PERIOD, this::doRegisterOrExpire);
    }

    /**
     * Heartbeat variant: rewrites the instance key for {@code address} without
     * publishing a register event.
     *
     * @param address the server address whose key is refreshed
     */
    private void doRegisterOrExpire(InetSocketAddress address) {
        doRegisterOrExpire(address, false);
    }

    /**
     * Writes the instance key {@code <registryKey>_<address>} with an expiry and,
     * optionally, publishes a register event on the registry channel.
     *
     * <p>The stored value is the runtime MXBean name of this JVM. Both commands are
     * sent through a single pipeline.
     *
     * @param address the server address to write
     * @param publish {@code true} to also publish {@code <address>-register}
     */
    private void doRegisterOrExpire(InetSocketAddress address, boolean publish) {
        String serverAddr = NetUtil.toStringAddress(address);
        String registryKey = getRedisRegistryKey();
        String instanceKey = registryKey + "_" + serverAddr;
        try (Jedis jedis = jedisPool.getResource();
             Pipeline pipeline = jedis.pipelined()) {
            String jvmName = ManagementFactory.getRuntimeMXBean().getName();
            pipeline.setex(instanceKey, KEY_TTL, jvmName);
            if (publish) {
                pipeline.publish(registryKey, serverAddr + "-register");
            }
            pipeline.sync();
        }
    }

    /**
     * Removes {@code address} from the registry and publishes an unregister event.
     *
     * <p>Instances are stored by {@code doRegisterOrExpire} as individual string keys
     * named {@code <registryKey>_<address>}. That key is deleted here so the instance
     * disappears from key scans immediately instead of lingering until its expiry.
     *
     * @param address the server address to remove; validated by {@code NetUtil.validAddress}
     */
    @Override
    public void unregister(InetSocketAddress address) {
        NetUtil.validAddress(address);
        String serverAddr = NetUtil.toStringAddress(address);
        String registryKey = getRedisRegistryKey();
        try (Jedis jedis = jedisPool.getResource();
             Pipeline pipelined = jedis.pipelined()) {
            // Delete the key that register() actually wrote.
            pipelined.del(registryKey + "_" + serverAddr);
            // NOTE(review): presumably cleans up a hash-based layout used by older versions;
            // nothing in this class writes that hash - confirm before removing.
            pipelined.hdel(registryKey, serverAddr);
            pipelined.publish(registryKey, serverAddr + "-" + "unregister");
            pipelined.sync();
        }
        // NOTE(review): the heartbeat installed by register() is not cancelled here; verify
        // that RegistryHeartBeats stops it, otherwise the key will be re-created.
    }

    /**
     * Adds {@code listener} for {@code cluster} and schedules two recurring tasks:
     * a refresh of the cluster address map every {@code KEY_REFRESH_PERIOD} milliseconds,
     * and a pub/sub subscription on the cluster's channel.
     *
     * <p>Failures inside either task are logged and do not cancel the schedule.
     *
     * @param cluster  the cluster name to watch
     * @param listener the callback to invoke for each message received on the channel
     */
    @Override
    public void subscribe(String cluster, RedisListener listener) {
        final String channelKey = REDIS_FILEKEY_PREFIX + cluster;

        List<RedisListener> listeners =
            CollectionUtils.computeIfAbsent(LISTENER_SERVICE_MAP, cluster, k -> new ArrayList<>());
        listeners.add(listener);

        Runnable refreshAddresses = () -> {
            try (Jedis jedis = jedisPool.getResource()) {
                updateClusterAddressMap(jedis, channelKey, cluster);
            } catch (Exception e) {
                LOGGER.error(e.getMessage(), e);
            }
        };
        threadPoolExecutorForUpdateMap.scheduleAtFixedRate(
            refreshAddresses, 0L, KEY_REFRESH_PERIOD, TimeUnit.MILLISECONDS);

        // NOTE(review): Jedis subscribe presumably blocks while the connection is alive, so the
        // 1 ms period would act as the re-subscribe delay after a failure - confirm.
        Runnable listenForEvents = () -> {
            try (Jedis jedis = jedisPool.getResource()) {
                jedis.subscribe(new NotifySub(LISTENER_SERVICE_MAP.get(cluster)), channelKey);
            } catch (Exception e) {
                LOGGER.error(e.getMessage(), e);
            }
        };
        threadPoolExecutorForSubscribe.scheduleAtFixedRate(listenForEvents, 0L, 1L, TimeUnit.MILLISECONDS);
    }

    /**
     * Currently a no-op: the listener is not removed from the listener map and the
     * tasks scheduled by {@code subscribe} keep running.
     *
     * @param cluster  the cluster name (unused)
     * @param listener the listener (unused)
     */
    @Override
    public void unsubscribe(String cluster, RedisListener listener) {
    }

    /**
     * Resolves the cluster mapped to the given transaction service group and returns
     * the addresses currently known for it.
     *
     * <p>The key is remembered in {@code transactionServiceGroup} before resolution.
     *
     * @param key the transaction service group
     * @return a new list of the known server addresses of the mapped cluster
     * @throws ConfigNotFoundException if no cluster is mapped to {@code key}
     */
    @Override
    public List<InetSocketAddress> lookup(String key) {
        transactionServiceGroup = key;
        String mappedCluster = getServiceGroup(key);
        if (mappedCluster != null) {
            return lookupByCluster(mappedCluster);
        }
        String missingDataId = "service.vgroupMapping." + key;
        throw new ConfigNotFoundException("%s configuration item is required", missingDataId);
    }

    /**
     * Returns a snapshot of the addresses known for {@code clusterName}.
     *
     * <p>When the cluster has no listener entry yet, the address map is first loaded
     * from Redis and a listener is subscribed that applies register/unregister events
     * to the map.
     *
     * @param clusterName the cluster to look up
     * @return a new list holding the cluster's known addresses (possibly empty)
     */
    List<InetSocketAddress> lookupByCluster(String clusterName) {
        boolean alreadySubscribed = LISTENER_SERVICE_MAP.containsKey(clusterName);
        if (!alreadySubscribed) {
            try (Jedis jedis = jedisPool.getResource()) {
                updateClusterAddressMap(jedis, REDIS_FILEKEY_PREFIX + clusterName, clusterName);
            }
            subscribe(clusterName, event -> applyRegistryEvent(clusterName, event));
        }
        Set<InetSocketAddress> known =
            CollectionUtils.computeIfAbsent(CLUSTER_ADDRESS_MAP, clusterName, k -> ConcurrentHashMap.newKeySet(2));
        return new ArrayList<>(known);
    }

    /**
     * Applies one pub/sub message of the form {@code <address>-<eventType>} to the
     * address map of {@code clusterName}.
     *
     * @param clusterName the cluster whose address set is updated
     * @param msg         the raw message received on the cluster's channel
     * @throws ShouldNeverHappenException if the event type is neither register nor unregister
     */
    private void applyRegistryEvent(String clusterName, String msg) {
        String[] parts = msg.split("-");
        String serverAddr = parts[0];
        String eventType = parts[1];
        if ("register".equals(eventType)) {
            Set<InetSocketAddress> addresses =
                CollectionUtils.computeIfAbsent(CLUSTER_ADDRESS_MAP, clusterName, k -> ConcurrentHashMap.newKeySet(2));
            addresses.add(NetUtil.toInetSocketAddress(serverAddr));
        } else if ("unregister".equals(eventType)) {
            removeServerAddressByPushEmptyProtection(clusterName, serverAddr);
        } else {
            throw new ShouldNeverHappenException("unknown redis msg:" + msg);
        }
    }

    /**
     * Removes {@code serverAddr} from the address set of {@code notifyCluserName},
     * unless doing so would empty the set of the cluster this client itself uses.
     *
     * <p>The removal is skipped when the address is the only one left and the
     * configured {@code txServiceGroup} maps to {@code notifyCluserName}.
     *
     * @param notifyCluserName the cluster named in the unregister event
     * @param serverAddr       the address to remove, in string form
     */
    private void removeServerAddressByPushEmptyProtection(String notifyCluserName, String serverAddr) {
        Set<InetSocketAddress> addresses =
            CollectionUtils.computeIfAbsent(CLUSTER_ADDRESS_MAP, notifyCluserName, k -> ConcurrentHashMap.newKeySet(2));
        InetSocketAddress target = NetUtil.toInetSocketAddress(serverAddr);

        boolean isLastKnownAddress = addresses.size() == 1 && addresses.contains(target);
        if (isLastKnownAddress) {
            // The configuration is consulted only when the removal would empty the set.
            String txServiceGroupName = ConfigurationFactory.getInstance().getConfig("txServiceGroup");
            if (StringUtils.isNotEmpty(txServiceGroupName)
                && notifyCluserName.equals(getServiceGroup(txServiceGroupName))) {
                return;
            }
        }

        addresses.remove(target);
        removeOfflineAddressesIfNecessary(transactionServiceGroup, notifyCluserName, addresses);
    }

    /**
     * Shuts down both schedulers, closes the registry heartbeats and destroys the
     * Jedis connection pool, in that order.
     */
    @Override
    public void close() {
        // Stop the background tasks first, then the heartbeats.
        threadPoolExecutorForSubscribe.shutdown();
        threadPoolExecutorForUpdateMap.shutdown();
        RegistryHeartBeats.close();
        // The pool is destroyed last.
        jedisPool.destroy();
    }

    /**
     * Scans Redis for the instance keys of a cluster and replaces the cached address
     * set when the scan found at least one address and the result differs from the cache.
     *
     * <p>Instance keys have the form {@code <redisRegistryKey>_<address>}. The address
     * is obtained by stripping that known prefix, so cluster names that themselves
     * contain {@code '_'} are handled correctly.
     *
     * <p>An empty scan result never overwrites the cached set.
     *
     * @param jedis            the connection used for the scan
     * @param redisRegistryKey the registry key of the cluster (prefix of its instance keys)
     * @param clusterName      the key under which the result is cached
     */
    private void updateClusterAddressMap(Jedis jedis, String redisRegistryKey, String clusterName) {
        final String keyPrefix = redisRegistryKey + "_";

        ScanParams scanParams = new ScanParams();
        scanParams.count(10);
        scanParams.match(keyPrefix + "*");

        Set<InetSocketAddress> newAddressSet = ConcurrentHashMap.newKeySet(2);
        String cursor = ScanParams.SCAN_POINTER_START;
        do {
            ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
            cursor = scanResult.getCursor();
            List<String> instanceKeys = scanResult.getResult();
            if (instanceKeys != null) {
                for (String key : instanceKeys) {
                    // Strip the known prefix; fall back to the text after the last '_' if the
                    // returned key does not literally start with it.
                    String address = key.startsWith(keyPrefix)
                        ? key.substring(keyPrefix.length())
                        : key.substring(key.lastIndexOf('_') + 1);
                    newAddressSet.add(NetUtil.toInetSocketAddress(address));
                }
            }
            // The scan is complete once the cursor returns to the start value.
        } while (!ScanParams.SCAN_POINTER_START.equals(cursor));

        if (CollectionUtils.isNotEmpty(newAddressSet) && !newAddressSet.equals(CLUSTER_ADDRESS_MAP.get(clusterName))) {
            CLUSTER_ADDRESS_MAP.put(clusterName, newAddressSet);
        }
    }

    /**
     * Builds the registry key of this instance's cluster: the {@code registry.redis.}
     * prefix followed by the configured cluster name. The same string is used as the
     * pub/sub channel and as the prefix of instance keys.
     *
     * @return the registry key for {@code clusterName}
     */
    private String getRedisRegistryKey() {
        return REDIS_FILEKEY_PREFIX + clusterName;
    }

    /**
     * @return the configuration key of the Redis server address ({@code registry.redis.serverAddr})
     */
    private String getRedisAddrFileKey() {
        return REDIS_FILEKEY_PREFIX + PRO_SERVER_ADDR_KEY;
    }

    /**
     * @return the configuration key of the Redis password ({@code registry.redis.password})
     */
    private String getRedisPasswordFileKey() {
        return REDIS_FILEKEY_PREFIX + REDIS_PASSWORD;
    }

    /**
     * @return the configuration key of the Redis database index ({@code registry.redis.db})
     */
    private String getRedisDbFileKey() {
        return REDIS_FILEKEY_PREFIX + REDIS_DB;
    }

    private static class NotifySub
    extends JedisPubSub {
        private final List<RedisListener> redisListeners;

        NotifySub(List<RedisListener> redisListeners) {
            this.redisListeners = redisListeners;
        }

        public void onMessage(String key, String msg) {
            for (RedisListener listener : this.redisListeners) {
                try {
                    listener.onEvent(msg);
                }
                catch (Exception e) {
                    LOGGER.error(e.getMessage(), (Throwable)e);
                }
            }
        }
    }
}

