
/*
 * Copyright 2011 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.text.ParseException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Random;
import java.util.RandomAccess;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseFileSystem;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowLock;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.metrics.OperationMetrics;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.snapshot.TakeSnapshotUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HashedBytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.StringUtils;
import org.cliffc.high_scale_lib.Counter;

import com.google.common.base.Preconditions;
import com.google.common.collect.ClassToInstanceMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.MutableClassToInstanceMap;

/**
 * HRegion stores data for a certain region of a table.  It stores all columns
 * for each row. A given table consists of one or more HRegions.
 *
 * <p>We maintain multiple HStores for a single HRegion.
 *
 * <p>A Store is a set of rows with some column data; together, the Stores
 * make up all the data for the rows.
 *
 * <p>Each HRegion has a 'startKey' and 'endKey'.
 * <p>The first is inclusive, the second is exclusive (except for
 * the final region).  The endKey of region 0 is the same as the
 * startKey for region 1 (if it exists).  The startKey for the
 * first region is null. The endKey for the final region is null.
 *
 * <p>Locking at the HRegion level serves only one purpose: preventing the
 * region from being closed (and consequently split) while other operations
 * are ongoing. Each row-level operation obtains both a row lock and a region
 * read lock for the duration of the operation. While a scanner is being
 * constructed, getScanner holds a read lock. If the scanner is successfully
 * constructed, it holds a read lock until it is closed. A close takes out a
 * write lock and consequently will wait for ongoing operations to complete
 * and will block new operations from starting while the close is in progress.
 *
 * <p>An HRegion is defined by its table and its key extent.
 *
 * <p>It consists of at least one Store.  The number of Stores should be
 * configurable, so that data which is accessed together is stored in the same
 * Store.  Right now, we approximate that by building a single Store for
 * each column family.  (This config info will be communicated via the
 * tabledesc.)
 *
 * <p>The HTableDescriptor contains metainfo about the HRegion's table.
 * regionName is a unique identifier for this HRegion. [startKey, endKey)
 * defines the keyspace for this HRegion.
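 *
 * <p>A minimal lifecycle sketch (illustrative; assumes a valid {@code conf},
 * {@code tableDir}, {@code log}, {@code fs}, {@code info} and {@code htd} are
 * in scope):
 * <pre>
 *   HRegion region = HRegion.newHRegion(tableDir, log, fs, conf, info, htd, null);
 *   region.initialize();   // replays any recovered edits; returns the next sequence id
 *   try {
 *     // serve reads and writes ...
 *   } finally {
 *     region.close();      // flushes memstores and shuts down each Store
 *   }
 * </pre>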
 */
public class HRegion implements HeapSize { // , Writable{
  public static final Log LOG = LogFactory.getLog(HRegion.class);
  private static final String MERGEDIR = ".merges";

  public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY = "hbase.hregion.scan.loadColumnFamiliesOnDemand";

  final AtomicBoolean closed = new AtomicBoolean(false);
  /* Closing can take some time; use the closing flag if there is stuff we don't
   * want to do while in closing state; e.g. like offer this region up to the
   * master as a region to close if the carrying regionserver is overloaded.
   * Once set, it is never cleared.
   */
  final AtomicBoolean closing = new AtomicBoolean(false);

  //////////////////////////////////////////////////////////////////////////////
  // Members
  //////////////////////////////////////////////////////////////////////////////

  private final ConcurrentHashMap<HashedBytes, CountDownLatch> lockedRows =
    new ConcurrentHashMap<HashedBytes, CountDownLatch>();
  private final ConcurrentHashMap<Integer, HashedBytes> lockIds =
    new ConcurrentHashMap<Integer, HashedBytes>();
  private final AtomicInteger lockIdGenerator = new AtomicInteger(1);
  static private Random rand = new Random();

  protected final Map<byte [], Store> stores =
    new ConcurrentSkipListMap<byte [], Store>(Bytes.BYTES_RAWCOMPARATOR);

  // Registered region protocol handlers
  private ClassToInstanceMap<CoprocessorProtocol>
      protocolHandlers = MutableClassToInstanceMap.create();

  private Map<String, Class<? extends CoprocessorProtocol>>
      protocolHandlerNames = Maps.newHashMap();

  /**
   * Temporary subdirectory of the region directory used for compaction output.
   */
  public static final String REGION_TEMP_SUBDIR = ".tmp";

  // These variables are just used for getting data out of the region, to test
  // on the client side
  // private int numStores = 0;
  // private int [] storeSize = null;
  // private byte [] name = null;

  final AtomicLong memstoreSize = new AtomicLong(0);

  // Debug possible data loss due to WAL off
  final AtomicLong numPutsWithoutWAL = new AtomicLong(0);
  final AtomicLong dataInMemoryWithoutWAL = new AtomicLong(0);

  final Counter readRequestsCount = new Counter();
  final Counter writeRequestsCount = new Counter();
  final Counter updatesBlockedMs = new Counter();

  /**
   * The directory for the table this region is part of.
   * This directory contains the directory for this region.
   */
  private final Path tableDir;

  private final HLog log;
  private final FileSystem fs;
  private final Configuration conf;
  final Configuration baseConf;
  private final int rowLockWaitDuration;
  static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;

  // The internal wait duration to acquire a lock before read/update
  // from the region. It is not per row. The purpose of this wait time
  // is to avoid waiting a long time while the region is busy, so that
  // we can release the IPC handler soon enough to improve the
  // availability of the region server. It can be adjusted by
  // tuning the configuration "hbase.busy.wait.duration".
  final long busyWaitDuration;
  static final long DEFAULT_BUSY_WAIT_DURATION = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;

  // If updating multiple rows in one call, wait longer,
  // i.e. waiting for busyWaitDuration * # of rows. However,
  // we can limit the max multiplier.
  final int maxBusyWaitMultiplier;

  // Max busy wait duration. There is no point in waiting longer than the RPC
  // purge timeout, when an RPC call will be terminated by the RPC engine.
  final long maxBusyWaitDuration;

  private final HRegionInfo regionInfo;
  private final Path regiondir;
  KeyValue.KVComparator comparator;

  private ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
  /**
   * The default setting for whether to enable on-demand CF loading for
   * scan requests to this region. Requests can override it.
   */
  private boolean isLoadingCfsOnDemandDefault = false;

  /**
   * @return The smallest mvcc readPoint across all the scanners in this
   * region. Writes older than this readPoint are included in every
   * read operation.
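   * <p>Illustration (hypothetical values): if the memstore readPoint is 17 and
   * two open scanners hold readPoints 12 and 15, this method returns 12.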
   */
  public long getSmallestReadPoint() {
    long minimumReadPoint;
    // We need to ensure that while we are calculating the smallestReadPoint
    // no new RegionScanners can grab a readPoint that we are unaware of.
    // We achieve this by synchronizing on the scannerReadPoints object.
    synchronized(scannerReadPoints) {
      minimumReadPoint = mvcc.memstoreReadPoint();

      for (Long readPoint: this.scannerReadPoints.values()) {
        if (readPoint < minimumReadPoint) {
          minimumReadPoint = readPoint;
        }
      }
    }
    return minimumReadPoint;
  }

  /*
   * Data structure of write state flags used for coordinating flushes,
   * compactions and closes.
   */
  static class WriteState {
    // Set while a memstore flush is happening.
    volatile boolean flushing = false;
    // Set when a flush has been requested.
    volatile boolean flushRequested = false;
    // Number of compactions running.
    volatile int compacting = 0;
    // Set to false in close; once false, we cannot compact or flush again.
    volatile boolean writesEnabled = true;
    // Set if region is read-only
    volatile boolean readOnly = false;

    /**
     * Set flags that make this region read-only.
     *
     * @param onOff flip value for region r/o setting
     */
    synchronized void setReadOnly(final boolean onOff) {
      this.writesEnabled = !onOff;
      this.readOnly = onOff;
    }

    boolean isReadOnly() {
      return this.readOnly;
    }

    boolean isFlushRequested() {
      return this.flushRequested;
    }

    static final long HEAP_SIZE = ClassSize.align(
        ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
  }

  /**
   * Objects of this class are created when flushing, to describe all the different states
   * that method can end up in. The Result enum describes those states. The sequence id
   * should only be specified if the flush was successful, and the failure message should
   * only be specified if it wasn't.
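   *
   * <p>A hedged usage sketch (illustrative; {@code res} stands for a result
   * handed back by a flush call):
   * <pre>
   *   FlushResult res = ...; // obtained from a flush of this region
   *   if (res.isFlushSucceeded() &amp;&amp; res.isCompactionNeeded()) {
   *     // the flush wrote store files and suggested a follow-up compaction
   *   }
   * </pre>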
   */
  public static class FlushResult {
    enum Result {
      FLUSHED_NO_COMPACTION_NEEDED,
      FLUSHED_COMPACTION_NEEDED,
      // Special case where a flush didn't run because there's nothing in the memstores. Used when
      // bulk loading to know when we can still load even if a flush didn't happen.
      CANNOT_FLUSH_MEMSTORE_EMPTY,
      CANNOT_FLUSH
      // Be careful adding more to this enum; look at the methods below to make
      // sure they stay consistent.
    }

    final Result result;
    final String failureReason;
    final long flushSequenceId;

    /**
     * Convenience constructor to use when the flush is successful; the failure message is set to
     * null.
     * @param result Expecting FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
     * @param flushSequenceId Generated sequence id that comes right after the edits in the
     *                        memstores.
     */
    FlushResult(Result result, long flushSequenceId) {
      this(result, flushSequenceId, null);
      assert result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
          .FLUSHED_COMPACTION_NEEDED;
    }

    /**
     * Convenience constructor to use when we cannot flush.
     * @param result Expecting CANNOT_FLUSH_MEMSTORE_EMPTY or CANNOT_FLUSH.
     * @param failureReason Reason why we couldn't flush.
     */
    FlushResult(Result result, String failureReason) {
      this(result, -1, failureReason);
      assert result == Result.CANNOT_FLUSH_MEMSTORE_EMPTY || result == Result.CANNOT_FLUSH;
    }

    /**
     * Constructor with all the parameters.
     * @param result Any of the Result values.
     * @param flushSequenceId Generated sequence id if the memstores were flushed, else -1.
     * @param failureReason Reason why we couldn't flush, or null.
     */
    FlushResult(Result result, long flushSequenceId, String failureReason) {
      this.result = result;
      this.flushSequenceId = flushSequenceId;
      this.failureReason = failureReason;
    }

    /**
     * Convenience method, the equivalent of checking if result is
     * FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
     * @return true if the memstores were flushed, else false.
     */
    public boolean isFlushSucceeded() {
      return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
          .FLUSHED_COMPACTION_NEEDED;
    }

    /**
     * Convenience method, the equivalent of checking if result is FLUSHED_COMPACTION_NEEDED.
     * @return True if the flush requested a compaction, else false (which doesn't even mean it
     * flushed).
     */
    public boolean isCompactionNeeded() {
      return result == Result.FLUSHED_COMPACTION_NEEDED;
    }
  }

  final WriteState writestate = new WriteState();

  long memstoreFlushSize;
  final long timestampSlop;
  private volatile long lastFlushTime;
  final RegionServerServices rsServices;
  private RegionServerAccounting rsAccounting;
  private List<Pair<Long, Long>> recentFlushes = new ArrayList<Pair<Long,Long>>();
  private long flushCheckInterval;
  private long blockingMemStoreSize;
  final long threadWakeFrequency;
  // Used to guard closes
  final ReentrantReadWriteLock lock =
    new ReentrantReadWriteLock();

  // Stop updates lock
  private final ReentrantReadWriteLock updatesLock =
    new ReentrantReadWriteLock();
  private boolean splitRequest;
  private byte[] explicitSplitPoint = null;

  private final MultiVersionConsistencyControl mvcc =
      new MultiVersionConsistencyControl();

  // Coprocessor host
  private RegionCoprocessorHost coprocessorHost;

  /**
   * Name of the region info file that resides just under the region directory.
   */
  public final static String REGIONINFO_FILE = ".regioninfo";
  private HTableDescriptor htableDescriptor = null;
  private RegionSplitPolicy splitPolicy;
  private final OperationMetrics opMetrics;
  private final boolean deferredLogSyncDisabled;

  /**
   * Should only be used for testing purposes.
   */
  public HRegion() {
    this.tableDir = null;
    this.blockingMemStoreSize = 0L;
    this.conf = null;
    this.rowLockWaitDuration = DEFAULT_ROWLOCK_WAIT_DURATION;
    this.rsServices = null;
    this.baseConf = null;
    this.fs = null;
    this.timestampSlop = HConstants.LATEST_TIMESTAMP;
    this.memstoreFlushSize = 0L;
    this.log = null;
    this.regiondir = null;
    this.regionInfo = null;
    this.htableDescriptor = null;
    this.threadWakeFrequency = 0L;
    this.coprocessorHost = null;
    this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
    this.opMetrics = new OperationMetrics();

    this.maxBusyWaitDuration = 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
    this.busyWaitDuration = DEFAULT_BUSY_WAIT_DURATION;
    this.maxBusyWaitMultiplier = 2;
    this.deferredLogSyncDisabled = false;
  }

  /**
   * HRegion copy constructor. Useful when reopening a closed region (normally
   * for unit tests).
   * @param other original object
   */
  public HRegion(HRegion other) {
    this(other.getTableDir(), other.getLog(), other.getFilesystem(),
        other.baseConf, other.getRegionInfo(), other.getTableDesc(), null);
  }

  /**
   * HRegion constructor.  This constructor should only be used for testing and
   * extensions.  Instances of HRegion should be instantiated with the
   * {@link HRegion#newHRegion(Path, HLog, FileSystem, Configuration, HRegionInfo, HTableDescriptor, RegionServerServices)} method.
   *
   * @param tableDir qualified path of directory where region should be located,
   * usually the table directory.
   * @param log The HLog is the outbound log for any updates to the HRegion
   * (There's a single HLog for all the HRegions on a single HRegionServer.)
   * The log file is a logfile from the previous execution that's
   * custom-computed for this HRegion. The HRegionServer computes and sorts the
   * appropriate log info for this HRegion. If there is a previous log file
   * (implying that the HRegion has been written to before), then read it from
   * the supplied path.
   * @param fs is the filesystem.
   * @param confParam is global configuration settings.
   * @param regionInfo HRegionInfo that describes the region.
   * @param htd the table descriptor
   * @param rsServices reference to {@link RegionServerServices} or null
   *
   * @see HRegion#newHRegion(Path, HLog, FileSystem, Configuration, HRegionInfo, HTableDescriptor, RegionServerServices)
   */
  public HRegion(Path tableDir, HLog log, FileSystem fs, Configuration confParam,
    final HRegionInfo regionInfo, final HTableDescriptor htd,
      RegionServerServices rsServices) {
    this.tableDir = tableDir;
    this.comparator = regionInfo.getComparator();
    this.log = log;
    this.fs = fs;
    if (confParam instanceof CompoundConfiguration) {
      throw new IllegalArgumentException("Need original base configuration");
    }
    // 'conf' renamed to 'confParam' because we use this.conf in the constructor
    this.baseConf = confParam;
    if (htd != null) {
      this.conf = new CompoundConfiguration().add(confParam).add(htd.getValues());
    } else {
      this.conf = new CompoundConfiguration().add(confParam);
    }
    this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
        DEFAULT_CACHE_FLUSH_INTERVAL);
    this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
        DEFAULT_ROWLOCK_WAIT_DURATION);

    this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, false);
    this.regionInfo = regionInfo;
    this.htableDescriptor = htd;
    this.rsServices = rsServices;
    this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY,
        10 * 1000);
    String encodedNameStr = this.regionInfo.getEncodedName();
    setHTableSpecificConf();
    this.regiondir = getRegionDir(this.tableDir, encodedNameStr);
    this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
    this.opMetrics = new OperationMetrics(conf, this.regionInfo);

    this.busyWaitDuration = conf.getLong(
      "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
    this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
    if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
      throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
        + busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
        + maxBusyWaitMultiplier + "). Their product should be positive");
    }
    this.maxBusyWaitDuration = conf.getLong("ipc.client.call.purge.timeout",
      2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);

    /*
     * timestamp.slop provides a server-side constraint on the timestamp. This
     * assumes that you base your TS around currentTimeMillis(). In this case,
     * throw an error to the user if the user-specified TS is newer than now +
     * slop. LATEST_TIMESTAMP == don't use this functionality
     */
    this.timestampSlop = conf.getLong(
        "hbase.hregion.keyvalue.timestamp.slop.millisecs",
        HConstants.LATEST_TIMESTAMP);
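    // Illustrative example (hypothetical values): with a slop of 60000 ms, a Put
    // whose timestamp is more than one minute ahead of the server clock is
    // rejected; the default of LATEST_TIMESTAMP disables the check entirely.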
    // When hbase.regionserver.optionallogflushinterval <= 0, deferred log sync is disabled.
    this.deferredLogSyncDisabled = conf.getLong("hbase.regionserver.optionallogflushinterval",
        1 * 1000) <= 0;

    if (rsServices != null) {
      this.rsAccounting = this.rsServices.getRegionServerAccounting();
      // don't initialize coprocessors if not running within a regionserver
      // TODO: revisit if coprocessors should load in other cases
      this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
    }
    if (LOG.isDebugEnabled()) {
      // Write out region name as string and its encoded name.
      LOG.debug("Instantiated " + this);
    }
  }

  void setHTableSpecificConf() {
    if (this.htableDescriptor == null) return;
    LOG.info("Setting up tabledescriptor config now ...");
    long flushSize = this.htableDescriptor.getMemStoreFlushSize();

    if (flushSize <= 0) {
      flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
        HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
    }
    this.memstoreFlushSize = flushSize;
    this.blockingMemStoreSize = this.memstoreFlushSize *
        conf.getLong("hbase.hregion.memstore.block.multiplier", 2);
  }

  /**
   * Initialize this region.
   * @return What the next sequence (edit) id should be.
   * @throws IOException e
   */
  public long initialize() throws IOException {
    return initialize(null);
  }

  /**
   * Initialize this region.
   *
   * @param reporter Tickle every so often if initialize is taking a while.
   * @return What the next sequence (edit) id should be.
   * @throws IOException e
   */
  public long initialize(final CancelableProgressable reporter)
      throws IOException {

    MonitoredTask status = TaskMonitor.get().createStatus(
        "Initializing region " + this);

    long nextSeqId = -1;
    try {
      nextSeqId = initializeRegionInternals(reporter, status);
      return nextSeqId;
    } finally {
      // nextSeqId will be -1 if the initialization fails;
      // otherwise it will be at least 0.
      if (nextSeqId == -1) {
        status.abort("Exception during region " + this.getRegionNameAsString()
            + " initialization.");
      }
    }
  }

  private long initializeRegionInternals(final CancelableProgressable reporter,
      MonitoredTask status) throws IOException, UnsupportedEncodingException {
    if (coprocessorHost != null) {
      status.setStatus("Running coprocessor pre-open hook");
      coprocessorHost.preOpen();
    }

    // Write HRI to a file in case we need to recover .META.
    status.setStatus("Writing region info on filesystem");
    checkRegioninfoOnFilesystem();

    // Remove temporary data left over from old regions
    status.setStatus("Cleaning up temporary data from old regions");
    cleanupTmpDir();

    // Load in all the HStores.
    //
    // Context: During replay we want to ensure that we do not lose any data. So, we
    // have to be conservative in how we replay logs. For each store, we calculate
    // the maxSeqId up to which the store was flushed, and skip the edits which
    // are equal to or lower than maxSeqId for each store.
    Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(
        Bytes.BYTES_COMPARATOR);
    long maxSeqId = -1;
    // initialized to -1 so that we pick up MemstoreTS from column families
    long maxMemstoreTS = -1;

    if (this.htableDescriptor != null &&
        !htableDescriptor.getFamilies().isEmpty()) {
      // initialize the thread pool for opening stores in parallel.
      ThreadPoolExecutor storeOpenerThreadPool =
        getStoreOpenAndCloseThreadPool(
          "StoreOpenerThread-" + this.regionInfo.getRegionNameAsString());
      CompletionService<Store> completionService =
        new ExecutorCompletionService<Store>(storeOpenerThreadPool);

      // initialize each store in parallel
      for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
        status.setStatus("Instantiating store for column family " + family);
        completionService.submit(new Callable<Store>() {
          public Store call() throws IOException {
            return instantiateHStore(tableDir, family);
          }
        });
      }
      boolean allStoresOpened = false;
      try {
        for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
          Future<Store> future = completionService.take();
          Store store = future.get();
          this.stores.put(store.getColumnFamilyName().getBytes(), store);

          long storeSeqIdForReplay = store.getMaxSequenceId();
          maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), storeSeqIdForReplay);
          if (maxSeqId == -1 || storeSeqIdForReplay > maxSeqId) {
            maxSeqId = storeSeqIdForReplay;
          }
          long maxStoreMemstoreTS = store.getMaxMemstoreTS();
          if (maxStoreMemstoreTS > maxMemstoreTS) {
            maxMemstoreTS = maxStoreMemstoreTS;
          }
        }
        allStoresOpened = true;
      } catch (InterruptedException e) {
        throw new IOException(e);
      } catch (ExecutionException e) {
        throw new IOException(e.getCause());
      } finally {
        storeOpenerThreadPool.shutdownNow();
        if (!allStoresOpened) {
          // something went wrong, close all opened stores
          LOG.error("Could not initialize all stores for the region=" + this);
          for (Store store : this.stores.values()) {
            try {
              store.close();
            } catch (IOException e) {
              LOG.warn(e.getMessage());
            }
          }
        }
      }
    }
    mvcc.initialize(maxMemstoreTS + 1);
    // Recover any edits if available.
    maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
        this.regiondir, maxSeqIdInStores, reporter, status));

    status.setStatus("Cleaning up detritus from prior splits");
    // Get rid of any splits or merges that were lost in-progress.  Clean out
    // these directories here on open.  We may be opening a region that was
    // being split but we crashed in the middle of it all.
    SplitTransaction.cleanupAnySplitDetritus(this);
    FSUtils.deleteDirectory(this.fs, new Path(regiondir, MERGEDIR));

    this.writestate.setReadOnly(this.htableDescriptor.isReadOnly());

    this.writestate.flushRequested = false;
    this.writestate.compacting = 0;

    // Initialize split policy
    this.splitPolicy = RegionSplitPolicy.create(this, conf);

    this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
    // Use the maximum of the log sequence id or that which was found in the stores
    // (particularly if there are no recovered edits, seqid will be -1).
    long nextSeqid = maxSeqId + 1;
    LOG.info("Onlined " + this.toString() + "; next sequenceid=" + nextSeqid);

    // A region can be reopened if it failed a split; reset flags
    this.closing.set(false);
    this.closed.set(false);

    if (coprocessorHost != null) {
      status.setStatus("Running coprocessor post-open hooks");
      coprocessorHost.postOpen();
    }

    status.markComplete("Region opened successfully");
    return nextSeqid;
  }

  /*
   * Move any passed HStore files into place (if any).  Used to pick up split
   * files and any merges from splits and merges dirs.
   * @param initialFiles
   * @throws IOException
   */
  static void moveInitialFilesIntoPlace(final FileSystem fs,
    final Path initialFiles, final Path regiondir)
  throws IOException {
    if (initialFiles != null && fs.exists(initialFiles)) {
      if (!HBaseFileSystem.renameDirForFileSystem(fs, initialFiles, regiondir)) {
        LOG.warn("Unable to rename " + initialFiles + " to " + regiondir);
      }
    }
  }

  /**
   * @return True if this region has references.
   */
  public boolean hasReferences() {
    for (Store store : this.stores.values()) {
      for (StoreFile sf : store.getStorefiles()) {
        // Found a reference, return.
        if (sf.isReference()) return true;
      }
    }
    return false;
  }

  /**
   * This function returns the HDFS blocks distribution based on the data
   * captured when an HFile is created.
   * @return The HDFS blocks distribution for the region.
   */
  public HDFSBlocksDistribution getHDFSBlocksDistribution() {
    HDFSBlocksDistribution hdfsBlocksDistribution =
      new HDFSBlocksDistribution();
    synchronized (this.stores) {
      for (Store store : this.stores.values()) {
        for (StoreFile sf : store.getStorefiles()) {
          HDFSBlocksDistribution storeFileBlocksDistribution =
            sf.getHDFSBlockDistribution();
          hdfsBlocksDistribution.add(storeFileBlocksDistribution);
        }
      }
    }
    return hdfsBlocksDistribution;
  }

  /**
   * This is a helper function to compute the HDFS block distribution on demand.
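   *
   * <p>A minimal usage sketch (illustrative; assumes {@code conf} and the table's
   * {@code HTableDescriptor} are available):
   * <pre>
   *   HDFSBlocksDistribution dist = HRegion.computeHDFSBlocksDistribution(
   *       conf, tableDescriptor, regionInfo.getEncodedName());
   * </pre>
   *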
   * @param conf configuration
   * @param tableDescriptor HTableDescriptor of the table
   * @param regionEncodedName encoded name of the region
   * @return The HDFS blocks distribution for the given region.
   * @throws IOException
   */
  static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
    Configuration conf, HTableDescriptor tableDescriptor,
    String regionEncodedName) throws IOException {
    Path tablePath = FSUtils.getTablePath(FSUtils.getRootDir(conf),
      tableDescriptor.getName());
    return computeHDFSBlocksDistribution(conf, tableDescriptor, regionEncodedName, tablePath);
  }

  /**
   * This is a helper function to compute the HDFS block distribution on demand.
   * @param conf configuration
   * @param tableDescriptor HTableDescriptor of the table
   * @param regionEncodedName encoded name of the region
   * @param tablePath the table's directory
   * @return The HDFS blocks distribution for the given region.
   * @throws IOException
   */
  static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
    Configuration conf, HTableDescriptor tableDescriptor,
    String regionEncodedName, Path tablePath) throws IOException {
    HDFSBlocksDistribution hdfsBlocksDistribution =
      new HDFSBlocksDistribution();
    FileSystem fs = tablePath.getFileSystem(conf);

    for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
      Path storeHomeDir = Store.getStoreHomedir(tablePath, regionEncodedName,
        family.getName());
      if (!fs.exists(storeHomeDir)) continue;

      FileStatus[] hfilesStatus = fs.listStatus(storeHomeDir);

      for (FileStatus hfileStatus : hfilesStatus) {
        Path p = hfileStatus.getPath();
        if (HFileLink.isHFileLink(p)) {
          hfileStatus = new HFileLink(conf, p).getFileStatus(fs);
        } else if (StoreFile.isReference(p)) {
          p = StoreFile.getReferredToFile(p);
          if (HFileLink.isHFileLink(p)) {
            hfileStatus = new HFileLink(conf, p).getFileStatus(fs);
          } else {
            hfileStatus = fs.getFileStatus(p);
          }
        }
        HDFSBlocksDistribution storeFileBlocksDistribution =
          FSUtils.computeHDFSBlocksDistribution(fs, hfileStatus, 0,
            hfileStatus.getLen());
        hdfsBlocksDistribution.add(storeFileBlocksDistribution);
      }
    }
    return hdfsBlocksDistribution;
  }

  public AtomicLong getMemstoreSize() {
    return memstoreSize;
  }

  /**
   * Increase the size of the memstore in this region and the size of the global
   * memstore.
   * @param memStoreSize the delta to add
   * @return the size of the memstore in this region
   */
  public long addAndGetGlobalMemstoreSize(long memStoreSize) {
    if (this.rsAccounting != null) {
      rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
    }
    return this.memstoreSize.addAndGet(memStoreSize);
  }

  /*
   * Write out an info file under the region directory.  Useful for recovering
   * mangled regions.
   * @throws IOException
   */
  private void checkRegioninfoOnFilesystem() throws IOException {
    checkRegioninfoOnFilesystem(this.regiondir);
  }

  /**
   * Write out an info file under the region directory. Useful for recovering mangled regions.
   * @param regiondir directory under which to write out the region info
   * @throws IOException
   */
  private void checkRegioninfoOnFilesystem(Path regiondir) throws IOException {
    writeRegioninfoOnFilesystem(regionInfo, regiondir, getFilesystem(), conf);
  }

  /**
   * Write out an info file under the region directory. Useful for recovering mangled regions.
   * If the regioninfo already exists on disk and there is information in the file, then we
   * fast exit.
   * @param regionInfo information about the region
   * @param regiondir directory under which to write out the region info
   * @param fs {@link FileSystem} on which to write the region info
   * @param conf {@link Configuration} from which to extract specific file locations
   * @throws IOException on unexpected error.
   */
  public static void writeRegioninfoOnFilesystem(HRegionInfo regionInfo, Path regiondir,
      FileSystem fs, Configuration conf) throws IOException {
    Path regioninfoPath = new Path(regiondir, REGIONINFO_FILE);
    if (fs.exists(regioninfoPath)) {
      if (fs.getFileStatus(regioninfoPath).getLen() > 0) {
        return;
      }

      LOG.info("Rewriting .regioninfo file at: " + regioninfoPath);
      if (!fs.delete(regioninfoPath, false)) {
        throw new IOException("Unable to remove existing " + regioninfoPath);
      }
    }

    // Create in tmpdir and then move into place in case we crash after
    // create but before close.  If we don't successfully close the file,
    // subsequent region reopens will fail the below because the create is
    // registered in the NN.

    // first check to get the permissions
    FsPermission perms = FSUtils.getFilePermissions(fs, conf,
        HConstants.DATA_FILE_UMASK_KEY);

    // and then create the file
    Path tmpPath = new Path(getTmpDir(regiondir), REGIONINFO_FILE);

    // If the datanode crashes, or if the RS goes down just before the close is
    // called while trying to close the created regioninfo file in the .tmp
    // directory, then on the next creation we will get an
    // AlreadyCreatedException. Hence delete and recreate the file if it exists.
    if (FSUtils.isExists(fs, tmpPath)) {
      FSUtils.delete(fs, tmpPath, true);
    }

    FSDataOutputStream out = FSUtils.create(fs, tmpPath, perms);

    try {
      regionInfo.write(out);
      out.write('\n');
      out.write('\n');
      out.write(Bytes.toBytes(regionInfo.toString()));
    } finally {
      out.close();
    }
    if (!HBaseFileSystem.renameDirForFileSystem(fs, tmpPath, regioninfoPath)) {
      throw new IOException("Unable to rename " + tmpPath + " to " +
        regioninfoPath);
    }
  }

  /**
   * @param fs the filesystem
   * @param dir the region directory
   * @return An HRegionInfo instance read from the <code>.regioninfo</code> file under the region dir
   * @throws IOException
   */
  public static HRegionInfo loadDotRegionInfoFileContent(final FileSystem fs, final Path dir)
  throws IOException {
    Path regioninfo = new Path(dir, HRegion.REGIONINFO_FILE);
    if (!fs.exists(regioninfo)) throw new FileNotFoundException(regioninfo.toString());
    FSDataInputStream in = fs.open(regioninfo);
    try {
      HRegionInfo hri = new HRegionInfo();
      hri.readFields(in);
      return hri;
    } finally {
      in.close();
    }
  }

  /** @return an HRegionInfo object for this region */
  public HRegionInfo getRegionInfo() {
    return this.regionInfo;
  }

  /**
   * @return Instance of {@link RegionServerServices} used by this HRegion.
   * Can be null.
   */
  RegionServerServices getRegionServerServices() {
    return this.rsServices;
  }

  /** @return requestsCount for this region */
  public long getRequestsCount() {
    return this.readRequestsCount.get() + this.writeRequestsCount.get();
  }

  /** @return readRequestsCount for this region */
  public long getReadRequestsCount() {
    return this.readRequestsCount.get();
  }

  /** @return writeRequestsCount for this region */
  public long getWriteRequestsCount() {
    return this.writeRequestsCount.get();
  }

  /** @return true if region is closed */
  public boolean isClosed() {
    return this.closed.get();
  }

  /**
   * @return True if the closing process has started.
   */
  public boolean isClosing() {
    return this.closing.get();
  }

  /** @return true if region is available (not closed and not closing) */
  public boolean isAvailable() {
    return !isClosed() && !isClosing();
  }

  /** @return true if region is splittable */
  public boolean isSplittable() {
    return isAvailable() && !hasReferences();
  }

  boolean areWritesEnabled() {
    synchronized(this.writestate) {
      return this.writestate.writesEnabled;
    }
  }

  public MultiVersionConsistencyControl getMVCC() {
    return mvcc;
  }

  public boolean isLoadingCfsOnDemandDefault() {
    return this.isLoadingCfsOnDemandDefault;
  }

  /**
   * Close down this HRegion.  Flush the cache, shut down each HStore, and don't
   * service any more calls.
   *
   * <p>This method could take some time to execute, so don't call it from a
   * time-sensitive thread.
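   *
   * <p>A hedged usage sketch (assuming {@code region} is an open HRegion):
   * <pre>
   *   List&lt;StoreFile&gt; files = region.close(); // flush, then shut down each Store
   *   if (files == null) {
   *     // the region judged that it should not close at this time
   *   }
   * </pre>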
1054    *
1055    * @return Vector of all the storage files that the HRegion's component
1056    * HStores make use of.  It's a list of all HStoreFile objects. Returns empty
1057    * vector if already closed and null if judged that it should not close.
1058    *
1059    * @throws IOException e
1060    */
1061   public List<StoreFile> close() throws IOException {
1062     return close(false);
1063   }
1064 
1065   private final Object closeLock = new Object();
1066 
1067   /** Conf key for the periodic flush interval */
1068   public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL = 
1069       "hbase.regionserver.optionalcacheflushinterval";
1070   /** Default interval for the memstore flush */
1071   public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;
1072 
1073   /**
1074    * Close down this HRegion.  Flush the cache unless abort parameter is true,
1075    * Shut down each HStore, don't service any more calls.
1076    *
1077    * This method could take some time to execute, so don't call it from a
1078    * time-sensitive thread.
1079    *
1080    * @param abort true if server is aborting (only during testing)
1081    * @return Vector of all the storage files that the HRegion's component
1082    * HStores make use of.  It's a list of HStoreFile objects.  Can be null if
1083    * we are not to close at this time or we are already closed.
1084    *
1085    * @throws IOException e
1086    */
1087   public List<StoreFile> close(final boolean abort) throws IOException {
1088     // Only allow one thread to close at a time. Serialize them so dual
1089     // threads attempting to close will run up against each other.
1090     MonitoredTask status = TaskMonitor.get().createStatus(
1091         "Closing region " + this +
1092         (abort ? " due to abort" : ""));
1093 
1094     status.setStatus("Waiting for close lock");
1095     try {
1096       synchronized (closeLock) {
1097         return doClose(abort, status);
1098       }
1099     } finally {
1100       status.cleanup();
1101     }
1102   }
1103 
1104   private List<StoreFile> doClose(
1105       final boolean abort, MonitoredTask status)
1106   throws IOException {
1107     if (isClosed()) {
1108       LOG.warn("Region " + this + " already closed");
1109       return null;
1110     }
1111 
1112     if (coprocessorHost != null) {
1113       status.setStatus("Running coprocessor pre-close hooks");
1114       this.coprocessorHost.preClose(abort);
1115     }
1116 
1117     status.setStatus("Disabling compacts and flushes for region");
1118     synchronized (writestate) {
1119       // Disable compacting and flushing by background threads for this
1120       // region.
1121       writestate.writesEnabled = false;
1122       LOG.debug("Closing " + this + ": disabling compactions & flushes");
1123       waitForFlushesAndCompactions();
1124     }
1125     // If we were not just flushing, is it worth doing a preflush...one
1126     // that will clear out of the bulk of the memstore before we put up
1127     // the close flag?
1128     if (!abort && worthPreFlushing()) {
1129       status.setStatus("Pre-flushing region before close");
1130       LOG.info("Running close preflush of " + this.getRegionNameAsString());
1131       try {
1132         internalFlushcache(status);
1133       } catch (IOException ioe) {
1134         // Failed to flush the region. Keep going.
1135         status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
1136       }
1137     }
1138 
1139     this.closing.set(true);
1140     status.setStatus("Disabling writes for close");
1141     // block waiting for the lock for closing
1142     lock.writeLock().lock();
1143     try {
1144       if (this.isClosed()) {
1145         status.abort("Already got closed by another process");
1146         // SplitTransaction handles the null
1147         return null;
1148       }
1149       LOG.debug("Updates disabled for region " + this);
1150       // Don't flush the cache if we are aborting
1151       if (!abort) {
1152         int flushCount = 0;
1153         while (this.getMemstoreSize().get() > 0) {
1154           try {
1155             if (flushCount++ > 0) {
1156               int actualFlushes = flushCount - 1;
1157               if (actualFlushes > 5) {
1158                 // If we tried 5 times and are unable to clear memory, abort
1159                 // so we do not lose data
1160                 throw new DroppedSnapshotException("Failed clearing memory after " +
1161                   actualFlushes + " attempts on region: " + Bytes.toStringBinary(getRegionName()));
1162               } 
1163               LOG.info("Running extra flush, " + actualFlushes +
1164                 " (carrying snapshot?) " + this);
1165             }
1166             internalFlushcache(status);
1167           } catch (IOException ioe) {
1168             status.setStatus("Failed flush " + this + ", putting online again");
1169             synchronized (writestate) {
1170               writestate.writesEnabled = true;
1171             }
1172             // Have to throw to upper layers.  I can't abort server from here.
1173             throw ioe;
1174           }
1175         }
1176       }
1177 
1178       List<StoreFile> result = new ArrayList<StoreFile>();
1179       if (!stores.isEmpty()) {
1180         // initialize the thread pool for closing stores in parallel.
1181         ThreadPoolExecutor storeCloserThreadPool =
1182           getStoreOpenAndCloseThreadPool("StoreCloserThread-"
1183             + this.regionInfo.getRegionNameAsString());
1184         CompletionService<ImmutableList<StoreFile>> completionService =
1185           new ExecutorCompletionService<ImmutableList<StoreFile>>(
1186             storeCloserThreadPool);
1187 
1188         // close each store in parallel
1189         for (final Store store : stores.values()) {
1190           assert abort? true: store.getFlushableSize() == 0;
1191           completionService
1192               .submit(new Callable<ImmutableList<StoreFile>>() {
1193                 public ImmutableList<StoreFile> call() throws IOException {
1194                   return store.close();
1195                 }
1196               });
1197         }
1198         try {
1199           for (int i = 0; i < stores.size(); i++) {
1200             Future<ImmutableList<StoreFile>> future = completionService
1201                 .take();
1202             ImmutableList<StoreFile> storeFileList = future.get();
1203             result.addAll(storeFileList);
1204           }
1205         } catch (InterruptedException e) {
1206           throw new IOException(e);
1207         } catch (ExecutionException e) {
1208           throw new IOException(e.getCause());
1209         } finally {
1210           storeCloserThreadPool.shutdownNow();
1211         }
1212       }
1213       this.closed.set(true);
1214       if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get());
1215       if (coprocessorHost != null) {
1216         status.setStatus("Running coprocessor post-close hooks");
1217         this.coprocessorHost.postClose(abort);
1218       }
1219       this.opMetrics.closeMetrics(this.getRegionInfo().getEncodedName());
1220       status.markComplete("Closed");
1221       LOG.info("Closed " + this);
1222       return result;
1223     } finally {
1224       lock.writeLock().unlock();
1225     }
1226   }
1227 
1228   /**
1229    * Wait for all current flushes and compactions of the region to complete.
1230    * <p>
1231    * Exposed for TESTING.
1232    */
1233   public void waitForFlushesAndCompactions() {
1234     synchronized (writestate) {
1235       while (writestate.compacting > 0 || writestate.flushing) {
1236         LOG.debug("waiting for " + writestate.compacting + " compactions"
1237             + (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
1238         try {
1239           writestate.wait();
1240         } catch (InterruptedException iex) {
1241           // essentially ignore and propagate the interrupt back up
1242           Thread.currentThread().interrupt();
1243         }
1244       }
1245     }
1246   }
1247 
1248   protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
1249       final String threadNamePrefix) {
1250     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1251     int maxThreads = Math.min(numStores,
1252         conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1253             HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
1254     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1255   }
1256 
1257   protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
1258       final String threadNamePrefix) {
1259     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1260     int maxThreads = Math.max(1,
1261         conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1262             HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
1263             / numStores);
1264     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1265   }
1266 
1267   static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
1268       final String threadNamePrefix) {
1269     return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
1270       new ThreadFactory() {
1271         private int count = 1;
1272 
1273         public Thread newThread(Runnable r) {
1274           return new Thread(r, threadNamePrefix + "-" + count++);
1275         }
1276       });
1277   }
1278 
1279    /**
1280     * @return True if its worth doing a flush before we put up the close flag.
1281     */
1282   private boolean worthPreFlushing() {
1283     return this.memstoreSize.get() >
1284       this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
1285   }
1286 
1287   //////////////////////////////////////////////////////////////////////////////
1288   // HRegion accessors
1289   //////////////////////////////////////////////////////////////////////////////
1290 
1291   /** @return start key for region */
1292   public byte [] getStartKey() {
1293     return this.regionInfo.getStartKey();
1294   }
1295 
1296   /** @return end key for region */
1297   public byte [] getEndKey() {
1298     return this.regionInfo.getEndKey();
1299   }
1300 
1301   /** @return region id */
1302   public long getRegionId() {
1303     return this.regionInfo.getRegionId();
1304   }
1305 
1306   /** @return region name */
1307   public byte [] getRegionName() {
1308     return this.regionInfo.getRegionName();
1309   }
1310 
1311   /** @return region name as string for logging */
1312   public String getRegionNameAsString() {
1313     return this.regionInfo.getRegionNameAsString();
1314   }
1315 
1316   /** @return HTableDescriptor for this region */
1317   public HTableDescriptor getTableDesc() {
1318     return this.htableDescriptor;
1319   }
1320 
1321   /** @return HLog in use for this region */
1322   public HLog getLog() {
1323     return this.log;
1324   }
1325 
1326   /** @return Configuration object */
1327   public Configuration getConf() {
1328     return this.conf;
1329   }
1330 
1331     /**
1332    * A split takes the config from the parent region & passes it to the daughter
1333    * region's constructor. If 'conf' was passed, you would end up using the HTD
1334    * of the parent region in addition to the new daughter HTD. Pass 'baseConf'
1335    * to the daughter regions to avoid this tricky dedupe problem.
1336    * @return Configuration object
1337    */
1338   Configuration getBaseConf() {
1339     return this.baseConf;
1340   }
1341 
1342   /** @return region directory Path */
1343   public Path getRegionDir() {
1344     return this.regiondir;
1345   }
1346 
1347   /**
1348    * Computes the Path of the HRegion
1349    *
1350    * @param tabledir qualified path for table
1351    * @param name ENCODED region name
1352    * @return Path of HRegion directory
1353    */
1354   public static Path getRegionDir(final Path tabledir, final String name) {
1355     return new Path(tabledir, name);
1356   }
1357 
1358   /** @return FileSystem being used by this region */
1359   public FileSystem getFilesystem() {
1360     return this.fs;
1361   }
1362 
1363   /** @return the last time the region was flushed */
1364   public long getLastFlushTime() {
1365     return this.lastFlushTime;
1366   }
1367 
1368   /** @return info about the last flushes, as &lt;time, size&gt; pairs */
1369   public List<Pair<Long,Long>> getRecentFlushInfo() {
1370     this.lock.readLock().lock();
1371     List<Pair<Long,Long>> ret = this.recentFlushes;
1372     this.recentFlushes = new ArrayList<Pair<Long,Long>>();
1373     this.lock.readLock().unlock();
1374     return ret;
1375   }
1376 
1377   //////////////////////////////////////////////////////////////////////////////
1378   // HRegion maintenance.
1379   //
1380   // These methods are meant to be called periodically by the HRegionServer for
1381   // upkeep.
1382   //////////////////////////////////////////////////////////////////////////////
1383 
1384   /** @return size of the largest HStore. */
1385   public long getLargestHStoreSize() {
1386     long size = 0;
1387     for (Store h: stores.values()) {
1388       long storeSize = h.getSize();
1389       if (storeSize > size) {
1390         size = storeSize;
1391       }
1392     }
1393     return size;
1394   }
1395 
1396   /*
1397    * Do preparation for pending compaction.
1398    * @throws IOException
1399    */
1400   void doRegionCompactionPrep() throws IOException {
1401   }
1402 
1403   /*
1404    * Removes the temporary directory for this region.
1405    */
1406   private void cleanupTmpDir() throws IOException {
1407     FSUtils.deleteDirectory(this.fs, getTmpDir());
1408   }
1409 
1410   /**
1411    * Get the temporary directory for this region. This directory
1412    * will have its contents removed when the region is reopened.
1413    */
1414   Path getTmpDir() {
1415     return getTmpDir(getRegionDir());
1416   }
1417 
1418   static Path getTmpDir(Path regionDir) {
1419     return new Path(regionDir, REGION_TEMP_SUBDIR);
1420   }
1421 
1422   void triggerMajorCompaction() {
1423     for (Store h: stores.values()) {
1424       h.triggerMajorCompaction();
1425     }
1426   }
1427 
1428   /**
1429    * This is a helper function that compacts all the stores synchronously.
1430    * It is used by utilities and testing.
1431    *
1432    * @param majorCompaction True to force a major compaction regardless of thresholds
1433    * @throws IOException e
1434    */
1435   public void compactStores(final boolean majorCompaction)
1436   throws IOException {
1437     if (majorCompaction) {
1438       this.triggerMajorCompaction();
1439     }
1440     compactStores();
1441   }
1442 
1443   /**
1444    * This is a helper function that compacts all the stores synchronously.
1445    * It is used by utilities and testing.
1446    *
1447    * @throws IOException e
1448    */
1449   public void compactStores() throws IOException {
1450     for(Store s : getStores().values()) {
1451       CompactionRequest cr = s.requestCompaction();
1452       if(cr != null) {
1453         try {
1454           compact(cr);
1455         } finally {
1456           s.finishRequest(cr);
1457         }
1458       }
1459     }
1460   }
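
  // A test-style usage sketch for the helpers above (illustrative; assumes an
  // already-opened HRegion named "region" and a prepared Put "p"):
  //
  //   region.put(p);                // write some data
  //   region.flushcache();          // produce store files
  //   region.compactStores(true);   // then force a major compaction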
1461 
1462   /*
1463    * Called by compaction thread and after region is opened to compact the
1464    * HStores if necessary.
1465    *
1466    * <p>This operation could block for a long time, so don't call it from a
1467    * time-sensitive thread.
1468    *
1469    * Note that no locking is necessary at this level because compaction only
1470    * conflicts with a region split, and that cannot happen because the region
1471    * server does them sequentially and not in parallel.
1472    *
1473    * @param cr Compaction details, obtained by requestCompaction()
1474    * @return whether the compaction completed
1475    * @throws IOException e
1476    */
1477   public boolean compact(CompactionRequest cr)
1478   throws IOException {
1479     if (cr == null) {
1480       return false;
1481     }
1482     if (this.closing.get() || this.closed.get()) {
1483       LOG.debug("Skipping compaction on " + this + " because closing/closed");
1484       return false;
1485     }
1486     Preconditions.checkArgument(cr.getHRegion().equals(this));
1487     // block waiting for the lock for compaction
1488     lock.readLock().lock();
1489     MonitoredTask status = TaskMonitor.get().createStatus(
1490         "Compacting " + cr.getStore() + " in " + this);
1491     try {
1492       if (this.closed.get()) {
1493         LOG.debug("Skipping compaction on " + this + " because closed");
1494         return false;
1495       }
1496       boolean decr = true;
1497       try {
1498         synchronized (writestate) {
1499           if (writestate.writesEnabled) {
1500             ++writestate.compacting;
1501           } else {
1502             String msg = "NOT compacting region " + this + ". Writes disabled.";
1503             LOG.info(msg);
1504             status.abort(msg);
1505             decr = false;
1506             return false;
1507           }
1508         }
1509         LOG.info("Starting compaction on " + cr.getStore() + " in region "
1510             + this + (cr.getCompactSelection().isOffPeakCompaction()?" as an off-peak compaction":""));
1511         doRegionCompactionPrep();
1512         try {
1513           status.setStatus("Compacting store " + cr.getStore());
1514           cr.getStore().compact(cr);
1515         } catch (InterruptedIOException iioe) {
1516           String msg = "compaction interrupted by user";
1517           LOG.info(msg, iioe);
1518           status.abort(msg);
1519           return false;
1520         }
1521       } finally {
1522         if (decr) {
1523           synchronized (writestate) {
1524             --writestate.compacting;
1525             if (writestate.compacting <= 0) {
1526               writestate.notifyAll();
1527             }
1528           }
1529         }
1530       }
1531       status.markComplete("Compaction complete");
1532       return true;
1533     } finally {
1534       status.cleanup();
1535       lock.readLock().unlock();
1536     }
1537   }
1538 
1539   /**
1540    * Flush the cache.
1541    *
1542    * When this method is called the cache will be flushed unless:
1543    * <ol>
1544    *   <li>the cache is empty</li>
1545    *   <li>the region is closed</li>
1546    *   <li>a flush is already in progress</li>
1547    *   <li>writes are disabled</li>
1548    * </ol>
1549    *
1550    * <p>This method may block for some time, so it should not be called from a
1551    * time-sensitive thread.
1552    *
1553    * @return a FlushResult indicating whether the flush succeeded and whether the region needs compaction
1554    *
1555    * @throws IOException general io exceptions
1556    * @throws DroppedSnapshotException Thrown when replay of hlog is required
1557    * because a Snapshot was not properly persisted.
1558    */
1559   public FlushResult flushcache() throws IOException {
1560     // fail-fast instead of waiting on the lock
1561     if (this.closing.get()) {
1562       String msg = "Skipping flush on " + this + " because closing";
1563       LOG.debug(msg);
1564       return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1565     }
1566     MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
1567     status.setStatus("Acquiring readlock on region");
1568     // block waiting for the lock for flushing cache
1569     lock.readLock().lock();
1570     try {
1571       if (this.closed.get()) {
1572         String msg = "Skipping flush on " + this + " because closed";
1573         LOG.debug(msg);
1574         status.abort(msg);
1575         return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1576       }
1577       if (coprocessorHost != null) {
1578         status.setStatus("Running coprocessor pre-flush hooks");
1579         coprocessorHost.preFlush();
1580       }
1581       if (numPutsWithoutWAL.get() > 0) {
1582         numPutsWithoutWAL.set(0);
1583         dataInMemoryWithoutWAL.set(0);
1584       }
1585       synchronized (writestate) {
1586         if (!writestate.flushing && writestate.writesEnabled) {
1587           this.writestate.flushing = true;
1588         } else {
1589           if (LOG.isDebugEnabled()) {
1590             LOG.debug("NOT flushing memstore for region " + this
1591                 + ", flushing=" + writestate.flushing + ", writesEnabled="
1592                 + writestate.writesEnabled);
1593           }
1594           String msg = "Not flushing since "
1595               + (writestate.flushing ? "already flushing"
1596               : "writes not enabled");
1597           status.abort(msg);
1598           return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1599         }
1600       }
1601       try {
1602         FlushResult fs = internalFlushcache(status);
1603 
1604         if (coprocessorHost != null) {
1605           status.setStatus("Running post-flush coprocessor hooks");
1606           coprocessorHost.postFlush();
1607         }
1608 
1609         status.markComplete("Flush successful");
1610         return fs;
1611       } finally {
1612         synchronized (writestate) {
1613           writestate.flushing = false;
1614           this.writestate.flushRequested = false;
1615           writestate.notifyAll();
1616         }
1617       }
1618     } finally {
1619       lock.readLock().unlock();
1620       status.cleanup();
1621     }
1622   }
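
  // A usage sketch (illustrative; "region" is a hypothetical HRegion). A
  // FLUSHED_COMPACTION_NEEDED result tells the caller to request a compaction,
  // while the CANNOT_FLUSH* results mean nothing was written out:
  //
  //   FlushResult fr = region.flushcache();
  //   // compare fr against FlushResult.Result.FLUSHED_COMPACTION_NEEDED etc.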
1623 
1624   /**
1625    * Should the memstore be flushed now?
1626    */
1627   boolean shouldFlush() {
1628     if (flushCheckInterval <= 0) { //disabled
1629       return false;
1630     }
1631     long now = EnvironmentEdgeManager.currentTimeMillis();
1632     //if we flushed in the recent past, we don't need to do again now
1633     if ((now - getLastFlushTime() < flushCheckInterval)) {
1634       return false;
1635     }
1636     //since we didn't flush in the recent past, flush now if certain conditions
1637     //are met. Return true on first such memstore hit.
1638     for (Store s : this.getStores().values()) {
1639       if (s.timeOfOldestEdit() < now - flushCheckInterval) {
1640         // we have an old enough edit in the memstore, flush
1641         return true;
1642       }
1643     }
1644     return false;
1645   }
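
  // The interval checked above comes from flushCheckInterval. A tuning sketch,
  // assuming the config key conventionally tied to the periodic-flush interval
  // (verify the key name against this version's constants):
  //
  //   conf.setInt("hbase.regionserver.optionalcacheflushinterval",
  //       2 * 60 * 60 * 1000);   // flush memstores holding edits older than 2h
  //   // a value <= 0 disables these periodic flushes entirely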
1646 
1647   /**
1648    * Flush the memstore.
1649    *
1650    * Flushing the memstore is a little tricky. We have a lot of updates in the
1651    * memstore, all of which have also been written to the log. We need to
1652    * write those updates in the memstore out to disk, while being able to
1653    * process reads/writes as much as possible during the flush operation. Also,
1654    * the log has to state clearly the point in time at which the memstore was
1655    * flushed. (That way, during recovery, we know when we can rely on the
1656    * on-disk flushed structures and when we have to recover the memstore from
1657    * the log.)
1658    *
1659    * <p>So, we have a three-step process:
1660    *
1661    * <ul><li>A. Flush the memstore to the on-disk stores, noting the current
1662    * sequence ID for the log.</li>
1663    *
1664    * <li>B. Write a FLUSHCACHE-COMPLETE message to the log, using the sequence
1665    * ID that was current at the time of memstore-flush.</li>
1666    *
1667    * <li>C. Get rid of the memstore structures that are now redundant, as
1668    * they've been flushed to the on-disk HStores.</li>
1669    * </ul>
1670    * <p>This method is protected, but can be accessed via several public
1671    * routes.
1672    *
1673    * <p> This method may block for some time.
1674    * @param status
1675    *
1676    * @return a FlushResult indicating whether the flush succeeded and whether the region needs compacting
1677    *
1678    * @throws IOException general io exceptions
1679    * @throws DroppedSnapshotException Thrown when replay of hlog is required
1680    * because a Snapshot was not properly persisted.
1681    */
1682   protected FlushResult internalFlushcache(MonitoredTask status)
1683       throws IOException {
1684     return internalFlushcache(this.log, -1, status);
1685   }
1686 
1687   /**
1688    * @param wal Null if we're NOT to go via hlog/wal.
1689    * @param myseqid The seqid to use, when <code>wal</code> is null, for
1690    * writing out the flush file.
1691    * @param status
1692    * @return a FlushResult indicating whether the flush succeeded and whether the region needs compacting
1693    * @throws IOException
1694    * @see #internalFlushcache(MonitoredTask)
1695    */
1696   protected FlushResult internalFlushcache(
1697       final HLog wal, final long myseqid, MonitoredTask status)
1698   throws IOException {
1699     if (this.rsServices != null && this.rsServices.isAborted()) {
1700       // Don't flush when server aborting, it's unsafe
1701       throw new IOException("Aborting flush because server is aborted...");
1702     }
1703     final long startTime = EnvironmentEdgeManager.currentTimeMillis();
1704     // Record the latest flush time up front, even if the flush turns out
1705     // to be a no-op below.
1706     this.lastFlushTime = startTime;
1707     // If nothing to flush, return and avoid logging start/stop flush.
1708     if (this.memstoreSize.get() <= 0) {
1709       return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush");
1710     }
1711     if (LOG.isDebugEnabled()) {
1712       LOG.debug("Started memstore flush for " + this +
1713         ", current region memstore size " +
1714         StringUtils.humanReadableInt(this.memstoreSize.get()) +
1715         ((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid));
1716     }
1717 
1718     // Stop updates while we snapshot the memstore of all stores. We only have
1719     // to do this for a moment.  It's quick.  The subsequent sequence id that
1720     // goes into the HLog after we've flushed all these snapshots also goes
1721     // into the info file that sits beside the flushed files.
1722     // We also set the memstore size to zero here before we allow updates
1723     // again so its value will represent the size of the updates received
1724     // during the flush
1725     long sequenceId = -1L;
1726     long completeSequenceId = -1L;
1727     MultiVersionConsistencyControl.WriteEntry w = null;
1728 
1729     // We have to take a write lock during snapshot, or else a write could
1730     // end up in both snapshot and memstore (makes it difficult to do atomic
1731     // rows then)
1732     status.setStatus("Obtaining lock to block concurrent updates");
1733     // block waiting for the lock for internal flush
1734     this.updatesLock.writeLock().lock();
1735     long totalFlushableSize = 0;
1736     status.setStatus("Preparing to flush by snapshotting stores");
1737     List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
1738     try {
1739       // Record the mvcc for all transactions in progress.
1740       w = mvcc.beginMemstoreInsert();
1741       mvcc.advanceMemstore(w);
1742 
1743       sequenceId = (wal == null)? myseqid:
1744         wal.startCacheFlush(this.regionInfo.getEncodedNameAsBytes());
1745       completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);
1746 
1747       for (Store s : stores.values()) {
1748         totalFlushableSize += s.getFlushableSize();
1749         storeFlushers.add(s.getStoreFlusher(completeSequenceId));
1750       }
1751 
1752       // prepare flush (take a snapshot)
1753       for (StoreFlusher flusher : storeFlushers) {
1754         flusher.prepare();
1755       }
1756     } finally {
1757       this.updatesLock.writeLock().unlock();
1758     }
1759     String s = "Finished snapshotting " + this +
1760       ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize;
1761     status.setStatus(s);
1762     LOG.debug(s);
1763 
1764     // sync unflushed WAL changes when deferred log sync is enabled
1765     // see HBASE-8208 for details
1766     if (wal != null && isDeferredLogSyncEnabled()) {
1767       wal.sync();
1768     }
1769 
1770     // wait for all in-progress transactions to commit to HLog before
1771     // we can start the flush. This prevents
1772     // uncommitted transactions from being written into HFiles.
1773     // We have to block before we start the flush, otherwise keys that
1774     // were removed via a rollbackMemstore could be written to Hfiles.
1775     mvcc.waitForRead(w);
1776 
1777     status.setStatus("Flushing stores");
1778     LOG.debug("Finished snapshotting, commencing flushing stores");
1779 
1780     // Any failure from here on out will be catastrophic requiring server
1781     // restart so hlog content can be replayed and put back into the memstore.
1782     // Otherwise, the snapshot content, while backed up in the hlog, will not
1783     // be part of the current running server's state.
1784     boolean compactionRequested = false;
1785     try {
1786       // A.  Flush memstore to all the HStores.
1787       // Keep running vector of all store files that includes both old and the
1788       // just-made new flush store file. The new flushed file is still in the
1789       // tmp directory.
1790 
1791       for (StoreFlusher flusher : storeFlushers) {
1792         flusher.flushCache(status);
1793       }
1794 
1795       // Switch snapshot (in memstore) -> new hfile (thus causing
1796       // all the store scanners to reset/reseek).
1797       for (StoreFlusher flusher : storeFlushers) {
1798         boolean needsCompaction = flusher.commit(status);
1799         if (needsCompaction) {
1800           compactionRequested = true;
1801         }
1802       }
1803       storeFlushers.clear();
1804 
1805       // Set down the memstore size by amount of flush.
1806       this.addAndGetGlobalMemstoreSize(-totalFlushableSize);
1807     } catch (Throwable t) {
1808       // An exception here means that the snapshot was not persisted.
1809       // The hlog needs to be replayed so its content is restored to memstore.
1810       // Currently, only a server restart will do this.
1811       // We used to only catch IOEs but its possible that we'd get other
1812       // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
1813       // all and sundry.
1814       if (wal != null) {
1815         wal.abortCacheFlush(this.regionInfo.getEncodedNameAsBytes());
1816       }
1817       DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
1818           Bytes.toStringBinary(getRegionName()));
1819       dse.initCause(t);
1820       status.abort("Flush failed: " + StringUtils.stringifyException(t));
1821       throw dse;
1822     }
1823 
1824     // If we get to here, the HStores have been written. If we get an
1825     // error in completeCacheFlush it will release the lock it is holding
1826 
1827     // B.  Write a FLUSHCACHE-COMPLETE message to the log.
1828     //     This tells future readers that the HStores were emitted correctly,
1829     //     and that all updates to the log for this regionName that have lower
1830     //     log-sequence-ids can be safely ignored.
1831     if (wal != null) {
1832       wal.completeCacheFlush(this.regionInfo.getEncodedNameAsBytes(),
1833         regionInfo.getTableName(), completeSequenceId,
1834         this.getRegionInfo().isMetaRegion());
1835     }
1836 
1837     // C. Finally notify anyone waiting on memstore to clear:
1838     // e.g. checkResources().
1839     synchronized (this) {
1840       notifyAll(); // FindBugs NN_NAKED_NOTIFY
1841     }
1842 
1843     long time = EnvironmentEdgeManager.currentTimeMillis() - startTime;
1844     long memstoresize = this.memstoreSize.get();
1845     String msg = "Finished memstore flush of ~" +
1846       StringUtils.humanReadableInt(totalFlushableSize) + "/" + totalFlushableSize +
1847       ", currentsize=" +
1848       StringUtils.humanReadableInt(memstoresize) + "/" + memstoresize +
1849       " for region " + this + " in " + time + "ms, sequenceid=" + sequenceId +
1850       ", compaction requested=" + compactionRequested +
1851       ((wal == null)? "; wal=null": "");
1852     LOG.info(msg);
1853     status.setStatus(msg);
1854     this.recentFlushes.add(new Pair<Long,Long>(time/1000, totalFlushableSize));
1855 
1856     return new FlushResult(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
1857         FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, sequenceId);
1858   }
1859 
1860   /**
1861    * Get the sequence number to be associated with this cache flush. Used by
1862    * TransactionalRegion so that pending transactions are not completed
1863    * prematurely.
1864    *
1865    * @param currentSequenceId
1866    * @return sequence id to complete the cache flush with
1867    */
1868   protected long getCompleteCacheFlushSequenceId(long currentSequenceId) {
1869     return currentSequenceId;
1870   }
1871 
1872   //////////////////////////////////////////////////////////////////////////////
1873   // get() methods for client use.
1874   //////////////////////////////////////////////////////////////////////////////
1875   /**
1876    * Return all the data for the row that matches <i>row</i> exactly,
1877    * or the one that immediately precedes it, in the catalog family
1878    * ({@link HConstants#CATALOG_FAMILY}).
1879    *
1880    * @param row row key
1881    * @return map of values
1882    * @throws IOException
1883    */
1884   Result getClosestRowBefore(final byte [] row)
1885   throws IOException{
1886     return getClosestRowBefore(row, HConstants.CATALOG_FAMILY);
1887   }
1888 
1889   /**
1890    * Return all the data for the row that matches <i>row</i> exactly,
1891    * or the one that immediately precedes it, in the given column
1892    * family.
1893    *
1894    * @param row row key
1895    * @param family column family to find on
1896    * @return map of values
1897    * @throws IOException read exceptions
1898    */
1899   public Result getClosestRowBefore(final byte [] row, final byte [] family)
1900   throws IOException {
1901     if (coprocessorHost != null) {
1902       Result result = new Result();
1903       if (coprocessorHost.preGetClosestRowBefore(row, family, result)) {
1904         return result;
1905       }
1906     }
1907     // look across all the HStores for this region and determine what the
1908     // closest key is across all column families, since the data may be sparse
1909     checkRow(row, "getClosestRowBefore");
1910     startRegionOperation();
1911     this.readRequestsCount.increment();
1912     this.opMetrics.setReadRequestCountMetrics(this.readRequestsCount.get());   
1913     try {
1914       Store store = getStore(family);
1915       // get the closest key. (HStore.getRowKeyAtOrBefore can return null)
1916       KeyValue key = store.getRowKeyAtOrBefore(row);
1917       Result result = null;
1918       if (key != null) {
1919         Get get = new Get(key.getRow());
1920         get.addFamily(family);
1921         result = get(get, null);
1922       }
1923       if (coprocessorHost != null) {
1924         coprocessorHost.postGetClosestRowBefore(row, family, result);
1925       }
1926       return result;
1927     } finally {
1928       closeRegionOperation();
1929     }
1930   }
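
  // A usage sketch (illustrative; handy for "find the row covering this key"
  // style lookups -- the row and family values are hypothetical):
  //
  //   byte [] row = Bytes.toBytes("row-0042");
  //   Result r = region.getClosestRowBefore(row, Bytes.toBytes("info"));
  //   // r holds the matching row, or the closest row sorting before it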
1931 
1932   /**
1933    * Return a scanner that iterates over the HRegion, returning the
1934    * columns and rows specified by the {@link Scan}.
1935    * <p>
1936    * This scanner must be closed by the caller.
1937    *
1938    * @param scan configured {@link Scan}
1939    * @return RegionScanner
1940    * @throws IOException read exceptions
1941    */
1942   public RegionScanner getScanner(Scan scan) throws IOException {
1943    return getScanner(scan, null);
1944   }
1945 
1946   void prepareScanner(Scan scan) throws IOException {
1947     if(!scan.hasFamilies()) {
1948       // Adding all families to scanner
1949       for(byte[] family: this.htableDescriptor.getFamiliesKeys()){
1950         scan.addFamily(family);
1951       }
1952     }
1953   }
1954 
1955   protected RegionScanner getScanner(Scan scan,
1956       List<KeyValueScanner> additionalScanners) throws IOException {
1957     startRegionOperation();
1958     try {
1959       // Verify families are all valid
1960       prepareScanner(scan);
1961       if(scan.hasFamilies()) {
1962         for(byte [] family : scan.getFamilyMap().keySet()) {
1963           checkFamily(family);
1964         }
1965       }
1966       return instantiateRegionScanner(scan, additionalScanners);
1967     } finally {
1968       closeRegionOperation();
1969     }
1970   }
1971 
1972   protected RegionScanner instantiateRegionScanner(Scan scan,
1973       List<KeyValueScanner> additionalScanners) throws IOException {
1974     return new RegionScannerImpl(scan, additionalScanners, this);
1975   }
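
  // A usage sketch for the scanner API above (illustrative; RegionScanner
  // extends InternalScanner, so next() fills a list and close() is mandatory):
  //
  //   Scan scan = new Scan();
  //   scan.addFamily(Bytes.toBytes("info"));
  //   RegionScanner scanner = region.getScanner(scan);
  //   try {
  //     List<KeyValue> kvs = new ArrayList<KeyValue>();
  //     boolean more;
  //     do {
  //       more = scanner.next(kvs);
  //       // process kvs here ...
  //       kvs.clear();
  //     } while (more);
  //   } finally {
  //     scanner.close();
  //   }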
1976 
1977   /*
1978    * @param delete The passed delete is modified by this method. WARNING!
1979    */
1980   private void prepareDelete(Delete delete) throws IOException {
1981     // Check to see if this is a deleteRow insert
1982     if(delete.getFamilyMap().isEmpty()){
1983       for(byte [] family : this.htableDescriptor.getFamiliesKeys()){
1984         // Don't eat the timestamp
1985         delete.deleteFamily(family, delete.getTimeStamp());
1986       }
1987     } else {
1988       for(byte [] family : delete.getFamilyMap().keySet()) {
1989         if(family == null) {
1990           throw new NoSuchColumnFamilyException("Empty family is invalid");
1991         }
1992         checkFamily(family);
1993       }
1994     }
1995   }
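
  // The whole-row case above means an "empty" Delete wipes every family, e.g.
  // (illustrative):
  //
  //   Delete d = new Delete(row);   // no families/columns specified
  //   // prepareDelete expands d into a deleteFamily for each family in the
  //   // table, so the entire row is deleted at d's timestamp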
1996 
1997   //////////////////////////////////////////////////////////////////////////////
1998   // set() methods for client use.
1999   //////////////////////////////////////////////////////////////////////////////
2000 
2001   /**
2002    * @param delete delete object
2003    * @param writeToWAL append to the write ahead log or not
2004    * @throws IOException read exceptions
2005    */
2006   public void delete(Delete delete, boolean writeToWAL)
2007   throws IOException {
2008     delete(delete, null, writeToWAL);
2009   }
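
  // A usage sketch (illustrative):
  //
  //   Delete d = new Delete(row);
  //   d.deleteColumn(Bytes.toBytes("info"), Bytes.toBytes("col"));
  //   region.delete(d, true);   // true = make the delete durable via the WAL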
2010 
2011   /**
2012    * @param delete delete object
2013    * @param lockid existing lock id, or null to grab a new lock
2014    * @param writeToWAL append to the write ahead log or not
2015    * @throws IOException read exceptions
2016    * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
2017    */
2018   public void delete(Delete delete, Integer lockid, boolean writeToWAL)
2019   throws IOException {
2020     checkReadOnly();
2021     checkResources();
2022     Integer lid = null;
2023     startRegionOperation();
2024     this.writeRequestsCount.increment();
2025     this.opMetrics.setWriteRequestCountMetrics(this.writeRequestsCount.get());
2026     try {
2027       byte [] row = delete.getRow();
2028       // If we did not pass an existing row lock, obtain a new one
2029       lid = getLock(lockid, row, true);
2030 
2031       try {
2032         // All edits for the given row (across all column families) must happen atomically.
2033         prepareDelete(delete);
2034         internalDelete(delete, delete.getClusterId(), writeToWAL);
2035       } finally {
2036         if(lockid == null) releaseRowLock(lid);
2037       }
2038     } finally {
2039       closeRegionOperation();
2040     }
2041   }
2042 
2043   /**
2044    * This is used only by unit tests. Not required to be a public API.
2045    * @param familyMap map of family to edits for the given family.
2046    * @param writeToWAL
2047    * @throws IOException
2048    */
2049   void delete(Map<byte[], List<KeyValue>> familyMap, UUID clusterId,
2050       boolean writeToWAL) throws IOException {
2051     Delete delete = new Delete();
2052     delete.setFamilyMap(familyMap);
2053     delete.setClusterId(clusterId);
2054     delete.setWriteToWAL(writeToWAL);
2055     internalDelete(delete, clusterId, writeToWAL);
2056   }
2057 
2058   /**
2059    * Set up correct timestamps in the KVs in the Delete object.
2060    * Caller should have the row and region locks.
2061    * @param familyMap
2062    * @param byteNow
2063    * @throws IOException
2064    */
2065   private void prepareDeleteTimestamps(Map<byte[], List<KeyValue>> familyMap, byte[] byteNow)
2066       throws IOException {
2067     for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
2068 
2069       byte[] family = e.getKey();
2070       List<KeyValue> kvs = e.getValue();
2071       assert kvs instanceof RandomAccess;
2072       Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
2073       int listSize = kvs.size();
2074       for (int i=0; i < listSize; i++) {
2075         KeyValue kv = kvs.get(i);
2076         //  Check if time is LATEST, change to time of most recent addition if so
2077         //  This is expensive.
2078         if (kv.isLatestTimestamp() && kv.isDeleteType()) {
2079           byte[] qual = kv.getQualifier();
2080           if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
2081 
2082           Integer count = kvCount.get(qual);
2083           if (count == null) {
2084             kvCount.put(qual, 1);
2085           } else {
2086             kvCount.put(qual, count + 1);
2087           }
2088           count = kvCount.get(qual);
2089 
2090           Get get = new Get(kv.getRow());
2091           get.setMaxVersions(count);
2092           get.addColumn(family, qual);
2093 
2094           List<KeyValue> result = get(get, false);
2095 
2096           if (result.size() < count) {
2097             // Nothing to delete
2098             kv.updateLatestStamp(byteNow);
2099             continue;
2100           }
2101           if (result.size() > count) {
2102             throw new RuntimeException("Unexpected size: " + result.size());
2103           }
2104           KeyValue getkv = result.get(count - 1);
2105           Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
2106               getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
2107         } else {
2108           kv.updateLatestStamp(byteNow);
2109         }
2110       }
2111     }
2112   }
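
  // The LATEST_TIMESTAMP branch above is what makes the common "delete the
  // newest cell" idiom work (illustrative):
  //
  //   Delete d = new Delete(row);
  //   d.deleteColumn(family, qualifier);   // no explicit timestamp
  //   // the KV carries LATEST_TIMESTAMP, so the code above reads the most
  //   // recent existing cell and rewrites the delete to that cell's timestamp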
2113 
2114   /**
2115    * @param delete The Delete command
2116    * @param clusterId UUID of the originating cluster (for replication).
2117    * @param writeToWAL
2118    * @throws IOException
2119    */
2120   private void internalDelete(Delete delete, UUID clusterId,
2121       boolean writeToWAL) throws IOException {
2122     Map<byte[], List<KeyValue>> familyMap = delete.getFamilyMap();
2123     WALEdit walEdit = new WALEdit();
2124     /* Run coprocessor pre hook outside of locks to avoid deadlock */
2125     if (coprocessorHost != null) {
2126       if (coprocessorHost.preDelete(delete, walEdit, writeToWAL)) {
2127         return;
2128       }
2129     }
2130 
2131     long now = EnvironmentEdgeManager.currentTimeMillis();
2132     byte [] byteNow = Bytes.toBytes(now);
2133     boolean flush = false;
2134 
2135     lock(updatesLock.readLock());
2136     try {
2137       prepareDeleteTimestamps(delete.getFamilyMap(), byteNow);
2138 
2139       if (writeToWAL) {
2140         // write/sync to WAL should happen before we touch memstore.
2141         //
2142         // If order is reversed, i.e. we write to memstore first, and
2143         // for some reason fail to write/sync to commit log, the memstore
2144         // will contain uncommitted transactions.
2145         //
2146         // bunch up all edits across all column families into a
2147         // single WALEdit.
2148         addFamilyMapToWALEdit(familyMap, walEdit);
2149         walEdit.addClusterIds(delete.getClusterIds());
2150         this.log.append(regionInfo, this.htableDescriptor.getName(),
2151             walEdit, clusterId, now, this.htableDescriptor);
2152       }
2153 
2154       // Now make changes to the memstore.
2155       long addedSize = applyFamilyMapToMemstore(familyMap, null);
2156       flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
2157 
2158     } finally {
2159       this.updatesLock.readLock().unlock();
2160     }
2161     // do after lock
2162     if (coprocessorHost != null) {
2163       coprocessorHost.postDelete(delete, walEdit, writeToWAL);
2164     }
2165     final long after = EnvironmentEdgeManager.currentTimeMillis();
2166     this.opMetrics.updateDeleteMetrics(familyMap.keySet(), after-now);
2167 
2168     if (flush) {
2169       // Request a cache flush.  Do it outside update lock.
2170       requestFlush();
2171     }
2172   }
2173 
2174   /**
2175    * @param put
2176    * @throws IOException
2177    */
2178   public void put(Put put) throws IOException {
2179     this.put(put, null, put.getWriteToWAL());
2180   }
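
  // A usage sketch (illustrative):
  //
  //   Put p = new Put(Bytes.toBytes("row1"));
  //   p.add(Bytes.toBytes("info"), Bytes.toBytes("col"), Bytes.toBytes("v"));
  //   region.put(p);   // durability follows p.getWriteToWAL()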
2181 
2182   /**
2183    * @param put
2184    * @param writeToWAL
2185    * @throws IOException
2186    */
2187   public void put(Put put, boolean writeToWAL) throws IOException {
2188     this.put(put, null, writeToWAL);
2189   }
2190 
2191   /**
2192    * @param put
2193    * @param lockid
2194    * @throws IOException
2195    * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
2196    */
2197   public void put(Put put, Integer lockid) throws IOException {
2198     this.put(put, lockid, put.getWriteToWAL());
2199   }
2200 
2201 
2202 
2203   /**
2204    * @param put
2205    * @param lockid
2206    * @param writeToWAL
2207    * @throws IOException
2208    * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
2209    */
2210   public void put(Put put, Integer lockid, boolean writeToWAL)
2211   throws IOException {
2212     checkReadOnly();
2213 
2214     // Do a rough check that we have resources to accept a write.  The check is
2215     // 'rough' in that between the resource check and the call to obtain a
2216     // read lock, resources may run out.  For now, the thought is that this
2217     // will be extremely rare; we'll deal with it when it happens.
2218     checkResources();
2219     startRegionOperation();
2220     this.writeRequestsCount.increment();
2221     this.opMetrics.setWriteRequestCountMetrics(this.writeRequestsCount.get());
2222     try {
2223       // We obtain a per-row lock, so other clients will block while one client
2224       // performs an update. The read lock is released by the client calling
2225       // #commit or #abort or if the HRegionServer lease on the lock expires.
2226       // See HRegionServer#RegionListener for how the expire on HRegionServer
2227       // invokes a HRegion#abort.
2228       byte [] row = put.getRow();
2229       // If we did not pass an existing row lock, obtain a new one
2230       Integer lid = getLock(lockid, row, true);
2231 
2232       try {
2233         // All edits for the given row (across all column families) must happen atomically.
2234         internalPut(put, put.getClusterId(), writeToWAL);
2235       } finally {
2236         if(lockid == null) releaseRowLock(lid);
2237       }
2238     } finally {
2239       closeRegionOperation();
2240     }
2241   }
2242 
2243   /**
2244    * Struct-like class that tracks the progress of a batch operation,
2245    * accumulating status codes and tracking the index at which processing
2246    * is proceeding.
2247    */
2248   private static class BatchOperationInProgress<T> {
2249     T[] operations;
2250     int nextIndexToProcess = 0;
2251     OperationStatus[] retCodeDetails;
2252     WALEdit[] walEditsFromCoprocessors;
2253 
2254     public BatchOperationInProgress(T[] operations) {
2255       this.operations = operations;
2256       this.retCodeDetails = new OperationStatus[operations.length];
2257       this.walEditsFromCoprocessors = new WALEdit[operations.length];
2258       Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
2259     }
2260 
2261     public boolean isDone() {
2262       return nextIndexToProcess == operations.length;
2263     }
2264   }
2265 
2266   /**
2267    * Perform a batch put with no pre-specified locks
2268    * @see HRegion#batchMutate(Pair[])
2269    */
2270   public OperationStatus[] put(Put[] puts) throws IOException {
2271     @SuppressWarnings("unchecked")
2272     Pair<Mutation, Integer> putsAndLocks[] = new Pair[puts.length];
2273 
2274     for (int i = 0; i < puts.length; i++) {
2275       putsAndLocks[i] = new Pair<Mutation, Integer>(puts[i], null);
2276     }
2277     return batchMutate(putsAndLocks);
2278   }
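
  // A usage sketch for the batch form (illustrative; "fam" and "qual" are
  // hypothetical byte[] values):
  //
  //   Put p1 = new Put(Bytes.toBytes("r1"));
  //   p1.add(fam, qual, Bytes.toBytes("v1"));
  //   Put p2 = new Put(Bytes.toBytes("r2"));
  //   p2.add(fam, qual, Bytes.toBytes("v2"));
  //   OperationStatus[] codes = region.put(new Put[] { p1, p2 });
  //   // codes[i].getOperationStatusCode() reports per-operation success/failure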
2279 
2280   /**
2281    * Perform a batch of puts.
2282    * @param putsAndLocks
2283    *          the list of puts paired with their requested lock IDs.
2284    * @return an array of OperationStatus which internally contains the OperationStatusCode and the
2285    *         exceptionMessage if any.
2286    * @throws IOException
2287    * @deprecated Instead use {@link HRegion#batchMutate(Pair[])}
2288    */
2289   @Deprecated
2290   public OperationStatus[] put(Pair<Put, Integer>[] putsAndLocks) throws IOException {
2291     Pair<Mutation, Integer>[] mutationsAndLocks = new Pair[putsAndLocks.length];
2292     System.arraycopy(putsAndLocks, 0, mutationsAndLocks, 0, putsAndLocks.length);
2293     return batchMutate(mutationsAndLocks);
2294   }
2295 
2296   /**
2297    * Perform a batch of mutations.
2298    * It supports only Put and Delete mutations; other types are marked as failed.
2299    * @param mutationsAndLocks
2300    *          the list of mutations paired with their requested lock IDs.
2301    * @return an array of OperationStatus which internally contains the
2302    *         OperationStatusCode and the exceptionMessage if any.
2303    * @throws IOException
2304    */
2305   public OperationStatus[] batchMutate(
2306       Pair<Mutation, Integer>[] mutationsAndLocks) throws IOException {
2307     BatchOperationInProgress<Pair<Mutation, Integer>> batchOp =
2308       new BatchOperationInProgress<Pair<Mutation,Integer>>(mutationsAndLocks);
2309 
2310     boolean initialized = false;
2311 
2312     while (!batchOp.isDone()) {
2313       checkReadOnly();
2314       checkResources();
2315 
2316       long newSize;
2317       startRegionOperation();
2318 
2319       try {
2320         if (!initialized) {
2321           this.writeRequestsCount.increment();
2322           this.opMetrics.setWriteRequestCountMetrics(this.writeRequestsCount.get());
2323           doPreMutationHook(batchOp);
2324           initialized = true;
2325         }
2326         long addedSize = doMiniBatchMutation(batchOp);
2327         newSize = this.addAndGetGlobalMemstoreSize(addedSize);
2328       } finally {
2329         closeRegionOperation();
2330       }
2331       if (isFlushSize(newSize)) {
2332         requestFlush();
2333       }
2334     }
2335     return batchOp.retCodeDetails;
2336   }
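
  // A usage sketch mixing Puts and Deletes (illustrative; a null lock id asks
  // the region to acquire the row lock itself):
  //
  //   Pair<Mutation, Integer>[] ops = new Pair[2];
  //   ops[0] = new Pair<Mutation, Integer>(put, null);
  //   ops[1] = new Pair<Mutation, Integer>(delete, null);
  //   OperationStatus[] codes = region.batchMutate(ops);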
2337 
2338   private void doPreMutationHook(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp)
2339       throws IOException {
2340     /* Run coprocessor pre hook outside of locks to avoid deadlock */
2341     WALEdit walEdit = new WALEdit();
2342     if (coprocessorHost != null) {
2343       for (int i = 0; i < batchOp.operations.length; i++) {
2344         Pair<Mutation, Integer> nextPair = batchOp.operations[i];
2345         Mutation m = nextPair.getFirst();
2346         if (m instanceof Put) {
2347           if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) {
2348             // pre hook says skip this Put
2349             // mark as success and skip in doMiniBatchMutation
2350             batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2351           }
2352         } else if (m instanceof Delete) {
2353           if (coprocessorHost.preDelete((Delete) m, walEdit, m.getWriteToWAL())) {
2354             // pre hook says skip this Delete
2355             // mark as success and skip in doMiniBatchMutation
2356             batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2357           }
2358         } else {
2359           // If mutations other than Put/Delete (e.g. Append) are passed along with
2360           // the Puts and Deletes to batchMutate, mark the operation's return code
2361           // as failure so that it is skipped in doMiniBatchMutation
2362           batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
2363               "Put/Delete mutations only supported in batchMutate() now");
2364         }
2365         if (!walEdit.isEmpty()) {
2366           batchOp.walEditsFromCoprocessors[i] = walEdit;
2367           walEdit = new WALEdit();
2368         }
2369       }
2370     }
2371   }
2372 
2373   // The mutation will be either a Put or Delete.
2374   @SuppressWarnings("unchecked")
2375   private long doMiniBatchMutation(
2376       BatchOperationInProgress<Pair<Mutation, Integer>> batchOp) throws IOException {
2377 
2378     // The set of columnFamilies first seen for Put.
2379     Set<byte[]> putsCfSet = null;
2380     // variable to note if all Put items are for the same CF -- metrics related
2381     boolean putsCfSetConsistent = true;
2382     // The set of columnFamilies first seen for Delete.
2383     Set<byte[]> deletesCfSet = null;
2384     // variable to note if all Delete items are for the same CF -- metrics related
2385     boolean deletesCfSetConsistent = true;
2386     long startTimeMs = EnvironmentEdgeManager.currentTimeMillis();
2387 
2388     WALEdit walEdit = new WALEdit();
2389 
2390     MultiVersionConsistencyControl.WriteEntry w = null;
2391     long txid = 0;
2392     boolean doRollBackMemstore = false;
2393     boolean locked = false;
2394 
2395     /** Keep track of the locks we hold so we can release them in finally clause */
2396     List<Integer> acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
2397     Set<HashedBytes> rowsAlreadyLocked = Sets.newHashSet();
2398 
2399     // reference family maps directly so coprocessors can mutate them if desired
2400     Map<byte[],List<KeyValue>>[] familyMaps = new Map[batchOp.operations.length];
2401     // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
2402     int firstIndex = batchOp.nextIndexToProcess;
2403     int lastIndexExclusive = firstIndex;
2404     boolean success = false;
2405     int noOfPuts = 0, noOfDeletes = 0;
2406     try {
2407       // ------------------------------------
2408       // STEP 1. Try to acquire as many locks as we can, and ensure
2409       // we acquire at least one.
2410       // ----------------------------------
2411       int numReadyToWrite = 0;
2412       long now = EnvironmentEdgeManager.currentTimeMillis();
2413       while (lastIndexExclusive < batchOp.operations.length) {
2414         Pair<Mutation, Integer> nextPair = batchOp.operations[lastIndexExclusive];
2415         Mutation mutation = nextPair.getFirst();
2416         Integer providedLockId = nextPair.getSecond();
2417 
2418         Map<byte[], List<KeyValue>> familyMap = mutation.getFamilyMap();
2419         // store the family map reference to allow for mutations
2420         familyMaps[lastIndexExclusive] = familyMap;
2421 
2422         // skip anything that "ran" already
2423         if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
2424             != OperationStatusCode.NOT_RUN) {
2425           lastIndexExclusive++;
2426           continue;
2427         }
2428 
2429         try {
2430           if (mutation instanceof Put) {
2431             checkFamilies(familyMap.keySet());
2432             checkTimestamps(mutation.getFamilyMap(), now);
2433           } else {
2434             prepareDelete((Delete) mutation);
2435           }
2436         } catch (NoSuchColumnFamilyException nscf) {
2437           LOG.warn("No such column family in batch mutation", nscf);
2438           batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2439               OperationStatusCode.BAD_FAMILY, nscf.getMessage());
2440           lastIndexExclusive++;
2441           continue;
2442         } catch (DoNotRetryIOException fsce) {
2443           // The only thing that throws a generic DoNotRetryIOException in the above code is
2444           // checkTimestamps so that DoNotRetryIOException means that timestamps were invalid.
2445           // If more checks are added, be sure to revisit this assumption.
2446           LOG.warn("Batch Mutation did not pass sanity check", fsce);
2447           batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2448               OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
2449           lastIndexExclusive++;
2450           continue;
2451         }
2452         // If we haven't got any rows in our batch, we should block to
2453         // get the next one.
2454         boolean shouldBlock = numReadyToWrite == 0;
2455         boolean failedToAcquire = false;
2456         Integer acquiredLockId = null;
2457         HashedBytes currentRow = new HashedBytes(mutation.getRow());
2458         try {
2459           if (providedLockId != null || !rowsAlreadyLocked.contains(currentRow)) {
2460             acquiredLockId = getLock(providedLockId, currentRow, shouldBlock);
2461             if (acquiredLockId == null) {
2462               failedToAcquire = true;
2463             } else if (providedLockId == null) {
2464               rowsAlreadyLocked.add(currentRow);
2465             }
2466           } 
2467         } catch (IOException ioe) {
2468           LOG.warn("Failed getting lock in batch put, row=" + currentRow, ioe);
2469           failedToAcquire = true;
2470         }
2471         if (failedToAcquire) {
2472           // We failed to grab another lock
2473           assert !shouldBlock : "Should never fail to get lock when blocking";
2474           break; // stop acquiring more rows for this batch
2475         }
2476         if (providedLockId == null) {
2477           acquiredLocks.add(acquiredLockId);
2478         }
2479         lastIndexExclusive++;
2480         numReadyToWrite++;
2481 
2482         if (mutation instanceof Put) {
2483           // If column families stay consistent throughout all of the
2484           // individual puts then metrics can be reported as a multiput across
2485           // column families in the first put.
2486           if (putsCfSet == null) {
2487             putsCfSet = mutation.getFamilyMap().keySet();
2488           } else {
2489             putsCfSetConsistent = putsCfSetConsistent
2490                 && mutation.getFamilyMap().keySet().equals(putsCfSet);
2491           }
2492         } else {
2493           if (deletesCfSet == null) {
2494             deletesCfSet = mutation.getFamilyMap().keySet();
2495           } else {
2496             deletesCfSetConsistent = deletesCfSetConsistent
2497                 && mutation.getFamilyMap().keySet().equals(deletesCfSet);
2498           }
2499         }
2500       }
2501 
2502       // we should record the timestamp only after we have acquired the rowLock,
2503       // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
2504       now = EnvironmentEdgeManager.currentTimeMillis();
2505       byte[] byteNow = Bytes.toBytes(now);
2506 
2507       // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
2508       if (numReadyToWrite <= 0) return 0L;
2509 
2510       // We've now grabbed as many mutations off the list as we can
2511 
2512       // ------------------------------------
2513       // STEP 2. Update any LATEST_TIMESTAMP timestamps
2514       // ----------------------------------
2515       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2516         // skip invalid
2517         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2518             != OperationStatusCode.NOT_RUN) continue;
2519         Mutation mutation = batchOp.operations[i].getFirst();
2520         if (mutation instanceof Put) {
2521           updateKVTimestamps(familyMaps[i].values(), byteNow);
2522           noOfPuts++;
2523         } else {
2524           prepareDeleteTimestamps(familyMaps[i], byteNow);
2525           noOfDeletes++;
2526         }
2527       }
2528 
2529       lock(this.updatesLock.readLock(), numReadyToWrite);
2530       locked = true;
2531 
2532       //
2533       // ------------------------------------
2534       // Acquire the latest mvcc number
2535       // ----------------------------------
2536       w = mvcc.beginMemstoreInsert();
2537 
2538       // calling the pre CP hook for batch mutation
2539       if (coprocessorHost != null) {
2540         MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp = 
2541           new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations, 
2542           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2543         if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
2544       }
2545 
2546       // ------------------------------------
2547       // STEP 3. Write back to memstore
2548       // Write to memstore. It is ok to write to memstore
2549       // first without updating the HLog because we do not roll
2550       // forward the memstore MVCC. The MVCC will be moved up when
2551       // the complete operation is done. These changes are not yet
2552       // visible to scanners till we update the MVCC. The MVCC is
2553       // moved only when the sync is complete.
2554       // ----------------------------------
2555       long addedSize = 0;
2556       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2557         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2558             != OperationStatusCode.NOT_RUN) {
2559           continue;
2560         }
2561         doRollBackMemstore = true;
2562         addedSize += applyFamilyMapToMemstore(familyMaps[i], w);
2563       }
2564 
2565       // ------------------------------------
2566       // STEP 4. Build WAL edit
2567       // ----------------------------------
2568       Durability durability = Durability.USE_DEFAULT;
2569 
2570       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2571         // Skip puts that were determined to be invalid during preprocessing
2572         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2573             != OperationStatusCode.NOT_RUN) {
2574           continue;
2575         }
2576         batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2577 
2578         Mutation m = batchOp.operations[i].getFirst();
2579         Durability tmpDur = m.getDurability(); 
2580         if (tmpDur.ordinal() > durability.ordinal()) {
2581           durability = tmpDur;
2582         }
2583         if (tmpDur == Durability.SKIP_WAL) {
2584           if (m instanceof Put) {
2585             recordPutWithoutWal(m.getFamilyMap());
2586           }
2587           continue;
2588         }
2589 
2590         // Add WAL edits by CP
2591         WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
2592         if (fromCP != null) {
2593           for (KeyValue kv : fromCP.getKeyValues()) {
2594             walEdit.add(kv);
2595           }
2596         }
2597         addFamilyMapToWALEdit(familyMaps[i], walEdit);
2598       }
2599 
2600       // -------------------------
2601       // STEP 5. Append the edit to WAL. Do not sync wal.
2602       // -------------------------
2603       Mutation first = batchOp.operations[firstIndex].getFirst();
2604       walEdit.addClusterIds(first.getClusterIds());
2605       txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
2606                walEdit, first.getClusterId(), now, this.htableDescriptor);
2607 
2608       // -------------------------------
2609       // STEP 6. Release row locks, etc.
2610       // -------------------------------
2611       if (locked) {
2612         this.updatesLock.readLock().unlock();
2613         locked = false;
2614       }
2615       if (acquiredLocks != null) {
2616         for (Integer toRelease : acquiredLocks) {
2617           releaseRowLock(toRelease);
2618         }
2619         acquiredLocks = null;
2620         rowsAlreadyLocked = null;
2621       }
2622       // -------------------------
2623       // STEP 7. Sync wal.
2624       // -------------------------
2625       if (walEdit.size() > 0) {
2626         syncOrDefer(txid, durability);
2627       }
2628       doRollBackMemstore = false;
2629       // calling the post CP hook for batch mutation
2630       if (coprocessorHost != null) {
2631         MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp = 
2632           new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations, 
2633           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2634         coprocessorHost.postBatchMutate(miniBatchOp);
2635       }
2636       
2637       // ------------------------------------------------------------------
2638       // STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
2639       // ------------------------------------------------------------------
2640       if (w != null) {
2641         mvcc.completeMemstoreInsert(w);
2642         w = null;
2643       }
2644 
2645       // ------------------------------------
2646       // STEP 9. Run coprocessor post hooks. This should be done after the wal is
2647       // synced so that the coprocessor contract is adhered to.
2648       // ------------------------------------
2649       if (coprocessorHost != null) {
2650         for (int i = firstIndex; i < lastIndexExclusive; i++) {
2651           // only for successful puts
2652           if (batchOp.retCodeDetails[i].getOperationStatusCode()
2653               != OperationStatusCode.SUCCESS) {
2654             continue;
2655           }
2656           Mutation m = batchOp.operations[i].getFirst();
2657           if (m instanceof Put) {
2658             coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL());
2659           } else {
2660             coprocessorHost.postDelete((Delete) m, walEdit, m.getWriteToWAL());
2661           }
2662         }
2663       }
2664       success = true;
2665       return addedSize;
2666     } finally {
2667 
2668       // if the wal sync was unsuccessful, remove keys from memstore
2669       if (doRollBackMemstore) {
2670         rollbackMemstore(batchOp, familyMaps, firstIndex, lastIndexExclusive);
2671       }
2672       if (w != null) mvcc.completeMemstoreInsert(w);
2673 
2674       if (locked) {
2675         this.updatesLock.readLock().unlock();
2676       }
2677 
2678       if (acquiredLocks != null) {
2679         for (Integer toRelease : acquiredLocks) {
2680           releaseRowLock(toRelease);
2681         }
2682       }
2683 
2684       // do after lock
2685       final long netTimeMs = EnvironmentEdgeManager.currentTimeMillis()- startTimeMs;
2686 
2687       // See if the column families were consistent through the whole thing.
2688       // if they were then keep them. If they were not then pass a null.
2689       // null will be treated as unknown.
2690       // Total time taken might be involving Puts and Deletes.
2691       // Split the time for puts and deletes based on the total number of Puts and Deletes.
2692       long timeTakenForPuts = 0;
2693       if (noOfPuts > 0) {
2694         // There were some Puts in the batch.
2695         double noOfMutations = noOfPuts + noOfDeletes;
2696         timeTakenForPuts = (long) (netTimeMs * (noOfPuts / noOfMutations));
2697         final Set<byte[]> keptCfs = putsCfSetConsistent ? putsCfSet : null;
2698         this.opMetrics.updateMultiPutMetrics(keptCfs, timeTakenForPuts);
2699       }
2700       if (noOfDeletes > 0) {
2701         // There were some Deletes in the batch.
2702         final Set<byte[]> keptCfs = deletesCfSetConsistent ? deletesCfSet : null;
2703         this.opMetrics.updateMultiDeleteMetrics(keptCfs, netTimeMs - timeTakenForPuts);
2704       }
2705       if (!success) {
2706         for (int i = firstIndex; i < lastIndexExclusive; i++) {
2707           if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
2708             batchOp.retCodeDetails[i] = OperationStatus.FAILURE;
2709           }
2710         }
2711       }
2712       batchOp.nextIndexToProcess = lastIndexExclusive;
2713     }
2714   }
2715 
2716   //TODO: gets/puts/deletes should be refactored a bit so that lock acquisition
2717   //happens up front and the lock is simply passed into the methods; in the
2718   //case of checkAndMutate you could then just do lockRow, get, put, unlockRow
2719   //or something similar.
2720   /**
2721    * Atomically checks a cell's value and, if the check passes, applies the mutation.
2722    * @param row row to check
2723    * @param family column family to check
2724    * @param qualifier column qualifier to check
2725    * @param compareOp comparison operator to apply
2726    * @param comparator comparator holding the expected value
2727    * @param w the mutation (Put or Delete) to apply if the check passes
2728    * @param writeToWAL whether the mutation should be written to the WAL
2729    * @throws IOException
2730    * @return true if the mutation was executed, false otherwise
2731    */
2732   public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
2733       CompareOp compareOp, WritableByteArrayComparable comparator, Writable w,
2734       boolean writeToWAL) throws IOException {
2735     return checkAndMutate(row, family, qualifier, compareOp, comparator, w, null, writeToWAL);
2736   }
2737 
2738   /**
2739    * Atomically checks a cell's value and, if the check passes, applies the mutation.
2740    * @param row row to check
2741    * @param family column family to check
2742    * @param qualifier column qualifier to check
2743    * @param compareOp comparison operator to apply
2744    * @param comparator comparator holding the expected value
2745    * @param w the mutation (Put or Delete) to apply if the check passes
2746    * @param lockId the id of a row lock already held by the caller, or null
2747    * @param writeToWAL whether the mutation should be written to the WAL
2748    * @throws IOException
2749    * @return true if the mutation was executed, false otherwise
2750    * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
2751    */
2752   public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
2753       CompareOp compareOp, WritableByteArrayComparable comparator, Writable w,
2754       Integer lockId, boolean writeToWAL) throws IOException {
2755     checkReadOnly();
2756     //TODO, add check for value length or maybe even better move this to the
2757     //client if this becomes a global setting
2758     checkResources();
2759     boolean isPut = w instanceof Put;
2760     if (!isPut && !(w instanceof Delete))
2761       throw new DoNotRetryIOException("Action must be Put or Delete");
2762     Row r = (Row)w;
2763     if (!Bytes.equals(row, r.getRow())) {
2764       throw new DoNotRetryIOException("Action's getRow must match the passed row");
2765     }
2766 
2767     startRegionOperation();
2768     this.writeRequestsCount.increment();
2769     this.opMetrics.setWriteRequestCountMetrics(this.writeRequestsCount.get());
2770     try {
2771       RowLock lock = isPut ? ((Put)w).getRowLock() : ((Delete)w).getRowLock();
2772       Get get = new Get(row, lock);
2773       checkFamily(family);
2774       get.addColumn(family, qualifier);
2775 
2776       // Lock row
2777       Integer lid = getLock(lockId, get.getRow(), true);
2778       // wait for all previous transactions to complete (with lock held)
2779       mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
2780       List<KeyValue> result = new ArrayList<KeyValue>();
2781       try {
2782         result = get(get, false);
2783 
2784         boolean valueIsNull = comparator.getValue() == null ||
2785           comparator.getValue().length == 0;
2786         boolean matches = false;
2787         if (result.size() == 0 && valueIsNull) {
2788           matches = true;
2789         } else if (result.size() > 0 && result.get(0).getValue().length == 0 &&
2790             valueIsNull) {
2791           matches = true;
2792         } else if (result.size() == 1 && !valueIsNull) {
2793           KeyValue kv = result.get(0);
2794           int compareResult = comparator.compareTo(kv.getBuffer(),
2795               kv.getValueOffset(), kv.getValueLength());
2796           switch (compareOp) {
2797           case LESS:
2798             matches = compareResult < 0;
2799             break;
2800           case LESS_OR_EQUAL:
2801             matches = compareResult <= 0;
2802             break;
2803           case EQUAL:
2804             matches = compareResult == 0;
2805             break;
2806           case NOT_EQUAL:
2807             matches = compareResult != 0;
2808             break;
2809           case GREATER_OR_EQUAL:
2810             matches = compareResult >= 0;
2811             break;
2812           case GREATER:
2813             matches = compareResult > 0;
2814             break;
2815           default:
2816             throw new RuntimeException("Unknown Compare op " + compareOp.name());
2817           }
2818         }
2819         //If matches put the new put or delete the new delete
2820         if (matches) {
2821           // All edits for the given row (across all column families) must
2822           // happen atomically.
2823           //
2824           // Using default cluster id, as this can only happen in the
2825           // originating cluster. A slave cluster receives the result as a Put
2826           // or Delete
2827           if (isPut) {
2828             internalPut(((Put) w), HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
2829           } else {
2830             Delete d = (Delete)w;
2831             prepareDelete(d);
2832             internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
2833           }
2834           return true;
2835         }
2836         return false;
2837       } finally {
2838         if(lockId == null) releaseRowLock(lid);
2839       }
2840     } finally {
2841       closeRegionOperation();
2842     }
2843   }
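
  /*
   * Usage sketch (illustrative, not part of the original source; the region
   * reference and the row/family/qualifier names are hypothetical).  A
   * check-and-put that only applies the Put when the current cell value
   * equals "expected":
   *
   *   byte[] row = Bytes.toBytes("row1");
   *   byte[] cf  = Bytes.toBytes("cf");
   *   byte[] q   = Bytes.toBytes("q");
   *   Put put = new Put(row);
   *   put.add(cf, q, Bytes.toBytes("newValue"));
   *   boolean applied = region.checkAndMutate(row, cf, q, CompareOp.EQUAL,
   *       new BinaryComparator(Bytes.toBytes("expected")), put, true);
   */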
2844 
2845 
2846   /**
2847    * Complete taking the snapshot on the region. Writes the region info and adds references to the
2848    * working snapshot directory.
2849    *
2850    * TODO for api consistency, consider adding another version with no {@link ForeignExceptionSnare}
2851    * arg.  (In the future other cancellable HRegion methods could eventually add a
2852    * {@link ForeignExceptionSnare}, or we could do something fancier).
2853    *
2854    * @param desc snapshot description object
2855    * @param exnSnare ForeignExceptionSnare that captures external exceptions in case we need to
2856    *   bail out.  This is allowed to be null and will just be ignored in that case.
2857    * @throws IOException if there is an external or internal error causing the snapshot to fail
2858    */
2859   public void addRegionToSnapshot(SnapshotDescription desc,
2860       ForeignExceptionSnare exnSnare) throws IOException {
2861     // This should be "fast" since we don't rewrite store files but instead
2862     // back up the store files by creating a reference
2863     Path rootDir = FSUtils.getRootDir(this.rsServices.getConfiguration());
2864     Path snapshotRegionDir = TakeSnapshotUtils.getRegionSnapshotDirectory(desc, rootDir,
2865       regionInfo.getEncodedName());
2866 
2867     // 1. dump region meta info into the snapshot directory
2868     LOG.debug("Storing region-info for snapshot.");
2869     checkRegioninfoOnFilesystem(snapshotRegionDir);
2870 
2871     // 2. iterate through all the stores in the region
2872     LOG.debug("Creating references for hfiles");
2873 
2874     // This ensures that we have an atomic view of the directory as long as we have < ls limit
2875     // (batch size of the files in a directory) on the namenode. Otherwise, we get back the files in
2876     // batches and may miss files being added/deleted. This could be more robust (iteratively
2877     // checking to see if we have all the files until we are sure), but the limit is currently 1000
2878     // files/batch, far more than the number of store files under a single column family.
2879     for (Store store : stores.values()) {
2880       // 2.1. build the snapshot reference directory for the store
2881       Path dstStoreDir = TakeSnapshotUtils.getStoreSnapshotDirectory(snapshotRegionDir,
2882         Bytes.toString(store.getFamily().getName()));
2883       List<StoreFile> storeFiles = store.getStorefiles();
2884       if (LOG.isDebugEnabled()) {
2885         LOG.debug("Adding snapshot references for " + storeFiles  + " hfiles");
2886       }
2887 
2888       // 2.2. iterate through all the store's files and create "references".
2889       int sz = storeFiles.size();
2890       for (int i = 0; i < sz; i++) {
2891         if (exnSnare != null) {
2892           exnSnare.rethrowException();
2893         }
2894         StoreFile storeFile = storeFiles.get(i);
2895         Path file = storeFile.getPath();
2896 
2897         LOG.debug("Creating reference for file (" + (i+1) + "/" + sz + ") : " + file);
2898         Path referenceFile = new Path(dstStoreDir, file.getName());
2899         boolean success = true;
2900         if (storeFile.isReference()) {
2901           // write the Reference object to the snapshot
2902           storeFile.getReference().write(fs, referenceFile);
2903         } else {
2904           // create "reference" to this store file.  It is intentionally an empty file -- all
2905           // necessary information is captured by its fs location and filename.  This allows us to
2906           // only figure out what needs to be done via a single nn operation (instead of having to
2907           // open and read the files as well).
2908           success = HBaseFileSystem.createNewFileOnFileSystem(fs, referenceFile);
2909         }
2910         if (!success) {
2911           throw new IOException("Failed to create reference file:" + referenceFile);
2912         }
2913       }
2914     }
2915   }
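
  /*
   * Illustrative invocation (a sketch, not part of the original source; it
   * assumes the protobuf-generated SnapshotDescription builder and passes a
   * null snare since cancellation is not needed):
   *
   *   SnapshotDescription desc = SnapshotDescription.newBuilder()
   *       .setName("my_snapshot").build();
   *   region.addRegionToSnapshot(desc, null);
   */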
2916 
2917   /**
2918    * Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP} with the provided current
2919    * timestamp.
2920    */
2921   private void updateKVTimestamps(
2922       final Iterable<List<KeyValue>> keyLists, final byte[] now) {
2923     for (List<KeyValue> keys: keyLists) {
2924       if (keys == null) continue;
2925       assert keys instanceof RandomAccess;
2926       int listSize = keys.size();
2927       for (int i=0; i < listSize; i++) {
2928         KeyValue key = keys.get(i);
2929         key.updateLatestStamp(now);
2930       }
2931     }
2932   }
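
  /*
   * Worked example for the method above (illustrative): a client-created
   * KeyValue without an explicit timestamp carries
   * HConstants.LATEST_TIMESTAMP (Long.MAX_VALUE); updateLatestStamp(now)
   * rewrites it in place with the server's clock, so every such cell in a
   * single call ends up sharing the same server-assigned timestamp.
   */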
2933 
2934   /*
2935    * Check if we have the resources to support an update.
2936    *
2937    * Here we synchronize on HRegion, a broad scoped lock.  It's appropriate
2938    * given that we're deciding here whether this region is able to take on
2939    * writes.  At the time of writing, the only synchronizations are this one
2940    * and the synchronize on 'this' inside internalFlushCache used to send
2941    * the notify.
2942    */
2943   private void checkResources()
2944       throws RegionTooBusyException, InterruptedIOException {
2945 
2946     // If catalog region, do not impose resource constraints or block updates.
2947     if (this.getRegionInfo().isMetaRegion()) return;
2948 
2949     boolean blocked = false;
2950     long startTime = 0;
2951     while (this.memstoreSize.get() > this.blockingMemStoreSize) {
2952       requestFlush();
2953       if (!blocked) {
2954         startTime = EnvironmentEdgeManager.currentTimeMillis();
2955         LOG.info("Blocking updates for '" + Thread.currentThread().getName() +
2956           "' on region " + Bytes.toStringBinary(getRegionName()) +
2957           ": memstore size " +
2958           StringUtils.humanReadableInt(this.memstoreSize.get()) +
2959           " is >= than blocking " +
2960           StringUtils.humanReadableInt(this.blockingMemStoreSize) + " size");
2961       }
2962       long now = EnvironmentEdgeManager.currentTimeMillis();
2963       long timeToWait = startTime + busyWaitDuration - now;
2964       if (timeToWait <= 0L) {
2965         final long totalTime = now - startTime;
2966         this.updatesBlockedMs.add(totalTime);
2967         LOG.info("Failed to unblock updates for region " + this + " '"
2968           + Thread.currentThread().getName() + "' in " + totalTime
2969           + "ms. The region is still busy.");
2970         throw new RegionTooBusyException("region is flushing");
2971       }
2972       blocked = true;
2973       synchronized(this) {
2974         try {
2975           wait(Math.min(timeToWait, threadWakeFrequency));
2976         } catch (InterruptedException ie) {
2977           final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
2978           if (totalTime > 0) {
2979             this.updatesBlockedMs.add(totalTime);
2980           }
2981           LOG.info("Interrupted while waiting to unblock updates for region "
2982             + this + " '" + Thread.currentThread().getName() + "'");
2983           InterruptedIOException iie = new InterruptedIOException();
2984           iie.initCause(ie);
2985           throw iie;
2986         }
2987       }
2988     }
2989     if (blocked) {
2990       // Add in the blocked time if appropriate
2991       final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
2992       if(totalTime > 0 ){
2993         this.updatesBlockedMs.add(totalTime);
2994       }
2995       LOG.info("Unblocking updates for region " + this + " '"
2996           + Thread.currentThread().getName() + "'");
2997     }
2998   }
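
  /*
   * Sketch of how the write paths interact with checkResources (illustrative,
   * not part of the original source):
   *
   *   try {
   *     checkResources();   // may block up to busyWaitDuration, then throw
   *     // ... proceed with the update ...
   *   } catch (RegionTooBusyException e) {
   *     // surfaced to the client, which is expected to back off and retry
   *   }
   */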
2999 
3000   /**
3001    * @throws IOException Throws exception if region is in read-only mode.
3002    */
3003   protected void checkReadOnly() throws IOException {
3004     if (this.writestate.isReadOnly()) {
3005       throw new IOException("region is read only");
3006     }
3007   }
3008 
3009   /**
3010    * Add updates first to the hlog and then add values to memstore.
3011    * Warning: Assumption is caller has lock on passed in row.
3012    * @param family the column family to which the edits belong
3013    * @param edits Cell updates by column; timestamps set to
3014    *   LATEST_TIMESTAMP are replaced with the current server time
3015    * @throws IOException
3016    */
3017   private void put(byte [] family, List<KeyValue> edits)
3018   throws IOException {
3019     Map<byte[], List<KeyValue>> familyMap;
3020     familyMap = new HashMap<byte[], List<KeyValue>>();
3021 
3022     familyMap.put(family, edits);
3023     Put p = new Put();
3024     p.setFamilyMap(familyMap);
3025     p.setClusterId(HConstants.DEFAULT_CLUSTER_ID);
3026     p.setWriteToWAL(true);
3027     this.internalPut(p, HConstants.DEFAULT_CLUSTER_ID, true);
3028   }
3029 
3030   /**
3031    * Add updates first to the hlog (if writeToWal) and then add values to memstore.
3032    * Warning: Assumption is caller has lock on passed in row.
3033    * @param put The Put command
3034    * @param clusterId UUID of the originating cluster (for replication).
3035    * @param writeToWAL if true, then we should write to the log
3036    * @throws IOException
3037    */
3038   private void internalPut(Put put, UUID clusterId, boolean writeToWAL) throws IOException {
3039     Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap();
3040     WALEdit walEdit = new WALEdit();
3041     /* run pre put hook outside of lock to avoid deadlock */
3042     if (coprocessorHost != null) {
3043       if (coprocessorHost.prePut(put, walEdit, writeToWAL)) {
3044         return;
3045       }
3046     }
3047 
3048     long now = EnvironmentEdgeManager.currentTimeMillis();
3049     byte[] byteNow = Bytes.toBytes(now);
3050     boolean flush = false;
3051 
3052     lock(this.updatesLock.readLock());
3053     try {
3054       checkFamilies(familyMap.keySet());
3055       checkTimestamps(familyMap, now);
3056       updateKVTimestamps(familyMap.values(), byteNow);
3057       // write/sync to WAL should happen before we touch memstore.
3058       //
3059       // If order is reversed, i.e. we write to memstore first, and
3060       // for some reason fail to write/sync to commit log, the memstore
3061       // will contain uncommitted transactions.
3062       if (writeToWAL) {
3063         addFamilyMapToWALEdit(familyMap, walEdit);
3064         walEdit.addClusterIds(put.getClusterIds());
3065         this.log.append(regionInfo, this.htableDescriptor.getName(),
3066             walEdit, clusterId, now, this.htableDescriptor);
3067       } else {
3068         recordPutWithoutWal(familyMap);
3069       }
3070 
3071       long addedSize = applyFamilyMapToMemstore(familyMap, null);
3072       flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
3073     } finally {
3074       this.updatesLock.readLock().unlock();
3075     }
3076 
3077     if (coprocessorHost != null) {
3078       coprocessorHost.postPut(put, walEdit, writeToWAL);
3079     }
3080 
3081     // do after lock
3082     final long after = EnvironmentEdgeManager.currentTimeMillis();
3083     this.opMetrics.updatePutMetrics(familyMap.keySet(), after - now);
3084 
3085     if (flush) {
3086       // Request a cache flush.  Do it outside update lock.
3087       requestFlush();
3088     }
3089   }
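
  /*
   * Ordering contract illustrated by internalPut (descriptive sketch, not
   * part of the original source):
   *
   *   1. coprocessor prePut                 (outside any lock)
   *   2. append edits to the WAL            (under updatesLock read lock)
   *   3. apply edits to the memstore
   *   4. postPut hook, metrics, and an optional flush request
   *
   * Because the WAL append precedes the memstore update, a failed append
   * leaves nothing to roll back; contrast with the batched path, which
   * updates the memstore first and may need rollbackMemstore().
   */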
3090 
3091   /**
3092    * Atomically apply the given map of family->edits to the memstore.
3093    * This handles the consistency control on its own, but the caller
3094    * should already have locked updatesLock.readLock(). This also does
3095    * <b>not</b> check the families for validity.
3096    *
3097    * @param familyMap Map of kvs per family
3098    * @param localizedWriteEntry The WriteEntry of the MVCC for this transaction.
3099    *        If null, then this method internally creates a mvcc transaction.
3100    * @return the additional memory usage of the memstore caused by the
3101    * new entries.
3102    */
3103   private long applyFamilyMapToMemstore(Map<byte[], List<KeyValue>> familyMap,
3104     MultiVersionConsistencyControl.WriteEntry localizedWriteEntry) {
3105     long size = 0;
3106     boolean freemvcc = false;
3107 
3108     try {
3109       if (localizedWriteEntry == null) {
3110         localizedWriteEntry = mvcc.beginMemstoreInsert();
3111         freemvcc = true;
3112       }
3113 
3114       for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
3115         byte[] family = e.getKey();
3116         List<KeyValue> edits = e.getValue();
3117         assert edits instanceof RandomAccess;
3118         Store store = getStore(family);
3119         int listSize = edits.size();
3120         for (int i=0; i< listSize; i++) {
3121           KeyValue kv = edits.get(i);
3122           kv.setMemstoreTS(localizedWriteEntry.getWriteNumber());
3123           size += store.add(kv);
3124         }
3125       }
3126     } finally {
3127       if (freemvcc) {
3128         mvcc.completeMemstoreInsert(localizedWriteEntry);
3129       }
3130     }
3131 
3132     return size;
3133   }
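
  /*
   * MVCC usage pattern assumed by the method above (illustrative): a caller
   * managing its own write entry makes the new cells visible to readers only
   * when the insert is completed.
   *
   *   MultiVersionConsistencyControl.WriteEntry w = mvcc.beginMemstoreInsert();
   *   try {
   *     long added = applyFamilyMapToMemstore(familyMap, w);
   *   } finally {
   *     mvcc.completeMemstoreInsert(w);   // edits become visible here
   *   }
   */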
3134 
3135   /**
3136    * Remove all the keys listed in the map from the memstore. This method is
3137    * called when a Put/Delete has updated the memstore but subsequently fails
3138    * to update the wal, and the memstore updates must be rolled back.
3139    */
3140   private void rollbackMemstore(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp,
3141                                 Map<byte[], List<KeyValue>>[] familyMaps,
3142                                 int start, int end) {
3143     int kvsRolledback = 0;
3144     for (int i = start; i < end; i++) {
3145       // skip over request that never succeeded in the first place.
3146       if (batchOp.retCodeDetails[i].getOperationStatusCode()
3147             != OperationStatusCode.SUCCESS) {
3148         continue;
3149       }
3150 
3151       // Rollback all the kvs for this row.
3152       Map<byte[], List<KeyValue>> familyMap  = familyMaps[i];
3153       for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
3154         byte[] family = e.getKey();
3155         List<KeyValue> edits = e.getValue();
3156 
3157         // Remove those keys from the memstore that match our
3158         // key's (row, cf, cq, timestamp, memstoreTS). The interesting part is
3159         // that even the memstoreTS has to match for keys that will be rolled back.
3160         Store store = getStore(family);
3161         for (KeyValue kv: edits) {
3162           store.rollback(kv);
3163           kvsRolledback++;
3164         }
3165       }
3166     }
3167     LOG.debug("rollbackMemstore rolled back " + kvsRolledback +
3168         " keyvalues from start:" + start + " to end:" + end);
3169   }
3170 
3171   /**
3172    * Check the collection of families for validity.
3173    * @throws NoSuchColumnFamilyException if a family does not exist.
3174    */
3175   private void checkFamilies(Collection<byte[]> families)
3176   throws NoSuchColumnFamilyException {
3177     for (byte[] family : families) {
3178       checkFamily(family);
3179     }
3180   }
3181 
3182   private void checkTimestamps(final Map<byte[], List<KeyValue>> familyMap,
3183       long now) throws DoNotRetryIOException {
3184     if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
3185       return;
3186     }
3187     long maxTs = now + timestampSlop;
3188     for (List<KeyValue> kvs : familyMap.values()) {
3189       assert kvs instanceof RandomAccess;
3190       int listSize = kvs.size();
3191       for (int i=0; i < listSize; i++) {
3192         KeyValue kv = kvs.get(i);
3193         // see if the user-side TS is out of range. latest = server-side
3194         if (!kv.isLatestTimestamp() && kv.getTimestamp() > maxTs) {
3195           throw new DoNotRetryIOException("Timestamp for KV out of range "
3196               + kv + " (too.new=" + timestampSlop + ")");
3197         }
3198       }
3199     }
3200   }
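
  /*
   * Worked example (illustrative): with now = 1,000,000 and timestampSlop =
   * 2,000, maxTs = 1,002,000.  A client-supplied KeyValue stamped 1,005,000
   * is rejected with DoNotRetryIOException, while one carrying
   * HConstants.LATEST_TIMESTAMP is exempt because the server rewrites it to
   * its own clock before applying the edit.
   */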
3201 
3202   /**
3203    * Append the given map of family->edits to a WALEdit data structure.
3204    * This does not write to the HLog itself.
3205    * @param familyMap map of family->edits
3206    * @param walEdit the destination entry to append into
3207    */
3208   private void addFamilyMapToWALEdit(Map<byte[], List<KeyValue>> familyMap,
3209       WALEdit walEdit) {
3210     for (List<KeyValue> edits : familyMap.values()) {
3211       assert edits instanceof RandomAccess;
3212       int listSize = edits.size();
3213       for (int i=0; i < listSize; i++) {
3214         KeyValue kv = edits.get(i);
3215         walEdit.add(kv);
3216       }
3217     }
3218   }
3219 
3220   private void requestFlush() {
3221     if (this.rsServices == null) {
3222       return;
3223     }
3224     synchronized (writestate) {
3225       if (this.writestate.isFlushRequested()) {
3226         return;
3227       }
3228       writestate.flushRequested = true;
3229     }
3230     // Make request outside of synchronize block; HBASE-818.
3231     this.rsServices.getFlushRequester().requestFlush(this);
3232     if (LOG.isDebugEnabled()) {
3233       LOG.debug("Flush requested on " + this);
3234     }
3235   }
3236 
3237   /*
3238    * @param size
3239    * @return True if size is over the flush threshold
3240    */
3241   private boolean isFlushSize(final long size) {
3242     return size > this.memstoreFlushSize;
3243   }
3244 
3245   /**
3246    * Read the edits log put under this region by wal log splitting process.  Put
3247    * the recovered edits back up into this region.
3248    *
3249    * <p>We can ignore any log message that has a sequence ID that's equal to or
3250    * lower than minSeqId.  (Because we know such log messages are already
3251    * reflected in the HFiles.)
3252    *
3253    * <p>While this is running we are putting pressure on memory yet we are
3254    * outside of our usual accounting because we are not yet an onlined region
3255    * (this stuff is being run as part of Region initialization).  This means
3256    * that if we're up against global memory limits, we'll not be flagged to flush
3257    * because we are not online. We can't be flushed by usual mechanisms anyways;
3258    * we're not yet online so our relative sequenceids are not yet aligned with
3259    * HLog sequenceids -- not till we come up online, post processing of split
3260    * edits.
3261    *
3262    * <p>But to help relieve memory pressure, we at least manage our own heap
3263    * size by flushing if we are in excess of per-region limits.  When flushing,
3264    * though, we have to be careful to avoid using the regionserver/hlog
3265    * sequenceid.  It runs on a different line from what is going on here in
3266    * this region context, so if we crashed replaying these edits but in the
3267    * midst had a flush that used the regionserver log with a sequenceid in
3268    * excess of what this region and its split editlogs are at, then we could
3269    * miss edits the next time we go to recover.  So we have to flush inline,
3270    * using seqids that make sense in this single region's context only -- until we are online.
3271    *
3272    * @param regiondir the region directory to search for recovered edits under
3273    * @param maxSeqIdInStores any edit found in the split editlogs needs to be in excess of
3274    * the maxSeqId for its store to be applied, else it is skipped
3275    * @param reporter progress reporter; may be null
3276    * @return the sequence id of the last edit added to this region out of the
3277    * recovered edits log, or the minimum store sequenceid if nothing was added from the editlogs.
3278    * @throws UnsupportedEncodingException
3279    * @throws IOException
3280    */
3281   protected long replayRecoveredEditsIfAny(final Path regiondir,
3282       Map<byte[], Long> maxSeqIdInStores,
3283       final CancelableProgressable reporter, final MonitoredTask status)
3284       throws UnsupportedEncodingException, IOException {
3285     long minSeqIdForTheRegion = -1;
3286     for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
3287       if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
3288         minSeqIdForTheRegion = maxSeqIdInStore;
3289       }
3290     }
3291     long seqid = minSeqIdForTheRegion;
3292     NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs, regiondir);
3293     if (files == null || files.isEmpty()) return seqid;
3294 
3295     for (Path edits: files) {
3296       if (edits == null || !this.fs.exists(edits)) {
3297         LOG.warn("Null or non-existent edits file: " + edits);
3298         continue;
3299       }
3300       if (isZeroLengthThenDelete(this.fs, edits)) continue;
3301 
3302       // Recovered-edits file names encode the maximum sequenceid they contain.
3303       String fileName = edits.getName();
3304       long maxSeqId = Math.abs(Long.parseLong(fileName));
3305       if (maxSeqId <= minSeqIdForTheRegion) {
3306         String msg = "Maximum sequenceid for this log is " + maxSeqId
3307             + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
3308             + ", skipped the whole file, path=" + edits;
3309         LOG.debug(msg);
3310         continue;
3311       }
3312 
3313       try {
3314         seqid = replayRecoveredEdits(edits, maxSeqIdInStores, reporter);
3315       } catch (IOException e) {
3316         boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
3317         if (skipErrors) {
3318           Path p = HLog.moveAsideBadEditsFile(fs, edits);
3319           LOG.error("hbase.skip.errors=true so continuing. Renamed " + edits +
3320             " as " + p, e);
3321         } else {
3322           throw e;
3323         }
3324       }
3325       // The edits size added into rsAccounting during this replaying will not
3326       // be required any more. So just clear it.
3327       if (this.rsAccounting != null) {
3328         this.rsAccounting.clearRegionReplayEditsSize(this.regionInfo.getRegionName());
3329       }
3330     }
3331     if (seqid > minSeqIdForTheRegion) {
3332       // Then we added some edits to memory. Flush and cleanup split edit files.
3333       internalFlushcache(null, seqid, status);
3334     }
3335     // Now delete the content of recovered edits.  We're done w/ them.
3336     for (Path file: files) {
3337       if (!HBaseFileSystem.deleteFileFromFileSystem(fs, file)) {
3338         LOG.error("Failed delete of " + file);
3339       } else {
3340         LOG.debug("Deleted recovered.edits file=" + file);
3341       }
3342     }
3343     return seqid;
3344   }
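
  /*
   * Worked example of the skip logic above (illustrative): a recovered edits
   * file's name encodes the highest sequenceid it contains, so a file named
   * "0000000000000001234" yields maxSeqId = 1234.  If every store has already
   * flushed past that point (minSeqIdForTheRegion >= 1234), the whole file is
   * skipped without even being opened.
   */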
3345 
3346   /*
3347    * @param edits File of recovered edits.
3348    * @param maxSeqIdInStores Maximum sequenceid found in each store.  Edits in log
3349    * must be larger than this to be replayed for each store.
3350    * @param reporter progress reporter; may be null
3351    * @return the sequence id of the last edit added to this region out of the
3352    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
3353    * @throws IOException
3354    */
3355   private long replayRecoveredEdits(final Path edits,
3356       Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
3357     throws IOException {
3358     String msg = "Replaying edits from " + edits;
3359     LOG.info(msg);
3360     MonitoredTask status = TaskMonitor.get().createStatus(msg);
3361 
3362     status.setStatus("Opening logs");
3363     HLog.Reader reader = null;
3364     try {
3365       reader = HLog.getReader(this.fs, edits, conf);
3366       long currentEditSeqId = -1;
3367       long firstSeqIdInLog = -1;
3368       long skippedEdits = 0;
3369       long editsCount = 0;
3370       long intervalEdits = 0;
3371       HLog.Entry entry;
3372       Store store = null;
3373       boolean reported_once = false;
3374 
3375       try {
3376         // How many edits seen before we check elapsed time
3377         int interval = this.conf.getInt("hbase.hstore.report.interval.edits",
3378             2000);
3379         // How often to send a progress report (default 1/2 master timeout)
3380         int period = this.conf.getInt("hbase.hstore.report.period",
3381             this.conf.getInt("hbase.master.assignment.timeoutmonitor.timeout",
3382                 180000) / 2);
3383         long lastReport = EnvironmentEdgeManager.currentTimeMillis();
3384 
3385         while ((entry = reader.next()) != null) {
3386           HLogKey key = entry.getKey();
3387           WALEdit val = entry.getEdit();
3388 
3389           if (reporter != null) {
3390             intervalEdits += val.size();
3391             if (intervalEdits >= interval) {
3392               // Number of edits interval reached
3393               intervalEdits = 0;
3394               long cur = EnvironmentEdgeManager.currentTimeMillis();
3395               if (lastReport + period <= cur) {
3396                 status.setStatus("Replaying edits..." +
3397                     " skipped=" + skippedEdits +
3398                     " edits=" + editsCount);
3399                 // Timeout reached
3400                 if(!reporter.progress()) {
3401                   msg = "Progressable reporter failed, stopping replay";
3402                   LOG.warn(msg);
3403                   status.abort(msg);
3404                   throw new IOException(msg);
3405                 }
3406                 reported_once = true;
3407                 lastReport = cur;
3408               }
3409             }
3410           }
3411 
3412           // Start coprocessor replay here. The coprocessor is for each WALEdit
3413           // instead of a KeyValue.
3414           if (coprocessorHost != null) {
3415             status.setStatus("Running pre-WAL-restore hook in coprocessors");
3416             if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
3417               // if bypass this log entry, ignore it ...
3418               continue;
3419             }
3420           }
3421 
3422           if (firstSeqIdInLog == -1) {
3423             firstSeqIdInLog = key.getLogSeqNum();
3424           }
3425           boolean flush = false;
3426           for (KeyValue kv: val.getKeyValues()) {
3427             // Check this edit is for me. Also, guard against writing the special
3428             // METACOLUMN info such as HBASE::CACHEFLUSH entries
3429             if (kv.matchingFamily(HLog.METAFAMILY) ||
3430                 !Bytes.equals(key.getEncodedRegionName(), this.regionInfo.getEncodedNameAsBytes())) {
3431               skippedEdits++;
3432               continue;
3433             }
3434             // Figure which store the edit is meant for.
3435             if (store == null || !kv.matchingFamily(store.getFamily().getName())) {
3436               store = this.stores.get(kv.getFamily());
3437             }
3438             if (store == null) {
3439               // This should never happen.  Perhaps schema was changed between
3440               // crash and redeploy?
3441               LOG.warn("No family for " + kv);
3442               skippedEdits++;
3443               continue;
3444             }
3445             // Now, figure if we should skip this edit.
3446             if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily()
3447                 .getName())) {
3448               skippedEdits++;
3449               continue;
3450             }
3451             currentEditSeqId = key.getLogSeqNum();
3452             // Once we are over the limit, restoreEdit will keep returning true to
3453             // flush -- but don't flush until we've played all the kvs that make up
3454             // the WALEdit.
3455             flush = restoreEdit(store, kv);
3456             editsCount++;
3457           }
3458           if (flush) internalFlushcache(null, currentEditSeqId, status);
3459 
3460           if (coprocessorHost != null) {
3461             coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
3462           }
3463         }
3464       } catch (EOFException eof) {
3465         Path p = HLog.moveAsideBadEditsFile(fs, edits);
3466         msg = "Encountered EOF. Most likely due to Master failure during " +
3467             "log spliting, so we have this data in another edit.  " +
3468             "Continuing, but renaming " + edits + " as " + p;
3469         LOG.warn(msg, eof);
3470         status.abort(msg);
3471       } catch (IOException ioe) {
3472         // If the IOE resulted from bad file format,
3473         // then this problem is idempotent and retrying won't help
3474         if (ioe.getCause() instanceof ParseException) {
3475           Path p = HLog.moveAsideBadEditsFile(fs, edits);
3476           msg = "File corruption encountered!  " +
3477               "Continuing, but renaming " + edits + " as " + p;
3478           LOG.warn(msg, ioe);
3479           status.setStatus(msg);
3480         } else {
3481           status.abort(StringUtils.stringifyException(ioe));
3482           // other IO errors may be transient (bad network connection,
3483           // checksum exception on one datanode, etc).  throw & retry
3484           throw ioe;
3485         }
3486       }
3487       if (reporter != null && !reported_once) {
3488         reporter.progress();
3489       }
3490       msg = "Applied " + editsCount + ", skipped " + skippedEdits +
3491         ", firstSequenceidInLog=" + firstSeqIdInLog +
3492         ", maxSequenceidInLog=" + currentEditSeqId + ", path=" + edits;
3493       status.markComplete(msg);
3494       LOG.debug(msg);
3495       return currentEditSeqId;
3496     } finally {
3497       status.cleanup();
3498       if (reader != null) {
3499          reader.close();
3500       }
3501     }
3502   }
3503 
3504   /**
3505    * Used by tests.
3506    * @param s Store to add the edit to.
3507    * @param kv KeyValue to add.
3508    * @return True if we should flush.
3509    */
3510   protected boolean restoreEdit(final Store s, final KeyValue kv) {
3511     long kvSize = s.add(kv);
3512     if (this.rsAccounting != null) {
3513       rsAccounting.addAndGetRegionReplayEditsSize(this.regionInfo.getRegionName(), kvSize);
3514     }
3515     return isFlushSize(this.addAndGetGlobalMemstoreSize(kvSize));
3516   }
3517 
3518   /*
3519    * @param fs
3520    * @param p File to check.
3521    * @return True if file was zero-length (and if so, we'll delete it in here).
3522    * @throws IOException
3523    */
3524   private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
3525       throws IOException {
3526     FileStatus stat = fs.getFileStatus(p);
3527     if (stat.getLen() > 0) return false;
3528     LOG.warn("File " + p + " is zero-length, deleting.");
3529     HBaseFileSystem.deleteFileFromFileSystem(fs, p);
3530     return true;
3531   }
3532 
3533   protected Store instantiateHStore(Path tableDir, HColumnDescriptor c)
3534       throws IOException {
3535     return new Store(tableDir, this, c, this.fs, this.conf);
3536   }
3537 
3538   /**
3539    * Return HStore instance.
3540    * Use with caution.  Exposed for use of fixup utilities.
3541    * @param column Name of column family hosted by this region.
3542    * @return Store that goes with the family on passed <code>column</code>.
3543    * TODO: Make this lookup faster.
3544    */
3545   public Store getStore(final byte [] column) {
3546     return this.stores.get(column);
3547   }
3548 
3549   public Map<byte[], Store> getStores() {
3550     return this.stores;
3551   }
3552 
3553   /**
3554    * Return list of storeFiles for the set of CFs.
3555    * Uses closeLock to prevent the race condition where the region closes in
3556    * the middle of the for loop; since the stores close one by one, some
3557    * stores would otherwise report 0 files.
3558    * @return List of storeFiles.
3559    */
3560   public List<String> getStoreFileList(final byte [][] columns)
3561     throws IllegalArgumentException {
3562     List<String> storeFileNames = new ArrayList<String>();
3563     synchronized(closeLock) {
3564       for(byte[] column : columns) {
3565         Store store = this.stores.get(column);
3566         if (store == null) {
3567           throw new IllegalArgumentException("No column family : " +
3568               Bytes.toString(column) + " available");
3569         }
3570         List<StoreFile> storeFiles = store.getStorefiles();
3571         for (StoreFile storeFile: storeFiles) {
3572           storeFileNames.add(storeFile.getPath().toString());
3573         }
3574       }
3575     }
3576     return storeFileNames;
3577   }
3578   //////////////////////////////////////////////////////////////////////////////
3579   // Support code
3580   //////////////////////////////////////////////////////////////////////////////
3581 
3582   /** Make sure this is a valid row for the HRegion */
3583   void checkRow(final byte [] row, String op) throws IOException {
3584     if(!rowIsInRange(regionInfo, row)) {
3585       throw new WrongRegionException("Requested row out of range for " +
3586           op + " on HRegion " + this + ", startKey='" +
3587           Bytes.toStringBinary(regionInfo.getStartKey()) + "', getEndKey()='" +
3588           Bytes.toStringBinary(regionInfo.getEndKey()) + "', row='" +
3589           Bytes.toStringBinary(row) + "'");
3590     }
3591   }
3592 
3593   /**
3594    * Obtain a lock on the given row.  Blocks until success.
3595    *
3596    * I know it's strange to have two mappings:
3597    * <pre>
3598    *   ROWS  ==> LOCKS
3599    * </pre>
3600    * as well as
3601    * <pre>
3602    *   LOCKS ==> ROWS
3603    * </pre>
3604    *
3605    * But it acts as a guard on the client; a miswritten client just can't
3606    * submit the name of a row and start writing to it; it must know the correct
3607    * lockid, which matches the lock list in memory.
3608    *
3609    * <p>It would be more memory-efficient to assume a correctly-written client,
3610    * which maybe we'll do in the future.
3611    *
3612    * @param row Name of row to lock.
3613    * @throws IOException
3614    * @return The id of the held lock.
3615    */
3616   public Integer obtainRowLock(final byte [] row) throws IOException {
3617     startRegionOperation();
3618     this.writeRequestsCount.increment();
3619     this.opMetrics.setWriteRequestCountMetrics(this.writeRequestsCount.get());
3620     try {
3621       return internalObtainRowLock(new HashedBytes(row), true);
3622     } finally {
3623       closeRegionOperation();
3624     }
3625   }
3626 
3627   /**
3628    * Obtains or tries to obtain the given row lock.
3629    * @param waitForLock if true, will block until the lock is available.
3630    *        Otherwise, just tries to obtain the lock and returns
3631    *        null if unavailable.
3632    */
3633   private Integer internalObtainRowLock(final HashedBytes rowKey, boolean waitForLock)
3634       throws IOException {
3635     checkRow(rowKey.getBytes(), "row lock");
3636     startRegionOperation();
3637     try {
3638       CountDownLatch rowLatch = new CountDownLatch(1);
3639 
3640       // loop until we acquire the row lock (unless !waitForLock)
3641       while (true) {
3642         CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch);
3643         if (existingLatch == null) {
3644           break;
3645         } else {
3646           // row already locked
3647           if (!waitForLock) {
3648             return null;
3649           }
3650           try {
3651             if (!existingLatch.await(this.rowLockWaitDuration,
3652                             TimeUnit.MILLISECONDS)) {
3653               throw new IOException("Timed out on getting lock for row=" + rowKey);
3654             }
3655           } catch (InterruptedException ie) {
3656             LOG.warn("internalObtainRowLock interrupted for row=" + rowKey);
3657             InterruptedIOException iie = new InterruptedIOException();
3658             iie.initCause(ie);
3659             throw iie;
3660           }
3661         }
3662       }
3663 
3664       // loop until we generate an unused lock id
3665       while (true) {
3666         Integer lockId = lockIdGenerator.incrementAndGet();
3667         HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, rowKey);
3668         if (existingRowKey == null) {
3669           return lockId;
3670         } else {
3671           // lockId already in use, jump generator to a new spot
3672           lockIdGenerator.set(rand.nextInt());
3673         }
3674       }
3675     } finally {
3676       closeRegionOperation();
3677     }
3678   }
3679 
3680   /**
3681    * Used by unit tests.
3682    * @param lockid
3683    * @return Row that goes with <code>lockid</code>
3684    */
3685   byte[] getRowFromLock(final Integer lockid) {
3686     HashedBytes rowKey = lockIds.get(lockid);
3687     return rowKey == null ? null : rowKey.getBytes();
3688   }
3689 
3690   /**
3691    * Release the row lock!
3692    * @param lockId  The lock ID to release.
3693    */
3694   public void releaseRowLock(final Integer lockId) {
3695     if (lockId == null) return; // null lock id, do nothing
3696     HashedBytes rowKey = lockIds.remove(lockId);
3697     if (rowKey == null) {
3698       LOG.warn("Release unknown lockId: " + lockId);
3699       return;
3700     }
3701     CountDownLatch rowLatch = lockedRows.remove(rowKey);
3702     if (rowLatch == null) {
3703       LOG.error("Releases row not locked, lockId: " + lockId + " row: "
3704           + rowKey);
3705       return;
3706     }
3707     rowLatch.countDown();
3708   }
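
  /*
   * Typical lock lifecycle (illustrative sketch, not part of the original
   * source):
   *
   *   Integer lockId = region.obtainRowLock(row);
   *   try {
   *     // ... operate on the row while the latch in lockedRows is held ...
   *   } finally {
   *     region.releaseRowLock(lockId);   // wakes any threads awaiting the row
   *   }
   */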
3709 
3710   /**
3711    * See if row is currently locked.
3712    * @param lockId the lock id to look up
3713    * @return true if the given lock id is currently held
3714    */
3715   boolean isRowLocked(final Integer lockId) {
3716     return lockIds.containsKey(lockId);
3717   }
3718 
3719   /**
3720    * Returns existing row lock if found, otherwise
3721    * obtains a new row lock and returns it.
3722    * @param lockid lock id requested by the user, or null if the caller does not already hold a lock
3723    * @param row the row to lock
3724    * @param waitForLock if true, will block until the lock is available, otherwise will
3725    * simply return null if it could not acquire the lock.
3726    * @return lockid or null if waitForLock is false and the lock was unavailable.
3727    */
3728   public Integer getLock(Integer lockid, byte [] row, boolean waitForLock)
3729   throws IOException {
3730     return getLock(lockid, new HashedBytes(row), waitForLock);
3731   }
3732 
3733   /**
3734    * Returns existing row lock if found, otherwise
3735    * obtains a new row lock and returns it.
3736    * @param lockid lock id requested by the user, or null if the caller does not already hold a lock
3737    * @param row the row to lock
3738    * @param waitForLock if true, will block until the lock is available, otherwise will
3739    * simply return null if it could not acquire the lock.
3740    * @return lockid or null if waitForLock is false and the lock was unavailable.
3741    */
3742   protected Integer getLock(Integer lockid, HashedBytes row, boolean waitForLock)
3743   throws IOException {
3744     Integer lid;
3745     if (lockid == null) {
3746       lid = internalObtainRowLock(row, waitForLock);
3747     } else {
3748       HashedBytes rowFromLock = lockIds.get(lockid);
3749       if (!row.equals(rowFromLock)) {
3750         throw new IOException("Invalid row lock: LockId: " + lockid + " holds the lock for row: " + rowFromLock + " but wanted lock for row: " + row);
3751       }
3752       lid = lockid;
3753     }
3754     return lid;
3755   }
3756 
3757   /**
3758    * Determines whether multiple column families are present.
3759    * Precondition: familyPaths is not null.
3760    * @param familyPaths List of Pair<byte[] column family, String hfilePath>
3761    * @return true if more than one column family is referenced
3762    */
3763   private static boolean hasMultipleColumnFamilies(
3764       List<Pair<byte[], String>> familyPaths) {
3765     boolean multipleFamilies = false;
3766     byte[] family = null;
3767     for (Pair<byte[], String> pair : familyPaths) {
3768       byte[] fam = pair.getFirst();
3769       if (family == null) {
3770         family = fam;
3771       } else if (!Bytes.equals(family, fam)) {
3772         multipleFamilies = true;
3773         break;
3774       }
3775     }
3776     return multipleFamilies;
3777   }
3778 
3779   /**
3780    * Attempts to atomically load a group of hfiles.  This is critical for loading
3781    * rows with multiple column families atomically.
3782    *
3783    * @param familyPaths List of Pair<byte[] column family, String hfilePath>
3784    * @return true if successful, false if failed recoverably
3785    * @throws IOException if failed unrecoverably.
3786    */
3787   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths) throws IOException {
3788     return bulkLoadHFiles(familyPaths, false);
3789   }
3790 
3791   /**
3792    * Attempts to atomically load a group of hfiles. This is critical for loading rows with multiple
3793    * column families atomically.
3794    * @param familyPaths List of Pair<byte[] column family, String hfilePath>
3795    * @param assignSeqId should we assign sequence numbers
3796    * @return true if successful, false if failed recoverably
3797    * @throws IOException if failed unrecoverably.
3798    */
3799   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, boolean assignSeqId)
3800       throws IOException {
3801     return bulkLoadHFiles(familyPaths, null, assignSeqId);
3802   }
3803 
3804   /**
3805    * Attempts to atomically load a group of hfiles.  This is critical for loading
3806    * rows with multiple column families atomically.
3807    *
3808    * @param familyPaths List of Pair<byte[] column family, String hfilePath>
3809    * @param bulkLoadListener Internal hooks enabling massaging/preparation of a
3810    * file about to be bulk loaded
3811    * @return true if successful, false if failed recoverably
3812    * @throws IOException if failed unrecoverably.
3813    */
3814   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
3815       BulkLoadListener bulkLoadListener) throws IOException {
3816     return bulkLoadHFiles(familyPaths, bulkLoadListener, false);
3817   }
3818 
3819   /**
3820    * Attempts to atomically load a group of hfiles. This is critical for loading rows with multiple
3821    * column families atomically.
3822    * @param familyPaths List of Pair<byte[] column family, String hfilePath>
3823    * @param bulkLoadListener Internal hooks enabling massaging/preparation of a file about to be bulk loaded
3824    * @param assignSeqId should we assign sequence numbers
3825    * @return true if successful, false if failed recoverably
3826    * @throws IOException if failed unrecoverably.
3827    */
3828   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
3829       BulkLoadListener bulkLoadListener, boolean assignSeqId) throws IOException {
3830     Preconditions.checkNotNull(familyPaths);
3831     // we need writeLock for multi-family bulk load
3832     startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
3833     try {
3834       this.writeRequestsCount.increment();
3835       this.opMetrics.setWriteRequestCountMetrics(this.writeRequestsCount.get());
3836 
3837       // A split may have happened between when the split keys were gathered
3838       // and when the HRegion's write lock was taken.  We need to validate each
3839       // HFile's region before attempting to bulk load them all.
3840       List<IOException> ioes = new ArrayList<IOException>();
3841       List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();
3842       for (Pair<byte[], String> p : familyPaths) {
3843         byte[] familyName = p.getFirst();
3844         String path = p.getSecond();
3845 
3846         Store store = getStore(familyName);
3847         if (store == null) {
3848           IOException ioe = new DoNotRetryIOException(
3849               "No such column family " + Bytes.toStringBinary(familyName));
3850           ioes.add(ioe);
3851         } else {
3852           try {
3853             store.assertBulkLoadHFileOk(new Path(path));
3854           } catch (WrongRegionException wre) {
3855             // recoverable (file doesn't fit in region)
3856             failures.add(p);
3857           } catch (IOException ioe) {
3858             // unrecoverable (hdfs problem)
3859             ioes.add(ioe);
3860           }
3861         }
3862       }
3863 
3864       // validation failed because of some sort of IO problem.
3865       if (ioes.size() != 0) {
3866         IOException e = MultipleIOException.createIOException(ioes);
3867         LOG.error("There were one or more IO errors when checking if the bulk load is ok.", e);
3868         throw e;
3869       }
3870 
3871       // validation failed, bail out before doing anything permanent.
3872       if (failures.size() != 0) {
3873         StringBuilder list = new StringBuilder();
3874         for (Pair<byte[], String> p : failures) {
3875           list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")
3876             .append(p.getSecond());
3877         }
3878         // problem when validating
3879         LOG.warn("There was a recoverable bulk load failure likely due to a" +
3880             " split.  These (family, HFile) pairs were not loaded: " + list);
3881         return false;
3882       }
3883 
3884       long seqId = -1;
3885       // We need to assign a sequential ID that's in between two memstores in order to preserve
3886       // the guarantee that all the edits lower than the highest sequential ID from all the
3887       // HFiles are flushed on disk. See HBASE-10958.
3888       if (assignSeqId) {
3889         FlushResult fr = this.flushcache();
3890         if (fr.isFlushSucceeded()) {
3891           seqId = fr.flushSequenceId;
3892         } else if (fr.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
3893           seqId = this.log.obtainSeqNum();
3894         } else {
3895           throw new IOException("Could not bulk load with an assigned sequential ID because the " +
3896               "flush didn't run. Reason for not flushing: " + fr.failureReason);
3897         }
3898       }
3899 
3900       for (Pair<byte[], String> p : familyPaths) {
3901         byte[] familyName = p.getFirst();
3902         String path = p.getSecond();
3903         Store store = getStore(familyName);
3904         try {
3905           String finalPath = path;
3906           if(bulkLoadListener != null) {
3907             finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
3908           }
3909           store.bulkLoadHFile(finalPath, seqId);
3910           if(bulkLoadListener != null) {
3911             bulkLoadListener.doneBulkLoad(familyName, path);
3912           }
3913         } catch (IOException ioe) {
3914           // a failure here causes an atomicity violation that we currently
3915           // cannot recover from since it is likely a failed hdfs operation.
3916 
3917           // TODO Need a better story for reverting partial failures due to HDFS.
3918           LOG.error("There was a partial failure due to IO when attempting to" +
3919               " load " + Bytes.toString(p.getFirst()) + " : "+ p.getSecond());
3920           if(bulkLoadListener != null) {
3921             try {
3922               bulkLoadListener.failedBulkLoad(familyName, path);
3923             } catch (Exception ex) {
3924               LOG.error("Error while calling failedBulkLoad for family "+
3925                   Bytes.toString(familyName)+" with path "+path, ex);
3926             }
3927           }
3928           throw ioe;
3929         }
3930       }
3931       return true;
3932     } finally {
3933       closeBulkRegionOperation();
3934     }
3935   }
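
  /*
   * Illustrative invocation (sketch; the family name and HFile path are
   * hypothetical):
   *
   *   List<Pair<byte[], String>> paths = new ArrayList<Pair<byte[], String>>();
   *   paths.add(new Pair<byte[], String>(Bytes.toBytes("cf"),
   *       "/staging/cf/hfile1"));
   *   boolean loaded = region.bulkLoadHFiles(paths, true);   // assign a seqid
   */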
3936 
3937   @Override
3938   public boolean equals(Object o) {
3939     if (!(o instanceof HRegion)) {
3940       return false;
3941     }
3942     return Bytes.equals(this.getRegionName(), ((HRegion) o).getRegionName());
3943   }
3944 
3945   @Override
3946   public int hashCode() {
3947     return Bytes.hashCode(this.getRegionName());
3948   }
3949 
3950   @Override
3951   public String toString() {
3952     return this.regionInfo.getRegionNameAsString();
3953   }
3954 
3955   /** @return Path of region base directory */
3956   public Path getTableDir() {
3957     return this.tableDir;
3958   }
3959 
3960   /**
3961    * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
3962    */
3963   class RegionScannerImpl implements RegionScanner {
3964     // Package local for testability
3965     KeyValueHeap storeHeap = null;
3966     /** Heap of key-values that are not essential for the provided filters and are thus read
3967      * on demand, if on-demand column family loading is enabled.*/
3968     KeyValueHeap joinedHeap = null;
3969     /**
3970      * If the joined heap data gathering is interrupted due to scan limits, this will
3971      * contain the row for which we are populating the values.*/
3972     private KeyValue joinedContinuationRow = null;
3973     // KeyValue indicating that limit is reached when scanning
3974     private final KeyValue KV_LIMIT = new KeyValue();
3975     private final byte [] stopRow;
3976     private final Filter filter;
3977     private int batch;
3978     private int isScan;
3979     private boolean filterClosed = false;
3980     private long readPt;
3981     private HRegion region;
3982 
3983     public HRegionInfo getRegionInfo() {
3984       return regionInfo;
3985     }
3986     
3987     RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
3988         throws IOException {
3989       // DebugPrint.println("HRegionScanner.<init>");
3990       this.region = region;
3991       this.filter = scan.getFilter();
3992       this.batch = scan.getBatch();
3993       if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
3994         this.stopRow = null;
3995       } else {
3996         this.stopRow = scan.getStopRow();
3997       }
3998       // If we are doing a get, we want to be [startRow,endRow] normally
3999       // it is [startRow,endRow) and if startRow=endRow we get nothing.
4000       this.isScan = scan.isGetScan() ? -1 : 0;
4001 
4002       // synchronize on scannerReadPoints so that nobody calculates
4003       // getSmallestReadPoint, before scannerReadPoints is updated.
4004       IsolationLevel isolationLevel = scan.getIsolationLevel();
4005       synchronized(scannerReadPoints) {
4006         if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
4007           // This scan can read even uncommitted transactions
4008           this.readPt = Long.MAX_VALUE;
4009           MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
4010         } else {
4011           this.readPt = MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
4012         }
4013         scannerReadPoints.put(this, this.readPt);
4014       }
4015 
4016       // Here we separate all scanners into two lists - scanner that provide data required
4017       // by the filter to operate (scanners list) and all others (joinedScanners list).
4018       List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
4019       List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
4020       if (additionalScanners != null) {
4021         scanners.addAll(additionalScanners);
4022       }
4023 
4024       for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
4025           scan.getFamilyMap().entrySet()) {
4026         Store store = stores.get(entry.getKey());
4027         KeyValueScanner scanner;
4028         try {
4029           scanner = store.getScanner(scan, entry.getValue());
4030         } catch (FileNotFoundException e) {
4031           abortRegionServer(e.getMessage());
4032           throw new NotServingRegionException(regionInfo.getRegionNameAsString() + " is closing");
4033         }
4034         if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
4035           || FilterBase.isFamilyEssential(this.filter, entry.getKey())) {
4036           scanners.add(scanner);
4037         } else {
4038           joinedScanners.add(scanner);
4039         }
4040       }
4041       this.storeHeap = new KeyValueHeap(scanners, comparator);
4042       if (!joinedScanners.isEmpty()) {
4043         this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
4044       }
4045     }
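
    // Editor's sketch (not part of the original source): how the essential /
    // joined split above is driven from the client side. Assumes the 0.94-era
    // Scan#setLoadColumnFamiliesOnDemand API; family/qualifier names are hypothetical.
    //
    //   Scan scan = new Scan();
    //   scan.addFamily(Bytes.toBytes("meta"));     // referenced by the filter: essential
    //   scan.addFamily(Bytes.toBytes("payload"));  // not referenced: non-essential
    //   scan.setFilter(new SingleColumnValueFilter(Bytes.toBytes("meta"),
    //       Bytes.toBytes("flag"), CompareOp.EQUAL, Bytes.toBytes("y")));
    //   scan.setLoadColumnFamiliesOnDemand(true);
    //   // "payload" scanners land in joinedScanners/joinedHeap and are only
    //   // read for rows the filter lets through.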
4046 
4047     RegionScannerImpl(Scan scan, HRegion region) throws IOException {
4048       this(scan, null, region);
4049     }
4050 
4051     @Override
4052     public long getMvccReadPoint() {
4053       return this.readPt;
4054     }
4055     /**
4056      * Reset the filter.
4057      */
4058     protected void resetFilters() {
4059       if (filter != null) {
4060         filter.reset();
4061       }
4062     }
4063 
4064     @Override
4065     public boolean next(List<KeyValue> outResults, int limit)
4066         throws IOException {
4067       return next(outResults, limit, null);
4068     }
4069 
4070     @Override
4071     public synchronized boolean next(List<KeyValue> outResults, int limit,
4072         String metric) throws IOException {
4073       if (this.filterClosed) {
4074         throw new UnknownScannerException("Scanner was closed (timed out?) " +
4075             "after we renewed it. Could be caused by a very slow scanner " +
4076             "or a lengthy garbage collection");
4077       }
4078       startRegionOperation();
4079       readRequestsCount.increment();
4080       opMetrics.setReadRequestCountMetrics(readRequestsCount.get());
4081       try {
4082 
4083         // This could be a new thread from the last time we called next().
4084         MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
4085 
4086         return nextRaw(outResults, limit, metric);
4087       } finally {
4088         closeRegionOperation();
4089       }
4090     }
4091 
4092     @Override
4093     public boolean nextRaw(List<KeyValue> outResults, String metric)
4094         throws IOException {
4095       return nextRaw(outResults, batch, metric);
4096     }
4097 
4098     @Override
4099     public boolean nextRaw(List<KeyValue> outResults, int limit,
4100         String metric) throws IOException {
4101       boolean returnResult;
4102       if (outResults.isEmpty()) {
4103         // Usually outResults is empty. This is true when next is called
4104         // to handle a scan or get operation.
4105         returnResult = nextInternal(outResults, limit, metric);
4106       } else {
4107         List<KeyValue> tmpList = new ArrayList<KeyValue>();
4108         returnResult = nextInternal(tmpList, limit, metric);
4109         outResults.addAll(tmpList);
4110       }
4111       resetFilters();
4112       if (isFilterDoneInternal()) {
4113         return false;
4114       }
4115       return returnResult;
4116     }
4117 
4118     @Override
4119     public boolean next(List<KeyValue> outResults)
4120         throws IOException {
4121       // apply the batching limit by default
4122       return next(outResults, batch, null);
4123     }
4124 
4125     @Override
4126     public boolean next(List<KeyValue> outResults, String metric)
4127         throws IOException {
4128       // apply the batching limit by default
4129       return next(outResults, batch, metric);
4130     }
4131 
4132     private void populateFromJoinedHeap(List<KeyValue> results, int limit, String metric)
4133         throws IOException {
4134       assert joinedContinuationRow != null;
4135       KeyValue kv = populateResult(results, this.joinedHeap, limit,
4136         joinedContinuationRow.getBuffer(), joinedContinuationRow.getRowOffset(),
4137         joinedContinuationRow.getRowLength(), metric);
4138       if (kv != KV_LIMIT) {
4139         // We are done with this row, reset the continuation.
4140         joinedContinuationRow = null;
4141       }
4142       // As the data is obtained from two independent heaps, we need to
4143       // ensure that the result list is sorted, because Result relies on that.
4144       Collections.sort(results, comparator);
4145     }
4146 
4147     /**
4148      * Fetches records for the current row into the result list, until the next row or the limit (if not -1).
4149      * @param results result list to populate
4150      * @param heap KeyValueHeap to fetch data from. It must be positioned on the correct row before the call.
4151      * @param limit Max number of KVs to place in the result list, -1 means no limit.
4152      * @param currentRow Byte array containing the row key we are fetching.
4153      * @param offset offset for currentRow
4154      * @param length length for currentRow
4155      * @param metric Metric key to be passed into KeyValueHeap::next().
4156      * @return KV_LIMIT if the limit was reached, otherwise the next KeyValue (null if there is none).
4157      */
4158     private KeyValue populateResult(List<KeyValue> results, KeyValueHeap heap, int limit,
4159         byte[] currentRow, int offset, short length, String metric) throws IOException {
4160       KeyValue nextKv;
4161       try {
4162         do {
4163           heap.next(results, limit - results.size(), metric);
4164           if (limit > 0 && results.size() == limit) {
4165             return KV_LIMIT;
4166           }
4167           nextKv = heap.peek();
4168         } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
4169       } catch (FileNotFoundException e) {
4170         abortRegionServer(e.getMessage());
4171         throw new NotServingRegionException(regionInfo.getRegionNameAsString() + " is closing");
4172       }
4173       return nextKv;
4174     }
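
    // Editor's note (added): KV_LIMIT is a sentinel compared by identity, never
    // by content, so an empty KeyValue in the data cannot be mistaken for it:
    //
    //   KeyValue kv = populateResult(results, storeHeap, limit, row, off, len, metric);
    //   if (kv == KV_LIMIT) { ... }   // limit hit mid-row; resume on the next call
    //   else { ... }                  // kv is the first KV of the next row, or null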
4175 
4176     /*
4177      * @return True if a filter has ruled that the scanner is done, i.e. there is nothing more to scan.
4178      */
4179     public synchronized boolean isFilterDone() {
4180       return isFilterDoneInternal();
4181     }
4182 
4183     private boolean isFilterDoneInternal() {
4184       return this.filter != null && this.filter.filterAllRemaining();
4185     }
4186 
4187     private boolean nextInternal(List<KeyValue> results, int limit, String metric)
4188     throws IOException {
4189       if (!results.isEmpty()) {
4190         throw new IllegalArgumentException("First parameter should be an empty list");
4191       }
4192       RpcCallContext rpcCall = HBaseServer.getCurrentCall();
4193       // The loop here is used only when, at some point during the next call, we determine
4194       // that due to the effects of filters or otherwise we have an empty row in the result.
4195       // Then we loop and try again. Otherwise, we must get out on the first iteration via return:
4196       // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
4197       // and joinedHeap has no more data to read for the last row, i.e. joinedContinuationRow if set).
4198       while (true) {
4199         if (rpcCall != null) {
4200           // If a user specifies a too-restrictive or too-slow scanner, the
4201           // client might time out and disconnect while the server side
4202           // is still processing the request. We should abort aggressively
4203           // in that case.
4204           rpcCall.throwExceptionIfCallerDisconnected();
4205         }
4206 
4207         // Let's see what we have in the storeHeap.
4208         KeyValue current = this.storeHeap.peek();
4209 
4210         byte[] currentRow = null;
4211         int offset = 0;
4212         short length = 0;
4213         if (current != null) {
4214           currentRow = current.getBuffer();
4215           offset = current.getRowOffset();
4216           length = current.getRowLength();
4217         }
4218         boolean stopRow = isStopRow(currentRow, offset, length);
4219         // Check if we were getting data from the joinedHeap and hit the limit.
4220         // If not, then it's the main path - getting results from the storeHeap.
4221         if (joinedContinuationRow == null) {
4222           // First, check if we are at a stop row. If so, there are no more results.
4223           if (stopRow) {
4224             if (filter != null && filter.hasFilterRow()) {
4225               filter.filterRow(results);
4226             }
4227             if (filter != null && filter.filterRow()) {
4228               results.clear();
4229             }
4230             return false;
4231           }
4232 
4233           // Check if rowkey filter wants to exclude this row. If so, loop to next.
4234           // Technically, if we hit the limit earlier on this row, we don't need this call.
4235           if (filterRowKey(currentRow, offset, length)) {
4236             results.clear();
4237             boolean moreRows = nextRow(currentRow, offset, length);
4238             if (!moreRows) return false;
4239             continue;
4240           }
4241 
4242           // Ok, we are good, let's try to get some results from the main heap.
4243           KeyValue nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset,
4244               length, metric);
4245           if (nextKv == KV_LIMIT) {
4246             if (this.filter != null && filter.hasFilterRow()) {
4247               throw new IncompatibleFilterException(
4248                 "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
4249             }
4250             return true; // We hit the limit.
4251           }
4252           stopRow = nextKv == null
4253               || isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());
4254           // Save whether the row was empty before filters were applied to it.
4255           final boolean isEmptyRow = results.isEmpty();
4256 
4257           // We have the part of the row necessary for filtering (all of it, usually).
4258           // First filter with filterRow(List).
4259           if (filter != null && filter.hasFilterRow()) {
4260             filter.filterRow(results);
4261           }
4262 
4263           if (isEmptyRow || filterRow()) {
4264             results.clear();
4265             boolean moreRows = nextRow(currentRow, offset, length);
4266             if (!moreRows) return false;
4267 
4268             // This row was totally filtered out; if this is NOT the last row,
4269             // we should continue on. Otherwise, there is nothing else to do.
4270             if (!stopRow) continue;
4271             return false;
4272           }
4273 
4274           // Ok, we are done with storeHeap for this row.
4275           // Now we may need to fetch additional, non-essential data into row.
4276           // These values are not needed for filter to work, so we postpone their
4277           // fetch to (possibly) reduce amount of data loads from disk.
4278           if (this.joinedHeap != null) {
4279             KeyValue nextJoinedKv = joinedHeap.peek();
4280             // If joinedHeap is pointing to some other row, try to seek to a correct one.
4281             boolean mayHaveData =
4282               (nextJoinedKv != null && nextJoinedKv.matchingRow(currentRow, offset, length))
4283                 || (this.joinedHeap.requestSeek(
4284                     KeyValue.createFirstOnRow(currentRow, offset, length), true, true)
4285                   && joinedHeap.peek() != null
4286                   && joinedHeap.peek().matchingRow(currentRow, offset, length));
4287             if (mayHaveData) {
4288               joinedContinuationRow = current;
4289               populateFromJoinedHeap(results, limit, metric);
4290             }
4291           }
4292         } else {
4293           // Populating from the joined map was stopped by limits, populate some more.
4294           populateFromJoinedHeap(results, limit, metric);
4295         }
4296 
4297         // We may have just called populateFromJoinedHeap and hit the limits. If that is
4298         // the case, we need to call it again on the next next() invocation.
4299         if (joinedContinuationRow != null) {
4300           return true;
4301         }
4302 
4303         // Finally, we are done with both joinedHeap and storeHeap.
4304         // Double check to prevent empty rows from appearing in result. It could be
4305         // the case when SingleColumnValueExcludeFilter is used.
4306         if (results.isEmpty()) {
4307           boolean moreRows = nextRow(currentRow, offset, length);
4308           if (!moreRows) return false;
4309           if (!stopRow) continue;
4310         }
4311 
4312         // We are done. Return the result.
4313         return !stopRow;
4314       }
4315     }
4316 
4317     private boolean filterRow() {
4318       return filter != null
4319           && filter.filterRow();
4320     }
4321     private boolean filterRowKey(byte[] row, int offset, short length) {
4322       return filter != null
4323           && filter.filterRowKey(row, offset, length);
4324     }
4325 
4326     protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException {
4327       KeyValue next;
4328       while((next = this.storeHeap.peek()) != null && next.matchingRow(currentRow, offset, length)) {
4329         this.storeHeap.next(MOCKED_LIST);       
4330       }
4331       resetFilters();
4332       // Call the coprocessor hook, which allows it to fast-forward the scanner.
4333       if (this.region.getCoprocessorHost() != null) {
4334         return this.region.getCoprocessorHost().postScannerFilterRow(this, currentRow, offset,
4335             length);
4336       }
4337       return true;
4338     }
4339 
4340     private boolean isStopRow(byte [] currentRow, int offset, short length) {
4341       return currentRow == null ||
4342           (stopRow != null &&
4343           comparator.compareRows(stopRow, 0, stopRow.length,
4344               currentRow, offset, length) <= isScan);
4345     }
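
    // Editor's note (added): the isScan offset makes the stop row exclusive for
    // scans and inclusive for gets. With compareRows(stopRow, currentRow):
    //   scan (isScan == 0):  stop once stopRow <= currentRow  => [startRow, stopRow)
    //   get  (isScan == -1): stop once stopRow <  currentRow  => [startRow, stopRow]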
4346 
4347     @Override
4348     public synchronized void close() {
4349       if (storeHeap != null) {
4350         storeHeap.close();
4351         storeHeap = null;
4352       }
4353       if (joinedHeap != null) {
4354         joinedHeap.close();
4355         joinedHeap = null;
4356       }
4357       // no need to synchronize here.
4358       scannerReadPoints.remove(this);
4359       this.filterClosed = true;
4360     }
4361 
4362     KeyValueHeap getStoreHeapForTesting() {
4363       return storeHeap;
4364     }
4365 
4366     @Override
4367     public synchronized boolean reseek(byte[] row) throws IOException {
4368       if (row == null) {
4369         throw new IllegalArgumentException("Row cannot be null.");
4370       }
4371       boolean result = false;
4372       startRegionOperation();
4373       try {
4374         // This could be a new thread from the last time we called next().
4375         MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
4376         KeyValue kv = KeyValue.createFirstOnRow(row);
4377         // use request seek to make use of the lazy seek option. See HBASE-5520
4378         result = this.storeHeap.requestSeek(kv, true, true);
4379         if (this.joinedHeap != null) {
4380           result = this.joinedHeap.requestSeek(kv, true, true) || result;
4381         }
4382       } catch (FileNotFoundException e) {
4383         abortRegionServer(e.getMessage());
4384         throw new NotServingRegionException(regionInfo.getRegionNameAsString() + " is closing");
4385       } finally {
4386         closeRegionOperation();
4387       }
4388       return result;
4389     }
4390   }
4391 
4392   // Utility methods
4393   /**
4394    * A utility method to create new instances of HRegion based on the
4395    * {@link HConstants#REGION_IMPL} configuration property.
4396    * @param tableDir qualified path of directory where region should be located,
4397    * usually the table directory.
4398    * @param log The HLog is the outbound log for any updates to the HRegion
4399    * (There's a single HLog for all the HRegions on a single HRegionServer.)
4400    * The log file is a logfile from the previous execution that's
4401    * custom-computed for this HRegion. The HRegionServer computes and sorts the
4402    * appropriate log info for this HRegion. If there is a previous log file
4403    * (implying that the HRegion has been written-to before), then read it from
4404    * the supplied path.
4405    * @param fs is the filesystem.
4406    * @param conf is global configuration settings.
4407    * @param regionInfo HRegionInfo that describes the region
4408    * @param htd the {@link HTableDescriptor} that describes the table
4409    * which the region belongs to
4410    * @param rsServices region server services; may be null
4411    * @return the new instance
4412    */
4413   public static HRegion newHRegion(Path tableDir, HLog log, FileSystem fs,
4414       Configuration conf, HRegionInfo regionInfo, final HTableDescriptor htd,
4415       RegionServerServices rsServices) {
4416     try {
4417       @SuppressWarnings("unchecked")
4418       Class<? extends HRegion> regionClass =
4419           (Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
4420 
4421       Constructor<? extends HRegion> c =
4422           regionClass.getConstructor(Path.class, HLog.class, FileSystem.class,
4423               Configuration.class, HRegionInfo.class, HTableDescriptor.class,
4424               RegionServerServices.class);
4425 
4426       return c.newInstance(tableDir, log, fs, conf, regionInfo, htd, rsServices);
4427     } catch (Throwable e) {
4428       // todo: what should I throw here?
4429       throw new IllegalStateException("Could not instantiate a region instance.", e);
4430     }
4431   }
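
  // Editor's sketch (not part of the original source): plugging in a custom
  // region implementation via HConstants.REGION_IMPL. MyRegion is hypothetical
  // and must expose the same 7-argument constructor reflected above:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setClass(HConstants.REGION_IMPL, MyRegion.class, HRegion.class);
  //   HRegion region = HRegion.newHRegion(tableDir, log, fs, conf, info, htd, null);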
4432 
4433   /**
4434    * Convenience method creating new HRegions. Used by createTable and by the
4435    * bootstrap code in the HMaster constructor.
4436    * Note, this method creates an {@link HLog} for the created region. It
4437    * needs to be closed explicitly.  Use {@link HRegion#getLog()} to get
4438    * access.  <b>When done with a region created using this method, you will
4439    * need to explicitly close the {@link HLog} it created too; it will not be
4440    * done for you.  Not closing the log will leave at least a daemon thread
4441    * running.</b>  Call {@link #closeHRegion(HRegion)} and it will do
4442    * necessary cleanup for you.
4443    * @param info Info for region to create.
4444    * @param rootDir Root directory for HBase instance
4445    * @param conf
4446    * @param hTableDescriptor
4447    * @return new HRegion
4448    *
4449    * @throws IOException
4450    */
4451   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4452       final Configuration conf, final HTableDescriptor hTableDescriptor)
4453   throws IOException {
4454     return createHRegion(info, rootDir, conf, hTableDescriptor, null);
4455   }
4456 
4457   /**
4458    * This will do the necessary cleanup a call to {@link #createHRegion(HRegionInfo, Path, Configuration, HTableDescriptor)}
4459    * requires.  This method will close the region and then close its
4460    * associated {@link HLog} file.  Use it if you called the other createHRegion,
4461    * the one that takes an {@link HLog} instance, but don't be surprised by the
4462    * call to {@link HLog#closeAndDelete()} on the {@link HLog} the
4463    * HRegion was carrying.
4464    * @param r
4465    * @throws IOException
4466    */
4467   public static void closeHRegion(final HRegion r) throws IOException {
4468     if (r == null) return;
4469     r.close();
4470     if (r.getLog() == null) return;
4471     r.getLog().closeAndDelete();
4472   }
4473 
4474   /**
4475    * Convenience method creating new HRegions. Used by createTable.
4476    * The {@link HLog} for the created region needs to be closed explicitly.
4477    * Use {@link HRegion#getLog()} to get access.
4478    *
4479    * @param info Info for region to create.
4480    * @param rootDir Root directory for HBase instance
4481    * @param conf
4482    * @param hTableDescriptor
4483    * @param hlog shared HLog
4484    * @param initialize true to initialize the region
4485    * @return new HRegion
4486    *
4487    * @throws IOException
4488    */
4489   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4490                                       final Configuration conf,
4491                                       final HTableDescriptor hTableDescriptor,
4492                                       final HLog hlog,
4493                                       final boolean initialize)
4494       throws IOException {
4495     return createHRegion(info, rootDir, conf, hTableDescriptor,
4496         hlog, initialize, false);
4497   }
4498 
4499   /**
4500    * Convenience method creating new HRegions. Used by createTable.
4501    * The {@link HLog} for the created region needs to be closed
4502    * explicitly, if it is not null.
4503    * Use {@link HRegion#getLog()} to get access.
4504    *
4505    * @param info Info for region to create.
4506    * @param rootDir Root directory for HBase instance
4507    * @param conf
4508    * @param hTableDescriptor
4509    * @param hlog shared HLog
4510    * @param initialize true to initialize the region
4511    * @param ignoreHLog
4512    *   true to skip generating a new hlog when the passed hlog is null; mostly for createTable
4513    * @return new HRegion
4514    *
4515    * @throws IOException
4516    */
4517   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4518                                       final Configuration conf,
4519                                       final HTableDescriptor hTableDescriptor,
4520                                       final HLog hlog,
4521                                       final boolean initialize, final boolean ignoreHLog)
4522       throws IOException {
4523     LOG.info("creating HRegion " + info.getTableNameAsString()
4524         + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
4525         " Table name == " + info.getTableNameAsString());
4526 
4527     Path tableDir =
4528         HTableDescriptor.getTableDir(rootDir, info.getTableName());
4529     Path regionDir = HRegion.getRegionDir(tableDir, info.getEncodedName());
4530     FileSystem fs = FileSystem.get(conf);
4531     HBaseFileSystem.makeDirOnFileSystem(fs, regionDir);
4532     // Write HRI to a file in case we need to recover .META.
4533     writeRegioninfoOnFilesystem(info, regionDir, fs, conf);
4534     HLog effectiveHLog = hlog;
4535     if (hlog == null && !ignoreHLog) {
4536       effectiveHLog = new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME),
4537           new Path(regionDir, HConstants.HREGION_OLDLOGDIR_NAME), conf);
4538     }
4539     HRegion region = HRegion.newHRegion(tableDir,
4540         effectiveHLog, fs, conf, info, hTableDescriptor, null);
4541     if (initialize) {
4542       region.initialize();
4543     }
4544     return region;
4545   }
4546 
4547   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4548                                       final Configuration conf,
4549                                       final HTableDescriptor hTableDescriptor,
4550                                       final HLog hlog)
4551     throws IOException {
4552     return createHRegion(info, rootDir, conf, hTableDescriptor, hlog, true);
4553   }
4554 
4555   /**
4556    * Open a Region.
4557    * @param info Info for region to be opened.
4558    * @param wal HLog for region to use. This method will call
4559    * HLog#setSequenceNumber(long) passing the result of the call to
4560    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4561    * up to date.  The region server does this every time it opens a new region.
4562    * @param conf
4563    * @return new HRegion
4564    *
4565    * @throws IOException
4566    */
4567   public static HRegion openHRegion(final HRegionInfo info,
4568       final HTableDescriptor htd, final HLog wal,
4569       final Configuration conf)
4570   throws IOException {
4571     return openHRegion(info, htd, wal, conf, null, null);
4572   }
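
  // Editor's sketch (added): opening and closing a region directly, e.g. from a
  // utility or test. Assumes info/htd/wal/conf are already set up by the caller:
  //
  //   HRegion r = HRegion.openHRegion(info, htd, wal, conf);
  //   try {
  //     // ... read from or write to r ...
  //   } finally {
  //     r.close();   // the caller still owns (and must close) the HLog
  //   }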
4573 
4574   /**
4575    * Open a Region.
4576    * @param info Info for region to be opened
4577    * @param htd
4578    * @param wal HLog for region to use. This method will call
4579    * HLog#setSequenceNumber(long) passing the result of the call to
4580    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4581    * up to date.  The region server does this every time it opens a new region.
4582    * @param conf
4583    * @param rsServices An interface we can request flushes against.
4584    * @param reporter An interface we can report progress against.
4585    * @return new HRegion
4586    *
4587    * @throws IOException
4588    */
4589   public static HRegion openHRegion(final HRegionInfo info,
4590     final HTableDescriptor htd, final HLog wal, final Configuration conf,
4591     final RegionServerServices rsServices,
4592     final CancelableProgressable reporter)
4593   throws IOException {
4594     if (LOG.isDebugEnabled()) {
4595       LOG.debug("Opening region: " + info);
4596     }
4597     if (info == null) {
4598       throw new NullPointerException("Passed region info is null");
4599     }
4600     Path dir = HTableDescriptor.getTableDir(FSUtils.getRootDir(conf),
4601       info.getTableName());
4602     FileSystem fs = null;
4603     if (rsServices != null) {
4604       fs = rsServices.getFileSystem();
4605     }
4606     if (fs == null) {
4607       fs = FileSystem.get(conf);
4608     }
4609     HRegion r = HRegion.newHRegion(dir, wal, fs, conf, info,
4610       htd, rsServices);
4611     return r.openHRegion(reporter);
4612   }
4613 
4614   public static HRegion openHRegion(Path tableDir, final HRegionInfo info,
4615       final HTableDescriptor htd, final HLog wal, final Configuration conf)
4616   throws IOException {
4617     return openHRegion(tableDir, info, htd, wal, conf, null, null);
4618   }
4619 
4620   /**
4621    * Open a Region.
4622    * @param tableDir Table directory
4623    * @param info Info for region to be opened.
4624    * @param wal HLog for region to use. This method will call
4625    * HLog#setSequenceNumber(long) passing the result of the call to
4626    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4627    * up to date.  The region server does this every time it opens a new region.
4628    * @param conf
4629    * @param reporter An interface we can report progress against.
4630    * @return new HRegion
4631    *
4632    * @throws IOException
4633    */
4634   public static HRegion openHRegion(final Path tableDir, final HRegionInfo info,
4635       final HTableDescriptor htd, final HLog wal, final Configuration conf,
4636       final RegionServerServices rsServices,
4637       final CancelableProgressable reporter)
4638   throws IOException {
4639     if (info == null) throw new NullPointerException("Passed region info is null");
4640     LOG.info("HRegion.openHRegion Region name == " + info.getRegionNameAsString());
4641     if (LOG.isDebugEnabled()) {
4642       LOG.debug("Opening region: " + info);
4643     }
4644     Path dir = HTableDescriptor.getTableDir(tableDir,
4645         info.getTableName());
4646     HRegion r = HRegion.newHRegion(dir, wal, FileSystem.get(conf), conf, info,
4647         htd, rsServices);
4648     return r.openHRegion(reporter);
4649   }
4650 
4651 
4652   /**
4653    * Open HRegion.
4654    * Calls initialize and sets sequenceid.
4655    * @param reporter
4656    * @return Returns <code>this</code>
4657    * @throws IOException
4658    */
4659   protected HRegion openHRegion(final CancelableProgressable reporter)
4660   throws IOException {
4661     checkCompressionCodecs();
4662 
4663     long seqid = initialize(reporter);
4664     if (this.log != null) {
4665       this.log.setSequenceNumber(seqid);
4666     }
4667     return this;
4668   }
4669 
4670   private void checkCompressionCodecs() throws IOException {
4671     for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
4672       CompressionTest.testCompression(fam.getCompression());
4673       CompressionTest.testCompression(fam.getCompactionCompression());
4674     }
4675   }
4676 
4677   /**
4678    * Inserts a new region's meta information into the passed
4679    * <code>meta</code> region. Used by the HMaster bootstrap code adding
4680    * <code>meta</code> region. Used by the HMaster bootstrap code when adding
4681    * a new table to the ROOT table.
4682    * @param meta META HRegion to be updated
4683    * @param r HRegion to add to <code>meta</code>
4684    *
4685    * @throws IOException
4686    */
4687   public static void addRegionToMETA(HRegion meta, HRegion r)
4688   throws IOException {
4689     meta.checkResources();
4690     // The row key is the region name
4691     byte[] row = r.getRegionName();
4692     Integer lid = meta.obtainRowLock(row);
4693     try {
4694       final long now = EnvironmentEdgeManager.currentTimeMillis();
4695       final List<KeyValue> edits = new ArrayList<KeyValue>(2);
4696       edits.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
4697         HConstants.REGIONINFO_QUALIFIER, now,
4698         Writables.getBytes(r.getRegionInfo())));
4699       // Set into the root table the version of the meta table.
4700       edits.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
4701         HConstants.META_VERSION_QUALIFIER, now,
4702         Bytes.toBytes(HConstants.META_VERSION)));
4703       meta.put(HConstants.CATALOG_FAMILY, edits);
4704     } finally {
4705       meta.releaseRowLock(lid);
4706     }
4707   }
4708 
4709   /**
4710    * Deletes all the files for an HRegion
4711    *
4712    * @param fs the file system object
4713    * @param rootdir qualified path of HBase root directory
4714    * @param info HRegionInfo for region to be deleted
4715    * @throws IOException
4716    */
4717   public static void deleteRegion(FileSystem fs, Path rootdir, HRegionInfo info)
4718   throws IOException {
4719     deleteRegion(fs, HRegion.getRegionDir(rootdir, info));
4720   }
4721 
4722   private static void deleteRegion(FileSystem fs, Path regiondir)
4723   throws IOException {
4724     if (LOG.isDebugEnabled()) {
4725       LOG.debug("DELETING region " + regiondir.toString());
4726     }
4727     if (!HBaseFileSystem.deleteDirFromFileSystem(fs, regiondir)) {
4728       LOG.warn("Failed delete of " + regiondir);
4729     }
4730   }
4731 
4732   /**
4733    * Computes the Path of the HRegion
4734    *
4735    * @param rootdir qualified path of HBase root directory
4736    * @param info HRegionInfo for the region
4737    * @return qualified path of region directory
4738    */
4739   public static Path getRegionDir(final Path rootdir, final HRegionInfo info) {
4740     return new Path(
4741       HTableDescriptor.getTableDir(rootdir, info.getTableName()),
4742                                    info.getEncodedName());
4743   }
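
  // Editor's note (added): with hypothetical values, a root dir of
  // hdfs://nn/hbase, table "t1" and encoded region name "5ef...", this returns
  // hdfs://nn/hbase/t1/5ef...
  //
  //   Path p = HRegion.getRegionDir(new Path("hdfs://nn/hbase"), info);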
4744 
4745   /**
4746    * Determines if the specified row is within the row range of the
4747    * given HRegionInfo
4748    *
4749    * @param info HRegionInfo that specifies the row range
4750    * @param row row to be checked
4751    * @return true if the row is within the range specified by the HRegionInfo
4752    */
4753   public static boolean rowIsInRange(HRegionInfo info, final byte [] row) {
4754     return ((info.getStartKey().length == 0) ||
4755         (Bytes.compareTo(info.getStartKey(), row) <= 0)) &&
4756         ((info.getEndKey().length == 0) ||
4757             (Bytes.compareTo(info.getEndKey(), row) > 0));
4758   }
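
  // Editor's note (added): empty start/end keys mean "unbounded". For a region
  // with range [b, d), the end key is exclusive:
  //
  //   rowIsInRange(info_b_d, Bytes.toBytes("b"));  // true
  //   rowIsInRange(info_b_d, Bytes.toBytes("c"));  // true
  //   rowIsInRange(info_b_d, Bytes.toBytes("d"));  // false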
4759 
4760   /**
4761    * Make the directories for a specific column family
4762    *
4763    * @param fs the file system
4764    * @param tabledir base directory where region will live (usually the table dir)
4765    * @param hri
4766    * @param colFamily the column family
4767    * @throws IOException
4768    */
4769   public static void makeColumnFamilyDirs(FileSystem fs, Path tabledir,
4770     final HRegionInfo hri, byte [] colFamily)
4771   throws IOException {
4772     Path dir = Store.getStoreHomedir(tabledir, hri.getEncodedName(), colFamily);
4773     if (!HBaseFileSystem.makeDirOnFileSystem(fs, dir)) {
4774       LOG.warn("Failed to create " + dir);
4775     }
4776   }
4777 
4778   /**
4779    * Merge two HRegions.  The regions must be adjacent and must not overlap.
4780    *
4781    * @param srcA
4782    * @param srcB
4783    * @return new merged HRegion
4784    * @throws IOException
4785    */
4786   public static HRegion mergeAdjacent(final HRegion srcA, final HRegion srcB)
4787   throws IOException {
4788     HRegion a = srcA;
4789     HRegion b = srcB;
4790 
4791     // Make sure that srcA comes first; important for key-ordering during
4792     // write of the merged file.
4793     if (srcA.getStartKey() == null) {
4794       if (srcB.getStartKey() == null) {
4795         throw new IOException("Cannot merge two regions with null start key");
4796       }
4797       // A's start key is null but B's isn't. Assume A comes before B
4798     } else if ((srcB.getStartKey() == null) ||
4799       (Bytes.compareTo(srcA.getStartKey(), srcB.getStartKey()) > 0)) {
4800       a = srcB;
4801       b = srcA;
4802     }
4803 
4804     if (!(Bytes.compareTo(a.getEndKey(), b.getStartKey()) == 0)) {
4805       throw new IOException("Cannot merge non-adjacent regions");
4806     }
4807     return merge(a, b);
4808   }
4809 
4810   /**
4811    * Merge two regions whether they are adjacent or not.
4812    *
4813    * @param a region a
4814    * @param b region b
4815    * @return new merged region
4816    * @throws IOException
4817    */
4818   public static HRegion merge(HRegion a, HRegion b)
4819   throws IOException {
4820     if (!a.getRegionInfo().getTableNameAsString().equals(
4821         b.getRegionInfo().getTableNameAsString())) {
4822       throw new IOException("Regions do not belong to the same table");
4823     }
4824 
4825     FileSystem fs = a.getFilesystem();
4826 
4827     // Make sure each region's cache is empty
4828 
4829     a.flushcache();
4830     b.flushcache();
4831 
4832     // Compact each region so we only have one store file per family
4833 
4834     a.compactStores(true);
4835     if (LOG.isDebugEnabled()) {
4836       LOG.debug("Files for region: " + a);
4837       listPaths(fs, a.getRegionDir());
4838     }
4839     b.compactStores(true);
4840     if (LOG.isDebugEnabled()) {
4841       LOG.debug("Files for region: " + b);
4842       listPaths(fs, b.getRegionDir());
4843     }
4844 
4845     Configuration conf = a.getBaseConf();
4846     HTableDescriptor tabledesc = a.getTableDesc();
4847     HLog log = a.getLog();
4848     Path tableDir = a.getTableDir();
4849     // Presume both are of same region type -- i.e. both user or catalog
4850     // table regions.  This way we can use the comparator.
4851     final byte[] startKey =
4852       (a.comparator.matchingRows(a.getStartKey(), 0, a.getStartKey().length,
4853            HConstants.EMPTY_BYTE_ARRAY, 0, HConstants.EMPTY_BYTE_ARRAY.length)
4854        || b.comparator.matchingRows(b.getStartKey(), 0,
4855               b.getStartKey().length, HConstants.EMPTY_BYTE_ARRAY, 0,
4856               HConstants.EMPTY_BYTE_ARRAY.length))
4857       ? HConstants.EMPTY_BYTE_ARRAY
4858       : (a.comparator.compareRows(a.getStartKey(), 0, a.getStartKey().length,
4859              b.getStartKey(), 0, b.getStartKey().length) <= 0
4860          ? a.getStartKey()
4861          : b.getStartKey());
4862     final byte[] endKey =
4863       (a.comparator.matchingRows(a.getEndKey(), 0, a.getEndKey().length,
4864            HConstants.EMPTY_BYTE_ARRAY, 0, HConstants.EMPTY_BYTE_ARRAY.length)
4865        || a.comparator.matchingRows(b.getEndKey(), 0, b.getEndKey().length,
4866               HConstants.EMPTY_BYTE_ARRAY, 0,
4867               HConstants.EMPTY_BYTE_ARRAY.length))
4868       ? HConstants.EMPTY_BYTE_ARRAY
4869       : (a.comparator.compareRows(a.getEndKey(), 0, a.getEndKey().length,
4870              b.getEndKey(), 0, b.getEndKey().length) <= 0
4871          ? b.getEndKey()
4872          : a.getEndKey());
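
    // Editor's note (added): an empty byte array is an unbounded key, so it wins
    // both selections above. E.g. merging [b,d) with [d,f) yields [b,f), and
    // merging ["",d) with [d,f) yields ["",f).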
4873 
4874     HRegionInfo newRegionInfo =
4875         new HRegionInfo(tabledesc.getName(), startKey, endKey);
4876     LOG.info("Creating new region " + newRegionInfo.toString());
4877     String encodedName = newRegionInfo.getEncodedName();
4878     Path newRegionDir = HRegion.getRegionDir(a.getTableDir(), encodedName);
4879     if(fs.exists(newRegionDir)) {
4880       throw new IOException("Cannot merge; target file collision at " +
4881           newRegionDir);
4882     }
4883     HBaseFileSystem.makeDirOnFileSystem(fs, newRegionDir);
4884 
4885     LOG.info("starting merge of regions: " + a + " and " + b +
4886       " into new region " + newRegionInfo.toString() +
4887         " with start key <" + Bytes.toStringBinary(startKey) + "> and end key <" +
4888         Bytes.toStringBinary(endKey) + ">");
4889 
4890     // Move HStoreFiles under new region directory
4891     Map<byte [], List<StoreFile>> byFamily =
4892       new TreeMap<byte [], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
4893     byFamily = filesByFamily(byFamily, a.close());
4894     byFamily = filesByFamily(byFamily, b.close());
4895     for (Map.Entry<byte [], List<StoreFile>> es : byFamily.entrySet()) {
4896       byte [] colFamily = es.getKey();
4897       makeColumnFamilyDirs(fs, tableDir, newRegionInfo, colFamily);
4898       // Because we compacted the source regions we should have no more than two
4899       // HStoreFiles per family, and there will be no reference store files.
4900       List<StoreFile> srcFiles = es.getValue();
4901       for (StoreFile hsf: srcFiles) {
4902         StoreFile.rename(fs, hsf.getPath(),
4903           StoreFile.getUniqueFile(fs, Store.getStoreHomedir(tableDir,
4904             newRegionInfo.getEncodedName(), colFamily)));
4905       }
4906     }
4907     if (LOG.isDebugEnabled()) {
4908       LOG.debug("Files for new region");
4909       listPaths(fs, newRegionDir);
4910     }
4911     HRegion dstRegion = HRegion.newHRegion(tableDir, log, fs, conf,
4912         newRegionInfo, a.getTableDesc(), null);
4913     long totalReadRequestCount = a.readRequestsCount.get() + b.readRequestsCount.get();
4914     dstRegion.readRequestsCount.set(totalReadRequestCount);
4915     dstRegion.opMetrics.setReadRequestCountMetrics(totalReadRequestCount);
4916     
4917     long totalWriteRequestCount = a.writeRequestsCount.get() + b.writeRequestsCount.get();
4918     dstRegion.writeRequestsCount.set(totalWriteRequestCount);
4919     dstRegion.opMetrics.setWriteRequestCountMetrics(totalWriteRequestCount);
4920     
4921     dstRegion.initialize();
4922     dstRegion.compactStores();
4923     if (LOG.isDebugEnabled()) {
4924       LOG.debug("Files for new region");
4925       listPaths(fs, dstRegion.getRegionDir());
4926     }
4927 
4928     // delete out the 'A' region
4929     HFileArchiver.archiveRegion(fs, FSUtils.getRootDir(a.getConf()),
4930         a.getTableDir(), a.getRegionDir());
4931     // delete out the 'B' region
4932     HFileArchiver.archiveRegion(fs, FSUtils.getRootDir(b.getConf()),
4933         b.getTableDir(), b.getRegionDir());
4934 
4935     LOG.info("merge completed. New region is " + dstRegion);
4936 
4937     return dstRegion;
4938   }
4939 
4940   /*
4941    * Fills a map with a vector of store files keyed by column family.
4942    * @param byFamily Map to fill.
4943    * @param storeFiles Store files to process.
4944    *
4945    * @return Returns <code>byFamily</code>
4946    */
4947   private static Map<byte [], List<StoreFile>> filesByFamily(
4948       Map<byte [], List<StoreFile>> byFamily, List<StoreFile> storeFiles) {
4949     for (StoreFile src: storeFiles) {
4950       byte [] family = src.getFamily();
4951       List<StoreFile> v = byFamily.get(family);
4952       if (v == null) {
4953         v = new ArrayList<StoreFile>();
4954         byFamily.put(family, v);
4955       }
4956       v.add(src);
4957     }
4958     return byFamily;
4959   }
4960 
4961   /**
4962    * @return True if the region needs a major compaction.
4963    * @throws IOException
4964    */
4965   boolean isMajorCompaction() throws IOException {
4966     for (Store store: this.stores.values()) {
4967       if (store.isMajorCompaction()) {
4968         return true;
4969       }
4970     }
4971     return false;
4972   }
4973 
4974   /*
4975    * List the files under the specified directory
4976    *
4977    * @param fs
4978    * @param dir
4979    * @throws IOException
4980    */
4981   private static void listPaths(FileSystem fs, Path dir) throws IOException {
4982     if (LOG.isDebugEnabled()) {
4983       FileStatus[] stats = FSUtils.listStatus(fs, dir, null);
4984       if (stats == null || stats.length == 0) {
4985         return;
4986       }
4987       for (int i = 0; i < stats.length; i++) {
4988         String path = stats[i].getPath().toString();
4989         if (stats[i].isDir()) {
4990           LOG.debug("d " + path);
4991           listPaths(fs, stats[i].getPath());
4992         } else {
4993           LOG.debug("f " + path + " size=" + stats[i].getLen());
4994         }
4995       }
4996     }
4997   }
4998 
4999 
5000   //
5001   // HBASE-880
5002   //
5003   /**
5004    * @param get get object
5005    * @return result
5006    * @throws IOException read exceptions
5007    */
5008   public Result get(final Get get) throws IOException {
5009     return get(get, null);
5010   }
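
  // Editor's sketch (not part of the original source): typical server-side use,
  // with hypothetical family/qualifier names:
  //
  //   Get get = new Get(Bytes.toBytes("row1"));
  //   get.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"));
  //   Result result = region.get(get);   // delegates to get(get, null)
  //   byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("q"));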
5011 
5012   /**
5013    * @param get get object
5014    * @param lockid existing lock id, or null for no previous lock
5015    * @return result
5016    * @throws IOException read exceptions
5017    * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
5018    */
5019   public Result get(final Get get, final Integer lockid) throws IOException {
5020     checkRow(get.getRow(), "Get");
5021     // Verify families are all valid
5022     if (get.hasFamilies()) {
5023       for (byte [] family: get.familySet()) {
5024         checkFamily(family);
5025       }
5026     } else { // Adding all families to scanner
5027       for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
5028         get.addFamily(family);
5029       }
5030     }
5031     List<KeyValue> results = get(get, true);
5032     return new Result(results);
5033   }
5034 
5035   /*
5036    * Do a get based on the get parameter.
5037    * @param withCoprocessor invoke coprocessor or not. We don't want to
5038    * always invoke cp for this private method.
5039    */
5040   private List<KeyValue> get(Get get, boolean withCoprocessor)
5041   throws IOException {
5042     long now = EnvironmentEdgeManager.currentTimeMillis();
5043 
5044     List<KeyValue> results = new ArrayList<KeyValue>();
5045 
5046     // pre-get CP hook
5047     if (withCoprocessor && (coprocessorHost != null)) {
5048        if (coprocessorHost.preGet(get, results)) {
5049          return results;
5050        }
5051     }
5052 
5053     Scan scan = new Scan(get);
5054 
5055     RegionScanner scanner = null;
5056     try {
5057       scanner = getScanner(scan);
5058       scanner.next(results, SchemaMetrics.METRIC_GETSIZE);
5059     } finally {
5060       if (scanner != null)
5061         scanner.close();
5062     }
5063 
5064     // post-get CP hook
5065     if (withCoprocessor && (coprocessorHost != null)) {
5066       coprocessorHost.postGet(get, results);
5067     }
5068 
5069     // do after lock
5070     final long after = EnvironmentEdgeManager.currentTimeMillis();
5071     this.opMetrics.updateGetMetrics(get.familySet(), after - now);
5072 
5073     return results;
5074   }
5075 
5076   public void mutateRow(RowMutations rm) throws IOException {
5077     mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
5078   }
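
  // Editor's sketch (added): an atomic put+delete on a single row via
  // RowMutations, assuming the 0.94-era client API; names are hypothetical:
  //
  //   RowMutations rm = new RowMutations(row);
  //   Put p = new Put(row);
  //   p.add(Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes("v"));
  //   rm.add(p);
  //   Delete d = new Delete(row);
  //   d.deleteColumns(Bytes.toBytes("cf"), Bytes.toBytes("b"));
  //   rm.add(d);
  //   region.mutateRow(rm);   // both edits apply atomically, or neither does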
5079 
5080   /**
5081    * Perform atomic mutations within the region.
5082    * @param mutations The list of mutations to perform.
5083    * <code>mutations</code> can contain operations for multiple rows.
5084    * Caller has to ensure that all rows are contained in this region.
5085    * @param rowsToLock Rows to lock
5086    * If multiple rows are locked care should be taken that
5087    * <code>rowsToLock</code> is sorted in order to avoid deadlocks.
5088    * @throws IOException
5089    */
5090   public void mutateRowsWithLocks(Collection<Mutation> mutations,
5091       Collection<byte[]> rowsToLock) throws IOException {
5092     boolean flush = false;
5093 
5094     checkReadOnly();
5095     checkResources();
5096 
5097     startRegionOperation();
5098     List<Integer> acquiredLocks = null;
5099     try {
5100       // 1. run all pre-hooks before the atomic operation
5101       // if any pre hook indicates "bypass", bypass the entire operation
5102 
5103       // one WALEdit is used for all edits.
5104       WALEdit walEdit = new WALEdit();
5105       if (coprocessorHost != null) {
5106         for (Mutation m : mutations) {
5107           if (m instanceof Put) {
5108             if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) {
5109               // bypass everything
5110               return;
5111             }
5112           } else if (m instanceof Delete) {
5113             Delete d = (Delete) m;
5114             prepareDelete(d);
5115             if (coprocessorHost.preDelete(d, walEdit, d.getWriteToWAL())) {
5116               // bypass everything
5117               return;
5118             }
5119           }
5120         }
5121       }
5122 
5123       long txid = 0;
5124       boolean walSyncSuccessful = false;
5125       boolean memstoreUpdated = false;
5126       boolean locked = false;
5127 
5128       // 2. acquire the row lock(s)
5129       acquiredLocks = new ArrayList<Integer>(rowsToLock.size());
5130       for (byte[] row : rowsToLock) {
5131         // attempt to lock all involved rows, fail if one lock times out
5132         Integer lid = getLock(null, row, true);
5133         if (lid == null) {
5134           throw new IOException("Failed to acquire lock on "
5135               + Bytes.toStringBinary(row));
5136         }
5137         acquiredLocks.add(lid);
5138       }
5139 
5140       // 3. acquire the region lock
5141       lock(this.updatesLock.readLock(), acquiredLocks.size() == 0 ? 1 : acquiredLocks.size());
5142       locked = true;
5143 
5144       // 4. Get a mvcc write number
5145       MultiVersionConsistencyControl.WriteEntry w = mvcc.beginMemstoreInsert();
5146 
5147       long now = EnvironmentEdgeManager.currentTimeMillis();
5148       byte[] byteNow = Bytes.toBytes(now);
5149       Durability durability = Durability.USE_DEFAULT;
5150       try {
5151         // 5. Check mutations and apply edits to a single WALEdit
5152         for (Mutation m : mutations) {
5153           if (m instanceof Put) {
5154             Map<byte[], List<KeyValue>> familyMap = m.getFamilyMap();
5155             checkFamilies(familyMap.keySet());
5156             checkTimestamps(familyMap, now);
5157             updateKVTimestamps(familyMap.values(), byteNow);
5158           } else if (m instanceof Delete) {
5159             Delete d = (Delete) m;
5160             prepareDelete(d);
5161             prepareDeleteTimestamps(d.getFamilyMap(), byteNow);
5162           } else {
5163             throw new DoNotRetryIOException(
5164                 "Action must be Put or Delete. But was: "
5165                     + m.getClass().getName());
5166           }
5167           Durability tmpDur = m.getDurability(); 
5168           if (tmpDur.ordinal() > durability.ordinal()) {
5169             durability = tmpDur;
5170           }
5171           if (tmpDur != Durability.SKIP_WAL) {
5172             addFamilyMapToWALEdit(m.getFamilyMap(), walEdit);
5173           }
5174         }
5175 
5176         // 6. append all edits at once (don't sync)
5177         if (walEdit.size() > 0) {
5178           txid = this.log.appendNoSync(regionInfo,
5179               this.htableDescriptor.getName(), walEdit,
5180               HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor);
5181         }
5182 
5183         // 7. apply to memstore
5184         long addedSize = 0;
5185         memstoreUpdated = true;
5186         for (Mutation m : mutations) {
5187           addedSize += applyFamilyMapToMemstore(m.getFamilyMap(), w);
5188         }
5189         flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
5190 
5191         // 8. release region and row lock(s)
5192         this.updatesLock.readLock().unlock();
5193         locked = false;
5194         if (acquiredLocks != null) {
5195           for (Integer lid : acquiredLocks) {
5196             releaseRowLock(lid);
5197           }
5198           acquiredLocks = null;
5199         }
5200 
5201         // 9. sync WAL if required
5202         if (walEdit.size() > 0) {
5203           syncOrDefer(txid, durability);
5204         }
5205         walSyncSuccessful = true;
5206 
5207         // 10. advance mvcc
5208         mvcc.completeMemstoreInsert(w);
5209         w = null;
5210 
5211         // 11. run coprocessor post host hooks
5212         // after the WAL is sync'ed and all locks are released
5213         // (similar to doMiniBatchPut)
5214         if (coprocessorHost != null) {
5215           for (Mutation m : mutations) {
5216             if (m instanceof Put) {
5217               coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL());
5218             } else if (m instanceof Delete) {
5219               coprocessorHost.postDelete((Delete) m, walEdit, m.getWriteToWAL());
5220             }
5221           }
5222         }
5223       } finally {
5224         // 12. clean up if needed
5225         if (memstoreUpdated && !walSyncSuccessful) {
5226           int kvsRolledback = 0;
5227           for (Mutation m : mutations) {
5228             for (Map.Entry<byte[], List<KeyValue>> e : m.getFamilyMap()
5229                 .entrySet()) {
5230               List<KeyValue> kvs = e.getValue();
5231               byte[] family = e.getKey();
5232               Store store = getStore(family);
5233               // roll back each kv
5234               for (KeyValue kv : kvs) {
5235                 store.rollback(kv);
5236                 kvsRolledback++;
5237               }
5238             }
5239           }
5240           LOG.info("mutateRowsWithLocks: rolled back " + kvsRolledback
5241               + " KeyValues");
5242         }
5243 
5244         if (w != null) {
5245           mvcc.completeMemstoreInsert(w);
5246         }
5247 
5248         if (locked) {
5249           this.updatesLock.readLock().unlock();
5250         }
5251 
5252         if (acquiredLocks != null) {
5253           for (Integer lid : acquiredLocks) {
5254             releaseRowLock(lid);
5255           }
5256         }
5257       }
5258     } finally {
5259       if (flush) {
5260         // 13. Flush cache if needed. Do it outside update lock.
5261         requestFlush();
5262       }
5263       closeRegionOperation();
5264     }
5265   }
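
  // Editor's note (added): when locking multiple rows, callers should supply
  // rowsToLock in a consistent (sorted) order so two concurrent callers cannot
  // deadlock while acquiring row locks, e.g.:
  //
  //   SortedSet<byte[]> rows = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
  //   rows.add(rowA);
  //   rows.add(rowB);
  //   region.mutateRowsWithLocks(mutations, rows);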
5266 
5267   // TODO: There's a lot of boilerplate code identical
5268   // to increment... See how to better unify that.
5269 
5270   /**
5271    *
5272    * Perform one or more append operations on a row.
5273    * <p>
5274    * Appends performed are done under row lock but reads do not take locks out
5275    * so this can be seen partially complete by gets and scans.
5276    *
5277    * @param append
5278    * @param writeToWAL
5279    * @return new keyvalues after the append
5280    * @throws IOException
5281    */
5282   public Result append(Append append, boolean writeToWAL)
5283       throws IOException {
5284     return append(append, null, writeToWAL);
5285   }
5286   /**
5287    *
5288    * Perform one or more append operations on a row.
5289    * <p>
5290    * Appends performed are done under row lock but reads do not take locks out
5291    * so this can be seen partially complete by gets and scans.
5292    *
5293    * @param append
5294    * @param lockid
5295    * @param writeToWAL
5296    * @return new keyvalues after the append
5297    * @throws IOException
5298    * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
5299    */
5300   public Result append(Append append, Integer lockid, boolean writeToWAL)
5301       throws IOException {
5302     // TODO: Use MVCC to make this set of appends atomic to reads
5303     byte[] row = append.getRow();
5304     checkRow(row, "append");
5305     boolean flush = false;
5306     WALEdit walEdits = null;
5307     List<KeyValue> allKVs = new ArrayList<KeyValue>(append.size());
5308     Map<Store, List<KeyValue>> tempMemstore = new HashMap<Store, List<KeyValue>>();
5309     long before = EnvironmentEdgeManager.currentTimeMillis();
5310     long size = 0;
5311     long txid = 0;
5312 
5313     checkReadOnly();
5314     // Lock row
5315     startRegionOperation();
5316     this.writeRequestsCount.increment();
5317     this.opMetrics.setWriteRequestCountMetrics(this.writeRequestsCount.get());
5318     try {
5319       Integer lid = getLock(lockid, row, true);
5320       lock(this.updatesLock.readLock());
5321       try {
5322         long now = EnvironmentEdgeManager.currentTimeMillis();
5323         // Process each family
5324         for (Map.Entry<byte[], List<KeyValue>> family : append.getFamilyMap()
5325             .entrySet()) {
5326 
5327           Store store = stores.get(family.getKey());
5328           Collections.sort(family.getValue(), store.getComparator());
5329           List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
5330 
5331           // Get previous values for all columns in this family
5332           Get get = new Get(row);
5333           for (KeyValue kv : family.getValue()) {
5334             get.addColumn(family.getKey(), kv.getQualifier());
5335           }
5336           List<KeyValue> results = get(get, false);
5337 
5338           // Iterate the input columns and update existing values if they were
5339           // found, otherwise add new column initialized to the append value
5340 
5341           // Avoid as much copying as possible. Every byte is copied at most
5342           // once.
5343           // Would be nice if KeyValue had scatter/gather logic
5344           int idx = 0;
5345           for (KeyValue kv : family.getValue()) {
5346             KeyValue newKV;
5347             if (idx < results.size()
5348                 && results.get(idx).matchingQualifier(kv.getBuffer(),
5349                     kv.getQualifierOffset(), kv.getQualifierLength())) {
5350               KeyValue oldKv = results.get(idx);
5351               // allocate an empty kv once
5352               newKV = new KeyValue(row.length, kv.getFamilyLength(),
5353                   kv.getQualifierLength(), now, KeyValue.Type.Put,
5354                   oldKv.getValueLength() + kv.getValueLength());
5355               // copy in the value
5356               System.arraycopy(oldKv.getBuffer(), oldKv.getValueOffset(),
5357                   newKV.getBuffer(), newKV.getValueOffset(),
5358                   oldKv.getValueLength());
5359               System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
5360                   newKV.getBuffer(),
5361                   newKV.getValueOffset() + oldKv.getValueLength(),
5362                   kv.getValueLength());
5363               idx++;
5364             } else {
5365               // allocate an empty kv once
5366               newKV = new KeyValue(row.length, kv.getFamilyLength(),
5367                   kv.getQualifierLength(), now, KeyValue.Type.Put,
5368                   kv.getValueLength());
5369               // copy in the value
5370               System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
5371                   newKV.getBuffer(), newKV.getValueOffset(),
5372                   kv.getValueLength());
5373             }
5374             // copy in row, family, and qualifier
5375             System.arraycopy(kv.getBuffer(), kv.getRowOffset(),
5376                 newKV.getBuffer(), newKV.getRowOffset(), kv.getRowLength());
5377             System.arraycopy(kv.getBuffer(), kv.getFamilyOffset(),
5378                 newKV.getBuffer(), newKV.getFamilyOffset(),
5379                 kv.getFamilyLength());
5380             System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(),
5381                 newKV.getBuffer(), newKV.getQualifierOffset(),
5382                 kv.getQualifierLength());
5383 
5384             kvs.add(newKV);
5385 
5386             // Append update to WAL
5387             if (writeToWAL) {
5388               if (walEdits == null) {
5389                 walEdits = new WALEdit();
5390               }
5391               walEdits.add(newKV);
5392             }
5393           }
5394 
5395           // store the kvs to the temporary memstore before writing HLog
5396           tempMemstore.put(store, kvs);
5397         }
5398 
5399         // Actually write to WAL now
5400         if (writeToWAL) {
5401           // Using default cluster id, as this can only happen in the originating
5402           // cluster. A slave cluster receives the final value (not the delta)
5403           // as a Put.
5404           txid = this.log.appendNoSync(regionInfo,
5405               this.htableDescriptor.getName(), walEdits,
5406               HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
5407               this.htableDescriptor);
5408         }
5409         // Actually write to Memstore now
5410         for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
5411           Store store = entry.getKey();
5412           size += store.upsert(entry.getValue());
5413           allKVs.addAll(entry.getValue());
5414         }
5415         size = this.addAndGetGlobalMemstoreSize(size);
5416         flush = isFlushSize(size);
5417       } finally {
5418         this.updatesLock.readLock().unlock();
5419         releaseRowLock(lid);
5420       }
5421       if (writeToWAL) {
5422         // sync the transaction log outside the rowlock
5423         syncOrDefer(txid, append.getDurability());
5424       }
5425     } finally {
5426       closeRegionOperation();
5427     }
5428 
5429 
5430     long after = EnvironmentEdgeManager.currentTimeMillis();
5431     this.opMetrics.updateAppendMetrics(append.getFamilyMap().keySet(), after - before);
5432 
5433     if (flush) {
5434       // Request a cache flush. Do it outside update lock.
5435       requestFlush();
5436     }
5437 
5438     return append.isReturnResults() ? new Result(allKVs) : null;
5439   }
5440 
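       /*
        * A minimal client-side sketch of the append path above, assuming an
        * open HTable named "table" and byte[] constants ROW, FAMILY and
        * QUALIFIER (hypothetical names). HTable.append(Append) ends up in
        * HRegion.append():
        *
        *   Append append = new Append(ROW);
        *   append.add(FAMILY, QUALIFIER, Bytes.toBytes("-suffix"));
        *   Result result = table.append(append); // null when returnResults is false
        */
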
5441   /**
5442    *
5443    * Perform one or more increment operations on a row.
5444    * <p>
5445    * Increments performed are done under row lock but reads do not take locks
5446    * out so this can be seen partially complete by gets and scans.
5447    * @param increment the increment operations to apply
5448    * @param writeToWAL if {@code true}, write the increments to the WAL
5449    * @return new keyvalues after increment
5450    * @throws IOException
5451    */
5452   public Result increment(Increment increment, boolean writeToWAL)
5453   throws IOException {
5454     return increment(increment, null, writeToWAL);
5455   }
5456 
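       /*
        * Client-side sketch (hedged; "table", ROW, FAMILY and QUALIFIER are
        * hypothetical names): HTable.increment(Increment) routes here.
        *
        *   Increment inc = new Increment(ROW);
        *   inc.addColumn(FAMILY, QUALIFIER, 1L);
        *   Result result = table.increment(inc);
        *   long value = Bytes.toLong(result.getValue(FAMILY, QUALIFIER));
        */
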
5457   /**
5458    *
5459    * Perform one or more increment operations on a row.
5460    * <p>
5461    * Increments performed are done under row lock but reads do not take locks
5462    * out so this can be seen partially complete by gets and scans.
5463    * @param increment the increment operations to apply
5464    * @param lockid optional, previously obtained row lock id
5465    * @param writeToWAL if {@code true}, write the increments to the WAL
5466    * @return new keyvalues after increment
5467    * @throws IOException
5468    * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
5469    */
5471   public Result increment(Increment increment, Integer lockid,
5472       boolean writeToWAL)
5473   throws IOException {
5474     // TODO: Use MVCC to make this set of increments atomic to reads
5475     byte [] row = increment.getRow();
5476     checkRow(row, "increment");
5477     TimeRange tr = increment.getTimeRange();
5478     boolean flush = false;
5479     WALEdit walEdits = null;
5480     List<KeyValue> allKVs = new ArrayList<KeyValue>(increment.numColumns());
5481     Map<Store, List<KeyValue>> tempMemstore = new HashMap<Store, List<KeyValue>>();
5482     long before = EnvironmentEdgeManager.currentTimeMillis();
5483     long size = 0;
5484     long txid = 0;
5485 
5486     checkReadOnly();
5487     // Lock row
5488     startRegionOperation();
5489     this.writeRequestsCount.increment();
5490     this.opMetrics.setWriteRequestCountMetrics(this.writeRequestsCount.get());
5491     try {
5492       Integer lid = getLock(lockid, row, true);
5493       lock(this.updatesLock.readLock());
5494       try {
5495         long now = EnvironmentEdgeManager.currentTimeMillis();
5496         // Process each family
5497         for (Map.Entry<byte [], NavigableMap<byte [], Long>> family :
5498           increment.getFamilyMap().entrySet()) {
5499 
5500           Store store = stores.get(family.getKey());
5501           List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
5502 
5503           // Get previous values for all columns in this family
5504           Get get = new Get(row);
5505           for (Map.Entry<byte [], Long> column : family.getValue().entrySet()) {
5506             get.addColumn(family.getKey(), column.getKey());
5507           }
5508           get.setTimeRange(tr.getMin(), tr.getMax());
5509           List<KeyValue> results = get(get, false);
5510 
5511           // Iterate the input columns and update existing values if they were
5512           // found, otherwise add new column initialized to the increment amount
5513           int idx = 0;
5514           for (Map.Entry<byte [], Long> column : family.getValue().entrySet()) {
5515             long amount = column.getValue();
5516             if (idx < results.size() &&
5517                 results.get(idx).matchingQualifier(column.getKey())) {
5518               KeyValue kv = results.get(idx);
5519               if (kv.getValueLength() == Bytes.SIZEOF_LONG) {
5520                 amount += Bytes.toLong(kv.getBuffer(), kv.getValueOffset(), Bytes.SIZEOF_LONG);
5521               } else {
5522                 // throw DoNotRetryIOException instead of IllegalArgumentException
5523                 throw new DoNotRetryIOException(
5524                     "Attempted to increment field that isn't 64 bits wide");
5525               }
5526               idx++;
5527             }
5528 
5529             // Append new incremented KeyValue to list
5530             KeyValue newKV = new KeyValue(row, family.getKey(), column.getKey(),
5531                 now, Bytes.toBytes(amount));
5532             kvs.add(newKV);
5533 
5534             // Append update to WAL
5535             if (writeToWAL) {
5536               if (walEdits == null) {
5537                 walEdits = new WALEdit();
5538               }
5539               walEdits.add(newKV);
5540             }
5541           }
5542 
5543           // store the kvs to the temporary memstore before writing HLog
5544           tempMemstore.put(store, kvs);
5545         }
5546 
5547         // Actually write to WAL now
5548         if (writeToWAL) {
5549           // Using default cluster id, as this can only happen in the originating
5550           // cluster. A slave cluster receives the final value (not the delta)
5551           // as a Put.
5552           txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
5553               walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
5554               this.htableDescriptor);
5555         }
5556 
5557         // Actually write to Memstore now
5558         for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
5559           Store store = entry.getKey();
5560           size += store.upsert(entry.getValue());
5561           allKVs.addAll(entry.getValue());
5562         }
5563         size = this.addAndGetGlobalMemstoreSize(size);
5564         flush = isFlushSize(size);
5565       } finally {
5566         this.updatesLock.readLock().unlock();
5567         releaseRowLock(lid);
5568       }
5569       if (writeToWAL) {
5570         // sync the transaction log outside the rowlock
5571         syncOrDefer(txid, Durability.USE_DEFAULT);
5572       }
5573     } finally {
5574       closeRegionOperation();
5575       long after = EnvironmentEdgeManager.currentTimeMillis();
5576       this.opMetrics.updateIncrementMetrics(increment.getFamilyMap().keySet(), after - before);
5577     }
5578 
5579     if (flush) {
5580       // Request a cache flush.  Do it outside update lock.
5581       requestFlush();
5582     }
5583 
5584     return new Result(allKVs);
5585   }
5586 
5587   /**
5588    * @param row
5589    * @param family
5590    * @param qualifier
5591    * @param amount
5592    * @param writeToWAL
5593    * @return The new value.
5594    * @throws IOException
5595    */
5596   public long incrementColumnValue(byte [] row, byte [] family,
5597       byte [] qualifier, long amount, boolean writeToWAL)
5598   throws IOException {
5599     // to be used for metrics
5600     long before = EnvironmentEdgeManager.currentTimeMillis();
5601 
5602     checkRow(row, "increment");
5603     boolean flush = false;
5604     boolean wrongLength = false;
5605     long txid = 0;
5606     // Lock row
5607     long result = amount;
5608     startRegionOperation();
5609     this.writeRequestsCount.increment();
5610     this.opMetrics.setWriteRequestCountMetrics(this.writeRequestsCount.get());
5611     try {
5612       Integer lid = obtainRowLock(row);
5613       lock(this.updatesLock.readLock());
5614       try {
5615         Store store = stores.get(family);
5616 
5617         // Get the old value:
5618         Get get = new Get(row);
5619         get.addColumn(family, qualifier);
5620 
5621         // we don't want to invoke coprocessor in this case; ICV is wrapped
5622         // in HRegionServer, so we leave getLastIncrement alone
5623         List<KeyValue> results = get(get, false);
5624 
5625         if (!results.isEmpty()) {
5626           KeyValue kv = results.get(0);
5627           if (kv.getValueLength() == Bytes.SIZEOF_LONG) {
5628             byte [] buffer = kv.getBuffer();
5629             int valueOffset = kv.getValueOffset();
5630             result += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG);
5631           } else {
5632             wrongLength = true;
5633           }
5635         }
5636         if (!wrongLength) {
5637           // build the KeyValue now:
5638           KeyValue newKv = new KeyValue(row, family,
5639             qualifier, EnvironmentEdgeManager.currentTimeMillis(),
5640             Bytes.toBytes(result));
5641 
5642           // now log it:
5643           if (writeToWAL) {
5644             long now = EnvironmentEdgeManager.currentTimeMillis();
5645             WALEdit walEdit = new WALEdit();
5646             walEdit.add(newKv);
5647             // Using default cluster id, as this can only happen in the
5648             // originating cluster. A slave cluster receives the final value (not
5649             // the delta) as a Put.
5650             txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
5651                 walEdit, HConstants.DEFAULT_CLUSTER_ID, now,
5652                 this.htableDescriptor);
5653           }
5654 
5655           // Now hand the ICV off to the store; this will set the timestamp
5656           // appropriately depending on whether there is a value in the memstore.
5657           // Returns the change in the size of the memstore from this operation.
5658           long size = store.updateColumnValue(row, family, qualifier, result);
5659 
5660           size = this.addAndGetGlobalMemstoreSize(size);
5661           flush = isFlushSize(size);
5662         }
5663       } finally {
5664         this.updatesLock.readLock().unlock();
5665         releaseRowLock(lid);
5666       }
5667       if (writeToWAL) {
5668         // sync the transaction log outside the rowlock
5669         syncOrDefer(txid, Durability.USE_DEFAULT);
5670       }
5671     } finally {
5672       closeRegionOperation();
5673     }
5674 
5675     // do after lock
5676     long after = EnvironmentEdgeManager.currentTimeMillis();
5677     this.opMetrics.updateIncrementColumnValueMetrics(family, after - before);
5678 
5679     if (flush) {
5680       // Request a cache flush.  Do it outside update lock.
5681       requestFlush();
5682     }
5683     if (wrongLength) {
5684       throw new DoNotRetryIOException(
5685           "Attempted to increment field that isn't 64 bits wide");
5686     }
5687     return result;
5688   }
5689 
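       /*
        * Usage sketch for the single-column fast path above (hedged; "table",
        * ROW, FAMILY and QUALIFIER are hypothetical names):
        *
        *   long next = table.incrementColumnValue(ROW, FAMILY, QUALIFIER, 1L);
        *   // trade durability for speed by skipping the WAL:
        *   long fast = table.incrementColumnValue(ROW, FAMILY, QUALIFIER, 1L, false);
        */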
5690 
5691   //
5692   // New HBASE-880 Helpers
5693   //
5694 
5695   private void checkFamily(final byte [] family)
5696   throws NoSuchColumnFamilyException {
5697     if (!this.htableDescriptor.hasFamily(family)) {
5698       throw new NoSuchColumnFamilyException("Column family " +
5699           Bytes.toString(family) + " does not exist in region " + this
5700           + " in table " + this.htableDescriptor);
5701     }
5702   }
5703 
5704   public static final long FIXED_OVERHEAD = ClassSize.align(
5705       ClassSize.OBJECT +
5706       ClassSize.ARRAY +
5707       36 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
5708       (8 * Bytes.SIZEOF_LONG) +
5709       Bytes.SIZEOF_BOOLEAN);
5710 
5711   public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
5712       ClassSize.OBJECT + // closeLock
5713       (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
5714       (3 * ClassSize.ATOMIC_LONG) + // memStoreSize, numPutsWithoutWAL, dataInMemoryWithoutWAL
5715       ClassSize.ATOMIC_INTEGER + // lockIdGenerator
5716       (3 * ClassSize.CONCURRENT_HASHMAP) +  // lockedRows, lockIds, scannerReadPoints
5717       WriteState.HEAP_SIZE + // writestate
5718       ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
5719       (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
5720       ClassSize.ARRAYLIST + // recentFlushes
5721       MultiVersionConsistencyControl.FIXED_SIZE // mvcc
5722       ;
5723 
5724   @Override
5725   public long heapSize() {
5726     long heapSize = DEEP_OVERHEAD;
5727     for (Store store : this.stores.values()) {
5728       heapSize += store.heapSize();
5729     }
5730     // this does not take into account row locks, recent flushes, mvcc entries
5731     return heapSize;
5732   }
5733 
5734   /*
5735    * This method calls System.exit.
5736    * @param message Message to print out.  May be null.
5737    */
5738   private static void printUsageAndExit(final String message) {
5739     if (message != null && message.length() > 0) System.out.println(message);
5740     System.out.println("Usage: HRegion CATLALOG_TABLE_DIR [major_compact]");
5741     System.out.println("Options:");
5742     System.out.println(" major_compact  Pass this option to major compact " +
5743       "passed region.");
5744     System.out.println("Default outputs scan of passed region.");
5745     System.exit(1);
5746   }
5747 
5748   /**
5749    * Registers a new CoprocessorProtocol subclass and instance to
5750    * be available for handling {@link HRegion#exec(Exec)} calls.
5751    *
5752    * <p>
5753    * Only a single protocol type/handler combination may be registered per
5754    * region.
5755    * After the first registration, subsequent calls with the same protocol type
5756    * will fail with a return value of {@code false}.
5757    * </p>
5758    * @param protocol a {@code CoprocessorProtocol} subinterface defining the
5759    * protocol methods
5760    * @param handler an instance implementing the interface
5761    * @param <T> the protocol type
5762    * @return {@code true} if the registration was successful, {@code false}
5763    * otherwise
5764    */
5765   public <T extends CoprocessorProtocol> boolean registerProtocol(
5766       Class<T> protocol, T handler) {
5767 
5768     /* No stacking of protocol handlers is currently allowed.  The
5769      * first to claim wins!
5770      */
5771     if (protocolHandlers.containsKey(protocol)) {
5772       LOG.error("Protocol "+protocol.getName()+
5773           " already registered, rejecting request from "+
5774           handler
5775       );
5776       return false;
5777     }
5778 
5779     protocolHandlers.putInstance(protocol, handler);
5780     protocolHandlerNames.put(protocol.getName(), protocol);
5781     if (LOG.isDebugEnabled()) {
5782       LOG.debug("Registered protocol handler: region="+
5783           Bytes.toStringBinary(getRegionName())+" protocol="+protocol.getName());
5784     }
5785     return true;
5786   }
5787 
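       /*
        * Registration sketch with a hypothetical protocol (CounterProtocol and
        * CounterProtocolImpl are illustrative names, not HBase classes):
        *
        *   public interface CounterProtocol extends CoprocessorProtocol {
        *     long getRowCount() throws IOException;
        *   }
        *
        *   boolean claimed = region.registerProtocol(
        *       CounterProtocol.class, new CounterProtocolImpl(region));
        *   // claimed is false if CounterProtocol was already registered
        */
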
5788   /**
5789    * Executes a single {@link org.apache.hadoop.hbase.ipc.CoprocessorProtocol}
5790    * method using the registered protocol handlers.
5791    * {@link CoprocessorProtocol} implementations must be registered via the
5792    * {@link org.apache.hadoop.hbase.regionserver.HRegion#registerProtocol(Class, org.apache.hadoop.hbase.ipc.CoprocessorProtocol)}
5793    * method before they are available.
5794    *
5795    * @param call an {@code Exec} instance identifying the protocol, method name,
5796    *     and parameters for the method invocation
5797    * @return an {@code ExecResult} instance containing the region name of the
5798    *     invocation and the return value
5799    * @throws IOException if no registered protocol handler is found or an error
5800    *     occurs during the invocation
5801    * @see org.apache.hadoop.hbase.regionserver.HRegion#registerProtocol(Class, org.apache.hadoop.hbase.ipc.CoprocessorProtocol)
5802    */
5803   public ExecResult exec(Exec call)
5804       throws IOException {
5805     Class<? extends CoprocessorProtocol> protocol = call.getProtocol();
5806     if (protocol == null) {
5807       String protocolName = call.getProtocolName();
5808       if (LOG.isTraceEnabled()) {
5809         LOG.trace("Received dynamic protocol exec call with protocolName " + protocolName);
5810       }
5811       // detect the actual protocol class
5812       protocol = protocolHandlerNames.get(protocolName);
5813       if (protocol == null) {
5814         throw new HBaseRPC.UnknownProtocolException(protocol,
5815             "No matching handler for protocol "+protocolName+
5816             " in region "+Bytes.toStringBinary(getRegionName()));
5817       }
5818     }
5819     if (!protocolHandlers.containsKey(protocol)) {
5820       throw new HBaseRPC.UnknownProtocolException(protocol,
5821           "No matching handler for protocol "+protocol.getName()+
5822           " in region "+Bytes.toStringBinary(getRegionName()));
5823     }
5824 
5825     CoprocessorProtocol handler = protocolHandlers.getInstance(protocol);
5826     Object value;
5827 
5828     try {
5829       Method method = protocol.getMethod(
5830           call.getMethodName(), call.getParameterClasses());
5831       method.setAccessible(true);
5832 
5833       value = method.invoke(handler, call.getParameters());
5834     } catch (InvocationTargetException e) {
5835       Throwable target = e.getTargetException();
5836       if (target instanceof IOException) {
5837         throw (IOException)target;
5838       }
5839       IOException ioe = new IOException(target.toString());
5840       ioe.setStackTrace(target.getStackTrace());
5841       throw ioe;
5842     } catch (Throwable e) {
5843       if (!(e instanceof IOException)) {
5844         LOG.error("Unexpected throwable object ", e);
5845       }
5846       IOException ioe = new IOException(e.toString());
5847       ioe.setStackTrace(e.getStackTrace());
5848       throw ioe;
5849     }
5850 
5851     return new ExecResult(getRegionName(), value);
5852   }
5853 
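       /*
        * Client-side sketch (hedged): with the hypothetical CounterProtocol
        * above, HTable.coprocessorProxy returns a dynamic proxy whose method
        * calls reach this region as Exec instances:
        *
        *   CounterProtocol counter =
        *       table.coprocessorProxy(CounterProtocol.class, ROW);
        *   long rows = counter.getRowCount();
        */
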
5854   /*
5855    * Process table.
5856    * Do major compaction or list content.
5857    * @param fs
5858    * @param p
5859    * @param log
5860    * @param c
5861    * @param majorCompact
5862    * @throws IOException
5863    */
5864   private static void processTable(final FileSystem fs, final Path p,
5865       final HLog log, final Configuration c,
5866       final boolean majorCompact)
5867   throws IOException {
5868     HRegion region = null;
5869     String rootStr = Bytes.toString(HConstants.ROOT_TABLE_NAME);
5870     String metaStr = Bytes.toString(HConstants.META_TABLE_NAME);
5871     // Currently expects tables to have one region only.
5872     if (p.getName().startsWith(rootStr)) {
5873       region = HRegion.newHRegion(p, log, fs, c, HRegionInfo.ROOT_REGIONINFO,
5874         HTableDescriptor.ROOT_TABLEDESC, null);
5875     } else if (p.getName().startsWith(metaStr)) {
5876       region = HRegion.newHRegion(p, log, fs, c,
5877         HRegionInfo.FIRST_META_REGIONINFO, HTableDescriptor.META_TABLEDESC, null);
5878     } else {
5879       throw new IOException("Not a known catalog table: " + p.toString());
5880     }
5881     try {
5882       region.initialize();
5883       if (majorCompact) {
5884         region.compactStores(true);
5885       } else {
5886         // Default behavior
5887         Scan scan = new Scan();
5888         // scan.addFamily(HConstants.CATALOG_FAMILY);
5889         RegionScanner scanner = region.getScanner(scan);
5890         try {
5891           List<KeyValue> kvs = new ArrayList<KeyValue>();
5892           boolean done = false;
5893           do {
5894             kvs.clear();
5895             done = scanner.next(kvs);
5896             if (kvs.size() > 0) LOG.info(kvs);
5897           } while (done);
5898         } finally {
5899           scanner.close();
5900         }
5901       }
5902     } finally {
5903       region.close();
5904     }
5905   }
5906 
5907   boolean shouldForceSplit() {
5908     return this.splitRequest;
5909   }
5910 
5911   byte[] getExplicitSplitPoint() {
5912     return this.explicitSplitPoint;
5913   }
5914 
5915   void forceSplit(byte[] sp) {
5916     // NOTE : this HRegion will go away after the forced split is successful
5917     //        therefore, no reason to clear this value
5918     this.splitRequest = true;
5919     if (sp != null) {
5920       this.explicitSplitPoint = sp;
5921     }
5922   }
5923 
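       /*
        * Hedged sketch: a forced split normally originates from a client call
        * such as
        *
        *   HBaseAdmin admin = new HBaseAdmin(conf);
        *   admin.split("myTable", "splitRow");
        *
        * which eventually reaches forceSplit() with the requested split point.
        */
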
5924   void clearSplit_TESTS_ONLY() {
5925     this.splitRequest = false;
5926   }
5927 
5928   /**
5929    * Give the region a chance to prepare before it is split.
5930    */
5931   protected void prepareToSplit() {
5932     // nothing
5933   }
5934 
5935   /**
5936    * Return the split point. A null return value indicates the region is not
5937    * splittable. If the split point is not explicitly specified, this method
5938    * goes over the stores to find the best split point. Currently the
5939    * criterion for the best split point is the size of the store.
5940    */
5941   public byte[] checkSplit() {
5942     // Can't split ROOT/META
5943     if (this.regionInfo.isMetaTable()) {
5944       if (shouldForceSplit()) {
5945         LOG.warn("Cannot split root/meta regions in HBase 0.20 and above");
5946       }
5947       return null;
5948     }
5949 
5950     if (!splitPolicy.shouldSplit()) {
5951       return null;
5952     }
5953 
5954     byte[] ret = splitPolicy.getSplitPoint();
5955 
5956     if (ret != null) {
5957       try {
5958         checkRow(ret, "calculated split");
5959       } catch (IOException e) {
5960         LOG.error("Ignoring invalid split", e);
5961         return null;
5962       }
5963     }
5964     return ret;
5965   }
5966 
5967   /**
5968    * @return The priority that this region should have in the compaction queue
5969    */
5970   public int getCompactPriority() {
5971     int count = Integer.MAX_VALUE;
5972     for (Store store : stores.values()) {
5973       count = Math.min(count, store.getCompactPriority());
5974     }
5975     return count;
5976   }
5977 
5978   /**
5979    * Checks every store to see if one has too many
5980    * store files
5981    * @return true if any store has too many store files
5982    */
5983   public boolean needsCompaction() {
5984     for (Store store : stores.values()) {
5985       if (store.needsCompaction()) {
5986         return true;
5987       }
5988     }
5989     return false;
5990   }
5991 
5992   /** @return the coprocessor host */
5993   public RegionCoprocessorHost getCoprocessorHost() {
5994     return coprocessorHost;
5995   }
5996 
5997   /**
5998    * Set the read request count defined in opMetrics.
5999    * @param value absolute value of read request count
6000    */
6001   public void setOpMetricsReadRequestCount(long value) {
6002     this.opMetrics.setReadRequestCountMetrics(value);
6003   }
6004 
6005   /**
6006    * Set the write request count defined in opMetrics.
6007    * @param value absolute value of write request count
6008    */
6009   public void setOpMetricsWriteRequestCount(long value) {
6010     this.opMetrics.setWriteRequestCountMetrics(value);
6011   }
6012 
6015   /** @param coprocessorHost the new coprocessor host */
6016   public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {
6017     this.coprocessorHost = coprocessorHost;
6018   }
6019 
6020   /**
6021    * This method needs to be called before any public call that reads or
6022    * modifies data. It has to be called just before a try;
6023    * #closeRegionOperation needs to be called in the try's finally block.
6024    * Acquires a read lock and checks if the region is closing or closed.
6025    * @throws NotServingRegionException when the region is closing or closed
6026    * @throws RegionTooBusyException if failed to get the lock in time
6027    * @throws InterruptedIOException if interrupted while waiting for a lock
6028    */
6029   public void startRegionOperation()
6030       throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
6031     if (this.closing.get()) {
6032       throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
6033           " is closing");
6034     }
6035     lock(lock.readLock());
6036     if (this.closed.get()) {
6037       lock.readLock().unlock();
6038       throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
6039           " is closed");
6040     }
6041   }
6042 
6043   /**
6044    * Releases the lock. This needs to be called in the finally block corresponding
6045    * to the try block of #startRegionOperation
6046    */
6047   public void closeRegionOperation() {
6048     lock.readLock().unlock();
6049   }
6050 
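       /*
        * The calling pattern the contract above describes, as a sketch:
        *
        *   startRegionOperation();
        *   try {
        *     // read or mutate region data
        *   } finally {
        *     closeRegionOperation();
        *   }
        */
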
6051   public void abortRegionServer(String msg) throws IOException {
6052     RegionServerServices rs = getRegionServerServices();
6053     if (rs instanceof HRegionServer) {
6054       ((HRegionServer)rs).abort(msg);
6055     }
6056   }
6057 
6058   /**
6059    * This method needs to be called before any public call that reads or
6060    * modifies stores in bulk. It has to be called just before a try;
6061    * #closeBulkRegionOperation needs to be called in the try's finally block.
6062    * Acquires a write lock and checks if the region is closing or closed.
6063    * @throws NotServingRegionException when the region is closing or closed
6064    * @throws RegionTooBusyException if failed to get the lock in time
6065    * @throws InterruptedIOException if interrupted while waiting for a lock
6066    */
6067   private void startBulkRegionOperation(boolean writeLockNeeded)
6068       throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
6069     if (this.closing.get()) {
6070       throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
6071           " is closing");
6072     }
6073     if (writeLockNeeded) lock(lock.writeLock());
6074     else lock(lock.readLock());
6075     if (this.closed.get()) {
6076       if (writeLockNeeded) lock.writeLock().unlock();
6077       else lock.readLock().unlock();
6078       throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
6079           " is closed");
6080     }
6081   }
6082 
6083   /**
6084    * Releases the lock. This needs to be called in the finally block corresponding
6085    * to the try block of #startBulkRegionOperation
6086    */
6087   private void closeBulkRegionOperation() {
6088     if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
6089     else lock.readLock().unlock();
6090   }
6091 
6092   /**
6093    * Update counters for the number of puts without WAL and the size of possible data loss.
6094    * This information is exposed by the region server metrics.
6095    */
6096   private void recordPutWithoutWal(final Map<byte [], List<KeyValue>> familyMap) {
6097     if (numPutsWithoutWAL.getAndIncrement() == 0) {
6098       LOG.info("writing data to region " + this +
6099                " with WAL disabled. Data may be lost in the event of a crash.");
6100     }
6101 
6102     long putSize = 0;
6103     for (List<KeyValue> edits : familyMap.values()) {
6104       assert edits instanceof RandomAccess;
6105       int listSize = edits.size();
6106       for (int i=0; i < listSize; i++) {
6107         KeyValue kv = edits.get(i);
6108         putSize += kv.getKeyLength() + kv.getValueLength();
6109       }
6110     }
6111 
6112     dataInMemoryWithoutWAL.addAndGet(putSize);
6113   }
6114 
6115   private void lock(final Lock lock)
6116       throws RegionTooBusyException, InterruptedIOException {
6117     lock(lock, 1);
6118   }
6119 
6120   /**
6121    * Try to acquire a lock.  Throw RegionTooBusyException
6122    * if failed to get the lock in time. Throw InterruptedIOException
6123    * if interrupted while waiting for the lock.
6124    */
6125   private void lock(final Lock lock, final int multiplier)
6126       throws RegionTooBusyException, InterruptedIOException {
6127     try {
6128       final long waitTime = Math.min(maxBusyWaitDuration,
6129         busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
6130       if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
6131         throw new RegionTooBusyException(
6132           "failed to get a lock in " + waitTime + " ms. " +
6133             "regionName=" + (this.getRegionInfo() == null ? "unknown" :
6134             this.getRegionInfo().getRegionNameAsString()) +
6135             ", server=" + (this.getRegionServerServices() == null ? "unknown" :
6136             this.getRegionServerServices().getServerName()));
6137       }
6138     } catch (InterruptedException ie) {
6139       LOG.info("Interrupted while waiting for a lock");
6140       InterruptedIOException iie = new InterruptedIOException();
6141       iie.initCause(ie);
6142       throw iie;
6143     }
6144   }
6145 
6146   /**
6147    * Calls sync with the given transaction ID if the region's table is not
6148    * deferring it.
6149    * @param txid should sync up to which transaction
6150    * @throws IOException If anything goes wrong with DFS
6151    */
6152   private void syncOrDefer(long txid, Durability durability) throws IOException {
6153     if (this.getRegionInfo().isMetaRegion()) {
6154       this.log.sync(txid);
6155     } else {
6156       switch (durability) {
6157       case USE_DEFAULT:
6158         // do what CF defaults to
6159         if (!isDeferredLogSyncEnabled()) {
6160           this.log.sync(txid);
6161         }
6162         break;
6163       case SKIP_WAL:
6164         // nothing to do
6165         break;
6166       case ASYNC_WAL:
6167         // defer the sync, unless we globally can't
6168         if (this.deferredLogSyncDisabled) {
6169           this.log.sync(txid);
6170         }
6171         break;
6172       case SYNC_WAL:
6173       case FSYNC_WAL:
6174         // sync the WAL edit (SYNC and FSYNC treated the same for now)
6175         this.log.sync(txid);
6176         break;
6177       }
6178     }
6179   }
6180 
6181   /**
6182    * Check whether deferred log sync is enabled for the current region.
6183    */
6184   private boolean isDeferredLogSyncEnabled() {
6185     return (this.htableDescriptor.isDeferredLogFlush() && !this.deferredLogSyncDisabled);
6186   }
6187 
6188   /**
6189    * A mocked list implementation that discards all updates.
6190    */
6191   private static final List<KeyValue> MOCKED_LIST = new AbstractList<KeyValue>() {
6192 
6193     @Override
6194     public void add(int index, KeyValue element) {
6195       // do nothing
6196     }
6197 
6198     @Override
6199     public boolean addAll(int index, Collection<? extends KeyValue> c) {
6200       return false; // this list is never changed as a result of an update
6201     }
6202 
6203     @Override
6204     public KeyValue get(int index) {
6205       throw new UnsupportedOperationException();
6206     }
6207 
6208     @Override
6209     public int size() {
6210       return 0;
6211     }
6212   };
6213 
6214   /**
6215    * Facility for dumping and compacting catalog tables.
6216    * Only handles catalog tables, since these are the only tables whose
6217    * schema we know for sure.  For usage run:
6218    * <pre>
6219    *   ./bin/hbase org.apache.hadoop.hbase.regionserver.HRegion
6220    * </pre>
6221    * @param args
6222    * @throws IOException
6223    */
6224   public static void main(String[] args) throws IOException {
6225     if (args.length < 1) {
6226       printUsageAndExit(null);
6227     }
6228     boolean majorCompact = false;
6229     if (args.length > 1) {
6230       if (!args[1].toLowerCase().startsWith("major")) {
6231         printUsageAndExit("ERROR: Unrecognized option <" + args[1] + ">");
6232       }
6233       majorCompact = true;
6234     }
6235     final Path tableDir = new Path(args[0]);
6236     final Configuration c = HBaseConfiguration.create();
6237     final FileSystem fs = FileSystem.get(c);
6238     final Path logdir = new Path(c.get("hbase.tmp.dir"),
6239         "hlog" + tableDir.getName()
6240         + EnvironmentEdgeManager.currentTimeMillis());
6241     final Path oldLogDir = new Path(c.get("hbase.tmp.dir"),
6242         HConstants.HREGION_OLDLOGDIR_NAME);
6243     final HLog log = new HLog(fs, logdir, oldLogDir, c);
6244     try {
6245       processTable(fs, tableDir, log, c, majorCompact);
6246     } finally {
6247       log.close();
6248       // TODO: is this still right?
6249       BlockCache bc = new CacheConfig(c).getBlockCache();
6250       if (bc != null) bc.shutdown();
6251     }
6252   }
6253 
6254   /**
6255    * Listener class to enable callers of
6256    * bulkLoadHFile() to perform any necessary
6257    * pre/post processing of a given bulkload call
6258    */
6259   public static interface BulkLoadListener {
6260 
6261     /**
6262      * Called before an HFile is actually loaded
6263      * @param family family being loaded to
6264      * @param srcPath path of HFile
6265      * @return final path to be used for actual loading
6266      * @throws IOException
6267      */
6268     String prepareBulkLoad(byte[] family, String srcPath) throws IOException;
6269 
6270     /**
6271      * Called after a successful HFile load
6272      * @param family family being loaded to
6273      * @param srcPath path of HFile
6274      * @throws IOException
6275      */
6276     void doneBulkLoad(byte[] family, String srcPath) throws IOException;
6277 
6278     /**
6279      * Called after a failed HFile load
6280      * @param family family being loaded to
6281      * @param srcPath path of HFile
6282      * @throws IOException
6283      */
6284     void failedBulkLoad(byte[] family, String srcPath) throws IOException;
6285 
6286   }
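
       /*
        * A minimal sketch of an implementing class (names hypothetical); an
        * implementation could stage the file in prepareBulkLoad and clean up
        * in doneBulkLoad/failedBulkLoad:
        *
        *   class StagingBulkLoadListener implements BulkLoadListener {
        *     public String prepareBulkLoad(byte[] family, String srcPath) {
        *       return srcPath; // or move the file into a staging dir first
        *     }
        *     public void doneBulkLoad(byte[] family, String srcPath) { }
        *     public void failedBulkLoad(byte[] family, String srcPath) { }
        *   }
        */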
6287 }