1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.NavigableMap;
28 import java.util.TreeMap;
29 import java.util.concurrent.Callable;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.SynchronousQueue;
34 import java.util.concurrent.ThreadPoolExecutor;
35 import java.util.concurrent.TimeUnit;
36
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39 import org.apache.hadoop.hbase.classification.InterfaceAudience;
40 import org.apache.hadoop.hbase.classification.InterfaceStability;
41 import org.apache.hadoop.conf.Configuration;
42 import org.apache.hadoop.hbase.Cell;
43 import org.apache.hadoop.hbase.HBaseConfiguration;
44 import org.apache.hadoop.hbase.HConstants;
45 import org.apache.hadoop.hbase.HRegionInfo;
46 import org.apache.hadoop.hbase.HRegionLocation;
47 import org.apache.hadoop.hbase.HTableDescriptor;
48 import org.apache.hadoop.hbase.KeyValueUtil;
49 import org.apache.hadoop.hbase.ServerName;
50 import org.apache.hadoop.hbase.TableName;
51 import org.apache.hadoop.hbase.TableNotFoundException;
52 import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
53 import org.apache.hadoop.hbase.client.coprocessor.Batch;
54 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
55 import org.apache.hadoop.hbase.filter.BinaryComparator;
56 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
57 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
58 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
59 import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
60 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
61 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
62 import org.apache.hadoop.hbase.protobuf.RequestConverter;
63 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
64 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
65 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
66 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
67 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
68 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
69 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
70 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
71 import org.apache.hadoop.hbase.util.Bytes;
72 import org.apache.hadoop.hbase.util.Pair;
73 import org.apache.hadoop.hbase.util.ReflectionUtils;
74 import org.apache.hadoop.hbase.util.Threads;
75
76 import com.google.common.annotations.VisibleForTesting;
77 import com.google.protobuf.Descriptors;
78 import com.google.protobuf.Message;
79 import com.google.protobuf.Service;
80 import com.google.protobuf.ServiceException;
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114 @InterfaceAudience.Private
115 @InterfaceStability.Stable
116 public class HTable implements HTableInterface, RegionLocator {
117 private static final Log LOG = LogFactory.getLog(HTable.class);
118 protected ClusterConnection connection;
119 private final TableName tableName;
120 private volatile Configuration configuration;
121 private ConnectionConfiguration connConfiguration;
122 protected BufferedMutatorImpl mutator;
123 private boolean autoFlush = true;
124 private boolean closed = false;
125 protected int scannerCaching;
126 protected long scannerMaxResultSize;
127 private ExecutorService pool;
128 private int operationTimeout;
129 private int rpcTimeout;
130 private final boolean cleanupPoolOnClose;
131 private final boolean cleanupConnectionOnClose;
132 private Consistency defaultConsistency = Consistency.STRONG;
133 private HRegionLocator locator;
134
135
136 protected AsyncProcess multiAp;
137 private RpcRetryingCallerFactory rpcCallerFactory;
138 private RpcControllerFactory rpcControllerFactory;
139
140
141
142
143
144
145
146
147
/**
 * Creates a table handle for the table whose name is given as a string.
 * The name is converted with {@link TableName#valueOf(String)} and the work is
 * delegated to {@link #HTable(Configuration, TableName)}.
 *
 * @param conf configuration passed through to the delegate constructor
 * @param tableName name of the table, as a string
 * @throws IOException if the delegate constructor fails
 */
@Deprecated
public HTable(Configuration conf, final String tableName)
    throws IOException {
  this(conf, TableName.valueOf(tableName));
}
153
154
155
156
157
158
159
160
161
/**
 * Creates a table handle for the table whose name is given as raw bytes.
 * The name is converted with {@link TableName#valueOf(byte[])} and the work is
 * delegated to {@link #HTable(Configuration, TableName)}.
 *
 * @param conf configuration passed through to the delegate constructor
 * @param tableName name of the table, as bytes
 * @throws IOException if the delegate constructor fails
 */
@Deprecated
public HTable(Configuration conf, final byte[] tableName)
    throws IOException {
  this(conf, TableName.valueOf(tableName));
}
167
168
169
170
171
172
173
174
175
/**
 * Creates a table handle that obtains its own connection and its own default
 * executor; both cleanup flags are therefore set to {@code true}.
 * <p>
 * When {@code conf} is {@code null} the connection is left {@code null} and
 * construction stops early: no executor is created and {@link #finishSetup()}
 * is not run.
 *
 * @param conf configuration used to obtain the connection and size the executor; may be null
 * @param tableName name of the table
 * @throws IOException if the connection cannot be obtained or setup fails
 */
@Deprecated
public HTable(Configuration conf, final TableName tableName)
    throws IOException {
  this.tableName = tableName;
  this.cleanupConnectionOnClose = true;
  this.cleanupPoolOnClose = true;
  if (conf == null) {
    // Nothing to connect with: leave the instance only partially initialised.
    this.connection = null;
    return;
  }
  this.connection = ConnectionManager.getConnectionInternal(conf);
  this.configuration = conf;

  this.pool = getDefaultExecutor(conf);
  this.finishSetup();
}
191
192
193
194
195
196
197
198
/**
 * Creates a table handle on top of a caller-supplied connection. A default
 * executor is created for this instance (so the pool cleanup flag is
 * {@code true}), while the connection cleanup flag is {@code false}.
 *
 * @param tableName name of the table
 * @param connection connection to use; it is cast to {@link ClusterConnection}
 * @throws IOException if setup fails
 */
@Deprecated
public HTable(TableName tableName, Connection connection) throws IOException {
  this.tableName = tableName;
  this.cleanupPoolOnClose = true;
  this.cleanupConnectionOnClose = false;
  this.connection = (ClusterConnection) connection;
  this.configuration = connection.getConfiguration();

  this.pool = getDefaultExecutor(this.configuration);
  this.finishSetup();
}
210
211
212 @InterfaceAudience.Private
213 public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
214 int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
215 if (maxThreads == 0) {
216 maxThreads = 1;
217 }
218 long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
219
220
221
222
223
224 ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
225 new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
226 pool.allowCoreThreadTimeOut(true);
227 return pool;
228 }
229
230
231
232
233
234
235
236
237
238
/**
 * Creates a table handle for a table named by raw bytes, using the given executor.
 * Delegates to {@link #HTable(Configuration, TableName, ExecutorService)}.
 *
 * @param conf configuration passed through to the delegate constructor
 * @param tableName name of the table, as bytes
 * @param pool executor to use; handling of {@code null} is decided by the delegate
 * @throws IOException if the delegate constructor fails
 */
@Deprecated
public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool)
    throws IOException {
  this(conf, TableName.valueOf(tableName), pool);
}
244
245
246
247
248
249
250
251
252
253
254 @Deprecated
255 public HTable(Configuration conf, final TableName tableName, final ExecutorService pool)
256 throws IOException {
257 this.connection = ConnectionManager.getConnectionInternal(conf);
258 this.configuration = conf;
259 this.pool = pool;
260 if (pool == null) {
261 this.pool = getDefaultExecutor(conf);
262 this.cleanupPoolOnClose = true;
263 } else {
264 this.cleanupPoolOnClose = false;
265 }
266 this.tableName = tableName;
267 this.cleanupConnectionOnClose = true;
268 this.finishSetup();
269 }
270
271
272
273
274
275
276
277
278
/**
 * Creates a table handle for a table named by raw bytes on a caller-supplied
 * connection and executor. Delegates to
 * {@link #HTable(TableName, Connection, ExecutorService)}.
 *
 * @param tableName name of the table, as bytes
 * @param connection connection to use
 * @param pool executor to use
 * @throws IOException if the delegate constructor fails
 */
@Deprecated
public HTable(final byte[] tableName, final Connection connection,
    final ExecutorService pool) throws IOException {
  this(TableName.valueOf(tableName), connection, pool);
}
284
285
/**
 * Creates a table handle on a caller-supplied connection and executor.
 * The connection is cast to {@link ClusterConnection}; no connection
 * configuration, caller factory or controller factory is passed, so the
 * delegate constructor receives {@code null} for each of them.
 *
 * @param tableName name of the table
 * @param connection connection to use
 * @param pool executor to use
 * @throws IOException if the delegate constructor fails
 */
@Deprecated
public HTable(TableName tableName, final Connection connection,
    final ExecutorService pool) throws IOException {
  this(tableName, (ClusterConnection) connection, null, null, null, pool);
}
291
292
293
294
295
296
297
298
299
300
301 @InterfaceAudience.Private
302 public HTable(TableName tableName, final ClusterConnection connection,
303 final ConnectionConfiguration tableConfig,
304 final RpcRetryingCallerFactory rpcCallerFactory,
305 final RpcControllerFactory rpcControllerFactory,
306 final ExecutorService pool) throws IOException {
307 if (connection == null || connection.isClosed()) {
308 throw new IllegalArgumentException("Connection is null or closed.");
309 }
310 this.tableName = tableName;
311 this.cleanupConnectionOnClose = false;
312 this.connection = connection;
313 this.configuration = connection.getConfiguration();
314 this.connConfiguration = tableConfig;
315 this.pool = pool;
316 if (pool == null) {
317 this.pool = getDefaultExecutor(this.configuration);
318 this.cleanupPoolOnClose = true;
319 } else {
320 this.cleanupPoolOnClose = false;
321 }
322
323 this.rpcCallerFactory = rpcCallerFactory;
324 this.rpcControllerFactory = rpcControllerFactory;
325
326 this.finishSetup();
327 }
328
329
330
331
332
/**
 * Test-only constructor. It stores the connection, takes the table name from
 * {@code params}, builds a {@link ConnectionConfiguration} from the
 * connection's configuration and creates the buffered mutator directly.
 * Both cleanup flags are {@code false}. {@link #finishSetup()} is not called,
 * and no executor is created.
 *
 * @param conn connection to use
 * @param params buffered mutator parameters; also supplies the table name
 * @throws IOException declared by the signature
 */
@VisibleForTesting
protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException {
  this.connection = conn;
  this.tableName = params.getTableName();
  this.connConfiguration = new ConnectionConfiguration(this.connection.getConfiguration());
  this.cleanupPoolOnClose = false;
  this.cleanupConnectionOnClose = false;

  this.mutator = new BufferedMutatorImpl(conn, null, null, params);
}
343
344
345
346
347 public static int getMaxKeyValueSize(Configuration conf) {
348 return conf.getInt("hbase.client.keyvalue.maxsize", -1);
349 }
350
351
352
353
354 private void finishSetup() throws IOException {
355 if (connConfiguration == null) {
356 connConfiguration = new ConnectionConfiguration(configuration);
357 }
358 this.operationTimeout = tableName.isSystemTable() ?
359 connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
360 this.rpcTimeout = configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
361 HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
362 this.scannerCaching = connConfiguration.getScannerCaching();
363 this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
364 if (this.rpcCallerFactory == null) {
365 this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
366 }
367 if (this.rpcControllerFactory == null) {
368 this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
369 }
370
371
372 multiAp = this.connection.getAsyncProcess();
373
374 this.closed = false;
375
376 this.locator = new HRegionLocator(tableName, connection);
377 }
378
379
380
381
382 @Override
383 public Configuration getConfiguration() {
384 return configuration;
385 }
386
387
388
389
390
391
392
393
394
395
396 @Deprecated
397 public static boolean isTableEnabled(String tableName) throws IOException {
398 return isTableEnabled(TableName.valueOf(tableName));
399 }
400
401
402
403
404
405
406
407
408
409
410 @Deprecated
411 public static boolean isTableEnabled(byte[] tableName) throws IOException {
412 return isTableEnabled(TableName.valueOf(tableName));
413 }
414
415
416
417
418
419
420
421
422
423
424 @Deprecated
425 public static boolean isTableEnabled(TableName tableName) throws IOException {
426 return isTableEnabled(HBaseConfiguration.create(), tableName);
427 }
428
429
430
431
432
433
434
435
436
437 @Deprecated
438 public static boolean isTableEnabled(Configuration conf, String tableName)
439 throws IOException {
440 return isTableEnabled(conf, TableName.valueOf(tableName));
441 }
442
443
444
445
446
447
448
449
450
451 @Deprecated
452 public static boolean isTableEnabled(Configuration conf, byte[] tableName)
453 throws IOException {
454 return isTableEnabled(conf, TableName.valueOf(tableName));
455 }
456
457
458
459
460
461
462
463
464
465 @Deprecated
466 public static boolean isTableEnabled(Configuration conf,
467 final TableName tableName) throws IOException {
468 return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
469 @Override
470 public Boolean connect(HConnection connection) throws IOException {
471 return connection.isTableEnabled(tableName);
472 }
473 });
474 }
475
476
477
478
479
480
481
482
483 @Deprecated
484 public HRegionLocation getRegionLocation(final String row)
485 throws IOException {
486 return getRegionLocation(Bytes.toBytes(row), false);
487 }
488
489
490
491
492 @Override
493 @Deprecated
494 public HRegionLocation getRegionLocation(final byte [] row)
495 throws IOException {
496 return locator.getRegionLocation(row);
497 }
498
499
500
501
502 @Override
503 @Deprecated
504 public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
505 throws IOException {
506 return locator.getRegionLocation(row, reload);
507 }
508
509
510
511
512 @Override
513 public byte [] getTableName() {
514 return this.tableName.getName();
515 }
516
517 @Override
518 public TableName getName() {
519 return tableName;
520 }
521
522
523
524
525
526
527
528
529 @Deprecated
530 @VisibleForTesting
531 public HConnection getConnection() {
532 return this.connection;
533 }
534
535
536
537
538
539
540
541 @Deprecated
542 public int getScannerCaching() {
543 return scannerCaching;
544 }
545
546
547
548
549
550 @Deprecated
551 public List<Row> getWriteBuffer() {
552 return mutator == null ? null : mutator.getWriteBuffer();
553 }
554
555
556
557
558
559
560
561
562
563
564
565
566 @Deprecated
567 public void setScannerCaching(int scannerCaching) {
568 this.scannerCaching = scannerCaching;
569 }
570
571
572
573
574 @Override
575 public HTableDescriptor getTableDescriptor() throws IOException {
576 HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection,
577 rpcCallerFactory, rpcControllerFactory, operationTimeout, rpcTimeout);
578 if (htd != null) {
579 return new UnmodifyableHTableDescriptor(htd);
580 }
581 return null;
582 }
583
584
585
586
587
588 @Override
589 @Deprecated
590 public byte [][] getStartKeys() throws IOException {
591 return locator.getStartKeys();
592 }
593
594
595
596
597
598 @Override
599 @Deprecated
600 public byte[][] getEndKeys() throws IOException {
601 return locator.getEndKeys();
602 }
603
604
605
606
607
608 @Override
609 @Deprecated
610 public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
611 return locator.getStartEndKeys();
612 }
613
614
615
616
617
618
619
620
621
622 @Deprecated
623 public NavigableMap<HRegionInfo, ServerName> getRegionLocations() throws IOException {
624
625 return MetaScanner.allTableRegions(this.connection, getName());
626 }
627
628
629
630
631
632
633
634
635
636
637 @Override
638 @Deprecated
639 public List<HRegionLocation> getAllRegionLocations() throws IOException {
640 return locator.getAllRegionLocations();
641 }
642
643
644
645
646
647
648
649
650
651
652
653 @Deprecated
654 public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
655 final byte [] endKey) throws IOException {
656 return getRegionsInRange(startKey, endKey, false);
657 }
658
659
660
661
662
663
664
665
666
667
668
669
670 @Deprecated
671 public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
672 final byte [] endKey, final boolean reload) throws IOException {
673 return getKeysAndRegionsInRange(startKey, endKey, false, reload).getSecond();
674 }
675
676
677
678
679
680
681
682
683
684
685
686
687
688 @Deprecated
689 private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
690 final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
691 throws IOException {
692 return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
693 }
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708 @Deprecated
709 private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
710 final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
711 final boolean reload) throws IOException {
712 final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
713 if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
714 throw new IllegalArgumentException(
715 "Invalid range: " + Bytes.toStringBinary(startKey) +
716 " > " + Bytes.toStringBinary(endKey));
717 }
718 List<byte[]> keysInRange = new ArrayList<byte[]>();
719 List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
720 byte[] currentKey = startKey;
721 do {
722 HRegionLocation regionLocation = getRegionLocation(currentKey, reload);
723 keysInRange.add(currentKey);
724 regionsInRange.add(regionLocation);
725 currentKey = regionLocation.getRegionInfo().getEndKey();
726 } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
727 && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
728 || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
729 return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
730 regionsInRange);
731 }
732
733
734
735
736
737 @Override
738 @Deprecated
739 public Result getRowOrBefore(final byte[] row, final byte[] family)
740 throws IOException {
741 RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
742 tableName, row) {
743 @Override
744 public Result call(int callTimeout) throws IOException {
745 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
746 controller.setPriority(tableName);
747 controller.setCallTimeout(callTimeout);
748 ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest(
749 getLocation().getRegionInfo().getRegionName(), row, family);
750 try {
751 ClientProtos.GetResponse response = getStub().get(controller, request);
752 if (!response.hasResult()) return null;
753 return ProtobufUtil.toResult(response.getResult());
754 } catch (ServiceException se) {
755 throw ProtobufUtil.getRemoteException(se);
756 }
757 }
758 };
759 return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
760 this.operationTimeout);
761 }
762
763
764
765
766
767 @Override
768 public ResultScanner getScanner(final Scan scan) throws IOException {
769 if (scan.getBatch() > 0 && scan.isSmall()) {
770 throw new IllegalArgumentException("Small scan should not be used with batching");
771 }
772
773 if (scan.getCaching() <= 0) {
774 scan.setCaching(getScannerCaching());
775 }
776 if (scan.getMaxResultSize() <= 0) {
777 scan.setMaxResultSize(scannerMaxResultSize);
778 }
779
780 if (scan.isReversed()) {
781 if (scan.isSmall()) {
782 return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
783 this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
784 pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
785 } else {
786 return new ReversedClientScanner(getConfiguration(), scan, getName(),
787 this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
788 pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
789 }
790 }
791
792 if (scan.isSmall()) {
793 return new ClientSmallScanner(getConfiguration(), scan, getName(),
794 this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
795 pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
796 } else {
797 return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
798 this.rpcCallerFactory, this.rpcControllerFactory,
799 pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
800 }
801 }
802
803
804
805
806
807 @Override
808 public ResultScanner getScanner(byte [] family) throws IOException {
809 Scan scan = new Scan();
810 scan.addFamily(family);
811 return getScanner(scan);
812 }
813
814
815
816
817
818 @Override
819 public ResultScanner getScanner(byte [] family, byte [] qualifier)
820 throws IOException {
821 Scan scan = new Scan();
822 scan.addColumn(family, qualifier);
823 return getScanner(scan);
824 }
825
826
827
828
829 @Override
830 public Result get(final Get get) throws IOException {
831 return get(get, get.isCheckExistenceOnly());
832 }
833
834 private Result get(Get get, final boolean checkExistenceOnly) throws IOException {
835
836 if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) {
837 get = ReflectionUtils.newInstance(get.getClass(), get);
838 get.setCheckExistenceOnly(checkExistenceOnly);
839 if (get.getConsistency() == null){
840 get.setConsistency(defaultConsistency);
841 }
842 }
843
844 if (get.getConsistency() == Consistency.STRONG) {
845
846 final Get getReq = get;
847 RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
848 getName(), get.getRow()) {
849 @Override
850 public Result call(int callTimeout) throws IOException {
851 ClientProtos.GetRequest request =
852 RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq);
853 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
854 controller.setPriority(tableName);
855 controller.setCallTimeout(callTimeout);
856 try {
857 ClientProtos.GetResponse response = getStub().get(controller, request);
858 if (response == null) return null;
859 return ProtobufUtil.toResult(response.getResult());
860 } catch (ServiceException se) {
861 throw ProtobufUtil.getRemoteException(se);
862 }
863 }
864 };
865 return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
866 this.operationTimeout);
867 }
868
869
870 RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
871 rpcControllerFactory, tableName, this.connection, get, pool,
872 connConfiguration.getRetriesNumber(),
873 operationTimeout,
874 connConfiguration.getPrimaryCallTimeoutMicroSecond());
875 return callable.call();
876 }
877
878
879
880
881
882 @Override
883 public Result[] get(List<Get> gets) throws IOException {
884 if (gets.size() == 1) {
885 return new Result[]{get(gets.get(0))};
886 }
887 try {
888 Object [] r1 = batch((List)gets);
889
890
891 Result [] results = new Result[r1.length];
892 int i=0;
893 for (Object o : r1) {
894
895 results[i++] = (Result) o;
896 }
897
898 return results;
899 } catch (InterruptedException e) {
900 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
901 }
902 }
903
904
905
906
907 @Override
908 public void batch(final List<? extends Row> actions, final Object[] results)
909 throws InterruptedException, IOException {
910 AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results);
911 ars.waitUntilDone();
912 if (ars.hasError()) {
913 throw ars.getErrors();
914 }
915 }
916
917
918
919
920
921
922 @Deprecated
923 @Override
924 public Object[] batch(final List<? extends Row> actions)
925 throws InterruptedException, IOException {
926 Object[] results = new Object[actions.size()];
927 batch(actions, results);
928 return results;
929 }
930
931
932
933
934 @Override
935 public <R> void batchCallback(
936 final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
937 throws IOException, InterruptedException {
938 connection.processBatchCallback(actions, tableName, pool, results, callback);
939 }
940
941
942
943
944
945
946
947
948 @Deprecated
949 @Override
950 public <R> Object[] batchCallback(
951 final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException,
952 InterruptedException {
953 Object[] results = new Object[actions.size()];
954 batchCallback(actions, results, callback);
955 return results;
956 }
957
958
959
960
961 @Override
962 public void delete(final Delete delete)
963 throws IOException {
964 RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
965 tableName, delete.getRow()) {
966 @Override
967 public Boolean call(int callTimeout) throws IOException {
968 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
969 controller.setPriority(tableName);
970 controller.setCallTimeout(callTimeout);
971
972 try {
973 MutateRequest request = RequestConverter.buildMutateRequest(
974 getLocation().getRegionInfo().getRegionName(), delete);
975 MutateResponse response = getStub().mutate(controller, request);
976 return Boolean.valueOf(response.getProcessed());
977 } catch (ServiceException se) {
978 throw ProtobufUtil.getRemoteException(se);
979 }
980 }
981 };
982 rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
983 this.operationTimeout);
984 }
985
986
987
988
989 @Override
990 public void delete(final List<Delete> deletes)
991 throws IOException {
992 Object[] results = new Object[deletes.size()];
993 try {
994 batch(deletes, results);
995 } catch (InterruptedException e) {
996 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
997 } finally {
998
999
1000
1001 for (int i = results.length - 1; i>=0; i--) {
1002
1003 if (results[i] instanceof Result) {
1004 deletes.remove(i);
1005 }
1006 }
1007 }
1008 }
1009
1010
1011
1012
1013
1014 @Override
1015 public void put(final Put put) throws IOException {
1016 getBufferedMutator().mutate(put);
1017 if (autoFlush) {
1018 flushCommits();
1019 }
1020 }
1021
1022
1023
1024
1025
1026 @Override
1027 public void put(final List<Put> puts) throws IOException {
1028 getBufferedMutator().mutate(puts);
1029 if (autoFlush) {
1030 flushCommits();
1031 }
1032 }
1033
1034
1035
1036
1037 @Override
1038 public void mutateRow(final RowMutations rm) throws IOException {
1039 RegionServerCallable<Void> callable =
1040 new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
1041 @Override
1042 public Void call(int callTimeout) throws IOException {
1043 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1044 controller.setPriority(tableName);
1045 controller.setCallTimeout(callTimeout);
1046 try {
1047 RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
1048 getLocation().getRegionInfo().getRegionName(), rm);
1049 regionMutationBuilder.setAtomic(true);
1050 MultiRequest request =
1051 MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
1052 ClientProtos.MultiResponse response = getStub().multi(controller, request);
1053 ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
1054 if (res.hasException()) {
1055 Throwable ex = ProtobufUtil.toException(res.getException());
1056 if(ex instanceof IOException) {
1057 throw (IOException)ex;
1058 }
1059 throw new IOException("Failed to mutate row: "+Bytes.toStringBinary(rm.getRow()), ex);
1060 }
1061 } catch (ServiceException se) {
1062 throw ProtobufUtil.getRemoteException(se);
1063 }
1064 return null;
1065 }
1066 };
1067 rpcCallerFactory.<Void> newCaller().callWithRetries(callable, this.operationTimeout);
1068 }
1069
1070
1071
1072
1073 @Override
1074 public Result append(final Append append) throws IOException {
1075 if (append.numFamilies() == 0) {
1076 throw new IOException(
1077 "Invalid arguments to append, no columns specified");
1078 }
1079
1080 NonceGenerator ng = this.connection.getNonceGenerator();
1081 final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1082 RegionServerCallable<Result> callable =
1083 new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
1084 @Override
1085 public Result call(int callTimeout) throws IOException {
1086 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1087 controller.setPriority(getTableName());
1088 controller.setCallTimeout(callTimeout);
1089 try {
1090 MutateRequest request = RequestConverter.buildMutateRequest(
1091 getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
1092 MutateResponse response = getStub().mutate(controller, request);
1093 if (!response.hasResult()) return null;
1094 return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1095 } catch (ServiceException se) {
1096 throw ProtobufUtil.getRemoteException(se);
1097 }
1098 }
1099 };
1100 return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable,
1101 this.operationTimeout);
1102 }
1103
1104
1105
1106
1107 @Override
1108 public Result increment(final Increment increment) throws IOException {
1109 if (!increment.hasFamilies()) {
1110 throw new IOException(
1111 "Invalid arguments to increment, no columns specified");
1112 }
1113 NonceGenerator ng = this.connection.getNonceGenerator();
1114 final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1115 RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
1116 getName(), increment.getRow()) {
1117 @Override
1118 public Result call(int callTimeout) throws IOException {
1119 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1120 controller.setPriority(getTableName());
1121 controller.setCallTimeout(callTimeout);
1122 try {
1123 MutateRequest request = RequestConverter.buildMutateRequest(
1124 getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
1125 MutateResponse response = getStub().mutate(controller, request);
1126 return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1127 } catch (ServiceException se) {
1128 throw ProtobufUtil.getRemoteException(se);
1129 }
1130 }
1131 };
1132 return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable,
1133 this.operationTimeout);
1134 }
1135
1136
1137
1138
1139 @Override
1140 public long incrementColumnValue(final byte [] row, final byte [] family,
1141 final byte [] qualifier, final long amount)
1142 throws IOException {
1143 return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
1144 }
1145
1146
1147
1148
1149
1150
1151
1152 @Deprecated
1153 @Override
1154 public long incrementColumnValue(final byte [] row, final byte [] family,
1155 final byte [] qualifier, final long amount, final boolean writeToWAL)
1156 throws IOException {
1157 return incrementColumnValue(row, family, qualifier, amount,
1158 writeToWAL? Durability.SYNC_WAL: Durability.SKIP_WAL);
1159 }
1160
1161
1162
1163
1164 @Override
1165 public long incrementColumnValue(final byte [] row, final byte [] family,
1166 final byte [] qualifier, final long amount, final Durability durability)
1167 throws IOException {
1168 NullPointerException npe = null;
1169 if (row == null) {
1170 npe = new NullPointerException("row is null");
1171 } else if (family == null) {
1172 npe = new NullPointerException("family is null");
1173 } else if (qualifier == null) {
1174 npe = new NullPointerException("qualifier is null");
1175 }
1176 if (npe != null) {
1177 throw new IOException(
1178 "Invalid arguments to incrementColumnValue", npe);
1179 }
1180
1181 NonceGenerator ng = this.connection.getNonceGenerator();
1182 final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1183 RegionServerCallable<Long> callable =
1184 new RegionServerCallable<Long>(connection, getName(), row) {
1185 @Override
1186 public Long call(int callTimeout) throws IOException {
1187 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1188 controller.setPriority(getTableName());
1189 controller.setCallTimeout(callTimeout);
1190 try {
1191 MutateRequest request = RequestConverter.buildIncrementRequest(
1192 getLocation().getRegionInfo().getRegionName(), row, family,
1193 qualifier, amount, durability, nonceGroup, nonce);
1194 MutateResponse response = getStub().mutate(controller, request);
1195 Result result =
1196 ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1197 return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
1198 } catch (ServiceException se) {
1199 throw ProtobufUtil.getRemoteException(se);
1200 }
1201 }
1202 };
1203 return rpcCallerFactory.<Long> newCaller(rpcTimeout).callWithRetries(callable,
1204 this.operationTimeout);
1205 }
1206
1207
1208
1209
1210 @Override
1211 public boolean checkAndPut(final byte [] row,
1212 final byte [] family, final byte [] qualifier, final byte [] value,
1213 final Put put)
1214 throws IOException {
1215 RegionServerCallable<Boolean> callable =
1216 new RegionServerCallable<Boolean>(connection, getName(), row) {
1217 @Override
1218 public Boolean call(int callTimeout) throws IOException {
1219 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1220 controller.setPriority(tableName);
1221 controller.setCallTimeout(callTimeout);
1222 try {
1223 MutateRequest request = RequestConverter.buildMutateRequest(
1224 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1225 new BinaryComparator(value), CompareType.EQUAL, put);
1226 MutateResponse response = getStub().mutate(controller, request);
1227 return Boolean.valueOf(response.getProcessed());
1228 } catch (ServiceException se) {
1229 throw ProtobufUtil.getRemoteException(se);
1230 }
1231 }
1232 };
1233 return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
1234 this.operationTimeout);
1235 }
1236
1237
1238
1239
1240 @Override
1241 public boolean checkAndPut(final byte [] row, final byte [] family,
1242 final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1243 final Put put)
1244 throws IOException {
1245 RegionServerCallable<Boolean> callable =
1246 new RegionServerCallable<Boolean>(connection, getName(), row) {
1247 @Override
1248 public Boolean call(int callTimeout) throws IOException {
1249 PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
1250 controller.setPriority(tableName);
1251 controller.setCallTimeout(callTimeout);
1252 try {
1253 CompareType compareType = CompareType.valueOf(compareOp.name());
1254 MutateRequest request = RequestConverter.buildMutateRequest(
1255 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1256 new BinaryComparator(value), compareType, put);
1257 MutateResponse response = getStub().mutate(controller, request);
1258 return Boolean.valueOf(response.getProcessed());
1259 } catch (ServiceException se) {
1260 throw ProtobufUtil.getRemoteException(se);
1261 }
1262 }
1263 };
1264 return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
1265 this.operationTimeout);
1266 }
1267
1268
1269
1270
1271 @Override
1272 public boolean checkAndDelete(final byte [] row,
1273 final byte [] family, final byte [] qualifier, final byte [] value,
1274 final Delete delete)
1275 throws IOException {
1276 RegionServerCallable<Boolean> callable =
1277 new RegionServerCallable<Boolean>(connection, getName(), row) {
1278 @Override
1279 public Boolean call(int callTimeout) throws IOException {
1280 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1281 controller.setPriority(tableName);
1282 controller.setCallTimeout(callTimeout);
1283 try {
1284 MutateRequest request = RequestConverter.buildMutateRequest(
1285 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1286 new BinaryComparator(value), CompareType.EQUAL, delete);
1287 MutateResponse response = getStub().mutate(controller, request);
1288 return Boolean.valueOf(response.getProcessed());
1289 } catch (ServiceException se) {
1290 throw ProtobufUtil.getRemoteException(se);
1291 }
1292 }
1293 };
1294 return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
1295 this.operationTimeout);
1296 }
1297
1298
1299
1300
1301 @Override
1302 public boolean checkAndDelete(final byte [] row, final byte [] family,
1303 final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1304 final Delete delete)
1305 throws IOException {
1306 RegionServerCallable<Boolean> callable =
1307 new RegionServerCallable<Boolean>(connection, getName(), row) {
1308 @Override
1309 public Boolean call(int callTimeout) throws IOException {
1310 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1311 controller.setPriority(tableName);
1312 controller.setCallTimeout(callTimeout);
1313 try {
1314 CompareType compareType = CompareType.valueOf(compareOp.name());
1315 MutateRequest request = RequestConverter.buildMutateRequest(
1316 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1317 new BinaryComparator(value), compareType, delete);
1318 MutateResponse response = getStub().mutate(controller, request);
1319 return Boolean.valueOf(response.getProcessed());
1320 } catch (ServiceException se) {
1321 throw ProtobufUtil.getRemoteException(se);
1322 }
1323 }
1324 };
1325 return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
1326 this.operationTimeout);
1327 }
1328
1329
1330
1331
1332 @Override
1333 public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
1334 final CompareOp compareOp, final byte [] value, final RowMutations rm)
1335 throws IOException {
1336 RegionServerCallable<Boolean> callable =
1337 new RegionServerCallable<Boolean>(connection, getName(), row) {
1338 @Override
1339 public Boolean call(int callTimeout) throws IOException {
1340 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1341 controller.setPriority(tableName);
1342 controller.setCallTimeout(callTimeout);
1343 try {
1344 CompareType compareType = CompareType.valueOf(compareOp.name());
1345 MultiRequest request = RequestConverter.buildMutateRequest(
1346 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1347 new BinaryComparator(value), compareType, rm);
1348 ClientProtos.MultiResponse response = getStub().multi(controller, request);
1349 ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
1350 if (res.hasException()) {
1351 Throwable ex = ProtobufUtil.toException(res.getException());
1352 if(ex instanceof IOException) {
1353 throw (IOException)ex;
1354 }
1355 throw new IOException("Failed to checkAndMutate row: "+
1356 Bytes.toStringBinary(rm.getRow()), ex);
1357 }
1358 return Boolean.valueOf(response.getProcessed());
1359 } catch (ServiceException se) {
1360 throw ProtobufUtil.getRemoteException(se);
1361 }
1362 }
1363 };
1364 return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1365 }
1366
1367
1368
1369
1370 @Override
1371 public boolean exists(final Get get) throws IOException {
1372 Result r = get(get, true);
1373 assert r.getExists() != null;
1374 return r.getExists();
1375 }
1376
1377
1378
1379
1380 @Override
1381 public boolean[] existsAll(final List<Get> gets) throws IOException {
1382 if (gets.isEmpty()) return new boolean[]{};
1383 if (gets.size() == 1) return new boolean[]{exists(gets.get(0))};
1384
1385 ArrayList<Get> exists = new ArrayList<Get>(gets.size());
1386 for (Get g: gets){
1387 Get ge = new Get(g);
1388 ge.setCheckExistenceOnly(true);
1389 exists.add(ge);
1390 }
1391
1392 Object[] r1;
1393 try {
1394 r1 = batch(exists);
1395 } catch (InterruptedException e) {
1396 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1397 }
1398
1399
1400 boolean[] results = new boolean[r1.length];
1401 int i = 0;
1402 for (Object o : r1) {
1403
1404 results[i++] = ((Result)o).getExists();
1405 }
1406
1407 return results;
1408 }
1409
1410
1411
1412
1413 @Override
1414 @Deprecated
1415 public Boolean[] exists(final List<Get> gets) throws IOException {
1416 boolean[] results = existsAll(gets);
1417 Boolean[] objectResults = new Boolean[results.length];
1418 for (int i = 0; i < results.length; ++i) {
1419 objectResults[i] = results[i];
1420 }
1421 return objectResults;
1422 }
1423
1424
1425
1426
1427
1428 @Override
1429 public void flushCommits() throws IOException {
1430 if (mutator == null) {
1431
1432 return;
1433 }
1434 getBufferedMutator().flush();
1435 }
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448 public <R> void processBatchCallback(
1449 final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
1450 throws IOException, InterruptedException {
1451 this.batchCallback(list, results, callback);
1452 }
1453
1454
1455
1456
1457
1458
1459 public void processBatch(final List<? extends Row> list, final Object[] results)
1460 throws IOException, InterruptedException {
1461 this.batch(list, results);
1462 }
1463
1464
1465 @Override
1466 public void close() throws IOException {
1467 if (this.closed) {
1468 return;
1469 }
1470 flushCommits();
1471 if (cleanupPoolOnClose) {
1472 this.pool.shutdown();
1473 try {
1474 boolean terminated = false;
1475 do {
1476
1477 terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
1478 } while (!terminated);
1479 } catch (InterruptedException e) {
1480 this.pool.shutdownNow();
1481 LOG.warn("waitForTermination interrupted");
1482 }
1483 }
1484 if (cleanupConnectionOnClose) {
1485 if (this.connection != null) {
1486 this.connection.close();
1487 }
1488 }
1489 this.closed = true;
1490 }
1491
1492
1493 public void validatePut(final Put put) throws IllegalArgumentException {
1494 validatePut(put, connConfiguration.getMaxKeyValueSize());
1495 }
1496
1497
1498 public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
1499 if (put.isEmpty()) {
1500 throw new IllegalArgumentException("No columns to insert");
1501 }
1502 if (maxKeyValueSize > 0) {
1503 for (List<Cell> list : put.getFamilyCellMap().values()) {
1504 for (Cell cell : list) {
1505 if (KeyValueUtil.length(cell) > maxKeyValueSize) {
1506 throw new IllegalArgumentException("KeyValue size too large");
1507 }
1508 }
1509 }
1510 }
1511 }
1512
1513
1514
1515
1516 @Override
1517 public boolean isAutoFlush() {
1518 return autoFlush;
1519 }
1520
1521
1522
1523
1524 @Deprecated
1525 @Override
1526 public void setAutoFlush(boolean autoFlush) {
1527 this.autoFlush = autoFlush;
1528 }
1529
1530
1531
1532
1533 @Override
1534 public void setAutoFlushTo(boolean autoFlush) {
1535 this.autoFlush = autoFlush;
1536 }
1537
1538
1539
1540
1541 @Override
1542 public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
1543 this.autoFlush = autoFlush;
1544 }
1545
1546
1547
1548
1549
1550
1551
1552
1553 @Override
1554 public long getWriteBufferSize() {
1555 if (mutator == null) {
1556 return connConfiguration.getWriteBufferSize();
1557 } else {
1558 return mutator.getWriteBufferSize();
1559 }
1560 }
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570 @Override
1571 public void setWriteBufferSize(long writeBufferSize) throws IOException {
1572 getBufferedMutator();
1573 mutator.setWriteBufferSize(writeBufferSize);
1574 }
1575
1576
1577
1578
1579
1580 ExecutorService getPool() {
1581 return this.pool;
1582 }
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594 @Deprecated
1595 public static void setRegionCachePrefetch(final byte[] tableName,
1596 final boolean enable) throws IOException {
1597 }
1598
1599
1600
1601
1602 @Deprecated
1603 public static void setRegionCachePrefetch(
1604 final TableName tableName,
1605 final boolean enable) throws IOException {
1606 }
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619 @Deprecated
1620 public static void setRegionCachePrefetch(final Configuration conf,
1621 final byte[] tableName, final boolean enable) throws IOException {
1622 }
1623
1624
1625
1626
1627 @Deprecated
1628 public static void setRegionCachePrefetch(final Configuration conf,
1629 final TableName tableName,
1630 final boolean enable) throws IOException {
1631 }
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642 @Deprecated
1643 public static boolean getRegionCachePrefetch(final Configuration conf,
1644 final byte[] tableName) throws IOException {
1645 return false;
1646 }
1647
1648
1649
1650
1651 @Deprecated
1652 public static boolean getRegionCachePrefetch(final Configuration conf,
1653 final TableName tableName) throws IOException {
1654 return false;
1655 }
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665 @Deprecated
1666 public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException {
1667 return false;
1668 }
1669
1670
1671
1672
1673 @Deprecated
1674 public static boolean getRegionCachePrefetch(
1675 final TableName tableName) throws IOException {
1676 return false;
1677 }
1678
1679
1680
1681
1682
1683 public void clearRegionCache() {
1684 this.connection.clearRegionCache();
1685 }
1686
1687
1688
1689
1690 @Override
1691 public CoprocessorRpcChannel coprocessorService(byte[] row) {
1692 return new RegionCoprocessorRpcChannel(connection, tableName, row);
1693 }
1694
1695
1696
1697
1698 @Override
1699 public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1700 byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1701 throws ServiceException, Throwable {
1702 final Map<byte[],R> results = Collections.synchronizedMap(
1703 new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1704 coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1705 @Override
1706 public void update(byte[] region, byte[] row, R value) {
1707 if (region != null) {
1708 results.put(region, value);
1709 }
1710 }
1711 });
1712 return results;
1713 }
1714
1715
1716
1717
1718 @Override
1719 public <T extends Service, R> void coprocessorService(final Class<T> service,
1720 byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1721 final Batch.Callback<R> callback) throws ServiceException, Throwable {
1722
1723
1724 List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1725
1726 Map<byte[],Future<R>> futures =
1727 new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
1728 for (final byte[] r : keys) {
1729 final RegionCoprocessorRpcChannel channel =
1730 new RegionCoprocessorRpcChannel(connection, tableName, r);
1731 Future<R> future = pool.submit(
1732 new Callable<R>() {
1733 @Override
1734 public R call() throws Exception {
1735 T instance = ProtobufUtil.newServiceStub(service, channel);
1736 R result = callable.call(instance);
1737 byte[] region = channel.getLastRegion();
1738 if (callback != null) {
1739 callback.update(region, r, result);
1740 }
1741 return result;
1742 }
1743 });
1744 futures.put(r, future);
1745 }
1746 for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1747 try {
1748 e.getValue().get();
1749 } catch (ExecutionException ee) {
1750 LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1751 + Bytes.toStringBinary(e.getKey()), ee);
1752 throw ee.getCause();
1753 } catch (InterruptedException ie) {
1754 throw new InterruptedIOException("Interrupted calling coprocessor service " + service.getName()
1755 + " for row " + Bytes.toStringBinary(e.getKey()))
1756 .initCause(ie);
1757 }
1758 }
1759 }
1760
1761 private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1762 throws IOException {
1763 if (start == null) {
1764 start = HConstants.EMPTY_START_ROW;
1765 }
1766 if (end == null) {
1767 end = HConstants.EMPTY_END_ROW;
1768 }
1769 return getKeysAndRegionsInRange(start, end, true).getFirst();
1770 }
1771
1772 public void setOperationTimeout(int operationTimeout) {
1773 this.operationTimeout = operationTimeout;
1774 }
1775
1776 public int getOperationTimeout() {
1777 return operationTimeout;
1778 }
1779
1780 public void setRpcTimeout(int rpcTimeout) {
1781 this.rpcTimeout = rpcTimeout;
1782 }
1783
1784 public int getRpcTimeout() {
1785 return rpcTimeout;
1786 }
1787
1788 @Override
1789 public String toString() {
1790 return tableName + ";" + connection;
1791 }
1792
1793
1794
1795
1796 @Override
1797 public <R extends Message> Map<byte[], R> batchCoprocessorService(
1798 Descriptors.MethodDescriptor methodDescriptor, Message request,
1799 byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1800 final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1801 Bytes.BYTES_COMPARATOR));
1802 batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1803 new Callback<R>() {
1804
1805 @Override
1806 public void update(byte[] region, byte[] row, R result) {
1807 if (region != null) {
1808 results.put(region, result);
1809 }
1810 }
1811 });
1812 return results;
1813 }
1814
1815
1816
1817
1818 @Override
1819 public <R extends Message> void batchCoprocessorService(
1820 final Descriptors.MethodDescriptor methodDescriptor, final Message request,
1821 byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
1822 throws ServiceException, Throwable {
1823
1824 if (startKey == null) {
1825 startKey = HConstants.EMPTY_START_ROW;
1826 }
1827 if (endKey == null) {
1828 endKey = HConstants.EMPTY_END_ROW;
1829 }
1830
1831 Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
1832 getKeysAndRegionsInRange(startKey, endKey, true);
1833 List<byte[]> keys = keysAndRegions.getFirst();
1834 List<HRegionLocation> regions = keysAndRegions.getSecond();
1835
1836
1837 if (keys.isEmpty()) {
1838 LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
1839 ", end=" + Bytes.toStringBinary(endKey));
1840 return;
1841 }
1842
1843 List<RegionCoprocessorServiceExec> execs = new ArrayList<RegionCoprocessorServiceExec>();
1844 final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
1845 new TreeMap<byte[], RegionCoprocessorServiceExec>(Bytes.BYTES_COMPARATOR);
1846 for (int i = 0; i < keys.size(); i++) {
1847 final byte[] rowKey = keys.get(i);
1848 final byte[] region = regions.get(i).getRegionInfo().getRegionName();
1849 RegionCoprocessorServiceExec exec =
1850 new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
1851 execs.add(exec);
1852 execsByRow.put(rowKey, exec);
1853 }
1854
1855
1856
1857 final List<Throwable> callbackErrorExceptions = new ArrayList<Throwable>();
1858 final List<Row> callbackErrorActions = new ArrayList<Row>();
1859 final List<String> callbackErrorServers = new ArrayList<String>();
1860 Object[] results = new Object[execs.size()];
1861
1862 AsyncProcess asyncProcess =
1863 new AsyncProcess(connection, configuration, pool,
1864 RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
1865 true, RpcControllerFactory.instantiate(configuration));
1866
1867 AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
1868 new Callback<ClientProtos.CoprocessorServiceResult>() {
1869 @Override
1870 public void update(byte[] region, byte[] row,
1871 ClientProtos.CoprocessorServiceResult serviceResult) {
1872 if (LOG.isTraceEnabled()) {
1873 LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
1874 ": region=" + Bytes.toStringBinary(region) +
1875 ", row=" + Bytes.toStringBinary(row) +
1876 ", value=" + serviceResult.getValue().getValue());
1877 }
1878 try {
1879 Message.Builder builder = responsePrototype.newBuilderForType();
1880 ProtobufUtil.mergeFrom(builder, serviceResult.getValue().getValue());
1881 callback.update(region, row, (R) builder.build());
1882 } catch (IOException e) {
1883 LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
1884 e);
1885 callbackErrorExceptions.add(e);
1886 callbackErrorActions.add(execsByRow.get(row));
1887 callbackErrorServers.add("null");
1888 }
1889 }
1890 }, results);
1891
1892 future.waitUntilDone();
1893
1894 if (future.hasError()) {
1895 throw future.getErrors();
1896 } else if (!callbackErrorExceptions.isEmpty()) {
1897 throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
1898 callbackErrorServers);
1899 }
1900 }
1901
1902 public RegionLocator getRegionLocator() {
1903 return this.locator;
1904 }
1905
1906 @VisibleForTesting
1907 BufferedMutator getBufferedMutator() throws IOException {
1908 if (mutator == null) {
1909 this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
1910 new BufferedMutatorParams(tableName)
1911 .pool(pool)
1912 .writeBufferSize(connConfiguration.getWriteBufferSize())
1913 .maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
1914 );
1915 }
1916 return mutator;
1917 }
1918 }