1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.concurrent.ExecutorService;
26
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.HTableDescriptor;
29 import org.apache.hadoop.hbase.TableName;
30 import org.apache.hadoop.hbase.client.coprocessor.Batch;
31 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
32 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost.Environment;
33 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
34 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
35 import org.apache.hadoop.io.MultipleIOException;
36
37 import com.google.protobuf.Descriptors.MethodDescriptor;
38 import com.google.protobuf.Message;
39 import com.google.protobuf.Service;
40 import com.google.protobuf.ServiceException;
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56 public class HTableWrapper implements HTableInterface {
57
58 private final HTableInterface table;
59 private ClusterConnection connection;
60 private final List<HTableInterface> openTables;
61
62
63
64
65
66 public static HTableInterface createWrapper(List<HTableInterface> openTables,
67 TableName tableName, Environment env, ExecutorService pool) throws IOException {
68 return new HTableWrapper(openTables, tableName,
69 CoprocessorHConnection.getConnectionForEnvironment(env), pool);
70 }
71
72 private HTableWrapper(List<HTableInterface> openTables, TableName tableName,
73 ClusterConnection connection, ExecutorService pool)
74 throws IOException {
75 this.table = connection.getTable(tableName, pool);
76 this.connection = connection;
77 this.openTables = openTables;
78 this.openTables.add(this);
79 }
80
81 public void internalClose() throws IOException {
82 List<IOException> exceptions = new ArrayList<IOException>(2);
83 try {
84 table.close();
85 } catch (IOException e) {
86 exceptions.add(e);
87 }
88 try {
89
90 if (this.connection != null) {
91 this.connection.close();
92 }
93 } catch (IOException e) {
94 exceptions.add(e);
95 }
96 if (!exceptions.isEmpty()) {
97 throw MultipleIOException.createIOException(exceptions);
98 }
99 }
100
101 public Configuration getConfiguration() {
102 return table.getConfiguration();
103 }
104
105 public void close() throws IOException {
106 try {
107 internalClose();
108 } finally {
109 openTables.remove(this);
110 }
111 }
112
113 @Deprecated
114 public Result getRowOrBefore(byte[] row, byte[] family)
115 throws IOException {
116 Scan scan = Scan.createGetClosestRowOrBeforeReverseScan(row);
117 Result startRowResult = null;
118 try (ResultScanner resultScanner = this.table.getScanner(scan)) {
119 startRowResult = resultScanner.next();
120 }
121 return startRowResult;
122 }
123
124 public Result get(Get get) throws IOException {
125 return table.get(get);
126 }
127
128 public boolean exists(Get get) throws IOException {
129 return table.exists(get);
130 }
131
132 public boolean[] existsAll(List<Get> gets) throws IOException{
133 return table.existsAll(gets);
134 }
135
136 @Deprecated
137 public Boolean[] exists(List<Get> gets) throws IOException {
138
139 boolean [] exists = table.existsAll(gets);
140 if (exists == null) return null;
141 Boolean [] results = new Boolean [exists.length];
142 for (int i = 0; i < exists.length; i++) {
143 results[i] = exists[i]? Boolean.TRUE: Boolean.FALSE;
144 }
145 return results;
146 }
147
148 public void put(Put put) throws IOException {
149 table.put(put);
150 }
151
152 public void put(List<Put> puts) throws IOException {
153 table.put(puts);
154 }
155
156 public void delete(Delete delete) throws IOException {
157 table.delete(delete);
158 }
159
160 public void delete(List<Delete> deletes) throws IOException {
161 table.delete(deletes);
162 }
163
164 public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
165 byte[] value, Put put) throws IOException {
166 return table.checkAndPut(row, family, qualifier, value, put);
167 }
168
169 public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
170 CompareOp compareOp, byte[] value, Put put) throws IOException {
171 return table.checkAndPut(row, family, qualifier, compareOp, value, put);
172 }
173
174 public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
175 byte[] value, Delete delete) throws IOException {
176 return table.checkAndDelete(row, family, qualifier, value, delete);
177 }
178
179 public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
180 CompareOp compareOp, byte[] value, Delete delete) throws IOException {
181 return table.checkAndDelete(row, family, qualifier, compareOp, value, delete);
182 }
183
184 public long incrementColumnValue(byte[] row, byte[] family,
185 byte[] qualifier, long amount) throws IOException {
186 return table.incrementColumnValue(row, family, qualifier, amount);
187 }
188
189 public long incrementColumnValue(byte[] row, byte[] family,
190 byte[] qualifier, long amount, Durability durability)
191 throws IOException {
192 return table.incrementColumnValue(row, family, qualifier, amount,
193 durability);
194 }
195
196 @Override
197 public Result append(Append append) throws IOException {
198 return table.append(append);
199 }
200
201 @Override
202 public Result increment(Increment increment) throws IOException {
203 return table.increment(increment);
204 }
205
206 public void flushCommits() throws IOException {
207 table.flushCommits();
208 }
209
210 public boolean isAutoFlush() {
211 return table.isAutoFlush();
212 }
213
214 public ResultScanner getScanner(Scan scan) throws IOException {
215 return table.getScanner(scan);
216 }
217
218 public ResultScanner getScanner(byte[] family) throws IOException {
219 return table.getScanner(family);
220 }
221
222 public ResultScanner getScanner(byte[] family, byte[] qualifier)
223 throws IOException {
224 return table.getScanner(family, qualifier);
225 }
226
227 public HTableDescriptor getTableDescriptor() throws IOException {
228 return table.getTableDescriptor();
229 }
230
231 @Override
232 public byte[] getTableName() {
233 return table.getTableName();
234 }
235
236 @Override
237 public TableName getName() {
238 return table.getName();
239 }
240
241 @Override
242 public void batch(List<? extends Row> actions, Object[] results)
243 throws IOException, InterruptedException {
244 table.batch(actions, results);
245 }
246
247
248
249
250
251
252 @Override
253 public Object[] batch(List<? extends Row> actions)
254 throws IOException, InterruptedException {
255 return table.batch(actions);
256 }
257
258 @Override
259 public <R> void batchCallback(List<? extends Row> actions, Object[] results,
260 Batch.Callback<R> callback) throws IOException, InterruptedException {
261 table.batchCallback(actions, results, callback);
262 }
263
264
265
266
267
268
269
270
271 @Override
272 public <R> Object[] batchCallback(List<? extends Row> actions,
273 Batch.Callback<R> callback) throws IOException, InterruptedException {
274 return table.batchCallback(actions, callback);
275 }
276
277 @Override
278 public Result[] get(List<Get> gets) throws IOException {
279 return table.get(gets);
280 }
281
282 @Override
283 public CoprocessorRpcChannel coprocessorService(byte[] row) {
284 return table.coprocessorService(row);
285 }
286
287 @Override
288 public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
289 byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
290 throws ServiceException, Throwable {
291 return table.coprocessorService(service, startKey, endKey, callable);
292 }
293
294 @Override
295 public <T extends Service, R> void coprocessorService(Class<T> service,
296 byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback)
297 throws ServiceException, Throwable {
298 table.coprocessorService(service, startKey, endKey, callable, callback);
299 }
300
301 @Override
302 public void mutateRow(RowMutations rm) throws IOException {
303 table.mutateRow(rm);
304 }
305
306 @Override
307 public void setAutoFlush(boolean autoFlush) {
308 table.setAutoFlush(autoFlush);
309 }
310
311 @Override
312 public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
313 setAutoFlush(autoFlush);
314 if (!autoFlush && !clearBufferOnFail) {
315
316
317
318
319
320
321 throw new UnsupportedOperationException("Can't do this via wrapper");
322 }
323 }
324
325 @Override
326 public void setAutoFlushTo(boolean autoFlush) {
327 table.setAutoFlushTo(autoFlush);
328 }
329
330 @Override
331 public long getWriteBufferSize() {
332 return table.getWriteBufferSize();
333 }
334
335 @Override
336 public void setWriteBufferSize(long writeBufferSize) throws IOException {
337 table.setWriteBufferSize(writeBufferSize);
338 }
339
340 @Override
341 public long incrementColumnValue(byte[] row, byte[] family,
342 byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
343 return table.incrementColumnValue(row, family, qualifier, amount,
344 writeToWAL? Durability.USE_DEFAULT: Durability.SKIP_WAL);
345 }
346
347 @Override
348 public <R extends Message> Map<byte[], R> batchCoprocessorService(
349 MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey,
350 R responsePrototype) throws ServiceException, Throwable {
351 return table.batchCoprocessorService(methodDescriptor, request, startKey, endKey,
352 responsePrototype);
353 }
354
355 @Override
356 public <R extends Message> void batchCoprocessorService(MethodDescriptor methodDescriptor,
357 Message request, byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
358 throws ServiceException, Throwable {
359 table.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
360 callback);
361 }
362
363 @Override
364 public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
365 CompareOp compareOp, byte[] value, RowMutations rm) throws IOException {
366 return table.checkAndMutate(row, family, qualifier, compareOp, value, rm);
367 }
368
369 }