/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.nacos.k8s.sync;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.common.utils.ThreadUtils;
import com.alibaba.nacos.k8s.sync.K8sSyncConfig;
import com.alibaba.nacos.k8s.sync.Loggers;
import com.alibaba.nacos.naming.core.InstanceOperatorClientImpl;
import com.alibaba.nacos.naming.core.ServiceOperatorV2Impl;
import com.alibaba.nacos.naming.core.v2.ServiceManager;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import io.kubernetes.client.informer.ResourceEventHandler;
import io.kubernetes.client.informer.SharedIndexInformer;
import io.kubernetes.client.informer.SharedInformerFactory;
import io.kubernetes.client.informer.cache.Lister;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1EndpointAddress;
import io.kubernetes.client.openapi.models.V1EndpointSubset;
import io.kubernetes.client.openapi.models.V1Endpoints;
import io.kubernetes.client.openapi.models.V1EndpointsList;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.openapi.models.V1ServiceList;
import io.kubernetes.client.openapi.models.V1ServicePort;
import io.kubernetes.client.util.ClientBuilder;
import io.kubernetes.client.util.KubeConfig;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import okhttp3.OkHttpClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

/**
 * Mirrors Kubernetes {@code Service} and {@code Endpoints} objects into the Nacos naming registry.
 *
 * <p>Two informers (one for Services, one for Endpoints, both across all namespaces) are started
 * once the Spring application is ready. Their event handlers translate Kubernetes events into
 * Nacos service/instance registrations and removals. Every endpoint IP is registered as a
 * persistent, healthy instance per integer target port of the owning Service.
 */
@Component
public class K8sSyncServer {
    /** Maximum time to wait for each informer's initial sync. */
    private static final long INFORMER_SYNC_TIMEOUT_MILLIS = 30000L;

    /** Pause between two {@code hasSynced()} polls while waiting for the initial sync. */
    private static final long INFORMER_SYNC_POLL_MILLIS = 100L;

    @Autowired
    private K8sSyncConfig k8sSyncConfig;
    @Autowired
    private ServiceOperatorV2Impl serviceOperatorV2;
    @Autowired
    private InstanceOperatorClientImpl instanceOperatorClient;
    private SharedInformerFactory factory;

    /**
     * Starts the synchronization when the application is ready, unless it is disabled by
     * configuration, and registers a JVM shutdown hook that calls {@link #stop()}.
     *
     * @throws IOException if the Kubernetes API client cannot be built
     */
    @EventListener(value={ApplicationReadyEvent.class})
    public void start() throws IOException {
        if (!this.k8sSyncConfig.isEnabled()) {
            Loggers.MAIN.info("The Nacos k8s-sync is disabled.");
            return;
        }
        Loggers.MAIN.info("Starting Nacos k8s-sync ...");
        this.startInformer();
        Runtime.getRuntime().addShutdownHook(new Thread(){

            @Override
            public void run() {
                Loggers.MAIN.info("Stopping Nacos k8s-sync ...");
                K8sSyncServer.this.stop();
                Loggers.MAIN.info("Nacos k8s-sync stopped...");
            }
        });
    }

    /**
     * Builds the API client, creates the Service and Endpoints informers, attaches their event
     * handlers and runs them, waiting for the Service informer to sync before the Endpoints
     * informer is started.
     *
     * @throws IOException if the Kubernetes API client cannot be built
     * @throws RuntimeException if either informer does not sync within the timeout
     */
    public void startInformer() throws IOException {
        ApiClient apiClient = this.k8sSyncConfig.isOutsideCluster() ? this.getOutsideApiClient() : ClientBuilder.cluster().build();
        Configuration.setDefaultApiClient(apiClient);
        final CoreV1Api coreV1Api = new CoreV1Api();
        OkHttpClient httpClient = apiClient.getHttpClient().newBuilder().build();
        apiClient.setHttpClient(httpClient);
        this.factory = new SharedInformerFactory(apiClient);
        final SharedIndexInformer<V1Service> serviceInformer = this.factory.sharedIndexInformerFor(params -> {
            CoreV1Api.APIlistServiceForAllNamespacesRequest request = coreV1Api.listServiceForAllNamespaces();
            request.resourceVersion(params.resourceVersion);
            request.timeoutSeconds(params.timeoutSeconds);
            request.watch(params.watch);
            return request.buildCall(null);
        }, V1Service.class, V1ServiceList.class);
        final SharedIndexInformer<V1Endpoints> endpointInformer = this.factory.sharedIndexInformerFor(params -> {
            CoreV1Api.APIlistEndpointsForAllNamespacesRequest request = coreV1Api.listEndpointsForAllNamespaces();
            request.resourceVersion(params.resourceVersion);
            request.timeoutSeconds(params.timeoutSeconds);
            request.watch(params.watch);
            return request.buildCall(null);
        }, V1Endpoints.class, V1EndpointsList.class);
        serviceInformer.addEventHandler(this.newServiceEventHandler(endpointInformer));
        endpointInformer.addEventHandler(this.newEndpointsEventHandler(serviceInformer, endpointInformer));
        // Services are synced first so that Endpoints events can look up the ports of their Service.
        this.runAndAwaitSync(serviceInformer, "serviceInformer");
        this.runAndAwaitSync(endpointInformer, "endpointInformer");
    }

    /**
     * Creates the handler that reacts to Service add/update/delete events.
     */
    private ResourceEventHandler<V1Service> newServiceEventHandler(final SharedIndexInformer<V1Endpoints> endpointInformer) {
        return new ResourceEventHandler<V1Service>(){

            @Override
            public void onAdd(V1Service service) {
                if (service.getMetadata() == null || service.getSpec() == null) {
                    return;
                }
                String serviceName = service.getMetadata().getName();
                String namespace = service.getMetadata().getNamespace();
                List<V1ServicePort> servicePorts = service.getSpec().getPorts();
                try {
                    K8sSyncServer.this.registerService(namespace, serviceName, servicePorts, false, endpointInformer);
                    Loggers.MAIN.info("add service, namespace:" + namespace + " serviceName: " + serviceName);
                }
                catch (Exception e) {
                    Loggers.MAIN.warn("add service fail, message:" + e.getMessage() + " namespace:" + namespace + " serviceName: " + serviceName);
                }
            }

            @Override
            public void onUpdate(V1Service oldService, V1Service newService) {
                if (oldService.getMetadata() == null || oldService.getSpec() == null || newService.getMetadata() == null || newService.getSpec() == null) {
                    return;
                }
                List<V1ServicePort> oldServicePorts = oldService.getSpec().getPorts();
                String serviceName = newService.getMetadata().getName();
                String namespace = newService.getMetadata().getNamespace();
                List<V1ServicePort> newServicePorts = newService.getSpec().getPorts();
                // compareServicePorts answers "are the ports the same?", so a change is its negation.
                boolean portChanged = !K8sSyncServer.this.compareServicePorts(oldServicePorts, newServicePorts);
                try {
                    K8sSyncServer.this.registerService(namespace, serviceName, newServicePorts, portChanged, endpointInformer);
                    Loggers.MAIN.info("update service, namespace: " + namespace + " serviceName: " + serviceName);
                }
                catch (Exception e) {
                    Loggers.MAIN.warn("update service fail, message: " + e.getMessage() + " namespace: " + namespace + " serviceName: " + serviceName);
                }
            }

            @Override
            public void onDelete(V1Service service, boolean deletedFinalStateUnknown) {
                if (service.getMetadata() == null) {
                    return;
                }
                String serviceName = service.getMetadata().getName();
                String namespace = service.getMetadata().getNamespace();
                try {
                    K8sSyncServer.this.unregisterService(namespace, serviceName);
                    Loggers.MAIN.info("delete service, namespace:" + namespace + " serviceName:" + serviceName);
                }
                catch (Exception e) {
                    Loggers.MAIN.warn("delete service fail, message: " + e.getMessage() + " namespace:" + namespace + " serviceName:" + serviceName);
                }
            }
        };
    }

    /**
     * Creates the handler that reacts to Endpoints add/update/delete events.
     *
     * <p>All handler bodies catch {@link Exception} so that an unexpected failure for one object
     * is logged instead of escaping into the informer's event-dispatch thread.
     */
    private ResourceEventHandler<V1Endpoints> newEndpointsEventHandler(final SharedIndexInformer<V1Service> serviceInformer, final SharedIndexInformer<V1Endpoints> endpointInformer) {
        return new ResourceEventHandler<V1Endpoints>(){

            @Override
            public void onAdd(V1Endpoints obj) {
                if (obj.getMetadata() == null) {
                    return;
                }
                String serviceName = obj.getMetadata().getName();
                String namespace = obj.getMetadata().getNamespace();
                try {
                    List<V1ServicePort> servicePorts = K8sSyncServer.this.findServicePorts(serviceInformer, namespace, serviceName);
                    if (servicePorts == null) {
                        // Endpoints without a (cached) Service or without ports: nothing to register.
                        Loggers.MAIN.warn("add instances skipped, no service ports found, namespace:" + namespace + ", serviceName: " + serviceName);
                        return;
                    }
                    Set<String> addIpSet = K8sSyncServer.this.getIpFromEndpoints(obj);
                    K8sSyncServer.this.registerInstances(addIpSet, namespace, serviceName, servicePorts);
                    Loggers.MAIN.info("add instances, namespace:" + namespace + " serviceName: " + serviceName);
                }
                catch (Exception e) {
                    Loggers.MAIN.warn("add instances fail, message:" + e.getMessage() + " namespace:" + namespace + ", serviceName: " + serviceName);
                }
            }

            @Override
            public void onUpdate(V1Endpoints oldObj, V1Endpoints newObj) {
                if (newObj.getMetadata() == null) {
                    return;
                }
                String serviceName = newObj.getMetadata().getName();
                String namespace = newObj.getMetadata().getNamespace();
                try {
                    List<V1ServicePort> servicePorts = K8sSyncServer.this.findServicePorts(serviceInformer, namespace, serviceName);
                    if (servicePorts == null) {
                        Loggers.MAIN.warn("update instances skipped, no service ports found, namespace:" + namespace + ", serviceName: " + serviceName);
                        return;
                    }
                    K8sSyncServer.this.registerService(namespace, serviceName, servicePorts, false, endpointInformer);
                    Loggers.MAIN.info("update instances, namespace:" + namespace + " serviceName: " + serviceName);
                }
                catch (Exception e) {
                    Loggers.MAIN.warn("update instances fail, message:" + e.getMessage() + " namespace:" + namespace + ", serviceName: " + serviceName);
                }
            }

            @Override
            public void onDelete(V1Endpoints obj, boolean deletedFinalStateUnknown) {
                if (obj.getMetadata() == null) {
                    return;
                }
                String serviceName = obj.getMetadata().getName();
                String namespace = obj.getMetadata().getNamespace();
                try {
                    Set<String> deleteIpSet = K8sSyncServer.this.getIpFromEndpoints(obj);
                    List<? extends Instance> oldInstanceList = K8sSyncServer.this.instanceOperatorClient.listAllInstances(namespace, serviceName);
                    K8sSyncServer.this.unregisterInstances(deleteIpSet, namespace, serviceName, oldInstanceList);
                    Loggers.MAIN.info("delete instances, namespace:" + namespace + ", serviceName: " + serviceName);
                }
                catch (Exception e) {
                    Loggers.MAIN.warn("delete instances fail, message:" + e.getMessage() + " namespace:" + namespace + ", serviceName: " + serviceName);
                }
            }
        };
    }

    /**
     * Looks up the ports of a Service in the Service informer's local cache.
     *
     * @return the Service's port list, or {@code null} if the Service is not cached, has no spec,
     *         or its spec carries no port list
     */
    private List<V1ServicePort> findServicePorts(SharedIndexInformer<V1Service> serviceInformer, String namespace, String serviceName) {
        Lister<V1Service> serviceLister = new Lister<>(serviceInformer.getIndexer(), namespace);
        V1Service service = serviceLister.get(serviceName);
        if (service == null || service.getSpec() == null) {
            return null;
        }
        return service.getSpec().getPorts();
    }

    /**
     * Runs the informer and polls until its initial sync has completed.
     *
     * @param informer     the informer to run
     * @param informerName name used in the timeout message
     * @throws RuntimeException if the informer has not synced within the timeout
     */
    private void runAndAwaitSync(SharedIndexInformer<?> informer, String informerName) {
        long startTime = System.currentTimeMillis();
        informer.run();
        while (!informer.hasSynced()) {
            if (System.currentTimeMillis() - startTime > INFORMER_SYNC_TIMEOUT_MILLIS) {
                throw new RuntimeException("Informer " + informerName + " sync timed out");
            }
            ThreadUtils.sleep(INFORMER_SYNC_POLL_MILLIS);
        }
    }

    /**
     * Builds a persistent, healthy Nacos instance for one endpoint address.
     *
     * @param ip          endpoint IP address
     * @param targetPort  port the instance listens on (the Service's target port)
     * @param serviceName Kubernetes service name; also used as the Nacos cluster name
     * @param port        Service port, stored in the instance metadata under {@code servicePort}
     * @return the populated instance
     */
    public Instance createInstance(String ip, int targetPort, String serviceName, int port) {
        Instance instance = new Instance();
        instance.setIp(ip);
        instance.setPort(targetPort);
        instance.setClusterName(serviceName);
        instance.setEphemeral(false);
        instance.setHealthy(true);
        instance.addMetadata("servicePort", String.valueOf(port));
        return instance;
    }

    /**
     * Ensures the Nacos service exists and reconciles its instances with the IPs currently listed
     * in the cached Endpoints object of the same name.
     *
     * <p>Instances whose IP is no longer present are removed. New IPs are registered for every
     * service port; when {@code portChanged} is {@code true}, all current IPs are (re-)registered
     * so that instances for the new ports are created.
     *
     * @param namespace        Kubernetes namespace, used as the Nacos namespace
     * @param serviceName      Kubernetes service name
     * @param servicePorts     ports of the Service; may be {@code null}
     * @param portChanged      whether the Service's ports differ from the previous version
     * @param endpointInformer informer whose cache is consulted for the Endpoints object
     * @throws NacosException if a Nacos operation fails
     */
    public void registerService(String namespace, String serviceName, List<V1ServicePort> servicePorts, boolean portChanged, SharedIndexInformer<V1Endpoints> endpointInformer) throws NacosException {
        Service service = Service.newService(namespace, "DEFAULT_GROUP", serviceName, false);
        ServiceManager.getInstance().getSingleton(service);
        Lister<V1Endpoints> endpointLister = new Lister<>(endpointInformer.getIndexer(), namespace);
        V1Endpoints endpoints = endpointLister.get(serviceName);
        if (endpoints == null && !endpointInformer.hasSynced()) {
            // The Endpoints cache is not populated yet (e.g. Service events delivered during
            // startup). "No endpoints" cannot be told apart from "not loaded yet", so leave the
            // instances alone; the Endpoints add event will register them.
            return;
        }
        List<? extends Instance> oldInstanceList = this.instanceOperatorClient.listAllInstances(namespace, serviceName);
        Set<String> oldIpSet = new HashSet<>();
        for (Instance instance : oldInstanceList) {
            oldIpSet.add(instance.getIp());
        }
        Set<String> newIpSet = this.getIpFromEndpoints(endpoints);
        Set<String> deleteIpSet = new HashSet<>(oldIpSet);
        deleteIpSet.removeAll(newIpSet);
        this.unregisterInstances(deleteIpSet, namespace, serviceName, oldInstanceList);
        Set<String> addIpSet = new HashSet<>(newIpSet);
        if (!portChanged) {
            addIpSet.removeAll(oldIpSet);
        }
        // NOTE(review): when ports changed, instances registered under the previous target ports
        // for still-present IPs are not removed here - confirm whether that is intended.
        this.registerInstances(addIpSet, namespace, serviceName, servicePorts);
    }

    /**
     * Removes every instance of the service from Nacos and then deletes the service itself.
     *
     * @throws NacosException if a Nacos operation fails
     */
    public void unregisterService(String namespace, String serviceName) throws NacosException {
        List<? extends Instance> instanceList = this.instanceOperatorClient.listAllInstances(namespace, serviceName);
        for (Instance instance : instanceList) {
            this.instanceOperatorClient.removeInstance(namespace, serviceName, instance);
        }
        this.serviceOperatorV2.delete(namespace, serviceName);
    }

    /**
     * Registers one Nacos instance per (IP, service port) combination.
     *
     * <p>Ports whose target port is a named port (not an integer) are skipped. A missing target
     * port falls back to the service port, matching the Kubernetes default.
     *
     * @param addIpSet     IPs to register; {@code null} or empty registers nothing
     * @param servicePorts ports of the Service; {@code null} registers nothing
     * @throws NacosException if a registration fails
     */
    public void registerInstances(Set<String> addIpSet, String namespace, String serviceName, List<V1ServicePort> servicePorts) throws NacosException {
        if (servicePorts == null || addIpSet == null || addIpSet.isEmpty()) {
            return;
        }
        for (V1ServicePort servicePort : servicePorts) {
            if (servicePort == null || servicePort.getPort() == null) {
                continue;
            }
            int port = servicePort.getPort();
            int targetPort;
            if (servicePort.getTargetPort() == null) {
                targetPort = port;
            } else if (servicePort.getTargetPort().isInteger()) {
                targetPort = servicePort.getTargetPort().getIntValue();
            } else {
                continue;
            }
            for (String ip : addIpSet) {
                Instance instance = this.createInstance(ip, targetPort, serviceName, port);
                this.instanceOperatorClient.registerInstance(namespace, serviceName, instance);
            }
        }
    }

    /**
     * Removes from Nacos every instance in {@code oldInstanceList} whose IP is in {@code deleteIpSet}.
     *
     * @throws NacosException if a removal fails
     */
    public void unregisterInstances(Set<String> deleteIpSet, String namespace, String serviceName, List<? extends Instance> oldInstanceList) throws NacosException {
        for (Instance instance : oldInstanceList) {
            if (!deleteIpSet.contains(instance.getIp())) {
                continue;
            }
            this.instanceOperatorClient.removeInstance(namespace, serviceName, instance);
        }
    }

    /**
     * Collects the IPs of all ready addresses across all subsets of an Endpoints object.
     *
     * @param endpoints the Endpoints object; may be {@code null}
     * @return a new mutable set of IPs; empty when {@code endpoints}, its subsets, or a subset's
     *         address list is {@code null}
     */
    public Set<String> getIpFromEndpoints(V1Endpoints endpoints) {
        Set<String> ipSet = new HashSet<>();
        if (endpoints == null || endpoints.getSubsets() == null) {
            return ipSet;
        }
        for (V1EndpointSubset endpointSubset : endpoints.getSubsets()) {
            if (endpointSubset == null || endpointSubset.getAddresses() == null) {
                // A subset may carry only not-ready addresses, in which case there is no address list.
                continue;
            }
            for (V1EndpointAddress endpointAddress : endpointSubset.getAddresses()) {
                ipSet.add(endpointAddress.getIp());
            }
        }
        return ipSet;
    }

    /**
     * Checks whether two service port lists contain the same ports, ignoring order.
     *
     * @param oldServicePorts previous ports; {@code null} is treated as empty
     * @param newServicePorts current ports; {@code null} is treated as empty
     * @return {@code true} if both lists hold the same ports, {@code false} if they differ
     */
    public boolean compareServicePorts(List<V1ServicePort> oldServicePorts, List<V1ServicePort> newServicePorts) {
        boolean oldEmpty = oldServicePorts == null || oldServicePorts.isEmpty();
        boolean newEmpty = newServicePorts == null || newServicePorts.isEmpty();
        if (oldEmpty || newEmpty) {
            return oldEmpty && newEmpty;
        }
        if (oldServicePorts.size() != newServicePorts.size()) {
            return false;
        }
        return oldServicePorts.containsAll(newServicePorts) && newServicePorts.containsAll(oldServicePorts);
    }

    /**
     * Builds an API client from the configured kubeconfig file and installs it as the default
     * client.
     *
     * @return the API client for access from outside the cluster
     * @throws IOException if the kubeconfig file cannot be read
     */
    public ApiClient getOutsideApiClient() throws IOException {
        String kubeConfigPath = this.k8sSyncConfig.getKubeConfig();
        ApiClient apiClient;
        // Close the reader once the kubeconfig has been loaded.
        try (Reader kubeConfigReader = new FileReader(kubeConfigPath)) {
            apiClient = ClientBuilder.kubeconfig(KubeConfig.loadKubeConfig(kubeConfigReader)).build();
        }
        Configuration.setDefaultApiClient(apiClient);
        return apiClient;
    }

    /**
     * Stops the informers registered with the factory, if the factory was created.
     */
    public void stop() {
        if (this.factory != null) {
            this.factory.stopAllRegisteredInformers();
        }
    }
}

