/* * * Copyright 2016 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #include <memory> #include <mutex> #include <grpc/slice.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpcpp/impl/codegen/method_handler_impl.h> #include "pb_decode.h" #include "pb_encode.h" #include "src/cpp/server/health/default_health_check_service.h" #include "src/cpp/server/health/health.pb.h" namespace grpc { namespace { const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check"; } // namespace DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl( DefaultHealthCheckService* service) : service_(service), method_(nullptr) { internal::MethodHandler* handler = new internal::RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer, ByteBuffer>( std::mem_fn(&HealthCheckServiceImpl::Check), this); method_ = new internal::RpcServiceMethod( kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, handler); AddMethod(method_); } Status DefaultHealthCheckService::HealthCheckServiceImpl::Check( ServerContext* context, const ByteBuffer* request, ByteBuffer* response) { // Decode request. std::vector<Slice> slices; if (!request->Dump(&slices).ok()) { return Status(StatusCode::INVALID_ARGUMENT, ""); } uint8_t* request_bytes = nullptr; bool request_bytes_owned = false; size_t request_size = 0; grpc_health_v1_HealthCheckRequest request_struct; if (slices.empty()) { request_struct.has_service = false; } else if (slices.size() == 1) { request_bytes = const_cast<uint8_t*>(slices[0].begin()); request_size = slices[0].size(); } else { request_bytes_owned = true; request_bytes = static_cast<uint8_t*>(gpr_malloc(request->Length())); uint8_t* copy_to = request_bytes; for (size_t i = 0; i < slices.size(); i++) { memcpy(copy_to, slices[i].begin(), slices[i].size()); copy_to += slices[i].size(); } } if (request_bytes != nullptr) { pb_istream_t istream = pb_istream_from_buffer(request_bytes, request_size); bool decode_status = pb_decode( &istream, grpc_health_v1_HealthCheckRequest_fields, &request_struct); if (request_bytes_owned) { gpr_free(request_bytes); } if (!decode_status) { return Status(StatusCode::INVALID_ARGUMENT, ""); } } // Check status from the associated default health checking service. DefaultHealthCheckService::ServingStatus serving_status = service_->GetServingStatus( request_struct.has_service ? request_struct.service : ""); if (serving_status == DefaultHealthCheckService::NOT_FOUND) { return Status(StatusCode::NOT_FOUND, ""); } // Encode response grpc_health_v1_HealthCheckResponse response_struct; response_struct.has_status = true; response_struct.status = serving_status == DefaultHealthCheckService::SERVING ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING : grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING; pb_ostream_t ostream; memset(&ostream, 0, sizeof(ostream)); pb_encode(&ostream, grpc_health_v1_HealthCheckResponse_fields, &response_struct); grpc_slice response_slice = grpc_slice_malloc(ostream.bytes_written); ostream = pb_ostream_from_buffer(GRPC_SLICE_START_PTR(response_slice), GRPC_SLICE_LENGTH(response_slice)); bool encode_status = pb_encode( &ostream, grpc_health_v1_HealthCheckResponse_fields, &response_struct); if (!encode_status) { return Status(StatusCode::INTERNAL, "Failed to encode response."); } Slice encoded_response(response_slice, Slice::STEAL_REF); ByteBuffer response_buffer(&encoded_response, 1); response->Swap(&response_buffer); return Status::OK; } DefaultHealthCheckService::DefaultHealthCheckService() { services_map_.emplace("", true); } void DefaultHealthCheckService::SetServingStatus( const grpc::string& service_name, bool serving) { std::lock_guard<std::mutex> lock(mu_); services_map_[service_name] = serving; } void DefaultHealthCheckService::SetServingStatus(bool serving) { std::lock_guard<std::mutex> lock(mu_); for (auto iter = services_map_.begin(); iter != services_map_.end(); ++iter) { iter->second = serving; } } DefaultHealthCheckService::ServingStatus DefaultHealthCheckService::GetServingStatus( const grpc::string& service_name) const { std::lock_guard<std::mutex> lock(mu_); const auto& iter = services_map_.find(service_name); if (iter == services_map_.end()) { return NOT_FOUND; } return iter->second ? SERVING : NOT_SERVING; } DefaultHealthCheckService::HealthCheckServiceImpl* DefaultHealthCheckService::GetHealthCheckService() { GPR_ASSERT(impl_ == nullptr); impl_.reset(new HealthCheckServiceImpl(this)); return impl_.get(); } } // namespace grpc