View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.NavigableMap;
28  import java.util.TreeMap;
29  import java.util.concurrent.Callable;
30  import java.util.concurrent.ExecutionException;
31  import java.util.concurrent.ExecutorService;
32  import java.util.concurrent.Future;
33  import java.util.concurrent.SynchronousQueue;
34  import java.util.concurrent.ThreadPoolExecutor;
35  import java.util.concurrent.TimeUnit;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.Cell;
41  import org.apache.hadoop.hbase.HBaseConfiguration;
42  import org.apache.hadoop.hbase.HConstants;
43  import org.apache.hadoop.hbase.HRegionInfo;
44  import org.apache.hadoop.hbase.HRegionLocation;
45  import org.apache.hadoop.hbase.HTableDescriptor;
46  import org.apache.hadoop.hbase.KeyValueUtil;
47  import org.apache.hadoop.hbase.MetaTableAccessor;
48  import org.apache.hadoop.hbase.ServerName;
49  import org.apache.hadoop.hbase.TableName;
50  import org.apache.hadoop.hbase.TableNotFoundException;
51  import org.apache.hadoop.hbase.classification.InterfaceAudience;
52  import org.apache.hadoop.hbase.classification.InterfaceStability;
53  import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
54  import org.apache.hadoop.hbase.client.coprocessor.Batch;
55  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
56  import org.apache.hadoop.hbase.filter.BinaryComparator;
57  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
58  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
59  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
60  import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
61  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
62  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
63  import org.apache.hadoop.hbase.protobuf.RequestConverter;
64  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
65  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
66  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
67  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
68  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
69  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
70  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
71  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
72  import org.apache.hadoop.hbase.util.Bytes;
73  import org.apache.hadoop.hbase.util.Pair;
74  import org.apache.hadoop.hbase.util.Threads;
75  
76  import com.google.common.annotations.VisibleForTesting;
77  import com.google.protobuf.Descriptors;
78  import com.google.protobuf.InvalidProtocolBufferException;
79  import com.google.protobuf.Message;
80  import com.google.protobuf.Service;
81  import com.google.protobuf.ServiceException;
82  
83  /**
84   * An implementation of {@link Table}. Used to communicate with a single HBase table.
85   * Lightweight. Get as needed and just close when done.
86   * Instances of this class SHOULD NOT be constructed directly.
87   * Obtain an instance via {@link Connection}. See {@link ConnectionFactory}
88   * class comment for an example of how.
89   *
90   * <p>This class is NOT thread safe for reads nor writes.
91   * In the case of writes (Put, Delete), the underlying write buffer can
92   * be corrupted if multiple threads contend over a single HTable instance.
93   * In the case of reads, some fields used by a Scan are shared among all threads.
94   *
95   * <p>HTable is no longer a client API. Use {@link Table} instead. It is marked
96   * InterfaceAudience.Private indicating that this is an HBase-internal class as defined in
97   * <a href="https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html">Hadoop
98   * Interface Classification</a>
99   * There are no guarantees for backwards source / binary compatibility and methods or class can
100  * change or go away without deprecation.
101  *
102  * @see Table
103  * @see Admin
104  * @see Connection
105  * @see ConnectionFactory
106  */
107 @InterfaceAudience.Private
108 @InterfaceStability.Stable
109 public class HTable implements HTableInterface {
110   private static final Log LOG = LogFactory.getLog(HTable.class);
111   protected ClusterConnection connection;
112   private final TableName tableName;
113   private volatile Configuration configuration;
114   private TableConfiguration tableConfiguration;
115   protected BufferedMutatorImpl mutator;
116   private boolean autoFlush = true;
117   private boolean closed = false;
118   protected int scannerCaching;
119   protected long scannerMaxResultSize;
120   private ExecutorService pool;  // For Multi & Scan
121   private int operationTimeout;
122   private final boolean cleanupPoolOnClose; // shutdown the pool in close()
123   private final boolean cleanupConnectionOnClose; // close the connection in close()
124   private Consistency defaultConsistency = Consistency.STRONG;
125   private HRegionLocator locator;
126 
127   /** The Async process for batch */
128   protected AsyncProcess multiAp;
129   private RpcRetryingCallerFactory rpcCallerFactory;
130   private RpcControllerFactory rpcControllerFactory;
131 
132   // Marked Private @since 1.0
133   @InterfaceAudience.Private
134   public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
135     int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
136     if (maxThreads == 0) {
137       maxThreads = 1; // is there a better default?
138     }
139     long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
140 
141     // Using the "direct handoff" approach, new threads will only be created
142     // if it is necessary and will grow unbounded. This could be bad but in HCM
143     // we only create as many Runnables as there are region servers. It means
144     // it also scales when new region servers are added.
145     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
146         new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
147     pool.allowCoreThreadTimeOut(true);
148     return pool;
149   }
150 
151   /**
152    * Creates an object to access a HBase table.
153    * Used by HBase internally.  DO NOT USE. See {@link ConnectionFactory} class comment for how to
154    * get a {@link Table} instance (use {@link Table} instead of {@link HTable}).
155    * @param tableName Name of the table.
156    * @param connection HConnection to be used.
157    * @param pool ExecutorService to be used.
158    * @throws IOException if a remote or network exception occurs
159    */
160   @InterfaceAudience.Private
161   protected HTable(TableName tableName, final ClusterConnection connection,
162       final TableConfiguration tableConfig,
163       final RpcRetryingCallerFactory rpcCallerFactory,
164       final RpcControllerFactory rpcControllerFactory,
165       final ExecutorService pool) throws IOException {
166     if (connection == null || connection.isClosed()) {
167       throw new IllegalArgumentException("Connection is null or closed.");
168     }
169     this.tableName = tableName;
170     this.cleanupConnectionOnClose = false;
171     this.connection = connection;
172     this.configuration = connection.getConfiguration();
173     this.tableConfiguration = tableConfig;
174     this.pool = pool;
175     if (pool == null) {
176       this.pool = getDefaultExecutor(this.configuration);
177       this.cleanupPoolOnClose = true;
178     } else {
179       this.cleanupPoolOnClose = false;
180     }
181 
182     this.rpcCallerFactory = rpcCallerFactory;
183     this.rpcControllerFactory = rpcControllerFactory;
184 
185     this.finishSetup();
186   }
187 
188   /**
189    * For internal testing. Uses Connection provided in {@code params}.
190    * @throws IOException
191    */
192   @VisibleForTesting
193   protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException {
194     connection = conn;
195     tableName = params.getTableName();
196     tableConfiguration = new TableConfiguration(connection.getConfiguration());
197     cleanupPoolOnClose = false;
198     cleanupConnectionOnClose = false;
199     // used from tests, don't trust the connection is real
200     this.mutator = new BufferedMutatorImpl(conn, null, null, params);
201   }
202 
203   /**
204    * @return maxKeyValueSize from configuration.
205    */
206   public static int getMaxKeyValueSize(Configuration conf) {
207     return conf.getInt("hbase.client.keyvalue.maxsize", -1);
208   }
209 
210   /**
211    * setup this HTable's parameter based on the passed configuration
212    */
213   private void finishSetup() throws IOException {
214     if (tableConfiguration == null) {
215       tableConfiguration = new TableConfiguration(configuration);
216     }
217 
218     this.operationTimeout = tableName.isSystemTable() ?
219         tableConfiguration.getMetaOperationTimeout() : tableConfiguration.getOperationTimeout();
220     this.scannerCaching = tableConfiguration.getScannerCaching();
221     this.scannerMaxResultSize = tableConfiguration.getScannerMaxResultSize();
222     if (this.rpcCallerFactory == null) {
223       this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
224     }
225     if (this.rpcControllerFactory == null) {
226       this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
227     }
228 
229     // puts need to track errors globally due to how the APIs currently work.
230     multiAp = this.connection.getAsyncProcess();
231     this.locator = new HRegionLocator(getName(), connection);
232   }
233 
234   /**
235    * {@inheritDoc}
236    */
237   @Override
238   public Configuration getConfiguration() {
239     return configuration;
240   }
241 
242   /**
243    * Tells whether or not a table is enabled or not. This method creates a
244    * new HBase configuration, so it might make your unit tests fail due to
245    * incorrect ZK client port.
246    * @param tableName Name of table to check.
247    * @return {@code true} if table is online.
248    * @throws IOException if a remote or network exception occurs
249    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
250    */
251   @Deprecated
252   public static boolean isTableEnabled(String tableName) throws IOException {
253     return isTableEnabled(TableName.valueOf(tableName));
254   }
255 
256   /**
257    * Tells whether or not a table is enabled or not. This method creates a
258    * new HBase configuration, so it might make your unit tests fail due to
259    * incorrect ZK client port.
260    * @param tableName Name of table to check.
261    * @return {@code true} if table is online.
262    * @throws IOException if a remote or network exception occurs
263    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
264    */
265   @Deprecated
266   public static boolean isTableEnabled(byte[] tableName) throws IOException {
267     return isTableEnabled(TableName.valueOf(tableName));
268   }
269 
270   /**
271    * Tells whether or not a table is enabled or not. This method creates a
272    * new HBase configuration, so it might make your unit tests fail due to
273    * incorrect ZK client port.
274    * @param tableName Name of table to check.
275    * @return {@code true} if table is online.
276    * @throws IOException if a remote or network exception occurs
277    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
278    */
279   @Deprecated
280   public static boolean isTableEnabled(TableName tableName) throws IOException {
281     return isTableEnabled(HBaseConfiguration.create(), tableName);
282   }
283 
284   /**
285    * Tells whether or not a table is enabled or not.
286    * @param conf The Configuration object to use.
287    * @param tableName Name of table to check.
288    * @return {@code true} if table is online.
289    * @throws IOException if a remote or network exception occurs
290    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
291    */
292   @Deprecated
293   public static boolean isTableEnabled(Configuration conf, String tableName)
294   throws IOException {
295     return isTableEnabled(conf, TableName.valueOf(tableName));
296   }
297 
298   /**
299    * Tells whether or not a table is enabled or not.
300    * @param conf The Configuration object to use.
301    * @param tableName Name of table to check.
302    * @return {@code true} if table is online.
303    * @throws IOException if a remote or network exception occurs
304    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
305    */
306   @Deprecated
307   public static boolean isTableEnabled(Configuration conf, byte[] tableName)
308   throws IOException {
309     return isTableEnabled(conf, TableName.valueOf(tableName));
310   }
311 
312   /**
313    * Tells whether or not a table is enabled or not.
314    * @param conf The Configuration object to use.
315    * @param tableName Name of table to check.
316    * @return {@code true} if table is online.
317    * @throws IOException if a remote or network exception occurs
318    * @deprecated use {@link HBaseAdmin#isTableEnabled(org.apache.hadoop.hbase.TableName tableName)}
319    */
320   @Deprecated
321   public static boolean isTableEnabled(Configuration conf,
322       final TableName tableName) throws IOException {
323     try(Connection conn = ConnectionFactory.createConnection(conf)) {
324       return conn.getAdmin().isTableEnabled(tableName);
325     }
326   }
327 
328   /**
329    * Find region location hosting passed row using cached info
330    * @param row Row to find.
331    * @return The location of the given row.
332    * @throws IOException if a remote or network exception occurs
333    * @deprecated Use {@link RegionLocator#getRegionLocation(byte[])}
334    */
335   @Deprecated
336   public HRegionLocation getRegionLocation(final String row)
337   throws IOException {
338     return getRegionLocation(Bytes.toBytes(row), false);
339   }
340 
341   /**
342    * @deprecated Use {@link RegionLocator#getRegionLocation(byte[])} instead.
343    */
344   @Deprecated
345   public HRegionLocation getRegionLocation(final byte [] row)
346   throws IOException {
347     return locator.getRegionLocation(row);
348   }
349 
350   /**
351    * @deprecated Use {@link RegionLocator#getRegionLocation(byte[], boolean)} instead.
352    */
353   @Deprecated
354   public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
355   throws IOException {
356     return locator.getRegionLocation(row, reload);
357   }
358 
359   /**
360    * {@inheritDoc}
361    */
362   @Override
363   public byte [] getTableName() {
364     return this.tableName.getName();
365   }
366 
367   @Override
368   public TableName getName() {
369     return tableName;
370   }
371 
372   /**
373    * <em>INTERNAL</em> Used by unit tests and tools to do low-level
374    * manipulations.
375    * @return An HConnection instance.
376    * @deprecated This method will be changed from public to package protected.
377    */
378   // TODO(tsuna): Remove this.  Unit tests shouldn't require public helpers.
379   @Deprecated
380   @VisibleForTesting
381   public HConnection getConnection() {
382     return this.connection;
383   }
384 
385   /**
386    * Kept in 0.96 for backward compatibility
387    * @deprecated  since 0.96. This is an internal buffer that should not be read nor write.
388    */
389   @Deprecated
390   public List<Row> getWriteBuffer() {
391     return mutator == null ? null : mutator.getWriteBuffer();
392   }
393 
394   /**
395    * {@inheritDoc}
396    */
397   @Override
398   public HTableDescriptor getTableDescriptor() throws IOException {
399     // TODO: This is the same as HBaseAdmin.getTableDescriptor(). Only keep one.
400     if (tableName == null) return null;
401     if (tableName.equals(TableName.META_TABLE_NAME)) {
402       return HTableDescriptor.META_TABLEDESC;
403     }
404     HTableDescriptor htd = executeMasterCallable(
405       new MasterCallable<HTableDescriptor>(getConnection()) {
406       @Override
407       public HTableDescriptor call(int callTimeout) throws ServiceException {
408         GetTableDescriptorsResponse htds;
409         GetTableDescriptorsRequest req =
410             RequestConverter.buildGetTableDescriptorsRequest(tableName);
411         htds = master.getTableDescriptors(null, req);
412 
413         if (!htds.getTableSchemaList().isEmpty()) {
414           return HTableDescriptor.convert(htds.getTableSchemaList().get(0));
415         }
416         return null;
417       }
418     });
419     if (htd != null) {
420       return new UnmodifyableHTableDescriptor(htd);
421     }
422     throw new TableNotFoundException(tableName.getNameAsString());
423   }
424 
425   private <V> V executeMasterCallable(MasterCallable<V> callable) throws IOException {
426     RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller();
427     try {
428       return caller.callWithRetries(callable, operationTimeout);
429     } finally {
430       callable.close();
431     }
432   }
433 
434   /**
435    * @deprecated Use {@link RegionLocator#getStartEndKeys()} instead;
436    */
437   @Deprecated
438   public byte [][] getStartKeys() throws IOException {
439     return locator.getStartKeys();
440   }
441 
442   /**
443    * @deprecated Use {@link RegionLocator#getEndKeys()} instead;
444    */
445   @Deprecated
446   public byte[][] getEndKeys() throws IOException {
447     return locator.getEndKeys();
448   }
449 
450   /**
451    * @deprecated Use {@link RegionLocator#getStartEndKeys()} instead;
452    */
453   @Deprecated
454   public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
455     return locator.getStartEndKeys();
456   }
457 
458   /**
459    * Gets all the regions and their address for this table.
460    * <p>
461    * This is mainly useful for the MapReduce integration.
462    * @return A map of HRegionInfo with it's server address
463    * @throws IOException if a remote or network exception occurs
464    * @deprecated This is no longer a public API.  Use {@link #getAllRegionLocations()} instead.
465    */
466   @SuppressWarnings("deprecation")
467   @Deprecated
468   public NavigableMap<HRegionInfo, ServerName> getRegionLocations() throws IOException {
469     // TODO: Odd that this returns a Map of HRI to SN whereas getRegionLocator, singular,
470     // returns an HRegionLocation.
471     return MetaTableAccessor.allTableRegions(this.connection, getName());
472   }
473 
474   /**
475    * Gets all the regions and their address for this table.
476    * <p>
477    * This is mainly useful for the MapReduce integration.
478    * @return A map of HRegionInfo with it's server address
479    * @throws IOException if a remote or network exception occurs
480    *
481    * @deprecated Use {@link RegionLocator#getAllRegionLocations()} instead;
482    */
483   @Deprecated
484   public List<HRegionLocation> getAllRegionLocations() throws IOException {
485     return locator.getAllRegionLocations();
486   }
487 
488   /**
489    * Get the corresponding regions for an arbitrary range of keys.
490    * <p>
491    * @param startKey Starting row in range, inclusive
492    * @param endKey Ending row in range, exclusive
493    * @return A list of HRegionLocations corresponding to the regions that
494    * contain the specified range
495    * @throws IOException if a remote or network exception occurs
496    * @deprecated This is no longer a public API
497    */
498   @Deprecated
499   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
500     final byte [] endKey) throws IOException {
501     return getRegionsInRange(startKey, endKey, false);
502   }
503 
504   /**
505    * Get the corresponding regions for an arbitrary range of keys.
506    * <p>
507    * @param startKey Starting row in range, inclusive
508    * @param endKey Ending row in range, exclusive
509    * @param reload true to reload information or false to use cached information
510    * @return A list of HRegionLocations corresponding to the regions that
511    * contain the specified range
512    * @throws IOException if a remote or network exception occurs
513    * @deprecated This is no longer a public API
514    */
515   @Deprecated
516   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
517       final byte [] endKey, final boolean reload) throws IOException {
518     return getKeysAndRegionsInRange(startKey, endKey, false, reload).getSecond();
519   }
520 
521   /**
522    * Get the corresponding start keys and regions for an arbitrary range of
523    * keys.
524    * <p>
525    * @param startKey Starting row in range, inclusive
526    * @param endKey Ending row in range
527    * @param includeEndKey true if endRow is inclusive, false if exclusive
528    * @return A pair of list of start keys and list of HRegionLocations that
529    *         contain the specified range
530    * @throws IOException if a remote or network exception occurs
531    * @deprecated This is no longer a public API
532    */
533   @Deprecated
534   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
535       final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
536       throws IOException {
537     return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
538   }
539 
540   /**
541    * Get the corresponding start keys and regions for an arbitrary range of
542    * keys.
543    * <p>
544    * @param startKey Starting row in range, inclusive
545    * @param endKey Ending row in range
546    * @param includeEndKey true if endRow is inclusive, false if exclusive
547    * @param reload true to reload information or false to use cached information
548    * @return A pair of list of start keys and list of HRegionLocations that
549    *         contain the specified range
550    * @throws IOException if a remote or network exception occurs
551    * @deprecated This is no longer a public API
552    */
553   @Deprecated
554   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
555       final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
556       final boolean reload) throws IOException {
557     final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
558     if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
559       throw new IllegalArgumentException(
560         "Invalid range: " + Bytes.toStringBinary(startKey) +
561         " > " + Bytes.toStringBinary(endKey));
562     }
563     List<byte[]> keysInRange = new ArrayList<byte[]>();
564     List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
565     byte[] currentKey = startKey;
566     do {
567       HRegionLocation regionLocation = getRegionLocation(currentKey, reload);
568       keysInRange.add(currentKey);
569       regionsInRange.add(regionLocation);
570       currentKey = regionLocation.getRegionInfo().getEndKey();
571     } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
572         && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
573             || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
574     return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
575         regionsInRange);
576   }
577 
578   /**
579    * {@inheritDoc}
580    * @deprecated Use reversed scan instead.
581    */
582    @Override
583    @Deprecated
584    public Result getRowOrBefore(final byte[] row, final byte[] family)
585        throws IOException {
586      RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
587          tableName, row) {
588        @Override
589       public Result call(int callTimeout) throws IOException {
590          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
591          controller.setPriority(tableName);
592          controller.setCallTimeout(callTimeout);
593          ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest(
594              getLocation().getRegionInfo().getRegionName(), row, family);
595          try {
596            ClientProtos.GetResponse response = getStub().get(controller, request);
597            if (!response.hasResult()) return null;
598            return ProtobufUtil.toResult(response.getResult());
599          } catch (ServiceException se) {
600            throw ProtobufUtil.getRemoteException(se);
601          }
602        }
603      };
604      return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
605    }
606 
607   /**
608    * The underlying {@link HTable} must not be closed.
609    * {@link HTableInterface#getScanner(Scan)} has other usage details.
610    */
611   @Override
612   public ResultScanner getScanner(final Scan scan) throws IOException {
613     if (scan.getBatch() > 0 && scan.isSmall()) {
614       throw new IllegalArgumentException("Small scan should not be used with batching");
615     }
616     if (scan.getCaching() <= 0) {
617       scan.setCaching(scannerCaching);
618     }
619     if (scan.getMaxResultSize() <= 0) {
620       scan.setMaxResultSize(scannerMaxResultSize);
621     }
622 
623     if (scan.isReversed()) {
624       if (scan.isSmall()) {
625         return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
626             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
627             pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
628       } else {
629         return new ReversedClientScanner(getConfiguration(), scan, getName(),
630             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
631             pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
632       }
633     }
634 
635     if (scan.isSmall()) {
636       return new ClientSmallScanner(getConfiguration(), scan, getName(),
637           this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
638           pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
639     } else {
640       return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
641           this.rpcCallerFactory, this.rpcControllerFactory,
642           pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
643     }
644   }
645 
646   /**
647    * The underlying {@link HTable} must not be closed.
648    * {@link HTableInterface#getScanner(byte[])} has other usage details.
649    */
650   @Override
651   public ResultScanner getScanner(byte [] family) throws IOException {
652     Scan scan = new Scan();
653     scan.addFamily(family);
654     return getScanner(scan);
655   }
656 
657   /**
658    * The underlying {@link HTable} must not be closed.
659    * {@link HTableInterface#getScanner(byte[], byte[])} has other usage details.
660    */
661   @Override
662   public ResultScanner getScanner(byte [] family, byte [] qualifier)
663   throws IOException {
664     Scan scan = new Scan();
665     scan.addColumn(family, qualifier);
666     return getScanner(scan);
667   }
668 
669   /**
670    * {@inheritDoc}
671    */
672   @Override
673   public Result get(final Get get) throws IOException {
674     if (get.getConsistency() == null){
675       get.setConsistency(defaultConsistency);
676     }
677 
678     if (get.getConsistency() == Consistency.STRONG) {
679       // Good old call.
680       RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
681           getName(), get.getRow()) {
682         @Override
683         public Result call(int callTimeout) throws IOException {
684           ClientProtos.GetRequest request =
685               RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), get);
686           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
687           controller.setPriority(tableName);
688           controller.setCallTimeout(callTimeout);
689           try {
690             ClientProtos.GetResponse response = getStub().get(controller, request);
691             if (response == null) return null;
692             return ProtobufUtil.toResult(response.getResult());
693           } catch (ServiceException se) {
694             throw ProtobufUtil.getRemoteException(se);
695           }
696         }
697       };
698       return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
699     }
700 
701     // Call that takes into account the replica
702     RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
703       rpcControllerFactory, tableName, this.connection, get, pool,
704       tableConfiguration.getRetriesNumber(),
705       operationTimeout,
706       tableConfiguration.getPrimaryCallTimeoutMicroSecond());
707     return callable.call();
708   }
709 
710 
711   /**
712    * {@inheritDoc}
713    */
714   @Override
715   public Result[] get(List<Get> gets) throws IOException {
716     if (gets.size() == 1) {
717       return new Result[]{get(gets.get(0))};
718     }
719     try {
720       Object [] r1 = batch((List)gets);
721 
722       // translate.
723       Result [] results = new Result[r1.length];
724       int i=0;
725       for (Object o : r1) {
726         // batch ensures if there is a failure we get an exception instead
727         results[i++] = (Result) o;
728       }
729 
730       return results;
731     } catch (InterruptedException e) {
732       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
733     }
734   }
735 
736   /**
737    * {@inheritDoc}
738    */
739   @Override
740   public void batch(final List<? extends Row> actions, final Object[] results)
741       throws InterruptedException, IOException {
742     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results);
743     ars.waitUntilDone();
744     if (ars.hasError()) {
745       throw ars.getErrors();
746     }
747   }
748 
749   /**
750    * {@inheritDoc}
751    * @deprecated If any exception is thrown by one of the actions, there is no way to
752    * retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
753    */
754   @Deprecated
755   @Override
756   public Object[] batch(final List<? extends Row> actions)
757      throws InterruptedException, IOException {
758     Object[] results = new Object[actions.size()];
759     batch(actions, results);
760     return results;
761   }
762 
763   /**
764    * {@inheritDoc}
765    */
766   @Override
767   public <R> void batchCallback(
768       final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
769       throws IOException, InterruptedException {
770     connection.processBatchCallback(actions, tableName, pool, results, callback);
771   }
772 
773   /**
774    * {@inheritDoc}
775    * @deprecated If any exception is thrown by one of the actions, there is no way to
776    * retrieve the partially executed results. Use
777    * {@link #batchCallback(List, Object[], Batch.Callback)}
778    * instead.
779    */
780   @Deprecated
781   @Override
782   public <R> Object[] batchCallback(
783     final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException,
784       InterruptedException {
785     Object[] results = new Object[actions.size()];
786     batchCallback(actions, results, callback);
787     return results;
788   }
789 
790   /**
791    * {@inheritDoc}
792    */
793   @Override
794   public void delete(final Delete delete)
795   throws IOException {
796     RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
797         tableName, delete.getRow()) {
798       @Override
799       public Boolean call(int callTimeout) throws IOException {
800         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
801         controller.setPriority(tableName);
802         controller.setCallTimeout(callTimeout);
803 
804         try {
805           MutateRequest request = RequestConverter.buildMutateRequest(
806             getLocation().getRegionInfo().getRegionName(), delete);
807           MutateResponse response = getStub().mutate(controller, request);
808           return Boolean.valueOf(response.getProcessed());
809         } catch (ServiceException se) {
810           throw ProtobufUtil.getRemoteException(se);
811         }
812       }
813     };
814     rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
815   }
816 
817   /**
818    * {@inheritDoc}
819    */
820   @Override
821   public void delete(final List<Delete> deletes)
822   throws IOException {
823     Object[] results = new Object[deletes.size()];
824     try {
825       batch(deletes, results);
826     } catch (InterruptedException e) {
827       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
828     } finally {
829       // mutate list so that it is empty for complete success, or contains only failed records
830       // results are returned in the same order as the requests in list walk the list backwards,
831       // so we can remove from list without impacting the indexes of earlier members
832       for (int i = results.length - 1; i>=0; i--) {
833         // if result is not null, it succeeded
834         if (results[i] instanceof Result) {
835           deletes.remove(i);
836         }
837       }
838     }
839   }
840 
841   /**
842    * {@inheritDoc}
843    * @throws IOException
844    */
845   @Override
846   public void put(final Put put) throws IOException {
847     getBufferedMutator().mutate(put);
848     if (autoFlush) {
849       flushCommits();
850     }
851   }
852 
853   /**
854    * {@inheritDoc}
855    * @throws IOException
856    */
857   @Override
858   public void put(final List<Put> puts) throws IOException {
859     getBufferedMutator().mutate(puts);
860     if (autoFlush) {
861       flushCommits();
862     }
863   }
864 
865   /**
866    * {@inheritDoc}
867    */
868   @Override
869   public void mutateRow(final RowMutations rm) throws IOException {
870     RegionServerCallable<Void> callable =
871         new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
872       @Override
873       public Void call(int callTimeout) throws IOException {
874         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
875         controller.setPriority(tableName);
876         controller.setCallTimeout(callTimeout);
877         try {
878           RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
879             getLocation().getRegionInfo().getRegionName(), rm);
880           regionMutationBuilder.setAtomic(true);
881           MultiRequest request =
882             MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
883           ClientProtos.MultiResponse response = getStub().multi(controller, request);
884           ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
885           if (res.hasException()) {
886             Throwable ex = ProtobufUtil.toException(res.getException());
887             if(ex instanceof IOException) {
888               throw (IOException)ex;
889             }
890             throw new IOException("Failed to mutate row: "+Bytes.toStringBinary(rm.getRow()), ex);
891           }
892         } catch (ServiceException se) {
893           throw ProtobufUtil.getRemoteException(se);
894         }
895         return null;
896       }
897     };
898     rpcCallerFactory.<Void> newCaller().callWithRetries(callable, this.operationTimeout);
899   }
900 
901   /**
902    * {@inheritDoc}
903    */
904   @Override
905   public Result append(final Append append) throws IOException {
906     if (append.numFamilies() == 0) {
907       throw new IOException(
908           "Invalid arguments to append, no columns specified");
909     }
910 
911     NonceGenerator ng = this.connection.getNonceGenerator();
912     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
913     RegionServerCallable<Result> callable =
914       new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
915         @Override
916         public Result call(int callTimeout) throws IOException {
917           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
918           controller.setPriority(getTableName());
919           controller.setCallTimeout(callTimeout);
920           try {
921             MutateRequest request = RequestConverter.buildMutateRequest(
922               getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
923             MutateResponse response = getStub().mutate(controller, request);
924             if (!response.hasResult()) return null;
925             return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
926           } catch (ServiceException se) {
927             throw ProtobufUtil.getRemoteException(se);
928           }
929         }
930       };
931     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
932   }
933 
934   /**
935    * {@inheritDoc}
936    */
937   @Override
938   public Result increment(final Increment increment) throws IOException {
939     if (!increment.hasFamilies()) {
940       throw new IOException(
941           "Invalid arguments to increment, no columns specified");
942     }
943     NonceGenerator ng = this.connection.getNonceGenerator();
944     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
945     RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
946         getName(), increment.getRow()) {
947       @Override
948       public Result call(int callTimeout) throws IOException {
949         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
950         controller.setPriority(getTableName());
951         controller.setCallTimeout(callTimeout);
952         try {
953           MutateRequest request = RequestConverter.buildMutateRequest(
954             getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
955           MutateResponse response = getStub().mutate(controller, request);
956           return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
957         } catch (ServiceException se) {
958           throw ProtobufUtil.getRemoteException(se);
959         }
960       }
961     };
962     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
963   }
964 
965   /**
966    * {@inheritDoc}
967    */
968   @Override
969   public long incrementColumnValue(final byte [] row, final byte [] family,
970       final byte [] qualifier, final long amount)
971   throws IOException {
972     return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
973   }
974 
975   /**
976    * @deprecated As of release 0.96
977    *             (<a href="https://issues.apache.org/jira/browse/HBASE-9508">HBASE-9508</a>).
978    *             This will be removed in HBase 2.0.0.
979    *             Use {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}.
980    */
981   @Deprecated
982   @Override
983   public long incrementColumnValue(final byte [] row, final byte [] family,
984       final byte [] qualifier, final long amount, final boolean writeToWAL)
985   throws IOException {
986     return incrementColumnValue(row, family, qualifier, amount,
987       writeToWAL? Durability.SKIP_WAL: Durability.USE_DEFAULT);
988   }
989 
990   /**
991    * {@inheritDoc}
992    */
993   @Override
994   public long incrementColumnValue(final byte [] row, final byte [] family,
995       final byte [] qualifier, final long amount, final Durability durability)
996   throws IOException {
997     NullPointerException npe = null;
998     if (row == null) {
999       npe = new NullPointerException("row is null");
1000     } else if (family == null) {
1001       npe = new NullPointerException("family is null");
1002     } else if (qualifier == null) {
1003       npe = new NullPointerException("qualifier is null");
1004     }
1005     if (npe != null) {
1006       throw new IOException(
1007           "Invalid arguments to incrementColumnValue", npe);
1008     }
1009 
1010     NonceGenerator ng = this.connection.getNonceGenerator();
1011     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1012     RegionServerCallable<Long> callable =
1013       new RegionServerCallable<Long>(connection, getName(), row) {
1014         @Override
1015         public Long call(int callTimeout) throws IOException {
1016           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1017           controller.setPriority(getTableName());
1018           controller.setCallTimeout(callTimeout);
1019           try {
1020             MutateRequest request = RequestConverter.buildIncrementRequest(
1021               getLocation().getRegionInfo().getRegionName(), row, family,
1022               qualifier, amount, durability, nonceGroup, nonce);
1023             MutateResponse response = getStub().mutate(controller, request);
1024             Result result =
1025               ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1026             return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
1027           } catch (ServiceException se) {
1028             throw ProtobufUtil.getRemoteException(se);
1029           }
1030         }
1031       };
1032     return rpcCallerFactory.<Long> newCaller().callWithRetries(callable, this.operationTimeout);
1033   }
1034 
1035   /**
1036    * {@inheritDoc}
1037    */
1038   @Override
1039   public boolean checkAndPut(final byte [] row,
1040       final byte [] family, final byte [] qualifier, final byte [] value,
1041       final Put put)
1042   throws IOException {
1043     RegionServerCallable<Boolean> callable =
1044       new RegionServerCallable<Boolean>(connection, getName(), row) {
1045         @Override
1046         public Boolean call(int callTimeout) throws IOException {
1047           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1048           controller.setPriority(tableName);
1049           controller.setCallTimeout(callTimeout);
1050           try {
1051             MutateRequest request = RequestConverter.buildMutateRequest(
1052                 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1053                 new BinaryComparator(value), CompareType.EQUAL, put);
1054             MutateResponse response = getStub().mutate(controller, request);
1055             return Boolean.valueOf(response.getProcessed());
1056           } catch (ServiceException se) {
1057             throw ProtobufUtil.getRemoteException(se);
1058           }
1059         }
1060       };
1061     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1062   }
1063 
1064   /**
1065    * {@inheritDoc}
1066    */
1067   @Override
1068   public boolean checkAndPut(final byte [] row, final byte [] family,
1069       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1070       final Put put)
1071   throws IOException {
1072     RegionServerCallable<Boolean> callable =
1073       new RegionServerCallable<Boolean>(connection, getName(), row) {
1074         @Override
1075         public Boolean call(int callTimeout) throws IOException {
1076           PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
1077           controller.setPriority(tableName);
1078           controller.setCallTimeout(callTimeout);
1079           try {
1080             CompareType compareType = CompareType.valueOf(compareOp.name());
1081             MutateRequest request = RequestConverter.buildMutateRequest(
1082               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1083                 new BinaryComparator(value), compareType, put);
1084             MutateResponse response = getStub().mutate(controller, request);
1085             return Boolean.valueOf(response.getProcessed());
1086           } catch (ServiceException se) {
1087             throw ProtobufUtil.getRemoteException(se);
1088           }
1089         }
1090       };
1091     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1092   }
1093 
1094   /**
1095    * {@inheritDoc}
1096    */
1097   @Override
1098   public boolean checkAndDelete(final byte [] row,
1099       final byte [] family, final byte [] qualifier, final byte [] value,
1100       final Delete delete)
1101   throws IOException {
1102     RegionServerCallable<Boolean> callable =
1103       new RegionServerCallable<Boolean>(connection, getName(), row) {
1104         @Override
1105         public Boolean call(int callTimeout) throws IOException {
1106           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1107           controller.setPriority(tableName);
1108           controller.setCallTimeout(callTimeout);
1109           try {
1110             MutateRequest request = RequestConverter.buildMutateRequest(
1111               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1112                 new BinaryComparator(value), CompareType.EQUAL, delete);
1113             MutateResponse response = getStub().mutate(controller, request);
1114             return Boolean.valueOf(response.getProcessed());
1115           } catch (ServiceException se) {
1116             throw ProtobufUtil.getRemoteException(se);
1117           }
1118         }
1119       };
1120     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1121   }
1122 
1123   /**
1124    * {@inheritDoc}
1125    */
1126   @Override
1127   public boolean checkAndDelete(final byte [] row, final byte [] family,
1128       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1129       final Delete delete)
1130   throws IOException {
1131     RegionServerCallable<Boolean> callable =
1132       new RegionServerCallable<Boolean>(connection, getName(), row) {
1133         @Override
1134         public Boolean call(int callTimeout) throws IOException {
1135           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1136           controller.setPriority(tableName);
1137           controller.setCallTimeout(callTimeout);
1138           try {
1139             CompareType compareType = CompareType.valueOf(compareOp.name());
1140             MutateRequest request = RequestConverter.buildMutateRequest(
1141               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1142                 new BinaryComparator(value), compareType, delete);
1143             MutateResponse response = getStub().mutate(controller, request);
1144             return Boolean.valueOf(response.getProcessed());
1145           } catch (ServiceException se) {
1146             throw ProtobufUtil.getRemoteException(se);
1147           }
1148         }
1149       };
1150     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1151   }
1152 
1153   /**
1154    * {@inheritDoc}
1155    */
1156   @Override
1157   public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
1158       final CompareOp compareOp, final byte [] value, final RowMutations rm)
1159   throws IOException {
1160     RegionServerCallable<Boolean> callable =
1161         new RegionServerCallable<Boolean>(connection, getName(), row) {
1162           @Override
1163           public Boolean call(int callTimeout) throws IOException {
1164             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1165             controller.setPriority(tableName);
1166             controller.setCallTimeout(callTimeout);
1167             try {
1168               CompareType compareType = CompareType.valueOf(compareOp.name());
1169               MultiRequest request = RequestConverter.buildMutateRequest(
1170                   getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1171                   new BinaryComparator(value), compareType, rm);
1172               ClientProtos.MultiResponse response = getStub().multi(controller, request);
1173               ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
1174               if (res.hasException()) {
1175                 Throwable ex = ProtobufUtil.toException(res.getException());
1176                 if(ex instanceof IOException) {
1177                   throw (IOException)ex;
1178                 }
1179                 throw new IOException("Failed to checkAndMutate row: "+
1180                     Bytes.toStringBinary(rm.getRow()), ex);
1181               }
1182               return Boolean.valueOf(response.getProcessed());
1183             } catch (ServiceException se) {
1184               throw ProtobufUtil.getRemoteException(se);
1185             }
1186           }
1187         };
1188     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1189   }
1190 
1191   /**
1192    * {@inheritDoc}
1193    */
1194   @Override
1195   public boolean exists(final Get get) throws IOException {
1196     get.setCheckExistenceOnly(true);
1197     Result r = get(get);
1198     assert r.getExists() != null;
1199     return r.getExists();
1200   }
1201 
1202   /**
1203    * {@inheritDoc}
1204    */
1205   @Override
1206   public boolean[] existsAll(final List<Get> gets) throws IOException {
1207     if (gets.isEmpty()) return new boolean[]{};
1208     if (gets.size() == 1) return new boolean[]{exists(gets.get(0))};
1209 
1210     for (Get g: gets){
1211       g.setCheckExistenceOnly(true);
1212     }
1213 
1214     Object[] r1;
1215     try {
1216       r1 = batch(gets);
1217     } catch (InterruptedException e) {
1218       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1219     }
1220 
1221     // translate.
1222     boolean[] results = new boolean[r1.length];
1223     int i = 0;
1224     for (Object o : r1) {
1225       // batch ensures if there is a failure we get an exception instead
1226       results[i++] = ((Result)o).getExists();
1227     }
1228 
1229     return results;
1230   }
1231 
1232   /**
1233    * {@inheritDoc}
1234    * @deprecated Use {@link #existsAll(java.util.List)}  instead.
1235    */
1236   @Override
1237   @Deprecated
1238   public Boolean[] exists(final List<Get> gets) throws IOException {
1239     boolean[] results = existsAll(gets);
1240     Boolean[] objectResults = new Boolean[results.length];
1241     for (int i = 0; i < results.length; ++i) {
1242       objectResults[i] = results[i];
1243     }
1244     return objectResults;
1245   }
1246 
1247   /**
1248    * {@inheritDoc}
1249    * @throws IOException
1250    */
1251   @Override
1252   public void flushCommits() throws IOException {
1253     if (mutator == null) {
1254       // nothing to flush if there's no mutator; don't bother creating one.
1255       return;
1256     }
1257     getBufferedMutator().flush();
1258   }
1259 
1260   /**
1261    * Process a mixed batch of Get, Put and Delete actions. All actions for a
1262    * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
1263    *
1264    * @param list The collection of actions.
1265    * @param results An empty array, same size as list. If an exception is thrown,
1266    * you can test here for partial results, and to determine which actions
1267    * processed successfully.
1268    * @throws IOException if there are problems talking to META. Per-item
1269    * exceptions are stored in the results array.
1270    */
1271   public <R> void processBatchCallback(
1272     final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
1273     throws IOException, InterruptedException {
1274     this.batchCallback(list, results, callback);
1275   }
1276 
1277 
1278   /**
1279    * Parameterized batch processing, allowing varying return types for different
1280    * {@link Row} implementations.
1281    */
1282   public void processBatch(final List<? extends Row> list, final Object[] results)
1283     throws IOException, InterruptedException {
1284     this.batch(list, results);
1285   }
1286 
1287 
1288   @Override
1289   public void close() throws IOException {
1290     if (this.closed) {
1291       return;
1292     }
1293     flushCommits();
1294     if (cleanupPoolOnClose) {
1295       this.pool.shutdown();
1296       try {
1297         boolean terminated = false;
1298         do {
1299           // wait until the pool has terminated
1300           terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
1301         } while (!terminated);
1302       } catch (InterruptedException e) {
1303         this.pool.shutdownNow();
1304         LOG.warn("waitForTermination interrupted");
1305       }
1306     }
1307     if (cleanupConnectionOnClose) {
1308       if (this.connection != null) {
1309         this.connection.close();
1310       }
1311     }
1312     this.closed = true;
1313   }
1314 
1315   // validate for well-formedness
1316   public void validatePut(final Put put) throws IllegalArgumentException {
1317     validatePut(put, tableConfiguration.getMaxKeyValueSize());
1318   }
1319 
1320   // validate for well-formedness
1321   public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
1322     if (put.isEmpty()) {
1323       throw new IllegalArgumentException("No columns to insert");
1324     }
1325     if (maxKeyValueSize > 0) {
1326       for (List<Cell> list : put.getFamilyCellMap().values()) {
1327         for (Cell cell : list) {
1328           if (KeyValueUtil.length(cell) > maxKeyValueSize) {
1329             throw new IllegalArgumentException("KeyValue size too large");
1330           }
1331         }
1332       }
1333     }
1334   }
1335 
1336   /**
1337    * {@inheritDoc}
1338    */
1339   @Override
1340   public boolean isAutoFlush() {
1341     return autoFlush;
1342   }
1343 
1344   /**
1345    * {@inheritDoc}
1346    * @deprecated in 0.96. When called with setAutoFlush(false), this function also
1347    *  set clearBufferOnFail to true, which is unexpected but kept for historical reasons.
1348    *  Replace it with setAutoFlush(false, false) if this is exactly what you want, or by
1349    *  {@link #setAutoFlushTo(boolean)} for all other cases.
1350    */
1351   @Deprecated
1352   @Override
1353   public void setAutoFlush(boolean autoFlush) {
1354     this.autoFlush = autoFlush;
1355   }
1356 
1357   /**
1358    * {@inheritDoc}
1359    */
1360   @Override
1361   public void setAutoFlushTo(boolean autoFlush) {
1362     this.autoFlush = autoFlush;
1363   }
1364 
1365   /**
1366    * {@inheritDoc}
1367    */
1368   @Override
1369   public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
1370     this.autoFlush = autoFlush;
1371   }
1372 
1373   /**
1374    * Returns the maximum size in bytes of the write buffer for this HTable.
1375    * <p>
1376    * The default value comes from the configuration parameter
1377    * {@code hbase.client.write.buffer}.
1378    * @return The size of the write buffer in bytes.
1379    */
1380   @Override
1381   public long getWriteBufferSize() {
1382     if (mutator == null) {
1383       return tableConfiguration.getWriteBufferSize();
1384     } else {
1385       return mutator.getWriteBufferSize();
1386     }
1387   }
1388 
  /**
   * Sets the size of the buffer in bytes.
   * <p>
   * If the new size is less than the current amount of data in the
   * write buffer, the buffer gets flushed.
   * @param writeBufferSize The new write buffer size, in bytes.
   * @throws IOException if a remote or network exception occurs.
   */
  @Override
  public void setWriteBufferSize(long writeBufferSize) throws IOException {
    // Called for its side effect only; presumably this initializes the mutator field
    // used on the next line -- TODO confirm against getBufferedMutator().
    getBufferedMutator();
    mutator.setWriteBufferSize(writeBufferSize);
  }
1402 
1403   /**
1404    * The pool is used for mutli requests for this HTable
1405    * @return the pool used for mutli
1406    */
1407   ExecutorService getPool() {
1408     return this.pool;
1409   }
1410 
  /**
   * Formerly enabled or disabled region cache prefetch for the table. The method body is
   * now empty, so calling it has no effect.
   * @param tableName name of table to configure (ignored).
   * @param enable requested prefetch setting (ignored).
   * @throws IOException declared for compatibility; the empty body never throws it.
   * @deprecated does nothing since 0.99
   */
  @Deprecated
  public static void setRegionCachePrefetch(final byte[] tableName,
      final boolean enable)  throws IOException {
  }
1425 
  /**
   * No-op: the method body is empty, so both arguments are ignored.
   * @param tableName name of table to configure (ignored).
   * @param enable requested prefetch setting (ignored).
   * @throws IOException declared for compatibility; the empty body never throws it.
   * @deprecated does nothing since 0.99
   */
  @Deprecated
  public static void setRegionCachePrefetch(
      final TableName tableName,
      final boolean enable) throws IOException {
  }
1434 
  /**
   * Formerly enabled or disabled region cache prefetch for the table. The method body is
   * now empty, so calling it has no effect.
   * @param conf The Configuration object to use (ignored).
   * @param tableName name of table to configure (ignored).
   * @param enable requested prefetch setting (ignored).
   * @throws IOException declared for compatibility; the empty body never throws it.
   * @deprecated does nothing since 0.99
   */
  @Deprecated
  public static void setRegionCachePrefetch(final Configuration conf,
      final byte[] tableName, final boolean enable) throws IOException {
  }
1450 
  /**
   * No-op: the method body is empty, so all arguments are ignored.
   * @param conf The Configuration object to use (ignored).
   * @param tableName name of table to configure (ignored).
   * @param enable requested prefetch setting (ignored).
   * @throws IOException declared for compatibility; the empty body never throws it.
   * @deprecated does nothing since 0.99
   */
  @Deprecated
  public static void setRegionCachePrefetch(final Configuration conf,
      final TableName tableName,
      final boolean enable) throws IOException {
  }
1459 
  /**
   * Formerly checked whether region cache prefetch is enabled for the table. The method
   * now returns a constant and reads neither argument.
   * @param conf The Configuration object to use (ignored).
   * @param tableName name of table to check (ignored).
   * @return always {@code false}.
   * @throws IOException declared for compatibility; the body never throws it.
   * @deprecated always return false since 0.99
   */
  @Deprecated
  public static boolean getRegionCachePrefetch(final Configuration conf,
      final byte[] tableName) throws IOException {
    return false;
  }
1474 
  /**
   * Returns a constant; neither argument is read.
   * @param conf The Configuration object to use (ignored).
   * @param tableName name of table to check (ignored).
   * @return always {@code false}.
   * @throws IOException declared for compatibility; the body never throws it.
   * @deprecated always return false since 0.99
   */
  @Deprecated
  public static boolean getRegionCachePrefetch(final Configuration conf,
      final TableName tableName) throws IOException {
    return false;
  }
1483 
  /**
   * Formerly checked whether region cache prefetch is enabled for the table. The method
   * now returns a constant and does not read its argument.
   * @param tableName name of table to check (ignored).
   * @return always {@code false}.
   * @throws IOException declared for compatibility; the body never throws it.
   * @deprecated always return false since 0.99
   */
  @Deprecated
  public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException {
    return false;
  }
1496 
  /**
   * Returns a constant; the argument is not read.
   * @param tableName name of table to check (ignored).
   * @return always {@code false}.
   * @throws IOException declared for compatibility; the body never throws it.
   * @deprecated always return false since 0.99
   */
  @Deprecated
  public static boolean getRegionCachePrefetch(
      final TableName tableName) throws IOException {
    return false;
  }
1505 
1506   /**
1507    * Explicitly clears the region cache to fetch the latest value from META.
1508    * This is a power user function: avoid unless you know the ramifications.
1509    */
1510   public void clearRegionCache() {
1511     this.connection.clearRegionCache();
1512   }
1513 
1514   /**
1515    * {@inheritDoc}
1516    */
1517   @Override
1518   public CoprocessorRpcChannel coprocessorService(byte[] row) {
1519     return new RegionCoprocessorRpcChannel(connection, tableName, row);
1520   }
1521 
1522   /**
1523    * {@inheritDoc}
1524    */
1525   @Override
1526   public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1527       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1528       throws ServiceException, Throwable {
1529     final Map<byte[],R> results =  Collections.synchronizedMap(
1530         new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1531     coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1532       @Override
1533       public void update(byte[] region, byte[] row, R value) {
1534         if (region != null) {
1535           results.put(region, value);
1536         }
1537       }
1538     });
1539     return results;
1540   }
1541 
1542   /**
1543    * {@inheritDoc}
1544    */
1545   @Override
1546   public <T extends Service, R> void coprocessorService(final Class<T> service,
1547       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1548       final Batch.Callback<R> callback) throws ServiceException, Throwable {
1549 
1550     // get regions covered by the row range
1551     List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1552 
1553     Map<byte[],Future<R>> futures =
1554         new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
1555     for (final byte[] r : keys) {
1556       final RegionCoprocessorRpcChannel channel =
1557           new RegionCoprocessorRpcChannel(connection, tableName, r);
1558       Future<R> future = pool.submit(
1559           new Callable<R>() {
1560             @Override
1561             public R call() throws Exception {
1562               T instance = ProtobufUtil.newServiceStub(service, channel);
1563               R result = callable.call(instance);
1564               byte[] region = channel.getLastRegion();
1565               if (callback != null) {
1566                 callback.update(region, r, result);
1567               }
1568               return result;
1569             }
1570           });
1571       futures.put(r, future);
1572     }
1573     for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1574       try {
1575         e.getValue().get();
1576       } catch (ExecutionException ee) {
1577         LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1578             + Bytes.toStringBinary(e.getKey()), ee);
1579         throw ee.getCause();
1580       } catch (InterruptedException ie) {
1581         throw new InterruptedIOException("Interrupted calling coprocessor service "
1582             + service.getName() + " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie);
1583       }
1584     }
1585   }
1586 
1587   private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1588   throws IOException {
1589     if (start == null) {
1590       start = HConstants.EMPTY_START_ROW;
1591     }
1592     if (end == null) {
1593       end = HConstants.EMPTY_END_ROW;
1594     }
1595     return getKeysAndRegionsInRange(start, end, true).getFirst();
1596   }
1597 
1598   public void setOperationTimeout(int operationTimeout) {
1599     this.operationTimeout = operationTimeout;
1600   }
1601 
1602   public int getOperationTimeout() {
1603     return operationTimeout;
1604   }
1605 
1606   @Override
1607   public String toString() {
1608     return tableName + ";" + connection;
1609   }
1610 
1611   /**
1612    * {@inheritDoc}
1613    */
1614   @Override
1615   public <R extends Message> Map<byte[], R> batchCoprocessorService(
1616       Descriptors.MethodDescriptor methodDescriptor, Message request,
1617       byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1618     final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1619         Bytes.BYTES_COMPARATOR));
1620     batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1621         new Callback<R>() {
1622 
1623           @Override
1624           public void update(byte[] region, byte[] row, R result) {
1625             if (region != null) {
1626               results.put(region, result);
1627             }
1628           }
1629         });
1630     return results;
1631   }
1632 
1633   /**
1634    * {@inheritDoc}
1635    */
1636   @Override
1637   public <R extends Message> void batchCoprocessorService(
1638       final Descriptors.MethodDescriptor methodDescriptor, final Message request,
1639       byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
1640       throws ServiceException, Throwable {
1641 
1642     // get regions covered by the row range
1643     Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
1644         getKeysAndRegionsInRange(startKey, endKey, true);
1645     List<byte[]> keys = keysAndRegions.getFirst();
1646     List<HRegionLocation> regions = keysAndRegions.getSecond();
1647 
1648     // check if we have any calls to make
1649     if (keys.isEmpty()) {
1650       LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
1651           ", end=" + Bytes.toStringBinary(endKey));
1652       return;
1653     }
1654 
1655     List<RegionCoprocessorServiceExec> execs = new ArrayList<RegionCoprocessorServiceExec>();
1656     final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
1657         new TreeMap<byte[], RegionCoprocessorServiceExec>(Bytes.BYTES_COMPARATOR);
1658     for (int i = 0; i < keys.size(); i++) {
1659       final byte[] rowKey = keys.get(i);
1660       final byte[] region = regions.get(i).getRegionInfo().getRegionName();
1661       RegionCoprocessorServiceExec exec =
1662           new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
1663       execs.add(exec);
1664       execsByRow.put(rowKey, exec);
1665     }
1666 
1667     // tracking for any possible deserialization errors on success callback
1668     // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
1669     final List<Throwable> callbackErrorExceptions = new ArrayList<Throwable>();
1670     final List<Row> callbackErrorActions = new ArrayList<Row>();
1671     final List<String> callbackErrorServers = new ArrayList<String>();
1672     Object[] results = new Object[execs.size()];
1673 
1674     AsyncProcess asyncProcess =
1675         new AsyncProcess(connection, configuration, pool,
1676             RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
1677             true, RpcControllerFactory.instantiate(configuration));
1678 
1679     AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
1680         new Callback<ClientProtos.CoprocessorServiceResult>() {
1681           @Override
1682           public void update(byte[] region, byte[] row,
1683                               ClientProtos.CoprocessorServiceResult serviceResult) {
1684             if (LOG.isTraceEnabled()) {
1685               LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
1686                   ": region=" + Bytes.toStringBinary(region) +
1687                   ", row=" + Bytes.toStringBinary(row) +
1688                   ", value=" + serviceResult.getValue().getValue());
1689             }
1690             try {
1691               callback.update(region, row,
1692                   (R) responsePrototype.newBuilderForType().mergeFrom(
1693                       serviceResult.getValue().getValue()).build());
1694             } catch (InvalidProtocolBufferException e) {
1695               LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
1696                   e);
1697               callbackErrorExceptions.add(e);
1698               callbackErrorActions.add(execsByRow.get(row));
1699               callbackErrorServers.add("null");
1700             }
1701           }
1702         }, results);
1703 
1704     future.waitUntilDone();
1705 
1706     if (future.hasError()) {
1707       throw future.getErrors();
1708     } else if (!callbackErrorExceptions.isEmpty()) {
1709       throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
1710           callbackErrorServers);
1711     }
1712   }
1713 
1714   public RegionLocator getRegionLocator() {
1715     return this.locator;
1716   }
1717 
1718   @VisibleForTesting
1719   BufferedMutator getBufferedMutator() throws IOException {
1720     if (mutator == null) {
1721       this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
1722           new BufferedMutatorParams(tableName)
1723               .pool(pool)
1724               .writeBufferSize(tableConfiguration.getWriteBufferSize())
1725               .maxKeyValueSize(tableConfiguration.getMaxKeyValueSize())
1726       );
1727     }
1728     return mutator;
1729   }
1730 }