View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.NavigableMap;
28  import java.util.TreeMap;
29  import java.util.concurrent.Callable;
30  import java.util.concurrent.ExecutionException;
31  import java.util.concurrent.ExecutorService;
32  import java.util.concurrent.Future;
33  import java.util.concurrent.SynchronousQueue;
34  import java.util.concurrent.ThreadPoolExecutor;
35  import java.util.concurrent.TimeUnit;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.hbase.classification.InterfaceAudience;
40  import org.apache.hadoop.hbase.classification.InterfaceStability;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.hbase.Cell;
43  import org.apache.hadoop.hbase.HBaseConfiguration;
44  import org.apache.hadoop.hbase.HConstants;
45  import org.apache.hadoop.hbase.HRegionInfo;
46  import org.apache.hadoop.hbase.HRegionLocation;
47  import org.apache.hadoop.hbase.HTableDescriptor;
48  import org.apache.hadoop.hbase.KeyValueUtil;
49  import org.apache.hadoop.hbase.ServerName;
50  import org.apache.hadoop.hbase.TableName;
51  import org.apache.hadoop.hbase.TableNotFoundException;
52  import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
53  import org.apache.hadoop.hbase.client.coprocessor.Batch;
54  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
55  import org.apache.hadoop.hbase.filter.BinaryComparator;
56  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
57  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
58  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
59  import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
60  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
61  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
62  import org.apache.hadoop.hbase.protobuf.RequestConverter;
63  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
64  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
65  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
66  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
67  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
68  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
69  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
70  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
71  import org.apache.hadoop.hbase.util.Bytes;
72  import org.apache.hadoop.hbase.util.Pair;
73  import org.apache.hadoop.hbase.util.ReflectionUtils;
74  import org.apache.hadoop.hbase.util.Threads;
75  
76  import com.google.common.annotations.VisibleForTesting;
77  import com.google.protobuf.Descriptors;
78  import com.google.protobuf.Message;
79  import com.google.protobuf.Service;
80  import com.google.protobuf.ServiceException;
81  
82  /**
83   * An implementation of {@link Table}. Used to communicate with a single HBase table.
84   * Lightweight. Get as needed and just close when done.
85   * Instances of this class SHOULD NOT be constructed directly.
86   * Obtain an instance via {@link Connection}. See {@link ConnectionFactory}
87   * class comment for an example of how.
88   *
89   * <p>This class is NOT thread safe for reads nor writes.
90   * In the case of writes (Put, Delete), the underlying write buffer can
91   * be corrupted if multiple threads contend over a single HTable instance.
92   * In the case of reads, some fields used by a Scan are shared among all threads.
93   *
94   * <p>HTable is no longer a client API. Use {@link Table} instead. It is marked
95   * InterfaceAudience.Private as of hbase-1.0.0 indicating that this is an
96   * HBase-internal class as defined in
97   * <a href="https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html">Hadoop
98   * Interface Classification</a>. There are no guarantees for backwards
99   * source / binary compatibility and methods or the class can
100  * change or go away without deprecation.
 * <p>Nearly all methods of this class made it out to the new {@link Table}
 * interface or were implementations of methods defined in {@link HTableInterface}.
103  * A few did not. Namely, the {@link #getStartEndKeys}, {@link #getEndKeys},
104  * and {@link #getStartKeys} methods. These three methods are available
105  * in {@link RegionLocator} as of 1.0.0 but were NOT marked as
106  * deprecated when we released 1.0.0. In spite of this oversight on our
107  * part, these methods will be removed in 2.0.0.
108  *
109  * @see Table
110  * @see Admin
111  * @see Connection
112  * @see ConnectionFactory
113  */
114 @InterfaceAudience.Private
115 @InterfaceStability.Stable
116 public class HTable implements HTableInterface, RegionLocator {
  /** Class-wide logger. */
  private static final Log LOG = LogFactory.getLog(HTable.class);
  /** Cluster connection this table talks through; may be left null by the null-conf constructor. */
  protected ClusterConnection connection;
  /** Name of the table this instance is bound to. */
  private final TableName tableName;
  /** Configuration the table was created with (or taken from the connection). */
  private volatile Configuration configuration;
  /** Source of timeouts and scanner defaults; created in finishSetup() when not supplied. */
  private ConnectionConfiguration connConfiguration;
  /** Buffered mutator backing the write buffer; null until one is created. */
  protected BufferedMutatorImpl mutator;
  private boolean autoFlush = true;
  private boolean closed = false;
  /** Default number of rows a scanner fetches at once when the Scan does not set caching. */
  protected int scannerCaching;
  /** Default max result size applied to a Scan that does not set one. */
  protected long scannerMaxResultSize;
  private ExecutorService pool;  // For Multi & Scan
  private int operationTimeout; // global timeout for each blocking method with retrying rpc
  private int rpcTimeout; // timeout for each rpc request
  private final boolean cleanupPoolOnClose; // shutdown the pool in close()
  private final boolean cleanupConnectionOnClose; // close the connection in close()
  private Consistency defaultConsistency = Consistency.STRONG;
  /** Region locator the region-lookup methods delegate to; created in finishSetup(). */
  private HRegionLocator locator;

  /** The Async process for batch */
  protected AsyncProcess multiAp;
  /** Factory for retrying RPC callers; defaulted from the connection in finishSetup(). */
  private RpcRetryingCallerFactory rpcCallerFactory;
  /** Factory for RPC controllers; defaulted from the configuration in finishSetup(). */
  private RpcControllerFactory rpcControllerFactory;
139 
140   /**
141    * Creates an object to access a HBase table.
142    * @param conf Configuration object to use.
143    * @param tableName Name of the table.
144    * @throws IOException if a remote or network exception occurs
145    * @deprecated Constructing HTable objects manually has been deprecated. Please use
146    * {@link Connection} to instantiate a {@link Table} instead.
147    */
148   @Deprecated
149   public HTable(Configuration conf, final String tableName)
150   throws IOException {
151     this(conf, TableName.valueOf(tableName));
152   }
153 
154   /**
155    * Creates an object to access a HBase table.
156    * @param conf Configuration object to use.
157    * @param tableName Name of the table.
158    * @throws IOException if a remote or network exception occurs
159    * @deprecated Constructing HTable objects manually has been deprecated. Please use
160    * {@link Connection} to instantiate a {@link Table} instead.
161    */
162   @Deprecated
163   public HTable(Configuration conf, final byte[] tableName)
164   throws IOException {
165     this(conf, TableName.valueOf(tableName));
166   }
167 
168   /**
169    * Creates an object to access a HBase table.
170    * @param conf Configuration object to use.
171    * @param tableName table name pojo
172    * @throws IOException if a remote or network exception occurs
173    * @deprecated Constructing HTable objects manually has been deprecated. Please use
174    * {@link Connection} to instantiate a {@link Table} instead.
175    */
176   @Deprecated
177   public HTable(Configuration conf, final TableName tableName)
178   throws IOException {
179     this.tableName = tableName;
180     this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;
181     if (conf == null) {
182       this.connection = null;
183       return;
184     }
185     this.connection = ConnectionManager.getConnectionInternal(conf);
186     this.configuration = conf;
187 
188     this.pool = getDefaultExecutor(conf);
189     this.finishSetup();
190   }
191 
192   /**
193    * Creates an object to access a HBase table.
194    * @param tableName Name of the table.
195    * @param connection HConnection to be used.
196    * @throws IOException if a remote or network exception occurs
197    * @deprecated Do not use.
198    */
199   @Deprecated
200   public HTable(TableName tableName, Connection connection) throws IOException {
201     this.tableName = tableName;
202     this.cleanupPoolOnClose = true;
203     this.cleanupConnectionOnClose = false;
204     this.connection = (ClusterConnection)connection;
205     this.configuration = connection.getConfiguration();
206 
207     this.pool = getDefaultExecutor(this.configuration);
208     this.finishSetup();
209   }
210 
211   // Marked Private @since 1.0
212   @InterfaceAudience.Private
213   public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
214     int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
215     if (maxThreads == 0) {
216       maxThreads = 1; // is there a better default?
217     }
218     long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
219 
220     // Using the "direct handoff" approach, new threads will only be created
221     // if it is necessary and will grow unbounded. This could be bad but in HCM
222     // we only create as many Runnables as there are region servers. It means
223     // it also scales when new region servers are added.
224     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
225         new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
226     pool.allowCoreThreadTimeOut(true);
227     return pool;
228   }
229 
230   /**
231    * Creates an object to access a HBase table.
232    * @param conf Configuration object to use.
233    * @param tableName Name of the table.
234    * @param pool ExecutorService to be used.
235    * @throws IOException if a remote or network exception occurs
236    * @deprecated Constructing HTable objects manually has been deprecated. Please use
237    * {@link Connection} to instantiate a {@link Table} instead.
238    */
239   @Deprecated
240   public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool)
241       throws IOException {
242     this(conf, TableName.valueOf(tableName), pool);
243   }
244 
245   /**
246    * Creates an object to access a HBase table.
247    * @param conf Configuration object to use.
248    * @param tableName Name of the table.
249    * @param pool ExecutorService to be used.
250    * @throws IOException if a remote or network exception occurs
251    * @deprecated Constructing HTable objects manually has been deprecated. Please use
252    * {@link Connection} to instantiate a {@link Table} instead.
253    */
254   @Deprecated
255   public HTable(Configuration conf, final TableName tableName, final ExecutorService pool)
256       throws IOException {
257     this.connection = ConnectionManager.getConnectionInternal(conf);
258     this.configuration = conf;
259     this.pool = pool;
260     if (pool == null) {
261       this.pool = getDefaultExecutor(conf);
262       this.cleanupPoolOnClose = true;
263     } else {
264       this.cleanupPoolOnClose = false;
265     }
266     this.tableName = tableName;
267     this.cleanupConnectionOnClose = true;
268     this.finishSetup();
269   }
270 
271   /**
272    * Creates an object to access a HBase table.
273    * @param tableName Name of the table.
274    * @param connection HConnection to be used.
275    * @param pool ExecutorService to be used.
276    * @throws IOException if a remote or network exception occurs.
277    * @deprecated Do not use, internal ctor.
278    */
279   @Deprecated
280   public HTable(final byte[] tableName, final Connection connection,
281       final ExecutorService pool) throws IOException {
282     this(TableName.valueOf(tableName), connection, pool);
283   }
284 
285   /** @deprecated Do not use, internal ctor. */
286   @Deprecated
287   public HTable(TableName tableName, final Connection connection,
288       final ExecutorService pool) throws IOException {
289     this(tableName, (ClusterConnection)connection, null, null, null, pool);
290   }
291 
292   /**
293    * Creates an object to access a HBase table.
294    * Used by HBase internally.  DO NOT USE. See {@link ConnectionFactory} class comment for how to
295    * get a {@link Table} instance (use {@link Table} instead of {@link HTable}).
296    * @param tableName Name of the table.
297    * @param connection HConnection to be used.
298    * @param pool ExecutorService to be used.
299    * @throws IOException if a remote or network exception occurs
300    */
301   @InterfaceAudience.Private
302   public HTable(TableName tableName, final ClusterConnection connection,
303       final ConnectionConfiguration tableConfig,
304       final RpcRetryingCallerFactory rpcCallerFactory,
305       final RpcControllerFactory rpcControllerFactory,
306       final ExecutorService pool) throws IOException {
307     if (connection == null || connection.isClosed()) {
308       throw new IllegalArgumentException("Connection is null or closed.");
309     }
310     this.tableName = tableName;
311     this.cleanupConnectionOnClose = false;
312     this.connection = connection;
313     this.configuration = connection.getConfiguration();
314     this.connConfiguration = tableConfig;
315     this.pool = pool;
316     if (pool == null) {
317       this.pool = getDefaultExecutor(this.configuration);
318       this.cleanupPoolOnClose = true;
319     } else {
320       this.cleanupPoolOnClose = false;
321     }
322 
323     this.rpcCallerFactory = rpcCallerFactory;
324     this.rpcControllerFactory = rpcControllerFactory;
325 
326     this.finishSetup();
327   }
328 
329   /**
330    * For internal testing. Uses Connection provided in {@code params}.
331    * @throws IOException
332    */
333   @VisibleForTesting
334   protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException {
335     connection = conn;
336     tableName = params.getTableName();
337     connConfiguration = new ConnectionConfiguration(connection.getConfiguration());
338     cleanupPoolOnClose = false;
339     cleanupConnectionOnClose = false;
340     // used from tests, don't trust the connection is real
341     this.mutator = new BufferedMutatorImpl(conn, null, null, params);
342   }
343 
344   /**
345    * @return maxKeyValueSize from configuration.
346    */
347   public static int getMaxKeyValueSize(Configuration conf) {
348     return conf.getInt("hbase.client.keyvalue.maxsize", -1);
349   }
350 
351   /**
352    * setup this HTable's parameter based on the passed configuration
353    */
354   private void finishSetup() throws IOException {
355     if (connConfiguration == null) {
356       connConfiguration = new ConnectionConfiguration(configuration);
357     }
358     this.operationTimeout = tableName.isSystemTable() ?
359         connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
360     this.rpcTimeout = configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
361         HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
362     this.scannerCaching = connConfiguration.getScannerCaching();
363     this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
364     if (this.rpcCallerFactory == null) {
365       this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
366     }
367     if (this.rpcControllerFactory == null) {
368       this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
369     }
370 
371     // puts need to track errors globally due to how the APIs currently work.
372     multiAp = this.connection.getAsyncProcess();
373 
374     this.closed = false;
375 
376     this.locator = new HRegionLocator(tableName, connection);
377   }
378 
379   /**
380    * {@inheritDoc}
381    */
382   @Override
383   public Configuration getConfiguration() {
384     return configuration;
385   }
386 
387   /**
388    * Tells whether or not a table is enabled or not. This method creates a
389    * new HBase configuration, so it might make your unit tests fail due to
390    * incorrect ZK client port.
391    * @param tableName Name of table to check.
392    * @return {@code true} if table is online.
393    * @throws IOException if a remote or network exception occurs
394    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
395    */
396   @Deprecated
397   public static boolean isTableEnabled(String tableName) throws IOException {
398     return isTableEnabled(TableName.valueOf(tableName));
399   }
400 
401   /**
402    * Tells whether or not a table is enabled or not. This method creates a
403    * new HBase configuration, so it might make your unit tests fail due to
404    * incorrect ZK client port.
405    * @param tableName Name of table to check.
406    * @return {@code true} if table is online.
407    * @throws IOException if a remote or network exception occurs
408    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
409    */
410   @Deprecated
411   public static boolean isTableEnabled(byte[] tableName) throws IOException {
412     return isTableEnabled(TableName.valueOf(tableName));
413   }
414 
415   /**
416    * Tells whether or not a table is enabled or not. This method creates a
417    * new HBase configuration, so it might make your unit tests fail due to
418    * incorrect ZK client port.
419    * @param tableName Name of table to check.
420    * @return {@code true} if table is online.
421    * @throws IOException if a remote or network exception occurs
422    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
423    */
424   @Deprecated
425   public static boolean isTableEnabled(TableName tableName) throws IOException {
426     return isTableEnabled(HBaseConfiguration.create(), tableName);
427   }
428 
429   /**
430    * Tells whether or not a table is enabled or not.
431    * @param conf The Configuration object to use.
432    * @param tableName Name of table to check.
433    * @return {@code true} if table is online.
434    * @throws IOException if a remote or network exception occurs
435    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
436    */
437   @Deprecated
438   public static boolean isTableEnabled(Configuration conf, String tableName)
439   throws IOException {
440     return isTableEnabled(conf, TableName.valueOf(tableName));
441   }
442 
443   /**
444    * Tells whether or not a table is enabled or not.
445    * @param conf The Configuration object to use.
446    * @param tableName Name of table to check.
447    * @return {@code true} if table is online.
448    * @throws IOException if a remote or network exception occurs
449    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
450    */
451   @Deprecated
452   public static boolean isTableEnabled(Configuration conf, byte[] tableName)
453   throws IOException {
454     return isTableEnabled(conf, TableName.valueOf(tableName));
455   }
456 
457   /**
458    * Tells whether or not a table is enabled or not.
459    * @param conf The Configuration object to use.
460    * @param tableName Name of table to check.
461    * @return {@code true} if table is online.
462    * @throws IOException if a remote or network exception occurs
463    * @deprecated use {@link HBaseAdmin#isTableEnabled(org.apache.hadoop.hbase.TableName tableName)}
464    */
465   @Deprecated
466   public static boolean isTableEnabled(Configuration conf,
467       final TableName tableName) throws IOException {
468     return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
469       @Override
470       public Boolean connect(HConnection connection) throws IOException {
471         return connection.isTableEnabled(tableName);
472       }
473     });
474   }
475 
476   /**
477    * Find region location hosting passed row using cached info
478    * @param row Row to find.
479    * @return The location of the given row.
480    * @throws IOException if a remote or network exception occurs
481    * @deprecated Use {@link RegionLocator#getRegionLocation(byte[])}
482    */
483   @Deprecated
484   public HRegionLocation getRegionLocation(final String row)
485   throws IOException {
486     return getRegionLocation(Bytes.toBytes(row), false);
487   }
488 
489   /**
490    * @deprecated Use {@link RegionLocator#getRegionLocation(byte[])} instead.
491    */
492   @Override
493   @Deprecated
494   public HRegionLocation getRegionLocation(final byte [] row)
495   throws IOException {
496     return locator.getRegionLocation(row);
497   }
498 
499   /**
500    * @deprecated Use {@link RegionLocator#getRegionLocation(byte[], boolean)} instead.
501    */
502   @Override
503   @Deprecated
504   public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
505   throws IOException {
506     return locator.getRegionLocation(row, reload);
507   }
508 
509   /**
510    * {@inheritDoc}
511    */
512   @Override
513   public byte [] getTableName() {
514     return this.tableName.getName();
515   }
516 
517   @Override
518   public TableName getName() {
519     return tableName;
520   }
521 
522   /**
523    * <em>INTERNAL</em> Used by unit tests and tools to do low-level
524    * manipulations.
525    * @return An HConnection instance.
526    * @deprecated This method will be changed from public to package protected.
527    */
528   // TODO(tsuna): Remove this.  Unit tests shouldn't require public helpers.
529   @Deprecated
530   @VisibleForTesting
531   public HConnection getConnection() {
532     return this.connection;
533   }
534 
535   /**
536    * Gets the number of rows that a scanner will fetch at once.
537    * <p>
538    * The default value comes from {@code hbase.client.scanner.caching}.
539    * @deprecated Use {@link Scan#setCaching(int)} and {@link Scan#getCaching()}
540    */
541   @Deprecated
542   public int getScannerCaching() {
543     return scannerCaching;
544   }
545 
546   /**
547    * Kept in 0.96 for backward compatibility
548    * @deprecated  since 0.96. This is an internal buffer that should not be read nor write.
549    */
550   @Deprecated
551   public List<Row> getWriteBuffer() {
552     return mutator == null ? null : mutator.getWriteBuffer();
553   }
554 
555   /**
556    * Sets the number of rows that a scanner will fetch at once.
557    * <p>
558    * This will override the value specified by
559    * {@code hbase.client.scanner.caching}.
560    * Increasing this value will reduce the amount of work needed each time
561    * {@code next()} is called on a scanner, at the expense of memory use
562    * (since more rows will need to be maintained in memory by the scanners).
563    * @param scannerCaching the number of rows a scanner will fetch at once.
564    * @deprecated Use {@link Scan#setCaching(int)}
565    */
566   @Deprecated
567   public void setScannerCaching(int scannerCaching) {
568     this.scannerCaching = scannerCaching;
569   }
570 
571   /**
572    * {@inheritDoc}
573    */
574   @Override
575   public HTableDescriptor getTableDescriptor() throws IOException {
576     HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory,
577       rpcControllerFactory, operationTimeout, rpcTimeout);
578     if (htd != null) {
579       return new UnmodifyableHTableDescriptor(htd);
580     }
581     return null;
582   }
583 
584   /**
585    * To be removed in 2.0.0.
586    * @deprecated Since 1.1.0. Use {@link RegionLocator#getStartEndKeys()} instead
587    */
588   @Override
589   @Deprecated
590   public byte [][] getStartKeys() throws IOException {
591     return locator.getStartKeys();
592   }
593 
594   /**
595    * To be removed in 2.0.0.
596    * @deprecated Since 1.1.0. Use {@link RegionLocator#getEndKeys()} instead;
597    */
598   @Override
599   @Deprecated
600   public byte[][] getEndKeys() throws IOException {
601     return locator.getEndKeys();
602   }
603 
604   /**
605    * To be removed in 2.0.0.
606    * @deprecated Since 1.1.0. Use {@link RegionLocator#getStartEndKeys()} instead;
607    */
608   @Override
609   @Deprecated
610   public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
611     return locator.getStartEndKeys();
612   }
613 
614   /**
615    * Gets all the regions and their address for this table.
616    * <p>
617    * This is mainly useful for the MapReduce integration.
618    * @return A map of HRegionInfo with it's server address
619    * @throws IOException if a remote or network exception occurs
620    * @deprecated This is no longer a public API.  Use {@link #getAllRegionLocations()} instead.
621    */
622   @Deprecated
623   public NavigableMap<HRegionInfo, ServerName> getRegionLocations() throws IOException {
624     // TODO: Odd that this returns a Map of HRI to SN whereas getRegionLocator, singular, returns an HRegionLocation.
625     return MetaScanner.allTableRegions(this.connection, getName());
626   }
627 
628   /**
629    * Gets all the regions and their address for this table.
630    * <p>
631    * This is mainly useful for the MapReduce integration.
632    * @return A map of HRegionInfo with it's server address
633    * @throws IOException if a remote or network exception occurs
634    *
635    * @deprecated Use {@link RegionLocator#getAllRegionLocations()} instead;
636    */
637   @Override
638   @Deprecated
639   public List<HRegionLocation> getAllRegionLocations() throws IOException {
640     return locator.getAllRegionLocations();
641   }
642 
643   /**
644    * Get the corresponding regions for an arbitrary range of keys.
645    * <p>
646    * @param startKey Starting row in range, inclusive
647    * @param endKey Ending row in range, exclusive
648    * @return A list of HRegionLocations corresponding to the regions that
649    * contain the specified range
650    * @throws IOException if a remote or network exception occurs
651    * @deprecated This is no longer a public API
652    */
653   @Deprecated
654   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
655     final byte [] endKey) throws IOException {
656     return getRegionsInRange(startKey, endKey, false);
657   }
658 
659   /**
660    * Get the corresponding regions for an arbitrary range of keys.
661    * <p>
662    * @param startKey Starting row in range, inclusive
663    * @param endKey Ending row in range, exclusive
664    * @param reload true to reload information or false to use cached information
665    * @return A list of HRegionLocations corresponding to the regions that
666    * contain the specified range
667    * @throws IOException if a remote or network exception occurs
668    * @deprecated This is no longer a public API
669    */
670   @Deprecated
671   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
672       final byte [] endKey, final boolean reload) throws IOException {
673     return getKeysAndRegionsInRange(startKey, endKey, false, reload).getSecond();
674   }
675 
676   /**
677    * Get the corresponding start keys and regions for an arbitrary range of
678    * keys.
679    * <p>
680    * @param startKey Starting row in range, inclusive
681    * @param endKey Ending row in range
682    * @param includeEndKey true if endRow is inclusive, false if exclusive
683    * @return A pair of list of start keys and list of HRegionLocations that
684    *         contain the specified range
685    * @throws IOException if a remote or network exception occurs
686    * @deprecated This is no longer a public API
687    */
688   @Deprecated
689   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
690       final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
691       throws IOException {
692     return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
693   }
694 
695   /**
696    * Get the corresponding start keys and regions for an arbitrary range of
697    * keys.
698    * <p>
699    * @param startKey Starting row in range, inclusive
700    * @param endKey Ending row in range
701    * @param includeEndKey true if endRow is inclusive, false if exclusive
702    * @param reload true to reload information or false to use cached information
703    * @return A pair of list of start keys and list of HRegionLocations that
704    *         contain the specified range
705    * @throws IOException if a remote or network exception occurs
706    * @deprecated This is no longer a public API
707    */
708   @Deprecated
709   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
710       final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
711       final boolean reload) throws IOException {
712     final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
713     if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
714       throw new IllegalArgumentException(
715         "Invalid range: " + Bytes.toStringBinary(startKey) +
716         " > " + Bytes.toStringBinary(endKey));
717     }
718     List<byte[]> keysInRange = new ArrayList<byte[]>();
719     List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
720     byte[] currentKey = startKey;
721     do {
722       HRegionLocation regionLocation = getRegionLocation(currentKey, reload);
723       keysInRange.add(currentKey);
724       regionsInRange.add(regionLocation);
725       currentKey = regionLocation.getRegionInfo().getEndKey();
726     } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
727         && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
728             || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
729     return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
730         regionsInRange);
731   }
732 
733   /**
734    * {@inheritDoc}
735    * @deprecated Use reversed scan instead.
736    */
737    @Override
738    @Deprecated
739    public Result getRowOrBefore(final byte[] row, final byte[] family)
740        throws IOException {
741      RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
742          tableName, row) {
743        @Override
744       public Result call(int callTimeout) throws IOException {
745          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
746          controller.setPriority(tableName);
747          controller.setCallTimeout(callTimeout);
748          ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest(
749              getLocation().getRegionInfo().getRegionName(), row, family);
750          try {
751            ClientProtos.GetResponse response = getStub().get(controller, request);
752            if (!response.hasResult()) return null;
753            return ProtobufUtil.toResult(response.getResult());
754          } catch (ServiceException se) {
755            throw ProtobufUtil.getRemoteException(se);
756          }
757        }
758      };
759      return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
760          this.operationTimeout);
761    }
762 
763   /**
764    * The underlying {@link HTable} must not be closed.
765    * {@link HTableInterface#getScanner(Scan)} has other usage details.
766    */
767   @Override
768   public ResultScanner getScanner(final Scan scan) throws IOException {
769     if (scan.getBatch() > 0 && scan.isSmall()) {
770       throw new IllegalArgumentException("Small scan should not be used with batching");
771     }
772 
773     if (scan.getCaching() <= 0) {
774       scan.setCaching(getScannerCaching());
775     }
776     if (scan.getMaxResultSize() <= 0) {
777       scan.setMaxResultSize(scannerMaxResultSize);
778     }
779 
780     if (scan.isReversed()) {
781       if (scan.isSmall()) {
782         return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
783             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
784             pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
785       } else {
786         return new ReversedClientScanner(getConfiguration(), scan, getName(),
787             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
788             pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
789       }
790     }
791 
792     if (scan.isSmall()) {
793       return new ClientSmallScanner(getConfiguration(), scan, getName(),
794           this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
795           pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
796     } else {
797       return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
798           this.rpcCallerFactory, this.rpcControllerFactory,
799           pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
800     }
801   }
802 
803   /**
804    * The underlying {@link HTable} must not be closed.
805    * {@link HTableInterface#getScanner(byte[])} has other usage details.
806    */
807   @Override
808   public ResultScanner getScanner(byte [] family) throws IOException {
809     Scan scan = new Scan();
810     scan.addFamily(family);
811     return getScanner(scan);
812   }
813 
814   /**
815    * The underlying {@link HTable} must not be closed.
816    * {@link HTableInterface#getScanner(byte[], byte[])} has other usage details.
817    */
818   @Override
819   public ResultScanner getScanner(byte [] family, byte [] qualifier)
820   throws IOException {
821     Scan scan = new Scan();
822     scan.addColumn(family, qualifier);
823     return getScanner(scan);
824   }
825 
826   /**
827    * {@inheritDoc}
828    */
829   @Override
830   public Result get(final Get get) throws IOException {
831     return get(get, get.isCheckExistenceOnly());
832   }
833 
834   private Result get(Get get, final boolean checkExistenceOnly) throws IOException {
835     // if we are changing settings to the get, clone it.
836     if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) {
837       get = ReflectionUtils.newInstance(get.getClass(), get);
838       get.setCheckExistenceOnly(checkExistenceOnly);
839       if (get.getConsistency() == null){
840         get.setConsistency(defaultConsistency);
841       }
842     }
843 
844     if (get.getConsistency() == Consistency.STRONG) {
845       // Good old call.
846       final Get getReq = get;
847       RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
848           getName(), get.getRow()) {
849         @Override
850         public Result call(int callTimeout) throws IOException {
851           ClientProtos.GetRequest request =
852             RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq);
853           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
854           controller.setPriority(tableName);
855           controller.setCallTimeout(callTimeout);
856           try {
857             ClientProtos.GetResponse response = getStub().get(controller, request);
858             if (response == null) return null;
859             return ProtobufUtil.toResult(response.getResult());
860           } catch (ServiceException se) {
861             throw ProtobufUtil.getRemoteException(se);
862           }
863         }
864       };
865       return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
866           this.operationTimeout);
867     }
868 
869     // Call that takes into account the replica
870     RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
871       rpcControllerFactory, tableName, this.connection, get, pool,
872       connConfiguration.getRetriesNumber(),
873       operationTimeout,
874       connConfiguration.getPrimaryCallTimeoutMicroSecond());
875     return callable.call();
876   }
877 
878 
879   /**
880    * {@inheritDoc}
881    */
882   @Override
883   public Result[] get(List<Get> gets) throws IOException {
884     if (gets.size() == 1) {
885       return new Result[]{get(gets.get(0))};
886     }
887     try {
888       Object [] r1 = batch((List)gets);
889 
890       // translate.
891       Result [] results = new Result[r1.length];
892       int i=0;
893       for (Object o : r1) {
894         // batch ensures if there is a failure we get an exception instead
895         results[i++] = (Result) o;
896       }
897 
898       return results;
899     } catch (InterruptedException e) {
900       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
901     }
902   }
903 
904   /**
905    * {@inheritDoc}
906    */
907   @Override
908   public void batch(final List<? extends Row> actions, final Object[] results)
909       throws InterruptedException, IOException {
910     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results);
911     ars.waitUntilDone();
912     if (ars.hasError()) {
913       throw ars.getErrors();
914     }
915   }
916 
917   /**
918    * {@inheritDoc}
919    * @deprecated If any exception is thrown by one of the actions, there is no way to
920    * retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
921    */
922   @Deprecated
923   @Override
924   public Object[] batch(final List<? extends Row> actions)
925      throws InterruptedException, IOException {
926     Object[] results = new Object[actions.size()];
927     batch(actions, results);
928     return results;
929   }
930 
931   /**
932    * {@inheritDoc}
933    */
934   @Override
935   public <R> void batchCallback(
936       final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
937       throws IOException, InterruptedException {
938     connection.processBatchCallback(actions, tableName, pool, results, callback);
939   }
940 
941   /**
942    * {@inheritDoc}
943    * @deprecated If any exception is thrown by one of the actions, there is no way to
944    * retrieve the partially executed results. Use
945    * {@link #batchCallback(List, Object[], org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
946    * instead.
947    */
948   @Deprecated
949   @Override
950   public <R> Object[] batchCallback(
951     final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException,
952       InterruptedException {
953     Object[] results = new Object[actions.size()];
954     batchCallback(actions, results, callback);
955     return results;
956   }
957 
958   /**
959    * {@inheritDoc}
960    */
961   @Override
962   public void delete(final Delete delete)
963   throws IOException {
964     RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
965         tableName, delete.getRow()) {
966       @Override
967       public Boolean call(int callTimeout) throws IOException {
968         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
969         controller.setPriority(tableName);
970         controller.setCallTimeout(callTimeout);
971 
972         try {
973           MutateRequest request = RequestConverter.buildMutateRequest(
974             getLocation().getRegionInfo().getRegionName(), delete);
975           MutateResponse response = getStub().mutate(controller, request);
976           return Boolean.valueOf(response.getProcessed());
977         } catch (ServiceException se) {
978           throw ProtobufUtil.getRemoteException(se);
979         }
980       }
981     };
982     rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
983         this.operationTimeout);
984   }
985 
986   /**
987    * {@inheritDoc}
988    */
989   @Override
990   public void delete(final List<Delete> deletes)
991   throws IOException {
992     Object[] results = new Object[deletes.size()];
993     try {
994       batch(deletes, results);
995     } catch (InterruptedException e) {
996       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
997     } finally {
998       // mutate list so that it is empty for complete success, or contains only failed records
999       // results are returned in the same order as the requests in list
1000       // walk the list backwards, so we can remove from list without impacting the indexes of earlier members
1001       for (int i = results.length - 1; i>=0; i--) {
1002         // if result is not null, it succeeded
1003         if (results[i] instanceof Result) {
1004           deletes.remove(i);
1005         }
1006       }
1007     }
1008   }
1009 
1010   /**
1011    * {@inheritDoc}
1012    * @throws IOException
1013    */
1014   @Override
1015   public void put(final Put put) throws IOException {
1016     getBufferedMutator().mutate(put);
1017     if (autoFlush) {
1018       flushCommits();
1019     }
1020   }
1021 
1022   /**
1023    * {@inheritDoc}
1024    * @throws IOException
1025    */
1026   @Override
1027   public void put(final List<Put> puts) throws IOException {
1028     getBufferedMutator().mutate(puts);
1029     if (autoFlush) {
1030       flushCommits();
1031     }
1032   }
1033 
1034   /**
1035    * {@inheritDoc}
1036    */
1037   @Override
1038   public void mutateRow(final RowMutations rm) throws IOException {
1039     RegionServerCallable<Void> callable =
1040         new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
1041       @Override
1042       public Void call(int callTimeout) throws IOException {
1043         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1044         controller.setPriority(tableName);
1045         controller.setCallTimeout(callTimeout);
1046         try {
1047           RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
1048             getLocation().getRegionInfo().getRegionName(), rm);
1049           regionMutationBuilder.setAtomic(true);
1050           MultiRequest request =
1051             MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
1052           ClientProtos.MultiResponse response = getStub().multi(controller, request);
1053           ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
1054           if (res.hasException()) {
1055             Throwable ex = ProtobufUtil.toException(res.getException());
1056             if(ex instanceof IOException) {
1057               throw (IOException)ex;
1058             }
1059             throw new IOException("Failed to mutate row: "+Bytes.toStringBinary(rm.getRow()), ex);
1060           }
1061         } catch (ServiceException se) {
1062           throw ProtobufUtil.getRemoteException(se);
1063         }
1064         return null;
1065       }
1066     };
1067     rpcCallerFactory.<Void> newCaller(rpcTimeout).callWithRetries(callable,
1068         this.operationTimeout);
1069   }
1070 
1071   /**
1072    * {@inheritDoc}
1073    */
1074   @Override
1075   public Result append(final Append append) throws IOException {
1076     if (append.numFamilies() == 0) {
1077       throw new IOException(
1078           "Invalid arguments to append, no columns specified");
1079     }
1080 
1081     NonceGenerator ng = this.connection.getNonceGenerator();
1082     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1083     RegionServerCallable<Result> callable =
1084       new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
1085         @Override
1086         public Result call(int callTimeout) throws IOException {
1087           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1088           controller.setPriority(getTableName());
1089           controller.setCallTimeout(callTimeout);
1090           try {
1091             MutateRequest request = RequestConverter.buildMutateRequest(
1092               getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
1093             MutateResponse response = getStub().mutate(controller, request);
1094             if (!response.hasResult()) return null;
1095             return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1096           } catch (ServiceException se) {
1097             throw ProtobufUtil.getRemoteException(se);
1098           }
1099         }
1100       };
1101     return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable,
1102         this.operationTimeout);
1103   }
1104 
1105   /**
1106    * {@inheritDoc}
1107    */
1108   @Override
1109   public Result increment(final Increment increment) throws IOException {
1110     if (!increment.hasFamilies()) {
1111       throw new IOException(
1112           "Invalid arguments to increment, no columns specified");
1113     }
1114     NonceGenerator ng = this.connection.getNonceGenerator();
1115     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1116     RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
1117         getName(), increment.getRow()) {
1118       @Override
1119       public Result call(int callTimeout) throws IOException {
1120         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1121         controller.setPriority(getTableName());
1122         controller.setCallTimeout(callTimeout);
1123         try {
1124           MutateRequest request = RequestConverter.buildMutateRequest(
1125             getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
1126           MutateResponse response = getStub().mutate(controller, request);
1127           return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1128         } catch (ServiceException se) {
1129           throw ProtobufUtil.getRemoteException(se);
1130         }
1131       }
1132     };
1133     return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable,
1134         this.operationTimeout);
1135   }
1136 
1137   /**
1138    * {@inheritDoc}
1139    */
1140   @Override
1141   public long incrementColumnValue(final byte [] row, final byte [] family,
1142       final byte [] qualifier, final long amount)
1143   throws IOException {
1144     return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
1145   }
1146 
1147   /**
1148    * @deprecated As of release 0.96
1149    *             (<a href="https://issues.apache.org/jira/browse/HBASE-9508">HBASE-9508</a>).
1150    *             This will be removed in HBase 2.0.0.
1151    *             Use {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}.
1152    */
1153   @Deprecated
1154   @Override
1155   public long incrementColumnValue(final byte [] row, final byte [] family,
1156       final byte [] qualifier, final long amount, final boolean writeToWAL)
1157   throws IOException {
1158     return incrementColumnValue(row, family, qualifier, amount,
1159       writeToWAL? Durability.SYNC_WAL: Durability.SKIP_WAL);
1160   }
1161 
1162   /**
1163    * {@inheritDoc}
1164    */
1165   @Override
1166   public long incrementColumnValue(final byte [] row, final byte [] family,
1167       final byte [] qualifier, final long amount, final Durability durability)
1168   throws IOException {
1169     NullPointerException npe = null;
1170     if (row == null) {
1171       npe = new NullPointerException("row is null");
1172     } else if (family == null) {
1173       npe = new NullPointerException("family is null");
1174     } else if (qualifier == null) {
1175       npe = new NullPointerException("qualifier is null");
1176     }
1177     if (npe != null) {
1178       throw new IOException(
1179           "Invalid arguments to incrementColumnValue", npe);
1180     }
1181 
1182     NonceGenerator ng = this.connection.getNonceGenerator();
1183     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1184     RegionServerCallable<Long> callable =
1185       new RegionServerCallable<Long>(connection, getName(), row) {
1186         @Override
1187         public Long call(int callTimeout) throws IOException {
1188           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1189           controller.setPriority(getTableName());
1190           controller.setCallTimeout(callTimeout);
1191           try {
1192             MutateRequest request = RequestConverter.buildIncrementRequest(
1193               getLocation().getRegionInfo().getRegionName(), row, family,
1194               qualifier, amount, durability, nonceGroup, nonce);
1195             MutateResponse response = getStub().mutate(controller, request);
1196             Result result =
1197               ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1198             return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
1199           } catch (ServiceException se) {
1200             throw ProtobufUtil.getRemoteException(se);
1201           }
1202         }
1203       };
1204     return rpcCallerFactory.<Long> newCaller(rpcTimeout).callWithRetries(callable,
1205         this.operationTimeout);
1206   }
1207 
1208   /**
1209    * {@inheritDoc}
1210    */
1211   @Override
1212   public boolean checkAndPut(final byte [] row,
1213       final byte [] family, final byte [] qualifier, final byte [] value,
1214       final Put put)
1215   throws IOException {
1216     RegionServerCallable<Boolean> callable =
1217       new RegionServerCallable<Boolean>(connection, getName(), row) {
1218         @Override
1219         public Boolean call(int callTimeout) throws IOException {
1220           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1221           controller.setPriority(tableName);
1222           controller.setCallTimeout(callTimeout);
1223           try {
1224             MutateRequest request = RequestConverter.buildMutateRequest(
1225                 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1226                 new BinaryComparator(value), CompareType.EQUAL, put);
1227             MutateResponse response = getStub().mutate(controller, request);
1228             return Boolean.valueOf(response.getProcessed());
1229           } catch (ServiceException se) {
1230             throw ProtobufUtil.getRemoteException(se);
1231           }
1232         }
1233       };
1234     return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
1235         this.operationTimeout);
1236   }
1237 
1238   /**
1239    * {@inheritDoc}
1240    */
1241   @Override
1242   public boolean checkAndPut(final byte [] row, final byte [] family,
1243       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1244       final Put put)
1245   throws IOException {
1246     RegionServerCallable<Boolean> callable =
1247       new RegionServerCallable<Boolean>(connection, getName(), row) {
1248         @Override
1249         public Boolean call(int callTimeout) throws IOException {
1250           PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
1251           controller.setPriority(tableName);
1252           controller.setCallTimeout(callTimeout);
1253           try {
1254             CompareType compareType = CompareType.valueOf(compareOp.name());
1255             MutateRequest request = RequestConverter.buildMutateRequest(
1256               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1257                 new BinaryComparator(value), compareType, put);
1258             MutateResponse response = getStub().mutate(controller, request);
1259             return Boolean.valueOf(response.getProcessed());
1260           } catch (ServiceException se) {
1261             throw ProtobufUtil.getRemoteException(se);
1262           }
1263         }
1264       };
1265     return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
1266         this.operationTimeout);
1267   }
1268 
1269   /**
1270    * {@inheritDoc}
1271    */
1272   @Override
1273   public boolean checkAndDelete(final byte [] row,
1274       final byte [] family, final byte [] qualifier, final byte [] value,
1275       final Delete delete)
1276   throws IOException {
1277     RegionServerCallable<Boolean> callable =
1278       new RegionServerCallable<Boolean>(connection, getName(), row) {
1279         @Override
1280         public Boolean call(int callTimeout) throws IOException {
1281           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1282           controller.setPriority(tableName);
1283           controller.setCallTimeout(callTimeout);
1284           try {
1285             MutateRequest request = RequestConverter.buildMutateRequest(
1286               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1287                 new BinaryComparator(value), CompareType.EQUAL, delete);
1288             MutateResponse response = getStub().mutate(controller, request);
1289             return Boolean.valueOf(response.getProcessed());
1290           } catch (ServiceException se) {
1291             throw ProtobufUtil.getRemoteException(se);
1292           }
1293         }
1294       };
1295     return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
1296         this.operationTimeout);
1297   }
1298 
1299   /**
1300    * {@inheritDoc}
1301    */
1302   @Override
1303   public boolean checkAndDelete(final byte [] row, final byte [] family,
1304       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1305       final Delete delete)
1306   throws IOException {
1307     RegionServerCallable<Boolean> callable =
1308       new RegionServerCallable<Boolean>(connection, getName(), row) {
1309         @Override
1310         public Boolean call(int callTimeout) throws IOException {
1311           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1312           controller.setPriority(tableName);
1313           controller.setCallTimeout(callTimeout);
1314           try {
1315             CompareType compareType = CompareType.valueOf(compareOp.name());
1316             MutateRequest request = RequestConverter.buildMutateRequest(
1317               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1318                 new BinaryComparator(value), compareType, delete);
1319             MutateResponse response = getStub().mutate(controller, request);
1320             return Boolean.valueOf(response.getProcessed());
1321           } catch (ServiceException se) {
1322             throw ProtobufUtil.getRemoteException(se);
1323           }
1324         }
1325       };
1326     return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
1327         this.operationTimeout);
1328   }
1329 
1330   /**
1331    * {@inheritDoc}
1332    */
1333   @Override
1334   public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
1335       final CompareOp compareOp, final byte [] value, final RowMutations rm)
1336   throws IOException {
1337     RegionServerCallable<Boolean> callable =
1338         new RegionServerCallable<Boolean>(connection, getName(), row) {
1339           @Override
1340           public Boolean call(int callTimeout) throws IOException {
1341             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1342             controller.setPriority(tableName);
1343             controller.setCallTimeout(callTimeout);
1344             try {
1345               CompareType compareType = CompareType.valueOf(compareOp.name());
1346               MultiRequest request = RequestConverter.buildMutateRequest(
1347                   getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1348                   new BinaryComparator(value), compareType, rm);
1349               ClientProtos.MultiResponse response = getStub().multi(controller, request);
1350               ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
1351               if (res.hasException()) {
1352                 Throwable ex = ProtobufUtil.toException(res.getException());
1353                 if(ex instanceof IOException) {
1354                   throw (IOException)ex;
1355                 }
1356                 throw new IOException("Failed to checkAndMutate row: "+
1357                     Bytes.toStringBinary(rm.getRow()), ex);
1358               }
1359               return Boolean.valueOf(response.getProcessed());
1360             } catch (ServiceException se) {
1361               throw ProtobufUtil.getRemoteException(se);
1362             }
1363           }
1364         };
1365     return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
1366         this.operationTimeout);
1367   }
1368 
1369   /**
1370    * {@inheritDoc}
1371    */
1372   @Override
1373   public boolean exists(final Get get) throws IOException {
1374     Result r = get(get, true);
1375     assert r.getExists() != null;
1376     return r.getExists();
1377   }
1378 
1379   /**
1380    * {@inheritDoc}
1381    */
1382   @Override
1383   public boolean[] existsAll(final List<Get> gets) throws IOException {
1384     if (gets.isEmpty()) return new boolean[]{};
1385     if (gets.size() == 1) return new boolean[]{exists(gets.get(0))};
1386 
1387     ArrayList<Get> exists = new ArrayList<Get>(gets.size());
1388     for (Get g: gets){
1389       Get ge = new Get(g);
1390       ge.setCheckExistenceOnly(true);
1391       exists.add(ge);
1392     }
1393 
1394     Object[] r1;
1395     try {
1396       r1 = batch(exists);
1397     } catch (InterruptedException e) {
1398       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1399     }
1400 
1401     // translate.
1402     boolean[] results = new boolean[r1.length];
1403     int i = 0;
1404     for (Object o : r1) {
1405       // batch ensures if there is a failure we get an exception instead
1406       results[i++] = ((Result)o).getExists();
1407     }
1408 
1409     return results;
1410   }
1411 
1412   /**
1413    * {@inheritDoc}
1414    */
1415   @Override
1416   @Deprecated
1417   public Boolean[] exists(final List<Get> gets) throws IOException {
1418     boolean[] results = existsAll(gets);
1419     Boolean[] objectResults = new Boolean[results.length];
1420     for (int i = 0; i < results.length; ++i) {
1421       objectResults[i] = results[i];
1422     }
1423     return objectResults;
1424   }
1425 
1426   /**
1427    * {@inheritDoc}
1428    * @throws IOException
1429    */
1430   @Override
1431   public void flushCommits() throws IOException {
1432     if (mutator == null) {
1433       // nothing to flush if there's no mutator; don't bother creating one.
1434       return;
1435     }
1436     getBufferedMutator().flush();
1437   }
1438 
1439   /**
1440    * Process a mixed batch of Get, Put and Delete actions. All actions for a
1441    * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
1442    *
1443    * @param list The collection of actions.
1444    * @param results An empty array, same size as list. If an exception is thrown,
1445    * you can test here for partial results, and to determine which actions
1446    * processed successfully.
1447    * @throws IOException if there are problems talking to META. Per-item
1448    * exceptions are stored in the results array.
1449    */
1450   public <R> void processBatchCallback(
1451     final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
1452     throws IOException, InterruptedException {
1453     this.batchCallback(list, results, callback);
1454   }
1455 
1456 
1457   /**
1458    * Parameterized batch processing, allowing varying return types for different
1459    * {@link Row} implementations.
1460    */
1461   public void processBatch(final List<? extends Row> list, final Object[] results)
1462     throws IOException, InterruptedException {
1463     this.batch(list, results);
1464   }
1465 
1466 
1467   @Override
1468   public void close() throws IOException {
1469     if (this.closed) {
1470       return;
1471     }
1472     flushCommits();
1473     if (cleanupPoolOnClose) {
1474       this.pool.shutdown();
1475       try {
1476         boolean terminated = false;
1477         do {
1478           // wait until the pool has terminated
1479           terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
1480         } while (!terminated);
1481       } catch (InterruptedException e) {
1482         this.pool.shutdownNow();
1483         LOG.warn("waitForTermination interrupted");
1484       }
1485     }
1486     if (cleanupConnectionOnClose) {
1487       if (this.connection != null) {
1488         this.connection.close();
1489       }
1490     }
1491     this.closed = true;
1492   }
1493 
1494   // validate for well-formedness
1495   public void validatePut(final Put put) throws IllegalArgumentException {
1496     validatePut(put, connConfiguration.getMaxKeyValueSize());
1497   }
1498 
1499   // validate for well-formedness
1500   public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
1501     if (put.isEmpty()) {
1502       throw new IllegalArgumentException("No columns to insert");
1503     }
1504     if (maxKeyValueSize > 0) {
1505       for (List<Cell> list : put.getFamilyCellMap().values()) {
1506         for (Cell cell : list) {
1507           if (KeyValueUtil.length(cell) > maxKeyValueSize) {
1508             throw new IllegalArgumentException("KeyValue size too large");
1509           }
1510         }
1511       }
1512     }
1513   }
1514 
1515   /**
1516    * {@inheritDoc}
1517    */
1518   @Override
1519   public boolean isAutoFlush() {
1520     return autoFlush;
1521   }
1522 
1523   /**
1524    * {@inheritDoc}
1525    */
1526   @Deprecated
1527   @Override
1528   public void setAutoFlush(boolean autoFlush) {
1529     this.autoFlush = autoFlush;
1530   }
1531 
1532   /**
1533    * {@inheritDoc}
1534    */
1535   @Override
1536   public void setAutoFlushTo(boolean autoFlush) {
1537     this.autoFlush = autoFlush;
1538   }
1539 
1540   /**
1541    * {@inheritDoc}
1542    */
1543   @Override
1544   public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
1545     this.autoFlush = autoFlush;
1546   }
1547 
1548   /**
1549    * Returns the maximum size in bytes of the write buffer for this HTable.
1550    * <p>
1551    * The default value comes from the configuration parameter
1552    * {@code hbase.client.write.buffer}.
1553    * @return The size of the write buffer in bytes.
1554    */
1555   @Override
1556   public long getWriteBufferSize() {
1557     if (mutator == null) {
1558       return connConfiguration.getWriteBufferSize();
1559     } else {
1560       return mutator.getWriteBufferSize();
1561     }
1562   }
1563 
1564   /**
1565    * Sets the size of the buffer in bytes.
1566    * <p>
1567    * If the new size is less than the current amount of data in the
1568    * write buffer, the buffer gets flushed.
1569    * @param writeBufferSize The new write buffer size, in bytes.
1570    * @throws IOException if a remote or network exception occurs.
1571    */
1572   @Override
1573   public void setWriteBufferSize(long writeBufferSize) throws IOException {
1574     getBufferedMutator();
1575     mutator.setWriteBufferSize(writeBufferSize);
1576   }
1577 
1578   /**
1579    * The pool is used for mutli requests for this HTable
1580    * @return the pool used for mutli
1581    */
1582   ExecutorService getPool() {
1583     return this.pool;
1584   }
1585 
1586   /**
1587    * Enable or disable region cache prefetch for the table. It will be
1588    * applied for the given table's all HTable instances who share the same
1589    * connection. By default, the cache prefetch is enabled.
1590    * @param tableName name of table to configure.
1591    * @param enable Set to true to enable region cache prefetch. Or set to
1592    * false to disable it.
1593    * @throws IOException
1594    * @deprecated does nothing since 0.99
1595    */
1596   @Deprecated
1597   public static void setRegionCachePrefetch(final byte[] tableName,
1598       final boolean enable)  throws IOException {
1599   }
1600 
1601   /**
1602    * @deprecated does nothing since 0.99
1603    */
1604   @Deprecated
1605   public static void setRegionCachePrefetch(
1606       final TableName tableName,
1607       final boolean enable) throws IOException {
1608   }
1609 
1610   /**
1611    * Enable or disable region cache prefetch for the table. It will be
1612    * applied for the given table's all HTable instances who share the same
1613    * connection. By default, the cache prefetch is enabled.
1614    * @param conf The Configuration object to use.
1615    * @param tableName name of table to configure.
1616    * @param enable Set to true to enable region cache prefetch. Or set to
1617    * false to disable it.
1618    * @throws IOException
1619    * @deprecated does nothing since 0.99
1620    */
1621   @Deprecated
1622   public static void setRegionCachePrefetch(final Configuration conf,
1623       final byte[] tableName, final boolean enable) throws IOException {
1624   }
1625 
1626   /**
1627    * @deprecated does nothing since 0.99
1628    */
1629   @Deprecated
1630   public static void setRegionCachePrefetch(final Configuration conf,
1631       final TableName tableName,
1632       final boolean enable) throws IOException {
1633   }
1634 
1635   /**
1636    * Check whether region cache prefetch is enabled or not for the table.
1637    * @param conf The Configuration object to use.
1638    * @param tableName name of table to check
1639    * @return true if table's region cache prefecth is enabled. Otherwise
1640    * it is disabled.
1641    * @throws IOException
1642    * @deprecated always return false since 0.99
1643    */
1644   @Deprecated
1645   public static boolean getRegionCachePrefetch(final Configuration conf,
1646       final byte[] tableName) throws IOException {
1647     return false;
1648   }
1649 
1650   /**
1651    * @deprecated always return false since 0.99
1652    */
1653   @Deprecated
1654   public static boolean getRegionCachePrefetch(final Configuration conf,
1655       final TableName tableName) throws IOException {
1656     return false;
1657   }
1658 
1659   /**
1660    * Check whether region cache prefetch is enabled or not for the table.
1661    * @param tableName name of table to check
1662    * @return true if table's region cache prefecth is enabled. Otherwise
1663    * it is disabled.
1664    * @throws IOException
1665    * @deprecated always return false since 0.99
1666    */
1667   @Deprecated
1668   public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException {
1669     return false;
1670   }
1671 
1672   /**
1673    * @deprecated always return false since 0.99
1674    */
1675   @Deprecated
1676   public static boolean getRegionCachePrefetch(
1677       final TableName tableName) throws IOException {
1678     return false;
1679   }
1680 
1681   /**
1682    * Explicitly clears the region cache to fetch the latest value from META.
1683    * This is a power user function: avoid unless you know the ramifications.
1684    */
1685   public void clearRegionCache() {
1686     this.connection.clearRegionCache();
1687   }
1688 
1689   /**
1690    * {@inheritDoc}
1691    */
1692   @Override
1693   public CoprocessorRpcChannel coprocessorService(byte[] row) {
1694     return new RegionCoprocessorRpcChannel(connection, tableName, row);
1695   }
1696 
1697   /**
1698    * {@inheritDoc}
1699    */
1700   @Override
1701   public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1702       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1703       throws ServiceException, Throwable {
1704     final Map<byte[],R> results =  Collections.synchronizedMap(
1705         new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1706     coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1707       @Override
1708       public void update(byte[] region, byte[] row, R value) {
1709         if (region != null) {
1710           results.put(region, value);
1711         }
1712       }
1713     });
1714     return results;
1715   }
1716 
1717   /**
1718    * {@inheritDoc}
1719    */
1720   @Override
1721   public <T extends Service, R> void coprocessorService(final Class<T> service,
1722       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1723       final Batch.Callback<R> callback) throws ServiceException, Throwable {
1724 
1725     // get regions covered by the row range
1726     List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1727 
1728     Map<byte[],Future<R>> futures =
1729         new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
1730     for (final byte[] r : keys) {
1731       final RegionCoprocessorRpcChannel channel =
1732           new RegionCoprocessorRpcChannel(connection, tableName, r);
1733       Future<R> future = pool.submit(
1734           new Callable<R>() {
1735             @Override
1736             public R call() throws Exception {
1737               T instance = ProtobufUtil.newServiceStub(service, channel);
1738               R result = callable.call(instance);
1739               byte[] region = channel.getLastRegion();
1740               if (callback != null) {
1741                 callback.update(region, r, result);
1742               }
1743               return result;
1744             }
1745           });
1746       futures.put(r, future);
1747     }
1748     for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1749       try {
1750         e.getValue().get();
1751       } catch (ExecutionException ee) {
1752         LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1753             + Bytes.toStringBinary(e.getKey()), ee);
1754         throw ee.getCause();
1755       } catch (InterruptedException ie) {
1756         throw new InterruptedIOException("Interrupted calling coprocessor service " + service.getName()
1757             + " for row " + Bytes.toStringBinary(e.getKey()))
1758             .initCause(ie);
1759       }
1760     }
1761   }
1762 
1763   private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1764   throws IOException {
1765     if (start == null) {
1766       start = HConstants.EMPTY_START_ROW;
1767     }
1768     if (end == null) {
1769       end = HConstants.EMPTY_END_ROW;
1770     }
1771     return getKeysAndRegionsInRange(start, end, true).getFirst();
1772   }
1773 
1774   public void setOperationTimeout(int operationTimeout) {
1775     this.operationTimeout = operationTimeout;
1776   }
1777 
1778   public int getOperationTimeout() {
1779     return operationTimeout;
1780   }
1781 
1782   public void setRpcTimeout(int rpcTimeout) {
1783     this.rpcTimeout = rpcTimeout;
1784   }
1785 
1786   public int getRpcTimeout() {
1787     return rpcTimeout;
1788   }
1789 
1790   @Override
1791   public String toString() {
1792     return tableName + ";" + connection;
1793   }
1794 
1795   /**
1796    * {@inheritDoc}
1797    */
1798   @Override
1799   public <R extends Message> Map<byte[], R> batchCoprocessorService(
1800       Descriptors.MethodDescriptor methodDescriptor, Message request,
1801       byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1802     final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1803         Bytes.BYTES_COMPARATOR));
1804     batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1805         new Callback<R>() {
1806 
1807           @Override
1808           public void update(byte[] region, byte[] row, R result) {
1809             if (region != null) {
1810               results.put(region, result);
1811             }
1812           }
1813         });
1814     return results;
1815   }
1816 
1817   /**
1818    * {@inheritDoc}
1819    */
1820   @Override
1821   public <R extends Message> void batchCoprocessorService(
1822       final Descriptors.MethodDescriptor methodDescriptor, final Message request,
1823       byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
1824       throws ServiceException, Throwable {
1825 
1826     if (startKey == null) {
1827       startKey = HConstants.EMPTY_START_ROW;
1828     }
1829     if (endKey == null) {
1830       endKey = HConstants.EMPTY_END_ROW;
1831     }
1832     // get regions covered by the row range
1833     Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
1834         getKeysAndRegionsInRange(startKey, endKey, true);
1835     List<byte[]> keys = keysAndRegions.getFirst();
1836     List<HRegionLocation> regions = keysAndRegions.getSecond();
1837 
1838     // check if we have any calls to make
1839     if (keys.isEmpty()) {
1840       LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
1841           ", end=" + Bytes.toStringBinary(endKey));
1842       return;
1843     }
1844 
1845     List<RegionCoprocessorServiceExec> execs = new ArrayList<RegionCoprocessorServiceExec>();
1846     final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
1847         new TreeMap<byte[], RegionCoprocessorServiceExec>(Bytes.BYTES_COMPARATOR);
1848     for (int i = 0; i < keys.size(); i++) {
1849       final byte[] rowKey = keys.get(i);
1850       final byte[] region = regions.get(i).getRegionInfo().getRegionName();
1851       RegionCoprocessorServiceExec exec =
1852           new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
1853       execs.add(exec);
1854       execsByRow.put(rowKey, exec);
1855     }
1856 
1857     // tracking for any possible deserialization errors on success callback
1858     // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
1859     final List<Throwable> callbackErrorExceptions = new ArrayList<Throwable>();
1860     final List<Row> callbackErrorActions = new ArrayList<Row>();
1861     final List<String> callbackErrorServers = new ArrayList<String>();
1862     Object[] results = new Object[execs.size()];
1863 
1864     AsyncProcess asyncProcess =
1865         new AsyncProcess(connection, configuration, pool,
1866             RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
1867             true, RpcControllerFactory.instantiate(configuration));
1868 
1869     AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
1870         new Callback<ClientProtos.CoprocessorServiceResult>() {
1871           @Override
1872           public void update(byte[] region, byte[] row,
1873                               ClientProtos.CoprocessorServiceResult serviceResult) {
1874             if (LOG.isTraceEnabled()) {
1875               LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
1876                   ": region=" + Bytes.toStringBinary(region) +
1877                   ", row=" + Bytes.toStringBinary(row) +
1878                   ", value=" + serviceResult.getValue().getValue());
1879             }
1880             try {
1881               Message.Builder builder = responsePrototype.newBuilderForType();
1882               ProtobufUtil.mergeFrom(builder, serviceResult.getValue().getValue());
1883               callback.update(region, row, (R) builder.build());
1884             } catch (IOException e) {
1885               LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
1886                   e);
1887               callbackErrorExceptions.add(e);
1888               callbackErrorActions.add(execsByRow.get(row));
1889               callbackErrorServers.add("null");
1890             }
1891           }
1892         }, results);
1893 
1894     future.waitUntilDone();
1895 
1896     if (future.hasError()) {
1897       throw future.getErrors();
1898     } else if (!callbackErrorExceptions.isEmpty()) {
1899       throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
1900           callbackErrorServers);
1901     }
1902   }
1903 
1904   public RegionLocator getRegionLocator() {
1905     return this.locator;
1906   }
1907 
1908   @VisibleForTesting
1909   BufferedMutator getBufferedMutator() throws IOException {
1910     if (mutator == null) {
1911       this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
1912           new BufferedMutatorParams(tableName)
1913               .pool(pool)
1914               .writeBufferSize(connConfiguration.getWriteBufferSize())
1915               .maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
1916       );
1917     }
1918     return mutator;
1919   }
1920 }