1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import com.google.protobuf.Service;
22 import com.google.protobuf.ServiceException;
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.classification.InterfaceAudience;
26 import org.apache.hadoop.classification.InterfaceStability;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.Cell;
29 import org.apache.hadoop.hbase.HBaseConfiguration;
30 import org.apache.hadoop.hbase.HConstants;
31 import org.apache.hadoop.hbase.HRegionInfo;
32 import org.apache.hadoop.hbase.HRegionLocation;
33 import org.apache.hadoop.hbase.HTableDescriptor;
34 import org.apache.hadoop.hbase.KeyValue;
35 import org.apache.hadoop.hbase.KeyValueUtil;
36 import org.apache.hadoop.hbase.ServerName;
37 import org.apache.hadoop.hbase.client.coprocessor.Batch;
38 import org.apache.hadoop.hbase.filter.BinaryComparator;
39 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
40 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
41 import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
42 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
43 import org.apache.hadoop.hbase.protobuf.RequestConverter;
44 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
45 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
46 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetRequest;
47 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetResponse;
48 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
49 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
50 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
51 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
52 import org.apache.hadoop.hbase.util.Bytes;
53 import org.apache.hadoop.hbase.util.Pair;
54 import org.apache.hadoop.hbase.util.Threads;
55
56 import java.io.Closeable;
57 import java.io.IOException;
58 import java.io.InterruptedIOException;
59 import java.util.ArrayList;
60 import java.util.Collections;
61 import java.util.HashMap;
62 import java.util.List;
63 import java.util.Map;
64 import java.util.NavigableMap;
65 import java.util.TreeMap;
66 import java.util.concurrent.Callable;
67 import java.util.concurrent.ExecutionException;
68 import java.util.concurrent.ExecutorService;
69 import java.util.concurrent.Future;
70 import java.util.concurrent.SynchronousQueue;
71 import java.util.concurrent.ThreadPoolExecutor;
72 import java.util.concurrent.TimeUnit;
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115 @InterfaceAudience.Public
116 @InterfaceStability.Stable
117 public class HTable implements HTableInterface {
/** Logger shared by all HTable instances. */
private static final Log LOG = LogFactory.getLog(HTable.class);
/** Connection used for region lookups and batch processing; left null when constructed with a null Configuration. */
private HConnection connection;
/** Name of the table this instance operates on. */
private final byte [] tableName;
/** Configuration that finishSetup() reads the client settings from. */
private volatile Configuration configuration;
/** Puts queued by doPut() until flushCommits() sends them. */
private final ArrayList<Put> writeBuffer = new ArrayList<Put>();
/** Threshold for currentWriteBufferSize above which doPut() flushes. */
private long writeBufferSize;
/** When true, flushCommits() empties the whole write buffer even if the batch failed. */
private boolean clearBufferOnFail;
/** When true, put() flushes the write buffer right after queueing. */
private boolean autoFlush;
/** Running sum of heapSize() of the Puts currently held in writeBuffer. */
private long currentWriteBufferSize;
/** Caching value applied by getScanner(Scan) when the Scan's own caching is not positive. */
protected int scannerCaching;
/** Maximum KeyValue length accepted by validatePut(); the check is skipped when not positive. */
private int maxKeyValueSize;
/** Executor handed to the connection for batch operations and used for multi-region calls. */
private ExecutorService pool;
/** Set by close(); later close() calls return immediately. */
private boolean closed;
/** Timeout passed to every ServerCallable created by this table. */
private int operationTimeout;
/** Whether close() shuts down the pool. */
private final boolean cleanupPoolOnClose;
/** Whether close() closes the connection. */
private final boolean cleanupConnectionOnClose;
134
135
136
137
138
139
140
141
142
143
144
/**
 * Creates a table handle for the table with the given name.
 * Converts the name to bytes and delegates to the {@code byte[]} constructor.
 *
 * @param conf configuration to use (forwarded unchanged)
 * @param tableName name of the table
 * @throws IOException if the delegated constructor fails
 */
public HTable(Configuration conf, final String tableName)
throws IOException {
  this(conf, Bytes.toBytes(tableName));
}
149
150
151
152
153
154
155
156
157
158
159
160
161 public HTable(Configuration conf, final byte [] tableName)
162 throws IOException {
163 this.tableName = tableName;
164 this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;
165 if (conf == null) {
166 this.connection = null;
167 return;
168 }
169 this.connection = HConnectionManager.getConnection(conf);
170 this.configuration = conf;
171
172 int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
173 if (maxThreads == 0) {
174 maxThreads = 1;
175 }
176 long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
177
178
179
180
181
182 this.pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
183 new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("hbase-table"));
184 ((ThreadPoolExecutor) this.pool).allowCoreThreadTimeOut(true);
185
186 this.finishSetup();
187 }
188
189
190
191
192
193
194
195
196
197
198
199
200
/**
 * Creates a table handle that uses a caller-supplied executor.
 * The pool is not shut down by {@code close()}; the connection obtained
 * from {@code conf} is closed by it.
 *
 * @param conf configuration to obtain the connection and client settings from
 * @param tableName name of the table
 * @param pool executor used for batch operations
 * @throws IOException if obtaining the connection or the initial setup fails
 */
public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool)
    throws IOException {
  this.tableName = tableName;
  this.pool = pool;
  this.configuration = conf;
  // The caller owns the pool; this instance owns the connection.
  this.cleanupPoolOnClose = false;
  this.cleanupConnectionOnClose = true;
  this.connection = HConnectionManager.getConnection(conf);

  finishSetup();
}
212
213
214
215
216
217
218
219
220
221
222
223
224 public HTable(final byte[] tableName, final HConnection connection,
225 final ExecutorService pool) throws IOException {
226 if (pool == null || pool.isShutdown()) {
227 throw new IllegalArgumentException("Pool is null or shut down.");
228 }
229 if (connection == null || connection.isClosed()) {
230 throw new IllegalArgumentException("Connection is null or closed.");
231 }
232 this.tableName = tableName;
233 this.cleanupPoolOnClose = this.cleanupConnectionOnClose = false;
234 this.connection = connection;
235 this.configuration = connection.getConfiguration();
236 this.pool = pool;
237
238 this.finishSetup();
239 }
240
241
242
243
244 private void finishSetup() throws IOException {
245 this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW);
246 this.operationTimeout = HTableDescriptor.isMetaTable(tableName) ? HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT
247 : this.configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
248 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
249 this.writeBufferSize = this.configuration.getLong(
250 "hbase.client.write.buffer", 2097152);
251 this.clearBufferOnFail = true;
252 this.autoFlush = true;
253 this.currentWriteBufferSize = 0;
254 this.scannerCaching = this.configuration.getInt(
255 HConstants.HBASE_CLIENT_SCANNER_CACHING,
256 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
257
258 this.maxKeyValueSize = this.configuration.getInt(
259 "hbase.client.keyvalue.maxsize", -1);
260 this.closed = false;
261 }
262
263
264
265
266 @Override
267 public Configuration getConfiguration() {
268 return configuration;
269 }
270
271
272
273
274
275
276
277
278
279
280 @Deprecated
281 public static boolean isTableEnabled(String tableName) throws IOException {
282 return isTableEnabled(Bytes.toBytes(tableName));
283 }
284
285
286
287
288
289
290
291
292
293
294 @Deprecated
295 public static boolean isTableEnabled(byte[] tableName) throws IOException {
296 return isTableEnabled(HBaseConfiguration.create(), tableName);
297 }
298
299
300
301
302
303
304
305
306
307 @Deprecated
308 public static boolean isTableEnabled(Configuration conf, String tableName)
309 throws IOException {
310 return isTableEnabled(conf, Bytes.toBytes(tableName));
311 }
312
313
314
315
316
317
318
319
320
321 @Deprecated
322 public static boolean isTableEnabled(Configuration conf,
323 final byte[] tableName) throws IOException {
324 return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
325 @Override
326 public Boolean connect(HConnection connection) throws IOException {
327 return connection.isTableEnabled(tableName);
328 }
329 });
330 }
331
332
333
334
335
336
337
338 public HRegionLocation getRegionLocation(final String row)
339 throws IOException {
340 return connection.getRegionLocation(tableName, Bytes.toBytes(row), false);
341 }
342
343
344
345
346
347
348
349 public HRegionLocation getRegionLocation(final byte [] row)
350 throws IOException {
351 return connection.getRegionLocation(tableName, row, false);
352 }
353
354
355
356
357
358
359
360
361 public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
362 throws IOException {
363 return connection.getRegionLocation(tableName, row, reload);
364 }
365
366
367
368
369 @Override
370 public byte [] getTableName() {
371 return this.tableName;
372 }
373
374
375
376
377
378
379
380
381 @Deprecated
382 public HConnection getConnection() {
383 return this.connection;
384 }
385
386
387
388
389
390
391
392 @Deprecated
393 public int getScannerCaching() {
394 return scannerCaching;
395 }
396
397
398
399
400
401
402
403
404
405
406
407
408 @Deprecated
409 public void setScannerCaching(int scannerCaching) {
410 this.scannerCaching = scannerCaching;
411 }
412
413
414
415
416 @Override
417 public HTableDescriptor getTableDescriptor() throws IOException {
418 return new UnmodifyableHTableDescriptor(
419 this.connection.getHTableDescriptor(this.tableName));
420 }
421
422
423
424
425
426
427
428
429 public byte [][] getStartKeys() throws IOException {
430 return getStartEndKeys().getFirst();
431 }
432
433
434
435
436
437
438
439
440 public byte[][] getEndKeys() throws IOException {
441 return getStartEndKeys().getSecond();
442 }
443
444
445
446
447
448
449
450
451
452 public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
453 NavigableMap<HRegionInfo, ServerName> regions = getRegionLocations();
454 final List<byte[]> startKeyList = new ArrayList<byte[]>(regions.size());
455 final List<byte[]> endKeyList = new ArrayList<byte[]>(regions.size());
456
457 for (HRegionInfo region : regions.keySet()) {
458 startKeyList.add(region.getStartKey());
459 endKeyList.add(region.getEndKey());
460 }
461
462 return new Pair<byte [][], byte [][]>(
463 startKeyList.toArray(new byte[startKeyList.size()][]),
464 endKeyList.toArray(new byte[endKeyList.size()][]));
465 }
466
467
468
469
470
471
472
473
474 public NavigableMap<HRegionInfo, ServerName> getRegionLocations() throws IOException {
475
476 return MetaScanner.allTableRegions(getConfiguration(), getTableName(), false);
477 }
478
479
480
481
482
483
484
485
486
487
488 public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
489 final byte [] endKey) throws IOException {
490 return getKeysAndRegionsInRange(startKey, endKey, false).getSecond();
491 }
492
493
494
495
496
497
498
499
500
501
502
503
504 private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
505 final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
506 throws IOException {
507 final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
508 if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
509 throw new IllegalArgumentException(
510 "Invalid range: " + Bytes.toStringBinary(startKey) +
511 " > " + Bytes.toStringBinary(endKey));
512 }
513 List<byte[]> keysInRange = new ArrayList<byte[]>();
514 List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
515 byte[] currentKey = startKey;
516 do {
517 HRegionLocation regionLocation = getRegionLocation(currentKey, false);
518 keysInRange.add(currentKey);
519 regionsInRange.add(regionLocation);
520 currentKey = regionLocation.getRegionInfo().getEndKey();
521 } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
522 && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
523 || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
524 return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
525 regionsInRange);
526 }
527
528
529
530
531 @Override
532 public Result getRowOrBefore(final byte[] row, final byte[] family)
533 throws IOException {
534 return new ServerCallable<Result>(connection, tableName, row, operationTimeout) {
535 public Result call() throws IOException {
536 return ProtobufUtil.getRowOrBefore(stub,
537 location.getRegionInfo().getRegionName(), row, family);
538 }
539 }.withRetries();
540 }
541
542
543
544
545 @Override
546 public ResultScanner getScanner(final Scan scan) throws IOException {
547 if (scan.getCaching() <= 0) {
548 scan.setCaching(getScannerCaching());
549 }
550 return new ClientScanner(getConfiguration(), scan, getTableName(),
551 this.connection);
552 }
553
554
555
556
557 @Override
558 public ResultScanner getScanner(byte [] family) throws IOException {
559 Scan scan = new Scan();
560 scan.addFamily(family);
561 return getScanner(scan);
562 }
563
564
565
566
567 @Override
568 public ResultScanner getScanner(byte [] family, byte [] qualifier)
569 throws IOException {
570 Scan scan = new Scan();
571 scan.addColumn(family, qualifier);
572 return getScanner(scan);
573 }
574
575
576
577
578 @Override
579 public Result get(final Get get) throws IOException {
580 return new ServerCallable<Result>(connection, tableName, get.getRow(), operationTimeout) {
581 public Result call() throws IOException {
582 return ProtobufUtil.get(stub,
583 location.getRegionInfo().getRegionName(), get);
584 }
585 }.withRetries();
586 }
587
588
589
590
591 @Override
592 public Result[] get(List<Get> gets) throws IOException {
593 try {
594 Object [] r1 = batch((List)gets);
595
596
597 Result [] results = new Result[r1.length];
598 int i=0;
599 for (Object o : r1) {
600
601 results[i++] = (Result) o;
602 }
603
604 return results;
605 } catch (InterruptedException e) {
606 throw new IOException(e);
607 }
608 }
609
610 @Override
611 public void batch(final List<?extends Row> actions, final Object[] results)
612 throws InterruptedException, IOException {
613 connection.processBatchCallback(actions, tableName, pool, results, null);
614 }
615
616 @Override
617 public Object[] batch(final List<? extends Row> actions)
618 throws InterruptedException, IOException {
619 Object[] results = new Object[actions.size()];
620 connection.processBatchCallback(actions, tableName, pool, results, null);
621 return results;
622 }
623
624 @Override
625 public <R> void batchCallback(
626 final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
627 throws IOException, InterruptedException {
628 connection.processBatchCallback(actions, tableName, pool, results, callback);
629 }
630
631 @Override
632 public <R> Object[] batchCallback(
633 final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException,
634 InterruptedException {
635 Object[] results = new Object[actions.size()];
636 connection.processBatchCallback(actions, tableName, pool, results, callback);
637 return results;
638 }
639
640
641
642
643 @Override
644 public void delete(final Delete delete)
645 throws IOException {
646 new ServerCallable<Boolean>(connection, tableName, delete.getRow(), operationTimeout) {
647 public Boolean call() throws IOException {
648 try {
649 MutateRequest request = RequestConverter.buildMutateRequest(
650 location.getRegionInfo().getRegionName(), delete);
651 MutateResponse response = stub.mutate(null, request);
652 return Boolean.valueOf(response.getProcessed());
653 } catch (ServiceException se) {
654 throw ProtobufUtil.getRemoteException(se);
655 }
656 }
657 }.withRetries();
658 }
659
660
661
662
663 @Override
664 public void delete(final List<Delete> deletes)
665 throws IOException {
666 Object[] results = new Object[deletes.size()];
667 try {
668 connection.processBatch((List) deletes, tableName, pool, results);
669 } catch (InterruptedException e) {
670 throw new IOException(e);
671 } finally {
672
673
674
675 for (int i = results.length - 1; i>=0; i--) {
676
677 if (results[i] instanceof Result) {
678 deletes.remove(i);
679 }
680 }
681 }
682 }
683
684
685
686
687 @Override
688 public void put(final Put put) throws IOException {
689 doPut(put);
690 if (autoFlush) {
691 flushCommits();
692 }
693 }
694
695
696
697
698 @Override
699 public void put(final List<Put> puts) throws IOException {
700 for (Put put : puts) {
701 doPut(put);
702 }
703 if (autoFlush) {
704 flushCommits();
705 }
706 }
707
708 private void doPut(Put put) throws IOException{
709 validatePut(put);
710 writeBuffer.add(put);
711 currentWriteBufferSize += put.heapSize();
712 if (currentWriteBufferSize > writeBufferSize) {
713 flushCommits();
714 }
715 }
716
717
718
719
720 @Override
721 public void mutateRow(final RowMutations rm) throws IOException {
722 new ServerCallable<Void>(connection, tableName, rm.getRow(),
723 operationTimeout) {
724 public Void call() throws IOException {
725 try {
726 MultiRequest request = RequestConverter.buildMultiRequest(
727 location.getRegionInfo().getRegionName(), rm);
728 stub.multi(null, request);
729 } catch (ServiceException se) {
730 throw ProtobufUtil.getRemoteException(se);
731 }
732 return null;
733 }
734 }.withRetries();
735 }
736
737
738
739
740 @Override
741 public Result append(final Append append) throws IOException {
742 if (append.numFamilies() == 0) {
743 throw new IOException(
744 "Invalid arguments to append, no columns specified");
745 }
746 return new ServerCallable<Result>(connection, tableName, append.getRow(), operationTimeout) {
747 public Result call() throws IOException {
748 try {
749 MutateRequest request = RequestConverter.buildMutateRequest(
750 location.getRegionInfo().getRegionName(), append);
751 PayloadCarryingRpcController rpcController =
752 new PayloadCarryingRpcController();
753 MutateResponse response = stub.mutate(rpcController, request);
754 if (!response.hasResult()) return null;
755 return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
756 } catch (ServiceException se) {
757 throw ProtobufUtil.getRemoteException(se);
758 }
759 }
760 }.withRetries();
761 }
762
763
764
765
766 @Override
767 public Result increment(final Increment increment) throws IOException {
768 if (!increment.hasFamilies()) {
769 throw new IOException(
770 "Invalid arguments to increment, no columns specified");
771 }
772 return new ServerCallable<Result>(connection, tableName, increment.getRow(), operationTimeout) {
773 public Result call() throws IOException {
774 try {
775 MutateRequest request = RequestConverter.buildMutateRequest(
776 location.getRegionInfo().getRegionName(), increment);
777 PayloadCarryingRpcController rpcContoller = new PayloadCarryingRpcController();
778 MutateResponse response = stub.mutate(rpcContoller, request);
779 return ProtobufUtil.toResult(response.getResult(), rpcContoller.cellScanner());
780 } catch (ServiceException se) {
781 throw ProtobufUtil.getRemoteException(se);
782 }
783 }
784 }.withRetries();
785 }
786
787
788
789
790 @Override
791 public long incrementColumnValue(final byte [] row, final byte [] family,
792 final byte [] qualifier, final long amount)
793 throws IOException {
794 return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
795 }
796
797
798
799
800 @Override
801 public long incrementColumnValue(final byte [] row, final byte [] family,
802 final byte [] qualifier, final long amount, final Durability durability)
803 throws IOException {
804 NullPointerException npe = null;
805 if (row == null) {
806 npe = new NullPointerException("row is null");
807 } else if (family == null) {
808 npe = new NullPointerException("family is null");
809 } else if (qualifier == null) {
810 npe = new NullPointerException("qualifier is null");
811 }
812 if (npe != null) {
813 throw new IOException(
814 "Invalid arguments to incrementColumnValue", npe);
815 }
816 return new ServerCallable<Long>(connection, tableName, row, operationTimeout) {
817 public Long call() throws IOException {
818 try {
819 MutateRequest request = RequestConverter.buildMutateRequest(
820 location.getRegionInfo().getRegionName(), row, family,
821 qualifier, amount, durability);
822 PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController();
823 MutateResponse response = stub.mutate(rpcController, request);
824 Result result =
825 ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
826 return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
827 } catch (ServiceException se) {
828 throw ProtobufUtil.getRemoteException(se);
829 }
830 }
831 }.withRetries();
832 }
833
834
835
836
837 @Override
838 public boolean checkAndPut(final byte [] row,
839 final byte [] family, final byte [] qualifier, final byte [] value,
840 final Put put)
841 throws IOException {
842 return new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
843 public Boolean call() throws IOException {
844 try {
845 MutateRequest request = RequestConverter.buildMutateRequest(
846 location.getRegionInfo().getRegionName(), row, family, qualifier,
847 new BinaryComparator(value), CompareType.EQUAL, put);
848 MutateResponse response = stub.mutate(null, request);
849 return Boolean.valueOf(response.getProcessed());
850 } catch (ServiceException se) {
851 throw ProtobufUtil.getRemoteException(se);
852 }
853 }
854 }.withRetries();
855 }
856
857
858
859
860
861 @Override
862 public boolean checkAndDelete(final byte [] row,
863 final byte [] family, final byte [] qualifier, final byte [] value,
864 final Delete delete)
865 throws IOException {
866 return new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
867 public Boolean call() throws IOException {
868 try {
869 MutateRequest request = RequestConverter.buildMutateRequest(
870 location.getRegionInfo().getRegionName(), row, family, qualifier,
871 new BinaryComparator(value), CompareType.EQUAL, delete);
872 MutateResponse response = stub.mutate(null, request);
873 return Boolean.valueOf(response.getProcessed());
874 } catch (ServiceException se) {
875 throw ProtobufUtil.getRemoteException(se);
876 }
877 }
878 }.withRetries();
879 }
880
881
882
883
884 @Override
885 public boolean exists(final Get get) throws IOException {
886 return new ServerCallable<Boolean>(connection, tableName, get.getRow(), operationTimeout) {
887 public Boolean call() throws IOException {
888 try {
889 GetRequest request = RequestConverter.buildGetRequest(
890 location.getRegionInfo().getRegionName(), get, true);
891 GetResponse response = stub.get(null, request);
892 return response.getExists();
893 } catch (ServiceException se) {
894 throw ProtobufUtil.getRemoteException(se);
895 }
896 }
897 }.withRetries();
898 }
899
900
901
902
903
904
905 private static class SortedGet implements Comparable<SortedGet> {
906 protected int initialIndex = -1;
907 protected Get get;
908
909 public SortedGet (Get get, int initialIndex) {
910 this.get = get;
911 this.initialIndex = initialIndex;
912 }
913
914 public int getInitialIndex() {
915 return initialIndex;
916 }
917
918 @Override
919 public int compareTo(SortedGet o) {
920 return get.compareTo(o.get);
921 }
922
923 public Get getGet() {
924 return get;
925 }
926
927 @Override
928 public int hashCode() {
929 return get.hashCode();
930 }
931
932 @Override
933 public boolean equals(Object obj) {
934 if (obj instanceof SortedGet)
935 return get.equals(((SortedGet)obj).get);
936 else
937 return false;
938 }
939 }
940
941
942
943
944 @Override
945 public Boolean[] exists(final List<Get> gets) throws IOException {
946
947
948
949
950 ArrayList<SortedGet> sortedGetsList = new ArrayList<HTable.SortedGet>();
951 for (int indexGet = 0; indexGet < gets.size(); indexGet++) {
952 sortedGetsList.add(new SortedGet (gets.get(indexGet), indexGet));
953 }
954
955
956 Collections.sort(sortedGetsList);
957
958
959
960
961 Map<Integer, List<Get>> getsByRegion = new HashMap<Integer, List<Get>>();
962
963
964 Map<Get, Integer> getToRegionIndexMap = new HashMap<Get, Integer>();
965 Pair<byte[][], byte[][]> startEndKeys = getStartEndKeys();
966
967 int regionIndex = 0;
968 for (final SortedGet get : sortedGetsList) {
969
970 while ((regionIndex < startEndKeys.getSecond().length) && ((Bytes.compareTo(startEndKeys.getSecond()[regionIndex], get.getGet().getRow()) <= 0))) {
971 regionIndex++;
972 }
973 List<Get> regionGets = getsByRegion.get(regionIndex);
974 if (regionGets == null) {
975 regionGets = new ArrayList<Get>();
976 getsByRegion.put(regionIndex, regionGets);
977 }
978 regionGets.add(get.getGet());
979 getToRegionIndexMap.put(get.getGet(), regionIndex);
980 }
981
982
983 Map<Integer, Future<List<Boolean>>> futures =
984 new HashMap<Integer, Future<List<Boolean>>>(sortedGetsList.size());
985 for (final Map.Entry<Integer, List<Get>> getsByRegionEntry : getsByRegion.entrySet()) {
986 Callable<List<Boolean>> callable = new Callable<List<Boolean>>() {
987 public List<Boolean> call() throws Exception {
988 return new ServerCallable<List<Boolean>>(connection, tableName, getsByRegionEntry.getValue()
989 .get(0).getRow(), operationTimeout) {
990 public List<Boolean> call() throws IOException {
991 try {
992 MultiGetRequest requests = RequestConverter.buildMultiGetRequest(location
993 .getRegionInfo().getRegionName(), getsByRegionEntry.getValue(), true, false);
994 MultiGetResponse responses = stub.multiGet(null, requests);
995 return responses.getExistsList();
996 } catch (ServiceException se) {
997 throw ProtobufUtil.getRemoteException(se);
998 }
999 }
1000 }.withRetries();
1001 }
1002 };
1003 futures.put(getsByRegionEntry.getKey(), pool.submit(callable));
1004 }
1005
1006
1007 Map<Integer, List<Boolean>> responses = new HashMap<Integer, List<Boolean>>();
1008 for (final Map.Entry<Integer, List<Get>> sortedGetEntry : getsByRegion.entrySet()) {
1009 try {
1010 Future<List<Boolean>> future = futures.get(sortedGetEntry.getKey());
1011 List<Boolean> resp = future.get();
1012
1013 if (resp == null) {
1014 LOG.warn("Failed for gets on region: " + sortedGetEntry.getKey());
1015 }
1016 responses.put(sortedGetEntry.getKey(), resp);
1017 } catch (ExecutionException e) {
1018 LOG.warn("Failed for gets on region: " + sortedGetEntry.getKey());
1019 } catch (InterruptedException e) {
1020 LOG.warn("Failed for gets on region: " + sortedGetEntry.getKey());
1021 Thread.currentThread().interrupt();
1022 }
1023 }
1024 Boolean[] results = new Boolean[sortedGetsList.size()];
1025
1026
1027 Map<Integer, Integer> indexes = new HashMap<Integer, Integer>();
1028 for (int i = 0; i < sortedGetsList.size(); i++) {
1029 Integer regionInfoIndex = getToRegionIndexMap.get(sortedGetsList.get(i).getGet());
1030 Integer index = indexes.get(regionInfoIndex);
1031 if (index == null) {
1032 index = 0;
1033 }
1034 results[sortedGetsList.get(i).getInitialIndex()] = responses.get(regionInfoIndex).get(index);
1035 indexes.put(regionInfoIndex, index + 1);
1036 }
1037
1038 return results;
1039 }
1040
1041
1042
1043
1044 @Override
1045 public void flushCommits() throws IOException {
1046 if (writeBuffer.isEmpty()){
1047
1048 return;
1049 }
1050
1051 Object[] results = new Object[writeBuffer.size()];
1052 boolean success = false;
1053 try {
1054 this.connection.processBatch(writeBuffer, tableName, pool, results);
1055 success = true;
1056 } catch (InterruptedException e) {
1057 throw new InterruptedIOException(e.getMessage());
1058 } finally {
1059
1060
1061
1062
1063 currentWriteBufferSize = 0;
1064 if (success || clearBufferOnFail) {
1065 writeBuffer.clear();
1066 } else {
1067 for (int i = results.length - 1; i >= 0; i--) {
1068 if (results[i] instanceof Result) {
1069 writeBuffer.remove(i);
1070 } else {
1071 currentWriteBufferSize += writeBuffer.get(i).heapSize();
1072 }
1073 }
1074 }
1075 }
1076 }
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089 public <R> void processBatchCallback(
1090 final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
1091 throws IOException, InterruptedException {
1092 connection.processBatchCallback(list, tableName, pool, results, callback);
1093 }
1094
1095
1096
1097
1098
1099
1100 public void processBatch(final List<? extends Row> list, final Object[] results)
1101 throws IOException, InterruptedException {
1102
1103 this.processBatchCallback(list, results, null);
1104 }
1105
1106
1107 @Override
1108 public void close() throws IOException {
1109 if (this.closed) {
1110 return;
1111 }
1112 flushCommits();
1113 if (cleanupPoolOnClose) {
1114 this.pool.shutdown();
1115 }
1116 if (cleanupConnectionOnClose) {
1117 if (this.connection != null) {
1118 this.connection.close();
1119 }
1120 }
1121 this.closed = true;
1122 }
1123
1124
1125 public void validatePut(final Put put) throws IllegalArgumentException{
1126 if (put.isEmpty()) {
1127 throw new IllegalArgumentException("No columns to insert");
1128 }
1129 if (maxKeyValueSize > 0) {
1130 for (List<? extends Cell> list : put.getFamilyMap().values()) {
1131 for (Cell cell : list) {
1132
1133 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
1134 if (kv.getLength() > maxKeyValueSize) {
1135 throw new IllegalArgumentException("KeyValue size too large");
1136 }
1137 }
1138 }
1139 }
1140 }
1141
1142
1143
1144
1145 @Override
1146 public boolean isAutoFlush() {
1147 return autoFlush;
1148 }
1149
1150
1151
1152
1153
1154
1155
1156 public void setAutoFlush(boolean autoFlush) {
1157 setAutoFlush(autoFlush, autoFlush);
1158 }
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187 public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
1188 this.autoFlush = autoFlush;
1189 this.clearBufferOnFail = autoFlush || clearBufferOnFail;
1190 }
1191
1192
1193
1194
1195
1196
1197
1198
1199 public long getWriteBufferSize() {
1200 return writeBufferSize;
1201 }
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211 public void setWriteBufferSize(long writeBufferSize) throws IOException {
1212 this.writeBufferSize = writeBufferSize;
1213 if(currentWriteBufferSize > writeBufferSize) {
1214 flushCommits();
1215 }
1216 }
1217
1218
1219
1220
1221
1222 public ArrayList<Put> getWriteBuffer() {
1223 return writeBuffer;
1224 }
1225
1226
1227
1228
1229
1230 ExecutorService getPool() {
1231 return this.pool;
1232 }
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243 public static void setRegionCachePrefetch(final byte[] tableName,
1244 final boolean enable) throws IOException {
1245 HConnectionManager.execute(new HConnectable<Void>(HBaseConfiguration
1246 .create()) {
1247 @Override
1248 public Void connect(HConnection connection) throws IOException {
1249 connection.setRegionCachePrefetch(tableName, enable);
1250 return null;
1251 }
1252 });
1253 }
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265 public static void setRegionCachePrefetch(final Configuration conf,
1266 final byte[] tableName, final boolean enable) throws IOException {
1267 HConnectionManager.execute(new HConnectable<Void>(conf) {
1268 @Override
1269 public Void connect(HConnection connection) throws IOException {
1270 connection.setRegionCachePrefetch(tableName, enable);
1271 return null;
1272 }
1273 });
1274 }
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284 public static boolean getRegionCachePrefetch(final Configuration conf,
1285 final byte[] tableName) throws IOException {
1286 return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
1287 @Override
1288 public Boolean connect(HConnection connection) throws IOException {
1289 return connection.getRegionCachePrefetch(tableName);
1290 }
1291 });
1292 }
1293
1294
1295
1296
1297
1298
1299
1300
1301 public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException {
1302 return HConnectionManager.execute(new HConnectable<Boolean>(
1303 HBaseConfiguration.create()) {
1304 @Override
1305 public Boolean connect(HConnection connection) throws IOException {
1306 return connection.getRegionCachePrefetch(tableName);
1307 }
1308 });
1309 }
1310
1311
1312
1313
1314
1315 public void clearRegionCache() {
1316 this.connection.clearRegionCache();
1317 }
1318
1319
1320
1321
1322 public CoprocessorRpcChannel coprocessorService(byte[] row) {
1323 return new RegionCoprocessorRpcChannel(connection, tableName, row);
1324 }
1325
1326
1327
1328
1329 @Override
1330 public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1331 byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1332 throws ServiceException, Throwable {
1333 final Map<byte[],R> results = Collections.synchronizedMap(
1334 new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1335 coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1336 public void update(byte[] region, byte[] row, R value) {
1337 results.put(region, value);
1338 }
1339 });
1340 return results;
1341 }
1342
1343
1344
1345
1346 @Override
1347 public <T extends Service, R> void coprocessorService(final Class<T> service,
1348 byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1349 final Batch.Callback<R> callback) throws ServiceException, Throwable {
1350
1351
1352 List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1353
1354 Map<byte[],Future<R>> futures =
1355 new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
1356 for (final byte[] r : keys) {
1357 final RegionCoprocessorRpcChannel channel =
1358 new RegionCoprocessorRpcChannel(connection, tableName, r);
1359 Future<R> future = pool.submit(
1360 new Callable<R>() {
1361 public R call() throws Exception {
1362 T instance = ProtobufUtil.newServiceStub(service, channel);
1363 R result = callable.call(instance);
1364 byte[] region = channel.getLastRegion();
1365 if (callback != null) {
1366 callback.update(region, r, result);
1367 }
1368 return result;
1369 }
1370 });
1371 futures.put(r, future);
1372 }
1373 for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1374 try {
1375 e.getValue().get();
1376 } catch (ExecutionException ee) {
1377 LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1378 + Bytes.toStringBinary(e.getKey()), ee);
1379 throw ee.getCause();
1380 } catch (InterruptedException ie) {
1381 Thread.currentThread().interrupt();
1382 throw new InterruptedIOException("Interrupted calling coprocessor service " + service.getName()
1383 + " for row " + Bytes.toStringBinary(e.getKey()))
1384 .initCause(ie);
1385 }
1386 }
1387 }
1388
1389 private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1390 throws IOException {
1391 if (start == null) {
1392 start = HConstants.EMPTY_START_ROW;
1393 }
1394 if (end == null) {
1395 end = HConstants.EMPTY_END_ROW;
1396 }
1397 return getKeysAndRegionsInRange(start, end, true).getFirst();
1398 }
1399
1400 public void setOperationTimeout(int operationTimeout) {
1401 this.operationTimeout = operationTimeout;
1402 }
1403
1404 public int getOperationTimeout() {
1405 return operationTimeout;
1406 }
1407
1408 }