View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.NavigableMap;
28  import java.util.TreeMap;
29  import java.util.concurrent.Callable;
30  import java.util.concurrent.ExecutionException;
31  import java.util.concurrent.ExecutorService;
32  import java.util.concurrent.Future;
33  import java.util.concurrent.SynchronousQueue;
34  import java.util.concurrent.ThreadPoolExecutor;
35  import java.util.concurrent.TimeUnit;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.Cell;
41  import org.apache.hadoop.hbase.HBaseConfiguration;
42  import org.apache.hadoop.hbase.HConstants;
43  import org.apache.hadoop.hbase.HRegionInfo;
44  import org.apache.hadoop.hbase.HRegionLocation;
45  import org.apache.hadoop.hbase.HTableDescriptor;
46  import org.apache.hadoop.hbase.KeyValueUtil;
47  import org.apache.hadoop.hbase.ServerName;
48  import org.apache.hadoop.hbase.TableName;
49  import org.apache.hadoop.hbase.TableNotFoundException;
50  import org.apache.hadoop.hbase.classification.InterfaceAudience;
51  import org.apache.hadoop.hbase.classification.InterfaceStability;
52  import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
53  import org.apache.hadoop.hbase.client.coprocessor.Batch;
54  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
55  import org.apache.hadoop.hbase.filter.BinaryComparator;
56  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
57  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
58  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
59  import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
60  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
61  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
62  import org.apache.hadoop.hbase.protobuf.RequestConverter;
63  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
64  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
65  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
66  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
67  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
68  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
69  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
70  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
71  import org.apache.hadoop.hbase.util.Bytes;
72  import org.apache.hadoop.hbase.util.Pair;
73  import org.apache.hadoop.hbase.util.Threads;
74  
75  import com.google.common.annotations.VisibleForTesting;
76  import com.google.protobuf.Descriptors;
77  import com.google.protobuf.InvalidProtocolBufferException;
78  import com.google.protobuf.Message;
79  import com.google.protobuf.Service;
80  import com.google.protobuf.ServiceException;
81  
82  /**
83   * An implementation of {@link Table}. Used to communicate with a single HBase table.
84   * Lightweight. Get as needed and just close when done.
85   * Instances of this class SHOULD NOT be constructed directly.
86   * Obtain an instance via {@link Connection}. See {@link ConnectionFactory}
87   * class comment for an example of how.
88   *
89   * <p>This class is NOT thread safe for reads nor writes.
90   * In the case of writes (Put, Delete), the underlying write buffer can
91   * be corrupted if multiple threads contend over a single HTable instance.
92   * In the case of reads, some fields used by a Scan are shared among all threads.
93   *
94   * <p>HTable is no longer a client API. Use {@link Table} instead. It is marked
95   * InterfaceAudience.Private indicating that this is an HBase-internal class as defined in
96   * <a href="https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html">Hadoop
97   * Interface Classification</a>
98   * There are no guarantees for backwards source / binary compatibility and methods or class can
99   * change or go away without deprecation.
100  *
101  * @see Table
102  * @see Admin
103  * @see Connection
104  * @see ConnectionFactory
105  */
106 @InterfaceAudience.Private
107 @InterfaceStability.Stable
108 public class HTable implements HTableInterface {
109   private static final Log LOG = LogFactory.getLog(HTable.class);
110   protected ClusterConnection connection;
111   private final TableName tableName;
112   private volatile Configuration configuration;
113   private TableConfiguration tableConfiguration;
114   protected BufferedMutatorImpl mutator;
115   private boolean autoFlush = true;
116   private boolean closed = false;
117   protected int scannerCaching;
118   private ExecutorService pool;  // For Multi & Scan
119   private int operationTimeout;
120   private final boolean cleanupPoolOnClose; // shutdown the pool in close()
121   private final boolean cleanupConnectionOnClose; // close the connection in close()
122   private Consistency defaultConsistency = Consistency.STRONG;
123   private HRegionLocator locator;
124 
125   /** The Async process for batch */
126   protected AsyncProcess multiAp;
127   private RpcRetryingCallerFactory rpcCallerFactory;
128   private RpcControllerFactory rpcControllerFactory;
129 
130   /**
131    * Creates an object to access a HBase table.
132    * @param conf Configuration object to use.
133    * @param tableName Name of the table.
134    * @throws IOException if a remote or network exception occurs
135    * @deprecated Constructing HTable objects manually has been deprecated. Please use
136    * {@link Connection} to instantiate a {@link Table} instead.
137    */
138   @Deprecated
139   public HTable(Configuration conf, final String tableName)
140   throws IOException {
141     this(conf, TableName.valueOf(tableName));
142   }
143 
144   /**
145    * Creates an object to access a HBase table.
146    * @param conf Configuration object to use.
147    * @param tableName Name of the table.
148    * @throws IOException if a remote or network exception occurs
149    * @deprecated Constructing HTable objects manually has been deprecated. Please use
150    * {@link Connection} to instantiate a {@link Table} instead.
151    */
152   @Deprecated
153   public HTable(Configuration conf, final byte[] tableName)
154   throws IOException {
155     this(conf, TableName.valueOf(tableName));
156   }
157 
158   /**
159    * Creates an object to access a HBase table.
160    * @param conf Configuration object to use.
161    * @param tableName table name pojo
162    * @throws IOException if a remote or network exception occurs
163    * @deprecated Constructing HTable objects manually has been deprecated. Please use
164    * {@link Connection} to instantiate a {@link Table} instead.
165    */
166   @Deprecated
167   public HTable(Configuration conf, final TableName tableName)
168   throws IOException {
169     this.tableName = tableName;
170     this.cleanupPoolOnClose = true;
171     this.cleanupConnectionOnClose = true;
172     if (conf == null) {
173       this.connection = null;
174       return;
175     }
176     this.connection = (ClusterConnection) ConnectionFactory.createConnection(conf);
177     this.configuration = conf;
178 
179     this.pool = getDefaultExecutor(conf);
180     this.finishSetup();
181   }
182 
183   /**
184    * Creates an object to access a HBase table.
185    * @param tableName Name of the table.
186    * @param connection HConnection to be used.
187    * @throws IOException if a remote or network exception occurs
188    * @deprecated Do not use.
189    */
190   @Deprecated
191   public HTable(TableName tableName, Connection connection) throws IOException {
192     this.tableName = tableName;
193     this.cleanupPoolOnClose = true;
194     this.cleanupConnectionOnClose = false;
195     this.connection = (ClusterConnection)connection;
196     this.configuration = connection.getConfiguration();
197 
198     this.pool = getDefaultExecutor(this.configuration);
199     this.finishSetup();
200   }
201 
202   // Marked Private @since 1.0
203   @InterfaceAudience.Private
204   public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
205     int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
206     if (maxThreads == 0) {
207       maxThreads = 1; // is there a better default?
208     }
209     long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
210 
211     // Using the "direct handoff" approach, new threads will only be created
212     // if it is necessary and will grow unbounded. This could be bad but in HCM
213     // we only create as many Runnables as there are region servers. It means
214     // it also scales when new region servers are added.
215     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
216         new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
217     pool.allowCoreThreadTimeOut(true);
218     return pool;
219   }
220 
221   /**
222    * Creates an object to access a HBase table.
223    * @param conf Configuration object to use.
224    * @param tableName Name of the table.
225    * @param pool ExecutorService to be used.
226    * @throws IOException if a remote or network exception occurs
227    * @deprecated Constructing HTable objects manually has been deprecated. Please use
228    * {@link Connection} to instantiate a {@link Table} instead.
229    */
230   @Deprecated
231   public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool)
232       throws IOException {
233     this(conf, TableName.valueOf(tableName), pool);
234   }
235 
236   /**
237    * Creates an object to access a HBase table.
238    * @param conf Configuration object to use.
239    * @param tableName Name of the table.
240    * @param pool ExecutorService to be used.
241    * @throws IOException if a remote or network exception occurs
242    * @deprecated Constructing HTable objects manually has been deprecated. Please use
243    * {@link Connection} to instantiate a {@link Table} instead.
244    */
245   @Deprecated
246   public HTable(Configuration conf, final TableName tableName, final ExecutorService pool)
247       throws IOException {
248     this.connection = (ClusterConnection) ConnectionFactory.createConnection(conf);
249     this.configuration = conf;
250     this.pool = pool;
251     if (pool == null) {
252       this.pool = getDefaultExecutor(conf);
253       this.cleanupPoolOnClose = true;
254     } else {
255       this.cleanupPoolOnClose = false;
256     }
257     this.tableName = tableName;
258     this.cleanupConnectionOnClose = true;
259     this.finishSetup();
260   }
261 
262   /**
263    * Creates an object to access a HBase table.
264    * @param tableName Name of the table.
265    * @param connection HConnection to be used.
266    * @param pool ExecutorService to be used.
267    * @throws IOException if a remote or network exception occurs.
268    * @deprecated Do not use, internal ctor.
269    */
270   @Deprecated
271   public HTable(final byte[] tableName, final Connection connection,
272       final ExecutorService pool) throws IOException {
273     this(TableName.valueOf(tableName), connection, pool);
274   }
275 
276   /** @deprecated Do not use, internal ctor. */
277   @Deprecated
278   public HTable(TableName tableName, final Connection connection,
279       final ExecutorService pool) throws IOException {
280     this(tableName, (ClusterConnection)connection, null, null, null, pool);
281   }
282 
283   /**
284    * Creates an object to access a HBase table.
285    * Used by HBase internally.  DO NOT USE. See {@link ConnectionFactory} class comment for how to
286    * get a {@link Table} instance (use {@link Table} instead of {@link HTable}).
287    * @param tableName Name of the table.
288    * @param connection HConnection to be used.
289    * @param pool ExecutorService to be used.
290    * @throws IOException if a remote or network exception occurs
291    */
292   @InterfaceAudience.Private
293   public HTable(TableName tableName, final ClusterConnection connection,
294       final TableConfiguration tableConfig,
295       final RpcRetryingCallerFactory rpcCallerFactory,
296       final RpcControllerFactory rpcControllerFactory,
297       final ExecutorService pool) throws IOException {
298     if (connection == null || connection.isClosed()) {
299       throw new IllegalArgumentException("Connection is null or closed.");
300     }
301     this.tableName = tableName;
302     this.cleanupConnectionOnClose = false;
303     this.connection = connection;
304     this.configuration = connection.getConfiguration();
305     this.tableConfiguration = tableConfig;
306     this.pool = pool;
307     if (pool == null) {
308       this.pool = getDefaultExecutor(this.configuration);
309       this.cleanupPoolOnClose = true;
310     } else {
311       this.cleanupPoolOnClose = false;
312     }
313 
314     this.rpcCallerFactory = rpcCallerFactory;
315     this.rpcControllerFactory = rpcControllerFactory;
316 
317     this.finishSetup();
318   }
319 
320   /**
321    * For internal testing. Uses Connection provided in {@code params}.
322    * @throws IOException
323    */
324   @VisibleForTesting
325   protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException {
326     connection = conn;
327     tableName = params.getTableName();
328     tableConfiguration = new TableConfiguration(connection.getConfiguration());
329     cleanupPoolOnClose = false;
330     cleanupConnectionOnClose = false;
331     // used from tests, don't trust the connection is real
332     this.mutator = new BufferedMutatorImpl(conn, null, null, params);
333   }
334 
335   /**
336    * @return maxKeyValueSize from configuration.
337    */
338   public static int getMaxKeyValueSize(Configuration conf) {
339     return conf.getInt("hbase.client.keyvalue.maxsize", -1);
340   }
341 
342   /**
343    * setup this HTable's parameter based on the passed configuration
344    */
345   private void finishSetup() throws IOException {
346     if (tableConfiguration == null) {
347       tableConfiguration = new TableConfiguration(configuration);
348     }
349 
350     this.operationTimeout = tableName.isSystemTable() ?
351         tableConfiguration.getMetaOperationTimeout() : tableConfiguration.getOperationTimeout();
352     this.scannerCaching = tableConfiguration.getScannerCaching();
353 
354     if (this.rpcCallerFactory == null) {
355       this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
356     }
357     if (this.rpcControllerFactory == null) {
358       this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
359     }
360 
361     // puts need to track errors globally due to how the APIs currently work.
362     multiAp = this.connection.getAsyncProcess();
363     this.locator = new HRegionLocator(getName(), connection);
364   }
365 
366   /**
367    * {@inheritDoc}
368    */
369   @Override
370   public Configuration getConfiguration() {
371     return configuration;
372   }
373 
374   /**
375    * Tells whether or not a table is enabled or not. This method creates a
376    * new HBase configuration, so it might make your unit tests fail due to
377    * incorrect ZK client port.
378    * @param tableName Name of table to check.
379    * @return {@code true} if table is online.
380    * @throws IOException if a remote or network exception occurs
381    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
382    */
383   @Deprecated
384   public static boolean isTableEnabled(String tableName) throws IOException {
385     return isTableEnabled(TableName.valueOf(tableName));
386   }
387 
388   /**
389    * Tells whether or not a table is enabled or not. This method creates a
390    * new HBase configuration, so it might make your unit tests fail due to
391    * incorrect ZK client port.
392    * @param tableName Name of table to check.
393    * @return {@code true} if table is online.
394    * @throws IOException if a remote or network exception occurs
395    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
396    */
397   @Deprecated
398   public static boolean isTableEnabled(byte[] tableName) throws IOException {
399     return isTableEnabled(TableName.valueOf(tableName));
400   }
401 
402   /**
403    * Tells whether or not a table is enabled or not. This method creates a
404    * new HBase configuration, so it might make your unit tests fail due to
405    * incorrect ZK client port.
406    * @param tableName Name of table to check.
407    * @return {@code true} if table is online.
408    * @throws IOException if a remote or network exception occurs
409    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
410    */
411   @Deprecated
412   public static boolean isTableEnabled(TableName tableName) throws IOException {
413     return isTableEnabled(HBaseConfiguration.create(), tableName);
414   }
415 
416   /**
417    * Tells whether or not a table is enabled or not.
418    * @param conf The Configuration object to use.
419    * @param tableName Name of table to check.
420    * @return {@code true} if table is online.
421    * @throws IOException if a remote or network exception occurs
422    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
423    */
424   @Deprecated
425   public static boolean isTableEnabled(Configuration conf, String tableName)
426   throws IOException {
427     return isTableEnabled(conf, TableName.valueOf(tableName));
428   }
429 
430   /**
431    * Tells whether or not a table is enabled or not.
432    * @param conf The Configuration object to use.
433    * @param tableName Name of table to check.
434    * @return {@code true} if table is online.
435    * @throws IOException if a remote or network exception occurs
436    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
437    */
438   @Deprecated
439   public static boolean isTableEnabled(Configuration conf, byte[] tableName)
440   throws IOException {
441     return isTableEnabled(conf, TableName.valueOf(tableName));
442   }
443 
444   /**
445    * Tells whether or not a table is enabled or not.
446    * @param conf The Configuration object to use.
447    * @param tableName Name of table to check.
448    * @return {@code true} if table is online.
449    * @throws IOException if a remote or network exception occurs
450    * @deprecated use {@link HBaseAdmin#isTableEnabled(org.apache.hadoop.hbase.TableName tableName)}
451    */
452   @Deprecated
453   public static boolean isTableEnabled(Configuration conf,
454       final TableName tableName) throws IOException {
455     return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
456       @Override
457       public Boolean connect(HConnection connection) throws IOException {
458         return connection.isTableEnabled(tableName);
459       }
460     });
461   }
462 
463   /**
464    * Find region location hosting passed row using cached info
465    * @param row Row to find.
466    * @return The location of the given row.
467    * @throws IOException if a remote or network exception occurs
468    * @deprecated Use {@link RegionLocator#getRegionLocation(byte[])}
469    */
470   @Deprecated
471   public HRegionLocation getRegionLocation(final String row)
472   throws IOException {
473     return getRegionLocation(Bytes.toBytes(row), false);
474   }
475 
476   /**
477    * @deprecated Use {@link RegionLocator#getRegionLocation(byte[])} instead.
478    */
479   @Deprecated
480   public HRegionLocation getRegionLocation(final byte [] row)
481   throws IOException {
482     return locator.getRegionLocation(row);
483   }
484 
485   /**
486    * @deprecated Use {@link RegionLocator#getRegionLocation(byte[], boolean)} instead.
487    */
488   @Deprecated
489   public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
490   throws IOException {
491     return locator.getRegionLocation(row, reload);
492   }
493 
494   /**
495    * {@inheritDoc}
496    */
497   @Override
498   public byte [] getTableName() {
499     return this.tableName.getName();
500   }
501 
502   @Override
503   public TableName getName() {
504     return tableName;
505   }
506 
507   /**
508    * <em>INTERNAL</em> Used by unit tests and tools to do low-level
509    * manipulations.
510    * @return An HConnection instance.
511    * @deprecated This method will be changed from public to package protected.
512    */
513   // TODO(tsuna): Remove this.  Unit tests shouldn't require public helpers.
514   @Deprecated
515   @VisibleForTesting
516   public HConnection getConnection() {
517     return this.connection;
518   }
519 
520   /**
521    * Gets the number of rows that a scanner will fetch at once.
522    * <p>
523    * The default value comes from {@code hbase.client.scanner.caching}.
524    * @deprecated Use {@link Scan#setCaching(int)} and {@link Scan#getCaching()}
525    */
526   @Deprecated
527   public int getScannerCaching() {
528     return scannerCaching;
529   }
530 
531   /**
532    * Kept in 0.96 for backward compatibility
533    * @deprecated  since 0.96. This is an internal buffer that should not be read nor write.
534    */
535   @Deprecated
536   public List<Row> getWriteBuffer() {
537     return mutator == null ? null : mutator.getWriteBuffer();
538   }
539 
540   /**
541    * Sets the number of rows that a scanner will fetch at once.
542    * <p>
543    * This will override the value specified by
544    * {@code hbase.client.scanner.caching}.
545    * Increasing this value will reduce the amount of work needed each time
546    * {@code next()} is called on a scanner, at the expense of memory use
547    * (since more rows will need to be maintained in memory by the scanners).
548    * @param scannerCaching the number of rows a scanner will fetch at once.
549    * @deprecated Use {@link Scan#setCaching(int)}
550    */
551   @Deprecated
552   public void setScannerCaching(int scannerCaching) {
553     this.scannerCaching = scannerCaching;
554   }
555 
556   /**
557    * {@inheritDoc}
558    */
559   @Override
560   public HTableDescriptor getTableDescriptor() throws IOException {
561     // TODO: This is the same as HBaseAdmin.getTableDescriptor(). Only keep one.
562     if (tableName == null) return null;
563     if (tableName.equals(TableName.META_TABLE_NAME)) {
564       return HTableDescriptor.META_TABLEDESC;
565     }
566     HTableDescriptor htd = executeMasterCallable(
567       new MasterCallable<HTableDescriptor>(getConnection()) {
568       @Override
569       public HTableDescriptor call(int callTimeout) throws ServiceException {
570         GetTableDescriptorsResponse htds;
571         GetTableDescriptorsRequest req =
572             RequestConverter.buildGetTableDescriptorsRequest(tableName);
573         htds = master.getTableDescriptors(null, req);
574 
575         if (!htds.getTableSchemaList().isEmpty()) {
576           return HTableDescriptor.convert(htds.getTableSchemaList().get(0));
577         }
578         return null;
579       }
580     });
581     if (htd != null) {
582       return new UnmodifyableHTableDescriptor(htd);
583     }
584     throw new TableNotFoundException(tableName.getNameAsString());
585   }
586 
587   private <V> V executeMasterCallable(MasterCallable<V> callable) throws IOException {
588     RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller();
589     try {
590       return caller.callWithRetries(callable, operationTimeout);
591     } finally {
592       callable.close();
593     }
594   }
595 
596   /**
597    * @deprecated Use {@link RegionLocator#getStartEndKeys()} instead;
598    */
599   @Deprecated
600   public byte [][] getStartKeys() throws IOException {
601     return locator.getStartKeys();
602   }
603 
604   /**
605    * @deprecated Use {@link RegionLocator#getEndKeys()} instead;
606    */
607   @Deprecated
608   public byte[][] getEndKeys() throws IOException {
609     return locator.getEndKeys();
610   }
611 
612   /**
613    * @deprecated Use {@link RegionLocator#getStartEndKeys()} instead;
614    */
615   @Deprecated
616   public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
617     return locator.getStartEndKeys();
618   }
619 
620   /**
621    * Gets all the regions and their address for this table.
622    * <p>
623    * This is mainly useful for the MapReduce integration.
624    * @return A map of HRegionInfo with it's server address
625    * @throws IOException if a remote or network exception occurs
626    * @deprecated This is no longer a public API.  Use {@link #getAllRegionLocations()} instead.
627    */
628   @Deprecated
629   public NavigableMap<HRegionInfo, ServerName> getRegionLocations() throws IOException {
630     // TODO: Odd that this returns a Map of HRI to SN whereas getRegionLocator, singular,
631     // returns an HRegionLocation.
632     return MetaScanner.allTableRegions(this.connection, getName());
633   }
634 
635   /**
636    * Gets all the regions and their address for this table.
637    * <p>
638    * This is mainly useful for the MapReduce integration.
639    * @return A map of HRegionInfo with it's server address
640    * @throws IOException if a remote or network exception occurs
641    *
642    * @deprecated Use {@link RegionLocator#getAllRegionLocations()} instead;
643    */
644   @Deprecated
645   public List<HRegionLocation> getAllRegionLocations() throws IOException {
646     return locator.getAllRegionLocations();
647   }
648 
649   /**
650    * Get the corresponding regions for an arbitrary range of keys.
651    * <p>
652    * @param startKey Starting row in range, inclusive
653    * @param endKey Ending row in range, exclusive
654    * @return A list of HRegionLocations corresponding to the regions that
655    * contain the specified range
656    * @throws IOException if a remote or network exception occurs
657    * @deprecated This is no longer a public API
658    */
659   @Deprecated
660   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
661     final byte [] endKey) throws IOException {
662     return getRegionsInRange(startKey, endKey, false);
663   }
664 
665   /**
666    * Get the corresponding regions for an arbitrary range of keys.
667    * <p>
668    * @param startKey Starting row in range, inclusive
669    * @param endKey Ending row in range, exclusive
670    * @param reload true to reload information or false to use cached information
671    * @return A list of HRegionLocations corresponding to the regions that
672    * contain the specified range
673    * @throws IOException if a remote or network exception occurs
674    * @deprecated This is no longer a public API
675    */
676   @Deprecated
677   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
678       final byte [] endKey, final boolean reload) throws IOException {
679     return getKeysAndRegionsInRange(startKey, endKey, false, reload).getSecond();
680   }
681 
682   /**
683    * Get the corresponding start keys and regions for an arbitrary range of
684    * keys.
685    * <p>
686    * @param startKey Starting row in range, inclusive
687    * @param endKey Ending row in range
688    * @param includeEndKey true if endRow is inclusive, false if exclusive
689    * @return A pair of list of start keys and list of HRegionLocations that
690    *         contain the specified range
691    * @throws IOException if a remote or network exception occurs
692    * @deprecated This is no longer a public API
693    */
694   @Deprecated
695   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
696       final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
697       throws IOException {
698     return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
699   }
700 
701   /**
702    * Get the corresponding start keys and regions for an arbitrary range of
703    * keys.
704    * <p>
705    * @param startKey Starting row in range, inclusive
706    * @param endKey Ending row in range
707    * @param includeEndKey true if endRow is inclusive, false if exclusive
708    * @param reload true to reload information or false to use cached information
709    * @return A pair of list of start keys and list of HRegionLocations that
710    *         contain the specified range
711    * @throws IOException if a remote or network exception occurs
712    * @deprecated This is no longer a public API
713    */
714   @Deprecated
715   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
716       final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
717       final boolean reload) throws IOException {
718     final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
719     if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
720       throw new IllegalArgumentException(
721         "Invalid range: " + Bytes.toStringBinary(startKey) +
722         " > " + Bytes.toStringBinary(endKey));
723     }
724     List<byte[]> keysInRange = new ArrayList<byte[]>();
725     List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
726     byte[] currentKey = startKey;
727     do {
728       HRegionLocation regionLocation = getRegionLocation(currentKey, reload);
729       keysInRange.add(currentKey);
730       regionsInRange.add(regionLocation);
731       currentKey = regionLocation.getRegionInfo().getEndKey();
732     } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
733         && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
734             || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
735     return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
736         regionsInRange);
737   }
738 
739   /**
740    * {@inheritDoc}
741    * @deprecated Use reversed scan instead.
742    */
743    @Override
744    @Deprecated
745    public Result getRowOrBefore(final byte[] row, final byte[] family)
746        throws IOException {
747      RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
748          tableName, row) {
749        @Override
750       public Result call(int callTimeout) throws IOException {
751          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
752          controller.setPriority(tableName);
753          controller.setCallTimeout(callTimeout);
754          ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest(
755              getLocation().getRegionInfo().getRegionName(), row, family);
756          try {
757            ClientProtos.GetResponse response = getStub().get(controller, request);
758            if (!response.hasResult()) return null;
759            return ProtobufUtil.toResult(response.getResult());
760          } catch (ServiceException se) {
761            throw ProtobufUtil.getRemoteException(se);
762          }
763        }
764      };
765      return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
766    }
767 
768   /**
769    * The underlying {@link HTable} must not be closed.
770    * {@link HTableInterface#getScanner(Scan)} has other usage details.
771    */
772   @Override
773   public ResultScanner getScanner(final Scan scan) throws IOException {
774     if (scan.getBatch() > 0 && scan.isSmall()) {
775       throw new IllegalArgumentException("Small scan should not be used with batching");
776     }
777     if (scan.getCaching() <= 0) {
778       scan.setCaching(getScannerCaching());
779     }
780 
781     if (scan.isReversed()) {
782       if (scan.isSmall()) {
783         return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
784             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
785             pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
786       } else {
787         return new ReversedClientScanner(getConfiguration(), scan, getName(),
788             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
789             pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
790       }
791     }
792 
793     if (scan.isSmall()) {
794       return new ClientSmallScanner(getConfiguration(), scan, getName(),
795           this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
796           pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
797     } else {
798       return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
799           this.rpcCallerFactory, this.rpcControllerFactory,
800           pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
801     }
802   }
803 
804   /**
805    * The underlying {@link HTable} must not be closed.
806    * {@link HTableInterface#getScanner(byte[])} has other usage details.
807    */
808   @Override
809   public ResultScanner getScanner(byte [] family) throws IOException {
810     Scan scan = new Scan();
811     scan.addFamily(family);
812     return getScanner(scan);
813   }
814 
815   /**
816    * The underlying {@link HTable} must not be closed.
817    * {@link HTableInterface#getScanner(byte[], byte[])} has other usage details.
818    */
819   @Override
820   public ResultScanner getScanner(byte [] family, byte [] qualifier)
821   throws IOException {
822     Scan scan = new Scan();
823     scan.addColumn(family, qualifier);
824     return getScanner(scan);
825   }
826 
827   /**
828    * {@inheritDoc}
829    */
830   @Override
831   public Result get(final Get get) throws IOException {
832     if (get.getConsistency() == null){
833       get.setConsistency(defaultConsistency);
834     }
835 
836     if (get.getConsistency() == Consistency.STRONG) {
837       // Good old call.
838       RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
839           getName(), get.getRow()) {
840         @Override
841         public Result call(int callTimeout) throws IOException {
842           ClientProtos.GetRequest request =
843               RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), get);
844           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
845           controller.setPriority(tableName);
846           controller.setCallTimeout(callTimeout);
847           try {
848             ClientProtos.GetResponse response = getStub().get(controller, request);
849             if (response == null) return null;
850             return ProtobufUtil.toResult(response.getResult());
851           } catch (ServiceException se) {
852             throw ProtobufUtil.getRemoteException(se);
853           }
854         }
855       };
856       return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
857     }
858 
859     // Call that takes into account the replica
860     RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
861       rpcControllerFactory, tableName, this.connection, get, pool,
862       tableConfiguration.getRetriesNumber(),
863       operationTimeout,
864       tableConfiguration.getPrimaryCallTimeoutMicroSecond());
865     return callable.call();
866   }
867 
868 
869   /**
870    * {@inheritDoc}
871    */
872   @Override
873   public Result[] get(List<Get> gets) throws IOException {
874     if (gets.size() == 1) {
875       return new Result[]{get(gets.get(0))};
876     }
877     try {
878       Object [] r1 = batch((List)gets);
879 
880       // translate.
881       Result [] results = new Result[r1.length];
882       int i=0;
883       for (Object o : r1) {
884         // batch ensures if there is a failure we get an exception instead
885         results[i++] = (Result) o;
886       }
887 
888       return results;
889     } catch (InterruptedException e) {
890       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
891     }
892   }
893 
894   /**
895    * {@inheritDoc}
896    */
897   @Override
898   public void batch(final List<? extends Row> actions, final Object[] results)
899       throws InterruptedException, IOException {
900     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results);
901     ars.waitUntilDone();
902     if (ars.hasError()) {
903       throw ars.getErrors();
904     }
905   }
906 
907   /**
908    * {@inheritDoc}
909    * @deprecated If any exception is thrown by one of the actions, there is no way to
910    * retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
911    */
912   @Deprecated
913   @Override
914   public Object[] batch(final List<? extends Row> actions)
915      throws InterruptedException, IOException {
916     Object[] results = new Object[actions.size()];
917     batch(actions, results);
918     return results;
919   }
920 
921   /**
922    * {@inheritDoc}
923    */
924   @Override
925   public <R> void batchCallback(
926       final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
927       throws IOException, InterruptedException {
928     connection.processBatchCallback(actions, tableName, pool, results, callback);
929   }
930 
931   /**
932    * {@inheritDoc}
933    * @deprecated If any exception is thrown by one of the actions, there is no way to
934    * retrieve the partially executed results. Use
935    * {@link #batchCallback(List, Object[], Batch.Callback)}
936    * instead.
937    */
938   @Deprecated
939   @Override
940   public <R> Object[] batchCallback(
941     final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException,
942       InterruptedException {
943     Object[] results = new Object[actions.size()];
944     batchCallback(actions, results, callback);
945     return results;
946   }
947 
948   /**
949    * {@inheritDoc}
950    */
951   @Override
952   public void delete(final Delete delete)
953   throws IOException {
954     RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
955         tableName, delete.getRow()) {
956       @Override
957       public Boolean call(int callTimeout) throws IOException {
958         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
959         controller.setPriority(tableName);
960         controller.setCallTimeout(callTimeout);
961 
962         try {
963           MutateRequest request = RequestConverter.buildMutateRequest(
964             getLocation().getRegionInfo().getRegionName(), delete);
965           MutateResponse response = getStub().mutate(controller, request);
966           return Boolean.valueOf(response.getProcessed());
967         } catch (ServiceException se) {
968           throw ProtobufUtil.getRemoteException(se);
969         }
970       }
971     };
972     rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
973   }
974 
975   /**
976    * {@inheritDoc}
977    */
978   @Override
979   public void delete(final List<Delete> deletes)
980   throws IOException {
981     Object[] results = new Object[deletes.size()];
982     try {
983       batch(deletes, results);
984     } catch (InterruptedException e) {
985       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
986     } finally {
987       // mutate list so that it is empty for complete success, or contains only failed records
988       // results are returned in the same order as the requests in list walk the list backwards,
989       // so we can remove from list without impacting the indexes of earlier members
990       for (int i = results.length - 1; i>=0; i--) {
991         // if result is not null, it succeeded
992         if (results[i] instanceof Result) {
993           deletes.remove(i);
994         }
995       }
996     }
997   }
998 
999   /**
1000    * {@inheritDoc}
1001    * @throws IOException
1002    */
1003   @Override
1004   public void put(final Put put) throws IOException {
1005     getBufferedMutator().mutate(put);
1006     if (autoFlush) {
1007       flushCommits();
1008     }
1009   }
1010 
1011   /**
1012    * {@inheritDoc}
1013    * @throws IOException
1014    */
1015   @Override
1016   public void put(final List<Put> puts) throws IOException {
1017     getBufferedMutator().mutate(puts);
1018     if (autoFlush) {
1019       flushCommits();
1020     }
1021   }
1022 
1023   /**
1024    * {@inheritDoc}
1025    */
1026   @Override
1027   public void mutateRow(final RowMutations rm) throws IOException {
1028     RegionServerCallable<Void> callable =
1029         new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
1030       @Override
1031       public Void call(int callTimeout) throws IOException {
1032         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1033         controller.setPriority(tableName);
1034         controller.setCallTimeout(callTimeout);
1035         try {
1036           RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
1037             getLocation().getRegionInfo().getRegionName(), rm);
1038           regionMutationBuilder.setAtomic(true);
1039           MultiRequest request =
1040             MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
1041           ClientProtos.MultiResponse response = getStub().multi(controller, request);
1042           ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
1043           if (res.hasException()) {
1044             Throwable ex = ProtobufUtil.toException(res.getException());
1045             if(ex instanceof IOException) {
1046               throw (IOException)ex;
1047             }
1048             throw new IOException("Failed to mutate row: "+Bytes.toStringBinary(rm.getRow()), ex);
1049           }
1050         } catch (ServiceException se) {
1051           throw ProtobufUtil.getRemoteException(se);
1052         }
1053         return null;
1054       }
1055     };
1056     rpcCallerFactory.<Void> newCaller().callWithRetries(callable, this.operationTimeout);
1057   }
1058 
1059   /**
1060    * {@inheritDoc}
1061    */
1062   @Override
1063   public Result append(final Append append) throws IOException {
1064     if (append.numFamilies() == 0) {
1065       throw new IOException(
1066           "Invalid arguments to append, no columns specified");
1067     }
1068 
1069     NonceGenerator ng = this.connection.getNonceGenerator();
1070     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1071     RegionServerCallable<Result> callable =
1072       new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
1073         @Override
1074         public Result call(int callTimeout) throws IOException {
1075           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1076           controller.setPriority(getTableName());
1077           controller.setCallTimeout(callTimeout);
1078           try {
1079             MutateRequest request = RequestConverter.buildMutateRequest(
1080               getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
1081             MutateResponse response = getStub().mutate(controller, request);
1082             if (!response.hasResult()) return null;
1083             return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1084           } catch (ServiceException se) {
1085             throw ProtobufUtil.getRemoteException(se);
1086           }
1087         }
1088       };
1089     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1090   }
1091 
1092   /**
1093    * {@inheritDoc}
1094    */
1095   @Override
1096   public Result increment(final Increment increment) throws IOException {
1097     if (!increment.hasFamilies()) {
1098       throw new IOException(
1099           "Invalid arguments to increment, no columns specified");
1100     }
1101     NonceGenerator ng = this.connection.getNonceGenerator();
1102     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1103     RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
1104         getName(), increment.getRow()) {
1105       @Override
1106       public Result call(int callTimeout) throws IOException {
1107         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1108         controller.setPriority(getTableName());
1109         controller.setCallTimeout(callTimeout);
1110         try {
1111           MutateRequest request = RequestConverter.buildMutateRequest(
1112             getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
1113           MutateResponse response = getStub().mutate(controller, request);
1114           return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1115         } catch (ServiceException se) {
1116           throw ProtobufUtil.getRemoteException(se);
1117         }
1118       }
1119     };
1120     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1121   }
1122 
1123   /**
1124    * {@inheritDoc}
1125    */
1126   @Override
1127   public long incrementColumnValue(final byte [] row, final byte [] family,
1128       final byte [] qualifier, final long amount)
1129   throws IOException {
1130     return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
1131   }
1132 
1133   /**
1134    * @deprecated Use {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}
1135    */
1136   @Deprecated
1137   @Override
1138   public long incrementColumnValue(final byte [] row, final byte [] family,
1139       final byte [] qualifier, final long amount, final boolean writeToWAL)
1140   throws IOException {
1141     return incrementColumnValue(row, family, qualifier, amount,
1142       writeToWAL? Durability.SKIP_WAL: Durability.USE_DEFAULT);
1143   }
1144 
1145   /**
1146    * {@inheritDoc}
1147    */
1148   @Override
1149   public long incrementColumnValue(final byte [] row, final byte [] family,
1150       final byte [] qualifier, final long amount, final Durability durability)
1151   throws IOException {
1152     NullPointerException npe = null;
1153     if (row == null) {
1154       npe = new NullPointerException("row is null");
1155     } else if (family == null) {
1156       npe = new NullPointerException("family is null");
1157     } else if (qualifier == null) {
1158       npe = new NullPointerException("qualifier is null");
1159     }
1160     if (npe != null) {
1161       throw new IOException(
1162           "Invalid arguments to incrementColumnValue", npe);
1163     }
1164 
1165     NonceGenerator ng = this.connection.getNonceGenerator();
1166     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1167     RegionServerCallable<Long> callable =
1168       new RegionServerCallable<Long>(connection, getName(), row) {
1169         @Override
1170         public Long call(int callTimeout) throws IOException {
1171           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1172           controller.setPriority(getTableName());
1173           controller.setCallTimeout(callTimeout);
1174           try {
1175             MutateRequest request = RequestConverter.buildIncrementRequest(
1176               getLocation().getRegionInfo().getRegionName(), row, family,
1177               qualifier, amount, durability, nonceGroup, nonce);
1178             MutateResponse response = getStub().mutate(controller, request);
1179             Result result =
1180               ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1181             return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
1182           } catch (ServiceException se) {
1183             throw ProtobufUtil.getRemoteException(se);
1184           }
1185         }
1186       };
1187     return rpcCallerFactory.<Long> newCaller().callWithRetries(callable, this.operationTimeout);
1188   }
1189 
1190   /**
1191    * {@inheritDoc}
1192    */
1193   @Override
1194   public boolean checkAndPut(final byte [] row,
1195       final byte [] family, final byte [] qualifier, final byte [] value,
1196       final Put put)
1197   throws IOException {
1198     RegionServerCallable<Boolean> callable =
1199       new RegionServerCallable<Boolean>(connection, getName(), row) {
1200         @Override
1201         public Boolean call(int callTimeout) throws IOException {
1202           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1203           controller.setPriority(tableName);
1204           controller.setCallTimeout(callTimeout);
1205           try {
1206             MutateRequest request = RequestConverter.buildMutateRequest(
1207                 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1208                 new BinaryComparator(value), CompareType.EQUAL, put);
1209             MutateResponse response = getStub().mutate(controller, request);
1210             return Boolean.valueOf(response.getProcessed());
1211           } catch (ServiceException se) {
1212             throw ProtobufUtil.getRemoteException(se);
1213           }
1214         }
1215       };
1216     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1217   }
1218 
1219   /**
1220    * {@inheritDoc}
1221    */
1222   @Override
1223   public boolean checkAndPut(final byte [] row, final byte [] family,
1224       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1225       final Put put)
1226   throws IOException {
1227     RegionServerCallable<Boolean> callable =
1228       new RegionServerCallable<Boolean>(connection, getName(), row) {
1229         @Override
1230         public Boolean call(int callTimeout) throws IOException {
1231           PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
1232           controller.setPriority(tableName);
1233           controller.setCallTimeout(callTimeout);
1234           try {
1235             CompareType compareType = CompareType.valueOf(compareOp.name());
1236             MutateRequest request = RequestConverter.buildMutateRequest(
1237               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1238                 new BinaryComparator(value), compareType, put);
1239             MutateResponse response = getStub().mutate(controller, request);
1240             return Boolean.valueOf(response.getProcessed());
1241           } catch (ServiceException se) {
1242             throw ProtobufUtil.getRemoteException(se);
1243           }
1244         }
1245       };
1246     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1247   }
1248 
1249   /**
1250    * {@inheritDoc}
1251    */
1252   @Override
1253   public boolean checkAndDelete(final byte [] row,
1254       final byte [] family, final byte [] qualifier, final byte [] value,
1255       final Delete delete)
1256   throws IOException {
1257     RegionServerCallable<Boolean> callable =
1258       new RegionServerCallable<Boolean>(connection, getName(), row) {
1259         @Override
1260         public Boolean call(int callTimeout) throws IOException {
1261           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1262           controller.setPriority(tableName);
1263           controller.setCallTimeout(callTimeout);
1264           try {
1265             MutateRequest request = RequestConverter.buildMutateRequest(
1266               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1267                 new BinaryComparator(value), CompareType.EQUAL, delete);
1268             MutateResponse response = getStub().mutate(controller, request);
1269             return Boolean.valueOf(response.getProcessed());
1270           } catch (ServiceException se) {
1271             throw ProtobufUtil.getRemoteException(se);
1272           }
1273         }
1274       };
1275     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1276   }
1277 
1278   /**
1279    * {@inheritDoc}
1280    */
1281   @Override
1282   public boolean checkAndDelete(final byte [] row, final byte [] family,
1283       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1284       final Delete delete)
1285   throws IOException {
1286     RegionServerCallable<Boolean> callable =
1287       new RegionServerCallable<Boolean>(connection, getName(), row) {
1288         @Override
1289         public Boolean call(int callTimeout) throws IOException {
1290           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1291           controller.setPriority(tableName);
1292           controller.setCallTimeout(callTimeout);
1293           try {
1294             CompareType compareType = CompareType.valueOf(compareOp.name());
1295             MutateRequest request = RequestConverter.buildMutateRequest(
1296               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1297                 new BinaryComparator(value), compareType, delete);
1298             MutateResponse response = getStub().mutate(controller, request);
1299             return Boolean.valueOf(response.getProcessed());
1300           } catch (ServiceException se) {
1301             throw ProtobufUtil.getRemoteException(se);
1302           }
1303         }
1304       };
1305     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1306   }
1307 
1308   /**
1309    * {@inheritDoc}
1310    */
1311   @Override
1312   public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
1313       final CompareOp compareOp, final byte [] value, final RowMutations rm)
1314   throws IOException {
1315     RegionServerCallable<Boolean> callable =
1316         new RegionServerCallable<Boolean>(connection, getName(), row) {
1317           @Override
1318           public Boolean call(int callTimeout) throws IOException {
1319             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1320             controller.setPriority(tableName);
1321             controller.setCallTimeout(callTimeout);
1322             try {
1323               CompareType compareType = CompareType.valueOf(compareOp.name());
1324               MultiRequest request = RequestConverter.buildMutateRequest(
1325                   getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1326                   new BinaryComparator(value), compareType, rm);
1327               ClientProtos.MultiResponse response = getStub().multi(controller, request);
1328               ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
1329               if (res.hasException()) {
1330                 Throwable ex = ProtobufUtil.toException(res.getException());
1331                 if(ex instanceof IOException) {
1332                   throw (IOException)ex;
1333                 }
1334                 throw new IOException("Failed to checkAndMutate row: "+
1335                     Bytes.toStringBinary(rm.getRow()), ex);
1336               }
1337               return Boolean.valueOf(response.getProcessed());
1338             } catch (ServiceException se) {
1339               throw ProtobufUtil.getRemoteException(se);
1340             }
1341           }
1342         };
1343     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1344   }
1345 
1346   /**
1347    * {@inheritDoc}
1348    */
1349   @Override
1350   public boolean exists(final Get get) throws IOException {
1351     get.setCheckExistenceOnly(true);
1352     Result r = get(get);
1353     assert r.getExists() != null;
1354     return r.getExists();
1355   }
1356 
1357   /**
1358    * {@inheritDoc}
1359    */
1360   @Override
1361   public boolean[] existsAll(final List<Get> gets) throws IOException {
1362     if (gets.isEmpty()) return new boolean[]{};
1363     if (gets.size() == 1) return new boolean[]{exists(gets.get(0))};
1364 
1365     for (Get g: gets){
1366       g.setCheckExistenceOnly(true);
1367     }
1368 
1369     Object[] r1;
1370     try {
1371       r1 = batch(gets);
1372     } catch (InterruptedException e) {
1373       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1374     }
1375 
1376     // translate.
1377     boolean[] results = new boolean[r1.length];
1378     int i = 0;
1379     for (Object o : r1) {
1380       // batch ensures if there is a failure we get an exception instead
1381       results[i++] = ((Result)o).getExists();
1382     }
1383 
1384     return results;
1385   }
1386 
1387   /**
1388    * {@inheritDoc}
1389    * @deprecated Use {@link #existsAll(java.util.List)}  instead.
1390    */
1391   @Override
1392   @Deprecated
1393   public Boolean[] exists(final List<Get> gets) throws IOException {
1394     boolean[] results = existsAll(gets);
1395     Boolean[] objectResults = new Boolean[results.length];
1396     for (int i = 0; i < results.length; ++i) {
1397       objectResults[i] = results[i];
1398     }
1399     return objectResults;
1400   }
1401 
1402   /**
1403    * {@inheritDoc}
1404    * @throws IOException
1405    */
1406   @Override
1407   public void flushCommits() throws IOException {
1408     if (mutator == null) {
1409       // nothing to flush if there's no mutator; don't bother creating one.
1410       return;
1411     }
1412     getBufferedMutator().flush();
1413   }
1414 
1415   /**
1416    * Process a mixed batch of Get, Put and Delete actions. All actions for a
1417    * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
1418    *
1419    * @param list The collection of actions.
1420    * @param results An empty array, same size as list. If an exception is thrown,
1421    * you can test here for partial results, and to determine which actions
1422    * processed successfully.
1423    * @throws IOException if there are problems talking to META. Per-item
1424    * exceptions are stored in the results array.
1425    */
1426   public <R> void processBatchCallback(
1427     final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
1428     throws IOException, InterruptedException {
1429     this.batchCallback(list, results, callback);
1430   }
1431 
1432 
1433   /**
1434    * Parameterized batch processing, allowing varying return types for different
1435    * {@link Row} implementations.
1436    */
1437   public void processBatch(final List<? extends Row> list, final Object[] results)
1438     throws IOException, InterruptedException {
1439     this.batch(list, results);
1440   }
1441 
1442 
1443   @Override
1444   public void close() throws IOException {
1445     if (this.closed) {
1446       return;
1447     }
1448     flushCommits();
1449     if (cleanupPoolOnClose) {
1450       this.pool.shutdown();
1451       try {
1452         boolean terminated = false;
1453         do {
1454           // wait until the pool has terminated
1455           terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
1456         } while (!terminated);
1457       } catch (InterruptedException e) {
1458         this.pool.shutdownNow();
1459         LOG.warn("waitForTermination interrupted");
1460       }
1461     }
1462     if (cleanupConnectionOnClose) {
1463       if (this.connection != null) {
1464         this.connection.close();
1465       }
1466     }
1467     this.closed = true;
1468   }
1469 
1470   // validate for well-formedness
1471   public void validatePut(final Put put) throws IllegalArgumentException {
1472     validatePut(put, tableConfiguration.getMaxKeyValueSize());
1473   }
1474 
1475   // validate for well-formedness
1476   public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
1477     if (put.isEmpty()) {
1478       throw new IllegalArgumentException("No columns to insert");
1479     }
1480     if (maxKeyValueSize > 0) {
1481       for (List<Cell> list : put.getFamilyCellMap().values()) {
1482         for (Cell cell : list) {
1483           if (KeyValueUtil.length(cell) > maxKeyValueSize) {
1484             throw new IllegalArgumentException("KeyValue size too large");
1485           }
1486         }
1487       }
1488     }
1489   }
1490 
1491   /**
1492    * {@inheritDoc}
1493    */
1494   @Override
1495   public boolean isAutoFlush() {
1496     return autoFlush;
1497   }
1498 
1499   /**
1500    * {@inheritDoc}
1501    * @deprecated in 0.96. When called with setAutoFlush(false), this function also
1502    *  set clearBufferOnFail to true, which is unexpected but kept for historical reasons.
1503    *  Replace it with setAutoFlush(false, false) if this is exactly what you want, or by
1504    *  {@link #setAutoFlushTo(boolean)} for all other cases.
1505    */
1506   @Deprecated
1507   @Override
1508   public void setAutoFlush(boolean autoFlush) {
1509     this.autoFlush = autoFlush;
1510   }
1511 
1512   /**
1513    * {@inheritDoc}
1514    */
1515   @Override
1516   public void setAutoFlushTo(boolean autoFlush) {
1517     this.autoFlush = autoFlush;
1518   }
1519 
1520   /**
1521    * {@inheritDoc}
1522    */
1523   @Override
1524   public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
1525     this.autoFlush = autoFlush;
1526   }
1527 
1528   /**
1529    * Returns the maximum size in bytes of the write buffer for this HTable.
1530    * <p>
1531    * The default value comes from the configuration parameter
1532    * {@code hbase.client.write.buffer}.
1533    * @return The size of the write buffer in bytes.
1534    */
1535   @Override
1536   public long getWriteBufferSize() {
1537     if (mutator == null) {
1538       return tableConfiguration.getWriteBufferSize();
1539     } else {
1540       return mutator.getWriteBufferSize();
1541     }
1542   }
1543 
1544   /**
1545    * Sets the size of the buffer in bytes.
1546    * <p>
1547    * If the new size is less than the current amount of data in the
1548    * write buffer, the buffer gets flushed.
1549    * @param writeBufferSize The new write buffer size, in bytes.
1550    * @throws IOException if a remote or network exception occurs.
1551    */
1552   @Override
1553   public void setWriteBufferSize(long writeBufferSize) throws IOException {
1554     getBufferedMutator();
1555     mutator.setWriteBufferSize(writeBufferSize);
1556   }
1557 
1558   /**
1559    * The pool is used for mutli requests for this HTable
1560    * @return the pool used for mutli
1561    */
1562   ExecutorService getPool() {
1563     return this.pool;
1564   }
1565 
1566   /**
1567    * Enable or disable region cache prefetch for the table. It will be
1568    * applied for the given table's all HTable instances who share the same
1569    * connection. By default, the cache prefetch is enabled.
1570    * @param tableName name of table to configure.
1571    * @param enable Set to true to enable region cache prefetch. Or set to
1572    * false to disable it.
1573    * @throws IOException
1574    * @deprecated does nothing since 0.99
1575    */
1576   @Deprecated
1577   public static void setRegionCachePrefetch(final byte[] tableName,
1578       final boolean enable)  throws IOException {
1579   }
1580 
1581   /**
1582    * @deprecated does nothing since 0.99
1583    */
1584   @Deprecated
1585   public static void setRegionCachePrefetch(
1586       final TableName tableName,
1587       final boolean enable) throws IOException {
1588   }
1589 
1590   /**
1591    * Enable or disable region cache prefetch for the table. It will be
1592    * applied for the given table's all HTable instances who share the same
1593    * connection. By default, the cache prefetch is enabled.
1594    * @param conf The Configuration object to use.
1595    * @param tableName name of table to configure.
1596    * @param enable Set to true to enable region cache prefetch. Or set to
1597    * false to disable it.
1598    * @throws IOException
1599    * @deprecated does nothing since 0.99
1600    */
1601   @Deprecated
1602   public static void setRegionCachePrefetch(final Configuration conf,
1603       final byte[] tableName, final boolean enable) throws IOException {
1604   }
1605 
1606   /**
1607    * @deprecated does nothing since 0.99
1608    */
1609   @Deprecated
1610   public static void setRegionCachePrefetch(final Configuration conf,
1611       final TableName tableName,
1612       final boolean enable) throws IOException {
1613   }
1614 
1615   /**
1616    * Check whether region cache prefetch is enabled or not for the table.
1617    * @param conf The Configuration object to use.
1618    * @param tableName name of table to check
1619    * @return true if table's region cache prefecth is enabled. Otherwise
1620    * it is disabled.
1621    * @throws IOException
1622    * @deprecated always return false since 0.99
1623    */
1624   @Deprecated
1625   public static boolean getRegionCachePrefetch(final Configuration conf,
1626       final byte[] tableName) throws IOException {
1627     return false;
1628   }
1629 
1630   /**
1631    * @deprecated always return false since 0.99
1632    */
1633   @Deprecated
1634   public static boolean getRegionCachePrefetch(final Configuration conf,
1635       final TableName tableName) throws IOException {
1636     return false;
1637   }
1638 
1639   /**
1640    * Check whether region cache prefetch is enabled or not for the table.
1641    * @param tableName name of table to check
1642    * @return true if table's region cache prefecth is enabled. Otherwise
1643    * it is disabled.
1644    * @throws IOException
1645    * @deprecated always return false since 0.99
1646    */
1647   @Deprecated
1648   public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException {
1649     return false;
1650   }
1651 
1652   /**
1653    * @deprecated always return false since 0.99
1654    */
1655   @Deprecated
1656   public static boolean getRegionCachePrefetch(
1657       final TableName tableName) throws IOException {
1658     return false;
1659   }
1660 
1661   /**
1662    * Explicitly clears the region cache to fetch the latest value from META.
1663    * This is a power user function: avoid unless you know the ramifications.
1664    */
1665   public void clearRegionCache() {
1666     this.connection.clearRegionCache();
1667   }
1668 
1669   /**
1670    * {@inheritDoc}
1671    */
1672   @Override
1673   public CoprocessorRpcChannel coprocessorService(byte[] row) {
1674     return new RegionCoprocessorRpcChannel(connection, tableName, row);
1675   }
1676 
1677   /**
1678    * {@inheritDoc}
1679    */
1680   @Override
1681   public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1682       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1683       throws ServiceException, Throwable {
1684     final Map<byte[],R> results =  Collections.synchronizedMap(
1685         new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1686     coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1687       @Override
1688       public void update(byte[] region, byte[] row, R value) {
1689         if (region != null) {
1690           results.put(region, value);
1691         }
1692       }
1693     });
1694     return results;
1695   }
1696 
1697   /**
1698    * {@inheritDoc}
1699    */
1700   @Override
1701   public <T extends Service, R> void coprocessorService(final Class<T> service,
1702       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1703       final Batch.Callback<R> callback) throws ServiceException, Throwable {
1704 
1705     // get regions covered by the row range
1706     List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1707 
1708     Map<byte[],Future<R>> futures =
1709         new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
1710     for (final byte[] r : keys) {
1711       final RegionCoprocessorRpcChannel channel =
1712           new RegionCoprocessorRpcChannel(connection, tableName, r);
1713       Future<R> future = pool.submit(
1714           new Callable<R>() {
1715             @Override
1716             public R call() throws Exception {
1717               T instance = ProtobufUtil.newServiceStub(service, channel);
1718               R result = callable.call(instance);
1719               byte[] region = channel.getLastRegion();
1720               if (callback != null) {
1721                 callback.update(region, r, result);
1722               }
1723               return result;
1724             }
1725           });
1726       futures.put(r, future);
1727     }
1728     for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1729       try {
1730         e.getValue().get();
1731       } catch (ExecutionException ee) {
1732         LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1733             + Bytes.toStringBinary(e.getKey()), ee);
1734         throw ee.getCause();
1735       } catch (InterruptedException ie) {
1736         throw new InterruptedIOException("Interrupted calling coprocessor service "
1737             + service.getName() + " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie);
1738       }
1739     }
1740   }
1741 
1742   private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1743   throws IOException {
1744     if (start == null) {
1745       start = HConstants.EMPTY_START_ROW;
1746     }
1747     if (end == null) {
1748       end = HConstants.EMPTY_END_ROW;
1749     }
1750     return getKeysAndRegionsInRange(start, end, true).getFirst();
1751   }
1752 
1753   public void setOperationTimeout(int operationTimeout) {
1754     this.operationTimeout = operationTimeout;
1755   }
1756 
1757   public int getOperationTimeout() {
1758     return operationTimeout;
1759   }
1760 
1761   @Override
1762   public String toString() {
1763     return tableName + ";" + connection;
1764   }
1765 
1766   /**
1767    * {@inheritDoc}
1768    */
1769   @Override
1770   public <R extends Message> Map<byte[], R> batchCoprocessorService(
1771       Descriptors.MethodDescriptor methodDescriptor, Message request,
1772       byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1773     final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1774         Bytes.BYTES_COMPARATOR));
1775     batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1776         new Callback<R>() {
1777 
1778           @Override
1779           public void update(byte[] region, byte[] row, R result) {
1780             if (region != null) {
1781               results.put(region, result);
1782             }
1783           }
1784         });
1785     return results;
1786   }
1787 
1788   /**
1789    * {@inheritDoc}
1790    */
1791   @Override
1792   public <R extends Message> void batchCoprocessorService(
1793       final Descriptors.MethodDescriptor methodDescriptor, final Message request,
1794       byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
1795       throws ServiceException, Throwable {
1796 
1797     // get regions covered by the row range
1798     Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
1799         getKeysAndRegionsInRange(startKey, endKey, true);
1800     List<byte[]> keys = keysAndRegions.getFirst();
1801     List<HRegionLocation> regions = keysAndRegions.getSecond();
1802 
1803     // check if we have any calls to make
1804     if (keys.isEmpty()) {
1805       LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
1806           ", end=" + Bytes.toStringBinary(endKey));
1807       return;
1808     }
1809 
1810     List<RegionCoprocessorServiceExec> execs = new ArrayList<RegionCoprocessorServiceExec>();
1811     final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
1812         new TreeMap<byte[], RegionCoprocessorServiceExec>(Bytes.BYTES_COMPARATOR);
1813     for (int i = 0; i < keys.size(); i++) {
1814       final byte[] rowKey = keys.get(i);
1815       final byte[] region = regions.get(i).getRegionInfo().getRegionName();
1816       RegionCoprocessorServiceExec exec =
1817           new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
1818       execs.add(exec);
1819       execsByRow.put(rowKey, exec);
1820     }
1821 
1822     // tracking for any possible deserialization errors on success callback
1823     // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
1824     final List<Throwable> callbackErrorExceptions = new ArrayList<Throwable>();
1825     final List<Row> callbackErrorActions = new ArrayList<Row>();
1826     final List<String> callbackErrorServers = new ArrayList<String>();
1827     Object[] results = new Object[execs.size()];
1828 
1829     AsyncProcess asyncProcess =
1830         new AsyncProcess(connection, configuration, pool,
1831             RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
1832             true, RpcControllerFactory.instantiate(configuration));
1833 
1834     AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
1835         new Callback<ClientProtos.CoprocessorServiceResult>() {
1836           @Override
1837           public void update(byte[] region, byte[] row,
1838                               ClientProtos.CoprocessorServiceResult serviceResult) {
1839             if (LOG.isTraceEnabled()) {
1840               LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
1841                   ": region=" + Bytes.toStringBinary(region) +
1842                   ", row=" + Bytes.toStringBinary(row) +
1843                   ", value=" + serviceResult.getValue().getValue());
1844             }
1845             try {
1846               callback.update(region, row,
1847                   (R) responsePrototype.newBuilderForType().mergeFrom(
1848                       serviceResult.getValue().getValue()).build());
1849             } catch (InvalidProtocolBufferException e) {
1850               LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
1851                   e);
1852               callbackErrorExceptions.add(e);
1853               callbackErrorActions.add(execsByRow.get(row));
1854               callbackErrorServers.add("null");
1855             }
1856           }
1857         }, results);
1858 
1859     future.waitUntilDone();
1860 
1861     if (future.hasError()) {
1862       throw future.getErrors();
1863     } else if (!callbackErrorExceptions.isEmpty()) {
1864       throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
1865           callbackErrorServers);
1866     }
1867   }
1868 
1869   public RegionLocator getRegionLocator() {
1870     return this.locator;
1871   }
1872 
1873   @VisibleForTesting
1874   BufferedMutator getBufferedMutator() throws IOException {
1875     if (mutator == null) {
1876       this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
1877           new BufferedMutatorParams(tableName)
1878               .pool(pool)
1879               .writeBufferSize(tableConfiguration.getWriteBufferSize())
1880               .maxKeyValueSize(tableConfiguration.getMaxKeyValueSize())
1881       );
1882     }
1883     return mutator;
1884   }
1885 }