/* * * Copyright 2015 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #include <ruby/ruby.h> #include "rb_completion_queue.h" #include "rb_grpc_imports.generated.h" #include <ruby/thread.h> #include <grpc/grpc.h> #include <grpc/support/log.h> #include <grpc/support/time.h> #include "rb_grpc.h" /* Used to allow grpc_completion_queue_next call to release the GIL */ typedef struct next_call_stack { grpc_completion_queue* cq; grpc_event event; gpr_timespec timeout; void* tag; volatile int interrupted; } next_call_stack; /* Calls grpc_completion_queue_pluck without holding the ruby GIL */ static void* grpc_rb_completion_queue_pluck_no_gil(void* param) { next_call_stack* const next_call = (next_call_stack*)param; gpr_timespec increment = gpr_time_from_millis(20, GPR_TIMESPAN); gpr_timespec deadline; do { deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), increment); next_call->event = grpc_completion_queue_pluck( next_call->cq, next_call->tag, deadline, NULL); if (next_call->event.type != GRPC_QUEUE_TIMEOUT || gpr_time_cmp(deadline, next_call->timeout) > 0) { break; } } while (!next_call->interrupted); return NULL; } /* Helper function to free a completion queue. */ void grpc_rb_completion_queue_destroy(grpc_completion_queue* cq) { /* Every function that adds an event to a queue also synchronously plucks that event from the queue, and holds a reference to the Ruby object that holds the queue, so we only get to this point if all of those functions have completed, and the queue is empty */ grpc_completion_queue_shutdown(cq); grpc_completion_queue_destroy(cq); } static void unblock_func(void* param) { next_call_stack* const next_call = (next_call_stack*)param; next_call->interrupted = 1; } /* Does the same thing as grpc_completion_queue_pluck, while properly releasing the GVL and handling interrupts */ grpc_event rb_completion_queue_pluck(grpc_completion_queue* queue, void* tag, gpr_timespec deadline, void* reserved) { next_call_stack next_call; MEMZERO(&next_call, next_call_stack, 1); next_call.cq = queue; next_call.timeout = deadline; next_call.tag = tag; next_call.event.type = GRPC_QUEUE_TIMEOUT; (void)reserved; /* Loop until we finish a pluck without an interruption. The internal pluck function runs either until it is interrupted or it gets an event, or time runs out. The basic reason we need this relatively complicated construction is that we need to re-acquire the GVL when an interrupt comes in, so that the ruby interpreter can do what it needs to do with the interrupt. But we also need to get back to plucking when the interrupt has been handled. */ do { next_call.interrupted = 0; rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil, (void*)&next_call, unblock_func, (void*)&next_call); /* If an interrupt prevented pluck from returning useful information, then any plucks that did complete must have timed out */ } while (next_call.interrupted && next_call.event.type == GRPC_QUEUE_TIMEOUT); return next_call.event; }