001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 021 022import com.google.protobuf.Descriptors.MethodDescriptor; 023import com.google.protobuf.Message; 024import com.google.protobuf.RpcCallback; 025import com.google.protobuf.RpcChannel; 026import com.google.protobuf.RpcController; 027import java.io.IOException; 028import java.util.concurrent.CompletableFuture; 029import java.util.concurrent.TimeUnit; 030import org.apache.hadoop.hbase.DoNotRetryIOException; 031import org.apache.hadoop.hbase.HRegionLocation; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 034import org.apache.hadoop.hbase.ipc.HBaseRpcController; 035import org.apache.hadoop.hbase.util.Bytes; 036import org.apache.yetus.audience.InterfaceAudience; 037 038import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 039import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 040import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 041 042/** 043 * The implementation of a region based coprocessor rpc channel. 044 */ 045@InterfaceAudience.Private 046class RegionCoprocessorRpcChannelImpl implements RpcChannel { 047 048 private final AsyncConnectionImpl conn; 049 050 private final TableName tableName; 051 052 private final RegionInfo region; 053 054 private final byte[] row; 055 056 private final long rpcTimeoutNs; 057 058 private final long operationTimeoutNs; 059 060 RegionCoprocessorRpcChannelImpl(AsyncConnectionImpl conn, TableName tableName, RegionInfo region, 061 byte[] row, long rpcTimeoutNs, long operationTimeoutNs) { 062 this.conn = conn; 063 this.tableName = tableName; 064 this.region = region; 065 this.row = row; 066 this.rpcTimeoutNs = rpcTimeoutNs; 067 this.operationTimeoutNs = operationTimeoutNs; 068 } 069 070 private CompletableFuture<Message> rpcCall(MethodDescriptor method, Message request, 071 Message responsePrototype, HBaseRpcController controller, HRegionLocation loc, 072 ClientService.Interface stub) { 073 CompletableFuture<Message> future = new CompletableFuture<>(); 074 if (region != null 075 && !Bytes.equals(loc.getRegionInfo().getRegionName(), region.getRegionName())) { 076 future.completeExceptionally(new DoNotRetryIOException( 077 "Region name is changed, expected " + region.getRegionNameAsString() + ", actual " 078 + loc.getRegionInfo().getRegionNameAsString())); 079 return future; 080 } 081 CoprocessorServiceRequest csr = CoprocessorRpcUtils.getCoprocessorServiceRequest(method, 082 request, row, loc.getRegionInfo().getRegionName()); 083 stub.execService(controller, csr, 084 new org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback<CoprocessorServiceResponse>() { 085 086 @Override 087 public void run(CoprocessorServiceResponse resp) { 088 if (controller.failed()) { 089 future.completeExceptionally(controller.getFailed()); 090 } else { 091 try { 092 future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype)); 093 } catch (IOException e) { 094 future.completeExceptionally(e); 095 } 096 } 097 } 098 }); 099 return future; 100 } 101 102 @Override 103 public void callMethod(MethodDescriptor method, RpcController controller, Message request, 104 Message responsePrototype, RpcCallback<Message> done) { 105 addListener( 106 conn.callerFactory.<Message> single().table(tableName).row(row) 107 .locateType(RegionLocateType.CURRENT).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 108 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 109 .action((c, l, s) -> rpcCall(method, request, responsePrototype, c, l, s)).call(), 110 (r, e) -> { 111 if (e != null) { 112 ((ClientCoprocessorRpcController) controller).setFailed(e); 113 } 114 done.run(r); 115 }); 116 } 117}