1   /**
2    * Copyright 2011 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.client;
21  
22  import java.io.Closeable;
23  import java.io.DataInput;
24  import java.io.DataOutput;
25  import java.io.IOException;
26  import java.lang.reflect.Proxy;
27  import java.util.ArrayList;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Collections;
31  import java.util.NavigableMap;
32  import java.util.TreeMap;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.SynchronousQueue;
35  import java.util.concurrent.ThreadPoolExecutor;
36  import java.util.concurrent.TimeUnit;
37  
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.conf.Configuration;
41  import org.apache.hadoop.hbase.HBaseConfiguration;
42  import org.apache.hadoop.hbase.HConstants;
43  import org.apache.hadoop.hbase.HRegionInfo;
44  import org.apache.hadoop.hbase.HRegionLocation;
45  import org.apache.hadoop.hbase.HServerAddress;
46  import org.apache.hadoop.hbase.HTableDescriptor;
47  import org.apache.hadoop.hbase.KeyValue;
48  import org.apache.hadoop.hbase.ServerName;
49  import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
50  import org.apache.hadoop.hbase.client.coprocessor.Batch;
51  import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
52  import org.apache.hadoop.hbase.ipc.ExecRPCInvoker;
53  import org.apache.hadoop.hbase.util.Addressing;
54  import org.apache.hadoop.hbase.util.Bytes;
55  import org.apache.hadoop.hbase.util.Pair;
56  import org.apache.hadoop.hbase.util.Threads;
57  
58  /**
59   * <p>Used to communicate with a single HBase table.
60   *
61   * <p>This class is not thread safe, for reads or writes.
62   * 
63   * <p>In case of writes (Put, Delete), the underlying write buffer can
64   * be corrupted if multiple threads contend over a single HTable instance.
65   * 
66   * <p>In case of reads, some fields used by a Scan are shared among all threads.
67   * The HTable implementation therefore cannot guarantee thread safety even for concurrent Gets.
68   *
69   * <p>To access a table in a multi threaded environment, please consider
70   * using the {@link HTablePool} class to create your HTable instances.
71   *
72   * <p>Instances of HTable that are passed the same {@link Configuration} instance will
73   * share connections to servers out on the cluster and to the zookeeper ensemble
74   * as well as caches of region locations.  This is usually a *good* thing and it
75   * is recommended to reuse the same configuration object for all your tables.
76   * This happens because they will all share the same underlying
77   * {@link HConnection} instance. See {@link HConnectionManager} for more on
78   * how this mechanism works.
79   *
80   * <p>{@link HConnection} will read most of the
81   * configuration it needs from the passed {@link Configuration} on initial
82   * construction.  Thereafter, for settings such as
83   * <code>hbase.client.pause</code>, <code>hbase.client.retries.number</code>,
84   * and <code>hbase.client.rpc.maxattempts</code> updating their values in the
85   * passed {@link Configuration} subsequent to {@link HConnection} construction
86   * will go unnoticed.  To run with changed values, make a new
87   * {@link HTable} passing a new {@link Configuration} instance that has the
88   * new configuration.
89   *
90   * <p>Note that this class implements the {@link Closeable} interface. When an
91   * HTable instance is no longer required, it *should* be closed in order to ensure
92   * that the underlying resources are promptly released. Please note that the close
93   * method can throw a java.io.IOException, which must be handled.
94   *
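   * <p>A minimal usage sketch, following the points above. The table name
   * "myTable", the family "cf", and the row/value literals are illustrative
   * only, and the table is assumed to already exist:
   * <pre>
   * {@code
   * Configuration conf = HBaseConfiguration.create();
   * HTable table = new HTable(conf, "myTable");
   * try {
   *   Put put = new Put(Bytes.toBytes("row1"));
   *   put.add(Bytes.toBytes("cf"), Bytes.toBytes("qual"), Bytes.toBytes("value"));
   *   table.put(put);   // written immediately, or buffered if auto-flush is off
   *
   *   Get get = new Get(Bytes.toBytes("row1"));
   *   Result result = table.get(get);
   * } finally {
   *   table.close();    // release the table's resources promptly
   * }
   * }
   * </pre>
   *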
95   * @see HBaseAdmin for creating, dropping, listing, enabling and disabling tables.
96   * @see HConnection
97   * @see HConnectionManager
98   */
99  public class HTable implements HTableInterface {
100   private static final Log LOG = LogFactory.getLog(HTable.class);
101   private HConnection connection;
102   private final byte [] tableName;
103   private volatile Configuration configuration;
104   private final ArrayList<Put> writeBuffer = new ArrayList<Put>();
105   private long writeBufferSize;
106   private boolean clearBufferOnFail;
107   private boolean autoFlush;
108   private long currentWriteBufferSize;
109   protected int scannerCaching;
110   private int maxKeyValueSize;
111   private ExecutorService pool;  // For Multi
112   private boolean closed;
113   private int operationTimeout;
114   private final boolean cleanupPoolOnClose; // shutdown the pool in close()
115   private final boolean cleanupConnectionOnClose; // close the connection in close()
116 
117   /**
118    * Creates an object to access an HBase table.
119    * Shares zookeeper connection and other resources with other HTable instances
120    * created with the same <code>conf</code> instance.  Uses already-populated
121    * region cache if one is available, populated by any other HTable instances
122    * sharing this <code>conf</code> instance.  Recommended.
123    * @param conf Configuration object to use.
124    * @param tableName Name of the table.
125    * @throws IOException if a remote or network exception occurs
126    */
127   public HTable(Configuration conf, final String tableName)
128   throws IOException {
129     this(conf, Bytes.toBytes(tableName));
130   }
131 
132 
133   /**
134    * Creates an object to access an HBase table.
135    * Shares zookeeper connection and other resources with other HTable instances
136    * created with the same <code>conf</code> instance.  Uses already-populated
137    * region cache if one is available, populated by any other HTable instances
138    * sharing this <code>conf</code> instance.  Recommended.
139    * @param conf Configuration object to use.
140    * @param tableName Name of the table.
141    * @throws IOException if a remote or network exception occurs
142    */
143   public HTable(Configuration conf, final byte [] tableName)
144   throws IOException {
145     this.tableName = tableName;
146     this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;
147     if (conf == null) {
148       this.connection = null;
149       return;
150     }
151     this.connection = HConnectionManager.getConnection(conf);
152     this.configuration = conf;
153 
154     this.pool = getDefaultExecutor(conf);
155     this.finishSetup();
156   }
157 
158   /**
159    * Creates an object to access an HBase table. Shares zookeeper connection and other resources with
160    * other HTable instances created with the same <code>connection</code> instance. Use this
161    * constructor when the HConnection instance is externally managed (does not close the connection
162    * on {@link #close()}).
163    * @param tableName Name of the table.
164    * @param connection HConnection to be used.
165    * @throws IOException if a remote or network exception occurs
166    */
167   public HTable(byte[] tableName, HConnection connection) throws IOException {
168     this.tableName = tableName;
169     this.cleanupPoolOnClose = true;
170     this.cleanupConnectionOnClose = false;
171     this.connection = connection;
172     this.configuration = connection.getConfiguration();
173 
174     this.pool = getDefaultExecutor(this.configuration);
175     this.finishSetup();
176   }
177 
178   public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
179     int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
180     if (maxThreads == 0) {
181       maxThreads = 1; // is there a better default?
182     }
183     long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
184 
185     // Using the "direct handoff" approach, new threads are only created when
186     // necessary, and the pool can grow unbounded. This could be bad, but in HCM
187     // we only create as many Runnables as there are region servers, so the pool
188     // also scales when new region servers are added.
189     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads,
190         keepAliveTime, TimeUnit.SECONDS,
191         new SynchronousQueue<Runnable>(),
192         Threads.newDaemonThreadFactory("hbase-table"));
193     pool.allowCoreThreadTimeOut(true);
194     return pool;
195   }
196 
197   /**
198    * Creates an object to access an HBase table.
199    * Shares zookeeper connection and other resources with other HTable instances
200    * created with the same <code>conf</code> instance.  Uses already-populated
201    * region cache if one is available, populated by any other HTable instances
202    * sharing this <code>conf</code> instance.
203    * Use this constructor when the ExecutorService is externally managed.
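   * <p>A sketch of use with an externally managed pool; the pool size and the
   * table name are illustrative, and an existing {@code conf} is assumed:
   * <pre>
   * {@code
   * ExecutorService pool = Executors.newFixedThreadPool(10);
   * HTable table = new HTable(conf, Bytes.toBytes("myTable"), pool);
   * // ... use the table ...
   * table.close();    // does not shut the pool down
   * pool.shutdown();  // the caller owns the pool's lifecycle
   * }
   * </pre>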
204    * @param conf Configuration object to use.
205    * @param tableName Name of the table.
206    * @param pool ExecutorService to be used.
207    * @throws IOException if a remote or network exception occurs
208    */
209   public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool)
210       throws IOException {
211     this.connection = HConnectionManager.getConnection(conf);
212     this.configuration = conf;
213     this.pool = pool;
214     this.tableName = tableName;
215     this.cleanupPoolOnClose = false;
216     this.cleanupConnectionOnClose = true;
217 
218     this.finishSetup();
219   }
220 
221   /**
222    * Creates an object to access an HBase table.
223    * Shares zookeeper connection and other resources with other HTable instances
224    * created with the same <code>connection</code> instance.
225    * Use this constructor when the ExecutorService and HConnection instance are
226    * externally managed.
227    * @param tableName Name of the table.
228    * @param connection HConnection to be used.
229    * @param pool ExecutorService to be used.
230    * @throws IOException if a remote or network exception occurs
231    */
232   public HTable(final byte[] tableName, final HConnection connection, 
233       final ExecutorService pool) throws IOException {
234     if (connection == null || connection.isClosed()) {
235       throw new IllegalArgumentException("Connection is null or closed.");
236     }
237     this.tableName = tableName;
238     this.cleanupPoolOnClose = this.cleanupConnectionOnClose = false;
239     this.connection = connection;
240     this.configuration = connection.getConfiguration();
241     this.pool = pool;
242 
243     this.finishSetup();
244   }
245 
246   /**
247    * Sets up this HTable's parameters based on the instance's configuration.
248    * @throws IOException if a remote or network exception occurs
249    */
250   private void finishSetup() throws IOException {
251     this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW);
252     this.operationTimeout = HTableDescriptor.isMetaTable(tableName) ? HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT
253         : this.configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
254             HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
255     this.writeBufferSize = this.configuration.getLong(
256         "hbase.client.write.buffer", 2097152);
257     this.clearBufferOnFail = true;
258     this.autoFlush = true;
259     this.currentWriteBufferSize = 0;
260     this.scannerCaching = this.configuration.getInt(
261         "hbase.client.scanner.caching", 1);
262 
263     this.maxKeyValueSize = this.configuration.getInt(
264         "hbase.client.keyvalue.maxsize", -1);
265     this.closed = false;
266   }
267 
268   /**
269    * {@inheritDoc}
270    */
271   @Override
272   public Configuration getConfiguration() {
273     return configuration;
274   }
275 
276   /**
277    * Tells whether or not a table is enabled. This method creates a
278    * new HBase configuration, so it might make your unit tests fail due to
279    * incorrect ZK client port.
280    * @param tableName Name of table to check.
281    * @return {@code true} if table is online.
282    * @throws IOException if a remote or network exception occurs
283    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
284    */
285   @Deprecated
286   public static boolean isTableEnabled(String tableName) throws IOException {
287     return isTableEnabled(Bytes.toBytes(tableName));
288   }
289 
290   /**
291    * Tells whether or not a table is enabled. This method creates a
292    * new HBase configuration, so it might make your unit tests fail due to
293    * incorrect ZK client port.
294    * @param tableName Name of table to check.
295    * @return {@code true} if table is online.
296    * @throws IOException if a remote or network exception occurs
297    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
298    */
299   @Deprecated
300   public static boolean isTableEnabled(byte[] tableName) throws IOException {
301     return isTableEnabled(HBaseConfiguration.create(), tableName);
302   }
303 
304   /**
305    * Tells whether or not a table is enabled.
306    * @param conf The Configuration object to use.
307    * @param tableName Name of table to check.
308    * @return {@code true} if table is online.
309    * @throws IOException if a remote or network exception occurs
310    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
311    */
312   @Deprecated
313   public static boolean isTableEnabled(Configuration conf, String tableName)
314   throws IOException {
315     return isTableEnabled(conf, Bytes.toBytes(tableName));
316   }
317 
318   /**
319    * Tells whether or not a table is enabled.
320    * @param conf The Configuration object to use.
321    * @param tableName Name of table to check.
322    * @return {@code true} if table is online.
323    * @throws IOException if a remote or network exception occurs
324    */
325   public static boolean isTableEnabled(Configuration conf,
326       final byte[] tableName) throws IOException {
327     return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
328       @Override
329       public Boolean connect(HConnection connection) throws IOException {
330         return connection.isTableEnabled(tableName);
331       }
332     });
333   }
334 
335   /**
336    * Finds the region location hosting the passed row, using cached info.
337    * @param row Row to find.
338    * @return The location of the given row.
339    * @throws IOException if a remote or network exception occurs
340    */
341   public HRegionLocation getRegionLocation(final String row)
342   throws IOException {
343     return connection.getRegionLocation(tableName, Bytes.toBytes(row), false);
344   }
345 
346   /**
347    * Finds the region on which the given row is being served.
348    * @param row Row to find.
349    * @return Location of the row.
350    * @throws IOException if a remote or network exception occurs
351    * @deprecated use {@link #getRegionLocation(byte [], boolean)} instead
352    */
353   public HRegionLocation getRegionLocation(final byte [] row)
354   throws IOException {
355     return connection.getRegionLocation(tableName, row, false);
356   }
357 
358   /**
359    * Finds the region on which the given row is being served.
360    * @param row Row to find.
361    * @param reload whether or not to reload information or just use cached
362    * information
363    * @return Location of the row.
364    * @throws IOException if a remote or network exception occurs
365    */
366   public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
367   throws IOException {
368     return connection.getRegionLocation(tableName, row, reload);
369   }
370      
371   /**
372    * {@inheritDoc}
373    */
374   @Override
375   public byte [] getTableName() {
376     return this.tableName;
377   }
378 
379   /**
380    * <em>INTERNAL</em> Used by unit tests and tools to do low-level
381    * manipulations.
382    * @return An HConnection instance.
383    * @deprecated This method will be changed from public to package protected.
384    */
385   // TODO(tsuna): Remove this.  Unit tests shouldn't require public helpers.
386   public HConnection getConnection() {
387     return this.connection;
388   }
389 
390   /**
391    * Gets the number of rows that a scanner will fetch at once.
392    * <p>
393    * The default value comes from {@code hbase.client.scanner.caching}.
394    * @deprecated Use {@link Scan#setCaching(int)} and {@link Scan#getCaching()}
395    */
396   public int getScannerCaching() {
397     return scannerCaching;
398   }
399 
400   /**
401    * Sets the number of rows that a scanner will fetch at once.
402    * <p>
403    * This will override the value specified by
404    * {@code hbase.client.scanner.caching}.
405    * Increasing this value will reduce the amount of work needed each time
406    * {@code next()} is called on a scanner, at the expense of memory use
407    * (since more rows will need to be maintained in memory by the scanners).
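   * <p>Since this method is deprecated, a per-scan setting is usually preferable;
   * a small sketch, assuming an existing {@code table} instance:
   * <pre>
   * {@code
   * Scan scan = new Scan();
   * scan.setCaching(500);   // fetch up to 500 rows per scanner RPC
   * ResultScanner scanner = table.getScanner(scan);
   * try {
   *   for (Result result : scanner) {
   *     // process each row
   *   }
   * } finally {
   *   scanner.close();
   * }
   * }
   * </pre>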
408    * @param scannerCaching the number of rows a scanner will fetch at once.
409    * @deprecated Use {@link Scan#setCaching(int)}
410    */
411   public void setScannerCaching(int scannerCaching) {
412     this.scannerCaching = scannerCaching;
413   }
414 
415   /**
416    * {@inheritDoc}
417    */
418   @Override
419   public HTableDescriptor getTableDescriptor() throws IOException {
420     return new UnmodifyableHTableDescriptor(
421       this.connection.getHTableDescriptor(this.tableName));
422   }
423 
424   /**
425    * Gets the starting row key for every region in the currently open table.
426    * <p>
427    * This is mainly useful for the MapReduce integration.
428    * @return Array of region starting row keys
429    * @throws IOException if a remote or network exception occurs
430    */
431   public byte [][] getStartKeys() throws IOException {
432     return getStartEndKeys().getFirst();
433   }
434 
435   /**
436    * Gets the ending row key for every region in the currently open table.
437    * <p>
438    * This is mainly useful for the MapReduce integration.
439    * @return Array of region ending row keys
440    * @throws IOException if a remote or network exception occurs
441    */
442   public byte[][] getEndKeys() throws IOException {
443     return getStartEndKeys().getSecond();
444   }
445 
446   /**
447    * Gets the starting and ending row keys for every region in the currently
448    * open table.
449    * <p>
450    * This is mainly useful for the MapReduce integration.
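   * <p>An illustrative sketch, deriving one key range per region (an existing
   * {@code table} instance is assumed):
   * <pre>
   * {@code
   * Pair<byte[][], byte[][]> startEndKeys = table.getStartEndKeys();
   * byte[][] startKeys = startEndKeys.getFirst();
   * byte[][] endKeys = startEndKeys.getSecond();
   * for (int i = 0; i < startKeys.length; i++) {
   *   // the range [startKeys[i], endKeys[i]) corresponds to one region
   * }
   * }
   * </pre>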
451    * @return Pair of arrays of region starting and ending row keys
452    * @throws IOException if a remote or network exception occurs
453    */
454   public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
455     NavigableMap<HRegionInfo, ServerName> regions = getRegionLocations();
456     final List<byte[]> startKeyList = new ArrayList<byte[]>(regions.size());
457     final List<byte[]> endKeyList = new ArrayList<byte[]>(regions.size());
458 
459     for (HRegionInfo region : regions.keySet()) {
460       startKeyList.add(region.getStartKey());
461       endKeyList.add(region.getEndKey());
462     }
463 
464     return new Pair<byte [][], byte [][]>(
465       startKeyList.toArray(new byte[startKeyList.size()][]),
466       endKeyList.toArray(new byte[endKeyList.size()][]));
467   }
468 
469   /**
470    * Gets all the regions and their addresses for this table.
471    * @return A map of HRegionInfo with its server address
472    * @throws IOException if a remote or network exception occurs
473    * @deprecated Use {@link #getRegionLocations()} or {@link #getStartEndKeys()}
474    */
475   public Map<HRegionInfo, HServerAddress> getRegionsInfo() throws IOException {
476     final Map<HRegionInfo, HServerAddress> regionMap =
477       new TreeMap<HRegionInfo, HServerAddress>();
478 
479     final Map<HRegionInfo, ServerName> regionLocations = getRegionLocations();
480 
481     for (Map.Entry<HRegionInfo, ServerName> entry : regionLocations.entrySet()) {
482       HServerAddress server = new HServerAddress();
483       ServerName serverName = entry.getValue();
484       if (serverName != null && serverName.getHostAndPort() != null) {
485         server = new HServerAddress(Addressing.createInetSocketAddressFromHostAndPortStr(
486             serverName.getHostAndPort()));
487       }
488       regionMap.put(entry.getKey(), server);
489     }
490 
491     return regionMap;
492   }
493 
494   /**
495    * Gets all the regions and their addresses for this table.
496    * <p>
497    * This is mainly useful for the MapReduce integration.
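   * <p>An illustrative sketch (an existing {@code table} instance is assumed;
   * the server value may be null for a region that is not currently deployed):
   * <pre>
   * {@code
   * NavigableMap<HRegionInfo, ServerName> regions = table.getRegionLocations();
   * for (Map.Entry<HRegionInfo, ServerName> entry : regions.entrySet()) {
   *   System.out.println(entry.getKey().getRegionNameAsString() + " -> " + entry.getValue());
   * }
   * }
   * </pre>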
498    * @return A map of HRegionInfo with its server address
499    * @throws IOException if a remote or network exception occurs
500    */
501   public NavigableMap<HRegionInfo, ServerName> getRegionLocations() throws IOException {
502     return MetaScanner.allTableRegions(getConfiguration(), this.connection, getTableName(), false);
503   }
504 
505   /**
506    * Get the corresponding regions for an arbitrary range of keys.
507    * <p>
508    * @param startKey Starting row in range, inclusive
509    * @param endKey Ending row in range, exclusive
510    * @return A list of HRegionLocations corresponding to the regions that
511    * contain the specified range
512    * @throws IOException if a remote or network exception occurs
513    */
514   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
515     final byte [] endKey) throws IOException {
516     return getRegionsInRange(startKey, endKey, false);
517   }
518 
519   /**
520    * Get the corresponding regions for an arbitrary range of keys.
521    * <p>
522    * @param startKey Starting row in range, inclusive
523    * @param endKey Ending row in range, exclusive
524    * @param reload true to reload information or false to use cached information
525    * @return A list of HRegionLocations corresponding to the regions that
526    * contain the specified range
527    * @throws IOException if a remote or network exception occurs
528    */
529   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
530       final byte [] endKey, final boolean reload) throws IOException {
531     return getKeysAndRegionsInRange(startKey, endKey, false, reload).getSecond();
532   }
533 
534   /**
535    * Get the corresponding start keys and regions for an arbitrary range of
536    * keys.
537    * <p>
538    * @param startKey Starting row in range, inclusive
539    * @param endKey Ending row in range
540    * @param includeEndKey true if endRow is inclusive, false if exclusive
541    * @return A pair of list of start keys and list of HRegionLocations that
542    *         contain the specified range
543    * @throws IOException if a remote or network exception occurs
544    */
545   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
546       final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
547       throws IOException {
548     return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
549   }
550 
551   /**
552    * Get the corresponding start keys and regions for an arbitrary range of
553    * keys.
554    * <p>
555    * @param startKey Starting row in range, inclusive
556    * @param endKey Ending row in range
557    * @param includeEndKey true if endRow is inclusive, false if exclusive
558    * @param reload true to reload information or false to use cached information
559    * @return A pair of list of start keys and list of HRegionLocations that
560    *         contain the specified range
561    * @throws IOException if a remote or network exception occurs
562    */
563   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
564       final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
565       final boolean reload) throws IOException {
566     final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
567     if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
568       throw new IllegalArgumentException(
569         "Invalid range: " + Bytes.toStringBinary(startKey) +
570         " > " + Bytes.toStringBinary(endKey));
571     }
572     List<byte[]> keysInRange = new ArrayList<byte[]>();
573     List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
574     byte[] currentKey = startKey;
575     do {
576       HRegionLocation regionLocation = getRegionLocation(currentKey, reload);
577       keysInRange.add(currentKey);
578       regionsInRange.add(regionLocation);
579       currentKey = regionLocation.getRegionInfo().getEndKey();
580     } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
581         && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
582             || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
583     return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
584         regionsInRange);
585   }
586 
587   /**
588    * Save the passed region information in the table's region
589    * cache.
590    * <p>
591    * This is mainly useful for the MapReduce integration. You can call
592    * {@link #deserializeRegionInfo deserializeRegionInfo}
593    * to deserialize regions information from a
594    * {@link DataInput}, then call this method to load them to cache.
595    *
596    * <pre>
597    * {@code
598    * HTable t1 = new HTable(HBaseConfiguration.create(), "foo");
599    * FileInputStream fis = new FileInputStream("regions.dat");
600    * DataInputStream dis = new DataInputStream(fis);
601    *
602    * Map<HRegionInfo, HServerAddress> hm = t1.deserializeRegionInfo(dis);
603    * t1.prewarmRegionCache(hm);
604    * }
605    * </pre>
606    * @param regionMap This piece of regions information will be loaded
607    * to region cache.
608    */
609   public void prewarmRegionCache(Map<HRegionInfo, HServerAddress> regionMap) {
610     this.connection.prewarmRegionCache(this.getTableName(), regionMap);
611   }
612 
613   /**
614    * Serialize the regions information of this table and output
615    * to <code>out</code>.
616    * <p>
617    * This is mainly useful for the MapReduce integration. A client could
618    * perform a large scan over all the regions of the table and serialize the
619    * region info to a file. The MR job can then ship a copy of the table's meta in
620    * the DistributedCache.
621    * <pre>
622    * {@code
623    * FileOutputStream fos = new FileOutputStream("regions.dat");
624    * DataOutputStream dos = new DataOutputStream(fos);
625    * table.serializeRegionInfo(dos);
626    * dos.flush();
627    * dos.close();
628    * }
629    * </pre>
630    * @param out {@link DataOutput} to serialize this object into.
631    * @throws IOException if a remote or network exception occurs
632    */
633   public void serializeRegionInfo(DataOutput out) throws IOException {
634     Map<HRegionInfo, HServerAddress> allRegions = this.getRegionsInfo();
635     // first, write number of regions
636     out.writeInt(allRegions.size());
637     for (Map.Entry<HRegionInfo, HServerAddress> es : allRegions.entrySet()) {
638       es.getKey().write(out);
639       es.getValue().write(out);
640     }
641   }
642 
643   /**
644    * Read from <code>in</code> and deserialize the regions information.
645    *
646    * <p>It behaves similarly as {@link #getRegionsInfo getRegionsInfo}, except
647    * that it loads the region map from a {@link DataInput} object.
648    *
649    * <p>It is supposed to be followed immediately by  {@link
650    * #prewarmRegionCache prewarmRegionCache}.
651    *
652    * <p>
653    * Please refer to {@link #prewarmRegionCache prewarmRegionCache} for usage.
654    *
655    * @param in {@link DataInput} object.
656    * @return A map of HRegionInfo with its server address.
657    * @throws IOException if an I/O exception occurs.
658    */
659   public Map<HRegionInfo, HServerAddress> deserializeRegionInfo(DataInput in)
660   throws IOException {
661     final Map<HRegionInfo, HServerAddress> allRegions =
662       new TreeMap<HRegionInfo, HServerAddress>();
663 
664     // the first integer is expected to be the size of records
665     int regionsCount = in.readInt();
666     for (int i = 0; i < regionsCount; ++i) {
667       HRegionInfo hri = new HRegionInfo();
668       hri.readFields(in);
669       HServerAddress hsa = new HServerAddress();
670       hsa.readFields(in);
671       allRegions.put(hri, hsa);
672     }
673     return allRegions;
674   }
675 
676   /**
677    * {@inheritDoc}
678    */
679    @Override
680    public Result getRowOrBefore(final byte[] row, final byte[] family)
681    throws IOException {
682      return new ServerCallable<Result>(connection, tableName, row, operationTimeout) {
683        public Result call() throws IOException {
684          return server.getClosestRowBefore(location.getRegionInfo().getRegionName(),
685            row, family);
686        }
687      }.withRetries();
688    }
689 
690    /**
691     * {@inheritDoc}
692     */
693   @Override
694   public ResultScanner getScanner(final Scan scan) throws IOException {
695     if (scan.getCaching() <= 0) {
696       scan.setCaching(getScannerCaching());
697     }
698     if (scan.isSmall()) {
699       return new ClientSmallScanner(getConfiguration(), scan, getTableName(),
700           this.connection);
701     }
702     return new ClientScanner(getConfiguration(), scan, getTableName(),
703         this.connection);
704   }
705 
706   /**
707    * {@inheritDoc}
708    */
709   @Override
710   public ResultScanner getScanner(byte [] family) throws IOException {
711     Scan scan = new Scan();
712     scan.addFamily(family);
713     return getScanner(scan);
714   }
715 
716   /**
717    * {@inheritDoc}
718    */
719   @Override
720   public ResultScanner getScanner(byte [] family, byte [] qualifier)
721   throws IOException {
722     Scan scan = new Scan();
723     scan.addColumn(family, qualifier);
724     return getScanner(scan);
725   }
726 
727   /**
728    * {@inheritDoc}
729    */
730   @Override
731   public Result get(final Get get) throws IOException {
732     return new ServerCallable<Result>(connection, tableName, get.getRow(), operationTimeout) {
733           public Result call() throws IOException {
734             return server.get(location.getRegionInfo().getRegionName(), get);
735           }
736         }.withRetries();
737   }
738 
739   /**
740    * {@inheritDoc}
741    */
742   @Override
743   public Result[] get(List<Get> gets) throws IOException {
744     if (gets.size() == 1) {
745       return new Result[]{get(gets.get(0))};
746     }
747     try {
748       Object [] r1 = batch((List)gets);
749 
750       // translate.
751       Result [] results = new Result[r1.length];
752       int i=0;
753       for (Object o : r1) {
754         // batch ensures if there is a failure we get an exception instead
755         results[i++] = (Result) o;
756       }
757 
758       return results;
759     } catch (InterruptedException e) {
760       throw new IOException(e);
761     }
762   }
763 
764   /**
765    * {@inheritDoc}
766    */
767   @Override
768   public void batch(final List<? extends Row> actions, final Object[] results)
769       throws InterruptedException, IOException {
770     connection.processBatch(actions, tableName, pool, results);
771   }
772 
773   /**
774    * {@inheritDoc}
775    */
776   @Override
777   public Object[] batch(final List<? extends Row> actions) throws InterruptedException, IOException {
778     Object[] results = new Object[actions.size()];
779     connection.processBatch(actions, tableName, pool, results);
780     return results;
781   }
782 
783   /**
784    * {@inheritDoc}
785    */
786   @Override
787   public void delete(final Delete delete)
788   throws IOException {
789     new ServerCallable<Boolean>(connection, tableName, delete.getRow(), operationTimeout) {
790           public Boolean call() throws IOException {
791             server.delete(location.getRegionInfo().getRegionName(), delete);
792             return null; // FindBugs NP_BOOLEAN_RETURN_NULL
793           }
794         }.withRetries();
795   }
796 
797   /**
798    * {@inheritDoc}
799    */
800   @Override
801   public void delete(final List<Delete> deletes)
802   throws IOException {
803     Object[] results = new Object[deletes.size()];
804     try {
805       connection.processBatch((List) deletes, tableName, pool, results);
806     } catch (InterruptedException e) {
807       throw new IOException(e);
808     } finally {
809       // mutate list so that it is empty for complete success, or contains only failed records
810       // results are returned in the same order as the requests in list
811       // walk the list backwards, so we can remove from list without impacting the indexes of earlier members
812       for (int i = results.length - 1; i>=0; i--) {
813         // if result is not null, it succeeded
814         if (results[i] instanceof Result) {
815           deletes.remove(i);
816         }
817       }
818     }
819   }
820 
821   /**
822    * {@inheritDoc}
823    */
824   @Override
825   public void put(final Put put) throws IOException {
826     doPut(put);
827     if (autoFlush) {
828       flushCommits();
829     }
830   }
831 
832   /**
833    * {@inheritDoc}
834    */
835   @Override
836   public void put(final List<Put> puts) throws IOException {
837     for (Put put : puts) {
838       doPut(put);
839     }
840     if (autoFlush) {
841       flushCommits();
842     }
843   }
844 
845   private void doPut(Put put) throws IOException{
846     validatePut(put);
847     writeBuffer.add(put);
848     currentWriteBufferSize += put.heapSize();
849     if (currentWriteBufferSize > writeBufferSize) {
850       flushCommits();
851     }
852   }
853 
854   /**
855    * {@inheritDoc}
856    */
857   @Override
858   public void mutateRow(final RowMutations rm) throws IOException {
859     new ServerCallable<Void>(connection, tableName, rm.getRow(),
860         operationTimeout) {
861       public Void call() throws IOException {
862         server.mutateRow(location.getRegionInfo().getRegionName(), rm);
863         return null;
864       }
865     }.withRetries();
866   }
867 
868   /**
869    * {@inheritDoc}
870    */
871   @Override
872   public Result append(final Append append) throws IOException {
873     if (append.numFamilies() == 0) {
874       throw new IOException(
875           "Invalid arguments to append, no columns specified");
876     }
877     return new ServerCallable<Result>(connection, tableName, append.getRow(), operationTimeout) {
878           public Result call() throws IOException {
879             return server.append(
880                 location.getRegionInfo().getRegionName(), append);
881           }
882         }.withRetries();
883   }
884 
885   /**
886    * {@inheritDoc}
887    */
888   @Override
889   public Result increment(final Increment increment) throws IOException {
890     if (!increment.hasFamilies()) {
891       throw new IOException(
892           "Invalid arguments to increment, no columns specified");
893     }
894     return new ServerCallable<Result>(connection, tableName, increment.getRow(), operationTimeout) {
895           public Result call() throws IOException {
896             return server.increment(
897                 location.getRegionInfo().getRegionName(), increment);
898           }
899         }.withRetries();
900   }
901 
902   /**
903    * {@inheritDoc}
904    */
905   @Override
906   public long incrementColumnValue(final byte [] row, final byte [] family,
907       final byte [] qualifier, final long amount)
908   throws IOException {
909     return incrementColumnValue(row, family, qualifier, amount, true);
910   }
911 
912   /**
913    * {@inheritDoc}
914    */
915   @Override
916   public long incrementColumnValue(final byte [] row, final byte [] family,
917       final byte [] qualifier, final long amount, final boolean writeToWAL)
918   throws IOException {
919     NullPointerException npe = null;
920     if (row == null) {
921       npe = new NullPointerException("row is null");
922     } else if (family == null) {
923       npe = new NullPointerException("column is null");
924     }
925     if (npe != null) {
926       throw new IOException(
927           "Invalid arguments to incrementColumnValue", npe);
928     }
929     return new ServerCallable<Long>(connection, tableName, row, operationTimeout) {
930           public Long call() throws IOException {
931             return server.incrementColumnValue(
932                 location.getRegionInfo().getRegionName(), row, family,
933                 qualifier, amount, writeToWAL);
934           }
935         }.withRetries();
936   }
937 
938   /**
939    * {@inheritDoc}
940    */
941   @Override
942   public boolean checkAndPut(final byte [] row,
943       final byte [] family, final byte [] qualifier, final byte [] value,
944       final Put put)
945   throws IOException {
946     return new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
947           public Boolean call() throws IOException {
948             return server.checkAndPut(location.getRegionInfo().getRegionName(),
949                 row, family, qualifier, value, put) ? Boolean.TRUE : Boolean.FALSE;
950           }
951         }.withRetries();
952   }
953 
954 
955   /**
956    * {@inheritDoc}
957    */
958   @Override
959   public boolean checkAndDelete(final byte [] row,
960       final byte [] family, final byte [] qualifier, final byte [] value,
961       final Delete delete)
962   throws IOException {
963     return new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
964           public Boolean call() throws IOException {
965             return server.checkAndDelete(
966                 location.getRegionInfo().getRegionName(),
967                 row, family, qualifier, value, delete)
968             ? Boolean.TRUE : Boolean.FALSE;
969           }
970         }.withRetries();
971   }
972 
973   /**
974    * {@inheritDoc}
975    */
976   @Override
977   public boolean exists(final Get get) throws IOException {
978     return new ServerCallable<Boolean>(connection, tableName, get.getRow(), operationTimeout) {
979           public Boolean call() throws IOException {
980             return server.
981                 exists(location.getRegionInfo().getRegionName(), get);
982           }
983         }.withRetries();
984   }
985 
986   /**
987    * {@inheritDoc}
988    */
989   @Override
990   public void flushCommits() throws IOException {
991     try {
992       Object[] results = new Object[writeBuffer.size()];
993       try {
994         this.connection.processBatch(writeBuffer, tableName, pool, results);
995       } catch (InterruptedException e) {
996         throw new IOException(e);
997       } finally {
998         // mutate the list so that it is empty for complete success, or contains
999         // only failed records; results are returned in the same order as the
1000         // requests in the list; walk the list backwards, so we can remove from the
1001         // list without impacting the indexes of earlier members
1002         for (int i = results.length - 1; i>=0; i--) {
1003           if (results[i] instanceof Result) {
1004             // successful Puts are removed from the list here.
1005             writeBuffer.remove(i);
1006           }
1007         }
1008       }
1009     } finally {
1010       if (clearBufferOnFail) {
1011         writeBuffer.clear();
1012         currentWriteBufferSize = 0;
1013       } else {
1014         // the write buffer was adjusted by processBatch; recompute its current size
1015         currentWriteBufferSize = 0;
1016         for (Put aPut : writeBuffer) {
1017           currentWriteBufferSize += aPut.heapSize();
1018         }
1019       }
1020     }
1021   }
1022 
1023   /**
1024    * {@inheritDoc}
1025    */
1026   @Override
1027   public void close() throws IOException {
1028     if (this.closed) {
1029       return;
1030     }
1031     flushCommits();
1032     if (cleanupPoolOnClose) {
1033       this.pool.shutdown();
1034     }
1035     if (cleanupConnectionOnClose) {
1036       if (this.connection != null) {
1037         this.connection.close();
1038       }
1039     }
1040     this.closed = true;
1041   }
1042 
1043   // validate for well-formedness
1044   private void validatePut(final Put put) throws IllegalArgumentException{
1045     if (put.isEmpty()) {
1046       throw new IllegalArgumentException("No columns to insert");
1047     }
1048     if (maxKeyValueSize > 0) {
1049       for (List<KeyValue> list : put.getFamilyMap().values()) {
1050         for (KeyValue kv : list) {
1051           if (kv.getLength() > maxKeyValueSize) {
1052             throw new IllegalArgumentException("KeyValue size too large");
1053           }
1054         }
1055       }
1056     }
1057   }
1058 
1059   /**
1060    * {@inheritDoc}
1061    */
1062   @Override
1063   public RowLock lockRow(final byte [] row)
1064   throws IOException {
1065     return new ServerCallable<RowLock>(connection, tableName, row, operationTimeout) {
1066         public RowLock call() throws IOException {
1067           long lockId =
1068               server.lockRow(location.getRegionInfo().getRegionName(), row);
1069           return new RowLock(row,lockId);
1070         }
1071       }.withRetries();
1072   }
1073 
1074   /**
1075    * {@inheritDoc}
1076    */
1077   @Override
1078   public void unlockRow(final RowLock rl)
1079   throws IOException {
1080     new ServerCallable<Boolean>(connection, tableName, rl.getRow(), operationTimeout) {
1081         public Boolean call() throws IOException {
1082           server.unlockRow(location.getRegionInfo().getRegionName(),
1083               rl.getLockId());
1084           return null; // FindBugs NP_BOOLEAN_RETURN_NULL
1085         }
1086       }.withRetries();
1087   }
1088 
1089   /**
1090    * {@inheritDoc}
1091    */
1092   @Override
1093   public boolean isAutoFlush() {
1094     return autoFlush;
1095   }
1096 
1097   /**
1098    * See {@link #setAutoFlush(boolean, boolean)}
1099    *
1100    * @param autoFlush
1101    *          Whether or not to enable 'auto-flush'.
1102    */
1103   public void setAutoFlush(boolean autoFlush) {
1104     setAutoFlush(autoFlush, autoFlush);
1105   }
1106 
1107   /**
1108    * Turns 'auto-flush' on or off.
1109    * <p>
1110    * When enabled (default), {@link Put} operations don't get buffered/delayed
1111    * and are immediately executed. Failed operations are not retried. This is
1112    * slower but safer.
1113    * <p>
1114    * Turning off {@link #autoFlush} means that multiple {@link Put}s will be
1115    * accepted before any RPC is actually sent to do the write operations. If the
1116    * application dies before pending writes get flushed to HBase, data will be
1117    * lost.
1118    * <p>
1119    * When you turn {@link #autoFlush} off, you should also consider the
1120    * {@link #clearBufferOnFail} option. By default, asynchronous {@link Put}
1121    * requests will be retried on failure until successful. However, this can
1122    * pollute the writeBuffer and slow down batching performance. Additionally,
1123    * you may want to issue a number of Put requests and call
1124    * {@link #flushCommits()} as a barrier. In both use cases, consider setting
1125    * clearBufferOnFail to true to erase the buffer after {@link #flushCommits()}
1126    * has been called, regardless of success.
1127    *
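   * <p>A sketch of the buffered write pattern described above; {@code puts} is
   * assumed to be a prepared list of Puts:
   * <pre>
   * {@code
   * table.setAutoFlush(false, true); // buffer writes; clear the buffer even if a flush fails
   * for (Put put : puts) {
   *   table.put(put);                // buffered until the write buffer size is exceeded
   * }
   * table.flushCommits();            // explicit barrier: send anything still buffered
   * }
   * </pre>
   *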
1128    * @param autoFlush
1129    *          Whether or not to enable 'auto-flush'.
1130    * @param clearBufferOnFail
1131    *          Whether to clear the write buffer even when a flush fails (if false, failed Puts are kept in the buffer)
1132    * @see #flushCommits
1133    */
1134   public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
1135     this.autoFlush = autoFlush;
1136     this.clearBufferOnFail = autoFlush || clearBufferOnFail;
1137   }
1138 
1139   /**
1140    * Returns the maximum size in bytes of the write buffer for this HTable.
1141    * <p>
1142    * The default value comes from the configuration parameter
1143    * {@code hbase.client.write.buffer}.
1144    * @return The size of the write buffer in bytes.
1145    */
1146   public long getWriteBufferSize() {
1147     return writeBufferSize;
1148   }
1149 
1150   /**
1151    * Sets the maximum size of the write buffer in bytes.
1152    * <p>
1153    * If the new size is less than the current amount of data in the
1154    * write buffer, the buffer gets flushed.
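   * <p>For example (the 8 MB figure is illustrative; the default is 2 MB,
   * from {@code hbase.client.write.buffer}):
   * <pre>
   * {@code
   * table.setAutoFlush(false);
   * table.setWriteBufferSize(8 * 1024 * 1024); // buffer up to 8 MB of Puts before an automatic flush
   * }
   * </pre>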
1155    * @param writeBufferSize The new write buffer size, in bytes.
1156    * @throws IOException if a remote or network exception occurs.
1157    */
1158   public void setWriteBufferSize(long writeBufferSize) throws IOException {
1159     this.writeBufferSize = writeBufferSize;
1160     if(currentWriteBufferSize > writeBufferSize) {
1161       flushCommits();
1162     }
1163   }
1164 
1165   /**
1166    * Returns the write buffer.
1167    * @return The current write buffer.
1168    */
1169   public ArrayList<Put> getWriteBuffer() {
1170     return writeBuffer;
1171   }
1172 
1173   /**
1174    * The pool is used for multi requests for this HTable
1175    * @return the pool used for multi requests
1176    */
1177   ExecutorService getPool() {
1178     return this.pool;
1179   }
1180 
1181   /**
1182    * Enable or disable region cache prefetch for the table. It applies to
1183    * all HTable instances of the given table that share the same
1184    * connection. By default, the cache prefetch is enabled.
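   * <p>For example, to turn prefetching off for a (hypothetical) table named
   * "myTable":
   * <pre>
   * {@code
   * HTable.setRegionCachePrefetch(Bytes.toBytes("myTable"), false);
   * }
   * </pre>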
1185    * @param tableName name of table to configure.
1186    * @param enable Set to true to enable region cache prefetch. Or set to
1187    * false to disable it.
1188    * @throws IOException
1189    */
1190   public static void setRegionCachePrefetch(final byte[] tableName,
1191       final boolean enable) throws IOException {
1192     HConnectionManager.execute(new HConnectable<Void>(HBaseConfiguration
1193         .create()) {
1194       @Override
1195       public Void connect(HConnection connection) throws IOException {
1196         connection.setRegionCachePrefetch(tableName, enable);
1197         return null;
1198       }
1199     });
1200   }
1201 
1202   /**
1203    * Enable or disable region cache prefetch for the table. It applies to
1204    * all HTable instances of the given table that share the same
1205    * connection. By default, the cache prefetch is enabled.
1206    * @param conf The Configuration object to use.
1207    * @param tableName name of table to configure.
1208    * @param enable Set to true to enable region cache prefetch. Or set to
1209    * false to disable it.
1210    * @throws IOException
1211    */
1212   public static void setRegionCachePrefetch(final Configuration conf,
1213       final byte[] tableName, final boolean enable) throws IOException {
1214     HConnectionManager.execute(new HConnectable<Void>(conf) {
1215       @Override
1216       public Void connect(HConnection connection) throws IOException {
1217         connection.setRegionCachePrefetch(tableName, enable);
1218         return null;
1219       }
1220     });
1221   }
1222 
1223   /**
1224    * Check whether region cache prefetch is enabled or not for the table.
1225    * @param conf The Configuration object to use.
1226    * @param tableName name of table to check
1227    * @return true if the table's region cache prefetch is enabled,
1228    * false otherwise.
1229    * @throws IOException
1230    */
1231   public static boolean getRegionCachePrefetch(final Configuration conf,
1232       final byte[] tableName) throws IOException {
1233     return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
1234       @Override
1235       public Boolean connect(HConnection connection) throws IOException {
1236         return connection.getRegionCachePrefetch(tableName);
1237       }
1238     });
1239   }
1240 
1241   /**
1242    * Check whether region cache prefetch is enabled or not for the table.
1243    * @param tableName name of table to check
1244    * @return true if the table's region cache prefetch is enabled,
1245    * false otherwise.
1246    * @throws IOException
1247    */
1248   public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException {
1249     return HConnectionManager.execute(new HConnectable<Boolean>(
1250         HBaseConfiguration.create()) {
1251       @Override
1252       public Boolean connect(HConnection connection) throws IOException {
1253         return connection.getRegionCachePrefetch(tableName);
1254       }
1255     });
1256  }
1257 
1258   /**
1259    * Explicitly clears the region cache to fetch the latest value from META.
1260    * This is a power user function: avoid unless you know the ramifications.
1261    */
1262   public void clearRegionCache() {
1263     this.connection.clearRegionCache();
1264   }
1265 
1266   /**
1267    * {@inheritDoc}
1268    */
1269   @Override
1270   public <T extends CoprocessorProtocol> T coprocessorProxy(
1271       Class<T> protocol, byte[] row) {
1272     return (T)Proxy.newProxyInstance(this.getClass().getClassLoader(),
1273         new Class[]{protocol},
1274         new ExecRPCInvoker(configuration,
1275             connection,
1276             protocol,
1277             tableName,
1278             row));
1279   }
1280 
1281   /**
1282    * {@inheritDoc}
1283    */
1284   @Override
1285   public <T extends CoprocessorProtocol, R> Map<byte[],R> coprocessorExec(
1286       Class<T> protocol, byte[] startKey, byte[] endKey,
1287       Batch.Call<T,R> callable)
1288       throws IOException, Throwable {
1289 
1290     final Map<byte[],R> results =  Collections.synchronizedMap(new TreeMap<byte[],R>(
1291         Bytes.BYTES_COMPARATOR));
1292     coprocessorExec(protocol, startKey, endKey, callable,
1293         new Batch.Callback<R>(){
1294       public void update(byte[] region, byte[] row, R value) {
1295         results.put(region, value);
1296       }
1297     });
1298     return results;
1299   }
1300 
1301   /**
1302    * {@inheritDoc}
1303    */
1304   @Override
1305   public <T extends CoprocessorProtocol, R> void coprocessorExec(
1306       Class<T> protocol, byte[] startKey, byte[] endKey,
1307       Batch.Call<T,R> callable, Batch.Callback<R> callback)
1308       throws IOException, Throwable {
1309 
1310     // get regions covered by the row range
1311     List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1312     connection.processExecs(protocol, keys, tableName, pool, callable,
1313         callback);
1314   }
1315 
1316   private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1317   throws IOException {
1318     if (start == null) {
1319       start = HConstants.EMPTY_START_ROW;
1320     }
1321     if (end == null) {
1322       end = HConstants.EMPTY_END_ROW;
1323     }
1324     return getKeysAndRegionsInRange(start, end, true).getFirst();
1325   }
1326 
1327   public void setOperationTimeout(int operationTimeout) {
1328     this.operationTimeout = operationTimeout;
1329   }
1330 
1331   public int getOperationTimeout() {
1332     return operationTimeout;
1333   }
1334 
1335 }