/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dubbo.rpc.protocol.tri;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.protocol.tri.AbstractClientStream;
import org.apache.dubbo.rpc.protocol.tri.AbstractStream;
import org.apache.dubbo.rpc.protocol.tri.GrpcStatus;
import org.apache.dubbo.rpc.protocol.tri.Metadata;
import org.apache.dubbo.rpc.protocol.tri.Stream;
import org.apache.dubbo.rpc.protocol.tri.TransportObserver;

public class ClientStream
extends AbstractClientStream
implements Stream {
    protected ClientStream(URL url) {
        super(url);
    }

    @Override
    protected StreamObserver<Object> createStreamObserver() {
        return new AbstractClientStream.ClientStreamObserver(){
            boolean metaSent;

            @Override
            public void onNext(Object data) {
                if (!this.metaSent) {
                    this.metaSent = true;
                    Metadata metadata = ClientStream.this.createRequestMeta((RpcInvocation)ClientStream.this.getRequest().getData());
                    ClientStream.this.getTransportSubscriber().onMetadata(metadata, false);
                }
                byte[] bytes = ClientStream.this.encodeRequest(data);
                ClientStream.this.getTransportSubscriber().onData(bytes, false);
            }

            @Override
            public void onError(Throwable throwable) {
                ClientStream.this.transportError(throwable);
            }
        };
    }

    @Override
    protected TransportObserver createTransportObserver() {
        return new AbstractStream.AbstractTransportObserver(){

            @Override
            public void onData(byte[] data, boolean endStream) {
                ClientStream.this.execute(() -> {
                    Object resp = ClientStream.this.deserializeResponse(data);
                    ClientStream.this.getStreamSubscriber().onNext(resp);
                });
            }

            @Override
            public void onComplete() {
                ClientStream.this.execute(() -> {
                    GrpcStatus status = this.extractStatusFromMeta(this.getHeaders());
                    if (GrpcStatus.Code.isOk(status.code.code)) {
                        ClientStream.this.getStreamSubscriber().onCompleted();
                    } else {
                        ClientStream.this.getStreamSubscriber().onError(status.asException());
                    }
                });
            }
        };
    }
}

