1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.Map;
24
25 import org.apache.hadoop.hbase.CellScannable;
26 import org.apache.hadoop.hbase.CellUtil;
27 import org.apache.hadoop.hbase.DoNotRetryIOException;
28 import org.apache.hadoop.hbase.HConstants;
29 import org.apache.hadoop.hbase.HRegionInfo;
30 import org.apache.hadoop.hbase.HRegionLocation;
31 import org.apache.hadoop.hbase.ServerName;
32 import org.apache.hadoop.hbase.TableName;
33 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
34 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
35 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36 import org.apache.hadoop.hbase.protobuf.RequestConverter;
37 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
38 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
39 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
40 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
41 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
42 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
43
44 import com.google.common.annotations.VisibleForTesting;
45 import com.google.protobuf.ServiceException;
46
47
48
49
50
51
52
53 class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> implements Cancellable {
54 private final MultiAction<R> multiAction;
55 private final boolean cellBlock;
56 private final PayloadCarryingRpcController controller;
57
58 MultiServerCallable(final ClusterConnection connection, final TableName tableName,
59 final ServerName location, RpcControllerFactory rpcFactory, final MultiAction<R> multi) {
60 super(connection, tableName, null);
61 this.multiAction = multi;
62
63
64
65 this.location = new HRegionLocation(null, location);
66 this.cellBlock = isCellBlock();
67 controller = rpcFactory.newController();
68 }
69
70 @Override
71 protected HRegionLocation getLocation() {
72 throw new RuntimeException("Cannot get region location for multi-region request");
73 }
74
75 @Override
76 public HRegionInfo getHRegionInfo() {
77 throw new RuntimeException("Cannot get region info for multi-region request");
78 }
79
80 MultiAction<R> getMulti() {
81 return this.multiAction;
82 }
83
84 @Override
85 public MultiResponse call(int callTimeout) throws IOException {
86 int countOfActions = this.multiAction.size();
87 if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions");
88 MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder();
89 RegionAction.Builder regionActionBuilder = RegionAction.newBuilder();
90 ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
91 MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
92 List<CellScannable> cells = null;
93
94 long nonceGroup = multiAction.getNonceGroup();
95 if (nonceGroup != HConstants.NO_NONCE) {
96 multiRequestBuilder.setNonceGroup(nonceGroup);
97 }
98 for (Map.Entry<byte[], List<Action<R>>> e: this.multiAction.actions.entrySet()) {
99 final byte [] regionName = e.getKey();
100 final List<Action<R>> actions = e.getValue();
101 regionActionBuilder.clear();
102 regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier(
103 HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName) );
104
105
106 if (this.cellBlock) {
107
108 if (cells == null) cells = new ArrayList<CellScannable>(countOfActions);
109
110
111 regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells,
112 regionActionBuilder, actionBuilder, mutationBuilder);
113 } else {
114 regionActionBuilder = RequestConverter.buildRegionAction(regionName, actions,
115 regionActionBuilder, actionBuilder, mutationBuilder);
116 }
117 multiRequestBuilder.addRegionAction(regionActionBuilder.build());
118 }
119
120
121
122 if (cells != null) controller.setCellScanner(CellUtil.createCellScanner(cells));
123 controller.setPriority(getTableName());
124 controller.setCallTimeout(callTimeout);
125 ClientProtos.MultiResponse responseProto;
126 ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();
127 try {
128 responseProto = getStub().multi(controller, requestProto);
129 } catch (ServiceException e) {
130 throw ProtobufUtil.getRemoteException(e);
131 }
132 if (responseProto == null) return null;
133 return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner());
134 }
135
136 @Override
137 public void cancel() {
138 controller.startCancel();
139 }
140
141 @Override
142 public boolean isCancelled() {
143 return controller.isCanceled();
144 }
145
146
147
148
149
150 private boolean isCellBlock() {
151
152
153 HConnection connection = getConnection();
154 if (!(connection instanceof ClusterConnection)) return true;
155 return ((ClusterConnection) connection).hasCellBlockSupport();
156 }
157
158 @Override
159 public void prepare(boolean reload) throws IOException {
160
161 setStub(getConnection().getClient(this.location.getServerName()));
162 }
163
164 @VisibleForTesting
165 ServerName getServerName() {
166 return location.getServerName();
167 }
168 }