/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.metadata.bookkeeper;

import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerMetadataBuilder;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerMetadataSerDe;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.util.StringUtils;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.CacheGetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.bookkeeper.CombinedLedgerRangeIterator;
import org.apache.pulsar.metadata.bookkeeper.LegacyHierarchicalLedgerManager;
import org.apache.pulsar.metadata.bookkeeper.LegacyHierarchicalLedgerRangeIterator;
import org.apache.pulsar.metadata.bookkeeper.LongHierarchicalLedgerManager;
import org.apache.pulsar.metadata.bookkeeper.LongHierarchicalLedgerRangeIterator;
import org.apache.zookeeper.AsyncCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link LedgerManager} implementation that stores BookKeeper ledger metadata in a Pulsar
 * {@link MetadataStore}.
 *
 * <p>Creates, updates and deletes go straight to the store; reads go through a
 * {@link MetadataCache} obtained from the same store. Store notifications are routed to
 * {@code handleDataNotification}, which drives the registered
 * {@link BookkeeperInternalCallbacks.LedgerMetadataListener}s.
 */
public class PulsarLedgerManager
implements LedgerManager {
    private static final Logger log = LoggerFactory.getLogger(PulsarLedgerManager.class);
    // Prefix prepended to the hierarchical path of every ledger (see getLedgerPath).
    private final String ledgerRootPath;
    // Backing store used for put/delete and as the source of change notifications.
    private final MetadataStore store;
    // Typed view over the store, used by readLedgerMetadata.
    private final MetadataCache<LedgerMetadata> cache;
    // Converts LedgerMetadata to and from the byte[] payload kept in the store.
    private final LedgerMetadataSerDe serde;
    // asyncProcessLedgers walks the legacy layout first, then the long layout.
    private final LegacyHierarchicalLedgerManager legacyLedgerManager;
    private final LongHierarchicalLedgerManager longLedgerManager;
    // Single-threaded scheduler shared with both hierarchical managers; also used to dispatch
    // listener callbacks and to retry failed metadata reads. Shut down in close().
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new DefaultThreadFactory("pulsar-bk-ledger-manager-scheduler"));
    // Ledger id -> listeners registered for that ledger. The sets are plain HashSets; code in
    // this class synchronizes on the set itself when adding to, removing from or iterating it.
    protected final ConcurrentMap<Long, Set<BookkeeperInternalCallbacks.LedgerMetadataListener>> listeners = new ConcurrentHashMap<Long, Set<BookkeeperInternalCallbacks.LedgerMetadataListener>>();
    // Pattern for a trailing "/L<digits>" path segment; used to filter store notifications.
    private static final Pattern ledgerPathRegex = Pattern.compile("/L[0-9]+$");

    /**
     * Builds a ledger manager on top of the given metadata store.
     *
     * <p>Besides wiring the two hierarchical managers, this obtains a metadata cache whose
     * serde delegates to {@link LedgerMetadataSerDe}, and subscribes to store notifications.
     *
     * @param store           metadata store holding the ledger metadata
     * @param ledgersRootPath path prefix under which ledger metadata is kept
     */
    PulsarLedgerManager(MetadataStore store, String ledgersRootPath) {
        this.ledgerRootPath = ledgersRootPath;
        this.store = store;
        this.legacyLedgerManager = new LegacyHierarchicalLedgerManager(store, scheduler, ledgerRootPath);
        this.longLedgerManager = new LongHierarchicalLedgerManager(store, scheduler, ledgerRootPath);
        this.serde = new LedgerMetadataSerDe();

        MetadataSerde<LedgerMetadata> ledgerMetadataSerde = new MetadataSerde<LedgerMetadata>() {

            @Override
            public byte[] serialize(String path, LedgerMetadata value) throws IOException {
                return serde.serialize(value);
            }

            @Override
            public LedgerMetadata deserialize(String path, byte[] content, Stat stat) throws IOException {
                // The ledger id is not passed in; it is recovered from the path.
                long ledgerId = getLedgerId(path);
                return serde.parseConfig(content, ledgerId, Optional.of(stat.getCreationTimestamp()));
            }
        };
        this.cache = store.getMetadataCache(ledgerMetadataSerde);

        store.registerListener(this::handleDataNotification);
    }

    /**
     * Translates a metadata-store failure into the matching {@link BKException}.
     *
     * <p>{@link CompletionException} and {@link ExecutionException} wrappers are peeled off
     * first. A wrapper that carries no cause is kept as-is, so a non-null input never yields
     * {@code null} (a {@code null} result would make
     * {@link CompletableFuture#completeExceptionally(Throwable)} throw in the callers).
     *
     * @param ex the failure reported by the metadata store
     * @return a {@link BKException} whose cause is the unwrapped failure when a mapping
     *         exists, otherwise the unwrapped failure itself
     */
    private static Throwable mapToBkException(Throwable ex) {
        Throwable failure = ex;
        while ((failure instanceof CompletionException || failure instanceof ExecutionException)
                && failure.getCause() != null) {
            failure = failure.getCause();
        }

        final int code;
        if (failure instanceof MetadataStoreException.NotFoundException) {
            code = BKException.Code.NoSuchLedgerExistsOnMetadataServerException;
        } else if (failure instanceof MetadataStoreException.AlreadyExistsException) {
            code = BKException.Code.LedgerExistException;
        } else if (failure instanceof MetadataStoreException.BadVersionException) {
            code = BKException.Code.MetadataVersionException;
        } else if (failure instanceof MetadataStoreException.AlreadyClosedException) {
            code = BKException.Code.LedgerClosedException;
        } else {
            // No BookKeeper equivalent: hand back the unwrapped failure unchanged.
            return failure;
        }

        BKException bke = BKException.create(code);
        bke.initCause(failure);
        return bke;
    }

    /**
     * Stores the metadata of a new ledger.
     *
     * <p>The write is conditional on expected version {@code -1}. For metadata format
     * versions above 2 the ledger id is stamped into a copy of the metadata first.
     *
     * @param ledgerId      id of the ledger being created
     * @param inputMetadata metadata supplied by the caller
     * @return a future holding the stored metadata with the version reported by the store;
     *         it fails with a serialization exception or with the mapped store failure
     */
    public CompletableFuture<Versioned<LedgerMetadata>> createLedgerMetadata(long ledgerId, LedgerMetadata inputMetadata) {
        final LedgerMetadata metadata;
        if (inputMetadata.getMetadataFormatVersion() > 2) {
            metadata = LedgerMetadataBuilder.from(inputMetadata).withId(ledgerId).build();
        } else {
            metadata = inputMetadata;
        }

        final byte[] serialized;
        try {
            serialized = serde.serialize(metadata);
        } catch (IOException ioe) {
            return FutureUtil.failedFuture(new BKException.BKMetadataSerializationException(ioe));
        }

        CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();
        store.put(getLedgerPath(ledgerId), serialized, Optional.of(-1L)).whenComplete((stat, failure) -> {
            if (failure == null) {
                promise.complete(new Versioned<>(metadata, new LongVersion(stat.getVersion())));
            } else {
                log.error("Failed to create ledger {}: {}", ledgerId, failure.getMessage());
                promise.completeExceptionally(mapToBkException(failure));
            }
        });
        return promise;
    }

    /**
     * Deletes the metadata of a ledger.
     *
     * <p>{@link Version#NEW} is rejected. {@link Version#ANY} deletes unconditionally. Any
     * other version must be a {@link LongVersion}, whose value becomes the expected version
     * of the delete. After a successful delete the listener set registered for the ledger is
     * dropped from the map.
     *
     * @param ledgerId id of the ledger to remove
     * @param version  expected metadata version
     * @return a future completed once the delete finished, or failed with the mapped failure
     */
    public CompletableFuture<Void> removeLedgerMetadata(long ledgerId, Version version) {
        if (version == Version.NEW) {
            log.error("Request to delete ledger {} metadata with version set to the initial one", ledgerId);
            return FutureUtil.failedFuture(new BKException.BKMetadataVersionException());
        }

        final Optional<Long> expectedVersion;
        if (version == Version.ANY) {
            expectedVersion = Optional.empty();
        } else if (version instanceof LongVersion) {
            expectedVersion = Optional.of(((LongVersion) version).getLongVersion());
        } else {
            log.info("Not an instance of ZKVersion: {}", ledgerId);
            return FutureUtil.failedFuture(new BKException.BKMetadataVersionException());
        }

        CompletableFuture<Void> promise = new CompletableFuture<>();
        store.delete(getLedgerPath(ledgerId), expectedVersion).whenComplete((result, failure) -> {
            if (failure != null) {
                log.error("Failed to remove ledger metadata {}: {}", ledgerId, failure.getMessage());
                promise.completeExceptionally(mapToBkException(failure));
                return;
            }
            promise.complete(result);

            boolean hadListeners = listeners.remove(ledgerId) != null;
            if (log.isDebugEnabled()) {
                if (hadListeners) {
                    log.debug("Remove registered ledger metadata listeners on ledger {} after ledger is deleted.", ledgerId);
                } else {
                    log.debug("No ledger metadata listeners to remove from ledger {} when it's being deleted.", ledgerId);
                }
            }
        });
        return promise;
    }

    /**
     * Reads the metadata of a ledger through the metadata cache.
     *
     * @param ledgerId id of the ledger to read
     * @return a future holding the metadata and its store version. It fails with
     *         {@code BKNoSuchLedgerExistsOnMetadataServerException} when the cache reports no
     *         entry, and with {@code ZKException} (wrapping the cause) on any other failure
     */
    public CompletableFuture<Versioned<LedgerMetadata>> readLedgerMetadata(long ledgerId) {
        CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();
        String ledgerPath = getLedgerPath(ledgerId);

        cache.getWithStats(ledgerPath)
                .thenAccept(optRes -> {
                    if (optRes.isPresent()) {
                        CacheGetResult<LedgerMetadata> hit = optRes.get();
                        Version version = new LongVersion(hit.getStat().getVersion());
                        promise.complete(new Versioned<>(hit.getValue(), version));
                        return;
                    }
                    if (log.isDebugEnabled()) {
                        log.debug("No such ledger: {} at path {}", ledgerId, ledgerPath);
                    }
                    promise.completeExceptionally(new BKException.BKNoSuchLedgerExistsOnMetadataServerException());
                })
                .exceptionally(ex -> {
                    log.error("Could not read metadata for ledger: {}: {}", ledgerId, ex.getMessage());
                    promise.completeExceptionally(new BKException.ZKException(ex.getCause()));
                    return null;
                });
        return promise;
    }

    /**
     * Conditionally overwrites the metadata of an existing ledger.
     *
     * @param ledgerId       id of the ledger to update
     * @param metadata       new metadata to store
     * @param currentVersion version the caller believes is current; must be a
     *                       {@link LongVersion}
     * @return a future holding the metadata with the new store version. It fails with
     *         {@code BKMetadataVersionException} for a non-{@link LongVersion} argument or a
     *         version conflict, with {@code BKMetadataSerializationException} when the
     *         metadata cannot be serialized, and with {@code ZKException} otherwise
     */
    public CompletableFuture<Versioned<LedgerMetadata>> writeLedgerMetadata(long ledgerId, LedgerMetadata metadata, Version currentVersion) {
        if (!(currentVersion instanceof LongVersion)) {
            return FutureUtil.failedFuture(new BKException.BKMetadataVersionException());
        }
        LongVersion expected = (LongVersion) currentVersion;

        final byte[] serialized;
        try {
            serialized = serde.serialize(metadata);
        } catch (IOException ioe) {
            return FutureUtil.failedFuture(new BKException.BKMetadataSerializationException(ioe));
        }

        CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();
        store.put(getLedgerPath(ledgerId), serialized, Optional.of(expected.getLongVersion()))
                .thenAccept(stat -> {
                    promise.complete(new Versioned<>(metadata, new LongVersion(stat.getVersion())));
                })
                .exceptionally(ex -> {
                    boolean versionConflict = ex.getCause() instanceof MetadataStoreException.BadVersionException;
                    if (versionConflict) {
                        promise.completeExceptionally(new BKException.BKMetadataVersionException());
                    } else {
                        log.warn("Conditional update ledger metadata failed: {}", ex.getMessage());
                        promise.completeExceptionally(new BKException.ZKException(ex.getCause()));
                    }
                    return null;
                });
        return promise;
    }

    /**
     * Registers a listener for metadata changes of a ledger and immediately triggers a read,
     * so the listener is called with the current metadata.
     *
     * <p>A {@code null} listener is ignored.
     *
     * @param ledgerId id of the ledger to watch
     * @param listener callback to notify
     */
    public void registerLedgerMetadataListener(long ledgerId, BookkeeperInternalCallbacks.LedgerMetadataListener listener) {
        if (listener == null) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Registered ledger metadata listener {} on ledger {}.", listener, ledgerId);
        }

        Set<BookkeeperInternalCallbacks.LedgerMetadataListener> registered =
                listeners.computeIfAbsent(ledgerId, id -> new HashSet<>());
        // The per-ledger set is a plain HashSet; its monitor guards every mutation.
        synchronized (registered) {
            registered.add(listener);
        }

        new ReadLedgerMetadataTask(ledgerId).run();
    }

    /**
     * Removes a previously registered listener. When the ledger's listener set becomes empty
     * it is dropped from the map.
     *
     * @param ledgerId id of the ledger the listener was registered on
     * @param listener callback to remove
     */
    public void unregisterLedgerMetadataListener(long ledgerId, BookkeeperInternalCallbacks.LedgerMetadataListener listener) {
        Set<BookkeeperInternalCallbacks.LedgerMetadataListener> registered = listeners.get(ledgerId);
        if (registered == null) {
            return;
        }
        synchronized (registered) {
            boolean removed = registered.remove(listener);
            if (removed && log.isDebugEnabled()) {
                log.debug("Unregistered ledger metadata listener {} on ledger {}.", listener, ledgerId);
            }
            if (registered.isEmpty()) {
                // Only remove the mapping if it still points at this very set.
                listeners.remove(ledgerId, registered);
            }
        }
    }

    /**
     * Handles a notification from the metadata store.
     *
     * <p>Only paths under {@code ledgerRootPath} that end in a {@code /L<digits>} segment are
     * considered. A modification triggers a re-read that feeds the registered listeners; a
     * deletion notifies the listeners with {@code null} metadata and drops them.
     *
     * @param n notification delivered by the metadata store
     */
    private void handleDataNotification(Notification n) {
        String path = n.getPath();
        // find(), not matches(): the pattern describes only the trailing "/L<digits>" segment,
        // while matches() would demand that the whole path consist of that segment and would
        // therefore reject every real ledger path (root + "/00/0000/L0001").
        if (!path.startsWith(ledgerRootPath) || !ledgerPathRegex.matcher(path).find()) {
            return;
        }

        final long ledgerId;
        try {
            ledgerId = getLedgerId(path);
        } catch (IOException ioe) {
            log.warn("Received invalid ledger path {} : ", path, ioe);
            return;
        }

        switch (n.getType()) {
            case Modified: {
                new ReadLedgerMetadataTask(ledgerId).run();
                break;
            }
            case Deleted: {
                Set<BookkeeperInternalCallbacks.LedgerMetadataListener> listenerSet = listeners.get(ledgerId);
                if (listenerSet == null) {
                    if (log.isDebugEnabled()) {
                        log.debug("No ledger metadata listeners to remove from ledger {} after it's deleted.", ledgerId);
                    }
                    break;
                }
                synchronized (listenerSet) {
                    if (log.isDebugEnabled()) {
                        log.debug("Removed ledger metadata listeners on ledger {} : {}", ledgerId, listenerSet);
                    }
                    // Null metadata tells each listener that the ledger is gone.
                    for (BookkeeperInternalCallbacks.LedgerMetadataListener l : listenerSet) {
                        l.onChanged(ledgerId, null);
                    }
                    listeners.remove(ledgerId, listenerSet);
                }
                break;
            }
            default: {
                if (log.isDebugEnabled()) {
                    log.debug("Received event {} on {}.", n.getType(), path);
                }
            }
        }
    }

    /**
     * Processes all ledgers, first through the legacy hierarchical manager and then through
     * the long hierarchical manager.
     *
     * <p>If the legacy pass reports {@code failureRc}, {@code finalCb} is invoked right away
     * and the long pass is skipped; otherwise the long pass runs and reports to
     * {@code finalCb}.
     *
     * @param processor processor applied to each ledger id
     * @param finalCb   callback invoked when processing ends
     * @param context   opaque context passed to both managers
     * @param successRc return code signalling success
     * @param failureRc return code signalling failure
     */
    public void asyncProcessLedgers(BookkeeperInternalCallbacks.Processor<Long> processor, AsyncCallback.VoidCallback finalCb, Object context, int successRc, int failureRc) {
        AsyncCallback.VoidCallback afterLegacyPass = (rc, path, ctx) -> {
            if (rc != failureRc) {
                longLedgerManager.asyncProcessLedgers(processor, finalCb, context, successRc, failureRc);
                return;
            }
            finalCb.processResult(rc, path, ctx);
        };
        legacyLedgerManager.asyncProcessLedgers(processor, afterLegacyPass, context, successRc, failureRc);
    }

    /**
     * Returns an iterator over ledger ranges that combines the legacy and the long
     * hierarchical layouts.
     *
     * @param ledgerId not used by this implementation
     * @return a combined iterator over both layouts
     */
    public LedgerManager.LedgerRangeIterator getLedgerRanges(long ledgerId) {
        return new CombinedLedgerRangeIterator(
                new LegacyHierarchicalLedgerRangeIterator(store, ledgerRootPath),
                new LongHierarchicalLedgerRangeIterator(store, ledgerRootPath));
    }

    /**
     * Shuts down the internal scheduler and waits up to ten seconds for it to terminate.
     *
     * <p>A scheduler that is still running after the wait is reported with a warning instead
     * of being silently ignored.
     *
     * @throws IOException if the calling thread is interrupted while waiting; the interrupt
     *                     flag is restored before throwing
     */
    public void close() throws IOException {
        final long timeoutSeconds = 10L;
        scheduler.shutdownNow();
        try {
            if (!scheduler.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                log.warn("Ledger manager scheduler did not terminate within {} seconds", timeoutSeconds);
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IOException(ie);
        }
    }

    /**
     * Builds the store path of a ledger: the root path followed by the hybrid hierarchical
     * path of the ledger id.
     *
     * @param ledgerId id of the ledger
     * @return path of the ledger's metadata in the store
     */
    private String getLedgerPath(long ledgerId) {
        String hierarchicalPath = StringUtils.getHybridHierarchicalLedgerPath(ledgerId);
        return ledgerRootPath + hierarchicalPath;
    }

    /**
     * Extracts the ledger id from a store path.
     *
     * @param ledgerPath path that should lie under {@code ledgerRootPath}
     * @return the ledger id encoded in the hierarchical part of the path
     * @throws IOException if the path is not under the root, carries nothing after the root,
     *                     or its hierarchical part cannot be parsed
     */
    private long getLedgerId(String ledgerPath) throws IOException {
        // The length check keeps a path equal to the root from escaping as a
        // StringIndexOutOfBoundsException out of substring() below.
        if (!ledgerPath.startsWith(ledgerRootPath) || ledgerPath.length() <= ledgerRootPath.length()) {
            throw new IOException("it is not a valid hashed path name : " + ledgerPath);
        }
        // +1 skips the separator between the root and the hierarchical part.
        String hierarchicalPath = ledgerPath.substring(ledgerRootPath.length() + 1);
        return StringUtils.stringToLongHierarchicalLedgerId(hierarchicalPath);
    }

    /**
     * Tells whether a node name consists of exactly two or three decimal digits.
     *
     * @param path node name to test
     * @return {@code true} if the whole name is two or three digits
     */
    public boolean isLedgerParentNode(String path) {
        return Pattern.matches("\\d{2,3}", path);
    }

    private class ReadLedgerMetadataTask
    implements Runnable {
        final long ledgerId;

        ReadLedgerMetadataTask(long ledgerId) {
            this.ledgerId = ledgerId;
        }

        @Override
        public void run() {
            if (null != PulsarLedgerManager.this.listeners.get(this.ledgerId)) {
                if (log.isDebugEnabled()) {
                    log.debug("Re-read ledger metadata for {}.", (Object)this.ledgerId);
                }
                PulsarLedgerManager.this.readLedgerMetadata(this.ledgerId).whenComplete((metadata, exception) -> this.handleMetadata((Versioned<LedgerMetadata>)metadata, (Throwable)exception));
            } else if (log.isDebugEnabled()) {
                log.debug("Ledger metadata listener for ledger {} is already removed.", (Object)this.ledgerId);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void handleMetadata(Versioned<LedgerMetadata> result, Throwable exception) {
            if (exception == null) {
                Set listenerSet = (Set)PulsarLedgerManager.this.listeners.get(this.ledgerId);
                if (null != listenerSet) {
                    if (log.isDebugEnabled()) {
                        log.debug("Ledger metadata is changed for {} : {}.", (Object)this.ledgerId, result);
                    }
                    PulsarLedgerManager.this.scheduler.execute(() -> {
                        Set set = listenerSet;
                        synchronized (set) {
                            for (BookkeeperInternalCallbacks.LedgerMetadataListener listener : listenerSet) {
                                listener.onChanged(this.ledgerId, result);
                            }
                        }
                    });
                }
            } else if (BKException.getExceptionCode((Throwable)exception) == -25) {
                Set listenerSet = (Set)PulsarLedgerManager.this.listeners.remove(this.ledgerId);
                if (null != listenerSet) {
                    if (log.isDebugEnabled()) {
                        log.debug("Removed ledger metadata listener set on ledger {} as its ledger is deleted : {}", (Object)this.ledgerId, (Object)listenerSet.size());
                    }
                    Set set = listenerSet;
                    synchronized (set) {
                        for (BookkeeperInternalCallbacks.LedgerMetadataListener listener : listenerSet) {
                            listener.onChanged(this.ledgerId, null);
                        }
                    }
                }
            } else {
                log.warn("Failed on read ledger metadata of ledger {}: {}", (Object)this.ledgerId, (Object)BKException.getExceptionCode((Throwable)exception));
                PulsarLedgerManager.this.scheduler.schedule(this, 10L, TimeUnit.SECONDS);
            }
        }
    }
}

