/* * * Copyright 2019 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package test import ( "context" "io" "testing" "time" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" testpb "google.golang.org/grpc/test/grpc_testing" ) func (s) TestStreamCleanup(t *testing.T) { const initialWindowSize uint = 70 * 1024 // Must be higher than default 64K, ignored otherwise const bodySize = 2 * initialWindowSize // Something that is not going to fit in a single window const callRecvMsgSize uint = 1 // The maximum message size the client can receive ss := &stubServer{ unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{Payload: &testpb.Payload{ Body: make([]byte, bodySize), }}, nil }, emptyCall: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, } if err := ss.Start([]grpc.ServerOption{grpc.MaxConcurrentStreams(1)}, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(callRecvMsgSize))), grpc.WithInitialWindowSize(int32(initialWindowSize))); err != nil { t.Fatalf("Error starting endpoint server: %v", err) } defer ss.Stop() if _, err := ss.client.UnaryCall(context.Background(), &testpb.SimpleRequest{}); status.Code(err) != codes.ResourceExhausted { t.Fatalf("should fail with ResourceExhausted, message's body size: %v, maximum message size the client can receive: %v", bodySize, callRecvMsgSize) } if _, err := ss.client.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { t.Fatalf("should succeed, err: %v", err) } } func (s) TestStreamCleanupAfterSendStatus(t *testing.T) { const initialWindowSize uint = 70 * 1024 // Must be higher than default 64K, ignored otherwise const bodySize = 2 * initialWindowSize // Something that is not going to fit in a single window serverReturnedStatus := make(chan struct{}) ss := &stubServer{ fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { defer func() { close(serverReturnedStatus) }() return stream.Send(&testpb.StreamingOutputCallResponse{ Payload: &testpb.Payload{ Body: make([]byte, bodySize), }, }) }, } if err := ss.Start([]grpc.ServerOption{grpc.MaxConcurrentStreams(1)}, grpc.WithInitialWindowSize(int32(initialWindowSize))); err != nil { t.Fatalf("Error starting endpoint server: %v", err) } defer ss.Stop() // This test makes sure we don't delete stream from server transport's // activeStreams list too aggressively. // 1. Make a long living stream RPC. So server's activeStream list is not // empty. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() stream, err := ss.client.FullDuplexCall(ctx) if err != nil { t.Fatalf("FullDuplexCall= _, %v; want _, ", err) } // 2. Wait for service handler to return status. // // This will trigger a stream cleanup code, which will eventually remove // this stream from activeStream. // // But the stream removal won't happen because it's supposed to be done // after the status is sent by loopyWriter, and the status send is blocked // by flow control. <-serverReturnedStatus // 3. GracefulStop (besides sending goaway) checks the number of // activeStreams. // // It will close the connection if there's no active streams. This won't // happen because of the pending stream. But if there's a bug in stream // cleanup that causes stream to be removed too aggressively, the connection // will be closd and the stream will be broken. gracefulStopDone := make(chan struct{}) go func() { defer close(gracefulStopDone) ss.s.GracefulStop() }() // 4. Make sure the stream is not broken. if _, err := stream.Recv(); err != nil { t.Fatalf("stream.Recv() = _, %v, want _, ", err) } if _, err := stream.Recv(); err != io.EOF { t.Fatalf("stream.Recv() = _, %v, want _, io.EOF", err) } timer := time.NewTimer(time.Second) select { case <-gracefulStopDone: timer.Stop() case <-timer.C: t.Fatalf("s.GracefulStop() didn't finish without 1 second after the last RPC") } }