package org.jetlinks.supports.cluster.event;

import io.netty.util.ReferenceCountUtil;
import org.jetlinks.core.Payload;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.core.event.TopicPayload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/supports/cluster/event/RedisClusterEventBroker.class */
/**
 * Cluster event broker that exchanges {@link TopicPayload}s with other brokers through
 * byte-array queues obtained from the {@link ClusterManager}.
 *
 * <p>Queue names have the form {@code /broker/bus/<from>/<to>}. {@link #dispatch} writes to the
 * queue named after its first and second argument (in that order), while {@link #listen} reads
 * from the queue named after its second and first argument, i.e. the mirrored direction.
 */
public class RedisClusterEventBroker extends AbstractClusterEventBroker {
    private static final Logger log = LoggerFactory.getLogger(RedisClusterEventBroker.class);

    /** Common prefix of every queue name used by this broker. */
    private static final String QUEUE_PREFIX = "/broker/bus/";

    /**
     * Creates the broker; both arguments are handed to the superclass unchanged.
     *
     * @param clusterManager                 cluster manager that provides the queues
     * @param reactiveRedisConnectionFactory connection factory required by the superclass
     */
    public RedisClusterEventBroker(ClusterManager clusterManager, ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
        super(clusterManager, reactiveRedisConnectionFactory);
    }

    /** Builds the queue name {@code /broker/bus/<from>/<to>}. */
    private static String queueName(String from, String to) {
        return QUEUE_PREFIX + from + "/" + to;
    }

    /**
     * Subscribes to the queue {@code /broker/bus/<str2>/<str>} and decodes every received byte
     * array into a {@link TopicPayload}.
     *
     * @param str  last segment of the queue name (presumably the local broker id - confirm
     *             against the superclass)
     * @param str2 first variable segment of the queue name (presumably the remote broker id -
     *             confirm against the superclass)
     * @return the decoded payloads in the order the queue emits them
     */
    @Override // org.jetlinks.supports.cluster.event.AbstractClusterEventBroker
    public Flux<TopicPayload> listen(String str, String str2) {
        // The explicit <byte[]> witness pins the queue element type; without it a generic
        // getQueue would infer Object in this call chain and Payload.of(...) would not resolve.
        return this.clusterManager
                .<byte[]>getQueue(queueName(str2, str))
                .subscribe()
                // NOTE(review): 'false' is presumably the "release after decode" flag - confirm.
                .map(bytes -> (TopicPayload) Payload.of(bytes).decode(this.topicPayloadCodec, false));
    }

    /**
     * Encodes the payload and adds the resulting bytes to the queue
     * {@code /broker/bus/<str>/<str2>}.
     *
     * <p>Encoding happens eagerly, when this method is called, not on subscription. The given
     * payload is always released before this method returns, including when encoding fails;
     * in that case the encoding exception is rethrown to the caller.
     *
     * @param str          first variable segment of the queue name
     * @param str2         last segment of the queue name
     * @param topicPayload payload to send; released by this method
     * @return a {@link Mono} that completes when the queue has accepted the bytes
     */
    @Override // org.jetlinks.supports.cluster.event.AbstractClusterEventBroker
    public Mono<Void> dispatch(String str, String str2, TopicPayload topicPayload) {
        final byte[] encoded;
        try {
            // NOTE(review): 'true' presumably releases the intermediate encoded payload - confirm.
            encoded = this.topicPayloadCodec.encode(topicPayload).getBytes(true);
        } finally {
            // Release in finally so a failing encode does not leak the payload's reference.
            ReferenceCountUtil.safeRelease(topicPayload);
        }
        return this.clusterManager
                .<byte[]>getQueue(queueName(str, str2))
                .add(Mono.just(encoded))
                .then();
    }
}
