1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.ipc;
20
21 import java.io.IOException;
22
23 import org.apache.hadoop.hbase.util.ByteStringer;
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.HConstants;
28 import org.apache.hadoop.hbase.TableName;
29 import org.apache.hadoop.hbase.client.HConnection;
30 import org.apache.hadoop.hbase.client.ClusterConnection;
31 import org.apache.hadoop.hbase.client.RegionServerCallable;
32 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
33 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
34 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
35 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
36 import org.apache.hadoop.hbase.util.Bytes;
37
38 import com.google.protobuf.Descriptors;
39 import com.google.protobuf.Message;
40 import com.google.protobuf.RpcController;
41
42
43
44
45
46
47
48
49
50 @InterfaceAudience.Private
51 public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{
52 private static final Log LOG = LogFactory.getLog(RegionCoprocessorRpcChannel.class);
53
54 private final ClusterConnection connection;
55 private final TableName table;
56 private final byte[] row;
57 private byte[] lastRegion;
58 private int operationTimeout;
59
60 private RpcRetryingCallerFactory rpcCallerFactory;
61 private RpcControllerFactory rpcControllerFactory;
62
63 public RegionCoprocessorRpcChannel(ClusterConnection conn, TableName table, byte[] row) {
64 this.connection = conn;
65 this.table = table;
66 this.row = row;
67 this.rpcCallerFactory = conn.getRpcRetryingCallerFactory();
68 this.rpcControllerFactory = conn.getRpcControllerFactory();
69 this.operationTimeout = conn.getConnectionConfiguration().getOperationTimeout();
70 }
71
72 @Override
73 protected Message callExecService(RpcController controller,
74 Descriptors.MethodDescriptor method, Message request, Message responsePrototype)
75 throws IOException {
76 if (LOG.isTraceEnabled()) {
77 LOG.trace("Call: "+method.getName()+", "+request.toString());
78 }
79
80 if (row == null) {
81 throw new IllegalArgumentException("Missing row property for remote region location");
82 }
83
84 final RpcController rpcController = controller == null
85 ? rpcControllerFactory.newController() : controller;
86
87 final ClientProtos.CoprocessorServiceCall call =
88 ClientProtos.CoprocessorServiceCall.newBuilder()
89 .setRow(ByteStringer.wrap(row))
90 .setServiceName(method.getService().getFullName())
91 .setMethodName(method.getName())
92 .setRequest(request.toByteString()).build();
93 RegionServerCallable<CoprocessorServiceResponse> callable =
94 new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row) {
95 @Override
96 public CoprocessorServiceResponse call(int callTimeout) throws Exception {
97 if (rpcController instanceof PayloadCarryingRpcController) {
98 ((PayloadCarryingRpcController) rpcController).setPriority(tableName);
99 }
100 if (rpcController instanceof TimeLimitedRpcController) {
101 ((TimeLimitedRpcController) rpcController).setCallTimeout(callTimeout);
102 }
103 byte[] regionName = getLocation().getRegionInfo().getRegionName();
104 return ProtobufUtil.execService(rpcController, getStub(), call, regionName);
105 }
106 };
107 CoprocessorServiceResponse result = rpcCallerFactory.<CoprocessorServiceResponse> newCaller()
108 .callWithRetries(callable, operationTimeout);
109 Message response = null;
110 if (result.getValue().hasValue()) {
111 Message.Builder builder = responsePrototype.newBuilderForType();
112 ProtobufUtil.mergeFrom(builder, result.getValue().getValue());
113 response = builder.build();
114 } else {
115 response = responsePrototype.getDefaultInstanceForType();
116 }
117 lastRegion = result.getRegion().getValue().toByteArray();
118 if (LOG.isTraceEnabled()) {
119 LOG.trace("Result is region=" + Bytes.toStringBinary(lastRegion) + ", value=" + response);
120 }
121 return response;
122 }
123
124 public byte[] getLastRegion() {
125 return lastRegion;
126 }
127 }