/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Constructor;
import java.text.ParseException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterWrapper;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.MutationReplay;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.Counter;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HashedBytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.util.StringUtils;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;

/**
 * HRegion stores data for a certain region of a table.  It stores all columns
 * for each row. A given table consists of one or more HRegions.
 *
 * <p>We maintain multiple HStores for a single HRegion.
 *
 * <p>A Store is a set of rows with some column data; together, the Stores
 * make up all the data for the rows.
 *
 * <p>Each HRegion has a 'startKey' and 'endKey'.
 * <p>The first is inclusive, the second is exclusive (except for
 * the final region).  The endKey of region 0 is the same as the
 * startKey for region 1 (if it exists).  The startKey for the
 * first region is null. The endKey for the final region is null.
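 *
 * <p>As an illustrative example (a sketch, not drawn from any particular
 * deployment): a table split at row keys {@code "m"} and {@code "t"} has
 * three regions with key extents {@code [null, "m")}, {@code ["m", "t")}
 * and {@code ["t", null)}; a row {@code "moose"} falls in the middle region
 * because {@code "m" <= "moose" < "t"} under the lexicographic byte
 * ordering used for row keys.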
 *
 * <p>Locking at the HRegion level serves only one purpose: preventing the
 * region from being closed (and consequently split) while other operations
 * are ongoing. Each row level operation obtains both a row lock and a region
 * read lock for the duration of the operation. While a scanner is being
 * constructed, getScanner holds a read lock. If the scanner is successfully
 * constructed, it holds a read lock until it is closed. A close takes out a
 * write lock and consequently will block for ongoing operations and will block
 * new operations from starting while the close is in progress.
 *
 * <p>An HRegion is defined by its table and its key extent.
 *
 * <p>It consists of at least one Store.  The number of Stores should be
 * configurable, so that data which is accessed together is stored in the same
 * Store.  Right now, we approximate that by building a single Store for
 * each column family.  (This config info will be communicated via the
 * tabledesc.)
 *
 * <p>The HTableDescriptor contains metainfo about the HRegion's table.
 * regionName is a unique identifier for this HRegion. {@code [startKey,
 * endKey)} defines the keyspace for this HRegion.
 */
@InterfaceAudience.Private
public class HRegion implements HeapSize { // , Writable{
  public static final Log LOG = LogFactory.getLog(HRegion.class);

  public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
      "hbase.hregion.scan.loadColumnFamiliesOnDemand";

  /**
   * This is the global default value for durability. All tables/mutations that
   * do not specify a durability, or that specify USE_DEFAULT, will default to
   * this value.
   */
  private static final Durability DEFAULT_DURABLITY = Durability.SYNC_WAL;

  final AtomicBoolean closed = new AtomicBoolean(false);
  /* Closing can take some time; use the closing flag if there is stuff we don't
   * want to do while in closing state; e.g. offering this region up to the
   * master as a region to close if the carrying regionserver is overloaded.
   * Once set, it is never cleared.
   */
  final AtomicBoolean closing = new AtomicBoolean(false);

  /**
   * The sequence id of the last flush on this region.  Used when doing some
   * rough calculations on whether it is time to flush or not.
   */
  protected volatile long lastFlushSeqId = -1L;

  /**
   * Region scoped edit sequence Id. Edits to this region are GUARANTEED to appear in the WAL/HLog
   * file in this sequence id's order; i.e. edit #2 will be in the WAL after edit #1.
   * Its default value is -1L. This default is used as a marker to indicate
   * that the region hasn't opened yet. Once it is opened, it is set to the derived
   * {@link #openSeqNum}, the largest sequence id of all hfiles opened under this Region.
   *
   * <p>Control of this sequence is handed off to the WAL/HLog implementation.  It is responsible
   * for tagging edits with the correct sequence id since it is responsible for getting the
   * edits into the WAL files. It controls updating the sequence id value.  DO NOT UPDATE IT
   * OUTSIDE OF THE WAL.  The value you get will not be what you think it is.
   */
  private final AtomicLong sequenceId = new AtomicLong(-1L);

  /**
   * The Operation enum is used in {@link HRegion#startRegionOperation} to provide operation
   * context, so that startRegionOperation can invoke different checks before any region
   * operation. Not all operations have to be defined here; an entry is only needed when a
   * special check is required in startRegionOperation.
   */
  public enum Operation {
    ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE,
    REPLAY_BATCH_MUTATE, COMPACT_REGION
  }
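
  // Illustrative sketch (hypothetical caller, not code from this class):
  // work against a region is expected to be bracketed with
  // startRegionOperation/closeRegionOperation so the region cannot be closed
  // underneath the caller. The operation type is one of the enum values above.
  //
  //   region.startRegionOperation(Operation.GET);
  //   try {
  //     // ... perform the read against the region ...
  //   } finally {
  //     region.closeRegionOperation();
  //   }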

  //////////////////////////////////////////////////////////////////////////////
  // Members
  //////////////////////////////////////////////////////////////////////////////

  // map from a locked row to the context for that lock including:
  // - CountDownLatch for threads waiting on that row
  // - the thread that owns the lock (allows reentrancy)
  // - reference count of (reentrant) locks held by the thread
  // - the row itself
  private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
      new ConcurrentHashMap<HashedBytes, RowLockContext>();

  protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(
      Bytes.BYTES_RAWCOMPARATOR);

  // TODO: account for each registered handler in HeapSize computation
  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

  public final AtomicLong memstoreSize = new AtomicLong(0);

  // Debug possible data loss due to WAL off
  final Counter numMutationsWithoutWAL = new Counter();
  final Counter dataInMemoryWithoutWAL = new Counter();

  // Debug why CAS operations are taking a while.
  final Counter checkAndMutateChecksPassed = new Counter();
  final Counter checkAndMutateChecksFailed = new Counter();

  // Number of requests
  final Counter readRequestsCount = new Counter();
  final Counter writeRequestsCount = new Counter();

  // Compaction counters
  final AtomicLong compactionsFinished = new AtomicLong(0L);
  final AtomicLong compactionNumFilesCompacted = new AtomicLong(0L);
  final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L);


  private final HLog log;
  private final HRegionFileSystem fs;
  protected final Configuration conf;
  private final Configuration baseConf;
  private final KeyValue.KVComparator comparator;
  private final int rowLockWaitDuration;
  static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;

  // The internal wait duration to acquire a lock before read/update
  // from the region. It is not per row. The purpose of this wait time
  // is to avoid waiting a long time while the region is busy, so that
  // we can release the IPC handler soon enough to improve the
  // availability of the region server. It can be adjusted by
  // tuning configuration "hbase.busy.wait.duration".
  final long busyWaitDuration;
  static final long DEFAULT_BUSY_WAIT_DURATION = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;

  // If updating multiple rows in one call, wait longer,
  // i.e. waiting for busyWaitDuration * # of rows. However,
  // we can limit the max multiplier.
  final int maxBusyWaitMultiplier;

  // Max busy wait duration. There is no point in waiting longer than the RPC
  // purge timeout, after which an RPC call will be terminated by the RPC engine.
  final long maxBusyWaitDuration;

  // negative number indicates infinite timeout
  static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
  final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();

  private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;

  /**
   * The sequence ID that was encountered when this region was opened.
   */
  private long openSeqNum = HConstants.NO_SEQNUM;

  /**
   * The default setting for whether to enable on-demand CF loading for
   * scan requests to this region. Requests can override it.
   */
  private boolean isLoadingCfsOnDemandDefault = false;

  private final AtomicInteger majorInProgress = new AtomicInteger(0);
  private final AtomicInteger minorInProgress = new AtomicInteger(0);

  //
  // Context: During replay we want to ensure that we do not lose any data. So, we
  // have to be conservative in how we replay logs. For each store, we calculate
  // the maxSeqId up to which the store was flushed, and we skip the edits whose
  // sequence id is less than or equal to that store's maxSeqId.
  // The following map is populated when opening the region.
  Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);

  /**
   * Config setting for whether to allow writes when a region is in recovering state or not.
   */
  private boolean disallowWritesInRecovering = false;

  // when a region is in recovering state, it can only accept writes, not reads
  private volatile boolean isRecovering = false;

  /**
   * @return The smallest mvcc readPoint across all the scanners in this
   * region. Writes older than this readPoint are included in every
   * read operation.
   */
  public long getSmallestReadPoint() {
    long minimumReadPoint;
    // We need to ensure that while we are calculating the smallestReadPoint
    // no new RegionScanners can grab a readPoint that we are unaware of.
    // We achieve this by synchronizing on the scannerReadPoints object.
    synchronized(scannerReadPoints) {
      minimumReadPoint = mvcc.memstoreReadPoint();

      for (Long readPoint: this.scannerReadPoints.values()) {
        if (readPoint < minimumReadPoint) {
          minimumReadPoint = readPoint;
        }
      }
    }
    return minimumReadPoint;
  }
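
  // Illustrative note (a sketch, not code from this class): flushes and
  // compactions consult this value so they do not discard MVCC information
  // that an open scanner may still need. A hypothetical caller:
  //
  //   long smallest = region.getSmallestReadPoint();
  //   // a cell whose MVCC timestamp is <= smallest is already visible to
  //   // every outstanding scanner, so that timestamp can safely be dropped
  //   // when the cell is rewritten during a flush or compaction.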
  /*
   * Data structure of write state flags used to coordinate flushes,
   * compactions and closes.
   */
  static class WriteState {
    // Set while a memstore flush is happening.
    volatile boolean flushing = false;
    // Set when a flush has been requested.
    volatile boolean flushRequested = false;
    // Number of compactions running.
    volatile int compacting = 0;
    // Cleared (set to false) in close. Once false, we cannot compact or flush again.
    volatile boolean writesEnabled = true;
    // Set if region is read-only
    volatile boolean readOnly = false;

    /**
     * Set flags that make this region read-only.
     *
     * @param onOff flip value for region r/o setting
     */
    synchronized void setReadOnly(final boolean onOff) {
      this.writesEnabled = !onOff;
      this.readOnly = onOff;
    }

    boolean isReadOnly() {
      return this.readOnly;
    }

    boolean isFlushRequested() {
      return this.flushRequested;
    }

    static final long HEAP_SIZE = ClassSize.align(
        ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
  }

  /**
   * Objects of this class are created when flushing, to describe all the different states
   * that the flush method can end up in. The Result enum describes those states. The sequence
   * id should only be specified if the flush was successful, and the failure message should
   * only be specified if it wasn't.
   */
  public static class FlushResult {
    enum Result {
      FLUSHED_NO_COMPACTION_NEEDED,
      FLUSHED_COMPACTION_NEEDED,
      // Special case where a flush didn't run because there's nothing in the memstores. Used when
      // bulk loading to know when we can still load even if a flush didn't happen.
      CANNOT_FLUSH_MEMSTORE_EMPTY,
      CANNOT_FLUSH
      // Be careful adding more to this enum; check the methods below to make sure
      // their assumptions still hold.
    }

    final Result result;
    final String failureReason;
    final long flushSequenceId;

    /**
     * Convenience constructor to use when the flush is successful; the failure message is set
     * to null.
     * @param result Expecting FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
     * @param flushSequenceId Generated sequence id that comes right after the edits in the
     *                        memstores.
     */
    FlushResult(Result result, long flushSequenceId) {
      this(result, flushSequenceId, null);
      assert result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
          .FLUSHED_COMPACTION_NEEDED;
    }

    /**
     * Convenience constructor to use when we cannot flush.
     * @param result Expecting CANNOT_FLUSH_MEMSTORE_EMPTY or CANNOT_FLUSH.
     * @param failureReason Reason why we couldn't flush.
     */
    FlushResult(Result result, String failureReason) {
      this(result, -1, failureReason);
      assert result == Result.CANNOT_FLUSH_MEMSTORE_EMPTY || result == Result.CANNOT_FLUSH;
    }

    /**
     * Constructor with all the parameters.
     * @param result Any member of the Result enum.
     * @param flushSequenceId Generated sequence id if the memstores were flushed, else -1.
     * @param failureReason Reason why we couldn't flush, or null.
     */
    FlushResult(Result result, long flushSequenceId, String failureReason) {
      this.result = result;
      this.flushSequenceId = flushSequenceId;
      this.failureReason = failureReason;
    }

    /**
     * Convenience method, the equivalent of checking if result is
     * FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
     * @return true if the memstores were flushed, else false.
     */
    public boolean isFlushSucceeded() {
      return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
          .FLUSHED_COMPACTION_NEEDED;
    }

    /**
     * Convenience method, the equivalent of checking if result is FLUSHED_COMPACTION_NEEDED.
     * @return True if the flush requested a compaction, else false. Note that false by itself
     * says nothing about whether the flush succeeded; check {@link #isFlushSucceeded()} for that.
     */
    public boolean isCompactionNeeded() {
      return result == Result.FLUSHED_COMPACTION_NEEDED;
    }
  }
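
  // Illustrative sketch (hypothetical caller; the flush entry point name is
  // assumed, not defined in this excerpt): interpreting a FlushResult.
  //
  //   HRegion.FlushResult fr = region.flushcache();
  //   if (fr.isFlushSucceeded()) {
  //     if (fr.isCompactionNeeded()) {
  //       // too many store files accumulated; schedule a compaction
  //     }
  //   } else {
  //     LOG.info("Flush skipped: " + fr.failureReason);
  //   }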

  final WriteState writestate = new WriteState();

  long memstoreFlushSize;
  final long timestampSlop;
  final long rowProcessorTimeout;
  private volatile long lastFlushTime;
  final RegionServerServices rsServices;
  private RegionServerAccounting rsAccounting;
  private long flushCheckInterval;
  // flushPerChanges bounds the number of unflushed changes allowed in the memstore
  private long flushPerChanges;
  private long blockingMemStoreSize;
  final long threadWakeFrequency;
  // Used to guard closes
  final ReentrantReadWriteLock lock =
    new ReentrantReadWriteLock();

  // Stop updates lock
  private final ReentrantReadWriteLock updatesLock =
    new ReentrantReadWriteLock();
  private boolean splitRequest;
  private byte[] explicitSplitPoint = null;

  private final MultiVersionConsistencyControl mvcc =
      new MultiVersionConsistencyControl();

  // Coprocessor host
  private RegionCoprocessorHost coprocessorHost;

  private HTableDescriptor htableDescriptor = null;
  private RegionSplitPolicy splitPolicy;

  private final MetricsRegion metricsRegion;
  private final MetricsRegionWrapperImpl metricsRegionWrapper;
  private final Durability durability;

  /**
   * HRegion constructor. This constructor should only be used for testing and
   * extensions.  Instances of HRegion should be instantiated with the
   * {@link HRegion#createHRegion} or {@link HRegion#openHRegion} method.
   *
   * @param tableDir qualified path of directory where region should be located,
   * usually the table directory.
   * @param log The HLog is the outbound log for any updates to the HRegion
   * (There's a single HLog for all the HRegions on a single HRegionServer.)
   * The log file is a logfile from the previous execution that's
   * custom-computed for this HRegion. The HRegionServer computes and sorts the
   * appropriate log info for this HRegion. If there is a previous log file
   * (implying that the HRegion has been written-to before), then read it from
   * the supplied path.
   * @param fs is the filesystem.
   * @param confParam is global configuration settings.
   * @param regionInfo the HRegionInfo that describes the region
   * @param htd the table descriptor
   * @param rsServices reference to {@link RegionServerServices} or null
   */
  @Deprecated
  public HRegion(final Path tableDir, final HLog log, final FileSystem fs,
      final Configuration confParam, final HRegionInfo regionInfo,
      final HTableDescriptor htd, final RegionServerServices rsServices) {
    this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
      log, confParam, htd, rsServices);
  }
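
  // Illustrative sketch (hypothetical usage; the exact overload is assumed
  // from the Javadoc above): regions are normally obtained through the static
  // factory methods rather than these constructors.
  //
  //   HRegion region = HRegion.openHRegion(info, htd, wal, conf);  // open existing region
  //   // ... serve reads and writes ...
  //   region.close();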

  /**
   * HRegion constructor. This constructor should only be used for testing and
   * extensions.  Instances of HRegion should be instantiated with the
   * {@link HRegion#createHRegion} or {@link HRegion#openHRegion} method.
   *
   * @param fs is the filesystem.
   * @param log The HLog is the outbound log for any updates to the HRegion
   * (There's a single HLog for all the HRegions on a single HRegionServer.)
   * The log file is a logfile from the previous execution that's
   * custom-computed for this HRegion. The HRegionServer computes and sorts the
   * appropriate log info for this HRegion. If there is a previous log file
   * (implying that the HRegion has been written-to before), then read it from
   * the supplied path.
   * @param confParam is global configuration settings.
   * @param htd the table descriptor
   * @param rsServices reference to {@link RegionServerServices} or null
   */
  public HRegion(final HRegionFileSystem fs, final HLog log, final Configuration confParam,
      final HTableDescriptor htd, final RegionServerServices rsServices) {
    if (htd == null) {
      throw new IllegalArgumentException("Need table descriptor");
    }

    if (confParam instanceof CompoundConfiguration) {
      throw new IllegalArgumentException("Need original base configuration");
    }

    this.comparator = fs.getRegionInfo().getComparator();
    this.log = log;
    this.fs = fs;

    // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
    this.baseConf = confParam;
    this.conf = new CompoundConfiguration()
      .add(confParam)
      .addStringMap(htd.getConfiguration())
      .addWritableMap(htd.getValues());
    this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
        DEFAULT_CACHE_FLUSH_INTERVAL);
    this.flushPerChanges = conf.getLong(MEMSTORE_FLUSH_PER_CHANGES, DEFAULT_FLUSH_PER_CHANGES);
    if (this.flushPerChanges > MAX_FLUSH_PER_CHANGES) {
      throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed "
          + MAX_FLUSH_PER_CHANGES);
    }

    this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
                    DEFAULT_ROWLOCK_WAIT_DURATION);

    this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
    this.htableDescriptor = htd;
    this.rsServices = rsServices;
    this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
    setHTableSpecificConf();
    this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();

    this.busyWaitDuration = conf.getLong(
      "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
    this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
    if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
      throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
        + busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
        + maxBusyWaitMultiplier + "). Their product should be positive");
    }
    this.maxBusyWaitDuration = conf.getLong("ipc.client.call.purge.timeout",
      2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);

    /*
     * timestamp.slop provides a server-side constraint on the timestamp. This
     * assumes that you base your TS around currentTimeMillis(). In this case,
     * throw an error to the user if the user-specified TS is newer than now +
     * slop. A value of LATEST_TIMESTAMP disables this functionality.
     */
    this.timestampSlop = conf.getLong(
        "hbase.hregion.keyvalue.timestamp.slop.millisecs",
        HConstants.LATEST_TIMESTAMP);

    /**
     * Timeout for the process time in processRowsWithLocks().
     * Use -1 to switch off time bound.
     */
    this.rowProcessorTimeout = conf.getLong(
        "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
    this.durability = htd.getDurability() == Durability.USE_DEFAULT
        ? DEFAULT_DURABLITY
        : htd.getDurability();
    if (rsServices != null) {
      this.rsAccounting = this.rsServices.getRegionServerAccounting();
      // don't initialize coprocessors if not running within a regionserver
      // TODO: revisit if coprocessors should load in other cases
      this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
      this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
      this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper);

      Map<String, HRegion> recoveringRegions = rsServices.getRecoveringRegions();
      String encodedName = getRegionInfo().getEncodedName();
      if (recoveringRegions != null && recoveringRegions.containsKey(encodedName)) {
        this.isRecovering = true;
        recoveringRegions.put(encodedName, this);
      }
    } else {
      this.metricsRegionWrapper = null;
      this.metricsRegion = null;
    }
    if (LOG.isDebugEnabled()) {
      // Write out region name as string and its encoded name.
      LOG.debug("Instantiated " + this);
    }

    // by default, we allow writes against a region when it's in recovering state
    this.disallowWritesInRecovering =
        conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING,
          HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG);
  }

  void setHTableSpecificConf() {
    if (this.htableDescriptor == null) return;
    long flushSize = this.htableDescriptor.getMemStoreFlushSize();

    if (flushSize <= 0) {
      flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
        HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
    }
    this.memstoreFlushSize = flushSize;
    this.blockingMemStoreSize = this.memstoreFlushSize *
        conf.getLong("hbase.hregion.memstore.block.multiplier", 2);
  }
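
  // Illustrative sketch (hypothetical usage): because the region conf is a
  // CompoundConfiguration that layers the table descriptor's settings over
  // the base configuration, per-table overrides such as the memstore flush
  // size can be supplied through the HTableDescriptor, e.g.
  //
  //   HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t1"));
  //   htd.setMemStoreFlushSize(256 * 1024 * 1024L);   // table-level override
  //   // or, for arbitrary keys:
  //   htd.setConfiguration("hbase.hregion.memstore.block.multiplier", "4");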

  /**
   * Initialize this region.
   * Used only by tests and SplitTransaction to reopen the region.
   * You should use createHRegion() or openHRegion().
   * @return What the next sequence (edit) id should be.
   * @throws IOException e
   * @deprecated use HRegion.createHRegion() or HRegion.openHRegion()
   */
  @Deprecated
  public long initialize() throws IOException {
    return initialize(null);
  }

  /**
   * Initialize this region.
   *
   * @param reporter Tickled every so often if initialize is taking a while.
   * @return What the next sequence (edit) id should be.
   * @throws IOException e
   */
  private long initialize(final CancelableProgressable reporter) throws IOException {
    MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
    long nextSeqId = -1;
    try {
      nextSeqId = initializeRegionInternals(reporter, status);
      return nextSeqId;
    } finally {
      // nextSeqId will be -1 if the initialization fails.
      // Otherwise it will be at least 0.
      if (nextSeqId == -1) {
        status
            .abort("Exception during region " + this.getRegionNameAsString() + " initialization.");
      }
    }
  }

  private long initializeRegionInternals(final CancelableProgressable reporter,
      final MonitoredTask status) throws IOException, UnsupportedEncodingException {
    if (coprocessorHost != null) {
      status.setStatus("Running coprocessor pre-open hook");
      coprocessorHost.preOpen();
    }

    // Write HRI to a file in case we need to recover hbase:meta
    status.setStatus("Writing region info on filesystem");
    fs.checkRegionInfoOnFilesystem();

    // Remove temporary data left over from old regions
    status.setStatus("Cleaning up temporary data from old regions");
    fs.cleanupTempDir();

    // Initialize all the HStores
    status.setStatus("Initializing all the Stores");
    long maxSeqId = initializeRegionStores(reporter, status);

    status.setStatus("Cleaning up detritus from prior splits");
    // Get rid of any splits or merges that were lost in-progress.  Clean out
    // these directories here on open.  We may be opening a region that was
    // being split but we crashed in the middle of it all.
    fs.cleanupAnySplitDetritus();
    fs.cleanupMergesDir();

    this.writestate.setReadOnly(this.htableDescriptor.isReadOnly());
    this.writestate.flushRequested = false;
    this.writestate.compacting = 0;

    // Initialize split policy
    this.splitPolicy = RegionSplitPolicy.create(this, conf);

    this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
    // Use the maximum of the log sequence id or that which was found in stores
    // (particularly if there are no recovered edits, seqid will be -1).
    long nextSeqid = maxSeqId + 1;
    if (this.isRecovering) {
      // In distributedLogReplay mode, we don't know the last change sequence number because the
      // region is opened before recovery completes. So we add a safety bumper to avoid the new
      // sequence numbers overlapping already-used sequence numbers.
      nextSeqid += this.flushPerChanges + 10000000; // add another extra 10 million
    }
    LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() +
      "; next sequenceid=" + nextSeqid);

    // A region can be reopened if it failed a split; reset flags
    this.closing.set(false);
    this.closed.set(false);

    this.lastFlushSeqId = nextSeqid;
    if (coprocessorHost != null) {
      status.setStatus("Running coprocessor post-open hooks");
      coprocessorHost.postOpen();
    }

    status.markComplete("Region opened successfully");
    return nextSeqid;
  }

  private long initializeRegionStores(final CancelableProgressable reporter, MonitoredTask status)
      throws IOException, UnsupportedEncodingException {
    // Load in all the HStores.

    long maxSeqId = -1;
    // initialized to -1 so that we pick up the MemstoreTS from the column families
    long maxMemstoreTS = -1;

    if (!htableDescriptor.getFamilies().isEmpty()) {
      // initialize the thread pool for opening stores in parallel.
      ThreadPoolExecutor storeOpenerThreadPool =
        getStoreOpenAndCloseThreadPool("StoreOpener-" + this.getRegionInfo().getShortNameToLog());
      CompletionService<HStore> completionService =
        new ExecutorCompletionService<HStore>(storeOpenerThreadPool);

      // initialize each store in parallel
      for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
        status.setStatus("Instantiating store for column family " + family);
        completionService.submit(new Callable<HStore>() {
          @Override
          public HStore call() throws IOException {
            return instantiateHStore(family);
          }
        });
      }
      boolean allStoresOpened = false;
      try {
        for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
          Future<HStore> future = completionService.take();
          HStore store = future.get();
          this.stores.put(store.getColumnFamilyName().getBytes(), store);

          long storeMaxSequenceId = store.getMaxSequenceId();
          maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
              storeMaxSequenceId);
          if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) {
            maxSeqId = storeMaxSequenceId;
          }
          long maxStoreMemstoreTS = store.getMaxMemstoreTS();
          if (maxStoreMemstoreTS > maxMemstoreTS) {
            maxMemstoreTS = maxStoreMemstoreTS;
          }
        }
        allStoresOpened = true;
      } catch (InterruptedException e) {
        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
      } catch (ExecutionException e) {
        throw new IOException(e.getCause());
      } finally {
        storeOpenerThreadPool.shutdownNow();
        if (!allStoresOpened) {
          // something went wrong, close all opened stores
          LOG.error("Could not initialize all stores for the region=" + this);
          for (Store store : this.stores.values()) {
            try {
              store.close();
            } catch (IOException e) {
              LOG.warn(e.getMessage());
            }
          }
        }
      }
    }
    // Recover any edits if available.
    maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
        this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
    maxSeqId = Math.max(maxSeqId, maxMemstoreTS + 1);
    mvcc.initialize(maxSeqId);
    return maxSeqId;
  }

  /**
   * @return True if this region has references.
   */
  public boolean hasReferences() {
    for (Store store : this.stores.values()) {
      if (store.hasReferences()) return true;
    }
    return false;
  }

  /**
   * This function returns the HDFS blocks distribution based on the data
   * captured when the HFiles were created.
   * @return The HDFS blocks distribution for the region.
   */
  public HDFSBlocksDistribution getHDFSBlocksDistribution() {
    HDFSBlocksDistribution hdfsBlocksDistribution =
      new HDFSBlocksDistribution();
    synchronized (this.stores) {
      for (Store store : this.stores.values()) {
        for (StoreFile sf : store.getStorefiles()) {
          HDFSBlocksDistribution storeFileBlocksDistribution =
            sf.getHDFSBlockDistribution();
          hdfsBlocksDistribution.add(storeFileBlocksDistribution);
        }
      }
    }
    return hdfsBlocksDistribution;
  }

  /**
   * This is a helper function to compute the HDFS block distribution on demand.
   * @param conf configuration
   * @param tableDescriptor HTableDescriptor of the table
   * @param regionInfo the HRegionInfo describing the region
   * @return The HDFS blocks distribution for the given region.
   * @throws IOException
   */
  public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
      final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException {
    Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
    return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath);
  }

  /**
   * This is a helper function to compute the HDFS block distribution on demand.
   * @param conf configuration
   * @param tableDescriptor HTableDescriptor of the table
   * @param regionInfo the HRegionInfo describing the region
   * @param tablePath the table directory
   * @return The HDFS blocks distribution for the given region.
   * @throws IOException
   */
  public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
      final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo, Path tablePath)
      throws IOException {
    HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
    FileSystem fs = tablePath.getFileSystem(conf);

    HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo);
    for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
      Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family.getNameAsString());
      if (storeFiles == null) continue;

      for (StoreFileInfo storeFileInfo : storeFiles) {
        hdfsBlocksDistribution.add(storeFileInfo.computeHDFSBlocksDistribution(fs));
      }
    }
    return hdfsBlocksDistribution;
  }
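
  // Illustrative sketch (hypothetical caller): the computed distribution can
  // be used to estimate data locality for a region that is not yet open,
  // e.g. when weighing candidate servers for assignment.
  //
  //   HDFSBlocksDistribution dist =
  //       HRegion.computeHDFSBlocksDistribution(conf, htd, regionInfo);
  //   float locality = dist.getBlockLocalityIndex("some-hostname");
  //   // locality is the fraction of the region's bytes stored on that host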

  public AtomicLong getMemstoreSize() {
    return memstoreSize;
  }

  /**
   * Increase the size of the memstore in this region and the size of the
   * global memstore.
   * @return the size of the memstore in this region
   */
  public long addAndGetGlobalMemstoreSize(long memStoreSize) {
    if (this.rsAccounting != null) {
      rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
    }
    return this.memstoreSize.addAndGet(memStoreSize);
  }

  /** @return a HRegionInfo object for this region */
  public HRegionInfo getRegionInfo() {
    return this.fs.getRegionInfo();
  }

  /**
   * @return Instance of {@link RegionServerServices} used by this HRegion.
   * Can be null.
   */
  RegionServerServices getRegionServerServices() {
    return this.rsServices;
  }

  /** @return readRequestsCount for this region */
  long getReadRequestsCount() {
    return this.readRequestsCount.get();
  }

  /** @return writeRequestsCount for this region */
  long getWriteRequestsCount() {
    return this.writeRequestsCount.get();
  }

  MetricsRegion getMetrics() {
    return metricsRegion;
  }

  /** @return true if region is closed */
  public boolean isClosed() {
    return this.closed.get();
  }

  /**
   * @return True if closing process has started.
   */
  public boolean isClosing() {
    return this.closing.get();
  }

  /**
   * Reset the recovering state of the current region.
   */
  public void setRecovering(boolean newState) {
    boolean wasRecovering = this.isRecovering;
    this.isRecovering = newState;
    if (wasRecovering && !isRecovering) {
      // Call only when log replay is over.
      coprocessorHost.postLogReplay();
    }
  }

  /**
   * @return True if the current region is in recovering state.
   */
  public boolean isRecovering() {
    return this.isRecovering;
  }

  /** @return true if region is available (not closed and not closing) */
  public boolean isAvailable() {
    return !isClosed() && !isClosing();
  }

  /** @return true if region is splittable */
  public boolean isSplittable() {
    return isAvailable() && !hasReferences();
  }

  /**
   * @return true if region is mergeable
   */
  public boolean isMergeable() {
    if (!isAvailable()) {
      LOG.debug("Region " + this.getRegionNameAsString()
          + " is not mergeable because it is closing or closed");
      return false;
    }
    if (hasReferences()) {
      LOG.debug("Region " + this.getRegionNameAsString()
          + " is not mergeable because it has references");
      return false;
    }

    return true;
  }

  public boolean areWritesEnabled() {
    synchronized(this.writestate) {
      return this.writestate.writesEnabled;
    }
  }

  public MultiVersionConsistencyControl getMVCC() {
    return mvcc;
  }

  /*
   * Returns the readpoint considering the given IsolationLevel
   */
  public long getReadpoint(IsolationLevel isolationLevel) {
    if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
      // This scan can read even uncommitted transactions
      return Long.MAX_VALUE;
    }
    return mvcc.memstoreReadPoint();
  }
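
  // Illustrative sketch (hypothetical client-side usage): the isolation level
  // that feeds into getReadpoint() is chosen by the client on its Scan.
  //
  //   Scan scan = new Scan();
  //   scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
  //   // the server will then scan with readpoint Long.MAX_VALUE, i.e. it may
  //   // return writes that are not yet committed by MVCC.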

  public boolean isLoadingCfsOnDemandDefault() {
    return this.isLoadingCfsOnDemandDefault;
  }

  /**
   * Close down this HRegion.  Flush the cache, shut down each HStore, don't
   * service any more calls.
   *
   * <p>This method could take some time to execute, so don't call it from a
   * time-sensitive thread.
   *
   * @return Map of all the storage files that the HRegion's component
   * HStores make use of, keyed by column family.  Can be null if we are not
   * to close at this time or we are already closed.
   *
   * @throws IOException e
   */
  public Map<byte[], List<StoreFile>> close() throws IOException {
    return close(false);
  }

  private final Object closeLock = new Object();

  /** Conf key for the periodic flush interval */
  public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL =
      "hbase.regionserver.optionalcacheflushinterval";
  /** Default interval for the memstore flush */
  public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;

  /** Conf key to force a flush if there are already enough changes for one region in memstore */
  public static final String MEMSTORE_FLUSH_PER_CHANGES =
      "hbase.regionserver.flush.per.changes";
  public static final long DEFAULT_FLUSH_PER_CHANGES = 30000000; // 30 million
  /**
   * The following MAX_FLUSH_PER_CHANGES is large enough because each KeyValue has 20+ bytes of
   * overhead. Therefore, even a billion empty KVs occupy at least 20GB of memstore size for a
   * single region.
   */
  public static final long MAX_FLUSH_PER_CHANGES = 1000000000; // 1G
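
  // Illustrative sketch (hypothetical tuning): both flush triggers can be set
  // programmatically on the configuration handed to the region server, e.g.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   // flush at least once an hour even if the memstore is small:
  //   conf.setInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL, 60 * 60 * 1000);
  //   // force a flush after ~10 million unflushed edits:
  //   conf.setLong(MEMSTORE_FLUSH_PER_CHANGES, 10 * 1000 * 1000L);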

  /**
   * Close down this HRegion.  Flush the cache unless the abort parameter is true,
   * shut down each HStore, don't service any more calls.
   *
   * This method could take some time to execute, so don't call it from a
   * time-sensitive thread.
   *
   * @param abort true if server is aborting (only during testing)
   * @return Map of all the storage files that the HRegion's component
   * HStores make use of, keyed by column family.  Can be null if
   * we are not to close at this time or we are already closed.
   *
   * @throws IOException e
   */
  public Map<byte[], List<StoreFile>> close(final boolean abort) throws IOException {
    // Only allow one thread to close at a time. Serialize them so dual
    // threads attempting to close will run up against each other.
    MonitoredTask status = TaskMonitor.get().createStatus(
        "Closing region " + this +
        (abort ? " due to abort" : ""));

    status.setStatus("Waiting for close lock");
    try {
      synchronized (closeLock) {
        return doClose(abort, status);
      }
    } finally {
      status.cleanup();
    }
  }

  private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
      throws IOException {
    if (isClosed()) {
      LOG.warn("Region " + this + " already closed");
      return null;
    }

    if (coprocessorHost != null) {
      status.setStatus("Running coprocessor pre-close hooks");
      this.coprocessorHost.preClose(abort);
    }

    status.setStatus("Disabling compacts and flushes for region");
    synchronized (writestate) {
      // Disable compacting and flushing by background threads for this
      // region.
      writestate.writesEnabled = false;
      LOG.debug("Closing " + this + ": disabling compactions & flushes");
      waitForFlushesAndCompactions();
    }
    // If we were not just flushing, is it worth doing a preflush...one
    // that will clear out the bulk of the memstore before we put up
    // the close flag?
    if (!abort && worthPreFlushing()) {
      status.setStatus("Pre-flushing region before close");
      LOG.info("Running close preflush of " + this.getRegionNameAsString());
      try {
        internalFlushcache(status);
      } catch (IOException ioe) {
        // Failed to flush the region. Keep going.
        status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
      }
    }

    this.closing.set(true);
    status.setStatus("Disabling writes for close");
    // block waiting for the lock for closing
    lock.writeLock().lock();
    try {
      if (this.isClosed()) {
        status.abort("Already got closed by another process");
        // SplitTransaction handles the null
        return null;
      }
      LOG.debug("Updates disabled for region " + this);
      // Don't flush the cache if we are aborting
      if (!abort) {
        int flushCount = 0;
        while (this.getMemstoreSize().get() > 0) {
          try {
            if (flushCount++ > 0) {
              int actualFlushes = flushCount - 1;
              if (actualFlushes > 5) {
                // If we tried 5 times and are unable to clear memory, abort
                // so we do not lose data
                throw new DroppedSnapshotException("Failed clearing memory after " +
                  actualFlushes + " attempts on region: " + Bytes.toStringBinary(getRegionName()));
              }
              LOG.info("Running extra flush, " + actualFlushes +
                " (carrying snapshot?) " + this);
            }
            internalFlushcache(status);
          } catch (IOException ioe) {
            status.setStatus("Failed flush " + this + ", putting online again");
            synchronized (writestate) {
              writestate.writesEnabled = true;
            }
            // Have to throw to upper layers.  I can't abort server from here.
            throw ioe;
          }
        }
      }

      Map<byte[], List<StoreFile>> result =
        new TreeMap<byte[], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
      if (!stores.isEmpty()) {
        // initialize the thread pool for closing stores in parallel.
        ThreadPoolExecutor storeCloserThreadPool =
          getStoreOpenAndCloseThreadPool("StoreCloserThread-" + this.getRegionNameAsString());
        CompletionService<Pair<byte[], Collection<StoreFile>>> completionService =
          new ExecutorCompletionService<Pair<byte[], Collection<StoreFile>>>(storeCloserThreadPool);

        // close each store in parallel
        for (final Store store : stores.values()) {
          assert abort || store.getFlushableSize() == 0;
          completionService
              .submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
                @Override
                public Pair<byte[], Collection<StoreFile>> call() throws IOException {
                  return new Pair<byte[], Collection<StoreFile>>(
                    store.getFamily().getName(), store.close());
                }
              });
        }
        try {
          for (int i = 0; i < stores.size(); i++) {
            Future<Pair<byte[], Collection<StoreFile>>> future = completionService.take();
            Pair<byte[], Collection<StoreFile>> storeFiles = future.get();
            List<StoreFile> familyFiles = result.get(storeFiles.getFirst());
            if (familyFiles == null) {
              familyFiles = new ArrayList<StoreFile>();
              result.put(storeFiles.getFirst(), familyFiles);
            }
            familyFiles.addAll(storeFiles.getSecond());
          }
        } catch (InterruptedException e) {
          throw (InterruptedIOException)new InterruptedIOException().initCause(e);
        } catch (ExecutionException e) {
          throw new IOException(e.getCause());
        } finally {
          storeCloserThreadPool.shutdownNow();
        }
      }
      this.closed.set(true);
      if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get());
      if (coprocessorHost != null) {
        status.setStatus("Running coprocessor post-close hooks");
        this.coprocessorHost.postClose(abort);
      }
      if (this.metricsRegion != null) {
        this.metricsRegion.close();
      }
      if (this.metricsRegionWrapper != null) {
        Closeables.closeQuietly(this.metricsRegionWrapper);
      }
      status.markComplete("Closed");
      LOG.info("Closed " + this);
      return result;
    } finally {
      lock.writeLock().unlock();
    }
  }
1231 
1232   /**
1233    * Wait for all current flushes and compactions of the region to complete.
1234    * <p>
1235    * Exposed for TESTING.
1236    */
1237   public void waitForFlushesAndCompactions() {
1238     synchronized (writestate) {
1239       boolean interrupted = false;
1240       try {
1241         while (writestate.compacting > 0 || writestate.flushing) {
1242           LOG.debug("waiting for " + writestate.compacting + " compactions"
1243             + (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
1244           try {
1245             writestate.wait();
1246           } catch (InterruptedException iex) {
1247             // essentially ignore and propagate the interrupt back up
1248             LOG.warn("Interrupted while waiting");
1249             interrupted = true;
1250           }
1251         }
1252       } finally {
1253         if (interrupted) {
1254           Thread.currentThread().interrupt();
1255         }
1256       }
1257     }
1258   }
1259 
1260   protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
1261       final String threadNamePrefix) {
1262     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1263     int maxThreads = Math.min(numStores,
1264         conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1265             HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
1266     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1267   }
1268 
1269   protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
1270       final String threadNamePrefix) {
1271     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1272     int maxThreads = Math.max(1,
1273         conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1274             HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
1275             / numStores);
1276     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1277   }
1278 
1279   static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
1280       final String threadNamePrefix) {
1281     return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
1282       new ThreadFactory() {
1283         private int count = 1;
1284 
1285         @Override
1286         public Thread newThread(Runnable r) {
1287           return new Thread(r, threadNamePrefix + "-" + count++);
1288         }
1289       });
1290   }
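
       /*
        * A minimal sketch (not part of the original flow) of driving a pool from
        * the factory above with an ExecutorCompletionService, the same pattern the
        * store open/close paths use. "tasks" is a hypothetical list of Callables;
        * the caller would handle InterruptedException/ExecutionException.
        *
        *   ThreadPoolExecutor pool = getOpenAndCloseThreadPool(4, "StoreCloser");
        *   CompletionService<Void> done = new ExecutorCompletionService<Void>(pool);
        *   try {
        *     for (Callable<Void> task : tasks) {
        *       done.submit(task);
        *     }
        *     for (int i = 0; i < tasks.size(); i++) {
        *       done.take().get(); // surfaces task failures as ExecutionException
        *     }
        *   } finally {
        *     pool.shutdownNow(); // pool is per-operation, never reused
        *   }
        */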
1291 
1292   /**
1293    * @return True if it's worth doing a flush before we put up the close flag.
1294    */
1295   private boolean worthPreFlushing() {
1296     return this.memstoreSize.get() >
1297       this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
1298   }
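
       /*
        * A minimal sketch of tuning the pre-close flush threshold checked above;
        * the key name is taken from the lookup in worthPreFlushing(), the 8MB
        * value is hypothetical (the default is 5MB).
        *
        *   Configuration conf = HBaseConfiguration.create();
        *   conf.setLong("hbase.hregion.preclose.flush.size", 8 * 1024 * 1024);
        */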
1299 
1300   //////////////////////////////////////////////////////////////////////////////
1301   // HRegion accessors
1302   //////////////////////////////////////////////////////////////////////////////
1303 
1304   /** @return start key for region */
1305   public byte [] getStartKey() {
1306     return this.getRegionInfo().getStartKey();
1307   }
1308 
1309   /** @return end key for region */
1310   public byte [] getEndKey() {
1311     return this.getRegionInfo().getEndKey();
1312   }
1313 
1314   /** @return region id */
1315   public long getRegionId() {
1316     return this.getRegionInfo().getRegionId();
1317   }
1318 
1319   /** @return region name */
1320   public byte [] getRegionName() {
1321     return this.getRegionInfo().getRegionName();
1322   }
1323 
1324   /** @return region name as string for logging */
1325   public String getRegionNameAsString() {
1326     return this.getRegionInfo().getRegionNameAsString();
1327   }
1328 
1329   /** @return HTableDescriptor for this region */
1330   public HTableDescriptor getTableDesc() {
1331     return this.htableDescriptor;
1332   }
1333 
1334   /** @return HLog in use for this region */
1335   public HLog getLog() {
1336     return this.log;
1337   }
1338 
1339   /**
1340    * A split takes the config from the parent region & passes it to the daughter
1341    * region's constructor. If 'conf' was passed, you would end up using the HTD
1342    * of the parent region in addition to the new daughter HTD. Pass 'baseConf'
1343    * to the daughter regions to avoid this tricky dedupe problem.
1344    * @return Configuration object
1345    */
1346   Configuration getBaseConf() {
1347     return this.baseConf;
1348   }
1349 
1350   /** @return {@link FileSystem} being used by this region */
1351   public FileSystem getFilesystem() {
1352     return fs.getFileSystem();
1353   }
1354 
1355   /** @return the {@link HRegionFileSystem} used by this region */
1356   public HRegionFileSystem getRegionFileSystem() {
1357     return this.fs;
1358   }
1359 
1360   /** @return the last time the region was flushed */
1361   public long getLastFlushTime() {
1362     return this.lastFlushTime;
1363   }
1364 
1365   //////////////////////////////////////////////////////////////////////////////
1366   // HRegion maintenance.
1367   //
1368   // These methods are meant to be called periodically by the HRegionServer for
1369   // upkeep.
1370   //////////////////////////////////////////////////////////////////////////////
1371 
1372   /** @return size of the largest HStore. */
1373   public long getLargestHStoreSize() {
1374     long size = 0;
1375     for (Store h : stores.values()) {
1376       long storeSize = h.getSize();
1377       if (storeSize > size) {
1378         size = storeSize;
1379       }
1380     }
1381     return size;
1382   }
1383 
1384   /**
1385    * @return KeyValue Comparator
1386    */
1387   public KeyValue.KVComparator getComparator() {
1388     return this.comparator;
1389   }
1390 
1391   /*
1392    * Do preparation for pending compaction.
1393    * @throws IOException
1394    */
1395   protected void doRegionCompactionPrep() throws IOException {
1396   }
1397 
1398   void triggerMajorCompaction() {
1399     for (Store h : stores.values()) {
1400       h.triggerMajorCompaction();
1401     }
1402   }
1403 
1404   /**
1405    * This is a helper function that compacts all the stores synchronously.
1406    * It is used by utilities and testing.
1407    *
1408    * @param majorCompaction True to force a major compaction regardless of thresholds
1409    * @throws IOException e
1410    */
1411   public void compactStores(final boolean majorCompaction)
1412   throws IOException {
1413     if (majorCompaction) {
1414       this.triggerMajorCompaction();
1415     }
1416     compactStores();
1417   }
1418 
1419   /**
1420    * This is a helper function that compacts all the stores synchronously.
1421    * It is used by utilities and testing.
1422    *
1423    * @throws IOException e
1424    */
1425   public void compactStores() throws IOException {
1426     for (Store s : getStores().values()) {
1427       CompactionContext compaction = s.requestCompaction();
1428       if (compaction != null) {
1429         compact(compaction, s);
1430       }
1431     }
1432   }
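
       /*
        * Sketch of typical utility/test usage of the two helpers above; "region"
        * is a hypothetical, already-opened HRegion.
        *
        *   region.flushcache();         // persist memstore so there is data on disk
        *   region.compactStores(true);  // then force a major compaction of every store
        */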
1433 
1434   /*
1435    * Called by compaction thread and after region is opened to compact the
1436    * HStores if necessary.
1437    *
1438    * <p>This operation could block for a long time, so don't call it from a
1439    * time-sensitive thread.
1440    *
1441    * Note that no locking is necessary at this level because compaction only
1442    * conflicts with a region split, and that cannot happen because the region
1443    * server does them sequentially and not in parallel.
1444    *
1445    * @param compaction Compaction details, obtained by requestCompaction()
1446    * @return whether the compaction completed
1447    * @throws IOException e
1448    */
1449   public boolean compact(CompactionContext compaction, Store store) throws IOException {
1450     assert compaction != null && compaction.hasSelection();
1451     assert !compaction.getRequest().getFiles().isEmpty();
1452     if (this.closing.get() || this.closed.get()) {
1453       LOG.debug("Skipping compaction on " + this + " because closing/closed");
1454       store.cancelRequestedCompaction(compaction);
1455       return false;
1456     }
1457     MonitoredTask status = null;
1458     boolean didPerformCompaction = false;
1459     // block waiting for the lock for compaction
1460     lock.readLock().lock();
1461     try {
1462       byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
1463       if (stores.get(cf) != store) {
1464         LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this
1465             + " has been re-instantiated, cancel this compaction request. "
1466             + " It may be caused by the roll back of split transaction");
1467         return false;
1468       }
1469 
1470       status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
1471       if (this.closed.get()) {
1472         String msg = "Skipping compaction on " + this + " because closed";
1473         LOG.debug(msg);
1474         status.abort(msg);
1475         return false;
1476       }
1477       boolean wasStateSet = false;
1478       try {
1479         synchronized (writestate) {
1480           if (writestate.writesEnabled) {
1481             wasStateSet = true;
1482             ++writestate.compacting;
1483           } else {
1484             String msg = "NOT compacting region " + this + ". Writes disabled.";
1485             LOG.info(msg);
1486             status.abort(msg);
1487             return false;
1488           }
1489         }
1490         LOG.info("Starting compaction on " + store + " in region " + this
1491             + (compaction.getRequest().isOffPeak()?" as an off-peak compaction":""));
1492         doRegionCompactionPrep();
1493         try {
1494           status.setStatus("Compacting store " + store);
1495           didPerformCompaction = true;
1496           store.compact(compaction);
1497         } catch (InterruptedIOException iioe) {
1498           String msg = "compaction interrupted";
1499           LOG.info(msg, iioe);
1500           status.abort(msg);
1501           return false;
1502         }
1503       } finally {
1504         if (wasStateSet) {
1505           synchronized (writestate) {
1506             --writestate.compacting;
1507             if (writestate.compacting <= 0) {
1508               writestate.notifyAll();
1509             }
1510           }
1511         }
1512       }
1513       status.markComplete("Compaction complete");
1514       return true;
1515     } finally {
1516       try {
1517         if (!didPerformCompaction) store.cancelRequestedCompaction(compaction);
1518         if (status != null) status.cleanup();
1519       } finally {
1520         lock.readLock().unlock();
1521       }
1522     }
1523   }
1524 
1525   /**
1526    * Flush the cache.
1527    *
1528    * When this method is called the cache will be flushed unless:
1529    * <ol>
1530    *   <li>the cache is empty</li>
1531    *   <li>the region is closed</li>
1532    *   <li>a flush is already in progress</li>
1533    *   <li>writes are disabled</li>
1534    * </ol>
1535    *
1536    * <p>This method may block for some time, so it should not be called from a
1537    * time-sensitive thread.
1538    *
1539    * @return a FlushResult describing whether the flush succeeded and whether
1540    * the region now needs compacting
1541    * @throws IOException general io exceptions
1542    * @throws DroppedSnapshotException Thrown when replay of hlog is required
1543    * because a Snapshot was not properly persisted.
1544    */
1545   public FlushResult flushcache() throws IOException {
1546     // fail-fast instead of waiting on the lock
1547     if (this.closing.get()) {
1548       String msg = "Skipping flush on " + this + " because closing";
1549       LOG.debug(msg);
1550       return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1551     }
1552     MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
1553     status.setStatus("Acquiring readlock on region");
1554     // block waiting for the lock for flushing cache
1555     lock.readLock().lock();
1556     try {
1557       if (this.closed.get()) {
1558         String msg = "Skipping flush on " + this + " because closed";
1559         LOG.debug(msg);
1560         status.abort(msg);
1561         return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1562       }
1563       if (coprocessorHost != null) {
1564         status.setStatus("Running coprocessor pre-flush hooks");
1565         coprocessorHost.preFlush();
1566       }
1567       if (numMutationsWithoutWAL.get() > 0) {
1568         numMutationsWithoutWAL.set(0);
1569         dataInMemoryWithoutWAL.set(0);
1570       }
1571       synchronized (writestate) {
1572         if (!writestate.flushing && writestate.writesEnabled) {
1573           this.writestate.flushing = true;
1574         } else {
1575           if (LOG.isDebugEnabled()) {
1576             LOG.debug("NOT flushing memstore for region " + this
1577                 + ", flushing=" + writestate.flushing + ", writesEnabled="
1578                 + writestate.writesEnabled);
1579           }
1580           String msg = "Not flushing since "
1581               + (writestate.flushing ? "already flushing"
1582               : "writes not enabled");
1583           status.abort(msg);
1584           return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1585         }
1586       }
1587       try {
1588         FlushResult fs = internalFlushcache(status);
1589 
1590         if (coprocessorHost != null) {
1591           status.setStatus("Running post-flush coprocessor hooks");
1592           coprocessorHost.postFlush();
1593         }
1594 
1595         status.markComplete("Flush successful");
1596         return fs;
1597       } finally {
1598         synchronized (writestate) {
1599           writestate.flushing = false;
1600           this.writestate.flushRequested = false;
1601           writestate.notifyAll();
1602         }
1603       }
1604     } finally {
1605       lock.readLock().unlock();
1606       status.cleanup();
1607     }
1608   }
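
       /*
        * Sketch of acting on the returned FlushResult; "region" is a hypothetical,
        * already-opened HRegion, and the assumption that FlushResult exposes its
        * Result enum as a field named "result" is illustrative only.
        *
        *   FlushResult fr = region.flushcache();
        *   if (fr.result == FlushResult.Result.FLUSHED_COMPACTION_NEEDED) {
        *     region.compactStores(); // flush succeeded and asked for a compaction
        *   }
        */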
1609 
1610   /**
1611    * Should the memstore be flushed now?
1612    */
1613   boolean shouldFlush() {
1614     // This is a rough measure.
1615     if (this.lastFlushSeqId + this.flushPerChanges < this.sequenceId.get()) {
1616       return true;
1617     }
1618     if (flushCheckInterval <= 0) { //disabled
1619       return false;
1620     }
1621     long now = EnvironmentEdgeManager.currentTimeMillis();
1622     // If we flushed in the recent past, we don't need to do it again now.
1623     if ((now - getLastFlushTime() < flushCheckInterval)) {
1624       return false;
1625     }
1626     // Since we didn't flush in the recent past, flush now if certain conditions
1627     // are met. Return true on the first such memstore hit.
1628     for (Store s : this.getStores().values()) {
1629       if (s.timeOfOldestEdit() < now - flushCheckInterval) {
1630         // we have an old enough edit in the memstore, flush
1631         return true;
1632       }
1633     }
1634     return false;
1635   }
1636 
1637   /**
1638    * Flush the memstore. Flushing the memstore is a little tricky. We have a lot of updates in the
1639    * memstore, all of which have also been written to the log. We need to write those updates in the
1640    * memstore out to disk, while being able to process reads/writes as much as possible during the
1641    * flush operation.
1642    * <p>This method may block for some time.  Every time you call it, we up the region's
1643    * sequence id even if we don't flush; i.e. the returned sequence id will be at least one larger
1644    * than the last edit applied to this region. The returned id does not refer to an actual edit.
1645    * The returned id can be used for say installing a bulk loaded file just ahead of the last hfile
1646    * that was the result of this flush, etc.
1647    * @return object describing the flush's state
1648    *
1649    * @throws IOException general io exceptions
1650    * @throws DroppedSnapshotException Thrown when replay of hlog is required
1651    * because a Snapshot was not properly persisted.
1652    */
1653   protected FlushResult internalFlushcache(MonitoredTask status)
1654       throws IOException {
1655     return internalFlushcache(this.log, -1, status);
1656   }
1657 
1658   /**
1659    * @param wal Null if we're NOT to go via hlog/wal.
1660    * @param myseqid The seqid to use if <code>wal</code> is null writing out flush file.
1661    * @return object describing the flush's state
1662    * @throws IOException
1663    * @see #internalFlushcache(MonitoredTask)
1664    */
1665   protected FlushResult internalFlushcache(
1666       final HLog wal, final long myseqid, MonitoredTask status)
1667   throws IOException {
1668     if (this.rsServices != null && this.rsServices.isAborted()) {
1669       // Don't flush when server aborting, it's unsafe
1670       throw new IOException("Aborting flush because server is aborted...");
1671     }
1672     final long startTime = EnvironmentEdgeManager.currentTimeMillis();
1673     // If nothing to flush, return, but we need to safely update the region sequence id
1674     if (this.memstoreSize.get() <= 0) {
1675       // Take an update lock because we are about to change the sequence id and we want the
1676       // sequence id to be at the border of the empty memstore.
1677       this.updatesLock.writeLock().lock();
1678       try {
1679         if (this.memstoreSize.get() <= 0) {
1680           // Presume that if there are still no edits in the memstore, then there are no edits for
1681           // this region out in the WAL/HLog subsystem so no need to do any trickery clearing out
1682           // edits in the WAL system. Up the sequence number so the resulting flush id is for
1683           // sure just beyond the last appended region edit (useful as a marker when bulk loading,
1684           // etc.)
1685           // wal can be null when replaying edits.
1686           return wal != null?
1687             new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
1688               getNextSequenceId(wal), "Nothing to flush"):
1689             new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush");
1690         }
1691       } finally {
1692         this.updatesLock.writeLock().unlock();
1693       }
1694     }
1695     if (LOG.isDebugEnabled()) {
1696       LOG.debug("Started memstore flush for " + this +
1697         ", current region memstore size " +
1698         StringUtils.byteDesc(this.memstoreSize.get()) +
1699         ((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid));
1700     }
1701 
1702     // Stop updates while we snapshot the memstore of this region's stores. We only have
1703     // to do this for a moment.  It is quick. We also set the memstore size to zero here before we
1704     // allow updates again so its value will represent the size of the updates received
1705     // during flush
1706     MultiVersionConsistencyControl.WriteEntry w = null;
1707 
1708     // We have to take an update lock during snapshot, or else a write could end up in both snapshot
1709     // and memstore (makes it difficult to do atomic rows then)
1710     status.setStatus("Obtaining lock to block concurrent updates");
1711     // block waiting for the lock for internal flush
1712     this.updatesLock.writeLock().lock();
1713     long totalFlushableSize = 0;
1714     status.setStatus("Preparing to flush by snapshotting stores in " +
1715       getRegionInfo().getEncodedName());
1716     List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
1717     long flushSeqId = -1L;
1718 
1719     try {
1720       try {
1721         w = mvcc.beginMemstoreInsert();
1722         if (wal != null) {
1723           if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) {
1724             // This should never happen.
1725             String msg = "Flush will not be started for ["
1726                 + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
1727             status.setStatus(msg);
1728             return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1729           }
1730           // Get a sequence id that we can use to denote the flush. It will be one beyond the last
1731           // edit that made it into the hfile (the below does not add an edit, it just asks the
1732           // WAL system to return next sequence edit).
1733           flushSeqId = getNextSequenceId(wal);
1734         } else {
1735           // use the provided sequence Id as WAL is not being used for this flush.
1736           flushSeqId = myseqid;
1737         }
1738 
1739         for (Store s : stores.values()) {
1740           totalFlushableSize += s.getFlushableSize();
1741           storeFlushCtxs.add(s.createFlushContext(flushSeqId));
1742         }
1743 
1744         // Prepare flush (take a snapshot)
1745         for (StoreFlushContext flush : storeFlushCtxs) {
1746           flush.prepare();
1747         }
1748       } finally {
1749         this.updatesLock.writeLock().unlock();
1750       }
1751       String s = "Finished memstore snapshotting " + this +
1752         ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize;
1753       status.setStatus(s);
1754       if (LOG.isTraceEnabled()) LOG.trace(s);
1755       // sync unflushed WAL changes when deferred log sync is enabled
1756       // see HBASE-8208 for details
1757       if (wal != null && !shouldSyncLog()) wal.sync();
1758 
1759       // wait for all in-progress transactions to commit to HLog before
1760       // we can start the flush. This prevents
1761       // uncommitted transactions from being written into HFiles.
1762       // We have to block before we start the flush, otherwise keys that
1763       // were removed via a rollbackMemstore could be written to Hfiles.
1764       mvcc.waitForPreviousTransactionsComplete(w);
1765       // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block
1766       w = null;
1767       s = "Flushing stores of " + this;
1768       status.setStatus(s);
1769       if (LOG.isTraceEnabled()) LOG.trace(s);
1770     } finally {
1771       if (w != null) {
1772         // in case of failure just mark current w as complete
1773         mvcc.advanceMemstore(w);
1774       }
1775     }
1776 
1777     // Any failure from here on out will be catastrophic requiring server
1778     // restart so hlog content can be replayed and put back into the memstore.
1779     // Otherwise, the snapshot content, while backed up in the hlog, will not
1780     // be part of the current running server's state.
1781     boolean compactionRequested = false;
1782     try {
1783       // A.  Flush memstore to all the HStores.
1784       // Keep running vector of all store files that includes both old and the
1785       // just-made new flush store file. The new flushed file is still in the
1786       // tmp directory.
1787 
1788       for (StoreFlushContext flush : storeFlushCtxs) {
1789         flush.flushCache(status);
1790       }
1791 
1792       // Switch snapshot (in memstore) -> new hfile (thus causing
1793       // all the store scanners to reset/reseek).
1794       for (StoreFlushContext flush : storeFlushCtxs) {
1795         boolean needsCompaction = flush.commit(status);
1796         if (needsCompaction) {
1797           compactionRequested = true;
1798         }
1799       }
1800       storeFlushCtxs.clear();
1801 
1802       // Set down the memstore size by amount of flush.
1803       this.addAndGetGlobalMemstoreSize(-totalFlushableSize);
1804     } catch (Throwable t) {
1805       // An exception here means that the snapshot was not persisted.
1806       // The hlog needs to be replayed so its content is restored to memstore.
1807       // Currently, only a server restart will do this.
1808       // We used to only catch IOEs but its possible that we'd get other
1809       // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
1810       // all and sundry.
1811       if (wal != null) {
1812         wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
1813       }
1814       DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
1815           Bytes.toStringBinary(getRegionName()));
1816       dse.initCause(t);
1817       status.abort("Flush failed: " + StringUtils.stringifyException(t));
1818       throw dse;
1819     }
1820 
1821     // If we get to here, the HStores have been written.
1822     if (wal != null) {
1823       wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
1824     }
1825 
1826     // Record latest flush time
1827     this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
1828 
1829     // Update the last flushed sequence id for region. TODO: This is dup'd inside the WAL/FSHlog.
1830     this.lastFlushSeqId = flushSeqId;
1831 
1832     // C. Finally notify anyone waiting on memstore to clear:
1833     // e.g. checkResources().
1834     synchronized (this) {
1835       notifyAll(); // FindBugs NN_NAKED_NOTIFY
1836     }
1837 
1838     long time = EnvironmentEdgeManager.currentTimeMillis() - startTime;
1839     long memstoresize = this.memstoreSize.get();
1840     String msg = "Finished memstore flush of ~" +
1841       StringUtils.byteDesc(totalFlushableSize) + "/" + totalFlushableSize +
1842       ", currentsize=" +
1843       StringUtils.byteDesc(memstoresize) + "/" + memstoresize +
1844       " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId +
1845       ", compaction requested=" + compactionRequested +
1846       ((wal == null)? "; wal=null": "");
1847     LOG.info(msg);
1848     status.setStatus(msg);
1849 
1850     return new FlushResult(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
1851         FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushSeqId);
1852   }
1853 
1854   /**
1855    * Method to safely get the next sequence number.
1856    * @return Next sequence number unassociated with any actual edit.
1857    * @throws IOException
1858    */
1859   private long getNextSequenceId(final HLog wal) throws IOException {
1860     HLogKey key = this.appendNoSyncNoAppend(wal, null);
1861     return key.getSequenceNumber();
1862   }
1863 
1864   //////////////////////////////////////////////////////////////////////////////
1865   // get() methods for client use.
1866   //////////////////////////////////////////////////////////////////////////////
1867   /**
1868    * Return all the data for the row that matches <i>row</i> exactly,
1869    * or the one that immediately precedes it, using the
1870    * {@link HConstants#CATALOG_FAMILY} column family.
1871    *
1872    * @param row row key
1873    * @return map of values
1874    * @throws IOException
1875    */
1876   Result getClosestRowBefore(final byte [] row)
1877   throws IOException{
1878     return getClosestRowBefore(row, HConstants.CATALOG_FAMILY);
1879   }
1880 
1881   /**
1882    * Return all the data for the row that matches <i>row</i> exactly,
1883    * or the one that immediately precedes it, looking only at the given
1884    * column family.
1885    *
1886    * @param row row key
1887    * @param family column family to find on
1888    * @return map of values
1889    * @throws IOException read exceptions
1890    */
1891   public Result getClosestRowBefore(final byte [] row, final byte [] family)
1892   throws IOException {
1893     if (coprocessorHost != null) {
1894       Result result = new Result();
1895       if (coprocessorHost.preGetClosestRowBefore(row, family, result)) {
1896         return result;
1897       }
1898     }
1899     // look across all the HStores for this region and determine what the
1900     // closest key is across all column families, since the data may be sparse
1901     checkRow(row, "getClosestRowBefore");
1902     startRegionOperation(Operation.GET);
1903     this.readRequestsCount.increment();
1904     try {
1905       Store store = getStore(family);
1906       // get the closest key. (HStore.getRowKeyAtOrBefore can return null)
1907       KeyValue key = store.getRowKeyAtOrBefore(row);
1908       Result result = null;
1909       if (key != null) {
1910         Get get = new Get(CellUtil.cloneRow(key));
1911         get.addFamily(family);
1912         result = get(get);
1913       }
1914       if (coprocessorHost != null) {
1915         coprocessorHost.postGetClosestRowBefore(row, family, result);
1916       }
1917       return result;
1918     } finally {
1919       closeRegionOperation(Operation.GET);
1920     }
1921   }
1922 
1923   /**
1924    * Return a scanner that iterates over the HRegion, returning the indicated
1925    * columns and rows specified by the {@link Scan}.
1926    * <p>
1927    * The returned scanner must be closed by the caller.
1928    *
1929    * @param scan configured {@link Scan}
1930    * @return RegionScanner
1931    * @throws IOException read exceptions
1932    */
1933   public RegionScanner getScanner(Scan scan) throws IOException {
1934     return getScanner(scan, null);
1935   }
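
       /*
        * Sketch of the close-in-finally discipline the javadoc above requires;
        * "region" is a hypothetical, already-opened HRegion.
        *
        *   RegionScanner scanner = region.getScanner(new Scan());
        *   try {
        *     List<Cell> cells = new ArrayList<Cell>();
        *     boolean more;
        *     do {
        *       cells.clear();
        *       more = scanner.next(cells);
        *       // process the batch of cells here ...
        *     } while (more);
        *   } finally {
        *     scanner.close();
        *   }
        */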
1936 
1937   void prepareScanner(Scan scan) throws IOException {
1938     if (!scan.hasFamilies()) {
1939       // Adding all families to scanner
1940       for (byte[] family : this.htableDescriptor.getFamiliesKeys()) {
1941         scan.addFamily(family);
1942       }
1943     }
1944   }
1945 
1946   protected RegionScanner getScanner(Scan scan,
1947       List<KeyValueScanner> additionalScanners) throws IOException {
1948     startRegionOperation(Operation.SCAN);
1949     try {
1950       // Verify families are all valid
1951       prepareScanner(scan);
1952       if (scan.hasFamilies()) {
1953         for (byte[] family : scan.getFamilyMap().keySet()) {
1954           checkFamily(family);
1955         }
1956       }
1957       return instantiateRegionScanner(scan, additionalScanners);
1958     } finally {
1959       closeRegionOperation(Operation.SCAN);
1960     }
1961   }
1962 
1963   protected RegionScanner instantiateRegionScanner(Scan scan,
1964       List<KeyValueScanner> additionalScanners) throws IOException {
1965     if (scan.isReversed()) {
1966       if (scan.getFilter() != null) {
1967         scan.getFilter().setReversed(true);
1968       }
1969       return new ReversedRegionScannerImpl(scan, additionalScanners, this);
1970     }
1971     return new RegionScannerImpl(scan, additionalScanners, this);
1972   }
1973 
1974   /*
1975    * @param delete The passed delete is modified by this method. WARNING!
1976    */
1977   void prepareDelete(Delete delete) throws IOException {
1978     // Check to see if this is a deleteRow insert
1979     if (delete.getFamilyCellMap().isEmpty()) {
1980       for (byte[] family : this.htableDescriptor.getFamiliesKeys()) {
1981         // Don't eat the timestamp
1982         delete.deleteFamily(family, delete.getTimeStamp());
1983       }
1984     } else {
1985       for (byte[] family : delete.getFamilyCellMap().keySet()) {
1986         if (family == null) {
1987           throw new NoSuchColumnFamilyException("Empty family is invalid");
1988         }
1989         checkFamily(family);
1990       }
1991     }
1992   }
1993 
1994   //////////////////////////////////////////////////////////////////////////////
1995   // set() methods for client use.
1996   //////////////////////////////////////////////////////////////////////////////
1997   /**
1998    * @param delete delete object
1999    * @throws IOException general io exceptions
2000    */
2001   public void delete(Delete delete)
2002   throws IOException {
2003     checkReadOnly();
2004     checkResources();
2005     startRegionOperation(Operation.DELETE);
2006     try {
2007       delete.getRow();
2008       // All edits for the given row (across all column families) must happen atomically.
2009       doBatchMutate(delete);
2010     } finally {
2011       closeRegionOperation(Operation.DELETE);
2012     }
2013   }
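
       /*
        * Sketch: a Delete with no families tombstones the whole row (see
        * prepareDelete above), while adding a family limits its scope. Row and
        * family literals are hypothetical.
        *
        *   Delete wholeRow = new Delete(Bytes.toBytes("row1"));
        *   region.delete(wholeRow);
        *
        *   Delete oneFamily = new Delete(Bytes.toBytes("row2"));
        *   oneFamily.deleteFamily(Bytes.toBytes("cf"));
        *   region.delete(oneFamily);
        */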
2014 
2015   /**
2016    * Row key needed by the unit-test-only delete method below.
2017    */
2018   private static final byte [] FOR_UNIT_TESTS_ONLY = Bytes.toBytes("ForUnitTestsOnly");
2019   /**
2020    * This is used only by unit tests. Not required to be a public API.
2021    * @param familyMap map of family to edits for the given family.
2022    * @throws IOException
2023    */
2024   void delete(NavigableMap<byte[], List<Cell>> familyMap,
2025       Durability durability) throws IOException {
2026     Delete delete = new Delete(FOR_UNIT_TESTS_ONLY);
2027     delete.setFamilyCellMap(familyMap);
2028     delete.setDurability(durability);
2029     doBatchMutate(delete);
2030   }
2031 
2032   /**
2033    * Set up correct timestamps in the KVs in the Delete object.
2034    * Caller should have the row and region locks.
2035    * @param mutation
2036    * @param familyMap
2037    * @param byteNow
2038    * @throws IOException
2039    */
2040   void prepareDeleteTimestamps(Mutation mutation, Map<byte[], List<Cell>> familyMap,
2041       byte[] byteNow) throws IOException {
2042     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
2043 
2044       byte[] family = e.getKey();
2045       List<Cell> cells = e.getValue();
2046       Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
2047 
2048       for (Cell cell: cells) {
2049         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
2050         //  Check if time is LATEST, change to time of most recent addition if so
2051         //  This is expensive.
2052         if (kv.isLatestTimestamp() && kv.isDeleteType()) {
2053           byte[] qual = CellUtil.cloneQualifier(kv);
2054           if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
2055 
2056           Integer count = kvCount.get(qual);
2057           if (count == null) {
2058             kvCount.put(qual, 1);
2059           } else {
2060             kvCount.put(qual, count + 1);
2061           }
2062           count = kvCount.get(qual);
2063 
2064           Get get = new Get(CellUtil.cloneRow(kv));
2065           get.setMaxVersions(count);
2066           get.addColumn(family, qual);
2067           if (coprocessorHost != null) {
2068             if (!coprocessorHost.prePrepareTimeStampForDeleteVersion(mutation, cell, byteNow,
2069                 get)) {
2070               updateDeleteLatestVersionTimeStamp(kv, get, count, byteNow);
2071             }
2072           } else {
2073             updateDeleteLatestVersionTimeStamp(kv, get, count, byteNow);
2074           }
2075         } else {
2076           kv.updateLatestStamp(byteNow);
2077         }
2078       }
2079     }
2080   }
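
       /*
        * Sketch of the client-side call that exercises the LATEST_TIMESTAMP path
        * above: deleting only the newest version of a column sends
        * LATEST_TIMESTAMP, which the server resolves with a Get before writing
        * the tombstone. Identifiers are hypothetical.
        *
        *   Delete d = new Delete(row);
        *   d.deleteColumn(family, qualifier); // latest version only; timestamp
        *                                      // resolved server-side as shown above
        */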
2081 
2082   void updateDeleteLatestVersionTimeStamp(KeyValue kv, Get get, int count, byte[] byteNow)
2083       throws IOException {
2084     List<Cell> result = get(get, false);
2085 
2086     if (result.size() < count) {
2087       // Nothing to delete
2088       kv.updateLatestStamp(byteNow);
2089       return;
2090     }
2091     if (result.size() > count) {
2092       throw new RuntimeException("Unexpected size: " + result.size());
2093     }
2094     KeyValue getkv = KeyValueUtil.ensureKeyValue(result.get(count - 1));
2095     Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(), getkv.getBuffer(),
2096         getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
2097   }
2098 
2099   /**
2100    * @throws IOException
2101    */
2102   public void put(Put put)
2103   throws IOException {
2104     checkReadOnly();
2105 
2106     // Do a rough check that we have resources to accept a write.  The check is
2107     // 'rough' in that between the resource check and the call to obtain a
2108     // read lock, resources may run out.  For now, the thought is that this
2109     // will be extremely rare; we'll deal with it when it happens.
2110     checkResources();
2111     startRegionOperation(Operation.PUT);
2112     try {
2113       // All edits for the given row (across all column families) must happen atomically.
2114       doBatchMutate(put);
2115     } finally {
2116       closeRegionOperation(Operation.PUT);
2117     }
2118   }
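
       /*
        * Sketch of a minimal single-row put through this API; "region" and the
        * literals are hypothetical.
        *
        *   Put p = new Put(Bytes.toBytes("row1"));
        *   p.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value"));
        *   region.put(p); // applied atomically across the row's column families
        */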
2119 
2120   /**
2121    * Struct-like class that tracks the progress of a batch operation,
2122    * accumulating status codes and tracking the index at which processing
2123    * is proceeding.
2124    */
2125   private abstract static class BatchOperationInProgress<T> {
2126     T[] operations;
2127     int nextIndexToProcess = 0;
2128     OperationStatus[] retCodeDetails;
2129     WALEdit[] walEditsFromCoprocessors;
2130 
2131     public BatchOperationInProgress(T[] operations) {
2132       this.operations = operations;
2133       this.retCodeDetails = new OperationStatus[operations.length];
2134       this.walEditsFromCoprocessors = new WALEdit[operations.length];
2135       Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
2136     }
2137 
2138     public abstract Mutation getMutation(int index);
2139     public abstract long getNonceGroup(int index);
2140     public abstract long getNonce(int index);
2141     /** This method is potentially expensive and should only be used for non-replay CP path. */
2142     public abstract Mutation[] getMutationsForCoprocs();
2143     public abstract boolean isInReplay();
2144 
2145     public boolean isDone() {
2146       return nextIndexToProcess == operations.length;
2147     }
2148   }
2149 
2150   private static class MutationBatch extends BatchOperationInProgress<Mutation> {
2151     private long nonceGroup;
2152     private long nonce;
2153     public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) {
2154       super(operations);
2155       this.nonceGroup = nonceGroup;
2156       this.nonce = nonce;
2157     }
2158 
2159     public Mutation getMutation(int index) {
2160       return this.operations[index];
2161     }
2162 
2163     @Override
2164     public long getNonceGroup(int index) {
2165       return nonceGroup;
2166     }
2167 
2168     @Override
2169     public long getNonce(int index) {
2170       return nonce;
2171     }
2172 
2173     @Override
2174     public Mutation[] getMutationsForCoprocs() {
2175       return this.operations;
2176     }
2177 
2178     @Override
2179     public boolean isInReplay() {
2180       return false;
2181     }
2182   }
2183 
2184   private static class ReplayBatch extends BatchOperationInProgress<HLogSplitter.MutationReplay> {
2185     public ReplayBatch(MutationReplay[] operations) {
2186       super(operations);
2187     }
2188 
2189     @Override
2190     public Mutation getMutation(int index) {
2191       return this.operations[index].mutation;
2192     }
2193 
2194     @Override
2195     public long getNonceGroup(int index) {
2196       return this.operations[index].nonceGroup;
2197     }
2198 
2199     @Override
2200     public long getNonce(int index) {
2201       return this.operations[index].nonce;
2202     }
2203 
2204     @Override
2205     public Mutation[] getMutationsForCoprocs() {
2206       assert false;
2207       throw new RuntimeException("Should not be called for replay batch");
2208     }
2209 
2210     @Override
2211     public boolean isInReplay() {
2212       return true;
2213     }
2214   }
2215 
2216   /**
2217    * Perform a batch of mutations.
2218    * It supports only Put and Delete mutations and will ignore other types passed.
2219    * @param mutations the list of mutations
2220    * @return an array of OperationStatus which internally contains the
2221    *         OperationStatusCode and the exceptionMessage if any.
2222    * @throws IOException
2223    */
2224   public OperationStatus[] batchMutate(
2225       Mutation[] mutations, long nonceGroup, long nonce) throws IOException {
2226     // As it stands, this is used for two things:
2227     //  * batchMutate with single mutation - put/delete, separate or from checkAndMutate.
2228     //  * coprocessor calls (see ex. BulkDeleteEndpoint).
2229     // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd...
2230     return batchMutate(new MutationBatch(mutations, nonceGroup, nonce));
2231   }
2232 
2233   public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
2234     return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
2235   }
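
       /*
        * Sketch of inspecting per-mutation outcomes from a batch; "region",
        * "put1" and "delete1" are hypothetical, and the exception-message
        * accessor name is an assumption about OperationStatus.
        *
        *   OperationStatus[] statuses =
        *       region.batchMutate(new Mutation[] { put1, delete1 });
        *   for (OperationStatus status : statuses) {
        *     if (status.getOperationStatusCode() != OperationStatusCode.SUCCESS) {
        *       // inspect status.getExceptionMsg() and retry or report
        *     }
        *   }
        */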
2236 
2237   /**
2238    * Replay a batch of mutations.
2239    * @param mutations mutations to replay.
2240    * @return an array of OperationStatus which internally contains the
2241    *         OperationStatusCode and the exceptionMessage if any.
2242    * @throws IOException
2243    */
2244   public OperationStatus[] batchReplay(HLogSplitter.MutationReplay[] mutations)
2245       throws IOException {
2246     return batchMutate(new ReplayBatch(mutations));
2247   }
2248 
2249   /**
2250    * Perform a batch of mutations.
2251    * It supports only Put and Delete mutations and will ignore other types passed.
2252    * @param batchOp the batch of mutations, with per-operation status tracking
2253    * @return an array of OperationStatus which internally contains the
2254    *         OperationStatusCode and the exceptionMessage if any.
2255    * @throws IOException
2256    */
2257   OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
2258     boolean initialized = false;
2259     while (!batchOp.isDone()) {
2260       if (!batchOp.isInReplay()) {
2261         checkReadOnly();
2262       }
2263       checkResources();
2264 
2265       long newSize;
2266       Operation op = Operation.BATCH_MUTATE;
2267       if (batchOp.isInReplay()) op = Operation.REPLAY_BATCH_MUTATE;
2268       startRegionOperation(op);
2269 
2270       try {
2271         if (!initialized) {
2272           this.writeRequestsCount.add(batchOp.operations.length);
2273           if (!batchOp.isInReplay()) {
2274             doPreMutationHook(batchOp);
2275           }
2276           initialized = true;
2277         }
2278         long addedSize = doMiniBatchMutation(batchOp);
2279         newSize = this.addAndGetGlobalMemstoreSize(addedSize);
2280       } finally {
2281         closeRegionOperation(op);
2282       }
2283       if (isFlushSize(newSize)) {
2284         requestFlush();
2285       }
2286     }
2287     return batchOp.retCodeDetails;
2288   }
2289 
2290 
2291   private void doPreMutationHook(BatchOperationInProgress<?> batchOp)
2292       throws IOException {
2293     /* Run coprocessor pre hook outside of locks to avoid deadlock */
2294     WALEdit walEdit = new WALEdit();
2295     if (coprocessorHost != null) {
2296       for (int i = 0 ; i < batchOp.operations.length; i++) {
2297         Mutation m = batchOp.getMutation(i);
2298         if (m instanceof Put) {
2299           if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
2300             // pre hook says skip this Put
2301             // mark as success and skip in doMiniBatchMutation
2302             batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2303           }
2304         } else if (m instanceof Delete) {
2305           Delete curDel = (Delete) m;
2306           if (curDel.getFamilyCellMap().isEmpty()) {
2307             // handle deleting a row case
2308             prepareDelete(curDel);
2309           }
2310           if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {
2311             // pre hook says skip this Delete
2312             // mark as success and skip in doMiniBatchMutation
2313             batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2314           }
2315         } else {
2316           // In case of passing Append mutations along with the Puts and Deletes in batchMutate
2317           // mark the operation return code as failure so that it will not be considered in
2318           // the doMiniBatchMutation
2319           batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
2320               "Put/Delete mutations only supported in batchMutate() now");
2321         }
2322         if (!walEdit.isEmpty()) {
2323           batchOp.walEditsFromCoprocessors[i] = walEdit;
2324           walEdit = new WALEdit();
2325         }
2326       }
2327     }
2328   }
2329 
2330   @SuppressWarnings("unchecked")
2331   private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp) throws IOException {
2332     boolean isInReplay = batchOp.isInReplay();
2333     // variable to note if all Put items are for the same CF -- metrics related
2334     boolean putsCfSetConsistent = true;
2335     //The set of columnFamilies first seen for Put.
2336     Set<byte[]> putsCfSet = null;
2337     // variable to note if all Delete items are for the same CF -- metrics related
2338     boolean deletesCfSetConsistent = true;
2339     //The set of columnFamilies first seen for Delete.
2340     Set<byte[]> deletesCfSet = null;
2341 
2342     long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
2343     WALEdit walEdit = new WALEdit(isInReplay);
2344     MultiVersionConsistencyControl.WriteEntry w = null;
2345     long txid = 0;
2346     boolean doRollBackMemstore = false;
2347     boolean locked = false;
2348 
2349     /** Keep track of the locks we hold so we can release them in finally clause */
2350     List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
2351     // reference family maps directly so coprocessors can mutate them if desired
2352     Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
2353     List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
2354     // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
2355     int firstIndex = batchOp.nextIndexToProcess;
2356     int lastIndexExclusive = firstIndex;
2357     boolean success = false;
2358     int noOfPuts = 0, noOfDeletes = 0;
2359     HLogKey walKey = null;
2360     long mvccNum = 0;
2361     try {
2362       // ------------------------------------
2363       // STEP 1. Try to acquire as many locks as we can, and ensure
2364       // we acquire at least one.
2365       // ----------------------------------
2366       int numReadyToWrite = 0;
2367       long now = EnvironmentEdgeManager.currentTimeMillis();
2368       while (lastIndexExclusive < batchOp.operations.length) {
2369         Mutation mutation = batchOp.getMutation(lastIndexExclusive);
2370         boolean isPutMutation = mutation instanceof Put;
2371 
2372         Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
2373         // store the family map reference to allow for mutations
2374         familyMaps[lastIndexExclusive] = familyMap;
2375 
2376         // skip anything that "ran" already
2377         if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
2378             != OperationStatusCode.NOT_RUN) {
2379           lastIndexExclusive++;
2380           continue;
2381         }
2382 
2383         try {
2384           if (isPutMutation) {
2385             // Check the families in the put. If bad, skip this one.
2386             if (isInReplay) {
2387               removeNonExistentColumnFamilyForReplay(familyMap);
2388             } else {
2389               checkFamilies(familyMap.keySet());
2390             }
2391             checkTimestamps(mutation.getFamilyCellMap(), now);
2392           } else {
2393             prepareDelete((Delete) mutation);
2394           }
2395         } catch (NoSuchColumnFamilyException nscf) {
2396           LOG.warn("No such column family in batch mutation", nscf);
2397           batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2398               OperationStatusCode.BAD_FAMILY, nscf.getMessage());
2399           lastIndexExclusive++;
2400           continue;
2401         } catch (FailedSanityCheckException fsce) {
2402           LOG.warn("Batch Mutation did not pass sanity check", fsce);
2403           batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2404               OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
2405           lastIndexExclusive++;
2406           continue;
2407         }
2408 
2409         // If we haven't got any rows in our batch, we should block to
2410         // get the next one.
2411         boolean shouldBlock = numReadyToWrite == 0;
2412         RowLock rowLock = null;
2413         try {
2414           rowLock = getRowLock(mutation.getRow(), shouldBlock);
2415         } catch (IOException ioe) {
2416           LOG.warn("Failed getting lock in batch put, row="
2417             + Bytes.toStringBinary(mutation.getRow()), ioe);
2418         }
2419         if (rowLock == null) {
2420           // We failed to grab another lock
2421           assert !shouldBlock : "Should never fail to get lock when blocking";
2422           break; // stop acquiring more rows for this batch
2423         } else {
2424           acquiredRowLocks.add(rowLock);
2425         }
2426 
2427         lastIndexExclusive++;
2428         numReadyToWrite++;
2429 
2430         if (isPutMutation) {
2431           // If Column Families stay consistent throughout all of the
2432           // individual puts then metrics can be reported as a multiput across
2433           // column families in the first put.
2434           if (putsCfSet == null) {
2435             putsCfSet = mutation.getFamilyCellMap().keySet();
2436           } else {
2437             putsCfSetConsistent = putsCfSetConsistent
2438                 && mutation.getFamilyCellMap().keySet().equals(putsCfSet);
2439           }
2440         } else {
2441           if (deletesCfSet == null) {
2442             deletesCfSet = mutation.getFamilyCellMap().keySet();
2443           } else {
2444             deletesCfSetConsistent = deletesCfSetConsistent
2445                 && mutation.getFamilyCellMap().keySet().equals(deletesCfSet);
2446           }
2447         }
2448       }
2449 
2450       // we should record the timestamp only after we have acquired the rowLock,
2451       // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
2452       now = EnvironmentEdgeManager.currentTimeMillis();
2453       byte[] byteNow = Bytes.toBytes(now);
2454 
2455       // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
2456       if (numReadyToWrite <= 0) return 0L;
2457 
2458       // We've now grabbed as many mutations off the list as we can
2459 
2460       // ------------------------------------
2461       // STEP 2. Update any LATEST_TIMESTAMP timestamps
2462       // ----------------------------------
2463       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2464         // skip invalid
2465         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2466             != OperationStatusCode.NOT_RUN) continue;
2467 
2468         Mutation mutation = batchOp.getMutation(i);
2469         if (mutation instanceof Put) {
2470           updateKVTimestamps(familyMaps[i].values(), byteNow);
2471           noOfPuts++;
2472         } else {
2473           if (!isInReplay) {
2474             prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
2475           }
2476           noOfDeletes++;
2477         }
2478       }
2479 
2480       lock(this.updatesLock.readLock(), numReadyToWrite);
2481       locked = true;
2482       mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
2483 
2484       // ------------------------------------
2485       // Acquire the latest mvcc number
2486       // ----------------------------------
2487       w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
2488       
2489       // calling the pre CP hook for batch mutation
2490       if (!isInReplay && coprocessorHost != null) {
2491         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2492           new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2493           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2494         if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
2495       }
2496 
2497       // ------------------------------------
2498       // STEP 3. Write back to memstore
2499       // Write to memstore. It is ok to write to memstore
2500       // first without updating the HLog because we do not roll
2501       // forward the memstore MVCC. The MVCC will be moved up when
2502       // the complete operation is done. These changes are not yet
2503       // visible to scanners till we update the MVCC. The MVCC is
2504       // moved only when the sync is complete.
2505       // ----------------------------------
2506       long addedSize = 0;
2507       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2508         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2509             != OperationStatusCode.NOT_RUN) {
2510           continue;
2511         }
2512         doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
2513         addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells);
2514       }
2515 
2516       // ------------------------------------
2517       // STEP 4. Build WAL edit
2518       // ----------------------------------
2519       Durability durability = Durability.USE_DEFAULT;
2520       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2521         // Skip puts that were determined to be invalid during preprocessing
2522         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2523             != OperationStatusCode.NOT_RUN) {
2524           continue;
2525         }
2526         batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2527 
2528         Mutation m = batchOp.getMutation(i);
2529         Durability tmpDur = getEffectiveDurability(m.getDurability());
2530         if (tmpDur.ordinal() > durability.ordinal()) {
2531           durability = tmpDur;
2532         }
2533         if (tmpDur == Durability.SKIP_WAL) {
2534           recordMutationWithoutWal(m.getFamilyCellMap());
2535           continue;
2536         }
2537 
2538         long nonceGroup = batchOp.getNonceGroup(i), nonce = batchOp.getNonce(i);
2539         // In replay, the batch may contain multiple nonces. If so, write WALEdit for each.
2540         // Given how nonces are originally written, these should be contiguous.
2541         // They don't have to be; it will still work, just writing more WALEdits than needed.
2542         if (nonceGroup != currentNonceGroup || nonce != currentNonce) {
2543           if (walEdit.size() > 0) {
2544             assert isInReplay;
2545             if (!isInReplay) {
2546               throw new IOException("Multiple nonces per batch and not in replay");
2547             }
2548             // txid should always increase, so having the one from the last call is ok.
2549             walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
2550               this.htableDescriptor.getTableName(), now, m.getClusterIds(), 
2551               currentNonceGroup, currentNonce);
2552             txid = this.log.appendNoSync(this.htableDescriptor,  this.getRegionInfo(),  walKey,
2553               walEdit, getSequenceId(), true, null);
2554             walEdit = new WALEdit(isInReplay);
2555             walKey = null;
2556           }
2557           currentNonceGroup = nonceGroup;
2558           currentNonce = nonce;
2559         }
2560 
2561         // Add WAL edits by CP
2562         WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
2563         if (fromCP != null) {
2564           for (KeyValue kv : fromCP.getKeyValues()) {
2565             walEdit.add(kv);
2566           }
2567         }
2568         addFamilyMapToWALEdit(familyMaps[i], walEdit);
2569       }
2570 
2571       // -------------------------
2572       // STEP 5. Append the final edit to WAL. Do not sync wal.
2573       // -------------------------
2574       Mutation mutation = batchOp.getMutation(firstIndex);
2575       if (walEdit.size() > 0) {
2576         walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
2577             this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now, 
2578             mutation.getClusterIds(), currentNonceGroup, currentNonce);
2579         txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
2580           getSequenceId(), true, memstoreCells);
2581       }
2582       if (walKey == null) {
2583         // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
2584         walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
2585       }
2586 
2587       // -------------------------------
2588       // STEP 6. Release row locks, etc.
2589       // -------------------------------
2590       if (locked) {
2591         this.updatesLock.readLock().unlock();
2592         locked = false;
2593       }
2594       releaseRowLocks(acquiredRowLocks);
2595 
2596       // -------------------------
2597       // STEP 7. Sync wal.
2598       // -------------------------
2599       if (txid != 0) {
2600         syncOrDefer(txid, durability);
2601       }
2602       
2603       doRollBackMemstore = false;
2604       // calling the post CP hook for batch mutation
2605       if (!isInReplay && coprocessorHost != null) {
2606         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2607           new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2608           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2609         coprocessorHost.postBatchMutate(miniBatchOp);
2610       }
2611 
2612       // ------------------------------------------------------------------
2613       // STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
2614       // ------------------------------------------------------------------
2615       if (w != null) {
2616         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
2617         w = null;
2618       }
2619 
2620       // ------------------------------------
2621       // STEP 9. Run coprocessor post hooks. This should be done after the wal is
2622       // synced so that the coprocessor contract is adhered to.
2623       // ------------------------------------
2624       if (!isInReplay && coprocessorHost != null) {
2625         for (int i = firstIndex; i < lastIndexExclusive; i++) {
2626           // only for successful puts
2627           if (batchOp.retCodeDetails[i].getOperationStatusCode()
2628               != OperationStatusCode.SUCCESS) {
2629             continue;
2630           }
2631           Mutation m = batchOp.getMutation(i);
2632           if (m instanceof Put) {
2633             coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
2634           } else {
2635             coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
2636           }
2637         }
2638       }
2639 
2640       success = true;
2641       return addedSize;
2642     } finally {
2643 
2644       // if the wal sync was unsuccessful, remove keys from memstore
2645       if (doRollBackMemstore) {
2646         rollbackMemstore(memstoreCells);
2647       }
2648       if (w != null) {
2649         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
2650       }
2651 
2652       if (locked) {
2653         this.updatesLock.readLock().unlock();
2654       }
2655       releaseRowLocks(acquiredRowLocks);
2656 
2657       // See if the column families were consistent through the whole thing.
2658       // If they were then keep them. If they were not then pass a null.
2659       // null will be treated as unknown.
2660       // Total time taken might involve both Puts and Deletes.
2661       // Split the time for puts and deletes based on the total number of Puts and Deletes.
2662 
2663       if (noOfPuts > 0) {
2664         // There were some Puts in the batch.
2665         if (this.metricsRegion != null) {
2666           this.metricsRegion.updatePut();
2667         }
2668       }
2669       if (noOfDeletes > 0) {
2670         // There were some Deletes in the batch.
2671         if (this.metricsRegion != null) {
2672           this.metricsRegion.updateDelete();
2673         }
2674       }
2675       if (!success) {
2676         for (int i = firstIndex; i < lastIndexExclusive; i++) {
2677           if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
2678             batchOp.retCodeDetails[i] = OperationStatus.FAILURE;
2679           }
2680         }
2681       }
2682       if (coprocessorHost != null && !batchOp.isInReplay()) {
2683         // call the coprocessor hook to do any finalization steps
2684         // after the put is done
2685         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2686             new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2687                 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex,
2688                 lastIndexExclusive);
2689         coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success);
2690       }
2691 
2692       batchOp.nextIndexToProcess = lastIndexExclusive;
2693     }
2694   }
2695 
2696   /**
2697    * Returns effective durability from the passed durability and
2698    * the table descriptor.
2699    */
2700   protected Durability getEffectiveDurability(Durability d) {
2701     return d == Durability.USE_DEFAULT ? this.durability : d;
2702   }
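
  // An illustrative sketch (not called anywhere in this class) of how the
  // effective durability resolves; 'mutation' here is hypothetical.
  //
  //   Durability requested = mutation.getDurability();   // say, USE_DEFAULT
  //   Durability effective = getEffectiveDurability(requested);
  //   // effective == this.durability when requested is USE_DEFAULT,
  //   // otherwise effective == requested.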
2703 
2704   // TODO: gets/puts/deletes should be refactored a bit so that acquiring the
2705   // row lock happens up front and the lock is passed into these methods.
2706   // Then, in the case of checkAndMutate, you could just do lockRow, get,
2707   // put, unlockRow or something similar.
2708   /**
2709    * Atomically checks a cell's value and applies the given Put or Delete if the comparison passes.
2710    * @return true if the mutation was applied, false otherwise
2711    * @throws IOException
2712    */
2713   public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
2714       CompareOp compareOp, ByteArrayComparable comparator, Mutation w,
2715       boolean writeToWAL)
2716   throws IOException{
2717     checkReadOnly();
2718     //TODO, add check for value length or maybe even better move this to the
2719     //client if this becomes a global setting
2720     checkResources();
2721     boolean isPut = w instanceof Put;
2722     if (!isPut && !(w instanceof Delete))
2723       throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must " +
2724           "be Put or Delete");
2725     if (!Bytes.equals(row, w.getRow())) {
2726       throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " +
2727           "getRow must match the passed row");
2728     }
2729 
2730     startRegionOperation();
2731     try {
2732       Get get = new Get(row);
2733       checkFamily(family);
2734       get.addColumn(family, qualifier);
2735 
2736       // Lock row - note that doBatchMutate will relock this row if called
2737       RowLock rowLock = getRowLock(get.getRow());
2738       // wait for all previous transactions to complete (with lock held)
2739       mvcc.waitForPreviousTransactionsComplete();
2740       try {
2741         if (this.getCoprocessorHost() != null) {
2742           Boolean processed = null;
2743           if (w instanceof Put) {
2744             processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family, 
2745                 qualifier, compareOp, comparator, (Put) w);
2746           } else if (w instanceof Delete) {
2747             processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family,
2748                 qualifier, compareOp, comparator, (Delete) w);
2749           }
2750           if (processed != null) {
2751             return processed;
2752           }
2753         }
2754         List<Cell> result = get(get, false);
2755 
2756         boolean valueIsNull = comparator.getValue() == null ||
2757           comparator.getValue().length == 0;
2758         boolean matches = false;
2759         if (result.size() == 0 && valueIsNull) {
2760           matches = true;
2761         } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
2762             valueIsNull) {
2763           matches = true;
2764         } else if (result.size() == 1 && !valueIsNull) {
2765           Cell kv = result.get(0);
2766           int compareResult = comparator.compareTo(kv.getValueArray(),
2767               kv.getValueOffset(), kv.getValueLength());
2768           switch (compareOp) {
2769           case LESS:
2770             matches = compareResult < 0;
2771             break;
2772           case LESS_OR_EQUAL:
2773             matches = compareResult <= 0;
2774             break;
2775           case EQUAL:
2776             matches = compareResult == 0;
2777             break;
2778           case NOT_EQUAL:
2779             matches = compareResult != 0;
2780             break;
2781           case GREATER_OR_EQUAL:
2782             matches = compareResult >= 0;
2783             break;
2784           case GREATER:
2785             matches = compareResult > 0;
2786             break;
2787           default:
2788             throw new RuntimeException("Unknown Compare op " + compareOp.name());
2789           }
2790         }
2791         // If it matches, apply the new Put or Delete
2792         if (matches) {
2793           // All edits for the given row (across all column families) must
2794           // happen atomically.
2795           doBatchMutate(w);
2796           this.checkAndMutateChecksPassed.increment();
2797           return true;
2798         }
2799         this.checkAndMutateChecksFailed.increment();
2800         return false;
2801       } finally {
2802         rowLock.release();
2803       }
2804     } finally {
2805       closeRegionOperation();
2806     }
2807   }
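
  // A hedged usage sketch for checkAndMutate; 'row', 'family', 'qualifier'
  // and 'expected' are hypothetical values supplied by the caller.
  //
  //   Put put = new Put(row);
  //   put.add(family, qualifier, Bytes.toBytes("new-value"));
  //   boolean applied = region.checkAndMutate(row, family, qualifier,
  //       CompareOp.EQUAL, new BinaryComparator(expected), put, true);
  //   // 'applied' is true only if the stored value matched 'expected'
  //   // (or both were empty) while the row lock was held.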
2808 
2809   private void doBatchMutate(Mutation mutation) throws IOException, DoNotRetryIOException {
2810     // Currently this is only called for puts and deletes, so no nonces.
2811     OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation },
2812         HConstants.NO_NONCE, HConstants.NO_NONCE);
2813     if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
2814       throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
2815     } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
2816       throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
2817     }
2818   }
2819 
2820   /**
2821    * Complete taking the snapshot on the region. Writes the region info and adds references to the
2822    * working snapshot directory.
2823    *
2824    * TODO for api consistency, consider adding another version with no {@link ForeignExceptionSnare}
2825    * arg.  (In the future other cancellable HRegion methods could eventually add a
2826    * {@link ForeignExceptionSnare}, or we could do something fancier).
2827    *
2828    * @param desc snapshot description object
2829    * @param exnSnare ForeignExceptionSnare that captures external exceptions in case we need to
2830    *   bail out.  This is allowed to be null and will just be ignored in that case.
2831    * @throws IOException if there is an external or internal error causing the snapshot to fail
2832    */
2833   public void addRegionToSnapshot(SnapshotDescription desc,
2834       ForeignExceptionSnare exnSnare) throws IOException {
2835     Path rootDir = FSUtils.getRootDir(conf);
2836     Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);
2837 
2838     SnapshotManifest manifest = SnapshotManifest.create(conf, getFilesystem(),
2839                                                         snapshotDir, desc, exnSnare);
2840     manifest.addRegion(this);
2841   }
2842 
2843   /**
2844    * Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP} with the
2845    * provided current timestamp.
2846    */
2847   void updateKVTimestamps(final Iterable<List<Cell>> keyLists, final byte[] now) {
2848     for (List<Cell> cells: keyLists) {
2849       if (cells == null) continue;
2850       for (Cell cell : cells) {
2851         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
2852         kv.updateLatestStamp(now);
2853       }
2854     }
2855   }
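
  // Illustrative effect of the substitution above: a Put created without an
  // explicit timestamp carries HConstants.LATEST_TIMESTAMP (Long.MAX_VALUE),
  // and updateLatestStamp rewrites it in place to the server's 'now' bytes,
  // i.e. roughly:
  //
  //   kv.getTimestamp() == HConstants.LATEST_TIMESTAMP    // before
  //   kv.getTimestamp() == Bytes.toLong(now)               // after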
2856 
2857   /*
2858    * Check whether we have the resources to support an update.
2859    *
2860    * We throw RegionTooBusyException if we are above the memstore limit
2861    * and expect the client to retry using some kind of backoff.
2862    */
2863   private void checkResources()
2864     throws RegionTooBusyException {
2865     // If catalog region, do not impose resource constraints or block updates.
2866     if (this.getRegionInfo().isMetaRegion()) return;
2867 
2868     if (this.memstoreSize.get() > this.blockingMemStoreSize) {
2869       requestFlush();
2870       throw new RegionTooBusyException("Above memstore limit, " +
2871           "regionName=" + (this.getRegionInfo() == null ? "unknown" :
2872           this.getRegionInfo().getRegionNameAsString()) +
2873           ", server=" + (this.getRegionServerServices() == null ? "unknown" :
2874           this.getRegionServerServices().getServerName()) +
2875           ", memstoreSize=" + memstoreSize.get() +
2876           ", blockingMemStoreSize=" + blockingMemStoreSize);
2877     }
2878   }
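
  // A minimal client-side backoff sketch for RegionTooBusyException, assuming
  // the caller retries the same mutation; the retry count and sleep are made up.
  //
  //   for (int attempt = 0; attempt < maxRetries; attempt++) {
  //     try {
  //       table.put(put);
  //       break;
  //     } catch (RegionTooBusyException e) {
  //       Thread.sleep(100L << attempt);   // exponential backoff
  //     }
  //   }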
2879 
2880   /**
2881    * @throws IOException Throws exception if region is in read-only mode.
2882    */
2883   protected void checkReadOnly() throws IOException {
2884     if (this.writestate.isReadOnly()) {
2885       throw new IOException("region is read only");
2886     }
2887   }
2888 
2889   /**
2890    * Add updates first to the hlog and then add values to memstore.
2891    * Warning: Assumption is caller has lock on passed in row.
2892    * @param edits Cell updates by column
2893    * @throws IOException
2894    */
2895   private void put(final byte [] row, byte [] family, List<Cell> edits)
2896   throws IOException {
2897     NavigableMap<byte[], List<Cell>> familyMap;
2898     familyMap = new TreeMap<byte[], List<Cell>>(Bytes.BYTES_COMPARATOR);
2899 
2900     familyMap.put(family, edits);
2901     Put p = new Put(row);
2902     p.setFamilyCellMap(familyMap);
2903     doBatchMutate(p);
2904   }
2905 
2906   /**
2907    * Atomically apply the given map of family->edits to the memstore.
2908    * This handles the consistency control on its own, but the caller
2909    * should already have locked updatesLock.readLock(). This also does
2910    * <b>not</b> check the families for validity.
2911    *
2912    * @param familyMap Map of kvs per family
2913    * @param mvccNum the MVCC write number to stamp on each KeyValue added
2914    * @param memstoreCells output list that receives the KVs actually added to
2915    *        the memstore, so they can be rolled back if the WAL write fails
2916    * @return the additional memory usage of the memstore caused by the
2917    * new entries.
2918    */
2919   private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
2920     long mvccNum, List<KeyValue> memstoreCells) {
2921     long size = 0;
2922 
2923     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
2924       byte[] family = e.getKey();
2925       List<Cell> cells = e.getValue();
2926 
2927       Store store = getStore(family);
2928       for (Cell cell: cells) {
2929         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
2930         kv.setMvccVersion(mvccNum);
2931         Pair<Long, Cell> ret = store.add(kv);
2932         size += ret.getFirst();
2933         memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
2934       }
2935     }
2936 
2937     return size;
2938   }
2939 
2940   /**
2941    * Remove all the keys listed in the map from the memstore. This is invoked
2942    * when a Put/Delete has updated the memstore but subsequently failed to
2943    * update the wal; those memstore updates must then be rolled back.
2944    */
2945   private void rollbackMemstore(List<KeyValue> memstoreCells) {
2946     int kvsRolledback = 0;
2947     
2948     for (KeyValue kv : memstoreCells) {
2949       byte[] family = kv.getFamily();
2950       Store store = getStore(family);
2951       store.rollback(kv);
2952       kvsRolledback++;
2953     }
2954     LOG.debug("rollbackMemstore rolled back " + kvsRolledback);
2955   }
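
  // Note: the memstoreCells list handed to rollbackMemstore is the same one
  // that applyFamilyMapToMemstore populated, so exactly the KeyValues that
  // were added (and only those) are removed again from their stores.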
2956 
2957   /**
2958    * Check the collection of families for validity.
2959    * @throws NoSuchColumnFamilyException if a family does not exist.
2960    */
2961   void checkFamilies(Collection<byte[]> families)
2962   throws NoSuchColumnFamilyException {
2963     for (byte[] family : families) {
2964       checkFamily(family);
2965     }
2966   }
2967 
2968   /**
2969    * During replay, there could exist column families which were removed between the
2970    * region server failure and the replay; drop their edits here.
2971    */
2972   private void removeNonExistentColumnFamilyForReplay(
2973       final Map<byte[], List<Cell>> familyMap) {
2974     List<byte[]> nonExistentList = null;
2975     for (byte[] family : familyMap.keySet()) {
2976       if (!this.htableDescriptor.hasFamily(family)) {
2977         if (nonExistentList == null) {
2978           nonExistentList = new ArrayList<byte[]>();
2979         }
2980         nonExistentList.add(family);
2981       }
2982     }
2983     if (nonExistentList != null) {
2984       for (byte[] family : nonExistentList) {
2985         // Perhaps schema was changed between crash and replay
2986         LOG.info("No family for " + Bytes.toString(family) + ", omitting it from replay.");
2987         familyMap.remove(family);
2988       }
2989     }
2990   }
2991 
2992   void checkTimestamps(final Map<byte[], List<Cell>> familyMap,
2993       long now) throws FailedSanityCheckException {
2994     if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
2995       return;
2996     }
2997     long maxTs = now + timestampSlop;
2998     for (List<Cell> kvs : familyMap.values()) {
2999       for (Cell cell : kvs) {
3000         // See if the user-supplied TS is out of range; "latest" means the server-side timestamp applies.
3001         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
3002         if (!kv.isLatestTimestamp() && kv.getTimestamp() > maxTs) {
3003           throw new FailedSanityCheckException("Timestamp for KV out of range "
3004               + cell + " (too.new=" + timestampSlop + ")");
3005         }
3006       }
3007     }
3008   }
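
  // A worked example of the slop check above, with made-up numbers: if
  // timestampSlop = 2000 and now = 1000000, then maxTs = 1002000; a KV with a
  // client-set timestamp of 1005000 fails the sanity check, while one left at
  // HConstants.LATEST_TIMESTAMP is exempt (the server will stamp it itself).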
3009 
3010   /**
3011    * Append the given map of family->edits to a WALEdit data structure.
3012    * This does not write to the HLog itself.
3013    * @param familyMap map of family->edits
3014    * @param walEdit the destination entry to append into
3015    */
3016   private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
3017       WALEdit walEdit) {
3018     for (List<Cell> edits : familyMap.values()) {
3019       for (Cell cell : edits) {
3020         walEdit.add(KeyValueUtil.ensureKeyValue(cell));
3021       }
3022     }
3023   }
3024 
3025   private void requestFlush() {
3026     if (this.rsServices == null) {
3027       return;
3028     }
3029     synchronized (writestate) {
3030       if (this.writestate.isFlushRequested()) {
3031         return;
3032       }
3033       writestate.flushRequested = true;
3034     }
3035     // Make request outside of synchronize block; HBASE-818.
3036     this.rsServices.getFlushRequester().requestFlush(this);
3037     if (LOG.isDebugEnabled()) {
3038       LOG.debug("Flush requested on " + this);
3039     }
3040   }
3041 
3042   /*
3043    * @param size
3044    * @return True if size is over the flush threshold
3045    */
3046   private boolean isFlushSize(final long size) {
3047     return size > this.memstoreFlushSize;
3048   }
3049 
3050   /**
3051    * Read the edit logs put under this region by the wal log splitting process, and
3052    * replay the recovered edits into this region.
3053    *
3054    * <p>We can ignore any log message that has a sequence ID that's equal to or
3055    * lower than minSeqId.  (Because we know such log messages are already
3056    * reflected in the HFiles.)
3057    *
3058    * <p>While this is running we are putting pressure on memory yet we are
3059    * outside of our usual accounting because we are not yet an onlined region
3060    * (this stuff is being run as part of Region initialization).  This means
3061    * that if we're up against global memory limits, we'll not be flagged to flush
3062    * because we are not online. We can't be flushed by the usual mechanisms anyway;
3063    * we're not yet online so our relative sequenceids are not yet aligned with
3064    * HLog sequenceids -- not till we come up online, post processing of split
3065    * edits.
3066    *
3067    * <p>But to help relieve memory pressure, we at least manage our own heap
3068    * size, flushing if we are in excess of per-region limits.  When flushing,
3069    * though, we have to be careful to avoid using the regionserver/hlog
3070    * sequenceid.  It runs on a different timeline to what is going on in this
3071    * region context, so if we crashed replaying these edits but in the midst
3072    * had a flush that used the regionserver log with a sequenceid in excess of
3073    * what is going on in this region and its split editlogs, then we could
3074    * miss edits the next time we go to recover. So, we have to flush inline,
3075    * using seqids that make sense in this single-region context only -- until we online.
3076    *
3077    * @param maxSeqIdInStores Any edit found in split editlogs needs to be in excess of
3078    * the maxSeqId for the store to be applied, else its skipped.
3079    * @return the sequence id of the last edit added to this region out of the
3080    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
3081    * @throws UnsupportedEncodingException
3082    * @throws IOException
3083    */
3084   protected long replayRecoveredEditsIfAny(final Path regiondir,
3085       Map<byte[], Long> maxSeqIdInStores,
3086       final CancelableProgressable reporter, final MonitoredTask status)
3087       throws UnsupportedEncodingException, IOException {
3088     long minSeqIdForTheRegion = -1;
3089     for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
3090       if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
3091         minSeqIdForTheRegion = maxSeqIdInStore;
3092       }
3093     }
3094     long seqid = minSeqIdForTheRegion;
3095 
3096     FileSystem fs = this.fs.getFileSystem();
3097     NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regiondir);
3098     if (LOG.isDebugEnabled()) {
3099       LOG.debug("Found " + (files == null ? 0 : files.size())
3100         + " recovered edits file(s) under " + regiondir);
3101     }
3102 
3103     if (files == null || files.isEmpty()) return seqid;
3104 
3105     for (Path edits: files) {
3106       if (edits == null || !fs.exists(edits)) {
3107         LOG.warn("Null or non-existent edits file: " + edits);
3108         continue;
3109       }
3110       if (isZeroLengthThenDelete(fs, edits)) continue;
3111 
3112       long maxSeqId;
3113       String fileName = edits.getName();
3114       maxSeqId = Math.abs(Long.parseLong(fileName));
3115       if (maxSeqId <= minSeqIdForTheRegion) {
3116         if (LOG.isDebugEnabled()) {
3117           String msg = "Maximum sequenceid for this log is " + maxSeqId
3118             + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
3119             + ", skipped the whole file, path=" + edits;
3120           LOG.debug(msg);
3121         }
3122         continue;
3123       }
3124 
3125       try {
3126         // replay the edits. Replay can return -1 if everything is skipped, only update if seqId is greater
3127         seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
3128       } catch (IOException e) {
3129         boolean skipErrors = conf.getBoolean(
3130             HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
3131             conf.getBoolean(
3132                 "hbase.skip.errors",
3133                 HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS));
3134         if (conf.get("hbase.skip.errors") != null) {
3135           LOG.warn(
3136               "The property 'hbase.skip.errors' has been deprecated. Please use " +
3137               HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
3138         }
3139         if (skipErrors) {
3140           Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
3141           LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
3142               + "=true so continuing. Renamed " + edits +
3143               " as " + p, e);
3144         } else {
3145           throw e;
3146         }
3147       }
3148     }
3149     // The edits size added into rsAccounting during this replaying will not
3150     // be required any more. So just clear it.
3151     if (this.rsAccounting != null) {
3152       this.rsAccounting.clearRegionReplayEditsSize(this.getRegionName());
3153     }
3154     if (seqid > minSeqIdForTheRegion) {
3155       // Then we added some edits to memory. Flush and cleanup split edit files.
3156       internalFlushcache(null, seqid, status);
3157     }
3158     // Now delete the content of recovered edits.  We're done w/ them.
3159     for (Path file: files) {
3160       if (!fs.delete(file, false)) {
3161         LOG.error("Failed delete of " + file);
3162       } else {
3163         LOG.debug("Deleted recovered.edits file=" + file);
3164       }
3165     }
3166     return seqid;
3167   }
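
  // A sketch of the per-file skip decision above; the path is hypothetical.
  // A recovered-edits file is named for the max sequence id it contains:
  //
  //   .../recovered.edits/0000000000000001234
  //   long maxSeqId = Math.abs(Long.parseLong("0000000000000001234")); // 1234
  //   // If maxSeqId <= minSeqIdForTheRegion (the minimum of the stores'
  //   // max seqids), every edit in the file is already in an HFile: skip it.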
3168 
3169   /*
3170    * @param edits File of recovered edits.
3171    * @param maxSeqIdInStores Maximum sequenceid found in each store.  Edits in log
3172    * must be larger than this to be replayed for each store.
3173    * @param reporter
3174    * @return the sequence id of the last edit added to this region out of the
3175    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
3176    * @throws IOException
3177    */
3178   private long replayRecoveredEdits(final Path edits,
3179       Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
3180     throws IOException {
3181     String msg = "Replaying edits from " + edits;
3182     LOG.info(msg);
3183     MonitoredTask status = TaskMonitor.get().createStatus(msg);
3184     FileSystem fs = this.fs.getFileSystem();
3185 
3186     status.setStatus("Opening logs");
3187     HLog.Reader reader = null;
3188     try {
3189       reader = HLogFactory.createReader(fs, edits, conf);
3190       long currentEditSeqId = -1;
3191       long firstSeqIdInLog = -1;
3192       long skippedEdits = 0;
3193       long editsCount = 0;
3194       long intervalEdits = 0;
3195       HLog.Entry entry;
3196       Store store = null;
3197       boolean reported_once = false;
3198       ServerNonceManager ng = this.rsServices == null ? null : this.rsServices.getNonceManager();
3199 
3200       try {
3201         // How many edits seen before we check elapsed time
3202         int interval = this.conf.getInt("hbase.hstore.report.interval.edits",
3203             2000);
3204         // How often to send a progress report (default 1/2 master timeout)
3205         int period = this.conf.getInt("hbase.hstore.report.period", 300000);
3206         long lastReport = EnvironmentEdgeManager.currentTimeMillis();
3207 
3208         while ((entry = reader.next()) != null) {
3209           HLogKey key = entry.getKey();
3210           WALEdit val = entry.getEdit();
3211 
3212           if (ng != null) { // ng is null in some tests, or when nonces are disabled
3213             ng.reportOperationFromWal(key.getNonceGroup(), key.getNonce(), key.getWriteTime());
3214           }
3215 
3216           if (reporter != null) {
3217             intervalEdits += val.size();
3218             if (intervalEdits >= interval) {
3219               // Number of edits interval reached
3220               intervalEdits = 0;
3221               long cur = EnvironmentEdgeManager.currentTimeMillis();
3222               if (lastReport + period <= cur) {
3223                 status.setStatus("Replaying edits..." +
3224                     " skipped=" + skippedEdits +
3225                     " edits=" + editsCount);
3226                 // Timeout reached
3227                 if (!reporter.progress()) {
3228                   msg = "Progressable reporter failed, stopping replay";
3229                   LOG.warn(msg);
3230                   status.abort(msg);
3231                   throw new IOException(msg);
3232                 }
3233                 reported_once = true;
3234                 lastReport = cur;
3235               }
3236             }
3237           }
3238 
3239           // Start coprocessor replay here. The coprocessor is for each WALEdit
3240           // instead of a KeyValue.
3241           if (coprocessorHost != null) {
3242             status.setStatus("Running pre-WAL-restore hook in coprocessors");
3243             if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
3244               // if bypass this log entry, ignore it ...
3245               continue;
3246             }
3247           }
3248 
3249           if (firstSeqIdInLog == -1) {
3250             firstSeqIdInLog = key.getLogSeqNum();
3251           }
3252           currentEditSeqId = key.getLogSeqNum();
3253           boolean flush = false;
3254           for (KeyValue kv: val.getKeyValues()) {
3255             // Check this edit is for me. Also, guard against writing the special
3256             // METACOLUMN info such as HBASE::CACHEFLUSH entries
3257             if (CellUtil.matchingFamily(kv, WALEdit.METAFAMILY) ||
3258                 !Bytes.equals(key.getEncodedRegionName(),
3259                   this.getRegionInfo().getEncodedNameAsBytes())) {
3260               //this is a special edit, we should handle it
3261               CompactionDescriptor compaction = WALEdit.getCompaction(kv);
3262               if (compaction != null) {
3263                 //replay the compaction
3264                 completeCompactionMarker(compaction);
3265               }
3266 
3267               skippedEdits++;
3268               continue;
3269             }
3270             // Figure which store the edit is meant for.
3271             if (store == null || !CellUtil.matchingFamily(kv, store.getFamily().getName())) {
3272               store = getStore(kv);
3273             }
3274             if (store == null) {
3275               // This should never happen.  Perhaps schema was changed between
3276               // crash and redeploy?
3277               LOG.warn("No family for " + kv);
3278               skippedEdits++;
3279               continue;
3280             }
3281             // Now, figure if we should skip this edit.
3282             if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily()
3283                 .getName())) {
3284               skippedEdits++;
3285               continue;
3286             }
3287             // Once we are over the limit, restoreEdit will keep returning true to
3288             // flush -- but don't flush until we've played all the kvs that make up
3289             // the WALEdit.
3290             flush = restoreEdit(store, kv);
3291             editsCount++;
3292           }
3293           if (flush) internalFlushcache(null, currentEditSeqId, status);
3294 
3295           if (coprocessorHost != null) {
3296             coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
3297           }
3298         }
3299       } catch (EOFException eof) {
3300         Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
3301         msg = "Encountered EOF. Most likely due to Master failure during " +
3302             "log splitting, so we have this data in another edit.  " +
3303             "Continuing, but renaming " + edits + " as " + p;
3304         LOG.warn(msg, eof);
3305         status.abort(msg);
3306       } catch (IOException ioe) {
3307         // If the IOE resulted from bad file format,
3308         // then this problem is idempotent and retrying won't help
3309         if (ioe.getCause() instanceof ParseException) {
3310           Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
3311           msg = "File corruption encountered!  " +
3312               "Continuing, but renaming " + edits + " as " + p;
3313           LOG.warn(msg, ioe);
3314           status.setStatus(msg);
3315         } else {
3316           status.abort(StringUtils.stringifyException(ioe));
3317           // other IO errors may be transient (bad network connection,
3318           // checksum exception on one datanode, etc).  throw & retry
3319           throw ioe;
3320         }
3321       }
3322       if (reporter != null && !reported_once) {
3323         reporter.progress();
3324       }
3325       msg = "Applied " + editsCount + ", skipped " + skippedEdits +
3326         ", firstSequenceIdInLog=" + firstSeqIdInLog +
3327         ", maxSequenceIdInLog=" + currentEditSeqId + ", path=" + edits;
3328       status.markComplete(msg);
3329       LOG.debug(msg);
3330       return currentEditSeqId;
3331     } finally {
3332       status.cleanup();
3333       if (reader != null) {
3334          reader.close();
3335       }
3336     }
3337   }
3338 
3339   /**
3340    * Call to complete a compaction. It's for the case where we find in the WAL a compaction
3341    * that was not finished.  We could find one recovering a WAL after a regionserver crash.
3342    * See HBASE-2331.
3343    */
3344   void completeCompactionMarker(CompactionDescriptor compaction)
3345       throws IOException {
3346     Store store = this.getStore(compaction.getFamilyName().toByteArray());
3347     if (store == null) {
3348       LOG.warn("Found Compaction WAL edit for deleted family: " +
3349           Bytes.toString(compaction.getFamilyName().toByteArray()));
3350       return;
3351     }
3352     store.completeCompactionMarker(compaction);
3353   }
3354 
3355   /**
3356    * Used by tests.
3357    * @param s Store to add edit to.
3358    * @param kv KeyValue to add.
3359    * @return True if we should flush.
3360    */
3361   protected boolean restoreEdit(final Store s, final KeyValue kv) {
3362     long kvSize = s.add(kv).getFirst();
3363     if (this.rsAccounting != null) {
3364       rsAccounting.addAndGetRegionReplayEditsSize(this.getRegionName(), kvSize);
3365     }
3366     return isFlushSize(this.addAndGetGlobalMemstoreSize(kvSize));
3367   }
3368 
3369   /*
3370    * @param fs
3371    * @param p File to check.
3372    * @return True if file was zero-length (and if so, we'll delete it in here).
3373    * @throws IOException
3374    */
3375   private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
3376       throws IOException {
3377     FileStatus stat = fs.getFileStatus(p);
3378     if (stat.getLen() > 0) return false;
3379     LOG.warn("File " + p + " is zero-length, deleting.");
3380     fs.delete(p, false);
3381     return true;
3382   }
3383 
3384   protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
3385     return new HStore(this, family, this.conf);
3386   }
3387 
3388   /**
3389    * Return HStore instance.
3390    * Use with caution.  Exposed for use of fixup utilities.
3391    * @param column Name of column family hosted by this region.
3392    * @return Store that goes with the family on passed <code>column</code>.
3393    * TODO: Make this lookup faster.
3394    */
3395   public Store getStore(final byte[] column) {
3396     return this.stores.get(column);
3397   }
3398 
3399   /**
3400    * Return HStore instance. Does not do any copy: as the number of store is limited, we
3401    *  iterate on the list.
3402    */
3403   private Store getStore(Cell cell) {
3404     for (Map.Entry<byte[], Store> famStore : stores.entrySet()) {
3405       if (Bytes.equals(
3406           cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
3407           famStore.getKey(), 0, famStore.getKey().length)) {
3408         return famStore.getValue();
3409       }
3410     }
3411 
3412     return null;
3413   }
3414 
3415   public Map<byte[], Store> getStores() {
3416     return this.stores;
3417   }
3418 
3419   /**
3420    * Return list of storeFiles for the set of CFs.
3421    * Uses closeLock to prevent the race condition where a region closes
3422    * in between the for loop - closing the stores one by one, some stores
3423    * will return 0 files.
3424    * @return List of storeFiles.
3425    */
3426   public List<String> getStoreFileList(final byte [][] columns)
3427     throws IllegalArgumentException {
3428     List<String> storeFileNames = new ArrayList<String>();
3429     synchronized (closeLock) {
3430       for (byte[] column : columns) {
3431         Store store = this.stores.get(column);
3432         if (store == null) {
3433           throw new IllegalArgumentException("No column family: " +
3434               Bytes.toString(column) + " available");
3435         }
3436         for (StoreFile storeFile: store.getStorefiles()) {
3437           storeFileNames.add(storeFile.getPath().toString());
3438         }
3439       }
3440     }
3441     return storeFileNames;
3442   }
3443   //////////////////////////////////////////////////////////////////////////////
3444   // Support code
3445   //////////////////////////////////////////////////////////////////////////////
3446 
3447   /** Make sure this is a valid row for the HRegion */
3448   void checkRow(final byte [] row, String op) throws IOException {
3449     if (!rowIsInRange(getRegionInfo(), row)) {
3450       throw new WrongRegionException("Requested row out of range for " +
3451           op + " on HRegion " + this + ", startKey='" +
3452           Bytes.toStringBinary(getStartKey()) + "', getEndKey()='" +
3453           Bytes.toStringBinary(getEndKey()) + "', row='" +
3454           Bytes.toStringBinary(row) + "'");
3455     }
3456   }
3457 
3458   /**
3459    * Tries to acquire a lock on the given row.
3460    * @param waitForLock if true, will block until the lock is available.
3461    *        Otherwise, just tries to obtain the lock and returns
3462    *        false if unavailable.
3463    * @return the row lock if acquired,
3464    *   null if waitForLock was false and the lock was not acquired
3465    * @throws IOException if waitForLock was true and the lock could not be acquired after waiting
3466    */
3467   public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
3468     checkRow(row, "row lock");
3469     startRegionOperation();
3470     try {
3471       HashedBytes rowKey = new HashedBytes(row);
3472       RowLockContext rowLockContext = new RowLockContext(rowKey);
3473 
3474       // loop until we acquire the row lock (unless !waitForLock)
3475       while (true) {
3476         RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
3477         if (existingContext == null) {
3478           // Row is not already locked by any thread, use newly created context.
3479           break;
3480         } else if (existingContext.ownedByCurrentThread()) {
3481           // Row is already locked by current thread, reuse existing context instead.
3482           rowLockContext = existingContext;
3483           break;
3484         } else {
3485           // Row is already locked by some other thread, give up or wait for it
3486           if (!waitForLock) {
3487             return null;
3488           }
3489           try {
3490             if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
3491               throw new IOException("Timed out waiting for lock for row: " + rowKey);
3492             }
3493           } catch (InterruptedException ie) {
3494             LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
3495             InterruptedIOException iie = new InterruptedIOException();
3496             iie.initCause(ie);
3497             throw iie;
3498           }
3499         }
3500       }
3501 
3502       // allocate new lock for this thread
3503       return rowLockContext.newLock();
3504     } finally {
3505       closeRegionOperation();
3506     }
3507   }
3508 
3509   /**
3510    * Acquires a lock on the given row.
3511    * The same thread may acquire multiple locks on the same row.
3512    * @return the acquired row lock
3513    * @throws IOException if the lock could not be acquired after waiting
3514    */
3515   public RowLock getRowLock(byte[] row) throws IOException {
3516     return getRowLock(row, true);
3517   }
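
  // A hedged usage sketch: callers should release the lock in a finally block;
  // the same thread may re-acquire on the same row without deadlocking.
  //
  //   RowLock lock = region.getRowLock(row);
  //   try {
  //     // read-modify-write the row while holding the lock
  //   } finally {
  //     lock.release();
  //   }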
3518 
3519   /**
3520    * If the given list of row locks is not null, releases all locks.
3521    */
3522   public void releaseRowLocks(List<RowLock> rowLocks) {
3523     if (rowLocks != null) {
3524       for (RowLock rowLock : rowLocks) {
3525         rowLock.release();
3526       }
3527       rowLocks.clear();
3528     }
3529   }
3530 
3531   /**
3532    * Determines whether multiple column families are present.
3533    * Precondition: familyPaths is not null.
3534    *
3535    * @param familyPaths List of Pair<byte[] column family, String hfilePath>
3536    */
3537   private static boolean hasMultipleColumnFamilies(
3538       List<Pair<byte[], String>> familyPaths) {
3539     boolean multipleFamilies = false;
3540     byte[] family = null;
3541     for (Pair<byte[], String> pair : familyPaths) {
3542       byte[] fam = pair.getFirst();
3543       if (family == null) {
3544         family = fam;
3545       } else if (!Bytes.equals(family, fam)) {
3546         multipleFamilies = true;
3547         break;
3548       }
3549     }
3550     return multipleFamilies;
3551   }
3552 
3553 
3554   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
3555                                 boolean assignSeqId) throws IOException {
3556     return bulkLoadHFiles(familyPaths, assignSeqId, null);
3557   }
3558 
3559   /**
3560    * Attempts to atomically load a group of hfiles.  This is critical for loading
3561    * rows with multiple column families atomically.
3562    *
3563    * @param familyPaths List of Pair<byte[] column family, String hfilePath>
3564    * @param bulkLoadListener Internal hooks enabling massaging/preparation of a
3565    * file about to be bulk loaded
3566    * @return true if successful, false if failed recoverably
3567    * @throws IOException if failed unrecoverably.
3568    */
3569   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, boolean assignSeqId,
3570       BulkLoadListener bulkLoadListener) throws IOException {
3571     Preconditions.checkNotNull(familyPaths);
3572     // we need writeLock for multi-family bulk load
3573     startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
3574     try {
3575       this.writeRequestsCount.increment();
3576 
3577       // There possibly was a split that happened between when the split keys
3578       // were gathered and before the HRegion's write lock was taken.  We need
3579       // to validate the HFile region before attempting to bulk load all of them
3580       List<IOException> ioes = new ArrayList<IOException>();
3581       List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();
3582       for (Pair<byte[], String> p : familyPaths) {
3583         byte[] familyName = p.getFirst();
3584         String path = p.getSecond();
3585 
3586         Store store = getStore(familyName);
3587         if (store == null) {
3588           IOException ioe = new org.apache.hadoop.hbase.DoNotRetryIOException(
3589               "No such column family " + Bytes.toStringBinary(familyName));
3590           ioes.add(ioe);
3591         } else {
3592           try {
3593             store.assertBulkLoadHFileOk(new Path(path));
3594           } catch (WrongRegionException wre) {
3595             // recoverable (file doesn't fit in region)
3596             failures.add(p);
3597           } catch (IOException ioe) {
3598             // unrecoverable (hdfs problem)
3599             ioes.add(ioe);
3600           }
3601         }
3602       }
3603 
3604       // validation failed because of some sort of IO problem.
3605       if (ioes.size() != 0) {
3606         IOException e = MultipleIOException.createIOException(ioes);
3607         LOG.error("There were one or more IO errors when checking if the bulk load is ok.", e);
3608         throw e;
3609       }
3610 
3611       // validation failed, bail out before doing anything permanent.
3612       if (failures.size() != 0) {
3613         StringBuilder list = new StringBuilder();
3614         for (Pair<byte[], String> p : failures) {
3615           list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")
3616             .append(p.getSecond());
3617         }
3618         // problem when validating
3619         LOG.warn("There was a recoverable bulk load failure likely due to a" +
3620             " split.  These (family, HFile) pairs were not loaded: " + list);
3621         return false;
3622       }
3623 
3624       long seqId = -1;
3625       // We need to assign a sequential ID that's in between two memstores in order to preserve
3626       // the guarantee that all the edits lower than the highest sequential ID from all the
3627       // HFiles are flushed on disk. See HBASE-10958.  The sequence id returned when we flush is
3628       // guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is
3629       // a sequence id that we can be sure is beyond the last hfile written).
3630       if (assignSeqId) {
3631         FlushResult fs = this.flushcache();
3632         if (fs.isFlushSucceeded()) {
3633           seqId = fs.flushSequenceId;
3634         } else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
3635           seqId = fs.flushSequenceId;
3636         } else {
3637           throw new IOException("Could not bulk load with an assigned sequential ID because the " +
3638               "flush didn't run. Reason for not flushing: " + fs.failureReason);
3639         }
3640       }
3641 
3642       for (Pair<byte[], String> p : familyPaths) {
3643         byte[] familyName = p.getFirst();
3644         String path = p.getSecond();
3645         Store store = getStore(familyName);
3646         try {
3647           String finalPath = path;
3648           if (bulkLoadListener != null) {
3649             finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
3650           }
3651           store.bulkLoadHFile(finalPath, seqId);
3652           if (bulkLoadListener != null) {
3653             bulkLoadListener.doneBulkLoad(familyName, path);
3654           }
3655         } catch (IOException ioe) {
3656           // A failure here can cause an atomicity violation that we currently
3657           // cannot recover from since it is likely a failed HDFS operation.
3658 
3659           // TODO Need a better story for reverting partial failures due to HDFS.
3660           LOG.error("There was a partial failure due to IO when attempting to" +
3661               " load " + Bytes.toString(p.getFirst()) + " : "+ p.getSecond(), ioe);
3662           if (bulkLoadListener != null) {
3663             try {
3664               bulkLoadListener.failedBulkLoad(familyName, path);
3665             } catch (Exception ex) {
3666               LOG.error("Error while calling failedBulkLoad for family "+
3667                   Bytes.toString(familyName)+" with path "+path, ex);
3668             }
3669           }
3670           throw ioe;
3671         }
3672       }
3673       return true;
3674     } finally {
3675       closeBulkRegionOperation();
3676     }
3677   }
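
  // An illustrative call sketch; the family name and staging path are made up.
  //
  //   List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
  //   familyPaths.add(new Pair<byte[], String>(Bytes.toBytes("cf"), "/staging/cf/hfile1"));
  //   boolean loaded = region.bulkLoadHFiles(familyPaths, true);
  //   // 'loaded' is false on a recoverable failure (e.g. an HFile no longer
  //   // fits this region after a split); an IOException is unrecoverable.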
3678 
3679   @Override
3680   public boolean equals(Object o) {
3681     return o instanceof HRegion && Bytes.equals(this.getRegionName(),
3682                                                 ((HRegion) o).getRegionName());
3683   }
3684 
3685   @Override
3686   public int hashCode() {
3687     return Bytes.hashCode(this.getRegionName());
3688   }
3689 
3690   @Override
3691   public String toString() {
3692     return this.getRegionNameAsString();
3693   }
3694 
3695   /**
3696    * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
3697    */
3698   class RegionScannerImpl implements RegionScanner {
3699     // Package local for testability
3700     KeyValueHeap storeHeap = null;
3701     /** Heap of key-values that are not essential for the provided filters and are thus read
3702      * on demand, if on-demand column family loading is enabled. */
3703     KeyValueHeap joinedHeap = null;
3704     /**
3705      * If the joined heap data gathering is interrupted due to scan limits, this will
3706      * contain the row for which we are populating the values. */
3707     protected Cell joinedContinuationRow = null;
3708     // KeyValue indicating that limit is reached when scanning
3709     private final KeyValue KV_LIMIT = new KeyValue();
3710     protected final byte[] stopRow;
3711     private final FilterWrapper filter;
3712     private int batch;
3713     protected int isScan;   // -1 if this is a client Get (stop row inclusive), else 0
3714     private boolean filterClosed = false;
3715     private long readPt;
3716     private long maxResultSize;
3717     protected HRegion region;
3718 
3719     @Override
3720     public HRegionInfo getRegionInfo() {
3721       return region.getRegionInfo();
3722     }
3723 
3724     RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
3725         throws IOException {
3726 
3727       this.region = region;
3728       this.maxResultSize = scan.getMaxResultSize();
3729       if (scan.hasFilter()) {
3730         this.filter = new FilterWrapper(scan.getFilter());
3731       } else {
3732         this.filter = null;
3733       }
3734 
3735       this.batch = scan.getBatch();
3736       if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {
3737         this.stopRow = null;
3738       } else {
3739         this.stopRow = scan.getStopRow();
3740       }
3741       // If we are doing a get, we want to be [startRow,endRow]; normally
3742       // it is [startRow,endRow), and if startRow == endRow we get nothing.
3743       this.isScan = scan.isGetScan() ? -1 : 0;
3744 
3745       // synchronize on scannerReadPoints so that nobody calculates
3746       // getSmallestReadPoint, before scannerReadPoints is updated.
3747       IsolationLevel isolationLevel = scan.getIsolationLevel();
3748       synchronized (scannerReadPoints) {
3749         this.readPt = getReadpoint(isolationLevel);
3750         scannerReadPoints.put(this, this.readPt);
3751       }
3752 
3753       // Here we separate all scanners into two lists - scanner that provide data required
3754       // by the filter to operate (scanners list) and all others (joinedScanners list).
3755       List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
3756       List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
3757       if (additionalScanners != null) {
3758         scanners.addAll(additionalScanners);
3759       }
3760 
3761       for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
3762           scan.getFamilyMap().entrySet()) {
3763         Store store = stores.get(entry.getKey());
3764         KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
3765         if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
3766           || this.filter.isFamilyEssential(entry.getKey())) {
3767           scanners.add(scanner);
3768         } else {
3769           joinedScanners.add(scanner);
3770         }
3771       }
3772       initializeKVHeap(scanners, joinedScanners, region);
3773     }
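
    // In short: a store's scanner goes into 'scanners' (the primary storeHeap)
    // when the filter declares its family essential, and into 'joinedScanners'
    // (the lazily-read joinedHeap) otherwise; the joined side is only consulted
    // once the essential part of the row has passed the filter.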
3774 
3775     protected void initializeKVHeap(List<KeyValueScanner> scanners,
3776         List<KeyValueScanner> joinedScanners, HRegion region)
3777         throws IOException {
3778       this.storeHeap = new KeyValueHeap(scanners, region.comparator);
3779       if (!joinedScanners.isEmpty()) {
3780         this.joinedHeap = new KeyValueHeap(joinedScanners, region.comparator);
3781       }
3782     }
3783 
3784     @Override
3785     public long getMaxResultSize() {
3786       return maxResultSize;
3787     }
3788 
3789     @Override
3790     public long getMvccReadPoint() {
3791       return this.readPt;
3792     }
3793 
3794     /**
3795      * Reset the filter so it is ready for the next row.
3796      *
3797      * @throws IOException in case a filter raises an I/O exception.
3798      */
3799     protected void resetFilters() throws IOException {
3800       if (filter != null) {
3801         filter.reset();
3802       }
3803     }
3804 
3805     @Override
3806     public boolean next(List<Cell> outResults)
3807         throws IOException {
3808       // apply the batching limit by default
3809       return next(outResults, batch);
3810     }
3811 
3812     @Override
3813     public synchronized boolean next(List<Cell> outResults, int limit) throws IOException {
3814       if (this.filterClosed) {
3815         throw new UnknownScannerException("Scanner was closed (timed out?) " +
3816             "after we renewed it. Could be caused by a very slow scanner " +
3817             "or a lengthy garbage collection");
3818       }
3819       startRegionOperation(Operation.SCAN);
3820       readRequestsCount.increment();
3821       try {
3822         return nextRaw(outResults, limit);
3823       } finally {
3824         closeRegionOperation(Operation.SCAN);
3825       }
3826     }
3827 
3828     @Override
3829     public boolean nextRaw(List<Cell> outResults)
3830         throws IOException {
3831       return nextRaw(outResults, batch);
3832     }
3833 
3834     @Override
3835     public boolean nextRaw(List<Cell> outResults, int limit) throws IOException {
3836       boolean returnResult;
3837       if (outResults.isEmpty()) {
3838         // Usually outResults is empty. This is true when next is called
3839         // to handle scan or get operation.
3840         returnResult = nextInternal(outResults, limit);
3841       } else {
3842         List<Cell> tmpList = new ArrayList<Cell>();
3843         returnResult = nextInternal(tmpList, limit);
3844         outResults.addAll(tmpList);
3845       }
3846       resetFilters();
3847       if (isFilterDoneInternal()) {
3848         returnResult = false;
3849       }
3850       if (region != null && region.metricsRegion != null) {
3851         long totalSize = 0;
3852         for (Cell c : outResults) {
3853           // TODO clean up
3854           KeyValue kv = KeyValueUtil.ensureKeyValue(c);
3855           totalSize += kv.getLength();
3856         }
3857         region.metricsRegion.updateScanNext(totalSize);
3858       }
3859       return returnResult;
3860     }
3861 
3862 
3863     private void populateFromJoinedHeap(List<Cell> results, int limit)
3864         throws IOException {
3865       assert joinedContinuationRow != null;
3866       Cell kv = populateResult(results, this.joinedHeap, limit,
3867           joinedContinuationRow.getRowArray(), joinedContinuationRow.getRowOffset(),
3868           joinedContinuationRow.getRowLength());
3869       if (kv != KV_LIMIT) {
3870         // We are done with this row, reset the continuation.
3871         joinedContinuationRow = null;
3872       }
3873       // As the data is obtained from two independent heaps, we need to
3874       // ensure that result list is sorted, because Result relies on that.
3875       Collections.sort(results, comparator);
3876     }
3877 
3878     /**
3879      * Fetches records for currentRow into the results list, until the next row or the limit (if not -1).
3880      * @param heap KeyValueHeap to fetch data from. It must be positioned on the correct row before call.
3881      * @param limit Max amount of KVs to place in result list, -1 means no limit.
3882      * @param currentRow Byte array with key we are fetching.
3883      * @param offset offset for currentRow
3884      * @param length length for currentRow
3885      * @return KV_LIMIT if limit reached, next KeyValue otherwise.
3886      */
3887     private Cell populateResult(List<Cell> results, KeyValueHeap heap, int limit,
3888         byte[] currentRow, int offset, short length) throws IOException {
3889       Cell nextKv;
3890       do {
3891         heap.next(results, limit - results.size());
3892         if (limit > 0 && results.size() == limit) {
3893           return KV_LIMIT;
3894         }
3895         nextKv = heap.peek();
3896       } while (nextKv != null && CellUtil.matchingRow(nextKv, currentRow, offset, length));
3897 
3898       return nextKv;
3899     }
3900 
3901     /*
3902      * @return True if a filter rules that the scanner is done.
3903      */
3904     @Override
3905     public synchronized boolean isFilterDone() throws IOException {
3906       return isFilterDoneInternal();
3907     }
3908 
3909     private boolean isFilterDoneInternal() throws IOException {
3910       return this.filter != null && this.filter.filterAllRemaining();
3911     }
3912 
3913     private boolean nextInternal(List<Cell> results, int limit)
3914     throws IOException {
3915       if (!results.isEmpty()) {
3916         throw new IllegalArgumentException("First parameter should be an empty list");
3917       }
3918       RpcCallContext rpcCall = RpcServer.getCurrentCall();
3919       // The loop here is used only when, at some point during the next(), we determine
3920       // that due to effects of filters or otherwise we have an empty row in the result.
3921       // Then we loop and try again. Otherwise, we must get out on the first iteration via return:
3922       // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row
3923       // and joinedHeap has no more data to read for the last row, if set, in joinedContinuationRow).
3924       while (true) {
3925         if (rpcCall != null) {
3926           // If a user specifies a too-restrictive or too-slow scanner, the
3927           // client might time out and disconnect while the server side
3928           // is still processing the request. We should abort aggressively
3929           // in that case.
3930           long afterTime = rpcCall.disconnectSince();
3931           if (afterTime >= 0) {
3932             throw new CallerDisconnectedException(
3933                 "Aborting on region " + getRegionNameAsString() + ", call " +
3934                     this + " after " + afterTime + " ms, since " +
3935                     "caller disconnected");
3936           }
3937         }
3938 
3939         // Let's see what we have in the storeHeap.
3940         Cell current = this.storeHeap.peek();
3941 
3942         byte[] currentRow = null;
3943         int offset = 0;
3944         short length = 0;
3945         if (current != null) {
3946           currentRow = current.getRowArray();
3947           offset = current.getRowOffset();
3948           length = current.getRowLength();
3949         }
3950         boolean stopRow = isStopRow(currentRow, offset, length);
3951         // Check if we were getting data from the joinedHeap and hit the limit.
3952         // If not, then it's main path - getting results from storeHeap.
3953         if (joinedContinuationRow == null) {
3954           // First, check if we are at a stop row. If so, there are no more results.
3955           if (stopRow) {
3956             if (filter != null && filter.hasFilterRow()) {
3957               filter.filterRowCells(results);
3958             }
3959             return false;
3960           }
3961 
3962           // Check if rowkey filter wants to exclude this row. If so, loop to next.
3963           // Technically, if we hit limits before on this row, we don't need this call.
3964           if (filterRowKey(currentRow, offset, length)) {
3965             boolean moreRows = nextRow(currentRow, offset, length);
3966             if (!moreRows) return false;
3967             results.clear();
3968             continue;
3969           }
3970 
3971           Cell nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset,
3972               length);
3973           // Ok, we are good, let's try to get some results from the main heap.
3974           if (nextKv == KV_LIMIT) {
3975             if (this.filter != null && filter.hasFilterRow()) {
3976               throw new IncompatibleFilterException(
3977                 "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
3978             }
3979             return true; // We hit the limit.
3980           }
3981 
3982           stopRow = nextKv == null ||
3983               isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength());
3984           // Save whether the row was empty before filters were applied to it.
3985           final boolean isEmptyRow = results.isEmpty();
3986 
3987           // We have the part of the row necessary for filtering (all of it, usually).
3988           // First filter with the filterRow(List).
3989           FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
3990           if (filter != null && filter.hasFilterRow()) {
3991             ret = filter.filterRowCellsWithRet(results);
3992           }
3993 
3994           if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) {
3995             results.clear();
3996             boolean moreRows = nextRow(currentRow, offset, length);
3997             if (!moreRows) return false;
3998 
3999             // This row was totally filtered out, if this is NOT the last row,
4000             // we should continue on. Otherwise, nothing else to do.
4001             if (!stopRow) continue;
4002             return false;
4003           }
4004 
4005           // Ok, we are done with storeHeap for this row.
4006           // Now we may need to fetch additional, non-essential data into row.
4007           // These values are not needed for filter to work, so we postpone their
4008           // fetch to (possibly) reduce amount of data loads from disk.
4009           if (this.joinedHeap != null) {
4010             Cell nextJoinedKv = joinedHeap.peek();
4011             // If joinedHeap is pointing to some other row, try to seek to a correct one.
4012             boolean mayHaveData = (nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv,
4013                 currentRow, offset, length))
4014                 || (this.joinedHeap.requestSeek(
4015                     KeyValueUtil.createFirstOnRow(currentRow, offset, length), true, true)
4016                     && joinedHeap.peek() != null && CellUtil.matchingRow(joinedHeap.peek(),
4017                     currentRow, offset, length));
4018             if (mayHaveData) {
4019               joinedContinuationRow = current;
4020               populateFromJoinedHeap(results, limit);
4021             }
4022           }
4023         } else {
4024           // Populating from the joined heap was stopped by limits, populate some more.
4025           populateFromJoinedHeap(results, limit);
4026         }
4027 
4028         // We may have just called populateFromJoinedHeap and hit the limits. If that is
4029         // the case, we need to call it again on the next next() invocation.
4030         if (joinedContinuationRow != null) {
4031           return true;
4032         }
4033 
4034         // Finally, we are done with both joinedHeap and storeHeap.
4035         // Double check to prevent empty rows from appearing in the result. This can
4036         // happen when, for example, SingleColumnValueExcludeFilter is used.
4037         if (results.isEmpty()) {
4038           boolean moreRows = nextRow(currentRow, offset, length);
4039           if (!moreRows) return false;
4040           if (!stopRow) continue;
4041         }
4042 
4043         // We are done. Return the result.
4044         return !stopRow;
4045       }
4046     }
4047 
4048     /**
4049      * This method maintains backward compatibility for 0.94 filters. HBASE-6429 combines
4050      * both the filterRow() and filterRow(List<KeyValue> kvs) methods. Code written against
4051      * 0.94 or older may not implement hasFilterRow() as HBASE-6429 expects, because in 0.94
4052      * hasFilterRow() returned true only when filterRow(List<KeyValue> kvs) was overridden,
4053      * not filterRow(). For such filters, filterRow() would otherwise be skipped.
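     *
     * <p>An illustrative sketch of the 0.94-style filter this guards against
     * (a hypothetical subclass: only filterRow() is overridden, so the inherited
     * hasFilterRow() still returns false):
     * <pre>
     *   public class LegacyRowFilter extends FilterBase {
     *     public boolean filterRow() {  // 0.94-era hook
     *       return true;                // exclude every row
     *     }
     *   }
     * </pre>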
4054      */
4055     private boolean filterRow() throws IOException {
4056       // when hasFilterRow returns true, filter.filterRow() will be called automatically inside
4057       // filterRowCells(List<Cell> kvs) so we skip that scenario here.
4058       return filter != null && (!filter.hasFilterRow())
4059           && filter.filterRow();
4060     }
4061 
4062     private boolean filterRowKey(byte[] row, int offset, short length) throws IOException {
4063       return filter != null
4064           && filter.filterRowKey(row, offset, length);
4065     }
4066 
4067     protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException {
4068       assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read.";
4069       Cell next;
4070       while ((next = this.storeHeap.peek()) != null &&
4071              CellUtil.matchingRow(next, currentRow, offset, length)) {
4072         this.storeHeap.next(MOCKED_LIST);
4073       }
4074       resetFilters();
4075       // Calling the hook in CP which allows it to do a fast forward
4076       return this.region.getCoprocessorHost() == null
4077           || this.region.getCoprocessorHost()
4078               .postScannerFilterRow(this, currentRow, offset, length);
4079     }
4080 
4081     protected boolean isStopRow(byte[] currentRow, int offset, short length) {
4082       return currentRow == null ||
4083           (stopRow != null &&
4084           comparator.compareRows(stopRow, 0, stopRow.length,
4085             currentRow, offset, length) <= isScan);
4086     }
4087 
4088     @Override
4089     public synchronized void close() {
4090       if (storeHeap != null) {
4091         storeHeap.close();
4092         storeHeap = null;
4093       }
4094       if (joinedHeap != null) {
4095         joinedHeap.close();
4096         joinedHeap = null;
4097       }
4098       // no need to synchronize here.
4099       scannerReadPoints.remove(this);
4100       this.filterClosed = true;
4101     }
4102 
4103     KeyValueHeap getStoreHeapForTesting() {
4104       return storeHeap;
4105     }
4106 
4107     @Override
4108     public synchronized boolean reseek(byte[] row) throws IOException {
4109       if (row == null) {
4110         throw new IllegalArgumentException("Row cannot be null.");
4111       }
4112       boolean result = false;
4113       startRegionOperation();
4114       try {
4115         KeyValue kv = KeyValueUtil.createFirstOnRow(row);
4116         // use request seek to make use of the lazy seek option. See HBASE-5520
4117         result = this.storeHeap.requestSeek(kv, true, true);
4118         if (this.joinedHeap != null) {
4119           result = this.joinedHeap.requestSeek(kv, true, true) || result;
4120         }
4121       } finally {
4122         closeRegionOperation();
4123       }
4124       return result;
4125     }
4126   }
4127 
4128   // Utility methods
4129   /**
4130    * A utility method to create new instances of HRegion based on the
4131    * {@link HConstants#REGION_IMPL} configuration property.
4132    * @param tableDir qualified path of directory where region should be located,
4133    * usually the table directory.
4134    * @param log The HLog is the outbound log for any updates to the HRegion
4135    * (There's a single HLog for all the HRegions on a single HRegionServer.)
4136    * The log file is a logfile from the previous execution that's
4137    * custom-computed for this HRegion. The HRegionServer computes and sorts the
4138    * appropriate log info for this HRegion. If there is a previous log file
4139    * (implying that the HRegion has been written-to before), then read it from
4140    * the supplied path.
4141    * @param fs is the filesystem.
4142    * @param conf is global configuration settings.
4143    * @param regionInfo HRegionInfo that describes the region
4145    * @param htd the table descriptor
4146    * @return the new instance
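   * <p>An illustrative sketch of plugging in a custom region implementation
   * (MyRegion is a hypothetical HRegion subclass declaring the same
   * seven-argument constructor):
   * <pre>
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setClass(HConstants.REGION_IMPL, MyRegion.class, HRegion.class);
   *   // newHRegion(...) now reflectively constructs MyRegion instances
   * </pre>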
4147    */
4148   static HRegion newHRegion(Path tableDir, HLog log, FileSystem fs,
4149       Configuration conf, HRegionInfo regionInfo, final HTableDescriptor htd,
4150       RegionServerServices rsServices) {
4151     try {
4152       @SuppressWarnings("unchecked")
4153       Class<? extends HRegion> regionClass =
4154           (Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
4155 
4156       Constructor<? extends HRegion> c =
4157           regionClass.getConstructor(Path.class, HLog.class, FileSystem.class,
4158               Configuration.class, HRegionInfo.class, HTableDescriptor.class,
4159               RegionServerServices.class);
4160 
4161       return c.newInstance(tableDir, log, fs, conf, regionInfo, htd, rsServices);
4162     } catch (Throwable e) {
4163       // todo: what should I throw here?
4164       throw new IllegalStateException("Could not instantiate a region instance.", e);
4165     }
4166   }
4167 
4168   /**
4169    * Convenience method creating new HRegions. Used by createTable and by the
4170    * bootstrap code in the HMaster constructor.
4171    * Note, this method creates an {@link HLog} for the created region. It
4172    * needs to be closed explicitly.  Use {@link HRegion#getLog()} to get
4173    * access.  <b>When done with a region created using this method, you will
4174    * need to explicitly close the {@link HLog} it created too; it will not be
4175    * done for you.  Not closing the log will leave at least a daemon thread
4176    * running.</b>  Call {@link #closeHRegion(HRegion)} and it will do
4177    * the necessary cleanup for you.
4178    * @param info Info for region to create.
4179    * @param rootDir Root directory for HBase instance
4180    * @return new HRegion
4181    *
4182    * @throws IOException
4183    */
4184   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4185       final Configuration conf, final HTableDescriptor hTableDescriptor)
4186   throws IOException {
4187     return createHRegion(info, rootDir, conf, hTableDescriptor, null);
4188   }
4189 
4190   /**
4191    * This will do the necessary cleanup a call to
4192    * {@link #createHRegion(HRegionInfo, Path, Configuration, HTableDescriptor)}
4193    * requires.  This method will close the region and then close its
4194    * associated {@link HLog} file.  You can use it even if you called the other
4195    * createHRegion, the one that takes an {@link HLog} instance, but don't be
4196    * surprised by the call to {@link HLog#closeAndDelete()} on the {@link HLog} the
4197    * HRegion was carrying.
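   *
   * <p>A minimal create/use/close sketch (info, rootDir, conf, and htd are
   * assumed to be prepared by the caller):
   * <pre>
   *   HRegion region = HRegion.createHRegion(info, rootDir, conf, htd);
   *   try {
   *     // ... use the region ...
   *   } finally {
   *     HRegion.closeHRegion(region); // closes the region, then its HLog
   *   }
   * </pre>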
4198    * @throws IOException
4199    */
4200   public static void closeHRegion(final HRegion r) throws IOException {
4201     if (r == null) return;
4202     r.close();
4203     if (r.getLog() == null) return;
4204     r.getLog().closeAndDelete();
4205   }
4206 
4207   /**
4208    * Convenience method creating new HRegions. Used by createTable.
4209    * The {@link HLog} for the created region needs to be closed explicitly.
4210    * Use {@link HRegion#getLog()} to get access.
4211    *
4212    * @param info Info for region to create.
4213    * @param rootDir Root directory for HBase instance
4214    * @param hlog shared HLog
4215    * @param initialize - true to initialize the region
4216    * @return new HRegion
4217    *
4218    * @throws IOException
4219    */
4220   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4221                                       final Configuration conf,
4222                                       final HTableDescriptor hTableDescriptor,
4223                                       final HLog hlog,
4224                                       final boolean initialize)
4225       throws IOException {
4226     return createHRegion(info, rootDir, conf, hTableDescriptor,
4227         hlog, initialize, false);
4228   }
4229 
4230   /**
4231    * Convenience method creating new HRegions. Used by createTable.
4232    * The {@link HLog} for the created region needs to be closed
4233    * explicitly, if it is not null.
4234    * Use {@link HRegion#getLog()} to get access.
4235    *
4236    * @param info Info for region to create.
4237    * @param rootDir Root directory for HBase instance
4238    * @param hlog shared HLog
4239    * @param initialize - true to initialize the region
4240    * @param ignoreHLog - true to skip generating a new hlog if it is null, mostly for createTable
4241    * @return new HRegion
4242    * @throws IOException
4243    */
4244   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4245                                       final Configuration conf,
4246                                       final HTableDescriptor hTableDescriptor,
4247                                       final HLog hlog,
4248                                       final boolean initialize, final boolean ignoreHLog)
4249       throws IOException {
4250       Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
4251       return createHRegion(info, rootDir, tableDir, conf, hTableDescriptor, hlog, initialize, ignoreHLog);
4252   }
4253 
4254   /**
4255    * Convenience method creating new HRegions. Used by createTable.
4256    * The {@link HLog} for the created region needs to be closed
4257    * explicitly, if it is not null.
4258    * Use {@link HRegion#getLog()} to get access.
4259    *
4260    * @param info Info for region to create.
4261    * @param rootDir Root directory for HBase instance
4262    * @param tableDir table directory
4263    * @param hlog shared HLog
4264    * @param initialize - true to initialize the region
4265    * @param ignoreHLog - true to skip generating a new hlog if it is null, mostly for createTable
4266    * @return new HRegion
4267    * @throws IOException
4268    */
4269   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Path tableDir,
4270                                       final Configuration conf,
4271                                       final HTableDescriptor hTableDescriptor,
4272                                       final HLog hlog,
4273                                       final boolean initialize, final boolean ignoreHLog)
4274       throws IOException {
4275     LOG.info("creating HRegion " + info.getTable().getNameAsString()
4276         + " HTD == " + hTableDescriptor + " RootDir = " + rootDir);
4278     FileSystem fs = FileSystem.get(conf);
4279     HRegionFileSystem rfs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
4280     HLog effectiveHLog = hlog;
4281     if (hlog == null && !ignoreHLog) {
4282       effectiveHLog = HLogFactory.createHLog(fs, rfs.getRegionDir(),
4283                                              HConstants.HREGION_LOGDIR_NAME, conf);
4284     }
4285     HRegion region = HRegion.newHRegion(tableDir,
4286         effectiveHLog, fs, conf, info, hTableDescriptor, null);
4287     if (initialize) {
4288       // If initializing, set the sequenceId. It is also required by HLogPerformanceEvaluation when
4289       // verifying the WALEdits.
4290       region.setSequenceId(region.initialize(null));
4291     }
4292     return region;
4293   }
4294 
4295   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4296                                       final Configuration conf,
4297                                       final HTableDescriptor hTableDescriptor,
4298                                       final HLog hlog)
4299     throws IOException {
4300     return createHRegion(info, rootDir, conf, hTableDescriptor, hlog, true);
4301   }
4302 
4303 
4304   /**
4305    * Open a Region.
4306    * @param info Info for region to be opened.
4307    * @param wal HLog for region to use. This method will call
4308    * HLog#setSequenceNumber(long) passing the result of the call to
4309    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4310    * up.  HRegionServer does this every time it opens a new region.
4311    * @return new HRegion
4312    *
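   * <p>An illustrative open/close pairing (info, htd, wal, and conf are assumed
   * to be prepared by the caller; the shared wal remains the caller's to manage):
   * <pre>
   *   HRegion r = HRegion.openHRegion(info, htd, wal, conf);
   *   try {
   *     // ... read from / write to the region ...
   *   } finally {
   *     r.close();
   *   }
   * </pre>
   *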
4313    * @throws IOException
4314    */
4315   public static HRegion openHRegion(final HRegionInfo info,
4316       final HTableDescriptor htd, final HLog wal,
4317       final Configuration conf)
4318   throws IOException {
4319     return openHRegion(info, htd, wal, conf, null, null);
4320   }
4321 
4322   /**
4323    * Open a Region.
4324    * @param info Info for region to be opened
4325    * @param htd the table descriptor
4326    * @param wal HLog for region to use. This method will call
4327    * HLog#setSequenceNumber(long) passing the result of the call to
4328    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4329    * up.  HRegionServer does this every time it opens a new region.
4330    * @param conf The Configuration object to use.
4331    * @param rsServices An interface we can request flushes against.
4332    * @param reporter An interface we can report progress against.
4333    * @return new HRegion
4334    *
4335    * @throws IOException
4336    */
4337   public static HRegion openHRegion(final HRegionInfo info,
4338     final HTableDescriptor htd, final HLog wal, final Configuration conf,
4339     final RegionServerServices rsServices,
4340     final CancelableProgressable reporter)
4341   throws IOException {
4342     return openHRegion(FSUtils.getRootDir(conf), info, htd, wal, conf, rsServices, reporter);
4343   }
4344 
4345   /**
4346    * Open a Region.
4347    * @param rootDir Root directory for HBase instance
4348    * @param info Info for region to be opened.
4349    * @param htd the table descriptor
4350    * @param wal HLog for region to use. This method will call
4351    * HLog#setSequenceNumber(long) passing the result of the call to
4352    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4353    * up.  HRegionServer does this every time it opens a new region.
4354    * @param conf The Configuration object to use.
4355    * @return new HRegion
4356    * @throws IOException
4357    */
4358   public static HRegion openHRegion(Path rootDir, final HRegionInfo info,
4359       final HTableDescriptor htd, final HLog wal, final Configuration conf)
4360   throws IOException {
4361     return openHRegion(rootDir, info, htd, wal, conf, null, null);
4362   }
4363 
4364   /**
4365    * Open a Region.
4366    * @param rootDir Root directory for HBase instance
4367    * @param info Info for region to be opened.
4368    * @param htd the table descriptor
4369    * @param wal HLog for region to use. This method will call
4370    * HLog#setSequenceNumber(long) passing the result of the call to
4371    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4372    * up.  HRegionServer does this every time it opens a new region.
4373    * @param conf The Configuration object to use.
4374    * @param rsServices An interface we can request flushes against.
4375    * @param reporter An interface we can report progress against.
4376    * @return new HRegion
4377    * @throws IOException
4378    */
4379   public static HRegion openHRegion(final Path rootDir, final HRegionInfo info,
4380       final HTableDescriptor htd, final HLog wal, final Configuration conf,
4381       final RegionServerServices rsServices,
4382       final CancelableProgressable reporter)
4383   throws IOException {
4384     FileSystem fs = null;
4385     if (rsServices != null) {
4386       fs = rsServices.getFileSystem();
4387     }
4388     if (fs == null) {
4389       fs = FileSystem.get(conf);
4390     }
4391     return openHRegion(conf, fs, rootDir, info, htd, wal, rsServices, reporter);
4392   }
4393 
4394   /**
4395    * Open a Region.
4396    * @param conf The Configuration object to use.
4397    * @param fs Filesystem to use
4398    * @param rootDir Root directory for HBase instance
4399    * @param info Info for region to be opened.
4400    * @param htd the table descriptor
4401    * @param wal HLog for region to use. This method will call
4402    * HLog#setSequenceNumber(long) passing the result of the call to
4403    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4404    * up.  HRegionServer does this every time it opens a new region.
4405    * @return new HRegion
4406    * @throws IOException
4407    */
4408   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4409       final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal)
4410       throws IOException {
4411     return openHRegion(conf, fs, rootDir, info, htd, wal, null, null);
4412   }
4413 
4414   /**
4415    * Open a Region.
4416    * @param conf The Configuration object to use.
4417    * @param fs Filesystem to use
4418    * @param rootDir Root directory for HBase instance
4419    * @param info Info for region to be opened.
4420    * @param htd the table descriptor
4421    * @param wal HLog for region to use. This method will call
4422    * HLog#setSequenceNumber(long) passing the result of the call to
4423    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4424    * up.  HRegionServer does this every time it opens a new region.
4425    * @param rsServices An interface we can request flushes against.
4426    * @param reporter An interface we can report progress against.
4427    * @return new HRegion
4428    * @throws IOException
4429    */
4430   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4431       final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
4432       final RegionServerServices rsServices, final CancelableProgressable reporter)
4433       throws IOException {
4434     Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
4435     return openHRegion(conf, fs, rootDir, tableDir, info, htd, wal, rsServices, reporter);
4436   }
4437 
4438   /**
4439    * Open a Region.
4440    * @param conf The Configuration object to use.
4441    * @param fs Filesystem to use
4442    * @param rootDir Root directory for HBase instance
4443    * @param info Info for region to be opened.
4444    * @param htd the table descriptor
4445    * @param wal HLog for region to use. This method will call
4446    * HLog#setSequenceNumber(long) passing the result of the call to
4447    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4448    * up.  HRegionServer does this every time it opens a new region.
4449    * @param rsServices An interface we can request flushes against.
4450    * @param reporter An interface we can report progress against.
4451    * @return new HRegion
4452    * @throws IOException
4453    */
4454   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4455       final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
4456       final RegionServerServices rsServices, final CancelableProgressable reporter)
4457       throws IOException {
4458     if (info == null) throw new NullPointerException("Passed region info is null");
4459     if (LOG.isDebugEnabled()) {
4460       LOG.debug("Opening region: " + info);
4461     }
4462     HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
4463     return r.openHRegion(reporter);
4464   }
4465 
4466 
4467   /**
4468    * Useful when reopening a closed region (normally for unit tests)
4469    * @param other original object
4470    * @param reporter An interface we can report progress against.
4471    * @return new HRegion
4472    * @throws IOException
4473    */
4474   public static HRegion openHRegion(final HRegion other, final CancelableProgressable reporter)
4475       throws IOException {
4476     HRegionFileSystem regionFs = other.getRegionFileSystem();
4477     HRegion r = newHRegion(regionFs.getTableDir(), other.getLog(), regionFs.getFileSystem(),
4478         other.baseConf, other.getRegionInfo(), other.getTableDesc(), null);
4479     return r.openHRegion(reporter);
4480   }
4481 
4482   /**
4483    * Open HRegion.
4484    * Calls initialize and sets sequenceId.
4485    * @return Returns <code>this</code>
4486    * @throws IOException
4487    */
4488   protected HRegion openHRegion(final CancelableProgressable reporter)
4489   throws IOException {
4490     checkCompressionCodecs();
4491 
4492     this.openSeqNum = initialize(reporter);
4493     this.setSequenceId(openSeqNum);
4494     return this;
4495   }
4496 
4497   private void checkCompressionCodecs() throws IOException {
4498     for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
4499       CompressionTest.testCompression(fam.getCompression());
4500       CompressionTest.testCompression(fam.getCompactionCompression());
4501     }
4502   }
4503 
4504   /**
4505    * Create a daughter region, given a temp directory with the region data.
4506    * @param hri Spec. for daughter region to open.
4507    * @throws IOException
4508    */
4509   HRegion createDaughterRegionFromSplits(final HRegionInfo hri) throws IOException {
4510     // Move the files from the temporary .splits to the final /table/region directory
4511     fs.commitDaughterRegion(hri);
4512 
4513     // Create the daughter HRegion instance
4514     HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(), fs.getFileSystem(),
4515         this.getBaseConf(), hri, this.getTableDesc(), rsServices);
4516     r.readRequestsCount.set(this.getReadRequestsCount() / 2);
4517     r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
4518     return r;
4519   }
4520 
4521   /**
4522    * Create a merged region given a temp directory with the region data.
4523    * @param region_b another merging region
4524    * @return merged HRegion
4525    * @throws IOException
4526    */
4527   HRegion createMergedRegionFromMerges(final HRegionInfo mergedRegionInfo,
4528       final HRegion region_b) throws IOException {
4529     HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(),
4530         fs.getFileSystem(), this.getBaseConf(), mergedRegionInfo,
4531         this.getTableDesc(), this.rsServices);
4532     r.readRequestsCount.set(this.getReadRequestsCount()
4533         + region_b.getReadRequestsCount());
4534     r.writeRequestsCount.set(this.getWriteRequestsCount()
4535         + region_b.getWriteRequestsCount());
4536     this.fs.commitMergedRegion(mergedRegionInfo);
4537     return r;
4538   }
4539 
4540   /**
4541    * Inserts a new region's meta information into the passed
4542    * <code>meta</code> region. Used by the HMaster bootstrap code adding
4543    * a new table to the hbase:meta table.
4544    *
4545    * @param meta hbase:meta HRegion to be updated
4546    * @param r HRegion to add to <code>meta</code>
4547    *
4548    * @throws IOException
4549    */
4550   // TODO remove since only test and merge use this
4551   public static void addRegionToMETA(final HRegion meta, final HRegion r) throws IOException {
4552     meta.checkResources();
4553     // The row key is the region name
4554     byte[] row = r.getRegionName();
4555     final long now = EnvironmentEdgeManager.currentTimeMillis();
4556     final List<Cell> cells = new ArrayList<Cell>(2);
4557     cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
4558       HConstants.REGIONINFO_QUALIFIER, now,
4559       r.getRegionInfo().toByteArray()));
4560     // Set the version of the meta table into the root table.
4561     cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
4562       HConstants.META_VERSION_QUALIFIER, now,
4563       Bytes.toBytes(HConstants.META_VERSION)));
4564     meta.put(row, HConstants.CATALOG_FAMILY, cells);
4565   }
4566 
4567   /**
4568    * Computes the Path of the HRegion
4569    *
4570    * @param tabledir qualified path for table
4571    * @param name ENCODED region name
4572    * @return Path of HRegion directory
4573    */
4574   @Deprecated
4575   public static Path getRegionDir(final Path tabledir, final String name) {
4576     return new Path(tabledir, name);
4577   }
4578 
4579   /**
4580    * Computes the Path of the HRegion
4581    *
4582    * @param rootdir qualified path of HBase root directory
4583    * @param info HRegionInfo for the region
4584    * @return qualified path of region directory
4585    */
4586   @Deprecated
4587   public static Path getRegionDir(final Path rootdir, final HRegionInfo info) {
4588     return new Path(
4589       FSUtils.getTableDir(rootdir, info.getTable()), info.getEncodedName());
4590   }
4591 
4592   /**
4593    * Determines if the specified row is within the row range specified by the
4594    * specified HRegionInfo
4595    *
4596    * @param info HRegionInfo that specifies the row range
4597    * @param row row to be checked
4598    * @return true if the row is within the range specified by the HRegionInfo
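   *
   * <p>Example (start key inclusive, end key exclusive; an empty key means the
   * range is unbounded on that side). For a hypothetical region spanning ["b", "d"):
   * <pre>
   *   rowIsInRange(info, Bytes.toBytes("b")); // true
   *   rowIsInRange(info, Bytes.toBytes("c")); // true
   *   rowIsInRange(info, Bytes.toBytes("d")); // false (end key is exclusive)
   * </pre>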
4599    */
4600   public static boolean rowIsInRange(HRegionInfo info, final byte [] row) {
4601     return ((info.getStartKey().length == 0) ||
4602         (Bytes.compareTo(info.getStartKey(), row) <= 0)) &&
4603         ((info.getEndKey().length == 0) ||
4604             (Bytes.compareTo(info.getEndKey(), row) > 0));
4605   }
4606 
4607   /**
4608    * Merge two HRegions.  The regions must be adjacent and must not overlap.
4609    *
4610    * @return new merged HRegion
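   *
   * <p>Usage sketch (regionA and regionB are placeholders); the arguments are
   * reordered internally so the region with the lower start key is written first:
   * <pre>
   *   HRegion merged = HRegion.mergeAdjacent(regionA, regionB);
   * </pre>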
4611    * @throws IOException
4612    */
4613   public static HRegion mergeAdjacent(final HRegion srcA, final HRegion srcB)
4614   throws IOException {
4615     HRegion a = srcA;
4616     HRegion b = srcB;
4617 
4618     // Make sure that srcA comes first; important for key-ordering during
4619     // write of the merged file.
4620     if (srcA.getStartKey() == null) {
4621       if (srcB.getStartKey() == null) {
4622         throw new IOException("Cannot merge two regions with null start key");
4623       }
4624       // A's start key is null but B's isn't. Assume A comes before B
4625     } else if ((srcB.getStartKey() == null) ||
4626       (Bytes.compareTo(srcA.getStartKey(), srcB.getStartKey()) > 0)) {
4627       a = srcB;
4628       b = srcA;
4629     }
4630 
4631     if (Bytes.compareTo(a.getEndKey(), b.getStartKey()) != 0) {
4632       throw new IOException("Cannot merge non-adjacent regions");
4633     }
4634     return merge(a, b);
4635   }
4636 
4637   /**
4638    * Merge two regions whether they are adjacent or not.
4639    *
4640    * @param a region a
4641    * @param b region b
4642    * @return new merged region
4643    * @throws IOException
4644    */
4645   public static HRegion merge(final HRegion a, final HRegion b) throws IOException {
4646     if (!a.getRegionInfo().getTable().equals(b.getRegionInfo().getTable())) {
4647       throw new IOException("Regions do not belong to the same table");
4648     }
4649 
4650     FileSystem fs = a.getRegionFileSystem().getFileSystem();
4651     // Make sure each region's cache is empty
4652     a.flushcache();
4653     b.flushcache();
4654 
4655     // Compact each region so we only have one store file per family
4656     a.compactStores(true);
4657     if (LOG.isDebugEnabled()) {
4658       LOG.debug("Files for region: " + a);
4659       a.getRegionFileSystem().logFileSystemState(LOG);
4660     }
4661     b.compactStores(true);
4662     if (LOG.isDebugEnabled()) {
4663       LOG.debug("Files for region: " + b);
4664       b.getRegionFileSystem().logFileSystemState(LOG);
4665     }
4666 
4667     RegionMergeTransaction rmt = new RegionMergeTransaction(a, b, true);
4668     if (!rmt.prepare(null)) {
4669       throw new IOException("Unable to merge regions " + a + " and " + b);
4670     }
4671     HRegionInfo mergedRegionInfo = rmt.getMergedRegionInfo();
4672     LOG.info("starting merge of regions: " + a + " and " + b
4673         + " into new region " + mergedRegionInfo.getRegionNameAsString()
4674         + " with start key <"
4675         + Bytes.toStringBinary(mergedRegionInfo.getStartKey())
4676         + "> and end key <"
4677         + Bytes.toStringBinary(mergedRegionInfo.getEndKey()) + ">");
4678     HRegion dstRegion;
4679     try {
4680       dstRegion = rmt.execute(null, null);
4681     } catch (IOException ioe) {
4682       rmt.rollback(null, null);
4683       throw new IOException("Failed merging regions " + a + " and " + b
4684           + "; rollback succeeded", ioe);
4685     }
4686     dstRegion.compactStores(true);
4687 
4688     if (LOG.isDebugEnabled()) {
4689       LOG.debug("Files for new region");
4690       dstRegion.getRegionFileSystem().logFileSystemState(LOG);
4691     }
4692 
4693     if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
4694       throw new IOException("Merged region " + dstRegion
4695           + " still has references after the compaction, is compaction canceled?");
4696     }
4697 
4698     // Archiving the 'A' region
4699     HFileArchiver.archiveRegion(a.getBaseConf(), fs, a.getRegionInfo());
4700     // Archiving the 'B' region
4701     HFileArchiver.archiveRegion(b.getBaseConf(), fs, b.getRegionInfo());
4702 
4703     LOG.info("merge completed. New region is " + dstRegion);
4704     return dstRegion;
4705   }
4706 
4707   //
4708   // HBASE-880
4709   //
4710   /**
4711    * @param get get object
4712    * @return result
4713    * @throws IOException read exceptions
4714    */
4715   public Result get(final Get get) throws IOException {
4716     checkRow(get.getRow(), "Get");
4717     // Verify families are all valid
4718     if (get.hasFamilies()) {
4719       for (byte [] family: get.familySet()) {
4720         checkFamily(family);
4721       }
4722     } else { // Adding all families to scanner
4723       for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
4724         get.addFamily(family);
4725       }
4726     }
4727     List<Cell> results = get(get, true);
4728     return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null);
4729   }
4730 
4731   /*
4732    * Do a get based on the get parameter.
4733    * @param withCoprocessor invoke coprocessor or not. We don't want to
4734    * always invoke cp for this private method.
4735    */
4736   private List<Cell> get(Get get, boolean withCoprocessor)
4737   throws IOException {
4738 
4739     List<Cell> results = new ArrayList<Cell>();
4740 
4741     // pre-get CP hook
4742     if (withCoprocessor && (coprocessorHost != null)) {
4743        if (coprocessorHost.preGet(get, results)) {
4744          return results;
4745        }
4746     }
4747 
4748     Scan scan = new Scan(get);
4749 
4750     RegionScanner scanner = null;
4751     try {
4752       scanner = getScanner(scan);
4753       scanner.next(results);
4754     } finally {
4755       if (scanner != null)
4756         scanner.close();
4757     }
4758 
4759     // post-get CP hook
4760     if (withCoprocessor && (coprocessorHost != null)) {
4761       coprocessorHost.postGet(get, results);
4762     }
4763 
4764     // do after lock
4765     if (this.metricsRegion != null) {
4766       long totalSize = 0L;
4767       for (Cell kv : results) {
4768         totalSize += KeyValueUtil.ensureKeyValue(kv).getLength();
4769       }
4770       this.metricsRegion.updateGet(totalSize);
4771     }
4772 
4773     return results;
4774   }
4775 
4776   public void mutateRow(RowMutations rm) throws IOException {
4777     // Don't need nonces here - RowMutations only supports puts and deletes
4778     mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
4779   }
4780 
4781   /**
4782    * Perform atomic mutations within the region w/o nonces.
4783    * See {@link #mutateRowsWithLocks(Collection, Collection, long, long)}
4784    */
4785   public void mutateRowsWithLocks(Collection<Mutation> mutations,
4786       Collection<byte[]> rowsToLock) throws IOException {
4787     mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
4788   }
4789 
4790   /**
4791    * Perform atomic mutations within the region.
4792    * @param mutations The list of mutations to perform.
4793    * <code>mutations</code> can contain operations for multiple rows.
4794    * Caller has to ensure that all rows are contained in this region.
4795    * @param rowsToLock Rows to lock. If multiple rows are locked, care should be
4796    * taken that <code>rowsToLock</code> is sorted in order to avoid deadlocks.
4797    * @param nonceGroup Optional nonce group of the operation (client Id)
4798    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
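   *
   * <p>A minimal sketch (FAMILY, QUALIFIER, VALUE, and the row names are
   * placeholders; note the lock rows are supplied in sorted order):
   * <pre>
   *   List<Mutation> mutations = new ArrayList<Mutation>();
   *   mutations.add(new Put(Bytes.toBytes("row1")).add(FAMILY, QUALIFIER, VALUE));
   *   mutations.add(new Delete(Bytes.toBytes("row2")));
   *   region.mutateRowsWithLocks(mutations,
   *       Arrays.asList(Bytes.toBytes("row1"), Bytes.toBytes("row2")));
   * </pre>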
4800    * @throws IOException
4801    */
4802   public void mutateRowsWithLocks(Collection<Mutation> mutations,
4803       Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
4804     MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
4805     processRowsWithLocks(proc, -1, nonceGroup, nonce);
4806   }
4807 
4808   /**
4809    * Performs atomic multiple reads and writes on a given row.
4810    *
4811    * @param processor The object defines the reads and writes to a row.
4812    * @param nonceGroup Optional nonce group of the operation (client Id)
4813    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
4814    */
4815   public void processRowsWithLocks(RowProcessor<?,?> processor, long nonceGroup, long nonce)
4816       throws IOException {
4817     processRowsWithLocks(processor, rowProcessorTimeout, nonceGroup, nonce);
4818   }
4819 
4820   /**
4821    * Performs atomic multiple reads and writes on a given row.
4822    *
4823    * @param processor The object defines the reads and writes to a row.
4824    * @param timeout The timeout of the processor.process() execution.
4825    *                Use a negative number to switch off the time bound.
4826    * @param nonceGroup Optional nonce group of the operation (client Id)
4827    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
4828    */
4829   public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout,
4830       long nonceGroup, long nonce) throws IOException {
4831 
4832     for (byte[] row : processor.getRowsToLock()) {
4833       checkRow(row, "processRowsWithLocks");
4834     }
4835     if (!processor.readOnly()) {
4836       checkReadOnly();
4837     }
4838     checkResources();
4839 
4840     startRegionOperation();
4841     WALEdit walEdit = new WALEdit();
4842 
4843     // 1. Run pre-process hook
4844     try {
4845       processor.preProcess(this, walEdit);
4846     } catch (IOException e) {
4847       closeRegionOperation();
4848       throw e;
4849     }
4850     // Short circuit the read only case
4851     if (processor.readOnly()) {
4852       try {
4853         long now = EnvironmentEdgeManager.currentTimeMillis();
4854         doProcessRowWithTimeout(
4855             processor, now, this, null, null, timeout);
4856         processor.postProcess(this, walEdit);
4857       } catch (IOException e) {
4858         throw e;
4859       } finally {
4860         closeRegionOperation();
4861       }
4862       return;
4863     }
4864 
4865     MultiVersionConsistencyControl.WriteEntry writeEntry = null;
4866     boolean locked;
4867     boolean walSyncSuccessful = false;
4868     List<RowLock> acquiredRowLocks;
4869     long addedSize = 0;
4870     List<KeyValue> mutations = new ArrayList<KeyValue>();
4871     List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
4872     Collection<byte[]> rowsToLock = processor.getRowsToLock();
4873     long mvccNum = 0;
4874     HLogKey walKey = null;
4875     try {
4876       // 2. Acquire the row lock(s)
4877       acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
4878       for (byte[] row : rowsToLock) {
4879         // Attempt to lock all involved rows, throw if any lock times out
4880         acquiredRowLocks.add(getRowLock(row));
4881       }
4882       // 3. Region lock
4883       lock(this.updatesLock.readLock(), acquiredRowLocks.size());
4884       locked = true;
4885       mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
4886 
4887       long now = EnvironmentEdgeManager.currentTimeMillis();
4888       try {
4889         // 4. Let the processor scan the rows, generate mutations and add
4890         //    waledits
4891         doProcessRowWithTimeout(
4892             processor, now, this, mutations, walEdit, timeout);
4893 
4894         if (!mutations.isEmpty()) {
4895           // 5. Get a mvcc write number
4896           writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
4897           // 6. Apply to memstore
4898           for (KeyValue kv : mutations) {
4899             kv.setMvccVersion(mvccNum);
4900             Store store = getStore(kv);
4901             if (store == null) {
4902               checkFamily(CellUtil.cloneFamily(kv));
4903               // checkFamily throws for an unknown family, so this line is unreachable
4904             }
4905             Pair<Long, Cell> ret = store.add(kv);
4906             addedSize += ret.getFirst();
4907             memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
4908           }
4909 
4910           long txid = 0;
4911           // 7. Append no sync
4912           if (!walEdit.isEmpty()) {
4913             walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
4914               this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now, 
4915               processor.getClusterIds(), nonceGroup, nonce);
4916             txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(),
4917               walKey, walEdit, getSequenceId(), true, memstoreCells);
4918           }
4919           if(walKey == null){
4920             // since we use log sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
4921             // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
4922             walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
4923           }
4924 
4925           // 8. Release region lock
4926           if (locked) {
4927             this.updatesLock.readLock().unlock();
4928             locked = false;
4929           }
4930 
4931           // 9. Release row lock(s)
4932           releaseRowLocks(acquiredRowLocks);
4933 
4934           // 10. Sync edit log
4935           if (txid != 0) {
4936             syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
4937           }
4938           walSyncSuccessful = true;
4939         }
4940       } finally {
4941         if (!mutations.isEmpty() && !walSyncSuccessful) {
4942           LOG.warn("Wal sync failed. Roll back " + mutations.size() +
4943               " memstore keyvalues for row(s):" + StringUtils.byteToHexString(
4944               processor.getRowsToLock().iterator().next()) + "...");
4945           for (KeyValue kv : mutations) {
4946             getStore(kv).rollback(kv);
4947           }
4948         }
4949         // 11. Roll mvcc forward
4950         if (writeEntry != null) {
4951           mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
4952         }
4953         if (locked) {
4954           this.updatesLock.readLock().unlock();
4955         }
4956         // release locks if some were acquired but another timed out
4957         releaseRowLocks(acquiredRowLocks);
4958       }
4959 
4960       // 12. Run post-process hook
4961       processor.postProcess(this, walEdit);
4962 
4963     } catch (IOException e) {
4964       throw e;
4965     } finally {
4966       closeRegionOperation();
4967       if (!mutations.isEmpty() &&
4968           isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
4969         requestFlush();
4970       }
4971     }
4972   }
4973 
4974   private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
4975                                        final long now,
4976                                        final HRegion region,
4977                                        final List<KeyValue> mutations,
4978                                        final WALEdit walEdit,
4979                                        final long timeout) throws IOException {
4980     // Short circuit the no time bound case.
4981     if (timeout < 0) {
4982       try {
4983         processor.process(now, region, mutations, walEdit);
4984       } catch (IOException e) {
4985         LOG.warn("RowProcessor:" + processor.getClass().getName() +
4986             " throws Exception on row(s):" +
4987             Bytes.toStringBinary(
4988               processor.getRowsToLock().iterator().next()) + "...", e);
4989         throw e;
4990       }
4991       return;
4992     }
4993 
4994     // Case with time bound
4995     FutureTask<Void> task =
4996       new FutureTask<Void>(new Callable<Void>() {
4997         @Override
4998         public Void call() throws IOException {
4999           try {
5000             processor.process(now, region, mutations, walEdit);
5001             return null;
5002           } catch (IOException e) {
5003             LOG.warn("RowProcessor:" + processor.getClass().getName() +
5004                 " throws Exception on row(s):" +
5005                 Bytes.toStringBinary(
5006                     processor.getRowsToLock().iterator().next()) + "...", e);
5007             throw e;
5008           }
5009         }
5010       });
5011     rowProcessorExecutor.execute(task);
5012     try {
5013       task.get(timeout, TimeUnit.MILLISECONDS);
5014     } catch (TimeoutException te) {
5015       LOG.error("RowProcessor timeout:" + timeout + " ms on row(s):" +
5016           Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) +
5017           "...");
5018       throw new IOException(te);
5019     } catch (Exception e) {
5020       throw new IOException(e);
5021     }
5022   }
5023 
5024   public Result append(Append append) throws IOException {
5025     return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
5026   }
5027 
5028   // TODO: There's a lot of boiler plate code identical
5029   // to increment... See how to better unify that.
5030   /**
5031    * Perform one or more append operations on a row.
5032    *
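   * <p>Illustrative usage (FAMILY and QUALIFIER are placeholder byte arrays):
   * <pre>
   *   Append a = new Append(Bytes.toBytes("row1"));
   *   a.add(FAMILY, QUALIFIER, Bytes.toBytes("-suffix"));
   *   Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
   * </pre>
   *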
5033    * @return new keyvalues after append
5034    * @throws IOException
5035    */
5036   public Result append(Append append, long nonceGroup, long nonce)
5037       throws IOException {
5038     byte[] row = append.getRow();
5039     checkRow(row, "append");
5040     boolean flush = false;
5041     Durability durability = getEffectiveDurability(append.getDurability());
5042     boolean writeToWAL = durability != Durability.SKIP_WAL;
5043     WALEdit walEdits = null;
5044     List<Cell> allKVs = new ArrayList<Cell>(append.size());
5045     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
5046 
5047     long size = 0;
5048     long txid = 0;
5049 
5050     checkReadOnly();
5051     checkResources();
5052     // Lock row
5053     startRegionOperation(Operation.APPEND);
5054     this.writeRequestsCount.increment();
5055     long mvccNum = 0;
5056     WriteEntry w = null;
5057     HLogKey walKey = null;
5058     RowLock rowLock = null;
5059     List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
5060     boolean doRollBackMemstore = false;
5061     try {
5062       rowLock = getRowLock(row);
5063       try {
5064         lock(this.updatesLock.readLock());
5065         try {
5066           // wait for all prior MVCC transactions to finish - while we hold the row lock
5067           // (so that we are guaranteed to see the latest state)
5068           mvcc.waitForPreviousTransactionsComplete();
5069           if (this.coprocessorHost != null) {
5070             Result r = this.coprocessorHost.preAppendAfterRowLock(append);
5071             if(r!= null) {
5072               return r;
5073             }
5074           }
5075           // now start my own transaction
5076           mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
5077           w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
5078           long now = EnvironmentEdgeManager.currentTimeMillis();
5079           // Process each family
5080           for (Map.Entry<byte[], List<Cell>> family : append.getFamilyCellMap().entrySet()) {
5081 
5082             Store store = stores.get(family.getKey());
5083             List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
5084 
5085             // Sort the cells so that they match the order that they
5086             // appear in the Get results. Otherwise, we won't be able to
5087             // find the existing values if the cells are not specified
5088             // in order by the client since cells are in an array list.
5089             Collections.sort(family.getValue(), store.getComparator());
5090             // Get previous values for all columns in this family
5091             Get get = new Get(row);
5092             for (Cell cell : family.getValue()) {
5093               get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));
5094             }
5095             List<Cell> results = get(get, false);
5096 
5097             // Iterate the input columns and update existing values if they were
5098             // found, otherwise add new column initialized to the append value
5099 
5100             // Avoid as much copying as possible. Every byte is copied at most
5101             // once.
5102             // Would be nice if KeyValue had scatter/gather logic
5103             int idx = 0;
5104             for (Cell cell : family.getValue()) {
5105               KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5106               KeyValue newKV;
5107               KeyValue oldKv = null;
5108               if (idx < results.size()
5109                   && CellUtil.matchingQualifier(results.get(idx),kv)) {
5110                 oldKv = KeyValueUtil.ensureKeyValue(results.get(idx));
5111                 // allocate an empty kv once
5112                 newKV = new KeyValue(row.length, kv.getFamilyLength(),
5113                     kv.getQualifierLength(), now, KeyValue.Type.Put,
5114                     oldKv.getValueLength() + kv.getValueLength(),
5115                     oldKv.getTagsLength() + kv.getTagsLength());
5116                 // copy in the value
5117                 System.arraycopy(oldKv.getValueArray(), oldKv.getValueOffset(),
5118                     newKV.getValueArray(), newKV.getValueOffset(),
5119                     oldKv.getValueLength());
5120                 System.arraycopy(kv.getValueArray(), kv.getValueOffset(),
5121                     newKV.getValueArray(),
5122                     newKV.getValueOffset() + oldKv.getValueLength(),
5123                     kv.getValueLength());
5124                 // copy in the tags
5125                 System.arraycopy(oldKv.getTagsArray(), oldKv.getTagsOffset(), newKV.getTagsArray(),
5126                     newKV.getTagsOffset(), oldKv.getTagsLength());
5127                 System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(),
5128                     newKV.getTagsOffset() + oldKv.getTagsLength(), kv.getTagsLength());
5129                 // copy in row, family, and qualifier
5130                 System.arraycopy(kv.getRowArray(), kv.getRowOffset(),
5131                     newKV.getRowArray(), newKV.getRowOffset(), kv.getRowLength());
5132                 System.arraycopy(kv.getFamilyArray(), kv.getFamilyOffset(),
5133                     newKV.getFamilyArray(), newKV.getFamilyOffset(),
5134                     kv.getFamilyLength());
5135                 System.arraycopy(kv.getQualifierArray(), kv.getQualifierOffset(),
5136                     newKV.getQualifierArray(), newKV.getQualifierOffset(),
5137                     kv.getQualifierLength());
5138                 idx++;
5139               } else {
5140                 newKV = kv;
5141                 // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP,
5142                 // so only need to update the timestamp to 'now'
5143                 newKV.updateLatestStamp(Bytes.toBytes(now));
5144               }
5145               newKV.setMvccVersion(mvccNum);
5146               // Give coprocessors a chance to update the new cell
5147               if (coprocessorHost != null) {
5148                 newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
5149                     RegionObserver.MutationType.APPEND, append, oldKv, newKV));
5150               }
5151               kvs.add(newKV);
5152 
5153               // Append update to WAL
5154               if (writeToWAL) {
5155                 if (walEdits == null) {
5156                   walEdits = new WALEdit();
5157                 }
5158                 walEdits.add(newKV);
5159               }
5160             }
5161 
5162             //store the kvs to the temporary memstore before writing HLog
5163             tempMemstore.put(store, kvs);
5164           }
5165 
5166           //Actually write to Memstore now
5167           for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
5168             Store store = entry.getKey();
5169             if (store.getFamily().getMaxVersions() == 1) {
5170               // upsert if VERSIONS for this CF == 1
5171               size += store.upsert(entry.getValue(), getSmallestReadPoint());
5172               memstoreCells.addAll(KeyValueUtil.ensureKeyValues(entry.getValue()));
5173             } else {
5174               // otherwise keep older versions around
5175               for (Cell cell: entry.getValue()) {
5176                 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5177                 Pair<Long, Cell> ret = store.add(kv);
5178                 size += ret.getFirst();
5179                 memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
5180                 doRollBackMemstore = true;
5181               }
5182             }
5183             allKVs.addAll(entry.getValue());
5184           }
5185           
5186           // Actually write to WAL now
5187           if (writeToWAL) {
5188             // Using default cluster id, as this can only happen in the originating
5189             // cluster. A slave cluster receives the final value (not the delta)
5190             // as a Put.
5191             walKey = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
5192               this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, nonceGroup, nonce);
5193             txid = this.log.appendNoSync(this.htableDescriptor, getRegionInfo(), walKey, walEdits,
5194               this.sequenceId, true, memstoreCells);
5195           } else {
5196             recordMutationWithoutWal(append.getFamilyCellMap());
5197           }
5198           if(walKey == null){
5199             // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
5200             walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
5201           }
5202           
5203           size = this.addAndGetGlobalMemstoreSize(size);
5204           flush = isFlushSize(size);
5205         } finally {
5206           this.updatesLock.readLock().unlock();
5207         }
5208       } finally {
5209         rowLock.release();
5210         rowLock = null;
5211       }
5212       // sync the transaction log outside the rowlock
5213       if(txid != 0){
5214         syncOrDefer(txid, durability);
5215       }
5216       doRollBackMemstore = false;
5217     } finally {
5218       if (rowLock != null) {
5219         rowLock.release();
5220       }
5221       // if the wal sync was unsuccessful, remove keys from memstore
5222       if (doRollBackMemstore) {
5223         rollbackMemstore(memstoreCells);
5224       }
5225       if (w != null) {
5226         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
5227       }
5228       closeRegionOperation(Operation.APPEND);
5229     }
5230 
5231     if (this.metricsRegion != null) {
5232       this.metricsRegion.updateAppend();
5233     }
5234 
5235     if (flush) {
5236       // Request a cache flush. Do it outside update lock.
5237       requestFlush();
5238     }
5239 
5240 
5241     return append.isReturnResults() ? Result.create(allKVs) : null;
5242   }
5243 
5244   public Result increment(Increment increment) throws IOException {
5245     return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
5246   }
5247 
5248   /**
5249    * Perform one or more increment operations on a row.
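   * <p>
   * A minimal usage sketch (the {@code region} handle, family {@code f}, qualifier {@code q}
   * and the row key are hypothetical):
   * <pre>
   *   Increment inc = new Increment(Bytes.toBytes("row1"));
   *   inc.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), 1L);
   *   Result r = region.increment(inc);
   *   long newValue = Bytes.toLong(r.getValue(Bytes.toBytes("f"), Bytes.toBytes("q")));
   * </pre>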
   * @return the new keyvalues after the increment is applied
5251    * @throws IOException
5252    */
5253   public Result increment(Increment increment, long nonceGroup, long nonce)
5254   throws IOException {
5255     byte [] row = increment.getRow();
5256     checkRow(row, "increment");
5257     TimeRange tr = increment.getTimeRange();
5258     boolean flush = false;
5259     Durability durability = getEffectiveDurability(increment.getDurability());
5260     boolean writeToWAL = durability != Durability.SKIP_WAL;
5261     WALEdit walEdits = null;
5262     List<Cell> allKVs = new ArrayList<Cell>(increment.size());
5263     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
5264 
5265     long size = 0;
5266     long txid = 0;
5267 
5268     checkReadOnly();
5269     checkResources();
5270     // Lock row
5271     startRegionOperation(Operation.INCREMENT);
5272     this.writeRequestsCount.increment();
5273     RowLock rowLock = null;
5274     WriteEntry w = null;
5275     HLogKey walKey = null;
5276     long mvccNum = 0;
5277     List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
5278     boolean doRollBackMemstore = false;
5279     try {
5280       rowLock = getRowLock(row);
5281       try {
5282         lock(this.updatesLock.readLock());
5283         try {
5284           // wait for all prior MVCC transactions to finish - while we hold the row lock
5285           // (so that we are guaranteed to see the latest state)
5286           mvcc.waitForPreviousTransactionsComplete();
5287           if (this.coprocessorHost != null) {
5288             Result r = this.coprocessorHost.preIncrementAfterRowLock(increment);
5289             if (r != null) {
5290               return r;
5291             }
5292           }
5293           // now start my own transaction
5294           mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
5295           w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
5296           long now = EnvironmentEdgeManager.currentTimeMillis();
5297           // Process each family
5298           for (Map.Entry<byte [], List<Cell>> family:
5299               increment.getFamilyCellMap().entrySet()) {
5300 
5301             Store store = stores.get(family.getKey());
5302             List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
5303 
5304             // Sort the cells so that they match the order that they
5305             // appear in the Get results. Otherwise, we won't be able to
5306             // find the existing values if the cells are not specified
5307             // in order by the client since cells are in an array list.
5308             Collections.sort(family.getValue(), store.getComparator());
5309             // Get previous values for all columns in this family
5310             Get get = new Get(row);
5311             for (Cell cell: family.getValue()) {
5312               get.addColumn(family.getKey(),  CellUtil.cloneQualifier(cell));
5313             }
5314             get.setTimeRange(tr.getMin(), tr.getMax());
5315             List<Cell> results = get(get, false);
5316 
5317             // Iterate the input columns and update existing values if they were
5318             // found, otherwise add new column initialized to the increment amount
5319             int idx = 0;
5320             for (Cell kv: family.getValue()) {
5321               long amount = Bytes.toLong(CellUtil.cloneValue(kv));
5322               boolean noWriteBack = (amount == 0);
5323 
5324               Cell c = null;
5325               if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), kv)) {
5326                 c = results.get(idx);
5327                 if(c.getValueLength() == Bytes.SIZEOF_LONG) {
5328                   amount += Bytes.toLong(c.getValueArray(), c.getValueOffset(), Bytes.SIZEOF_LONG);
5329                 } else {
5330                   // throw DoNotRetryIOException instead of IllegalArgumentException
5331                   throw new org.apache.hadoop.hbase.DoNotRetryIOException(
5332                       "Attempted to increment field that isn't 64 bits wide");
5333                 }
5334                 idx++;
5335               }
5336 
5337               // Append new incremented KeyValue to list
5338               byte[] q = CellUtil.cloneQualifier(kv);
5339               byte[] val = Bytes.toBytes(amount);
5340               int oldCellTagsLen = (c == null) ? 0 : c.getTagsLength();
5341               int incCellTagsLen = kv.getTagsLength();
5342               KeyValue newKV = new KeyValue(row.length, family.getKey().length, q.length, now,
5343                   KeyValue.Type.Put, val.length, oldCellTagsLen + incCellTagsLen);
5344               System.arraycopy(row, 0, newKV.getRowArray(), newKV.getRowOffset(), row.length);
5345               System.arraycopy(family.getKey(), 0, newKV.getFamilyArray(), newKV.getFamilyOffset(),
5346                   family.getKey().length);
5347               System.arraycopy(q, 0, newKV.getQualifierArray(), newKV.getQualifierOffset(), q.length);
5348               // copy in the value
5349               System.arraycopy(val, 0, newKV.getValueArray(), newKV.getValueOffset(), val.length);
5350               // copy tags
5351               if (oldCellTagsLen > 0) {
5352                 System.arraycopy(c.getTagsArray(), c.getTagsOffset(), newKV.getTagsArray(),
5353                     newKV.getTagsOffset(), oldCellTagsLen);
5354               }
5355               if (incCellTagsLen > 0) {
5356                 System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(),
5357                     newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
5358               }
5359               newKV.setMvccVersion(mvccNum);
5360               // Give coprocessors a chance to update the new cell
5361               if (coprocessorHost != null) {
5362                 newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
5363                     RegionObserver.MutationType.INCREMENT, increment, c, newKV));
5364               }
5365               allKVs.add(newKV);
5366 
5367               if (!noWriteBack) {
5368                 kvs.add(newKV);
5369 
5370                 // Prepare WAL updates
5371                 if (writeToWAL) {
5372                   if (walEdits == null) {
5373                     walEdits = new WALEdit();
5374                   }
5375                   walEdits.add(newKV);
5376                 }
5377               }
5378             }
5379 
            // Store the kvs in the temporary memstore before writing to the HLog
5381             if (!kvs.isEmpty()) {
5382               tempMemstore.put(store, kvs);
5383             }
5384           }
5385 
          // Actually write to the memstore now
5387           if (!tempMemstore.isEmpty()) {
5388             for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
5389               Store store = entry.getKey();
5390               if (store.getFamily().getMaxVersions() == 1) {
5391                 // upsert if VERSIONS for this CF == 1
5392                 size += store.upsert(entry.getValue(), getSmallestReadPoint());
5393                 memstoreCells.addAll(KeyValueUtil.ensureKeyValues(entry.getValue()));
5394               } else {
5395                 // otherwise keep older versions around
5396                 for (Cell cell : entry.getValue()) {
5397                   KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5398                   Pair<Long, Cell> ret = store.add(kv);
5399                   size += ret.getFirst();
5400                   memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
5401                   doRollBackMemstore = true;
5402                 }
5403               }
5404             }
5405             size = this.addAndGetGlobalMemstoreSize(size);
5406             flush = isFlushSize(size);
5407           }
5408           
5409           // Actually write to WAL now
5410           if (walEdits != null && !walEdits.isEmpty()) {
5411             if (writeToWAL) {
5412               // Using default cluster id, as this can only happen in the originating
5413               // cluster. A slave cluster receives the final value (not the delta)
5414               // as a Put.
5415               walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
5416                 this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, nonceGroup, nonce);
5417               txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(),
5418                 walKey, walEdits, getSequenceId(), true, memstoreCells);
5419             } else {
5420               recordMutationWithoutWal(increment.getFamilyCellMap());
5421             }
5422           }
5423           if(walKey == null){
5424             // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
5425             walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
5426           }
5427         } finally {
5428           this.updatesLock.readLock().unlock();
5429         }
5430       } finally {
5431         rowLock.release();
5432         rowLock = null;
5433       }
5434       // sync the transaction log outside the rowlock
5435       if(txid != 0){
5436         syncOrDefer(txid, durability);
5437       }
5438       doRollBackMemstore = false;
5439     } finally {
5440       if (rowLock != null) {
5441         rowLock.release();
5442       }
5443       // if the wal sync was unsuccessful, remove keys from memstore
5444       if (doRollBackMemstore) {
5445         rollbackMemstore(memstoreCells);
5446       }
5447       if (w != null) {
5448         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
5449       }
5450       closeRegionOperation(Operation.INCREMENT);
5451       if (this.metricsRegion != null) {
5452         this.metricsRegion.updateIncrement();
5453       }
5454     }
5455 
5456     if (flush) {
5457       // Request a cache flush.  Do it outside update lock.
5458       requestFlush();
5459     }
5460 
5461     return Result.create(allKVs);
5462   }
5463 
5464   //
5465   // New HBASE-880 Helpers
5466   //
5467 
5468   private void checkFamily(final byte [] family)
5469   throws NoSuchColumnFamilyException {
5470     if (!this.htableDescriptor.hasFamily(family)) {
5471       throw new NoSuchColumnFamilyException("Column family " +
5472           Bytes.toString(family) + " does not exist in region " + this
5473           + " in table " + this.htableDescriptor);
5474     }
5475   }
5476 
5477   public static final long FIXED_OVERHEAD = ClassSize.align(
5478       ClassSize.OBJECT +
5479       ClassSize.ARRAY +
5480       40 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
5481       (12 * Bytes.SIZEOF_LONG) +
5482       4 * Bytes.SIZEOF_BOOLEAN);
5483 
5484   // woefully out of date - currently missing:
5485   // 1 x HashMap - coprocessorServiceHandlers
5486   // 6 x Counter - numMutationsWithoutWAL, dataInMemoryWithoutWAL,
5487   //   checkAndMutateChecksPassed, checkAndMutateChecksFailed, readRequestsCount,
5488   //   writeRequestsCount
5489   // 1 x HRegion$WriteState - writestate
5490   // 1 x RegionCoprocessorHost - coprocessorHost
5491   // 1 x RegionSplitPolicy - splitPolicy
5492   // 1 x MetricsRegion - metricsRegion
5493   // 1 x MetricsRegionWrapperImpl - metricsRegionWrapper
5494   public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
5495       ClassSize.OBJECT + // closeLock
5496       (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
5497       (3 * ClassSize.ATOMIC_LONG) + // memStoreSize, numPutsWithoutWAL, dataInMemoryWithoutWAL
5498       (2 * ClassSize.CONCURRENT_HASHMAP) +  // lockedRows, scannerReadPoints
5499       WriteState.HEAP_SIZE + // writestate
5500       ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
5501       (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
5502       MultiVersionConsistencyControl.FIXED_SIZE // mvcc
5503       + ClassSize.TREEMAP // maxSeqIdInStores
5504       + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
5505       ;
5506 
5507   @Override
5508   public long heapSize() {
5509     long heapSize = DEEP_OVERHEAD;
5510     for (Store store : this.stores.values()) {
5511       heapSize += store.heapSize();
5512     }
5513     // this does not take into account row locks, recent flushes, mvcc entries, and more
5514     return heapSize;
5515   }
5516 
5517   /*
5518    * This method calls System.exit.
5519    * @param message Message to print out.  May be null.
5520    */
5521   private static void printUsageAndExit(final String message) {
5522     if (message != null && message.length() > 0) System.out.println(message);
5523     System.out.println("Usage: HRegion CATALOG_TABLE_DIR [major_compact]");
5524     System.out.println("Options:");
    System.out.println(" major_compact  Pass this option to major compact " +
      "the passed region.");
    System.out.println("By default, outputs a scan of the passed region.");
5528     System.exit(1);
5529   }
5530 
5531   /**
5532    * Registers a new protocol buffer {@link Service} subclass as a coprocessor endpoint to
5533    * be available for handling
5534    * {@link HRegion#execService(com.google.protobuf.RpcController,
   *    org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall)} calls.
5536    *
5537    * <p>
   * Only a single instance may be registered per region for a given {@link Service} subclass (the
   * instances are keyed on {@link com.google.protobuf.Descriptors.ServiceDescriptor#getFullName()}).
5540    * After the first registration, subsequent calls with the same service name will fail with
5541    * a return value of {@code false}.
5542    * </p>
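   * <p>
   * A minimal registration sketch, typically done from a coprocessor's start hook
   * ({@code MyServiceImpl} is a hypothetical generated-service implementation):
   * <pre>
   *   Service instance = new MyServiceImpl();
   *   if (!region.registerService(instance)) {
   *     // a handler for this service name was already registered
   *   }
   * </pre>
   * </p>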
5543    * @param instance the {@code Service} subclass instance to expose as a coprocessor endpoint
5544    * @return {@code true} if the registration was successful, {@code false}
5545    * otherwise
5546    */
5547   public boolean registerService(Service instance) {
5548     /*
5549      * No stacking of instances is allowed for a single service name
5550      */
5551     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
5552     if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
5553       LOG.error("Coprocessor service "+serviceDesc.getFullName()+
5554           " already registered, rejecting request from "+instance
5555       );
5556       return false;
5557     }
5558 
5559     coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
5560     if (LOG.isDebugEnabled()) {
5561       LOG.debug("Registered coprocessor service: region="+
5562           Bytes.toStringBinary(getRegionName())+" service="+serviceDesc.getFullName());
5563     }
5564     return true;
5565   }
5566 
5567   /**
5568    * Executes a single protocol buffer coprocessor endpoint {@link Service} method using
5569    * the registered protocol handlers.  {@link Service} implementations must be registered via the
5570    * {@link HRegion#registerService(com.google.protobuf.Service)}
5571    * method before they are available.
5572    *
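   * <p>
   * A minimal invocation sketch; the service and method names are hypothetical, and
   * {@code request} is assumed to be a generated protobuf message:
   * <pre>
   *   CoprocessorServiceCall call = CoprocessorServiceCall.newBuilder()
   *       .setRow(ByteString.copyFrom(row))
   *       .setServiceName("MyService")
   *       .setMethodName("myMethod")
   *       .setRequest(request.toByteString())
   *       .build();
   *   Message response = region.execService(controller, call);
   * </pre>
   *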
5573    * @param controller an {@code RpcController} implementation to pass to the invoked service
5574    * @param call a {@code CoprocessorServiceCall} instance identifying the service, method,
5575    *     and parameters for the method invocation
5576    * @return a protocol buffer {@code Message} instance containing the method's result
5577    * @throws IOException if no registered service handler is found or an error
5578    *     occurs during the invocation
5579    * @see org.apache.hadoop.hbase.regionserver.HRegion#registerService(com.google.protobuf.Service)
5580    */
5581   public Message execService(RpcController controller, CoprocessorServiceCall call)
5582       throws IOException {
5583     String serviceName = call.getServiceName();
5584     String methodName = call.getMethodName();
5585     if (!coprocessorServiceHandlers.containsKey(serviceName)) {
5586       throw new UnknownProtocolException(null,
5587           "No registered coprocessor service found for name "+serviceName+
5588           " in region "+Bytes.toStringBinary(getRegionName()));
5589     }
5590 
5591     Service service = coprocessorServiceHandlers.get(serviceName);
5592     Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
5593     Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
5594     if (methodDesc == null) {
5595       throw new UnknownProtocolException(service.getClass(),
5596           "Unknown method "+methodName+" called on service "+serviceName+
5597               " in region "+Bytes.toStringBinary(getRegionName()));
5598     }
5599 
5600     Message request = service.getRequestPrototype(methodDesc).newBuilderForType()
5601         .mergeFrom(call.getRequest()).build();
5602 
5603     if (coprocessorHost != null) {
5604       request = coprocessorHost.preEndpointInvocation(service, methodName, request);
5605     }
5606 
5607     final Message.Builder responseBuilder =
5608         service.getResponsePrototype(methodDesc).newBuilderForType();
5609     service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
5610       @Override
5611       public void run(Message message) {
5612         if (message != null) {
5613           responseBuilder.mergeFrom(message);
5614         }
5615       }
5616     });
5617 
5618     if (coprocessorHost != null) {
5619       coprocessorHost.postEndpointInvocation(service, methodName, request, responseBuilder);
5620     }
5621 
5622     return responseBuilder.build();
5623   }
5624 
5625   /*
   * Process a table: do a major compaction or list its content.
5628    * @throws IOException
5629    */
5630   private static void processTable(final FileSystem fs, final Path p,
5631       final HLog log, final Configuration c,
5632       final boolean majorCompact)
5633   throws IOException {
5634     HRegion region;
    // Currently expects that tables have one region only.
5636     if (FSUtils.getTableName(p).equals(TableName.META_TABLE_NAME)) {
5637       region = HRegion.newHRegion(p, log, fs, c,
5638         HRegionInfo.FIRST_META_REGIONINFO, HTableDescriptor.META_TABLEDESC, null);
5639     } else {
5640       throw new IOException("Not a known catalog table: " + p.toString());
5641     }
5642     try {
5643       region.initialize(null);
5644       if (majorCompact) {
5645         region.compactStores(true);
5646       } else {
5647         // Default behavior
5648         Scan scan = new Scan();
5649         // scan.addFamily(HConstants.CATALOG_FAMILY);
5650         RegionScanner scanner = region.getScanner(scan);
5651         try {
5652           List<Cell> kvs = new ArrayList<Cell>();
5653           boolean done;
5654           do {
5655             kvs.clear();
5656             done = scanner.next(kvs);
5657             if (kvs.size() > 0) LOG.info(kvs);
5658           } while (done);
5659         } finally {
5660           scanner.close();
5661         }
5662       }
5663     } finally {
5664       region.close();
5665     }
5666   }
5667 
5668   boolean shouldForceSplit() {
5669     return this.splitRequest;
5670   }
5671 
5672   byte[] getExplicitSplitPoint() {
5673     return this.explicitSplitPoint;
5674   }
5675 
5676   void forceSplit(byte[] sp) {
5677     // NOTE : this HRegion will go away after the forced split is successful
5678     //        therefore, no reason to clear this value
5679     this.splitRequest = true;
5680     if (sp != null) {
5681       this.explicitSplitPoint = sp;
5682     }
5683   }
5684 
5685   void clearSplit_TESTS_ONLY() {
5686     this.splitRequest = false;
5687   }
5688 
5689   /**
5690    * Give the region a chance to prepare before it is split.
5691    */
5692   protected void prepareToSplit() {
5693     // nothing
5694   }
5695 
5696   /**
   * Return the split point. A null return value indicates the region isn't splittable.
   * If the split point isn't explicitly specified, this method goes over the stores
   * to find the best split point. Currently the criterion for the best split point
   * is based on the size of the store.
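   * <p>
   * An illustrative, package-internal sketch of how an explicit split point flows through
   * this method:
   * <pre>
   *   region.forceSplit(splitPoint); // request an explicit split point
   *   byte[] sp = region.checkSplit(); // null means the region is not splittable
   * </pre>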
5701    */
5702   public byte[] checkSplit() {
5703     // Can't split META
5704     if (this.getRegionInfo().isMetaTable() ||
5705         TableName.NAMESPACE_TABLE_NAME.equals(this.getRegionInfo().getTable())) {
5706       if (shouldForceSplit()) {
5707         LOG.warn("Cannot split meta region in HBase 0.20 and above");
5708       }
5709       return null;
5710     }
5711 
5712     // Can't split region which is in recovering state
5713     if (this.isRecovering()) {
5714       LOG.info("Cannot split region " + this.getRegionInfo().getEncodedName() + " in recovery.");
5715       return null;
5716     }
5717 
5718     if (!splitPolicy.shouldSplit()) {
5719       return null;
5720     }
5721 
5722     byte[] ret = splitPolicy.getSplitPoint();
5723 
5724     if (ret != null) {
5725       try {
5726         checkRow(ret, "calculated split");
5727       } catch (IOException e) {
5728         LOG.error("Ignoring invalid split", e);
5729         return null;
5730       }
5731     }
5732     return ret;
5733   }
5734 
5735   /**
5736    * @return The priority that this region should have in the compaction queue
5737    */
5738   public int getCompactPriority() {
5739     int count = Integer.MAX_VALUE;
5740     for (Store store : stores.values()) {
5741       count = Math.min(count, store.getCompactPriority());
5742     }
5743     return count;
5744   }
5745 
5746 
5747   /** @return the coprocessor host */
5748   public RegionCoprocessorHost getCoprocessorHost() {
5749     return coprocessorHost;
5750   }
5751 
5752   /** @param coprocessorHost the new coprocessor host */
5753   public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {
5754     this.coprocessorHost = coprocessorHost;
5755   }
5756 
5757   /**
   * This method needs to be called before any public call that reads or
   * modifies data. It has to be called just before a try block;
   * #closeRegionOperation needs to be called in the try's finally block.
   * Acquires a read lock and checks if the region is closing or closed.
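   * <p>
   * The intended calling pattern, sketched with an illustrative operation type:
   * <pre>
   *   startRegionOperation(Operation.GET);
   *   try {
   *     // read or mutate data here
   *   } finally {
   *     closeRegionOperation(Operation.GET);
   *   }
   * </pre>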
5762    * @throws IOException
5763    */
5764   public void startRegionOperation() throws IOException {
5765     startRegionOperation(Operation.ANY);
5766   }
5767 
5768   /**
   * @param op The operation about to be performed on the region
5770    * @throws IOException
5771    */
5772   protected void startRegionOperation(Operation op) throws IOException {
5773     switch (op) {
5774     case INCREMENT:
5775     case APPEND:
5776     case GET:
5777     case SCAN:
5778     case SPLIT_REGION:
5779     case MERGE_REGION:
5780     case PUT:
5781     case DELETE:
5782     case BATCH_MUTATE:
5783     case COMPACT_REGION:
5784       // when a region is in recovering state, no read, split or merge is allowed
5785       if (this.isRecovering() && (this.disallowWritesInRecovering ||
5786               (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) {
5787         throw new RegionInRecoveryException(this.getRegionNameAsString() + " is recovering");
5788       }
5789       break;
5790     default:
5791       break;
5792     }
5793     if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION
5794         || op == Operation.COMPACT_REGION) {
5795       // split, merge or compact region doesn't need to check the closing/closed state or lock the
5796       // region
5797       return;
5798     }
5799     if (this.closing.get()) {
5800       throw new NotServingRegionException(getRegionNameAsString() + " is closing");
5801     }
5802     lock(lock.readLock());
5803     if (this.closed.get()) {
5804       lock.readLock().unlock();
5805       throw new NotServingRegionException(getRegionNameAsString() + " is closed");
5806     }
5807     try {
5808       if (coprocessorHost != null) {
5809         coprocessorHost.postStartRegionOperation(op);
5810       }
5811     } catch (Exception e) {
5812       lock.readLock().unlock();
5813       throw new IOException(e);
5814     }
5815   }
5816 
5817   /**
5818    * Closes the lock. This needs to be called in the finally block corresponding
5819    * to the try block of #startRegionOperation
5820    * @throws IOException
5821    */
5822   public void closeRegionOperation() throws IOException {
5823     closeRegionOperation(Operation.ANY);
5824   }
5825 
5826   /**
5827    * Closes the lock. This needs to be called in the finally block corresponding
5828    * to the try block of {@link #startRegionOperation(Operation)}
5829    * @throws IOException
5830    */
5831   public void closeRegionOperation(Operation operation) throws IOException {
5832     lock.readLock().unlock();
5833     if (coprocessorHost != null) {
5834       coprocessorHost.postCloseRegionOperation(operation);
5835     }
5836   }
5837 
5838   /**
5839    * This method needs to be called before any public call that reads or
   * modifies stores in bulk. It has to be called just before a try block;
   * #closeBulkRegionOperation needs to be called in the try's finally block.
   * Acquires a write lock and checks if the region is closing or closed.
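   * <p>
   * The intended calling pattern, sketched:
   * <pre>
   *   startBulkRegionOperation(true); // true = take the write lock
   *   try {
   *     // bulk operation on the region's stores here
   *   } finally {
   *     closeBulkRegionOperation();
   *   }
   * </pre>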
5843    * @throws NotServingRegionException when the region is closing or closed
5844    * @throws RegionTooBusyException if failed to get the lock in time
5845    * @throws InterruptedIOException if interrupted while waiting for a lock
5846    */
5847   private void startBulkRegionOperation(boolean writeLockNeeded)
5848       throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
5849     if (this.closing.get()) {
5850       throw new NotServingRegionException(getRegionNameAsString() + " is closing");
5851     }
5852     if (writeLockNeeded) lock(lock.writeLock());
5853     else lock(lock.readLock());
5854     if (this.closed.get()) {
5855       if (writeLockNeeded) lock.writeLock().unlock();
5856       else lock.readLock().unlock();
5857       throw new NotServingRegionException(getRegionNameAsString() + " is closed");
5858     }
5859   }
5860 
5861   /**
5862    * Closes the lock. This needs to be called in the finally block corresponding
5863    * to the try block of #startRegionOperation
5864    */
5865   private void closeBulkRegionOperation(){
5866     if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
5867     else lock.readLock().unlock();
5868   }
5869 
5870   /**
   * Update counters for the number of puts without WAL and the size of possible data loss.
   * This information is exposed by the region server metrics.
5873    */
5874   private void recordMutationWithoutWal(final Map<byte [], List<Cell>> familyMap) {
5875     numMutationsWithoutWAL.increment();
5876     if (numMutationsWithoutWAL.get() <= 1) {
5877       LOG.info("writing data to region " + this +
5878                " with WAL disabled. Data may be lost in the event of a crash.");
5879     }
5880 
5881     long mutationSize = 0;
5882     for (List<Cell> cells: familyMap.values()) {
5883       for (Cell cell : cells) {
5884         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5885         mutationSize += kv.getKeyLength() + kv.getValueLength();
5886       }
5887     }
5888 
5889     dataInMemoryWithoutWAL.add(mutationSize);
5890   }
5891 
5892   private void lock(final Lock lock)
5893       throws RegionTooBusyException, InterruptedIOException {
5894     lock(lock, 1);
5895   }
5896 
5897   /**
   * Try to acquire a lock.  Throws RegionTooBusyException
   * if we fail to get the lock in time, and InterruptedIOException
   * if interrupted while waiting for the lock.
5901    */
5902   private void lock(final Lock lock, final int multiplier)
5903       throws RegionTooBusyException, InterruptedIOException {
5904     try {
5905       final long waitTime = Math.min(maxBusyWaitDuration,
5906           busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
5907       if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
5908         throw new RegionTooBusyException(
5909             "failed to get a lock in " + waitTime + " ms. " +
5910                 "regionName=" + (this.getRegionInfo() == null ? "unknown" :
5911                 this.getRegionInfo().getRegionNameAsString()) +
5912                 ", server=" + (this.getRegionServerServices() == null ? "unknown" :
5913                 this.getRegionServerServices().getServerName()));
5914       }
5915     } catch (InterruptedException ie) {
5916       LOG.info("Interrupted while waiting for a lock");
5917       InterruptedIOException iie = new InterruptedIOException();
5918       iie.initCause(ie);
5919       throw iie;
5920     }
5921   }
5922 
5923   /**
5924    * Calls sync with the given transaction ID if the region's table is not
5925    * deferring it.
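   * <p>
   * The durability decided here originates from the client mutation, e.g. (a sketch; with
   * ASYNC_WAL the edit is appended but not synced by this method):
   * <pre>
   *   Put put = new Put(row);
   *   put.setDurability(Durability.ASYNC_WAL);
   * </pre>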
5926    * @param txid should sync up to which transaction
5927    * @throws IOException If anything goes wrong with DFS
5928    */
5929   private void syncOrDefer(long txid, Durability durability) throws IOException {
5930     if (this.getRegionInfo().isMetaRegion()) {
5931       this.log.sync(txid);
5932     } else {
5933       switch(durability) {
5934       case USE_DEFAULT:
5935         // do what table defaults to
5936         if (shouldSyncLog()) {
5937           this.log.sync(txid);
5938         }
5939         break;
5940       case SKIP_WAL:
        // nothing to do
5942         break;
5943       case ASYNC_WAL:
        // nothing to do
5945         break;
5946       case SYNC_WAL:
5947       case FSYNC_WAL:
5948         // sync the WAL edit (SYNC and FSYNC treated the same for now)
5949         this.log.sync(txid);
5950         break;
5951       }
5952     }
5953   }
5954 
5955   /**
5956    * Check whether we should sync the log from the table's durability settings
5957    */
5958   private boolean shouldSyncLog() {
    return durability.ordinal() > Durability.ASYNC_WAL.ordinal(); // i.e. SYNC_WAL or FSYNC_WAL
5960   }
5961 
5962   /**
   * A mocked list implementation - discards all updates.
5964    */
5965   private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {
5966 
5967     @Override
5968     public void add(int index, Cell element) {
5969       // do nothing
5970     }
5971 
5972     @Override
5973     public boolean addAll(int index, Collection<? extends Cell> c) {
5974       return false; // this list is never changed as a result of an update
5975     }
5976 
5977     @Override
5978     public KeyValue get(int index) {
5979       throw new UnsupportedOperationException();
5980     }
5981 
5982     @Override
5983     public int size() {
5984       return 0;
5985     }
5986   };
5987 
5988   /**
5989    * Facility for dumping and compacting catalog tables.
   * Only handles catalog tables since these are the only tables whose
   * schema we know for sure.  For usage run:
5992    * <pre>
5993    *   ./bin/hbase org.apache.hadoop.hbase.regionserver.HRegion
5994    * </pre>
5995    * @throws IOException
5996    */
5997   public static void main(String[] args) throws IOException {
5998     if (args.length < 1) {
5999       printUsageAndExit(null);
6000     }
6001     boolean majorCompact = false;
6002     if (args.length > 1) {
6003       if (!args[1].toLowerCase().startsWith("major")) {
6004         printUsageAndExit("ERROR: Unrecognized option <" + args[1] + ">");
6005       }
6006       majorCompact = true;
6007     }
6008     final Path tableDir = new Path(args[0]);
6009     final Configuration c = HBaseConfiguration.create();
6010     final FileSystem fs = FileSystem.get(c);
6011     final Path logdir = new Path(c.get("hbase.tmp.dir"));
6012     final String logname = "hlog" + FSUtils.getTableName(tableDir) + System.currentTimeMillis();
6013 
6014     final HLog log = HLogFactory.createHLog(fs, logdir, logname, c);
6015     try {
6016       processTable(fs, tableDir, log, c, majorCompact);
6017     } finally {
6018        log.close();
6019        // TODO: is this still right?
6020        BlockCache bc = new CacheConfig(c).getBlockCache();
6021        if (bc != null) bc.shutdown();
6022     }
6023   }
6024 
6025   /**
6026    * Gets the latest sequence number that was read from storage when this region was opened.
6027    */
6028   public long getOpenSeqNum() {
6029     return this.openSeqNum;
6030   }
6031 
6032   /**
   * Gets the max sequence ids of the stores that were read from storage when this region was
   * opened. WAL edits with a smaller or equal sequence number will be skipped during replay.
6035    */
6036   public Map<byte[], Long> getMaxStoreSeqIdForLogReplay() {
6037     return this.maxSeqIdInStores;
6038   }
6039 
6040   /**
   * @return the current compaction state of this region.
6042    */
6043   public CompactionState getCompactionState() {
6044     boolean hasMajor = majorInProgress.get() > 0, hasMinor = minorInProgress.get() > 0;
6045     return (hasMajor ? (hasMinor ? CompactionState.MAJOR_AND_MINOR : CompactionState.MAJOR)
6046         : (hasMinor ? CompactionState.MINOR : CompactionState.NONE));
6047   }
6048 
6049   public void reportCompactionRequestStart(boolean isMajor){
6050     (isMajor ? majorInProgress : minorInProgress).incrementAndGet();
6051   }
6052 
6053   public void reportCompactionRequestEnd(boolean isMajor, int numFiles, long filesSizeCompacted) {
6054     int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();
6055 
6056     // metrics
6057     compactionsFinished.incrementAndGet();
6058     compactionNumFilesCompacted.addAndGet(numFiles);
6059     compactionNumBytesCompacted.addAndGet(filesSizeCompacted);
6060 
6061     assert newValue >= 0;
6062   }
6063 
6064   /**
6065    * Do not change this sequence id. See {@link #sequenceId} comment.
6066    * @return sequenceId 
6067    */
6068   @VisibleForTesting
6069   public AtomicLong getSequenceId() {
6070     return this.sequenceId;
6071   }
6072 
6073   /**
   * Sets this region's sequenceId.
6075    * @param value new value
6076    */
6077   private void setSequenceId(long value) {
6078     this.sequenceId.set(value);
6079   }
6080 
6081   /**
   * Listener interface to enable callers of
   * bulkLoadHFile() to perform any necessary
   * pre/post processing of a given bulkload call
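   * <p>
   * A minimal listener sketch, assuming the bulkLoadHFiles overload that accepts a listener;
   * {@code familyPaths} and the method bodies are illustrative:
   * <pre>
   *   region.bulkLoadHFiles(familyPaths, false, new BulkLoadListener() {
   *     public String prepareBulkLoad(byte[] family, String srcPath) throws IOException {
   *       return srcPath; // e.g. stage the file first and return the staged path
   *     }
   *     public void doneBulkLoad(byte[] family, String srcPath) { } // cleanup hook
   *     public void failedBulkLoad(byte[] family, String srcPath) { } // rollback hook
   *   });
   * </pre>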
6085    */
6086   public interface BulkLoadListener {
6087 
6088     /**
6089      * Called before an HFile is actually loaded
6090      * @param family family being loaded to
6091      * @param srcPath path of HFile
6092      * @return final path to be used for actual loading
6093      * @throws IOException
6094      */
6095     String prepareBulkLoad(byte[] family, String srcPath) throws IOException;
6096 
6097     /**
6098      * Called after a successful HFile load
6099      * @param family family being loaded to
6100      * @param srcPath path of HFile
6101      * @throws IOException
6102      */
6103     void doneBulkLoad(byte[] family, String srcPath) throws IOException;
6104 
6105     /**
6106      * Called after a failed HFile load
6107      * @param family family being loaded to
6108      * @param srcPath path of HFile
6109      * @throws IOException
6110      */
6111     void failedBulkLoad(byte[] family, String srcPath) throws IOException;
6112   }
6113 
6114   @VisibleForTesting class RowLockContext {
6115     private final HashedBytes row;
6116     private final CountDownLatch latch = new CountDownLatch(1);
6117     private final Thread thread;
6118     private int lockCount = 0;
6119 
6120     RowLockContext(HashedBytes row) {
6121       this.row = row;
6122       this.thread = Thread.currentThread();
6123     }
6124 
6125     boolean ownedByCurrentThread() {
6126       return thread == Thread.currentThread();
6127     }
6128 
6129     RowLock newLock() {
6130       lockCount++;
6131       return new RowLock(this);
6132     }
6133 
6134     void releaseLock() {
6135       if (!ownedByCurrentThread()) {
6136         throw new IllegalArgumentException("Lock held by thread: " + thread
6137           + " cannot be released by different thread: " + Thread.currentThread());
6138       }
6139       lockCount--;
6140       if (lockCount == 0) {
6141         // no remaining locks by the thread, unlock and allow other threads to access
6142         RowLockContext existingContext = lockedRows.remove(row);
6143         if (existingContext != this) {
6144           throw new RuntimeException(
6145               "Internal row lock state inconsistent, should not happen, row: " + row);
6146         }
6147         latch.countDown();
6148       }
6149     }
6150   }
6151 
6152   /**
6153    * Row lock held by a given thread.
6154    * One thread may acquire multiple locks on the same row simultaneously.
6155    * The locks must be released by calling release() from the same thread.
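   * <p>
   * Typical usage, sketched:
   * <pre>
   *   RowLock rowLock = region.getRowLock(rowKey);
   *   try {
   *     // mutate the row while holding the lock
   *   } finally {
   *     rowLock.release();
   *   }
   * </pre>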
6156    */
6157   public static class RowLock {
6158     @VisibleForTesting final RowLockContext context;
6159     private boolean released = false;
6160 
6161     @VisibleForTesting RowLock(RowLockContext context) {
6162       this.context = context;
6163     }
6164 
6165     /**
6166      * Release the given lock.  If there are no remaining locks held by the current thread
6167      * then unlock the row and allow other threads to acquire the lock.
6168      * @throws IllegalArgumentException if called by a different thread than the lock owning thread
6169      */
6170     public void release() {
6171       if (!released) {
6172         context.releaseLock();
6173         released = true;
6174       }
6175     }
6176   }
6177   
6178   /**
   * Append a faked WALEdit in order to get a long sequence number; the log syncer will just
   * ignore the WALEdit append later.
   * @param wal the log to append to
   * @param cells list of KeyValues inserted into the memstore. Those KeyValues are passed in
   *        order to be updated with the right mvcc values (their log sequence number).
   * @return the key used for the faked append (no sync and no real edit appended).
6185    * @throws IOException
6186    */
6187   private HLogKey appendNoSyncNoAppend(final HLog wal, List<KeyValue> cells) throws IOException {
6188     HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(),
6189       HLog.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
    // Call append but with an empty WALEdit.  The returned sequence id will not be associated
6191     // with any edit and we can be sure it went in after all outstanding appends.
6192     wal.appendNoSync(getTableDesc(), getRegionInfo(), key,
6193       WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells);
6194     return key;
6195   }
6196 }