/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.client;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.curator.framework.CuratorFramework;
import org.apache.druid.client.DataSegmentInterner;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.FilteredServerInventoryView;
import org.apache.druid.client.FilteringSegmentCallback;
import org.apache.druid.client.ServerInventoryView;
import org.apache.druid.client.ServerView;
import org.apache.druid.curator.inventory.CuratorInventoryManager;
import org.apache.druid.curator.inventory.CuratorInventoryManagerStrategy;
import org.apache.druid.curator.inventory.InventoryManagerConfig;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;

/**
 * Server inventory view backed by a {@link CuratorInventoryManager} that watches the
 * announcements path (containers, i.e. servers) and the live-segments path (inventory, i.e.
 * batches of segments) supplied by {@link ZkPathsConfig}.
 *
 * <p>Each inventory key maps to a set of segments. Only segments accepted by the default filter,
 * or by the filter of any callback registered through
 * {@link #registerSegmentCallback(Executor, ServerView.SegmentCallback, Predicate)}, are retained
 * and announced to segment callbacks.
 *
 * <p>Callbacks are always invoked on the {@link Executor} they were registered with; a callback
 * that returns {@link ServerView.CallbackAction#UNREGISTER} is removed afterwards.
 */
@Deprecated
@ManageLifecycle
public class BatchServerInventoryView
implements ServerInventoryView,
FilteredServerInventoryView {
    private static final EmittingLogger log = new EmittingLogger(BatchServerInventoryView.class);

    private final CuratorInventoryManager<DruidServer, Set<DataSegment>> inventoryManager;
    /** Lifecycle flag; also used as the monitor guarding {@link #start()} and {@link #stop()}. */
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final ConcurrentMap<ServerView.ServerRemovedCallback, Executor> serverRemovedCallbacks = new ConcurrentHashMap<>();
    private final ConcurrentMap<ServerView.SegmentCallback, Executor> segmentCallbacks = new ConcurrentHashMap<>();
    /** Filtered segment set most recently recorded for each inventory key. */
    private final ConcurrentMap<String, Set<DataSegment>> zNodes = new ConcurrentHashMap<>();
    /** Filters of the callbacks registered with an explicit predicate, keyed by their wrapping callback. */
    private final ConcurrentMap<ServerView.SegmentCallback, Predicate<Pair<DruidServerMetadata, DataSegment>>> segmentPredicates = new ConcurrentHashMap<>();
    private final Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter;

    /**
     * @param zkPaths                     supplies the announcements (container) and live-segments (inventory) paths
     * @param curator                     Curator client handed to the inventory manager
     * @param jsonMapper                  mapper used to deserialize servers and segment sets
     * @param defaultFilter               filter that is always applied to incoming segments; must not be null
     * @param pathChildrenCacheExecPrefix name prefix for the single-threaded executor given to the inventory manager
     * @throws NullPointerException if {@code defaultFilter} is null
     */
    public BatchServerInventoryView(final ZkPathsConfig zkPaths, CuratorFramework curator, final ObjectMapper jsonMapper, Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter, String pathChildrenCacheExecPrefix) {
        // Validate before building the executor and the inventory manager so that an invalid
        // argument fails without leaving those objects behind.
        this.defaultFilter = Preconditions.checkNotNull(defaultFilter);
        this.inventoryManager = new CuratorInventoryManager<DruidServer, Set<DataSegment>>(curator, new InventoryManagerConfig(){

            @Override
            public String getContainerPath() {
                return zkPaths.getAnnouncementsPath();
            }

            @Override
            public String getInventoryPath() {
                return zkPaths.getLiveSegmentsPath();
            }
        }, Execs.singleThreaded(pathChildrenCacheExecPrefix + "-%s"), new CuratorInventoryManagerStrategy<DruidServer, Set<DataSegment>>(){

            @Override
            public DruidServer deserializeContainer(byte[] bytes) {
                try {
                    return jsonMapper.readValue(bytes, DruidServer.class);
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }

            @Override
            public Set<DataSegment> deserializeInventory(byte[] bytes) {
                try {
                    return jsonMapper.readValue(bytes, new TypeReference<Set<DataSegment>>(){});
                }
                catch (IOException e) {
                    // Log the raw payload so the malformed announcement can be inspected.
                    log.error(e, "Could not parse json: %s", StringUtils.fromUtf8(bytes));
                    throw new RuntimeException(e);
                }
            }

            @Override
            public void newContainer(DruidServer container) {
                log.info("New Server[%s]", container);
            }

            @Override
            public void deadContainer(DruidServer deadContainer) {
                log.info("Server Disappeared[%s]", deadContainer);
                BatchServerInventoryView.this.runServerRemovedCallbacks(deadContainer);
            }

            @Override
            public DruidServer updateContainer(DruidServer oldContainer, DruidServer newContainer) {
                return newContainer.addDataSegments(oldContainer);
            }

            @Override
            public DruidServer addInventory(DruidServer container, String inventoryKey, Set<DataSegment> inventory) {
                return BatchServerInventoryView.this.addInnerInventory(container, inventoryKey, inventory);
            }

            @Override
            public DruidServer updateInventory(DruidServer container, String inventoryKey, Set<DataSegment> inventory) {
                return BatchServerInventoryView.this.updateInnerInventory(container, inventoryKey, inventory);
            }

            @Override
            public DruidServer removeInventory(DruidServer container, String inventoryKey) {
                return BatchServerInventoryView.this.removeInnerInventory(container, inventoryKey);
            }

            @Override
            public void inventoryInitialized() {
                log.info("Inventory Initialized");
                BatchServerInventoryView.this.runSegmentCallbacks(ServerView.SegmentCallback::segmentViewInitialized);
            }
        });
    }

    /**
     * Starts the underlying inventory manager if this view is not already started. The started
     * flag is set only after the manager's {@code start()} returns normally.
     *
     * @throws Exception if the inventory manager fails to start
     */
    @LifecycleStart
    public void start() throws Exception {
        synchronized (this.started) {
            if (!this.started.get()) {
                this.inventoryManager.start();
                this.started.set(true);
            }
        }
    }

    /**
     * Stops the underlying inventory manager if this view is currently started. The started flag
     * is cleared before the manager's {@code stop()} is invoked.
     *
     * @throws IOException if the inventory manager fails to stop
     */
    @LifecycleStop
    public void stop() throws IOException {
        synchronized (this.started) {
            if (this.started.getAndSet(false)) {
                this.inventoryManager.stop();
            }
        }
    }

    /** @return the current value of the started flag */
    @Override
    public boolean isStarted() {
        return this.started.get();
    }

    /** Delegates to the inventory manager's lookup for {@code containerKey}. */
    @Override
    public DruidServer getInventoryValue(String containerKey) {
        return this.inventoryManager.getInventoryValue(containerKey);
    }

    /** Delegates to the inventory manager's collection of known servers. */
    @Override
    public Collection<DruidServer> getInventory() {
        return this.inventoryManager.getInventory();
    }

    /** Registers {@code callback} to be run on {@code exec} whenever a server disappears. */
    @Override
    public void registerServerRemovedCallback(Executor exec, ServerView.ServerRemovedCallback callback) {
        this.serverRemovedCallbacks.put(callback, exec);
    }

    /** Registers {@code callback} to be run on {@code exec} for segment events. */
    @Override
    public void registerSegmentCallback(Executor exec, ServerView.SegmentCallback callback) {
        this.segmentCallbacks.put(callback, exec);
    }

    /**
     * Submits {@code fn} for every registered segment callback to that callback's executor. A
     * callback for which {@code fn} returns {@link ServerView.CallbackAction#UNREGISTER} is passed
     * to {@link #segmentCallbackRemoved(ServerView.SegmentCallback)} and then unregistered.
     *
     * @param fn the notification to deliver to each callback
     */
    protected void runSegmentCallbacks(final Function<ServerView.SegmentCallback, ServerView.CallbackAction> fn) {
        for (final Map.Entry<ServerView.SegmentCallback, Executor> entry : this.segmentCallbacks.entrySet()) {
            final ServerView.SegmentCallback callback = entry.getKey();
            entry.getValue().execute(() -> {
                if (ServerView.CallbackAction.UNREGISTER == fn.apply(callback)) {
                    this.segmentCallbackRemoved(callback);
                    this.segmentCallbacks.remove(callback);
                }
            });
        }
    }

    /**
     * Notifies every server-removed callback, on its own executor, that {@code server} is gone and
     * unregisters the callbacks that answer {@link ServerView.CallbackAction#UNREGISTER}.
     */
    private void runServerRemovedCallbacks(final DruidServer server) {
        for (final Map.Entry<ServerView.ServerRemovedCallback, Executor> entry : this.serverRemovedCallbacks.entrySet()) {
            final ServerView.ServerRemovedCallback callback = entry.getKey();
            entry.getValue().execute(() -> {
                if (ServerView.CallbackAction.UNREGISTER == callback.serverRemoved(server)) {
                    this.serverRemovedCallbacks.remove(callback);
                }
            });
        }
    }

    /**
     * Adds {@code inventory} to {@code container} and fires {@code segmentAdded} callbacks. If the
     * container already holds a segment with the same id, nothing is added and no callback runs.
     */
    protected void addSingleInventory(final DruidServer container, final DataSegment inventory) {
        log.debug("Server[%s] added segment[%s]", container.getName(), inventory.getId());
        if (container.getSegment(inventory.getId()) != null) {
            log.warn("Not adding or running callbacks for existing segment[%s] on server[%s]", inventory.getId(), container.getName());
            return;
        }
        container.addDataSegment(inventory);
        this.runSegmentCallbacks(input -> input.segmentAdded(container.getMetadata(), inventory));
    }

    /**
     * Removes the segment identified by {@code segmentId} from {@code container}, firing
     * {@code segmentRemoved} callbacks; logs a warning when the container did not hold it.
     */
    void removeSingleInventory(DruidServer container, SegmentId segmentId) {
        log.debug("Server[%s] removed segment[%s]", container.getName(), segmentId);
        if (!this.doRemoveSingleInventory(container, segmentId)) {
            log.warn("Not running cleanup or callbacks for non-existing segment[%s] on server[%s]", segmentId, container.getName());
        }
    }

    /**
     * @return {@code true} if the container returned a segment for {@code segmentId} on removal
     *         (callbacks were fired), {@code false} if it returned {@code null}
     */
    private boolean doRemoveSingleInventory(final DruidServer container, SegmentId segmentId) {
        final DataSegment segment = container.removeDataSegment(segmentId);
        if (segment == null) {
            return false;
        }
        this.runSegmentCallbacks(input -> input.segmentRemoved(container.getMetadata(), segment));
        return true;
    }

    /**
     * @return {@code true} if a server is known under {@code serverKey} and it holds a segment
     *         with the id of {@code segment}
     * @throws RuntimeException wrapping any exception raised during the lookup
     */
    @Override
    public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment) {
        try {
            DruidServer server = this.getInventoryValue(serverKey);
            return server != null && server.getSegment(segment.getId()) != null;
        }
        catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    /**
     * Records the filtered content of a new inventory node under {@code inventoryKey} and adds
     * each retained segment to {@code container}.
     *
     * @return {@code container}
     */
    protected DruidServer addInnerInventory(DruidServer container, String inventoryKey, Set<DataSegment> inventory) {
        Set<DataSegment> filteredInventory = this.filterInventory(container, inventory);
        this.zNodes.put(inventoryKey, filteredInventory);
        for (DataSegment segment : filteredInventory) {
            this.addSingleInventory(container, segment);
        }
        return container;
    }

    /**
     * Returns a fresh set holding the interned form of every segment of {@code inventory} that is
     * accepted by the default filter or by any currently registered callback filter.
     */
    private Set<DataSegment> filterInventory(final DruidServer container, Set<DataSegment> inventory) {
        Predicate<Pair<DruidServerMetadata, DataSegment>> predicate = Predicates.or(this.defaultFilter, Predicates.or(this.segmentPredicates.values()));
        // Copy into a new set rather than returning a lazy view over the input set.
        Set<DataSegment> filteredInventory = new HashSet<>();
        for (DataSegment segment : inventory) {
            if (predicate.apply(Pair.of(container.getMetadata(), segment))) {
                filteredInventory.add(DataSegmentInterner.intern(segment));
            }
        }
        return filteredInventory;
    }

    /**
     * Replaces the content recorded for {@code inventoryKey}: segments that newly pass the filter
     * are added to {@code container}, segments that are no longer present are removed, and the
     * recorded set is then replaced.
     *
     * @return {@code container}
     * @throws ISE if nothing has been recorded for {@code inventoryKey}
     */
    protected DruidServer updateInnerInventory(DruidServer container, String inventoryKey, Set<DataSegment> inventory) {
        Set<DataSegment> filteredInventory = this.filterInventory(container, inventory);
        Set<DataSegment> existing = this.zNodes.get(inventoryKey);
        if (existing == null) {
            throw new ISE("Trying to update an inventoryKey[%s] that didn't exist?!", inventoryKey);
        }
        for (DataSegment segment : Sets.difference(filteredInventory, existing)) {
            this.addSingleInventory(container, segment);
        }
        for (DataSegment segment : Sets.difference(existing, filteredInventory)) {
            this.removeSingleInventory(container, segment.getId());
        }
        this.zNodes.put(inventoryKey, filteredInventory);
        return container;
    }

    /**
     * Forgets the content recorded for {@code inventoryKey} and removes each of its segments from
     * {@code container}; logs a warning and does nothing else when the key was not recorded.
     *
     * @return {@code container}
     */
    protected DruidServer removeInnerInventory(DruidServer container, String inventoryKey) {
        log.debug("Server[%s] removed container[%s]", container.getName(), inventoryKey);
        Set<DataSegment> segments = this.zNodes.remove(inventoryKey);
        if (segments == null) {
            log.warn("Told to remove container[%s], which didn't exist", inventoryKey);
            return container;
        }
        for (DataSegment segment : segments) {
            this.removeSingleInventory(container, segment.getId());
        }
        return container;
    }

    /**
     * Registers {@code callback} wrapped in a {@link FilteringSegmentCallback} built from
     * {@code filter}. The filter is also recorded so that later inventory filtering retains the
     * segments it accepts.
     */
    @Override
    public void registerSegmentCallback(Executor exec, ServerView.SegmentCallback callback, Predicate<Pair<DruidServerMetadata, DataSegment>> filter) {
        ServerView.SegmentCallback filteringCallback = new FilteringSegmentCallback(callback, filter);
        // Record the predicate before the callback becomes visible to runSegmentCallbacks.
        this.segmentPredicates.put(filteringCallback, filter);
        this.registerSegmentCallback(exec, filteringCallback);
    }

    /** Drops the filter recorded for {@code callback}, if any, once the callback unregisters. */
    protected void segmentCallbackRemoved(ServerView.SegmentCallback callback) {
        this.segmentPredicates.remove(callback);
    }
}

