/*
 * Decompiled with CFR 0.152.
 */
package de.codecentric.boot.admin.server.domain.entities;

import de.codecentric.boot.admin.server.domain.entities.EventsourcingInstanceRepository;
import de.codecentric.boot.admin.server.domain.entities.Instance;
import de.codecentric.boot.admin.server.domain.events.InstanceEvent;
import de.codecentric.boot.admin.server.domain.values.InstanceId;
import de.codecentric.boot.admin.server.eventstore.InstanceEventStore;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.retry.Retry;

public class SnapshottingInstanceRepository
extends EventsourcingInstanceRepository {
    private static final Logger log = LoggerFactory.getLogger(SnapshottingInstanceRepository.class);
    private final ConcurrentMap<InstanceId, Instance> snapshots = new ConcurrentHashMap<InstanceId, Instance>();
    private Disposable subscription;
    private final Retry<Object> retryOnAny = Retry.any().retryMax(Integer.MAX_VALUE).doOnRetry(ctx -> log.error("Resubscribing after uncaught error", ctx.exception()));

    public SnapshottingInstanceRepository(InstanceEventStore eventStore) {
        super(eventStore);
    }

    @Override
    public Flux<Instance> findAll() {
        return Mono.fromSupplier(this.snapshots::values).flatMapIterable(Function.identity());
    }

    @Override
    public Mono<Instance> find(InstanceId id) {
        return Mono.defer(() -> Mono.justOrEmpty(this.snapshots.get(id)));
    }

    @Override
    public Flux<Instance> findByName(String name) {
        return this.findAll().filter(a -> a.isRegistered() && name.equals(a.getRegistration().getName()));
    }

    public void start() {
        this.subscription = this.getEventStore().findAll().concatWith((Publisher)this.getEventStore()).doOnNext(this::updateSnapshot).retryWhen(this.retryOnAny).subscribe();
    }

    public void stop() {
        if (this.subscription != null) {
            this.subscription.dispose();
        }
    }

    protected void updateSnapshot(InstanceEvent event) {
        this.snapshots.compute(event.getInstance(), (? super K key, ? super V old) -> {
            Instance instance = old != null ? old : Instance.create(key);
            return instance.apply(event);
        });
    }
}

