package org.jetlinks.supports.config;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.netty.util.ReferenceCountUtil;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.function.Supplier;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.core.config.ConfigStorage;
import org.jetlinks.core.config.ConfigStorageManager;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.supports.cluster.EventBusLocalCache;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/supports/config/EventBusStorageManager.class */
public class EventBusStorageManager implements ConfigStorageManager {
    private final ConcurrentMap<String, ClusterConfigStorage> cache;
    private final Supplier<Cache<String, Object>> cacheSupplier;
    private final Function<String, ClusterConfigStorage> storageBuilder;

    public EventBusStorageManager(ClusterManager clusterManager, EventBus eventBus) {
        this(clusterManager, eventBus, () -> {
            return CacheBuilder.newBuilder().build();
        });
    }

    public EventBusStorageManager(ClusterManager clusterManager, EventBus eventBus, Supplier<Cache<String, Object>> supplier) {
        this.cache = supplier.get().asMap();
        this.cacheSupplier = supplier;
        this.storageBuilder = str -> {
            return new ClusterConfigStorage(new EventBusLocalCache(str, eventBus, clusterManager, this.cacheSupplier.get()));
        };
        eventBus.subscribe(Subscription.of("event-bus-storage-listener", new String[]{"/_sys/cluster_cache/*/*/*"}, new Subscription.Feature[]{Subscription.Feature.broker})).subscribe(topicPayload -> {
            try {
                Map topicVars = topicPayload.getTopicVars("/_sys/cluster_cache/{name}/{type}/{key}");
                ClusterConfigStorage clusterConfigStorage = this.cache.get(topicVars.get("name"));
                if (clusterConfigStorage != null) {
                    ((EventBusLocalCache) clusterConfigStorage.getCache()).clearLocalCache(topicVars.get("key"));
                }
            } finally {
                ReferenceCountUtil.safeRelease(topicPayload);
            }
        });
    }

    public Mono<ConfigStorage> getStorage(String str) {
        return Mono.just(this.cache.computeIfAbsent(str, this.storageBuilder));
    }
}
