001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import com.google.protobuf.Descriptors; 021import com.google.protobuf.Message; 022import com.google.protobuf.RpcController; 023import io.opentelemetry.context.Context; 024import io.opentelemetry.context.Scope; 025import java.io.IOException; 026import org.apache.hadoop.hbase.HConstants; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 029import org.apache.yetus.audience.InterfaceAudience; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 034import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 035 036/** 037 * Provides clients with an RPC connection to call Coprocessor Endpoint 038 * {@link com.google.protobuf.Service}s against a given table region. An instance of this class may 039 * be obtained by calling {@link org.apache.hadoop.hbase.client.Table#coprocessorService(byte[])}, 040 * but should normally only be used in creating a new {@link com.google.protobuf.Service} stub to 041 * call the endpoint methods. 042 * @see org.apache.hadoop.hbase.client.Table#coprocessorService(byte[]) 043 */ 044@InterfaceAudience.Private 045class RegionCoprocessorRpcChannel extends SyncCoprocessorRpcChannel { 046 private static final Logger LOG = LoggerFactory.getLogger(RegionCoprocessorRpcChannel.class); 047 private final TableName table; 048 private final byte[] row; 049 private final ClusterConnection conn; 050 private byte[] lastRegion; 051 private final int operationTimeout; 052 private final RpcRetryingCallerFactory rpcCallerFactory; 053 054 /** 055 * Constructor 056 * @param conn connection to use 057 * @param table to connect to 058 * @param row to locate region with 059 */ 060 RegionCoprocessorRpcChannel(ClusterConnection conn, TableName table, byte[] row) { 061 this.table = table; 062 this.row = row; 063 this.conn = conn; 064 this.operationTimeout = conn.getConnectionConfiguration().getOperationTimeout(); 065 this.rpcCallerFactory = conn.getRpcRetryingCallerFactory(); 066 } 067 068 @Override 069 protected Message callExecService(final RpcController controller, 070 final Descriptors.MethodDescriptor method, final Message request, 071 final Message responsePrototype) throws IOException { 072 if (LOG.isTraceEnabled()) { 073 LOG.trace("Call: " + method.getName() + ", " + request.toString()); 074 } 075 if (row == null) { 076 throw new NullPointerException("Can't be null!"); 077 } 078 final Context context = Context.current(); 079 ClientServiceCallable<CoprocessorServiceResponse> callable = 080 new ClientServiceCallable<CoprocessorServiceResponse>(this.conn, this.table, this.row, 081 this.conn.getRpcControllerFactory().newController(), HConstants.PRIORITY_UNSET) { 082 @Override 083 protected CoprocessorServiceResponse rpcCall() throws Exception { 084 try (Scope ignored = context.makeCurrent()) { 085 byte[] regionName = getLocation().getRegionInfo().getRegionName(); 086 CoprocessorServiceRequest csr = 087 CoprocessorRpcUtils.getCoprocessorServiceRequest(method, request, row, regionName); 088 return getStub().execService(getRpcController(), csr); 089 } 090 } 091 }; 092 CoprocessorServiceResponse result = this.rpcCallerFactory 093 .<CoprocessorServiceResponse> newCaller().callWithRetries(callable, operationTimeout); 094 this.lastRegion = result.getRegion().getValue().toByteArray(); 095 return CoprocessorRpcUtils.getResponse(result, responsePrototype); 096 } 097 098 /** 099 * Get last region this RpcChannel communicated with 100 * @return region name as byte array 101 */ 102 public byte[] getLastRegion() { 103 return lastRegion; 104 } 105}