/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.services.transport.rsocket;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufOutputStream;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.util.ByteBufPayload;
import io.scalecube.net.Address;
import io.scalecube.services.ServiceReference;
import io.scalecube.services.auth.CredentialsSupplier;
import io.scalecube.services.exceptions.MessageCodecException;
import io.scalecube.services.exceptions.ServiceException;
import io.scalecube.services.exceptions.UnauthorizedException;
import io.scalecube.services.transport.api.ClientChannel;
import io.scalecube.services.transport.api.ClientTransport;
import io.scalecube.services.transport.api.DataCodec;
import io.scalecube.services.transport.api.HeadersCodec;
import io.scalecube.services.transport.api.ReferenceCountUtil;
import io.scalecube.services.transport.api.ServiceMessageCodec;
import io.scalecube.services.transport.rsocket.ConnectionSetup;
import io.scalecube.services.transport.rsocket.ConnectionSetupCodec;
import io.scalecube.services.transport.rsocket.RSocketClientChannel;
import io.scalecube.services.transport.rsocket.RSocketClientTransportFactory;
import io.scalecube.utils.MaskUtil;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

public class RSocketClientTransport
implements ClientTransport {
    private static final Logger LOGGER = LoggerFactory.getLogger(RSocketClientTransport.class);
    private final ThreadLocal<Map<Address, Mono<RSocket>>> rsockets = ThreadLocal.withInitial(ConcurrentHashMap::new);
    private final CredentialsSupplier credentialsSupplier;
    private final ConnectionSetupCodec connectionSetupCodec;
    private final HeadersCodec headersCodec;
    private final Collection<DataCodec> dataCodecs;
    private final RSocketClientTransportFactory clientTransportFactory;

    public RSocketClientTransport(CredentialsSupplier credentialsSupplier, ConnectionSetupCodec connectionSetupCodec, HeadersCodec headersCodec, Collection<DataCodec> dataCodecs, RSocketClientTransportFactory clientTransportFactory) {
        this.credentialsSupplier = credentialsSupplier;
        this.connectionSetupCodec = connectionSetupCodec;
        this.headersCodec = headersCodec;
        this.dataCodecs = dataCodecs;
        this.clientTransportFactory = clientTransportFactory;
    }

    public ClientChannel create(ServiceReference serviceReference) {
        Map<Address, Mono<RSocket>> monoMap = this.rsockets.get();
        Address address = serviceReference.address();
        Mono mono = monoMap.computeIfAbsent(address, key -> this.getCredentials(serviceReference).flatMap(creds -> this.connect((Address)key, (Map<String, String>)creds, monoMap)).cache().doOnError(ex -> {
            Mono cfr_ignored_0 = (Mono)monoMap.remove(key);
        }));
        return new RSocketClientChannel((Mono<RSocket>)mono, new ServiceMessageCodec(this.headersCodec, this.dataCodecs));
    }

    private Mono<Map<String, String>> getCredentials(ServiceReference serviceReference) {
        return Mono.defer(() -> {
            if (this.credentialsSupplier == null) {
                return Mono.just(Collections.emptyMap());
            }
            return ((Mono)this.credentialsSupplier.apply((Object)serviceReference)).switchIfEmpty(Mono.just(Collections.emptyMap())).doOnSuccess(creds -> LOGGER.debug("[credentialsSupplier] Got credentials ({}) for service: {}", (Object)MaskUtil.mask((Map)creds), (Object)serviceReference)).doOnError(ex -> LOGGER.error("[credentialsSupplier] Failed to get credentials for service: {}, cause: {}", (Object)serviceReference, (Object)ex.toString())).onErrorMap(this::toUnauthorizedException);
        });
    }

    private Mono<RSocket> connect(Address address, Map<String, String> creds, Map<Address, Mono<RSocket>> monoMap) {
        return RSocketConnector.create().payloadDecoder(PayloadDecoder.DEFAULT).setupPayload(this.encodeConnectionSetup(new ConnectionSetup(creds))).connect(() -> this.clientTransportFactory.clientTransport(address)).doOnSuccess(rsocket -> {
            LOGGER.debug("[rsocket][client][{}] Connected successfully", (Object)address);
            rsocket.onClose().doFinally(s -> {
                monoMap.remove(address);
                LOGGER.debug("[rsocket][client][{}] Connection closed", (Object)address);
            }).doOnError(th -> LOGGER.warn("[rsocket][client][{}][onClose] Exception occurred: {}", (Object)address, (Object)th.toString())).subscribe();
        }).doOnError(th -> LOGGER.warn("[rsocket][client][{}] Failed to connect, cause: {}", (Object)address, (Object)th.toString()));
    }

    private Payload encodeConnectionSetup(ConnectionSetup connectionSetup) {
        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
        try {
            this.connectionSetupCodec.encode((OutputStream)new ByteBufOutputStream(byteBuf), connectionSetup);
        }
        catch (Throwable ex) {
            ReferenceCountUtil.safestRelease((Object)byteBuf);
            LOGGER.error("Failed to encode connectionSetup: {}, cause: {}", (Object)connectionSetup, (Object)ex.toString());
            throw new MessageCodecException("Failed to encode ConnectionSetup", ex);
        }
        return ByteBufPayload.create((ByteBuf)byteBuf);
    }

    private UnauthorizedException toUnauthorizedException(Throwable th) {
        if (th instanceof ServiceException) {
            ServiceException e = (ServiceException)th;
            return new UnauthorizedException(e.errorCode(), e.getMessage());
        }
        return new UnauthorizedException(th);
    }
}

