001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.ConnectionUtils.setCoprocessorError; 021import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 022 023import io.opentelemetry.context.Context; 024import io.opentelemetry.context.Scope; 025import java.io.IOException; 026import java.util.concurrent.CompletableFuture; 027import java.util.concurrent.TimeUnit; 028import org.apache.hadoop.hbase.DoNotRetryIOException; 029import org.apache.hadoop.hbase.HRegionLocation; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 032import org.apache.hadoop.hbase.ipc.HBaseRpcController; 033import org.apache.hadoop.hbase.util.Bytes; 034import org.apache.yetus.audience.InterfaceAudience; 035 036import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 037import org.apache.hbase.thirdparty.com.google.protobuf.Message; 038import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 039import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; 040import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 041 042import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 043import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 044 045/** 046 * The implementation of a region based coprocessor rpc channel. 047 */ 048@InterfaceAudience.Private 049class RegionCoprocessorRpcChannelImpl implements RpcChannel { 050 051 private final AsyncConnectionImpl conn; 052 053 private final TableName tableName; 054 055 private final RegionInfo region; 056 057 private final byte[] row; 058 059 private final long rpcTimeoutNs; 060 061 private final long operationTimeoutNs; 062 063 private byte[] lastRegion; 064 065 RegionCoprocessorRpcChannelImpl(AsyncConnectionImpl conn, TableName tableName, RegionInfo region, 066 byte[] row, long rpcTimeoutNs, long operationTimeoutNs) { 067 this.conn = conn; 068 this.tableName = tableName; 069 this.region = region; 070 this.row = row; 071 this.rpcTimeoutNs = rpcTimeoutNs; 072 this.operationTimeoutNs = operationTimeoutNs; 073 } 074 075 private CompletableFuture<Message> rpcCall(MethodDescriptor method, Message request, 076 Message responsePrototype, HBaseRpcController controller, HRegionLocation loc, 077 ClientService.Interface stub) { 078 final Context context = Context.current(); 079 CompletableFuture<Message> future = new CompletableFuture<>(); 080 if (region != null && !Bytes.equals(loc.getRegion().getRegionName(), region.getRegionName())) { 081 future.completeExceptionally(new DoNotRetryIOException("Region name is changed, expected " 082 + region.getRegionNameAsString() + ", actual " + loc.getRegion().getRegionNameAsString())); 083 return future; 084 } 085 CoprocessorServiceRequest csr = CoprocessorRpcUtils.getCoprocessorServiceRequest(method, 086 request, row, loc.getRegion().getRegionName()); 087 stub.execService(controller, csr, resp -> { 088 try (Scope ignored = context.makeCurrent()) { 089 if (controller.failed()) { 090 future.completeExceptionally(controller.getFailed()); 091 } else { 092 lastRegion = resp.getRegion().getValue().toByteArray(); 093 try { 094 future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype)); 095 } catch (IOException e) { 096 future.completeExceptionally(e); 097 } 098 } 099 } 100 }); 101 return future; 102 } 103 104 @Override 105 public void callMethod(MethodDescriptor method, RpcController controller, Message request, 106 Message responsePrototype, RpcCallback<Message> done) { 107 final Context context = Context.current(); 108 addListener(conn.callerFactory.<Message> single().table(tableName).row(row) 109 .locateType(RegionLocateType.CURRENT).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 110 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS).action((c, l, s) -> { 111 try (Scope ignored = context.makeCurrent()) { 112 return rpcCall(method, request, responsePrototype, c, l, s); 113 } 114 }).call(), (r, e) -> { 115 try (Scope ignored = context.makeCurrent()) { 116 if (e != null) { 117 setCoprocessorError(controller, e); 118 } 119 done.run(r); 120 } 121 }); 122 } 123 124 public byte[] getLastRegion() { 125 return lastRegion; 126 } 127}