/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dubbo.rpc.protocol.tri;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.protocol.tri.AbstractServerStream;
import org.apache.dubbo.rpc.protocol.tri.AbstractStream;
import org.apache.dubbo.rpc.protocol.tri.DefaultMetadata;
import org.apache.dubbo.rpc.protocol.tri.GrpcStatus;
import org.apache.dubbo.rpc.protocol.tri.Metadata;
import org.apache.dubbo.rpc.protocol.tri.Stream;
import org.apache.dubbo.rpc.protocol.tri.TransportObserver;

public class ServerStream
extends AbstractServerStream
implements Stream {
    protected ServerStream(URL url) {
        super(url);
    }

    @Override
    protected StreamObserver<Object> createStreamObserver() {
        return new ServerStreamObserver();
    }

    @Override
    protected TransportObserver createTransportObserver() {
        return new StreamTransportObserver();
    }

    private class StreamTransportObserver
    extends AbstractStream.AbstractTransportObserver
    implements TransportObserver {
        private StreamTransportObserver() {
        }

        @Override
        public void onMetadata(Metadata metadata, boolean endStream, Stream.OperationHandler handler) {
            super.onMetadata(metadata, endStream, handler);
            RpcInvocation inv = ServerStream.this.buildInvocation(metadata);
            inv.setArguments(new Object[]{ServerStream.this.asStreamObserver()});
            Result result = ServerStream.this.getInvoker().invoke(inv);
            try {
                ServerStream.this.subscribe((StreamObserver)result.getValue());
            }
            catch (Throwable t) {
                ServerStream.this.transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL).withDescription("Failed to create server's observer"));
            }
        }

        @Override
        public void onData(byte[] in, boolean endStream, Stream.OperationHandler handler) {
            try {
                Object[] arguments = ServerStream.this.deserializeRequest(in);
                if (arguments != null) {
                    ServerStream.this.getStreamSubscriber().onNext(arguments[0]);
                }
            }
            catch (Throwable t) {
                ServerStream.this.transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL).withDescription("Deserialize request failed").withCause(t));
            }
        }

        @Override
        public void onComplete(Stream.OperationHandler handler) {
            ServerStream.this.getStreamSubscriber().onCompleted();
        }
    }

    private class ServerStreamObserver
    implements StreamObserver<Object> {
        private boolean headersSent;

        private ServerStreamObserver() {
        }

        @Override
        public void onNext(Object data) {
            if (!this.headersSent) {
                ServerStream.this.getTransportSubscriber().tryOnMetadata(new DefaultMetadata(), false);
                this.headersSent = true;
            }
            byte[] bytes = ServerStream.this.encodeResponse(data);
            ServerStream.this.getTransportSubscriber().tryOnData(bytes, false);
        }

        @Override
        public void onError(Throwable throwable) {
            GrpcStatus status = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL).withCause(throwable).withDescription("Biz exception");
            ServerStream.this.transportError(status);
        }

        @Override
        public void onCompleted() {
            DefaultMetadata metadata = new DefaultMetadata();
            metadata.put("grpc-message", "OK");
            metadata.put("grpc-status", Integer.toString(GrpcStatus.Code.OK.code));
            ServerStream.this.getTransportSubscriber().tryOnMetadata(metadata, true);
        }
    }
}

