1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.NavigableMap;
28 import java.util.TreeMap;
29 import java.util.concurrent.Callable;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.SynchronousQueue;
34 import java.util.concurrent.ThreadPoolExecutor;
35 import java.util.concurrent.TimeUnit;
36
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39 import org.apache.hadoop.hbase.classification.InterfaceAudience;
40 import org.apache.hadoop.hbase.classification.InterfaceStability;
41 import org.apache.hadoop.conf.Configuration;
42 import org.apache.hadoop.hbase.Cell;
43 import org.apache.hadoop.hbase.HBaseConfiguration;
44 import org.apache.hadoop.hbase.HConstants;
45 import org.apache.hadoop.hbase.HRegionInfo;
46 import org.apache.hadoop.hbase.HRegionLocation;
47 import org.apache.hadoop.hbase.HTableDescriptor;
48 import org.apache.hadoop.hbase.KeyValueUtil;
49 import org.apache.hadoop.hbase.ServerName;
50 import org.apache.hadoop.hbase.TableName;
51 import org.apache.hadoop.hbase.TableNotFoundException;
52 import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
53 import org.apache.hadoop.hbase.client.coprocessor.Batch;
54 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
55 import org.apache.hadoop.hbase.filter.BinaryComparator;
56 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
57 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
58 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
59 import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
60 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
61 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
62 import org.apache.hadoop.hbase.protobuf.RequestConverter;
63 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
64 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
65 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
66 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
67 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
68 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
69 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
70 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
71 import org.apache.hadoop.hbase.util.Bytes;
72 import org.apache.hadoop.hbase.util.Pair;
73 import org.apache.hadoop.hbase.util.ReflectionUtils;
74 import org.apache.hadoop.hbase.util.Threads;
75
76 import com.google.common.annotations.VisibleForTesting;
77 import com.google.protobuf.Descriptors;
78 import com.google.protobuf.Message;
79 import com.google.protobuf.Service;
80 import com.google.protobuf.ServiceException;
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114 @InterfaceAudience.Private
115 @InterfaceStability.Stable
116 public class HTable implements HTableInterface, RegionLocator {
117 private static final Log LOG = LogFactory.getLog(HTable.class);
118 protected ClusterConnection connection;
119 private final TableName tableName;
120 private volatile Configuration configuration;
121 private ConnectionConfiguration connConfiguration;
122 protected BufferedMutatorImpl mutator;
123 private boolean autoFlush = true;
124 private boolean closed = false;
125 protected int scannerCaching;
126 protected long scannerMaxResultSize;
127 private ExecutorService pool;
128 private int operationTimeout;
129 private int rpcTimeout;
130 private final boolean cleanupPoolOnClose;
131 private final boolean cleanupConnectionOnClose;
132 private Consistency defaultConsistency = Consistency.STRONG;
133 private HRegionLocator locator;
134
135
136 protected AsyncProcess multiAp;
137 private RpcRetryingCallerFactory rpcCallerFactory;
138 private RpcControllerFactory rpcControllerFactory;
139
140
141
142
143
144
145
146
147
/**
 * Creates a table handle from a configuration and a table name given as a string.
 * The name is converted with {@link TableName#valueOf(String)} and construction is
 * delegated to {@link #HTable(Configuration, TableName)}.
 *
 * @param conf configuration forwarded to the delegate constructor
 * @param tableName name of the table, as a string
 * @throws IOException if the delegate constructor fails
 */
@Deprecated
public HTable(Configuration conf, final String tableName) throws IOException {
  this(conf, TableName.valueOf(tableName));
}
153
154
155
156
157
158
159
160
161
/**
 * Creates a table handle from a configuration and a table name given as bytes.
 * The name is converted with {@link TableName#valueOf(byte[])} and construction is
 * delegated to {@link #HTable(Configuration, TableName)}.
 *
 * @param conf configuration forwarded to the delegate constructor
 * @param tableName name of the table, as a byte array
 * @throws IOException if the delegate constructor fails
 */
@Deprecated
public HTable(Configuration conf, final byte[] tableName) throws IOException {
  this(conf, TableName.valueOf(tableName));
}
167
168
169
170
171
172
173
174
175
/**
 * Creates a table handle that obtains its own connection and its own default executor.
 * Both cleanup flags are set to {@code true}.
 *
 * <p>When {@code conf} is {@code null} the connection is left {@code null} and no further
 * setup is performed.
 *
 * @param conf configuration used to obtain the connection and to size the executor;
 *     may be {@code null}
 * @param tableName name of the table
 * @throws IOException if obtaining the connection or finishing setup fails
 */
@Deprecated
public HTable(Configuration conf, final TableName tableName) throws IOException {
  this.tableName = tableName;
  this.cleanupConnectionOnClose = true;
  this.cleanupPoolOnClose = true;

  if (conf != null) {
    this.connection = ConnectionManager.getConnectionInternal(conf);
    this.configuration = conf;

    this.pool = getDefaultExecutor(conf);
    this.finishSetup();
  } else {
    // Nothing to connect with: leave the instance without a connection.
    this.connection = null;
  }
}
191
192
193
194
195
196
197
198
199 @Deprecated
200 public HTable(TableName tableName, Connection connection) throws IOException {
201 this.tableName = tableName;
202 this.cleanupPoolOnClose = true;
203 this.cleanupConnectionOnClose = false;
204 this.connection = (ClusterConnection)connection;
205 this.configuration = connection.getConfiguration();
206
207 this.pool = getDefaultExecutor(this.configuration);
208 this.finishSetup();
209 }
210
211
212 @InterfaceAudience.Private
213 public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
214 int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
215 if (maxThreads == 0) {
216 maxThreads = 1;
217 }
218 long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
219
220
221
222
223
224 ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
225 new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
226 pool.allowCoreThreadTimeOut(true);
227 return pool;
228 }
229
230
231
232
233
234
235
236
237
238
/**
 * Creates a table handle from a configuration, a table name given as bytes and an
 * executor. Delegates to {@link #HTable(Configuration, TableName, ExecutorService)}.
 *
 * @param conf configuration forwarded to the delegate constructor
 * @param tableName name of the table, as a byte array
 * @param pool executor to use, forwarded unchanged
 * @throws IOException if the delegate constructor fails
 */
@Deprecated
public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool)
    throws IOException {
  this(conf, TableName.valueOf(tableName), pool);
}
244
245
246
247
248
249
250
251
252
253
254 @Deprecated
255 public HTable(Configuration conf, final TableName tableName, final ExecutorService pool)
256 throws IOException {
257 this.connection = ConnectionManager.getConnectionInternal(conf);
258 this.configuration = conf;
259 this.pool = pool;
260 if (pool == null) {
261 this.pool = getDefaultExecutor(conf);
262 this.cleanupPoolOnClose = true;
263 } else {
264 this.cleanupPoolOnClose = false;
265 }
266 this.tableName = tableName;
267 this.cleanupConnectionOnClose = true;
268 this.finishSetup();
269 }
270
271
272
273
274
275
276
277
278
/**
 * Creates a table handle from a table name given as bytes, an existing connection and
 * an executor. Delegates to {@link #HTable(TableName, Connection, ExecutorService)}.
 *
 * @param tableName name of the table, as a byte array
 * @param connection connection to use, forwarded unchanged
 * @param pool executor to use, forwarded unchanged
 * @throws IOException if the delegate constructor fails
 */
@Deprecated
public HTable(final byte[] tableName, final Connection connection, final ExecutorService pool)
    throws IOException {
  this(TableName.valueOf(tableName), connection, pool);
}
284
285
/**
 * Creates a table handle from a table name, an existing connection and an executor.
 * The connection is cast to {@code ClusterConnection}; the connection configuration and
 * both RPC factories are passed as {@code null} to the full constructor.
 *
 * @param tableName name of the table
 * @param connection connection to use
 * @param pool executor to use, forwarded unchanged
 * @throws IOException if the delegate constructor fails
 */
@Deprecated
public HTable(TableName tableName, final Connection connection, final ExecutorService pool)
    throws IOException {
  this(tableName, (ClusterConnection) connection, null, null, null, pool);
}
291
292
293
294
295
296
297
298
299
300
301 @InterfaceAudience.Private
302 public HTable(TableName tableName, final ClusterConnection connection,
303 final ConnectionConfiguration tableConfig,
304 final RpcRetryingCallerFactory rpcCallerFactory,
305 final RpcControllerFactory rpcControllerFactory,
306 final ExecutorService pool) throws IOException {
307 if (connection == null || connection.isClosed()) {
308 throw new IllegalArgumentException("Connection is null or closed.");
309 }
310 this.tableName = tableName;
311 this.cleanupConnectionOnClose = false;
312 this.connection = connection;
313 this.configuration = connection.getConfiguration();
314 this.connConfiguration = tableConfig;
315 this.pool = pool;
316 if (pool == null) {
317 this.pool = getDefaultExecutor(this.configuration);
318 this.cleanupPoolOnClose = true;
319 } else {
320 this.cleanupPoolOnClose = false;
321 }
322
323 this.rpcCallerFactory = rpcCallerFactory;
324 this.rpcControllerFactory = rpcControllerFactory;
325
326 this.finishSetup();
327 }
328
329
330
331
332
/**
 * Testing constructor: wires up only the connection, the table name taken from
 * {@code params}, the connection configuration and a buffered mutator.
 * finishSetup() is not called, no executor is assigned, and both cleanup flags are cleared.
 *
 * @param conn connection to use
 * @param params mutator parameters; also supply the table name
 * @throws IOException declared by the signature
 */
@VisibleForTesting
protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException {
  this.connection = conn;
  this.tableName = params.getTableName();
  this.connConfiguration = new ConnectionConfiguration(conn.getConfiguration());
  this.cleanupPoolOnClose = false;
  this.cleanupConnectionOnClose = false;

  this.mutator = new BufferedMutatorImpl(conn, null, null, params);
}
343
344
345
346
347 public static int getMaxKeyValueSize(Configuration conf) {
348 return conf.getInt("hbase.client.keyvalue.maxsize", -1);
349 }
350
351
352
353
354 private void finishSetup() throws IOException {
355 if (connConfiguration == null) {
356 connConfiguration = new ConnectionConfiguration(configuration);
357 }
358 this.operationTimeout = tableName.isSystemTable() ?
359 connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
360 this.rpcTimeout = configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
361 HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
362 this.scannerCaching = connConfiguration.getScannerCaching();
363 this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
364 if (this.rpcCallerFactory == null) {
365 this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
366 }
367 if (this.rpcControllerFactory == null) {
368 this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
369 }
370
371
372 multiAp = this.connection.getAsyncProcess();
373
374 this.closed = false;
375
376 this.locator = new HRegionLocator(tableName, connection);
377 }
378
379
380
381
382 @Override
383 public Configuration getConfiguration() {
384 return configuration;
385 }
386
387
388
389
390
391
392
393
394
395
396 @Deprecated
397 public static boolean isTableEnabled(String tableName) throws IOException {
398 return isTableEnabled(TableName.valueOf(tableName));
399 }
400
401
402
403
404
405
406
407
408
409
410 @Deprecated
411 public static boolean isTableEnabled(byte[] tableName) throws IOException {
412 return isTableEnabled(TableName.valueOf(tableName));
413 }
414
415
416
417
418
419
420
421
422
423
424 @Deprecated
425 public static boolean isTableEnabled(TableName tableName) throws IOException {
426 return isTableEnabled(HBaseConfiguration.create(), tableName);
427 }
428
429
430
431
432
433
434
435
436
437 @Deprecated
438 public static boolean isTableEnabled(Configuration conf, String tableName)
439 throws IOException {
440 return isTableEnabled(conf, TableName.valueOf(tableName));
441 }
442
443
444
445
446
447
448
449
450
451 @Deprecated
452 public static boolean isTableEnabled(Configuration conf, byte[] tableName)
453 throws IOException {
454 return isTableEnabled(conf, TableName.valueOf(tableName));
455 }
456
457
458
459
460
461
462
463
464
465 @Deprecated
466 public static boolean isTableEnabled(Configuration conf,
467 final TableName tableName) throws IOException {
468 return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
469 @Override
470 public Boolean connect(HConnection connection) throws IOException {
471 return connection.isTableEnabled(tableName);
472 }
473 });
474 }
475
476
477
478
479
480
481
482
483 @Deprecated
484 public HRegionLocation getRegionLocation(final String row)
485 throws IOException {
486 return getRegionLocation(Bytes.toBytes(row), false);
487 }
488
489
490
491
492 @Override
493 @Deprecated
494 public HRegionLocation getRegionLocation(final byte [] row)
495 throws IOException {
496 return locator.getRegionLocation(row);
497 }
498
499
500
501
502 @Override
503 @Deprecated
504 public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
505 throws IOException {
506 return locator.getRegionLocation(row, reload);
507 }
508
509
510
511
512 @Override
513 public byte [] getTableName() {
514 return this.tableName.getName();
515 }
516
517 @Override
518 public TableName getName() {
519 return tableName;
520 }
521
522
523
524
525
526
527
528
529 @Deprecated
530 @VisibleForTesting
531 public HConnection getConnection() {
532 return this.connection;
533 }
534
535
536
537
538
539
540
541 @Deprecated
542 public int getScannerCaching() {
543 return scannerCaching;
544 }
545
546
547
548
549
550 @Deprecated
551 public List<Row> getWriteBuffer() {
552 return mutator == null ? null : mutator.getWriteBuffer();
553 }
554
555
556
557
558
559
560
561
562
563
564
565
566 @Deprecated
567 public void setScannerCaching(int scannerCaching) {
568 this.scannerCaching = scannerCaching;
569 }
570
571
572
573
574 @Override
575 public HTableDescriptor getTableDescriptor() throws IOException {
576 HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory,
577 rpcControllerFactory, operationTimeout, rpcTimeout);
578 if (htd != null) {
579 return new UnmodifyableHTableDescriptor(htd);
580 }
581 return null;
582 }
583
584
585
586
587
588 @Override
589 @Deprecated
590 public byte [][] getStartKeys() throws IOException {
591 return locator.getStartKeys();
592 }
593
594
595
596
597
598 @Override
599 @Deprecated
600 public byte[][] getEndKeys() throws IOException {
601 return locator.getEndKeys();
602 }
603
604
605
606
607
608 @Override
609 @Deprecated
610 public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
611 return locator.getStartEndKeys();
612 }
613
614
615
616
617
618
619
620
621
622 @Deprecated
623 public NavigableMap<HRegionInfo, ServerName> getRegionLocations() throws IOException {
624
625 return MetaScanner.allTableRegions(this.connection, getName());
626 }
627
628
629
630
631
632
633
634
635
636
637 @Override
638 @Deprecated
639 public List<HRegionLocation> getAllRegionLocations() throws IOException {
640 return locator.getAllRegionLocations();
641 }
642
643
644
645
646
647
648
649
650
651
652
653 @Deprecated
654 public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
655 final byte [] endKey) throws IOException {
656 return getRegionsInRange(startKey, endKey, false);
657 }
658
659
660
661
662
663
664
665
666
667
668
669
670 @Deprecated
671 public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
672 final byte [] endKey, final boolean reload) throws IOException {
673 return getKeysAndRegionsInRange(startKey, endKey, false, reload).getSecond();
674 }
675
676
677
678
679
680
681
682
683
684
685
686
687
688 @Deprecated
689 private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
690 final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
691 throws IOException {
692 return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
693 }
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708 @Deprecated
709 private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
710 final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
711 final boolean reload) throws IOException {
712 final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
713 if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
714 throw new IllegalArgumentException(
715 "Invalid range: " + Bytes.toStringBinary(startKey) +
716 " > " + Bytes.toStringBinary(endKey));
717 }
718 List<byte[]> keysInRange = new ArrayList<byte[]>();
719 List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
720 byte[] currentKey = startKey;
721 do {
722 HRegionLocation regionLocation = getRegionLocation(currentKey, reload);
723 keysInRange.add(currentKey);
724 regionsInRange.add(regionLocation);
725 currentKey = regionLocation.getRegionInfo().getEndKey();
726 } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
727 && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
728 || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
729 return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
730 regionsInRange);
731 }
732
733
734
735
736
737 @Override
738 @Deprecated
739 public Result getRowOrBefore(final byte[] row, final byte[] family)
740 throws IOException {
741 RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
742 tableName, row) {
743 @Override
744 public Result call(int callTimeout) throws IOException {
745 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
746 controller.setPriority(tableName);
747 controller.setCallTimeout(callTimeout);
748 ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest(
749 getLocation().getRegionInfo().getRegionName(), row, family);
750 try {
751 ClientProtos.GetResponse response = getStub().get(controller, request);
752 if (!response.hasResult()) return null;
753 return ProtobufUtil.toResult(response.getResult());
754 } catch (ServiceException se) {
755 throw ProtobufUtil.getRemoteException(se);
756 }
757 }
758 };
759 return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
760 this.operationTimeout);
761 }
762
763
764
765
766
767 @Override
768 public ResultScanner getScanner(final Scan scan) throws IOException {
769 if (scan.getBatch() > 0 && scan.isSmall()) {
770 throw new IllegalArgumentException("Small scan should not be used with batching");
771 }
772
773 if (scan.getCaching() <= 0) {
774 scan.setCaching(getScannerCaching());
775 }
776 if (scan.getMaxResultSize() <= 0) {
777 scan.setMaxResultSize(scannerMaxResultSize);
778 }
779
780 if (scan.isReversed()) {
781 if (scan.isSmall()) {
782 return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
783 this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
784 pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
785 } else {
786 return new ReversedClientScanner(getConfiguration(), scan, getName(),
787 this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
788 pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
789 }
790 }
791
792 if (scan.isSmall()) {
793 return new ClientSmallScanner(getConfiguration(), scan, getName(),
794 this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
795 pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
796 } else {
797 return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
798 this.rpcCallerFactory, this.rpcControllerFactory,
799 pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
800 }
801 }
802
803
804
805
806
807 @Override
808 public ResultScanner getScanner(byte [] family) throws IOException {
809 Scan scan = new Scan();
810 scan.addFamily(family);
811 return getScanner(scan);
812 }
813
814
815
816
817
818 @Override
819 public ResultScanner getScanner(byte [] family, byte [] qualifier)
820 throws IOException {
821 Scan scan = new Scan();
822 scan.addColumn(family, qualifier);
823 return getScanner(scan);
824 }
825
826
827
828
829 @Override
830 public Result get(final Get get) throws IOException {
831 return get(get, get.isCheckExistenceOnly());
832 }
833
834 private Result get(Get get, final boolean checkExistenceOnly) throws IOException {
835
836 if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) {
837 get = ReflectionUtils.newInstance(get.getClass(), get);
838 get.setCheckExistenceOnly(checkExistenceOnly);
839 if (get.getConsistency() == null){
840 get.setConsistency(defaultConsistency);
841 }
842 }
843
844 if (get.getConsistency() == Consistency.STRONG) {
845
846 final Get getReq = get;
847 RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
848 getName(), get.getRow()) {
849 @Override
850 public Result call(int callTimeout) throws IOException {
851 ClientProtos.GetRequest request =
852 RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq);
853 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
854 controller.setPriority(tableName);
855 controller.setCallTimeout(callTimeout);
856 try {
857 ClientProtos.GetResponse response = getStub().get(controller, request);
858 if (response == null) return null;
859 return ProtobufUtil.toResult(response.getResult());
860 } catch (ServiceException se) {
861 throw ProtobufUtil.getRemoteException(se);
862 }
863 }
864 };
865 return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
866 this.operationTimeout);
867 }
868
869
870 RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
871 rpcControllerFactory, tableName, this.connection, get, pool,
872 connConfiguration.getRetriesNumber(),
873 operationTimeout,
874 connConfiguration.getPrimaryCallTimeoutMicroSecond());
875 return callable.call();
876 }
877
878
879
880
881
882 @Override
883 public Result[] get(List<Get> gets) throws IOException {
884 if (gets.size() == 1) {
885 return new Result[]{get(gets.get(0))};
886 }
887 try {
888 Object [] r1 = batch((List)gets);
889
890
891 Result [] results = new Result[r1.length];
892 int i=0;
893 for (Object o : r1) {
894
895 results[i++] = (Result) o;
896 }
897
898 return results;
899 } catch (InterruptedException e) {
900 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
901 }
902 }
903
904
905
906
907 @Override
908 public void batch(final List<? extends Row> actions, final Object[] results)
909 throws InterruptedException, IOException {
910 AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results);
911 ars.waitUntilDone();
912 if (ars.hasError()) {
913 throw ars.getErrors();
914 }
915 }
916
917
918
919
920
921
922 @Deprecated
923 @Override
924 public Object[] batch(final List<? extends Row> actions)
925 throws InterruptedException, IOException {
926 Object[] results = new Object[actions.size()];
927 batch(actions, results);
928 return results;
929 }
930
931
932
933
934 @Override
935 public <R> void batchCallback(
936 final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
937 throws IOException, InterruptedException {
938 connection.processBatchCallback(actions, tableName, pool, results, callback);
939 }
940
941
942
943
944
945
946
947
948 @Deprecated
949 @Override
950 public <R> Object[] batchCallback(
951 final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException,
952 InterruptedException {
953 Object[] results = new Object[actions.size()];
954 batchCallback(actions, results, callback);
955 return results;
956 }
957
958
959
960
961 @Override
962 public void delete(final Delete delete)
963 throws IOException {
964 RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
965 tableName, delete.getRow()) {
966 @Override
967 public Boolean call(int callTimeout) throws IOException {
968 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
969 controller.setPriority(tableName);
970 controller.setCallTimeout(callTimeout);
971
972 try {
973 MutateRequest request = RequestConverter.buildMutateRequest(
974 getLocation().getRegionInfo().getRegionName(), delete);
975 MutateResponse response = getStub().mutate(controller, request);
976 return Boolean.valueOf(response.getProcessed());
977 } catch (ServiceException se) {
978 throw ProtobufUtil.getRemoteException(se);
979 }
980 }
981 };
982 rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
983 this.operationTimeout);
984 }
985
986
987
988
989 @Override
990 public void delete(final List<Delete> deletes)
991 throws IOException {
992 Object[] results = new Object[deletes.size()];
993 try {
994 batch(deletes, results);
995 } catch (InterruptedException e) {
996 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
997 } finally {
998
999
1000
1001 for (int i = results.length - 1; i>=0; i--) {
1002
1003 if (results[i] instanceof Result) {
1004 deletes.remove(i);
1005 }
1006 }
1007 }
1008 }
1009
1010
1011
1012
1013
1014 @Override
1015 public void put(final Put put) throws IOException {
1016 getBufferedMutator().mutate(put);
1017 if (autoFlush) {
1018 flushCommits();
1019 }
1020 }
1021
1022
1023
1024
1025
1026 @Override
1027 public void put(final List<Put> puts) throws IOException {
1028 getBufferedMutator().mutate(puts);
1029 if (autoFlush) {
1030 flushCommits();
1031 }
1032 }
1033
1034
1035
1036
1037 @Override
1038 public void mutateRow(final RowMutations rm) throws IOException {
1039 RegionServerCallable<Void> callable =
1040 new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
1041 @Override
1042 public Void call(int callTimeout) throws IOException {
1043 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1044 controller.setPriority(tableName);
1045 controller.setCallTimeout(callTimeout);
1046 try {
1047 RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
1048 getLocation().getRegionInfo().getRegionName(), rm);
1049 regionMutationBuilder.setAtomic(true);
1050 MultiRequest request =
1051 MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
1052 ClientProtos.MultiResponse response = getStub().multi(controller, request);
1053 ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
1054 if (res.hasException()) {
1055 Throwable ex = ProtobufUtil.toException(res.getException());
1056 if(ex instanceof IOException) {
1057 throw (IOException)ex;
1058 }
1059 throw new IOException("Failed to mutate row: "+Bytes.toStringBinary(rm.getRow()), ex);
1060 }
1061 } catch (ServiceException se) {
1062 throw ProtobufUtil.getRemoteException(se);
1063 }
1064 return null;
1065 }
1066 };
1067 rpcCallerFactory.<Void> newCaller(rpcTimeout).callWithRetries(callable,
1068 this.operationTimeout);
1069 }
1070
1071
1072
1073
1074 @Override
1075 public Result append(final Append append) throws IOException {
1076 if (append.numFamilies() == 0) {
1077 throw new IOException(
1078 "Invalid arguments to append, no columns specified");
1079 }
1080
1081 NonceGenerator ng = this.connection.getNonceGenerator();
1082 final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1083 RegionServerCallable<Result> callable =
1084 new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
1085 @Override
1086 public Result call(int callTimeout) throws IOException {
1087 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1088 controller.setPriority(getTableName());
1089 controller.setCallTimeout(callTimeout);
1090 try {
1091 MutateRequest request = RequestConverter.buildMutateRequest(
1092 getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
1093 MutateResponse response = getStub().mutate(controller, request);
1094 if (!response.hasResult()) return null;
1095 return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1096 } catch (ServiceException se) {
1097 throw ProtobufUtil.getRemoteException(se);
1098 }
1099 }
1100 };
1101 return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable,
1102 this.operationTimeout);
1103 }
1104
1105
1106
1107
1108 @Override
1109 public Result increment(final Increment increment) throws IOException {
1110 if (!increment.hasFamilies()) {
1111 throw new IOException(
1112 "Invalid arguments to increment, no columns specified");
1113 }
1114 NonceGenerator ng = this.connection.getNonceGenerator();
1115 final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1116 RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
1117 getName(), increment.getRow()) {
1118 @Override
1119 public Result call(int callTimeout) throws IOException {
1120 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1121 controller.setPriority(getTableName());
1122 controller.setCallTimeout(callTimeout);
1123 try {
1124 MutateRequest request = RequestConverter.buildMutateRequest(
1125 getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
1126 MutateResponse response = getStub().mutate(controller, request);
1127 return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1128 } catch (ServiceException se) {
1129 throw ProtobufUtil.getRemoteException(se);
1130 }
1131 }
1132 };
1133 return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable,
1134 this.operationTimeout);
1135 }
1136
1137
1138
1139
1140 @Override
1141 public long incrementColumnValue(final byte [] row, final byte [] family,
1142 final byte [] qualifier, final long amount)
1143 throws IOException {
1144 return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
1145 }
1146
1147
1148
1149
1150
1151
1152
1153 @Deprecated
1154 @Override
1155 public long incrementColumnValue(final byte [] row, final byte [] family,
1156 final byte [] qualifier, final long amount, final boolean writeToWAL)
1157 throws IOException {
1158 return incrementColumnValue(row, family, qualifier, amount,
1159 writeToWAL? Durability.SYNC_WAL: Durability.SKIP_WAL);
1160 }
1161
1162
1163
1164
1165 @Override
1166 public long incrementColumnValue(final byte [] row, final byte [] family,
1167 final byte [] qualifier, final long amount, final Durability durability)
1168 throws IOException {
1169 NullPointerException npe = null;
1170 if (row == null) {
1171 npe = new NullPointerException("row is null");
1172 } else if (family == null) {
1173 npe = new NullPointerException("family is null");
1174 } else if (qualifier == null) {
1175 npe = new NullPointerException("qualifier is null");
1176 }
1177 if (npe != null) {
1178 throw new IOException(
1179 "Invalid arguments to incrementColumnValue", npe);
1180 }
1181
1182 NonceGenerator ng = this.connection.getNonceGenerator();
1183 final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1184 RegionServerCallable<Long> callable =
1185 new RegionServerCallable<Long>(connection, getName(), row) {
1186 @Override
1187 public Long call(int callTimeout) throws IOException {
1188 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1189 controller.setPriority(getTableName());
1190 controller.setCallTimeout(callTimeout);
1191 try {
1192 MutateRequest request = RequestConverter.buildIncrementRequest(
1193 getLocation().getRegionInfo().getRegionName(), row, family,
1194 qualifier, amount, durability, nonceGroup, nonce);
1195 MutateResponse response = getStub().mutate(controller, request);
1196 Result result =
1197 ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1198 return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
1199 } catch (ServiceException se) {
1200 throw ProtobufUtil.getRemoteException(se);
1201 }
1202 }
1203 };
1204 return rpcCallerFactory.<Long> newCaller(rpcTimeout).callWithRetries(callable,
1205 this.operationTimeout);
1206 }
1207
1208
1209
1210
1211 @Override
1212 public boolean checkAndPut(final byte [] row,
1213 final byte [] family, final byte [] qualifier, final byte [] value,
1214 final Put put)
1215 throws IOException {
1216 RegionServerCallable<Boolean> callable =
1217 new RegionServerCallable<Boolean>(connection, getName(), row) {
1218 @Override
1219 public Boolean call(int callTimeout) throws IOException {
1220 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1221 controller.setPriority(tableName);
1222 controller.setCallTimeout(callTimeout);
1223 try {
1224 MutateRequest request = RequestConverter.buildMutateRequest(
1225 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1226 new BinaryComparator(value), CompareType.EQUAL, put);
1227 MutateResponse response = getStub().mutate(controller, request);
1228 return Boolean.valueOf(response.getProcessed());
1229 } catch (ServiceException se) {
1230 throw ProtobufUtil.getRemoteException(se);
1231 }
1232 }
1233 };
1234 return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
1235 this.operationTimeout);
1236 }
1237
1238
1239
1240
1241 @Override
1242 public boolean checkAndPut(final byte [] row, final byte [] family,
1243 final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1244 final Put put)
1245 throws IOException {
1246 RegionServerCallable<Boolean> callable =
1247 new RegionServerCallable<Boolean>(connection, getName(), row) {
1248 @Override
1249 public Boolean call(int callTimeout) throws IOException {
1250 PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
1251 controller.setPriority(tableName);
1252 controller.setCallTimeout(callTimeout);
1253 try {
1254 CompareType compareType = CompareType.valueOf(compareOp.name());
1255 MutateRequest request = RequestConverter.buildMutateRequest(
1256 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1257 new BinaryComparator(value), compareType, put);
1258 MutateResponse response = getStub().mutate(controller, request);
1259 return Boolean.valueOf(response.getProcessed());
1260 } catch (ServiceException se) {
1261 throw ProtobufUtil.getRemoteException(se);
1262 }
1263 }
1264 };
1265 return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
1266 this.operationTimeout);
1267 }
1268
1269
1270
1271
1272 @Override
1273 public boolean checkAndDelete(final byte [] row,
1274 final byte [] family, final byte [] qualifier, final byte [] value,
1275 final Delete delete)
1276 throws IOException {
1277 RegionServerCallable<Boolean> callable =
1278 new RegionServerCallable<Boolean>(connection, getName(), row) {
1279 @Override
1280 public Boolean call(int callTimeout) throws IOException {
1281 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1282 controller.setPriority(tableName);
1283 controller.setCallTimeout(callTimeout);
1284 try {
1285 MutateRequest request = RequestConverter.buildMutateRequest(
1286 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1287 new BinaryComparator(value), CompareType.EQUAL, delete);
1288 MutateResponse response = getStub().mutate(controller, request);
1289 return Boolean.valueOf(response.getProcessed());
1290 } catch (ServiceException se) {
1291 throw ProtobufUtil.getRemoteException(se);
1292 }
1293 }
1294 };
1295 return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
1296 this.operationTimeout);
1297 }
1298
1299
1300
1301
1302 @Override
1303 public boolean checkAndDelete(final byte [] row, final byte [] family,
1304 final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1305 final Delete delete)
1306 throws IOException {
1307 RegionServerCallable<Boolean> callable =
1308 new RegionServerCallable<Boolean>(connection, getName(), row) {
1309 @Override
1310 public Boolean call(int callTimeout) throws IOException {
1311 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1312 controller.setPriority(tableName);
1313 controller.setCallTimeout(callTimeout);
1314 try {
1315 CompareType compareType = CompareType.valueOf(compareOp.name());
1316 MutateRequest request = RequestConverter.buildMutateRequest(
1317 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1318 new BinaryComparator(value), compareType, delete);
1319 MutateResponse response = getStub().mutate(controller, request);
1320 return Boolean.valueOf(response.getProcessed());
1321 } catch (ServiceException se) {
1322 throw ProtobufUtil.getRemoteException(se);
1323 }
1324 }
1325 };
1326 return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
1327 this.operationTimeout);
1328 }
1329
1330
1331
1332
1333 @Override
1334 public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
1335 final CompareOp compareOp, final byte [] value, final RowMutations rm)
1336 throws IOException {
1337 RegionServerCallable<Boolean> callable =
1338 new RegionServerCallable<Boolean>(connection, getName(), row) {
1339 @Override
1340 public Boolean call(int callTimeout) throws IOException {
1341 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1342 controller.setPriority(tableName);
1343 controller.setCallTimeout(callTimeout);
1344 try {
1345 CompareType compareType = CompareType.valueOf(compareOp.name());
1346 MultiRequest request = RequestConverter.buildMutateRequest(
1347 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1348 new BinaryComparator(value), compareType, rm);
1349 ClientProtos.MultiResponse response = getStub().multi(controller, request);
1350 ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
1351 if (res.hasException()) {
1352 Throwable ex = ProtobufUtil.toException(res.getException());
1353 if(ex instanceof IOException) {
1354 throw (IOException)ex;
1355 }
1356 throw new IOException("Failed to checkAndMutate row: "+
1357 Bytes.toStringBinary(rm.getRow()), ex);
1358 }
1359 return Boolean.valueOf(response.getProcessed());
1360 } catch (ServiceException se) {
1361 throw ProtobufUtil.getRemoteException(se);
1362 }
1363 }
1364 };
1365 return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
1366 this.operationTimeout);
1367 }
1368
1369
1370
1371
1372 @Override
1373 public boolean exists(final Get get) throws IOException {
1374 Result r = get(get, true);
1375 assert r.getExists() != null;
1376 return r.getExists();
1377 }
1378
1379
1380
1381
1382 @Override
1383 public boolean[] existsAll(final List<Get> gets) throws IOException {
1384 if (gets.isEmpty()) return new boolean[]{};
1385 if (gets.size() == 1) return new boolean[]{exists(gets.get(0))};
1386
1387 ArrayList<Get> exists = new ArrayList<Get>(gets.size());
1388 for (Get g: gets){
1389 Get ge = new Get(g);
1390 ge.setCheckExistenceOnly(true);
1391 exists.add(ge);
1392 }
1393
1394 Object[] r1;
1395 try {
1396 r1 = batch(exists);
1397 } catch (InterruptedException e) {
1398 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1399 }
1400
1401
1402 boolean[] results = new boolean[r1.length];
1403 int i = 0;
1404 for (Object o : r1) {
1405
1406 results[i++] = ((Result)o).getExists();
1407 }
1408
1409 return results;
1410 }
1411
1412
1413
1414
1415 @Override
1416 @Deprecated
1417 public Boolean[] exists(final List<Get> gets) throws IOException {
1418 boolean[] results = existsAll(gets);
1419 Boolean[] objectResults = new Boolean[results.length];
1420 for (int i = 0; i < results.length; ++i) {
1421 objectResults[i] = results[i];
1422 }
1423 return objectResults;
1424 }
1425
1426
1427
1428
1429
1430 @Override
1431 public void flushCommits() throws IOException {
1432 if (mutator == null) {
1433
1434 return;
1435 }
1436 getBufferedMutator().flush();
1437 }
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450 public <R> void processBatchCallback(
1451 final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
1452 throws IOException, InterruptedException {
1453 this.batchCallback(list, results, callback);
1454 }
1455
1456
1457
1458
1459
1460
1461 public void processBatch(final List<? extends Row> list, final Object[] results)
1462 throws IOException, InterruptedException {
1463 this.batch(list, results);
1464 }
1465
1466
1467 @Override
1468 public void close() throws IOException {
1469 if (this.closed) {
1470 return;
1471 }
1472 flushCommits();
1473 if (cleanupPoolOnClose) {
1474 this.pool.shutdown();
1475 try {
1476 boolean terminated = false;
1477 do {
1478
1479 terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
1480 } while (!terminated);
1481 } catch (InterruptedException e) {
1482 this.pool.shutdownNow();
1483 LOG.warn("waitForTermination interrupted");
1484 }
1485 }
1486 if (cleanupConnectionOnClose) {
1487 if (this.connection != null) {
1488 this.connection.close();
1489 }
1490 }
1491 this.closed = true;
1492 }
1493
1494
1495 public void validatePut(final Put put) throws IllegalArgumentException {
1496 validatePut(put, connConfiguration.getMaxKeyValueSize());
1497 }
1498
1499
1500 public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
1501 if (put.isEmpty()) {
1502 throw new IllegalArgumentException("No columns to insert");
1503 }
1504 if (maxKeyValueSize > 0) {
1505 for (List<Cell> list : put.getFamilyCellMap().values()) {
1506 for (Cell cell : list) {
1507 if (KeyValueUtil.length(cell) > maxKeyValueSize) {
1508 throw new IllegalArgumentException("KeyValue size too large");
1509 }
1510 }
1511 }
1512 }
1513 }
1514
1515
1516
1517
1518 @Override
1519 public boolean isAutoFlush() {
1520 return autoFlush;
1521 }
1522
1523
1524
1525
/**
 * Deprecated setter that stores the given flag in this table's {@code autoFlush} field.
 *
 * @param autoFlush value assigned to the {@code autoFlush} field
 */
@Deprecated
@Override
public void setAutoFlush(boolean autoFlush) {
  this.autoFlush = autoFlush;
}
1531
1532
1533
1534
/**
 * Stores the given flag in this table's {@code autoFlush} field.
 *
 * @param autoFlush value assigned to the {@code autoFlush} field
 */
@Override
public void setAutoFlushTo(boolean autoFlush) {
  this.autoFlush = autoFlush;
}
1539
1540
1541
1542
/**
 * Stores the given flag in this table's {@code autoFlush} field.
 *
 * @param autoFlush value assigned to the {@code autoFlush} field
 * @param clearBufferOnFail not read by this implementation
 */
@Override
public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
  this.autoFlush = autoFlush;
}
1547
1548
1549
1550
1551
1552
1553
1554
1555 @Override
1556 public long getWriteBufferSize() {
1557 if (mutator == null) {
1558 return connConfiguration.getWriteBufferSize();
1559 } else {
1560 return mutator.getWriteBufferSize();
1561 }
1562 }
1563
1564
1565
1566
1567
1568
1569
1570
1571
/**
 * Sets the write buffer size on the buffered mutator.
 *
 * @param writeBufferSize the size passed to the mutator
 * @throws IOException if obtaining the mutator or setting the size fails
 */
@Override
public void setWriteBufferSize(long writeBufferSize) throws IOException {
  // Called for its side effect: getBufferedMutator() populates the mutator field when it
  // is still null, so the field access on the next line is safe.
  getBufferedMutator();
  mutator.setWriteBufferSize(writeBufferSize);
}
1577
1578
1579
1580
1581
1582 ExecutorService getPool() {
1583 return this.pool;
1584 }
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
/**
 * Deprecated no-op: the method body is empty, so the arguments have no effect.
 *
 * @param tableName ignored
 * @param enable ignored
 * @throws IOException declared, but never thrown by this implementation
 */
@Deprecated
public static void setRegionCachePrefetch(final byte[] tableName,
    final boolean enable) throws IOException {
}
1600
1601
1602
1603
/**
 * Deprecated no-op: the method body is empty, so the arguments have no effect.
 *
 * @param tableName ignored
 * @param enable ignored
 * @throws IOException declared, but never thrown by this implementation
 */
@Deprecated
public static void setRegionCachePrefetch(
    final TableName tableName,
    final boolean enable) throws IOException {
}
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
/**
 * Deprecated no-op: the method body is empty, so the arguments have no effect.
 *
 * @param conf ignored
 * @param tableName ignored
 * @param enable ignored
 * @throws IOException declared, but never thrown by this implementation
 */
@Deprecated
public static void setRegionCachePrefetch(final Configuration conf,
    final byte[] tableName, final boolean enable) throws IOException {
}
1625
1626
1627
1628
/**
 * Deprecated no-op: the method body is empty, so the arguments have no effect.
 *
 * @param conf ignored
 * @param tableName ignored
 * @param enable ignored
 * @throws IOException declared, but never thrown by this implementation
 */
@Deprecated
public static void setRegionCachePrefetch(final Configuration conf,
    final TableName tableName,
    final boolean enable) throws IOException {
}
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
/**
 * Deprecated stub that ignores its arguments.
 *
 * @param conf ignored
 * @param tableName ignored
 * @return always {@code false}
 * @throws IOException declared, but never thrown by this implementation
 */
@Deprecated
public static boolean getRegionCachePrefetch(final Configuration conf,
    final byte[] tableName) throws IOException {
  return false;
}
1649
1650
1651
1652
/**
 * Deprecated stub that ignores its arguments.
 *
 * @param conf ignored
 * @param tableName ignored
 * @return always {@code false}
 * @throws IOException declared, but never thrown by this implementation
 */
@Deprecated
public static boolean getRegionCachePrefetch(final Configuration conf,
    final TableName tableName) throws IOException {
  return false;
}
1658
1659
1660
1661
1662
1663
1664
1665
1666
/**
 * Deprecated stub that ignores its argument.
 *
 * @param tableName ignored
 * @return always {@code false}
 * @throws IOException declared, but never thrown by this implementation
 */
@Deprecated
public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException {
  return false;
}
1671
1672
1673
1674
/**
 * Deprecated stub that ignores its argument.
 *
 * @param tableName ignored
 * @return always {@code false}
 * @throws IOException declared, but never thrown by this implementation
 */
@Deprecated
public static boolean getRegionCachePrefetch(
    final TableName tableName) throws IOException {
  return false;
}
1680
1681
1682
1683
1684
1685 public void clearRegionCache() {
1686 this.connection.clearRegionCache();
1687 }
1688
1689
1690
1691
1692 @Override
1693 public CoprocessorRpcChannel coprocessorService(byte[] row) {
1694 return new RegionCoprocessorRpcChannel(connection, tableName, row);
1695 }
1696
1697
1698
1699
1700 @Override
1701 public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1702 byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1703 throws ServiceException, Throwable {
1704 final Map<byte[],R> results = Collections.synchronizedMap(
1705 new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1706 coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1707 @Override
1708 public void update(byte[] region, byte[] row, R value) {
1709 if (region != null) {
1710 results.put(region, value);
1711 }
1712 }
1713 });
1714 return results;
1715 }
1716
1717
1718
1719
1720 @Override
1721 public <T extends Service, R> void coprocessorService(final Class<T> service,
1722 byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1723 final Batch.Callback<R> callback) throws ServiceException, Throwable {
1724
1725
1726 List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1727
1728 Map<byte[],Future<R>> futures =
1729 new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
1730 for (final byte[] r : keys) {
1731 final RegionCoprocessorRpcChannel channel =
1732 new RegionCoprocessorRpcChannel(connection, tableName, r);
1733 Future<R> future = pool.submit(
1734 new Callable<R>() {
1735 @Override
1736 public R call() throws Exception {
1737 T instance = ProtobufUtil.newServiceStub(service, channel);
1738 R result = callable.call(instance);
1739 byte[] region = channel.getLastRegion();
1740 if (callback != null) {
1741 callback.update(region, r, result);
1742 }
1743 return result;
1744 }
1745 });
1746 futures.put(r, future);
1747 }
1748 for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1749 try {
1750 e.getValue().get();
1751 } catch (ExecutionException ee) {
1752 LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1753 + Bytes.toStringBinary(e.getKey()), ee);
1754 throw ee.getCause();
1755 } catch (InterruptedException ie) {
1756 throw new InterruptedIOException("Interrupted calling coprocessor service " + service.getName()
1757 + " for row " + Bytes.toStringBinary(e.getKey()))
1758 .initCause(ie);
1759 }
1760 }
1761 }
1762
1763 private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1764 throws IOException {
1765 if (start == null) {
1766 start = HConstants.EMPTY_START_ROW;
1767 }
1768 if (end == null) {
1769 end = HConstants.EMPTY_END_ROW;
1770 }
1771 return getKeysAndRegionsInRange(start, end, true).getFirst();
1772 }
1773
/**
 * Stores the given value in the {@code operationTimeout} field, which the mutate methods of
 * this class pass to {@code callWithRetries}.
 *
 * @param operationTimeout the new timeout value
 */
public void setOperationTimeout(int operationTimeout) {
  this.operationTimeout = operationTimeout;
}
1777
1778 public int getOperationTimeout() {
1779 return operationTimeout;
1780 }
1781
/**
 * Stores the given value in the {@code rpcTimeout} field, which the mutate methods of this
 * class pass to {@code rpcCallerFactory.newCaller}.
 *
 * @param rpcTimeout the new timeout value
 */
public void setRpcTimeout(int rpcTimeout) {
  this.rpcTimeout = rpcTimeout;
}
1785
1786 public int getRpcTimeout() {
1787 return rpcTimeout;
1788 }
1789
1790 @Override
1791 public String toString() {
1792 return tableName + ";" + connection;
1793 }
1794
1795
1796
1797
1798 @Override
1799 public <R extends Message> Map<byte[], R> batchCoprocessorService(
1800 Descriptors.MethodDescriptor methodDescriptor, Message request,
1801 byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1802 final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1803 Bytes.BYTES_COMPARATOR));
1804 batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1805 new Callback<R>() {
1806
1807 @Override
1808 public void update(byte[] region, byte[] row, R result) {
1809 if (region != null) {
1810 results.put(region, result);
1811 }
1812 }
1813 });
1814 return results;
1815 }
1816
1817
1818
1819
/**
 * Executes one coprocessor endpoint call per key in the given range as a single batch
 * submitted through a freshly created {@code AsyncProcess}, and hands each decoded response
 * to {@code callback}.
 *
 * @param methodDescriptor descriptor of the endpoint method to invoke
 * @param request request message sent with every exec
 * @param startKey start of the key range; {@code null} is replaced by
 *        {@code HConstants.EMPTY_START_ROW}
 * @param endKey end of the key range; {@code null} is replaced by
 *        {@code HConstants.EMPTY_END_ROW}
 * @param responsePrototype message whose builder type is used to decode each response
 * @param callback receives region, row and decoded response for every result
 * @throws ServiceException declared by the interface
 * @throws Throwable the errors reported by the request future, or a
 *         {@code RetriesExhaustedWithDetailsException} collecting the responses that could not
 *         be decoded
 */
@Override
public <R extends Message> void batchCoprocessorService(
    final Descriptors.MethodDescriptor methodDescriptor, final Message request,
    byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
    throws ServiceException, Throwable {

  // Normalize open-ended bounds.
  if (startKey == null) {
    startKey = HConstants.EMPTY_START_ROW;
  }
  if (endKey == null) {
    endKey = HConstants.EMPTY_END_ROW;
  }
  // Resolve the keys in the range together with their region locations; the two lists are
  // indexed in parallel below.
  Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
      getKeysAndRegionsInRange(startKey, endKey, true);
  List<byte[]> keys = keysAndRegions.getFirst();
  List<HRegionLocation> regions = keysAndRegions.getSecond();

  // Nothing to call when the range selects no keys.
  if (keys.isEmpty()) {
    LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
        ", end=" + Bytes.toStringBinary(endKey));
    return;
  }

  // Build one exec per key, and index the execs by row so a failed response can be mapped
  // back to the action that produced it.
  List<RegionCoprocessorServiceExec> execs = new ArrayList<RegionCoprocessorServiceExec>();
  final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
      new TreeMap<byte[], RegionCoprocessorServiceExec>(Bytes.BYTES_COMPARATOR);
  for (int i = 0; i < keys.size(); i++) {
    final byte[] rowKey = keys.get(i);
    final byte[] region = regions.get(i).getRegionInfo().getRegionName();
    RegionCoprocessorServiceExec exec =
        new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
    execs.add(exec);
    execsByRow.put(rowKey, exec);
  }

  // Three parallel lists describing responses that failed to decode inside the callback;
  // they are the constructor arguments of the exception thrown at the end.
  // NOTE(review): these are plain ArrayLists appended to from the callback; whether the
  // callback can run concurrently depends on AsyncProcess - confirm.
  final List<Throwable> callbackErrorExceptions = new ArrayList<Throwable>();
  final List<Row> callbackErrorActions = new ArrayList<Row>();
  final List<String> callbackErrorServers = new ArrayList<String>();
  Object[] results = new Object[execs.size()];

  // A dedicated AsyncProcess is created for this call, with caller and controller factories
  // instantiated from the configuration.
  AsyncProcess asyncProcess =
      new AsyncProcess(connection, configuration, pool,
          RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
          true, RpcControllerFactory.instantiate(configuration));

  AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
      new Callback<ClientProtos.CoprocessorServiceResult>() {
        @Override
        public void update(byte[] region, byte[] row,
            ClientProtos.CoprocessorServiceResult serviceResult) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
                ": region=" + Bytes.toStringBinary(region) +
                ", row=" + Bytes.toStringBinary(row) +
                ", value=" + serviceResult.getValue().getValue());
          }
          try {
            // Decode the raw value into a message of the prototype's builder type and pass
            // it on to the caller's callback.
            Message.Builder builder = responsePrototype.newBuilderForType();
            ProtobufUtil.mergeFrom(builder, serviceResult.getValue().getValue());
            // Unchecked cast: the builder was obtained from responsePrototype (an R).
            callback.update(region, row, (R) builder.build());
          } catch (IOException e) {
            LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
                e);
            // Record the failure; no server name is available here, hence the literal "null".
            callbackErrorExceptions.add(e);
            callbackErrorActions.add(execsByRow.get(row));
            callbackErrorServers.add("null");
          }
        }
      }, results);

  // Block until the whole batch has completed.
  future.waitUntilDone();

  // Errors reported by the future take precedence over decode failures from the callback.
  if (future.hasError()) {
    throw future.getErrors();
  } else if (!callbackErrorExceptions.isEmpty()) {
    throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
        callbackErrorServers);
  }
}
1903
1904 public RegionLocator getRegionLocator() {
1905 return this.locator;
1906 }
1907
1908 @VisibleForTesting
1909 BufferedMutator getBufferedMutator() throws IOException {
1910 if (mutator == null) {
1911 this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
1912 new BufferedMutatorParams(tableName)
1913 .pool(pool)
1914 .writeBufferSize(connConfiguration.getWriteBufferSize())
1915 .maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
1916 );
1917 }
1918 return mutator;
1919 }
1920 }