View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.io.UnsupportedEncodingException;
25  import java.lang.reflect.Constructor;
26  import java.text.ParseException;
27  import java.util.AbstractList;
28  import java.util.ArrayList;
29  import java.util.Arrays;
30  import java.util.Collection;
31  import java.util.Collections;
32  import java.util.HashMap;
33  import java.util.HashSet;
34  import java.util.Iterator;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.NavigableMap;
38  import java.util.NavigableSet;
39  import java.util.RandomAccess;
40  import java.util.Set;
41  import java.util.TreeMap;
42  import java.util.concurrent.Callable;
43  import java.util.concurrent.CompletionService;
44  import java.util.concurrent.ConcurrentHashMap;
45  import java.util.concurrent.ConcurrentMap;
46  import java.util.concurrent.ConcurrentSkipListMap;
47  import java.util.concurrent.CountDownLatch;
48  import java.util.concurrent.ExecutionException;
49  import java.util.concurrent.ExecutorCompletionService;
50  import java.util.concurrent.ExecutorService;
51  import java.util.concurrent.Executors;
52  import java.util.concurrent.Future;
53  import java.util.concurrent.FutureTask;
54  import java.util.concurrent.ThreadFactory;
55  import java.util.concurrent.ThreadPoolExecutor;
56  import java.util.concurrent.TimeUnit;
57  import java.util.concurrent.TimeoutException;
58  import java.util.concurrent.atomic.AtomicBoolean;
59  import java.util.concurrent.atomic.AtomicInteger;
60  import java.util.concurrent.atomic.AtomicLong;
61  import java.util.concurrent.locks.Lock;
62  import java.util.concurrent.locks.ReentrantReadWriteLock;
63  
64  import org.apache.commons.lang.RandomStringUtils;
65  import org.apache.commons.logging.Log;
66  import org.apache.commons.logging.LogFactory;
67  import org.apache.hadoop.conf.Configuration;
68  import org.apache.hadoop.fs.FileStatus;
69  import org.apache.hadoop.fs.FileSystem;
70  import org.apache.hadoop.fs.Path;
71  import org.apache.hadoop.hbase.Cell;
72  import org.apache.hadoop.hbase.CellScanner;
73  import org.apache.hadoop.hbase.CellUtil;
74  import org.apache.hadoop.hbase.CompoundConfiguration;
75  import org.apache.hadoop.hbase.DoNotRetryIOException;
76  import org.apache.hadoop.hbase.DroppedSnapshotException;
77  import org.apache.hadoop.hbase.HBaseConfiguration;
78  import org.apache.hadoop.hbase.HColumnDescriptor;
79  import org.apache.hadoop.hbase.HConstants;
80  import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
81  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
82  import org.apache.hadoop.hbase.HRegionInfo;
83  import org.apache.hadoop.hbase.HTableDescriptor;
84  import org.apache.hadoop.hbase.KeyValue;
85  import org.apache.hadoop.hbase.KeyValueUtil;
86  import org.apache.hadoop.hbase.NamespaceDescriptor;
87  import org.apache.hadoop.hbase.NotServingRegionException;
88  import org.apache.hadoop.hbase.RegionTooBusyException;
89  import org.apache.hadoop.hbase.TableName;
90  import org.apache.hadoop.hbase.Tag;
91  import org.apache.hadoop.hbase.TagType;
92  import org.apache.hadoop.hbase.UnknownScannerException;
93  import org.apache.hadoop.hbase.backup.HFileArchiver;
94  import org.apache.hadoop.hbase.classification.InterfaceAudience;
95  import org.apache.hadoop.hbase.client.Append;
96  import org.apache.hadoop.hbase.client.Delete;
97  import org.apache.hadoop.hbase.client.Durability;
98  import org.apache.hadoop.hbase.client.Get;
99  import org.apache.hadoop.hbase.client.Increment;
100 import org.apache.hadoop.hbase.client.IsolationLevel;
101 import org.apache.hadoop.hbase.client.Mutation;
102 import org.apache.hadoop.hbase.client.Put;
103 import org.apache.hadoop.hbase.client.Result;
104 import org.apache.hadoop.hbase.client.RowMutations;
105 import org.apache.hadoop.hbase.client.Scan;
106 import org.apache.hadoop.hbase.conf.ConfigurationManager;
107 import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
108 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
109 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
110 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
111 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
112 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
113 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
114 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
115 import org.apache.hadoop.hbase.filter.FilterWrapper;
116 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
117 import org.apache.hadoop.hbase.io.HeapSize;
118 import org.apache.hadoop.hbase.io.TimeRange;
119 import org.apache.hadoop.hbase.io.hfile.BlockCache;
120 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
121 import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
122 import org.apache.hadoop.hbase.ipc.RpcCallContext;
123 import org.apache.hadoop.hbase.ipc.RpcServer;
124 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
125 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
126 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
127 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
128 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
129 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
130 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
131 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
132 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
133 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
134 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
135 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
136 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
137 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
138 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
139 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
140 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
141 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
142 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
143 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
144 import org.apache.hadoop.hbase.util.Bytes;
145 import org.apache.hadoop.hbase.util.CancelableProgressable;
146 import org.apache.hadoop.hbase.util.ClassSize;
147 import org.apache.hadoop.hbase.util.CompressionTest;
148 import org.apache.hadoop.hbase.util.Counter;
149 import org.apache.hadoop.hbase.util.EncryptionTest;
150 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
151 import org.apache.hadoop.hbase.util.FSTableDescriptors;
152 import org.apache.hadoop.hbase.util.FSUtils;
153 import org.apache.hadoop.hbase.util.HashedBytes;
154 import org.apache.hadoop.hbase.util.Pair;
155 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
156 import org.apache.hadoop.hbase.util.Threads;
157 import org.apache.hadoop.hbase.wal.WAL;
158 import org.apache.hadoop.hbase.wal.WALFactory;
159 import org.apache.hadoop.hbase.wal.WALKey;
160 import org.apache.hadoop.hbase.wal.WALSplitter;
161 import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
162 import org.apache.hadoop.io.MultipleIOException;
163 import org.apache.hadoop.util.StringUtils;
164 
165 import com.google.common.annotations.VisibleForTesting;
166 import com.google.common.base.Optional;
167 import com.google.common.base.Preconditions;
168 import com.google.common.collect.Lists;
169 import com.google.common.collect.Maps;
170 import com.google.common.io.Closeables;
171 import com.google.protobuf.Descriptors;
172 import com.google.protobuf.Message;
173 import com.google.protobuf.RpcCallback;
174 import com.google.protobuf.RpcController;
175 import com.google.protobuf.Service;
176 
177 /**
178  * HRegion stores data for a certain region of a table.  It stores all columns
179  * for each row. A given table consists of one or more HRegions.
180  *
181  * <p>We maintain multiple HStores for a single HRegion.
182  *
183  * <p>An Store is a set of rows with some column data; together,
184  * they make up all the data for the rows.
185  *
186  * <p>Each HRegion has a 'startKey' and 'endKey'.
187  * <p>The first is inclusive, the second is exclusive (except for
188  * the final region)  The endKey of region 0 is the same as
189  * startKey for region 1 (if it exists).  The startKey for the
190  * first region is null. The endKey for the final region is null.
191  *
192  * <p>Locking at the HRegion level serves only one purpose: preventing the
193  * region from being closed (and consequently split) while other operations
194  * are ongoing. Each row level operation obtains both a row lock and a region
195  * read lock for the duration of the operation. While a scanner is being
196  * constructed, getScanner holds a read lock. If the scanner is successfully
197  * constructed, it holds a read lock until it is closed. A close takes out a
198  * write lock and consequently will block for ongoing operations and will block
199  * new operations from starting while the close is in progress.
200  *
201  * <p>An HRegion is defined by its table and its key extent.
202  *
203  * <p>It consists of at least one Store.  The number of Stores should be
204  * configurable, so that data which is accessed together is stored in the same
205  * Store.  Right now, we approximate that by building a single Store for
206  * each column family.  (This config info will be communicated via the
207  * tabledesc.)
208  *
209  * <p>The HTableDescriptor contains metainfo about the HRegion's table.
210  * regionName is a unique identifier for this HRegion. (startKey, endKey]
211  * defines the keyspace for this HRegion.
212  */
213 @InterfaceAudience.Private
214 public class HRegion implements HeapSize, PropagatingConfigurationObserver { // , Writable{
215   public static final Log LOG = LogFactory.getLog(HRegion.class);
216 
217   public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
218       "hbase.hregion.scan.loadColumnFamiliesOnDemand";
219 
220   /**
221    * This is the global default value for durability. All tables/mutations not
222    * defining a durability or using USE_DEFAULT will default to this value.
223    */
224   private static final Durability DEFAULT_DURABLITY = Durability.SYNC_WAL;
225 
226   final AtomicBoolean closed = new AtomicBoolean(false);
227   /* Closing can take some time; use the closing flag if there is stuff we don't
228    * want to do while in closing state; e.g. like offer this region up to the
229    * master as a region to close if the carrying regionserver is overloaded.
230    * Once set, it is never cleared.
231    */
232   final AtomicBoolean closing = new AtomicBoolean(false);
233 
234   /**
235    * The max sequence id of flushed data on this region.  Used doing some rough calculations on
236    * whether time to flush or not.
237    */
238   protected volatile long maxFlushedSeqId = -1L;
239 
240   /**
241    * Region scoped edit sequence Id. Edits to this region are GUARANTEED to appear in the WAL
242    * file in this sequence id's order; i.e. edit #2 will be in the WAL after edit #1.
243    * Its default value is -1L. This default is used as a marker to indicate
244    * that the region hasn't opened yet. Once it is opened, it is set to the derived
245    * {@link #openSeqNum}, the largest sequence id of all hfiles opened under this Region.
246    *
247    * <p>Control of this sequence is handed off to the WAL implementation.  It is responsible
248    * for tagging edits with the correct sequence id since it is responsible for getting the
249    * edits into the WAL files. It controls updating the sequence id value.  DO NOT UPDATE IT
250    * OUTSIDE OF THE WAL.  The value you get will not be what you think it is.
251    */
252   private final AtomicLong sequenceId = new AtomicLong(-1L);
253 
254   /**
255    * Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context for
256    * startRegionOperation to possibly invoke different checks before any region operations. Not all
257    * operations have to be defined here. It's only needed when a special check is need in
258    * startRegionOperation
259    */
260   public enum Operation {
261     ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE,
262     REPLAY_BATCH_MUTATE, COMPACT_REGION
263   }
264 
265   //////////////////////////////////////////////////////////////////////////////
266   // Members
267   //////////////////////////////////////////////////////////////////////////////
268 
269   // map from a locked row to the context for that lock including:
270   // - CountDownLatch for threads waiting on that row
271   // - the thread that owns the lock (allow reentrancy)
272   // - reference count of (reentrant) locks held by the thread
273   // - the row itself
274   private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
275       new ConcurrentHashMap<HashedBytes, RowLockContext>();
276 
277   protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(
278       Bytes.BYTES_RAWCOMPARATOR);
279 
280   // TODO: account for each registered handler in HeapSize computation
281   private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
282 
283   public final AtomicLong memstoreSize = new AtomicLong(0);
284 
285   // Debug possible data loss due to WAL off
286   final Counter numMutationsWithoutWAL = new Counter();
287   final Counter dataInMemoryWithoutWAL = new Counter();
288 
289   // Debug why CAS operations are taking a while.
290   final Counter checkAndMutateChecksPassed = new Counter();
291   final Counter checkAndMutateChecksFailed = new Counter();
292 
293   //Number of requests
294   final Counter readRequestsCount = new Counter();
295   final Counter writeRequestsCount = new Counter();
296 
297   // Number of requests blocked by memstore size.
298   private final Counter blockedRequestsCount = new Counter();
299 
300   /**
301    * @return the number of blocked requests count.
302    */
303   public long getBlockedRequestsCount() {
304     return this.blockedRequestsCount.get();
305   }
306 
307   // Compaction counters
308   final AtomicLong compactionsFinished = new AtomicLong(0L);
309   final AtomicLong compactionNumFilesCompacted = new AtomicLong(0L);
310   final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L);
311 
312 
313   private final WAL wal;
314   private final HRegionFileSystem fs;
315   protected final Configuration conf;
316   private final Configuration baseConf;
317   private final KeyValue.KVComparator comparator;
318   private final int rowLockWaitDuration;
319   static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
320 
321   // The internal wait duration to acquire a lock before read/update
322   // from the region. It is not per row. The purpose of this wait time
323   // is to avoid waiting a long time while the region is busy, so that
324   // we can release the IPC handler soon enough to improve the
325   // availability of the region server. It can be adjusted by
326   // tuning configuration "hbase.busy.wait.duration".
327   final long busyWaitDuration;
328   static final long DEFAULT_BUSY_WAIT_DURATION = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
329 
330   // If updating multiple rows in one call, wait longer,
331   // i.e. waiting for busyWaitDuration * # of rows. However,
332   // we can limit the max multiplier.
333   final int maxBusyWaitMultiplier;
334 
335   // Max busy wait duration. There is no point to wait longer than the RPC
336   // purge timeout, when a RPC call will be terminated by the RPC engine.
337   final long maxBusyWaitDuration;
338 
339   // negative number indicates infinite timeout
340   static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
341   final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();
342 
343   private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
344 
345   /**
346    * The sequence ID that was encountered when this region was opened.
347    */
348   private long openSeqNum = HConstants.NO_SEQNUM;
349 
350   /**
351    * The default setting for whether to enable on-demand CF loading for
352    * scan requests to this region. Requests can override it.
353    */
354   private boolean isLoadingCfsOnDemandDefault = false;
355 
356   private final AtomicInteger majorInProgress = new AtomicInteger(0);
357   private final AtomicInteger minorInProgress = new AtomicInteger(0);
358 
359   //
360   // Context: During replay we want to ensure that we do not lose any data. So, we
361   // have to be conservative in how we replay wals. For each store, we calculate
362   // the maxSeqId up to which the store was flushed. And, skip the edits which
363   // are equal to or lower than maxSeqId for each store.
364   // The following map is populated when opening the region
365   Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
366 
367   /**
368    * Config setting for whether to allow writes when a region is in recovering or not.
369    */
370   private boolean disallowWritesInRecovering = false;
371 
372   // when a region is in recovering state, it can only accept writes not reads
373   private volatile boolean isRecovering = false;
374 
375   private volatile Optional<ConfigurationManager> configurationManager;
376 
377   /**
378    * @return The smallest mvcc readPoint across all the scanners in this
379    * region. Writes older than this readPoint, are included  in every
380    * read operation.
381    */
382   public long getSmallestReadPoint() {
383     long minimumReadPoint;
384     // We need to ensure that while we are calculating the smallestReadPoint
385     // no new RegionScanners can grab a readPoint that we are unaware of.
386     // We achieve this by synchronizing on the scannerReadPoints object.
387     synchronized(scannerReadPoints) {
388       minimumReadPoint = mvcc.memstoreReadPoint();
389 
390       for (Long readPoint: this.scannerReadPoints.values()) {
391         if (readPoint < minimumReadPoint) {
392           minimumReadPoint = readPoint;
393         }
394       }
395     }
396     return minimumReadPoint;
397   }
398   /*
399    * Data structure of write state flags used coordinating flushes,
400    * compactions and closes.
401    */
402   static class WriteState {
403     // Set while a memstore flush is happening.
404     volatile boolean flushing = false;
405     // Set when a flush has been requested.
406     volatile boolean flushRequested = false;
407     // Number of compactions running.
408     volatile int compacting = 0;
409     // Gets set in close. If set, cannot compact or flush again.
410     volatile boolean writesEnabled = true;
411     // Set if region is read-only
412     volatile boolean readOnly = false;
413     // whether the reads are enabled. This is different than readOnly, because readOnly is
414     // static in the lifetime of the region, while readsEnabled is dynamic
415     volatile boolean readsEnabled = true;
416 
417     /**
418      * Set flags that make this region read-only.
419      *
420      * @param onOff flip value for region r/o setting
421      */
422     synchronized void setReadOnly(final boolean onOff) {
423       this.writesEnabled = !onOff;
424       this.readOnly = onOff;
425     }
426 
427     boolean isReadOnly() {
428       return this.readOnly;
429     }
430 
431     boolean isFlushRequested() {
432       return this.flushRequested;
433     }
434 
435     void setReadsEnabled(boolean readsEnabled) {
436       this.readsEnabled = readsEnabled;
437     }
438 
439     static final long HEAP_SIZE = ClassSize.align(
440         ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
441   }
442 
443   /**
444    * Objects from this class are created when flushing to describe all the different states that
445    * that method ends up in. The Result enum describes those states. The sequence id should only
446    * be specified if the flush was successful, and the failure message should only be specified
447    * if it didn't flush.
448    */
449   public static class FlushResult {
450     enum Result {
451       FLUSHED_NO_COMPACTION_NEEDED,
452       FLUSHED_COMPACTION_NEEDED,
453       // Special case where a flush didn't run because there's nothing in the memstores. Used when
454       // bulk loading to know when we can still load even if a flush didn't happen.
455       CANNOT_FLUSH_MEMSTORE_EMPTY,
456       CANNOT_FLUSH
457       // Be careful adding more to this enum, look at the below methods to make sure
458     }
459 
460     final Result result;
461     final String failureReason;
462     final long flushSequenceId;
463 
464     /**
465      * Convenience constructor to use when the flush is successful, the failure message is set to
466      * null.
467      * @param result Expecting FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
468      * @param flushSequenceId Generated sequence id that comes right after the edits in the
469      *                        memstores.
470      */
471     FlushResult(Result result, long flushSequenceId) {
472       this(result, flushSequenceId, null);
473       assert result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
474           .FLUSHED_COMPACTION_NEEDED;
475     }
476 
477     /**
478      * Convenience constructor to use when we cannot flush.
479      * @param result Expecting CANNOT_FLUSH_MEMSTORE_EMPTY or CANNOT_FLUSH.
480      * @param failureReason Reason why we couldn't flush.
481      */
482     FlushResult(Result result, String failureReason) {
483       this(result, -1, failureReason);
484       assert result == Result.CANNOT_FLUSH_MEMSTORE_EMPTY || result == Result.CANNOT_FLUSH;
485     }
486 
487     /**
488      * Constructor with all the parameters.
489      * @param result Any of the Result.
490      * @param flushSequenceId Generated sequence id if the memstores were flushed else -1.
491      * @param failureReason Reason why we couldn't flush, or null.
492      */
493     FlushResult(Result result, long flushSequenceId, String failureReason) {
494       this.result = result;
495       this.flushSequenceId = flushSequenceId;
496       this.failureReason = failureReason;
497     }
498 
499     /**
500      * Convenience method, the equivalent of checking if result is
501      * FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_NO_COMPACTION_NEEDED.
502      * @return true if the memstores were flushed, else false.
503      */
504     public boolean isFlushSucceeded() {
505       return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
506           .FLUSHED_COMPACTION_NEEDED;
507     }
508 
509     /**
510      * Convenience method, the equivalent of checking if result is FLUSHED_COMPACTION_NEEDED.
511      * @return True if the flush requested a compaction, else false (doesn't even mean it flushed).
512      */
513     public boolean isCompactionNeeded() {
514       return result == Result.FLUSHED_COMPACTION_NEEDED;
515     }
516   }
517 
518   final WriteState writestate = new WriteState();
519 
520   long memstoreFlushSize;
521   final long timestampSlop;
522   final long rowProcessorTimeout;
523 
524   // Last flush time for each Store. Useful when we are flushing for each column
525   private final ConcurrentMap<Store, Long> lastStoreFlushTimeMap =
526       new ConcurrentHashMap<Store, Long>();
527 
528   final RegionServerServices rsServices;
529   private RegionServerAccounting rsAccounting;
530   private long flushCheckInterval;
531   // flushPerChanges is to prevent too many changes in memstore
532   private long flushPerChanges;
533   private long blockingMemStoreSize;
534   final long threadWakeFrequency;
535   // Used to guard closes
536   final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
537 
538   // Stop updates lock
539   private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock();
540   private boolean splitRequest;
541   private byte[] explicitSplitPoint = null;
542 
543   private final MultiVersionConsistencyControl mvcc =
544       new MultiVersionConsistencyControl();
545 
546   // Coprocessor host
547   private RegionCoprocessorHost coprocessorHost;
548 
549   private HTableDescriptor htableDescriptor = null;
550   private RegionSplitPolicy splitPolicy;
551   private FlushPolicy flushPolicy;
552 
553   private final MetricsRegion metricsRegion;
554   private final MetricsRegionWrapperImpl metricsRegionWrapper;
555   private final Durability durability;
556   private final boolean regionStatsEnabled;
557 
558   /**
559    * HRegion constructor. This constructor should only be used for testing and
560    * extensions.  Instances of HRegion should be instantiated with the
561    * {@link HRegion#createHRegion} or {@link HRegion#openHRegion} method.
562    *
563    * @param tableDir qualified path of directory where region should be located,
564    * usually the table directory.
565    * @param wal The WAL is the outbound log for any updates to the HRegion
566    * The wal file is a logfile from the previous execution that's
567    * custom-computed for this HRegion. The HRegionServer computes and sorts the
568    * appropriate wal info for this HRegion. If there is a previous wal file
569    * (implying that the HRegion has been written-to before), then read it from
570    * the supplied path.
571    * @param fs is the filesystem.
572    * @param confParam is global configuration settings.
573    * @param regionInfo - HRegionInfo that describes the region
574    * is new), then read them from the supplied path.
575    * @param htd the table descriptor
576    * @param rsServices reference to {@link RegionServerServices} or null
577    */
578   @Deprecated
579   public HRegion(final Path tableDir, final WAL wal, final FileSystem fs,
580       final Configuration confParam, final HRegionInfo regionInfo,
581       final HTableDescriptor htd, final RegionServerServices rsServices) {
582     this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
583       wal, confParam, htd, rsServices);
584   }
585 
586   /**
587    * HRegion constructor. This constructor should only be used for testing and
588    * extensions.  Instances of HRegion should be instantiated with the
589    * {@link HRegion#createHRegion} or {@link HRegion#openHRegion} method.
590    *
591    * @param fs is the filesystem.
592    * @param wal The WAL is the outbound log for any updates to the HRegion
593    * The wal file is a logfile from the previous execution that's
594    * custom-computed for this HRegion. The HRegionServer computes and sorts the
595    * appropriate wal info for this HRegion. If there is a previous wal file
596    * (implying that the HRegion has been written-to before), then read it from
597    * the supplied path.
598    * @param confParam is global configuration settings.
599    * @param htd the table descriptor
600    * @param rsServices reference to {@link RegionServerServices} or null
601    */
602   public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration confParam,
603       final HTableDescriptor htd, final RegionServerServices rsServices) {
604     if (htd == null) {
605       throw new IllegalArgumentException("Need table descriptor");
606     }
607 
608     if (confParam instanceof CompoundConfiguration) {
609       throw new IllegalArgumentException("Need original base configuration");
610     }
611 
612     this.comparator = fs.getRegionInfo().getComparator();
613     this.wal = wal;
614     this.fs = fs;
615 
616     // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
617     this.baseConf = confParam;
618     this.conf = new CompoundConfiguration()
619       .add(confParam)
620       .addStringMap(htd.getConfiguration())
621       .addBytesMap(htd.getValues());
622     this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
623         DEFAULT_CACHE_FLUSH_INTERVAL);
624     this.flushPerChanges = conf.getLong(MEMSTORE_FLUSH_PER_CHANGES, DEFAULT_FLUSH_PER_CHANGES);
625     if (this.flushPerChanges > MAX_FLUSH_PER_CHANGES) {
626       throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed "
627           + MAX_FLUSH_PER_CHANGES);
628     }
629     this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
630                     DEFAULT_ROWLOCK_WAIT_DURATION);
631 
632     this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
633     this.htableDescriptor = htd;
634     this.rsServices = rsServices;
635     this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
636     setHTableSpecificConf();
637     this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
638 
639     this.busyWaitDuration = conf.getLong(
640       "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
641     this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
642     if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
643       throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
644         + busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
645         + maxBusyWaitMultiplier + "). Their product should be positive");
646     }
647     this.maxBusyWaitDuration = conf.getLong("hbase.ipc.client.call.purge.timeout",
648       2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
649 
650     /*
651      * timestamp.slop provides a server-side constraint on the timestamp. This
652      * assumes that you base your TS around currentTimeMillis(). In this case,
653      * throw an error to the user if the user-specified TS is newer than now +
654      * slop. LATEST_TIMESTAMP == don't use this functionality
655      */
656     this.timestampSlop = conf.getLong(
657         "hbase.hregion.keyvalue.timestamp.slop.millisecs",
658         HConstants.LATEST_TIMESTAMP);
659 
660     /**
661      * Timeout for the process time in processRowsWithLocks().
662      * Use -1 to switch off time bound.
663      */
664     this.rowProcessorTimeout = conf.getLong(
665         "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
666     this.durability = htd.getDurability() == Durability.USE_DEFAULT
667         ? DEFAULT_DURABLITY
668         : htd.getDurability();
669     if (rsServices != null) {
670       this.rsAccounting = this.rsServices.getRegionServerAccounting();
671       // don't initialize coprocessors if not running within a regionserver
672       // TODO: revisit if coprocessors should load in other cases
673       this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
674       this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
675       this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper);
676 
677       Map<String, HRegion> recoveringRegions = rsServices.getRecoveringRegions();
678       String encodedName = getRegionInfo().getEncodedName();
679       if (recoveringRegions != null && recoveringRegions.containsKey(encodedName)) {
680         this.isRecovering = true;
681         recoveringRegions.put(encodedName, this);
682       }
683     } else {
684       this.metricsRegionWrapper = null;
685       this.metricsRegion = null;
686     }
687     if (LOG.isDebugEnabled()) {
688       // Write out region name as string and its encoded name.
689       LOG.debug("Instantiated " + this);
690     }
691 
692     // by default, we allow writes against a region when it's in recovering
693     this.disallowWritesInRecovering =
694         conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING,
695           HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG);
696     configurationManager = Optional.absent();
697 
698     // disable stats tracking system tables, but check the config for everything else
699     this.regionStatsEnabled = htd.getTableName().getNamespaceAsString().equals(
700         NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR) ?
701           false :
702           conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
703               HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
704   }
705 
706   void setHTableSpecificConf() {
707     if (this.htableDescriptor == null) return;
708     long flushSize = this.htableDescriptor.getMemStoreFlushSize();
709 
710     if (flushSize <= 0) {
711       flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
712         HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
713     }
714     this.memstoreFlushSize = flushSize;
715     this.blockingMemStoreSize = this.memstoreFlushSize *
716         conf.getLong("hbase.hregion.memstore.block.multiplier", 2);
717   }
718 
719   /**
720    * Initialize this region.
721    * Used only by tests and SplitTransaction to reopen the region.
722    * You should use createHRegion() or openHRegion()
723    * @return What the next sequence (edit) id should be.
724    * @throws IOException e
725    * @deprecated use HRegion.createHRegion() or HRegion.openHRegion()
726    */
727   @Deprecated
728   public long initialize() throws IOException {
729     return initialize(null);
730   }
731 
732   /**
733    * Initialize this region.
734    *
735    * @param reporter Tickle every so often if initialize is taking a while.
736    * @return What the next sequence (edit) id should be.
737    * @throws IOException e
738    */
739   private long initialize(final CancelableProgressable reporter) throws IOException {
740     MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
741     long nextSeqId = -1;
742     try {
743       nextSeqId = initializeRegionInternals(reporter, status);
744       return nextSeqId;
745     } finally {
746       // nextSeqid will be -1 if the initialization fails.
747       // At least it will be 0 otherwise.
748       if (nextSeqId == -1) {
749         status
750             .abort("Exception during region " + this.getRegionNameAsString() + " initialization.");
751       }
752     }
753   }
754 
755   private long initializeRegionInternals(final CancelableProgressable reporter,
756       final MonitoredTask status) throws IOException, UnsupportedEncodingException {
757     if (coprocessorHost != null) {
758       status.setStatus("Running coprocessor pre-open hook");
759       coprocessorHost.preOpen();
760     }
761 
762     // Write HRI to a file in case we need to recover hbase:meta
763     status.setStatus("Writing region info on filesystem");
764     fs.checkRegionInfoOnFilesystem();
765 
766 
767 
768     // Initialize all the HStores
769     status.setStatus("Initializing all the Stores");
770     long maxSeqId = initializeRegionStores(reporter, status);
771 
772     this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this));
773     this.writestate.flushRequested = false;
774     this.writestate.compacting = 0;
775 
776     if (this.writestate.writesEnabled) {
777       // Remove temporary data left over from old regions
778       status.setStatus("Cleaning up temporary data from old regions");
779       fs.cleanupTempDir();
780     }
781 
782     if (this.writestate.writesEnabled) {
783       status.setStatus("Cleaning up detritus from prior splits");
784       // Get rid of any splits or merges that were lost in-progress.  Clean out
785       // these directories here on open.  We may be opening a region that was
786       // being split but we crashed in the middle of it all.
787       fs.cleanupAnySplitDetritus();
788       fs.cleanupMergesDir();
789     }
790 
791     // Initialize split policy
792     this.splitPolicy = RegionSplitPolicy.create(this, conf);
793 
794     // Initialize flush policy
795     this.flushPolicy = FlushPolicyFactory.create(this, conf);
796 
797     long lastFlushTime = EnvironmentEdgeManager.currentTime();
798     for (Store store: stores.values()) {
799       this.lastStoreFlushTimeMap.put(store, lastFlushTime);
800     }
801 
802     // Use maximum of log sequenceid or that which was found in stores
803     // (particularly if no recovered edits, seqid will be -1).
804     long nextSeqid = maxSeqId;
805 
806     // In distributedLogReplay mode, we don't know the last change sequence number because region
807     // is opened before recovery completes. So we add a safety bumper to avoid new sequence number
808     // overlaps used sequence numbers
809     if (this.writestate.writesEnabled) {
810       nextSeqid = WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs
811           .getRegionDir(), nextSeqid, (this.isRecovering ? (this.flushPerChanges + 10000000) : 1));
812     } else {
813       nextSeqid++;
814     }
815 
816     LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() +
817       "; next sequenceid=" + nextSeqid);
818 
819     // A region can be reopened if failed a split; reset flags
820     this.closing.set(false);
821     this.closed.set(false);
822 
823     if (coprocessorHost != null) {
824       status.setStatus("Running coprocessor post-open hooks");
825       coprocessorHost.postOpen();
826     }
827 
828     status.markComplete("Region opened successfully");
829     return nextSeqid;
830   }
831 
832   private long initializeRegionStores(final CancelableProgressable reporter, MonitoredTask status)
833       throws IOException, UnsupportedEncodingException {
834     // Load in all the HStores.
835 
836     long maxSeqId = -1;
837     // initialized to -1 so that we pick up MemstoreTS from column families
838     long maxMemstoreTS = -1;
839 
840     if (!htableDescriptor.getFamilies().isEmpty()) {
841       // initialize the thread pool for opening stores in parallel.
842       ThreadPoolExecutor storeOpenerThreadPool =
843         getStoreOpenAndCloseThreadPool("StoreOpener-" + this.getRegionInfo().getShortNameToLog());
844       CompletionService<HStore> completionService =
845         new ExecutorCompletionService<HStore>(storeOpenerThreadPool);
846 
847       // initialize each store in parallel
848       for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
849         status.setStatus("Instantiating store for column family " + family);
850         completionService.submit(new Callable<HStore>() {
851           @Override
852           public HStore call() throws IOException {
853             return instantiateHStore(family);
854           }
855         });
856       }
857       boolean allStoresOpened = false;
858       try {
859         for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
860           Future<HStore> future = completionService.take();
861           HStore store = future.get();
862           this.stores.put(store.getColumnFamilyName().getBytes(), store);
863 
864           long storeMaxSequenceId = store.getMaxSequenceId();
865           maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
866               storeMaxSequenceId);
867           if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) {
868             maxSeqId = storeMaxSequenceId;
869           }
870           long maxStoreMemstoreTS = store.getMaxMemstoreTS();
871           if (maxStoreMemstoreTS > maxMemstoreTS) {
872             maxMemstoreTS = maxStoreMemstoreTS;
873           }
874         }
875         allStoresOpened = true;
876       } catch (InterruptedException e) {
877         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
878       } catch (ExecutionException e) {
879         throw new IOException(e.getCause());
880       } finally {
881         storeOpenerThreadPool.shutdownNow();
882         if (!allStoresOpened) {
883           // something went wrong, close all opened stores
884           LOG.error("Could not initialize all stores for the region=" + this);
885           for (Store store : this.stores.values()) {
886             try {
887               store.close();
888             } catch (IOException e) {
889               LOG.warn(e.getMessage());
890             }
891           }
892         }
893       }
894     }
895     if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
896       // Recover any edits if available.
897       maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
898           this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
899     }
900     maxSeqId = Math.max(maxSeqId, maxMemstoreTS + 1);
901     mvcc.initialize(maxSeqId);
902     return maxSeqId;
903   }
904 
905   private void writeRegionOpenMarker(WAL wal, long openSeqId) throws IOException {
906     Map<byte[], List<Path>> storeFiles
907     = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
908     for (Map.Entry<byte[], Store> entry : getStores().entrySet()) {
909       Store store = entry.getValue();
910       ArrayList<Path> storeFileNames = new ArrayList<Path>();
911       for (StoreFile storeFile: store.getStorefiles()) {
912         storeFileNames.add(storeFile.getPath());
913       }
914       storeFiles.put(entry.getKey(), storeFileNames);
915     }
916 
917     RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
918       RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
919       getRegionServerServices().getServerName(), storeFiles);
920     WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionOpenDesc,
921       getSequenceId());
922   }
923 
924   private void writeRegionCloseMarker(WAL wal) throws IOException {
925     Map<byte[], List<Path>> storeFiles
926     = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
927     for (Map.Entry<byte[], Store> entry : getStores().entrySet()) {
928       Store store = entry.getValue();
929       ArrayList<Path> storeFileNames = new ArrayList<Path>();
930       for (StoreFile storeFile: store.getStorefiles()) {
931         storeFileNames.add(storeFile.getPath());
932       }
933       storeFiles.put(entry.getKey(), storeFileNames);
934     }
935 
936     RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor(
937       RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), getSequenceId().get(),
938       getRegionServerServices().getServerName(), storeFiles);
939     WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc,
940       getSequenceId());
941 
942     // Store SeqId in HDFS when a region closes
943     // checking region folder exists is due to many tests which delete the table folder while a
944     // table is still online
945     if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) {
946       WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(),
947         getSequenceId().get(), 0);
948     }
949   }
950 
951   /**
952    * @return True if this region has references.
953    */
954   public boolean hasReferences() {
955     for (Store store : this.stores.values()) {
956       if (store.hasReferences()) return true;
957     }
958     return false;
959   }
960 
961   /**
962    * This function will return the HDFS blocks distribution based on the data
963    * captured when HFile is created
964    * @return The HDFS blocks distribution for the region.
965    */
966   public HDFSBlocksDistribution getHDFSBlocksDistribution() {
967     HDFSBlocksDistribution hdfsBlocksDistribution =
968       new HDFSBlocksDistribution();
969     synchronized (this.stores) {
970       for (Store store : this.stores.values()) {
971         for (StoreFile sf : store.getStorefiles()) {
972           HDFSBlocksDistribution storeFileBlocksDistribution =
973             sf.getHDFSBlockDistribution();
974           hdfsBlocksDistribution.add(storeFileBlocksDistribution);
975         }
976       }
977     }
978     return hdfsBlocksDistribution;
979   }
980 
981   /**
982    * This is a helper function to compute HDFS block distribution on demand
983    * @param conf configuration
984    * @param tableDescriptor HTableDescriptor of the table
985    * @param regionInfo encoded name of the region
986    * @return The HDFS blocks distribution for the given region.
987    * @throws IOException
988    */
989   public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
990       final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException {
991     Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
992     return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath);
993   }
994 
995   /**
996    * This is a helper function to compute HDFS block distribution on demand
997    * @param conf configuration
998    * @param tableDescriptor HTableDescriptor of the table
999    * @param regionInfo encoded name of the region
1000    * @param tablePath the table directory
1001    * @return The HDFS blocks distribution for the given region.
1002    * @throws IOException
1003    */
1004   public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
1005       final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo,  Path tablePath)
1006       throws IOException {
1007     HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
1008     FileSystem fs = tablePath.getFileSystem(conf);
1009 
1010     HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo);
1011     for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
1012       Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family.getNameAsString());
1013       if (storeFiles == null) continue;
1014 
1015       for (StoreFileInfo storeFileInfo : storeFiles) {
1016         hdfsBlocksDistribution.add(storeFileInfo.computeHDFSBlocksDistribution(fs));
1017       }
1018     }
1019     return hdfsBlocksDistribution;
1020   }
1021 
1022   public AtomicLong getMemstoreSize() {
1023     return memstoreSize;
1024   }
1025 
1026   /**
1027    * Increase the size of mem store in this region and the size of global mem
1028    * store
1029    * @return the size of memstore in this region
1030    */
1031   public long addAndGetGlobalMemstoreSize(long memStoreSize) {
1032     if (this.rsAccounting != null) {
1033       rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
1034     }
1035     return this.memstoreSize.addAndGet(memStoreSize);
1036   }
1037 
1038   /** @return a HRegionInfo object for this region */
1039   public HRegionInfo getRegionInfo() {
1040     return this.fs.getRegionInfo();
1041   }
1042 
1043   /**
1044    * @return Instance of {@link RegionServerServices} used by this HRegion.
1045    * Can be null.
1046    */
1047   RegionServerServices getRegionServerServices() {
1048     return this.rsServices;
1049   }
1050 
1051   /** @return readRequestsCount for this region */
1052   long getReadRequestsCount() {
1053     return this.readRequestsCount.get();
1054   }
1055 
1056   /** @return writeRequestsCount for this region */
1057   long getWriteRequestsCount() {
1058     return this.writeRequestsCount.get();
1059   }
1060 
1061   public MetricsRegion getMetrics() {
1062     return metricsRegion;
1063   }
1064 
1065   /** @return true if region is closed */
1066   public boolean isClosed() {
1067     return this.closed.get();
1068   }
1069 
1070   /**
1071    * @return True if closing process has started.
1072    */
1073   public boolean isClosing() {
1074     return this.closing.get();
1075   }
1076 
1077   /**
1078    * Reset recovering state of current region
1079    */
1080   public void setRecovering(boolean newState) {
1081     boolean wasRecovering = this.isRecovering;
1082     this.isRecovering = newState;
1083     if (wasRecovering && !isRecovering) {
1084       // Call only when wal replay is over.
1085       coprocessorHost.postLogReplay();
1086     }
1087   }
1088 
1089   /**
1090    * @return True if current region is in recovering
1091    */
1092   public boolean isRecovering() {
1093     return this.isRecovering;
1094   }
1095 
1096   /** @return true if region is available (not closed and not closing) */
1097   public boolean isAvailable() {
1098     return !isClosed() && !isClosing();
1099   }
1100 
1101   /** @return true if region is splittable */
1102   public boolean isSplittable() {
1103     return isAvailable() && !hasReferences();
1104   }
1105 
1106   /**
1107    * @return true if region is mergeable
1108    */
1109   public boolean isMergeable() {
1110     if (!isAvailable()) {
1111       LOG.debug("Region " + this.getRegionNameAsString()
1112           + " is not mergeable because it is closing or closed");
1113       return false;
1114     }
1115     if (hasReferences()) {
1116       LOG.debug("Region " + this.getRegionNameAsString()
1117           + " is not mergeable because it has references");
1118       return false;
1119     }
1120 
1121     return true;
1122   }
1123 
1124   public boolean areWritesEnabled() {
1125     synchronized(this.writestate) {
1126       return this.writestate.writesEnabled;
1127     }
1128   }
1129 
1130    public MultiVersionConsistencyControl getMVCC() {
1131      return mvcc;
1132    }
1133 
1134    /*
1135     * Returns readpoint considering given IsolationLevel
1136     */
1137    public long getReadpoint(IsolationLevel isolationLevel) {
1138      if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
1139        // This scan can read even uncommitted transactions
1140        return Long.MAX_VALUE;
1141      }
1142      return mvcc.memstoreReadPoint();
1143    }
1144 
1145    public boolean isLoadingCfsOnDemandDefault() {
1146      return this.isLoadingCfsOnDemandDefault;
1147    }
1148 
1149   /**
1150    * Close down this HRegion.  Flush the cache, shut down each HStore, don't
1151    * service any more calls.
1152    *
1153    * <p>This method could take some time to execute, so don't call it from a
1154    * time-sensitive thread.
1155    *
1156    * @return Vector of all the storage files that the HRegion's component
1157    * HStores make use of.  It's a list of all HStoreFile objects. Returns empty
1158    * vector if already closed and null if judged that it should not close.
1159    *
1160    * @throws IOException e
1161    */
1162   public Map<byte[], List<StoreFile>> close() throws IOException {
1163     return close(false);
1164   }
1165 
1166   private final Object closeLock = new Object();
1167 
1168   /** Conf key for the periodic flush interval */
1169   public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL =
1170       "hbase.regionserver.optionalcacheflushinterval";
1171   /** Default interval for the memstore flush */
1172   public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;
1173 
1174   /** Conf key to force a flush if there are already enough changes for one region in memstore */
1175   public static final String MEMSTORE_FLUSH_PER_CHANGES =
1176       "hbase.regionserver.flush.per.changes";
1177   public static final long DEFAULT_FLUSH_PER_CHANGES = 30000000; // 30 millions
1178   /**
1179    * The following MAX_FLUSH_PER_CHANGES is large enough because each KeyValue has 20+ bytes
1180    * overhead. Therefore, even 1G empty KVs occupy at least 20GB memstore size for a single region
1181    */
1182   public static final long MAX_FLUSH_PER_CHANGES = 1000000000; // 1G
1183 
1184   /**
1185    * Close down this HRegion.  Flush the cache unless abort parameter is true,
1186    * Shut down each HStore, don't service any more calls.
1187    *
1188    * This method could take some time to execute, so don't call it from a
1189    * time-sensitive thread.
1190    *
1191    * @param abort true if server is aborting (only during testing)
1192    * @return Vector of all the storage files that the HRegion's component
1193    * HStores make use of.  It's a list of HStoreFile objects.  Can be null if
1194    * we are not to close at this time or we are already closed.
1195    *
1196    * @throws IOException e
1197    */
1198   public Map<byte[], List<StoreFile>> close(final boolean abort) throws IOException {
1199     // Only allow one thread to close at a time. Serialize them so dual
1200     // threads attempting to close will run up against each other.
1201     MonitoredTask status = TaskMonitor.get().createStatus(
1202         "Closing region " + this +
1203         (abort ? " due to abort" : ""));
1204 
1205     status.setStatus("Waiting for close lock");
1206     try {
1207       synchronized (closeLock) {
1208         return doClose(abort, status);
1209       }
1210     } finally {
1211       status.cleanup();
1212     }
1213   }
1214 
1215   private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
1216       throws IOException {
1217     if (isClosed()) {
1218       LOG.warn("Region " + this + " already closed");
1219       return null;
1220     }
1221 
1222     if (coprocessorHost != null) {
1223       status.setStatus("Running coprocessor pre-close hooks");
1224       this.coprocessorHost.preClose(abort);
1225     }
1226 
1227     status.setStatus("Disabling compacts and flushes for region");
1228     synchronized (writestate) {
1229       // Disable compacting and flushing by background threads for this
1230       // region.
1231       writestate.writesEnabled = false;
1232       LOG.debug("Closing " + this + ": disabling compactions & flushes");
1233       waitForFlushesAndCompactions();
1234     }
1235     // If we were not just flushing, is it worth doing a preflush...one
1236     // that will clear out of the bulk of the memstore before we put up
1237     // the close flag?
1238     if (!abort && worthPreFlushing()) {
1239       status.setStatus("Pre-flushing region before close");
1240       LOG.info("Running close preflush of " + this.getRegionNameAsString());
1241       try {
1242         internalFlushcache(status);
1243       } catch (IOException ioe) {
1244         // Failed to flush the region. Keep going.
1245         status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
1246       }
1247     }
1248 
1249     this.closing.set(true);
1250     status.setStatus("Disabling writes for close");
1251     // block waiting for the lock for closing
1252     lock.writeLock().lock();
1253     try {
1254       if (this.isClosed()) {
1255         status.abort("Already got closed by another process");
1256         // SplitTransaction handles the null
1257         return null;
1258       }
1259       LOG.debug("Updates disabled for region " + this);
1260       // Don't flush the cache if we are aborting
1261       if (!abort) {
1262         int flushCount = 0;
1263         while (this.getMemstoreSize().get() > 0) {
1264           try {
1265             if (flushCount++ > 0) {
1266               int actualFlushes = flushCount - 1;
1267               if (actualFlushes > 5) {
1268                 // If we tried 5 times and are unable to clear memory, abort
1269                 // so we do not lose data
1270                 throw new DroppedSnapshotException("Failed clearing memory after " +
1271                   actualFlushes + " attempts on region: " + Bytes.toStringBinary(getRegionName()));
1272               }
1273               LOG.info("Running extra flush, " + actualFlushes +
1274                 " (carrying snapshot?) " + this);
1275             }
1276             internalFlushcache(status);
1277           } catch (IOException ioe) {
1278             status.setStatus("Failed flush " + this + ", putting online again");
1279             synchronized (writestate) {
1280               writestate.writesEnabled = true;
1281             }
1282             // Have to throw to upper layers.  I can't abort server from here.
1283             throw ioe;
1284           }
1285         }
1286       }
1287 
1288       Map<byte[], List<StoreFile>> result =
1289         new TreeMap<byte[], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
1290       if (!stores.isEmpty()) {
1291         // initialize the thread pool for closing stores in parallel.
1292         ThreadPoolExecutor storeCloserThreadPool =
1293           getStoreOpenAndCloseThreadPool("StoreCloserThread-" + this.getRegionNameAsString());
1294         CompletionService<Pair<byte[], Collection<StoreFile>>> completionService =
1295           new ExecutorCompletionService<Pair<byte[], Collection<StoreFile>>>(storeCloserThreadPool);
1296 
1297         // close each store in parallel
1298         for (final Store store : stores.values()) {
1299           assert abort || store.getFlushableSize() == 0;
1300           completionService
1301               .submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
1302                 @Override
1303                 public Pair<byte[], Collection<StoreFile>> call() throws IOException {
1304                   return new Pair<byte[], Collection<StoreFile>>(
1305                     store.getFamily().getName(), store.close());
1306                 }
1307               });
1308         }
1309         try {
1310           for (int i = 0; i < stores.size(); i++) {
1311             Future<Pair<byte[], Collection<StoreFile>>> future = completionService.take();
1312             Pair<byte[], Collection<StoreFile>> storeFiles = future.get();
1313             List<StoreFile> familyFiles = result.get(storeFiles.getFirst());
1314             if (familyFiles == null) {
1315               familyFiles = new ArrayList<StoreFile>();
1316               result.put(storeFiles.getFirst(), familyFiles);
1317             }
1318             familyFiles.addAll(storeFiles.getSecond());
1319           }
1320         } catch (InterruptedException e) {
1321           throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1322         } catch (ExecutionException e) {
1323           throw new IOException(e.getCause());
1324         } finally {
1325           storeCloserThreadPool.shutdownNow();
1326         }
1327       }
1328 
1329       status.setStatus("Writing region close event to WAL");
1330       if (!abort && wal != null && getRegionServerServices() != null && !writestate.readOnly) {
1331         writeRegionCloseMarker(wal);
1332       }
1333 
1334       this.closed.set(true);
1335       if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get());
1336       if (coprocessorHost != null) {
1337         status.setStatus("Running coprocessor post-close hooks");
1338         this.coprocessorHost.postClose(abort);
1339       }
1340       if (this.metricsRegion != null) {
1341         this.metricsRegion.close();
1342       }
1343       if (this.metricsRegionWrapper != null) {
1344         Closeables.closeQuietly(this.metricsRegionWrapper);
1345       }
1346       status.markComplete("Closed");
1347       LOG.info("Closed " + this);
1348       return result;
1349     } finally {
1350       lock.writeLock().unlock();
1351     }
1352   }
1353 
1354   /**
1355    * Wait for all current flushes and compactions of the region to complete.
1356    * <p>
1357    * Exposed for TESTING.
1358    */
1359   public void waitForFlushesAndCompactions() {
1360     synchronized (writestate) {
1361       boolean interrupted = false;
1362       try {
1363         while (writestate.compacting > 0 || writestate.flushing) {
1364           LOG.debug("waiting for " + writestate.compacting + " compactions"
1365             + (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
1366           try {
1367             writestate.wait();
1368           } catch (InterruptedException iex) {
1369             // essentially ignore and propagate the interrupt back up
1370             LOG.warn("Interrupted while waiting");
1371             interrupted = true;
1372           }
1373         }
1374       } finally {
1375         if (interrupted) {
1376           Thread.currentThread().interrupt();
1377         }
1378       }
1379     }
1380   }
1381 
1382   protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
1383       final String threadNamePrefix) {
1384     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1385     int maxThreads = Math.min(numStores,
1386         conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1387             HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
1388     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1389   }
1390 
1391   protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
1392       final String threadNamePrefix) {
1393     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1394     int maxThreads = Math.max(1,
1395         conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1396             HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
1397             / numStores);
1398     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1399   }
1400 
1401   static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
1402       final String threadNamePrefix) {
1403     return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
1404       new ThreadFactory() {
1405         private int count = 1;
1406 
1407         @Override
1408         public Thread newThread(Runnable r) {
1409           return new Thread(r, threadNamePrefix + "-" + count++);
1410         }
1411       });
1412   }
1413 
1414    /**
1415     * @return True if its worth doing a flush before we put up the close flag.
1416     */
1417   private boolean worthPreFlushing() {
1418     return this.memstoreSize.get() >
1419       this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
1420   }
1421 
1422   //////////////////////////////////////////////////////////////////////////////
1423   // HRegion accessors
1424   //////////////////////////////////////////////////////////////////////////////
1425 
1426   /** @return start key for region */
1427   public byte [] getStartKey() {
1428     return this.getRegionInfo().getStartKey();
1429   }
1430 
1431   /** @return end key for region */
1432   public byte [] getEndKey() {
1433     return this.getRegionInfo().getEndKey();
1434   }
1435 
1436   /** @return region id */
1437   public long getRegionId() {
1438     return this.getRegionInfo().getRegionId();
1439   }
1440 
1441   /** @return region name */
1442   public byte [] getRegionName() {
1443     return this.getRegionInfo().getRegionName();
1444   }
1445 
1446   /** @return region name as string for logging */
1447   public String getRegionNameAsString() {
1448     return this.getRegionInfo().getRegionNameAsString();
1449   }
1450 
1451   /** @return HTableDescriptor for this region */
1452   public HTableDescriptor getTableDesc() {
1453     return this.htableDescriptor;
1454   }
1455 
1456   /** @return WAL in use for this region */
1457   public WAL getWAL() {
1458     return this.wal;
1459   }
1460 
1461   /**
1462    * @return split policy for this region.
1463    */
1464   public RegionSplitPolicy getSplitPolicy() {
1465     return this.splitPolicy;
1466   }
1467 
1468   /**
1469    * A split takes the config from the parent region & passes it to the daughter
1470    * region's constructor. If 'conf' was passed, you would end up using the HTD
1471    * of the parent region in addition to the new daughter HTD. Pass 'baseConf'
1472    * to the daughter regions to avoid this tricky dedupe problem.
1473    * @return Configuration object
1474    */
1475   Configuration getBaseConf() {
1476     return this.baseConf;
1477   }
1478 
1479   /** @return {@link FileSystem} being used by this region */
1480   public FileSystem getFilesystem() {
1481     return fs.getFileSystem();
1482   }
1483 
1484   /** @return the {@link HRegionFileSystem} used by this region */
1485   public HRegionFileSystem getRegionFileSystem() {
1486     return this.fs;
1487   }
1488 
1489   /**
1490    * @return Returns the earliest time a store in the region was flushed. All
1491    *         other stores in the region would have been flushed either at, or
1492    *         after this time.
1493    */
1494   @VisibleForTesting
1495   public long getEarliestFlushTimeForAllStores() {
1496     return Collections.min(lastStoreFlushTimeMap.values());
1497   }
1498 
1499   //////////////////////////////////////////////////////////////////////////////
1500   // HRegion maintenance.
1501   //
1502   // These methods are meant to be called periodically by the HRegionServer for
1503   // upkeep.
1504   //////////////////////////////////////////////////////////////////////////////
1505 
1506   /** @return returns size of largest HStore. */
1507   public long getLargestHStoreSize() {
1508     long size = 0;
1509     for (Store h : stores.values()) {
1510       long storeSize = h.getSize();
1511       if (storeSize > size) {
1512         size = storeSize;
1513       }
1514     }
1515     return size;
1516   }
1517 
1518   /**
1519    * @return KeyValue Comparator
1520    */
1521   public KeyValue.KVComparator getComparator() {
1522     return this.comparator;
1523   }
1524 
1525   /*
1526    * Do preparation for pending compaction.
1527    * @throws IOException
1528    */
1529   protected void doRegionCompactionPrep() throws IOException {
1530   }
1531 
1532   void triggerMajorCompaction() {
1533     for (Store h : stores.values()) {
1534       h.triggerMajorCompaction();
1535     }
1536   }
1537 
1538   /**
1539    * This is a helper function that compact all the stores synchronously
1540    * It is used by utilities and testing
1541    *
1542    * @param majorCompaction True to force a major compaction regardless of thresholds
1543    * @throws IOException e
1544    */
1545   public void compactStores(final boolean majorCompaction)
1546   throws IOException {
1547     if (majorCompaction) {
1548       this.triggerMajorCompaction();
1549     }
1550     compactStores();
1551   }
1552 
1553   /**
1554    * This is a helper function that compact all the stores synchronously
1555    * It is used by utilities and testing
1556    *
1557    * @throws IOException e
1558    */
1559   public void compactStores() throws IOException {
1560     for (Store s : getStores().values()) {
1561       CompactionContext compaction = s.requestCompaction();
1562       if (compaction != null) {
1563         compact(compaction, s);
1564       }
1565     }
1566   }
1567 
1568   /*
1569    * Called by compaction thread and after region is opened to compact the
1570    * HStores if necessary.
1571    *
1572    * <p>This operation could block for a long time, so don't call it from a
1573    * time-sensitive thread.
1574    *
1575    * Note that no locking is necessary at this level because compaction only
1576    * conflicts with a region split, and that cannot happen because the region
1577    * server does them sequentially and not in parallel.
1578    *
1579    * @param cr Compaction details, obtained by requestCompaction()
1580    * @return whether the compaction completed
1581    * @throws IOException e
1582    */
1583   public boolean compact(CompactionContext compaction, Store store) throws IOException {
1584     assert compaction != null && compaction.hasSelection();
1585     assert !compaction.getRequest().getFiles().isEmpty();
1586     if (this.closing.get() || this.closed.get()) {
1587       LOG.debug("Skipping compaction on " + this + " because closing/closed");
1588       store.cancelRequestedCompaction(compaction);
1589       return false;
1590     }
1591     MonitoredTask status = null;
1592     boolean requestNeedsCancellation = true;
1593     // block waiting for the lock for compaction
1594     lock.readLock().lock();
1595     try {
1596       byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
1597       if (stores.get(cf) != store) {
1598         LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this
1599             + " has been re-instantiated, cancel this compaction request. "
1600             + " It may be caused by the roll back of split transaction");
1601         return false;
1602       }
1603 
1604       status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
1605       if (this.closed.get()) {
1606         String msg = "Skipping compaction on " + this + " because closed";
1607         LOG.debug(msg);
1608         status.abort(msg);
1609         return false;
1610       }
1611       boolean wasStateSet = false;
1612       try {
1613         synchronized (writestate) {
1614           if (writestate.writesEnabled) {
1615             wasStateSet = true;
1616             ++writestate.compacting;
1617           } else {
1618             String msg = "NOT compacting region " + this + ". Writes disabled.";
1619             LOG.info(msg);
1620             status.abort(msg);
1621             return false;
1622           }
1623         }
1624         LOG.info("Starting compaction on " + store + " in region " + this
1625             + (compaction.getRequest().isOffPeak()?" as an off-peak compaction":""));
1626         doRegionCompactionPrep();
1627         try {
1628           status.setStatus("Compacting store " + store);
1629           // We no longer need to cancel the request on the way out of this
1630           // method because Store#compact will clean up unconditionally
1631           requestNeedsCancellation = false;
1632           store.compact(compaction);
1633         } catch (InterruptedIOException iioe) {
1634           String msg = "compaction interrupted";
1635           LOG.info(msg, iioe);
1636           status.abort(msg);
1637           return false;
1638         }
1639       } finally {
1640         if (wasStateSet) {
1641           synchronized (writestate) {
1642             --writestate.compacting;
1643             if (writestate.compacting <= 0) {
1644               writestate.notifyAll();
1645             }
1646           }
1647         }
1648       }
1649       status.markComplete("Compaction complete");
1650       return true;
1651     } finally {
1652       try {
1653         if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction);
1654         if (status != null) status.cleanup();
1655       } finally {
1656         lock.readLock().unlock();
1657       }
1658     }
1659   }
1660 
1661   /**
1662    * Flush all stores.
1663    * <p>
1664    * See {@link #flushcache(boolean)}.
1665    *
1666    * @return whether the flush is success and whether the region needs compacting
1667    * @throws IOException
1668    */
1669   public FlushResult flushcache() throws IOException {
1670     return flushcache(true);
1671   }
1672 
1673   /**
1674    * Flush the cache.
1675    *
1676    * When this method is called the cache will be flushed unless:
1677    * <ol>
1678    *   <li>the cache is empty</li>
1679    *   <li>the region is closed.</li>
1680    *   <li>a flush is already in progress</li>
1681    *   <li>writes are disabled</li>
1682    * </ol>
1683    *
1684    * <p>This method may block for some time, so it should not be called from a
1685    * time-sensitive thread.
1686    * @param forceFlushAllStores whether we want to flush all stores
1687    * @return whether the flush is success and whether the region needs compacting
1688    *
1689    * @throws IOException general io exceptions
1690    * @throws DroppedSnapshotException Thrown when replay of wal is required
1691    * because a Snapshot was not properly persisted.
1692    */
1693   public FlushResult flushcache(boolean forceFlushAllStores) throws IOException {
1694     // fail-fast instead of waiting on the lock
1695     if (this.closing.get()) {
1696       String msg = "Skipping flush on " + this + " because closing";
1697       LOG.debug(msg);
1698       return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1699     }
1700     MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
1701     status.setStatus("Acquiring readlock on region");
1702     // block waiting for the lock for flushing cache
1703     lock.readLock().lock();
1704     try {
1705       if (this.closed.get()) {
1706         String msg = "Skipping flush on " + this + " because closed";
1707         LOG.debug(msg);
1708         status.abort(msg);
1709         return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1710       }
1711       if (coprocessorHost != null) {
1712         status.setStatus("Running coprocessor pre-flush hooks");
1713         coprocessorHost.preFlush();
1714       }
1715       if (numMutationsWithoutWAL.get() > 0) {
1716         numMutationsWithoutWAL.set(0);
1717         dataInMemoryWithoutWAL.set(0);
1718       }
1719       synchronized (writestate) {
1720         if (!writestate.flushing && writestate.writesEnabled) {
1721           this.writestate.flushing = true;
1722         } else {
1723           if (LOG.isDebugEnabled()) {
1724             LOG.debug("NOT flushing memstore for region " + this
1725                 + ", flushing=" + writestate.flushing + ", writesEnabled="
1726                 + writestate.writesEnabled);
1727           }
1728           String msg = "Not flushing since "
1729               + (writestate.flushing ? "already flushing"
1730               : "writes not enabled");
1731           status.abort(msg);
1732           return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1733         }
1734       }
1735 
1736       try {
1737         Collection<Store> specificStoresToFlush =
1738             forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush();
1739         FlushResult fs = internalFlushcache(specificStoresToFlush, status);
1740 
1741         if (coprocessorHost != null) {
1742           status.setStatus("Running post-flush coprocessor hooks");
1743           coprocessorHost.postFlush();
1744         }
1745 
1746         status.markComplete("Flush successful");
1747         return fs;
1748       } finally {
1749         synchronized (writestate) {
1750           writestate.flushing = false;
1751           this.writestate.flushRequested = false;
1752           writestate.notifyAll();
1753         }
1754       }
1755     } finally {
1756       lock.readLock().unlock();
1757       status.cleanup();
1758     }
1759   }
1760 
1761   /**
1762    * Should the store be flushed because it is old enough.
1763    * <p>
1764    * Every FlushPolicy should call this to determine whether a store is old enough to flush(except
1765    * that you always flush all stores). Otherwise the {@link #shouldFlush()} method will always
1766    * returns true which will make a lot of flush requests.
1767    */
1768   boolean shouldFlushStore(Store store) {
1769     long maxFlushedSeqId =
1770         this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), store
1771             .getFamily().getName()) - 1;
1772     if (maxFlushedSeqId > 0 && maxFlushedSeqId + flushPerChanges < sequenceId.get()) {
1773       if (LOG.isDebugEnabled()) {
1774         LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this
1775             + " will be flushed because its max flushed seqId(" + maxFlushedSeqId
1776             + ") is far away from current(" + sequenceId.get() + "), max allowed is "
1777             + flushPerChanges);
1778       }
1779       return true;
1780     }
1781     if (flushCheckInterval <= 0) {
1782       return false;
1783     }
1784     long now = EnvironmentEdgeManager.currentTime();
1785     if (store.timeOfOldestEdit() < now - flushCheckInterval) {
1786       if (LOG.isDebugEnabled()) {
1787         LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this
1788             + " will be flushed because time of its oldest edit (" + store.timeOfOldestEdit()
1789             + ") is far away from now(" + now + "), max allowed is " + flushCheckInterval);
1790       }
1791       return true;
1792     }
1793     return false;
1794   }
1795 
1796   /**
1797    * Should the memstore be flushed now
1798    */
1799   boolean shouldFlush() {
1800     // This is a rough measure.
1801     if (this.maxFlushedSeqId > 0
1802           && (this.maxFlushedSeqId + this.flushPerChanges < this.sequenceId.get())) {
1803       return true;
1804     }
1805     if (flushCheckInterval <= 0) { //disabled
1806       return false;
1807     }
1808     long now = EnvironmentEdgeManager.currentTime();
1809     //if we flushed in the recent past, we don't need to do again now
1810     if ((now - getEarliestFlushTimeForAllStores() < flushCheckInterval)) {
1811       return false;
1812     }
1813     //since we didn't flush in the recent past, flush now if certain conditions
1814     //are met. Return true on first such memstore hit.
1815     for (Store s : this.getStores().values()) {
1816       if (s.timeOfOldestEdit() < now - flushCheckInterval) {
1817         // we have an old enough edit in the memstore, flush
1818         return true;
1819       }
1820     }
1821     return false;
1822   }
1823 
1824   /**
1825    * Flushing all stores.
1826    *
1827    * @see #internalFlushcache(Collection, MonitoredTask)
1828    */
1829   private FlushResult internalFlushcache(MonitoredTask status)
1830       throws IOException {
1831     return internalFlushcache(stores.values(), status);
1832   }
1833 
1834   /**
1835    * Flushing given stores.
1836    *
1837    * @see #internalFlushcache(WAL, long, Collection, MonitoredTask)
1838    */
1839   private FlushResult internalFlushcache(final Collection<Store> storesToFlush,
1840       MonitoredTask status) throws IOException {
1841     return internalFlushcache(this.wal, HConstants.NO_SEQNUM, storesToFlush,
1842         status);
1843   }
1844 
1845   /**
1846    * Flush the memstore. Flushing the memstore is a little tricky. We have a lot
1847    * of updates in the memstore, all of which have also been written to the wal.
1848    * We need to write those updates in the memstore out to disk, while being
1849    * able to process reads/writes as much as possible during the flush
1850    * operation.
1851    * <p>
1852    * This method may block for some time. Every time you call it, we up the
1853    * regions sequence id even if we don't flush; i.e. the returned region id
1854    * will be at least one larger than the last edit applied to this region. The
1855    * returned id does not refer to an actual edit. The returned id can be used
1856    * for say installing a bulk loaded file just ahead of the last hfile that was
1857    * the result of this flush, etc.
1858    *
1859    * @param wal
1860    *          Null if we're NOT to go via wal.
1861    * @param myseqid
1862    *          The seqid to use if <code>wal</code> is null writing out flush
1863    *          file.
1864    * @param storesToFlush
1865    *          The list of stores to flush.
1866    * @return object describing the flush's state
1867    * @throws IOException
1868    *           general io exceptions
1869    * @throws DroppedSnapshotException
1870    *           Thrown when replay of wal is required because a Snapshot was not
1871    *           properly persisted.
1872    */
1873   protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
1874       final Collection<Store> storesToFlush, MonitoredTask status) throws IOException {
1875     if (this.rsServices != null && this.rsServices.isAborted()) {
1876       // Don't flush when server aborting, it's unsafe
1877       throw new IOException("Aborting flush because server is aborted...");
1878     }
1879     final long startTime = EnvironmentEdgeManager.currentTime();
1880     // If nothing to flush, return, but we need to safely update the region sequence id
1881     if (this.memstoreSize.get() <= 0) {
1882       // Take an update lock because am about to change the sequence id and we want the sequence id
1883       // to be at the border of the empty memstore.
1884       MultiVersionConsistencyControl.WriteEntry w = null;
1885       this.updatesLock.writeLock().lock();
1886       try {
1887         if (this.memstoreSize.get() <= 0) {
1888           // Presume that if there are still no edits in the memstore, then there are no edits for
1889           // this region out in the WAL subsystem so no need to do any trickery clearing out
1890           // edits in the WAL system. Up the sequence number so the resulting flush id is for
1891           // sure just beyond the last appended region edit (useful as a marker when bulk loading,
1892           // etc.)
1893           // wal can be null replaying edits.
1894           if (wal != null) {
1895             w = mvcc.beginMemstoreInsert();
1896             long flushSeqId = getNextSequenceId(wal);
1897             FlushResult flushResult = new FlushResult(
1898               FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushSeqId, "Nothing to flush");
1899             w.setWriteNumber(flushSeqId);
1900             mvcc.waitForPreviousTransactionsComplete(w);
1901             w = null;
1902             return flushResult;
1903           } else {
1904             return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
1905               "Nothing to flush");
1906           }
1907         }
1908       } finally {
1909         this.updatesLock.writeLock().unlock();
1910         if (w != null) {
1911           mvcc.advanceMemstore(w);
1912         }
1913       }
1914     }
1915 
1916     if (LOG.isInfoEnabled()) {
1917       LOG.info("Started memstore flush for " + this + ", current region memstore size "
1918           + StringUtils.byteDesc(this.memstoreSize.get()) + ", and " + storesToFlush.size() + "/"
1919           + stores.size() + " column families' memstores are being flushed."
1920           + ((wal != null) ? "" : "; wal is null, using passed sequenceid=" + myseqid));
1921       // only log when we are not flushing all stores.
1922       if (this.stores.size() > storesToFlush.size()) {
1923         for (Store store: storesToFlush) {
1924           LOG.info("Flushing Column Family: " + store.getColumnFamilyName()
1925               + " which was occupying "
1926               + StringUtils.byteDesc(store.getMemStoreSize()) + " of memstore.");
1927         }
1928       }
1929     }
1930     // Stop updates while we snapshot the memstore of all of these regions' stores. We only have
1931     // to do this for a moment.  It is quick. We also set the memstore size to zero here before we
1932     // allow updates again so its value will represent the size of the updates received
1933     // during flush
1934     MultiVersionConsistencyControl.WriteEntry w = null;
1935     // We have to take an update lock during snapshot, or else a write could end up in both snapshot
1936     // and memstore (makes it difficult to do atomic rows then)
1937     status.setStatus("Obtaining lock to block concurrent updates");
1938     // block waiting for the lock for internal flush
1939     this.updatesLock.writeLock().lock();
1940     status.setStatus("Preparing to flush by snapshotting stores in " +
1941       getRegionInfo().getEncodedName());
1942     long totalFlushableSizeOfFlushableStores = 0;
1943 
1944     Set<byte[]> flushedFamilyNames = new HashSet<byte[]>();
1945     for (Store store: storesToFlush) {
1946       flushedFamilyNames.add(store.getFamily().getName());
1947     }
1948 
1949     List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
1950     TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
1951         Bytes.BYTES_COMPARATOR);
1952     // The sequence id of this flush operation which is used to log FlushMarker and pass to
1953     // createFlushContext to use as the store file's sequence id.
1954     long flushOpSeqId = HConstants.NO_SEQNUM;
1955     // The max flushed sequence id after this flush operation. Used as completeSequenceId which is
1956     // passed to HMaster.
1957     long flushedSeqId = HConstants.NO_SEQNUM;
1958     byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes();
1959 
1960     long trxId = 0;
1961     try {
1962       try {
1963         w = mvcc.beginMemstoreInsert();
1964         if (wal != null) {
1965           if (!wal.startCacheFlush(encodedRegionName, flushedFamilyNames)) {
1966             // This should never happen.
1967             String msg = "Flush will not be started for ["
1968                 + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
1969             status.setStatus(msg);
1970             return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1971           }
1972           flushOpSeqId = getNextSequenceId(wal);
1973           long oldestUnflushedSeqId = wal.getEarliestMemstoreSeqNum(encodedRegionName);
1974           // no oldestUnflushedSeqId means we flushed all stores.
1975           // or the unflushed stores are all empty.
1976           flushedSeqId =
1977               oldestUnflushedSeqId == HConstants.NO_SEQNUM ? flushOpSeqId : oldestUnflushedSeqId - 1;
1978         } else {
1979           // use the provided sequence Id as WAL is not being used for this flush.
1980           flushedSeqId = flushOpSeqId = myseqid;
1981         }
1982 
1983         for (Store s : storesToFlush) {
1984           totalFlushableSizeOfFlushableStores += s.getFlushableSize();
1985           storeFlushCtxs.add(s.createFlushContext(flushOpSeqId));
1986           committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL
1987         }
1988 
1989         // write the snapshot start to WAL
1990         if (wal != null) {
1991           FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
1992             getRegionInfo(), flushOpSeqId, committedFiles);
1993           // no sync. Sync is below where we do not hold the updates lock
1994           trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
1995             desc, sequenceId, false);
1996         }
1997 
1998         // Prepare flush (take a snapshot)
1999         for (StoreFlushContext flush : storeFlushCtxs) {
2000           flush.prepare();
2001         }
2002       } catch (IOException ex) {
2003         if (wal != null) {
2004           if (trxId > 0) { // check whether we have already written START_FLUSH to WAL
2005             try {
2006               FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
2007                 getRegionInfo(), flushOpSeqId, committedFiles);
2008               WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2009                 desc, sequenceId, false);
2010             } catch (Throwable t) {
2011               LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
2012                   StringUtils.stringifyException(t));
2013               // ignore this since we will be aborting the RS with DSE.
2014             }
2015           }
2016           // we have called wal.startCacheFlush(), now we have to abort it
2017           wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
2018           throw ex; // let upper layers deal with it.
2019         }
2020       } finally {
2021         this.updatesLock.writeLock().unlock();
2022       }
2023       String s = "Finished memstore snapshotting " + this +
2024         ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSizeOfFlushableStores;
2025       status.setStatus(s);
2026       if (LOG.isTraceEnabled()) LOG.trace(s);
2027       // sync unflushed WAL changes
2028       // see HBASE-8208 for details
2029       if (wal != null) {
2030         try {
2031           wal.sync(); // ensure that flush marker is sync'ed
2032         } catch (IOException ioe) {
2033           LOG.warn("Unexpected exception while wal.sync(), ignoring. Exception: "
2034               + StringUtils.stringifyException(ioe));
2035         }
2036       }
2037 
2038       // wait for all in-progress transactions to commit to WAL before
2039       // we can start the flush. This prevents
2040       // uncommitted transactions from being written into HFiles.
2041       // We have to block before we start the flush, otherwise keys that
2042       // were removed via a rollbackMemstore could be written to Hfiles.
2043       w.setWriteNumber(flushOpSeqId);
2044       mvcc.waitForPreviousTransactionsComplete(w);
2045       // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block
2046       w = null;
2047       s = "Flushing stores of " + this;
2048       status.setStatus(s);
2049       if (LOG.isTraceEnabled()) LOG.trace(s);
2050     } finally {
2051       if (w != null) {
2052         // in case of failure just mark current w as complete
2053         mvcc.advanceMemstore(w);
2054       }
2055     }
2056 
2057     // Any failure from here on out will be catastrophic requiring server
2058     // restart so wal content can be replayed and put back into the memstore.
2059     // Otherwise, the snapshot content while backed up in the wal, it will not
2060     // be part of the current running servers state.
2061     boolean compactionRequested = false;
2062     try {
2063       // A.  Flush memstore to all the HStores.
2064       // Keep running vector of all store files that includes both old and the
2065       // just-made new flush store file. The new flushed file is still in the
2066       // tmp directory.
2067 
2068       for (StoreFlushContext flush : storeFlushCtxs) {
2069         flush.flushCache(status);
2070       }
2071 
2072       // Switch snapshot (in memstore) -> new hfile (thus causing
2073       // all the store scanners to reset/reseek).
2074       Iterator<Store> it = storesToFlush.iterator();
2075       // stores.values() and storeFlushCtxs have same order
2076       for (StoreFlushContext flush : storeFlushCtxs) {
2077         boolean needsCompaction = flush.commit(status);
2078         if (needsCompaction) {
2079           compactionRequested = true;
2080         }
2081         committedFiles.put(it.next().getFamily().getName(), flush.getCommittedFiles());
2082       }
2083       storeFlushCtxs.clear();
2084 
2085       // Set down the memstore size by amount of flush.
2086       this.addAndGetGlobalMemstoreSize(-totalFlushableSizeOfFlushableStores);
2087 
2088       if (wal != null) {
2089         // write flush marker to WAL. If fail, we should throw DroppedSnapshotException
2090         FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
2091           getRegionInfo(), flushOpSeqId, committedFiles);
2092         WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2093           desc, sequenceId, true);
2094       }
2095     } catch (Throwable t) {
2096       // An exception here means that the snapshot was not persisted.
2097       // The wal needs to be replayed so its content is restored to memstore.
2098       // Currently, only a server restart will do this.
2099       // We used to only catch IOEs but its possible that we'd get other
2100       // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
2101       // all and sundry.
2102       if (wal != null) {
2103         try {
2104           FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
2105             getRegionInfo(), flushOpSeqId, committedFiles);
2106           WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2107             desc, sequenceId, false);
2108         } catch (Throwable ex) {
2109           LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
2110               StringUtils.stringifyException(ex));
2111           // ignore this since we will be aborting the RS with DSE.
2112         }
2113         wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
2114       }
2115       DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
2116           Bytes.toStringBinary(getRegionName()));
2117       dse.initCause(t);
2118       status.abort("Flush failed: " + StringUtils.stringifyException(t));
2119       throw dse;
2120     }
2121 
2122     // If we get to here, the HStores have been written.
2123     if (wal != null) {
2124       wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
2125     }
2126 
2127     // Record latest flush time
2128     for (Store store: storesToFlush) {
2129       this.lastStoreFlushTimeMap.put(store, startTime);
2130     }
2131 
2132     // Update the oldest unflushed sequence id for region.
2133     this.maxFlushedSeqId = flushedSeqId;
2134 
2135     // C. Finally notify anyone waiting on memstore to clear:
2136     // e.g. checkResources().
2137     synchronized (this) {
2138       notifyAll(); // FindBugs NN_NAKED_NOTIFY
2139     }
2140 
2141     long time = EnvironmentEdgeManager.currentTime() - startTime;
2142     long memstoresize = this.memstoreSize.get();
2143     String msg = "Finished memstore flush of ~"
2144         + StringUtils.byteDesc(totalFlushableSizeOfFlushableStores) + "/"
2145         + totalFlushableSizeOfFlushableStores + ", currentsize="
2146         + StringUtils.byteDesc(memstoresize) + "/" + memstoresize
2147         + " for region " + this + " in " + time + "ms, sequenceid="
2148         + flushOpSeqId +  ", compaction requested=" + compactionRequested
2149         + ((wal == null) ? "; wal=null" : "");
2150     LOG.info(msg);
2151     status.setStatus(msg);
2152 
2153     return new FlushResult(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
2154         FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushOpSeqId);
2155   }
2156 
2157   /**
2158    * Method to safely get the next sequence number.
2159    * @return Next sequence number unassociated with any actual edit.
2160    * @throws IOException
2161    */
2162   private long getNextSequenceId(final WAL wal) throws IOException {
2163     WALKey key = this.appendEmptyEdit(wal, null);
2164     return key.getSequenceId();
2165   }
2166 
2167   //////////////////////////////////////////////////////////////////////////////
2168   // get() methods for client use.
2169   //////////////////////////////////////////////////////////////////////////////
2170   /**
2171    * Return all the data for the row that matches <i>row</i> exactly,
2172    * or the one that immediately preceeds it, at or immediately before
2173    * <i>ts</i>.
2174    *
2175    * @param row row key
2176    * @return map of values
2177    * @throws IOException
2178    */
2179   Result getClosestRowBefore(final byte [] row)
2180   throws IOException{
2181     return getClosestRowBefore(row, HConstants.CATALOG_FAMILY);
2182   }
2183 
2184   /**
2185    * Return all the data for the row that matches <i>row</i> exactly,
2186    * or the one that immediately precedes it, at or immediately before
2187    * <i>ts</i>.
2188    *
2189    * @param row row key
2190    * @param family column family to find on
2191    * @return map of values
2192    * @throws IOException read exceptions
2193    */
2194   public Result getClosestRowBefore(final byte [] row, final byte [] family)
2195   throws IOException {
2196     if (coprocessorHost != null) {
2197       Result result = new Result();
2198       if (coprocessorHost.preGetClosestRowBefore(row, family, result)) {
2199         return result;
2200       }
2201     }
2202     // look across all the HStores for this region and determine what the
2203     // closest key is across all column families, since the data may be sparse
2204     checkRow(row, "getClosestRowBefore");
2205     startRegionOperation(Operation.GET);
2206     this.readRequestsCount.increment();
2207     try {
2208       Store store = getStore(family);
2209       // get the closest key. (HStore.getRowKeyAtOrBefore can return null)
2210       Cell key = store.getRowKeyAtOrBefore(row);
2211       Result result = null;
2212       if (key != null) {
2213         Get get = new Get(CellUtil.cloneRow(key));
2214         get.addFamily(family);
2215         result = get(get);
2216       }
2217       if (coprocessorHost != null) {
2218         coprocessorHost.postGetClosestRowBefore(row, family, result);
2219       }
2220       return result;
2221     } finally {
2222       closeRegionOperation(Operation.GET);
2223     }
2224   }
2225 
2226   /**
2227    * Return an iterator that scans over the HRegion, returning the indicated
2228    * columns and rows specified by the {@link Scan}.
2229    * <p>
2230    * This Iterator must be closed by the caller.
2231    *
2232    * @param scan configured {@link Scan}
2233    * @return RegionScanner
2234    * @throws IOException read exceptions
2235    */
2236   public RegionScanner getScanner(Scan scan) throws IOException {
2237    return getScanner(scan, null);
2238   }
2239 
2240   void prepareScanner(Scan scan) throws IOException {
2241     if(!scan.hasFamilies()) {
2242       // Adding all families to scanner
2243       for(byte[] family: this.htableDescriptor.getFamiliesKeys()){
2244         scan.addFamily(family);
2245       }
2246     }
2247   }
2248 
2249   protected RegionScanner getScanner(Scan scan,
2250       List<KeyValueScanner> additionalScanners) throws IOException {
2251     startRegionOperation(Operation.SCAN);
2252     try {
2253       // Verify families are all valid
2254       prepareScanner(scan);
2255       if(scan.hasFamilies()) {
2256         for(byte [] family : scan.getFamilyMap().keySet()) {
2257           checkFamily(family);
2258         }
2259       }
2260       return instantiateRegionScanner(scan, additionalScanners);
2261     } finally {
2262       closeRegionOperation(Operation.SCAN);
2263     }
2264   }
2265 
2266   protected RegionScanner instantiateRegionScanner(Scan scan,
2267       List<KeyValueScanner> additionalScanners) throws IOException {
2268     if (scan.isReversed()) {
2269       if (scan.getFilter() != null) {
2270         scan.getFilter().setReversed(true);
2271       }
2272       return new ReversedRegionScannerImpl(scan, additionalScanners, this);
2273     }
2274     return new RegionScannerImpl(scan, additionalScanners, this);
2275   }
2276 
2277   /*
2278    * @param delete The passed delete is modified by this method. WARNING!
2279    */
2280   void prepareDelete(Delete delete) throws IOException {
2281     // Check to see if this is a deleteRow insert
2282     if(delete.getFamilyCellMap().isEmpty()){
2283       for(byte [] family : this.htableDescriptor.getFamiliesKeys()){
2284         // Don't eat the timestamp
2285         delete.addFamily(family, delete.getTimeStamp());
2286       }
2287     } else {
2288       for(byte [] family : delete.getFamilyCellMap().keySet()) {
2289         if(family == null) {
2290           throw new NoSuchColumnFamilyException("Empty family is invalid");
2291         }
2292         checkFamily(family);
2293       }
2294     }
2295   }
2296 
2297   //////////////////////////////////////////////////////////////////////////////
2298   // set() methods for client use.
2299   //////////////////////////////////////////////////////////////////////////////
2300   /**
2301    * @param delete delete object
2302    * @throws IOException read exceptions
2303    */
2304   public void delete(Delete delete)
2305   throws IOException {
2306     checkReadOnly();
2307     checkResources();
2308     startRegionOperation(Operation.DELETE);
2309     try {
2310       delete.getRow();
2311       // All edits for the given row (across all column families) must happen atomically.
2312       doBatchMutate(delete);
2313     } finally {
2314       closeRegionOperation(Operation.DELETE);
2315     }
2316   }
2317 
2318   /**
2319    * Row needed by below method.
2320    */
2321   private static final byte [] FOR_UNIT_TESTS_ONLY = Bytes.toBytes("ForUnitTestsOnly");
2322   /**
2323    * This is used only by unit tests. Not required to be a public API.
2324    * @param familyMap map of family to edits for the given family.
2325    * @throws IOException
2326    */
2327   void delete(NavigableMap<byte[], List<Cell>> familyMap,
2328       Durability durability) throws IOException {
2329     Delete delete = new Delete(FOR_UNIT_TESTS_ONLY);
2330     delete.setFamilyCellMap(familyMap);
2331     delete.setDurability(durability);
2332     doBatchMutate(delete);
2333   }
2334 
2335   /**
2336    * Setup correct timestamps in the KVs in Delete object.
2337    * Caller should have the row and region locks.
2338    * @param mutation
2339    * @param familyMap
2340    * @param byteNow
2341    * @throws IOException
2342    */
2343   void prepareDeleteTimestamps(Mutation mutation, Map<byte[], List<Cell>> familyMap,
2344       byte[] byteNow) throws IOException {
2345     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
2346 
2347       byte[] family = e.getKey();
2348       List<Cell> cells = e.getValue();
2349       assert cells instanceof RandomAccess;
2350 
2351       Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
2352       int listSize = cells.size();
2353       for (int i=0; i < listSize; i++) {
2354         Cell cell = cells.get(i);
2355         //  Check if time is LATEST, change to time of most recent addition if so
2356         //  This is expensive.
2357         if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP && CellUtil.isDeleteType(cell)) {
2358           byte[] qual = CellUtil.cloneQualifier(cell);
2359           if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
2360 
2361           Integer count = kvCount.get(qual);
2362           if (count == null) {
2363             kvCount.put(qual, 1);
2364           } else {
2365             kvCount.put(qual, count + 1);
2366           }
2367           count = kvCount.get(qual);
2368 
2369           Get get = new Get(CellUtil.cloneRow(cell));
2370           get.setMaxVersions(count);
2371           get.addColumn(family, qual);
2372           if (coprocessorHost != null) {
2373             if (!coprocessorHost.prePrepareTimeStampForDeleteVersion(mutation, cell,
2374                 byteNow, get)) {
2375               updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow);
2376             }
2377           } else {
2378             updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow);
2379           }
2380         } else {
2381           CellUtil.updateLatestStamp(cell, byteNow, 0);
2382         }
2383       }
2384     }
2385   }
2386 
2387   void updateDeleteLatestVersionTimeStamp(Cell cell, Get get, int count, byte[] byteNow)
2388       throws IOException {
2389     List<Cell> result = get(get, false);
2390 
2391     if (result.size() < count) {
2392       // Nothing to delete
2393       CellUtil.updateLatestStamp(cell, byteNow, 0);
2394       return;
2395     }
2396     if (result.size() > count) {
2397       throw new RuntimeException("Unexpected size: " + result.size());
2398     }
2399     Cell getCell = result.get(count - 1);
2400     CellUtil.setTimestamp(cell, getCell.getTimestamp());
2401   }
2402 
2403   /**
2404    * @throws IOException
2405    */
2406   public void put(Put put)
2407   throws IOException {
2408     checkReadOnly();
2409 
2410     // Do a rough check that we have resources to accept a write.  The check is
2411     // 'rough' in that between the resource check and the call to obtain a
2412     // read lock, resources may run out.  For now, the thought is that this
2413     // will be extremely rare; we'll deal with it when it happens.
2414     checkResources();
2415     startRegionOperation(Operation.PUT);
2416     try {
2417       // All edits for the given row (across all column families) must happen atomically.
2418       doBatchMutate(put);
2419     } finally {
2420       closeRegionOperation(Operation.PUT);
2421     }
2422   }
2423 
2424   /**
2425    * Struct-like class that tracks the progress of a batch operation,
2426    * accumulating status codes and tracking the index at which processing
2427    * is proceeding.
2428    */
2429   private abstract static class BatchOperationInProgress<T> {
2430     T[] operations;
2431     int nextIndexToProcess = 0;
2432     OperationStatus[] retCodeDetails;
2433     WALEdit[] walEditsFromCoprocessors;
2434 
2435     public BatchOperationInProgress(T[] operations) {
2436       this.operations = operations;
2437       this.retCodeDetails = new OperationStatus[operations.length];
2438       this.walEditsFromCoprocessors = new WALEdit[operations.length];
2439       Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
2440     }
2441 
2442     public abstract Mutation getMutation(int index);
2443     public abstract long getNonceGroup(int index);
2444     public abstract long getNonce(int index);
2445     /** This method is potentially expensive and should only be used for non-replay CP path. */
2446     public abstract Mutation[] getMutationsForCoprocs();
2447     public abstract boolean isInReplay();
2448     public abstract long getReplaySequenceId();
2449 
2450     public boolean isDone() {
2451       return nextIndexToProcess == operations.length;
2452     }
2453   }
2454 
2455   private static class MutationBatch extends BatchOperationInProgress<Mutation> {
2456     private long nonceGroup;
2457     private long nonce;
2458     public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) {
2459       super(operations);
2460       this.nonceGroup = nonceGroup;
2461       this.nonce = nonce;
2462     }
2463 
2464     @Override
2465     public Mutation getMutation(int index) {
2466       return this.operations[index];
2467     }
2468 
2469     @Override
2470     public long getNonceGroup(int index) {
2471       return nonceGroup;
2472     }
2473 
2474     @Override
2475     public long getNonce(int index) {
2476       return nonce;
2477     }
2478 
2479     @Override
2480     public Mutation[] getMutationsForCoprocs() {
2481       return this.operations;
2482     }
2483 
2484     @Override
2485     public boolean isInReplay() {
2486       return false;
2487     }
2488 
2489     @Override
2490     public long getReplaySequenceId() {
2491       return 0;
2492     }
2493   }
2494 
2495   private static class ReplayBatch extends BatchOperationInProgress<MutationReplay> {
2496     private long replaySeqId = 0;
2497     public ReplayBatch(MutationReplay[] operations, long seqId) {
2498       super(operations);
2499       this.replaySeqId = seqId;
2500     }
2501 
2502     @Override
2503     public Mutation getMutation(int index) {
2504       return this.operations[index].mutation;
2505     }
2506 
2507     @Override
2508     public long getNonceGroup(int index) {
2509       return this.operations[index].nonceGroup;
2510     }
2511 
2512     @Override
2513     public long getNonce(int index) {
2514       return this.operations[index].nonce;
2515     }
2516 
2517     @Override
2518     public Mutation[] getMutationsForCoprocs() {
2519       assert false;
2520       throw new RuntimeException("Should not be called for replay batch");
2521     }
2522 
2523     @Override
2524     public boolean isInReplay() {
2525       return true;
2526     }
2527 
2528     @Override
2529     public long getReplaySequenceId() {
2530       return this.replaySeqId;
2531     }
2532   }
2533 
2534   /**
2535    * Perform a batch of mutations.
2536    * It supports only Put and Delete mutations and will ignore other types passed.
2537    * @param mutations the list of mutations
2538    * @return an array of OperationStatus which internally contains the
2539    *         OperationStatusCode and the exceptionMessage if any.
2540    * @throws IOException
2541    */
2542   public OperationStatus[] batchMutate(
2543       Mutation[] mutations, long nonceGroup, long nonce) throws IOException {
2544     // As it stands, this is used for 3 things
2545     //  * batchMutate with single mutation - put/delete, separate or from checkAndMutate.
2546     //  * coprocessor calls (see ex. BulkDeleteEndpoint).
2547     // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd...
2548     return batchMutate(new MutationBatch(mutations, nonceGroup, nonce));
2549   }
2550 
2551   public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
2552     return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
2553   }
2554 
2555   /**
2556    * Replay a batch of mutations.
2557    * @param mutations mutations to replay.
2558    * @param replaySeqId SeqId for current mutations
2559    * @return an array of OperationStatus which internally contains the
2560    *         OperationStatusCode and the exceptionMessage if any.
2561    * @throws IOException
2562    */
2563   public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId)
2564       throws IOException {
2565     return batchMutate(new ReplayBatch(mutations, replaySeqId));
2566   }
2567 
2568   /**
2569    * Perform a batch of mutations.
2570    * It supports only Put and Delete mutations and will ignore other types passed.
2571    * @param batchOp contains the list of mutations
2572    * @return an array of OperationStatus which internally contains the
2573    *         OperationStatusCode and the exceptionMessage if any.
2574    * @throws IOException
2575    */
2576   OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
2577     boolean initialized = false;
2578     Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
2579     startRegionOperation(op);
2580     try {
2581       while (!batchOp.isDone()) {
2582         if (!batchOp.isInReplay()) {
2583           checkReadOnly();
2584         }
2585         checkResources();
2586 
2587         if (!initialized) {
2588           this.writeRequestsCount.add(batchOp.operations.length);
2589           if (!batchOp.isInReplay()) {
2590             doPreMutationHook(batchOp);
2591           }
2592           initialized = true;
2593         }
2594         long addedSize = doMiniBatchMutation(batchOp);
2595         long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
2596         if (isFlushSize(newSize)) {
2597           requestFlush();
2598         }
2599       }
2600     } finally {
2601       closeRegionOperation(op);
2602     }
2603     return batchOp.retCodeDetails;
2604   }
2605 
2606 
2607   private void doPreMutationHook(BatchOperationInProgress<?> batchOp)
2608       throws IOException {
2609     /* Run coprocessor pre hook outside of locks to avoid deadlock */
2610     WALEdit walEdit = new WALEdit();
2611     if (coprocessorHost != null) {
2612       for (int i = 0 ; i < batchOp.operations.length; i++) {
2613         Mutation m = batchOp.getMutation(i);
2614         if (m instanceof Put) {
2615           if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
2616             // pre hook says skip this Put
2617             // mark as success and skip in doMiniBatchMutation
2618             batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2619           }
2620         } else if (m instanceof Delete) {
2621           Delete curDel = (Delete) m;
2622           if (curDel.getFamilyCellMap().isEmpty()) {
2623             // handle deleting a row case
2624             prepareDelete(curDel);
2625           }
2626           if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {
2627             // pre hook says skip this Delete
2628             // mark as success and skip in doMiniBatchMutation
2629             batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2630           }
2631         } else {
2632           // In case of passing Append mutations along with the Puts and Deletes in batchMutate
2633           // mark the operation return code as failure so that it will not be considered in
2634           // the doMiniBatchMutation
2635           batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
2636               "Put/Delete mutations only supported in batchMutate() now");
2637         }
2638         if (!walEdit.isEmpty()) {
2639           batchOp.walEditsFromCoprocessors[i] = walEdit;
2640           walEdit = new WALEdit();
2641         }
2642       }
2643     }
2644   }
2645 
2646   @SuppressWarnings("unchecked")
2647   private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp) throws IOException {
2648     boolean isInReplay = batchOp.isInReplay();
2649     // variable to note if all Put items are for the same CF -- metrics related
2650     boolean putsCfSetConsistent = true;
2651     //The set of columnFamilies first seen for Put.
2652     Set<byte[]> putsCfSet = null;
2653     // variable to note if all Delete items are for the same CF -- metrics related
2654     boolean deletesCfSetConsistent = true;
2655     //The set of columnFamilies first seen for Delete.
2656     Set<byte[]> deletesCfSet = null;
2657 
2658     long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
2659     WALEdit walEdit = new WALEdit(isInReplay);
2660     MultiVersionConsistencyControl.WriteEntry w = null;
2661     long txid = 0;
2662     boolean doRollBackMemstore = false;
2663     boolean locked = false;
2664 
2665     /** Keep track of the locks we hold so we can release them in finally clause */
2666     List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
2667     // reference family maps directly so coprocessors can mutate them if desired
2668     Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
2669     List<Cell> memstoreCells = new ArrayList<Cell>();
2670     // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
2671     int firstIndex = batchOp.nextIndexToProcess;
2672     int lastIndexExclusive = firstIndex;
2673     boolean success = false;
2674     int noOfPuts = 0, noOfDeletes = 0;
2675     WALKey walKey = null;
2676     long mvccNum = 0;
2677     try {
2678       // ------------------------------------
2679       // STEP 1. Try to acquire as many locks as we can, and ensure
2680       // we acquire at least one.
2681       // ----------------------------------
2682       int numReadyToWrite = 0;
2683       long now = EnvironmentEdgeManager.currentTime();
2684       while (lastIndexExclusive < batchOp.operations.length) {
2685         Mutation mutation = batchOp.getMutation(lastIndexExclusive);
2686         boolean isPutMutation = mutation instanceof Put;
2687 
2688         Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
2689         // store the family map reference to allow for mutations
2690         familyMaps[lastIndexExclusive] = familyMap;
2691 
2692         // skip anything that "ran" already
2693         if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
2694             != OperationStatusCode.NOT_RUN) {
2695           lastIndexExclusive++;
2696           continue;
2697         }
2698 
2699         try {
2700           if (isPutMutation) {
2701             // Check the families in the put. If bad, skip this one.
2702             if (isInReplay) {
2703               removeNonExistentColumnFamilyForReplay(familyMap);
2704             } else {
2705               checkFamilies(familyMap.keySet());
2706             }
2707             checkTimestamps(mutation.getFamilyCellMap(), now);
2708           } else {
2709             prepareDelete((Delete) mutation);
2710           }
2711         } catch (NoSuchColumnFamilyException nscf) {
2712           LOG.warn("No such column family in batch mutation", nscf);
2713           batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2714               OperationStatusCode.BAD_FAMILY, nscf.getMessage());
2715           lastIndexExclusive++;
2716           continue;
2717         } catch (FailedSanityCheckException fsce) {
2718           LOG.warn("Batch Mutation did not pass sanity check", fsce);
2719           batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2720               OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
2721           lastIndexExclusive++;
2722           continue;
2723         }
2724 
2725         // If we haven't got any rows in our batch, we should block to
2726         // get the next one.
2727         boolean shouldBlock = numReadyToWrite == 0;
2728         RowLock rowLock = null;
2729         try {
2730           rowLock = getRowLockInternal(mutation.getRow(), shouldBlock);
2731         } catch (IOException ioe) {
2732           LOG.warn("Failed getting lock in batch put, row="
2733             + Bytes.toStringBinary(mutation.getRow()), ioe);
2734         }
2735         if (rowLock == null) {
2736           // We failed to grab another lock
2737           assert !shouldBlock : "Should never fail to get lock when blocking";
2738           break; // stop acquiring more rows for this batch
2739         } else {
2740           acquiredRowLocks.add(rowLock);
2741         }
2742 
2743         lastIndexExclusive++;
2744         numReadyToWrite++;
2745 
2746         if (isPutMutation) {
2747           // If Column Families stay consistent through out all of the
2748           // individual puts then metrics can be reported as a mutliput across
2749           // column families in the first put.
2750           if (putsCfSet == null) {
2751             putsCfSet = mutation.getFamilyCellMap().keySet();
2752           } else {
2753             putsCfSetConsistent = putsCfSetConsistent
2754                 && mutation.getFamilyCellMap().keySet().equals(putsCfSet);
2755           }
2756         } else {
2757           if (deletesCfSet == null) {
2758             deletesCfSet = mutation.getFamilyCellMap().keySet();
2759           } else {
2760             deletesCfSetConsistent = deletesCfSetConsistent
2761                 && mutation.getFamilyCellMap().keySet().equals(deletesCfSet);
2762           }
2763         }
2764       }
2765 
2766       // we should record the timestamp only after we have acquired the rowLock,
2767       // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
2768       now = EnvironmentEdgeManager.currentTime();
2769       byte[] byteNow = Bytes.toBytes(now);
2770 
2771       // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
2772       if (numReadyToWrite <= 0) return 0L;
2773 
2774       // We've now grabbed as many mutations off the list as we can
2775 
2776       // ------------------------------------
2777       // STEP 2. Update any LATEST_TIMESTAMP timestamps
2778       // ----------------------------------
2779       for (int i = firstIndex; !isInReplay && i < lastIndexExclusive; i++) {
2780         // skip invalid
2781         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2782             != OperationStatusCode.NOT_RUN) continue;
2783 
2784         Mutation mutation = batchOp.getMutation(i);
2785         if (mutation instanceof Put) {
2786           updateCellTimestamps(familyMaps[i].values(), byteNow);
2787           noOfPuts++;
2788         } else {
2789           prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
2790           noOfDeletes++;
2791         }
2792         rewriteCellTags(familyMaps[i], mutation);
2793       }
2794 
2795       lock(this.updatesLock.readLock(), numReadyToWrite);
2796       locked = true;
2797       if(isInReplay) {
2798         mvccNum = batchOp.getReplaySequenceId();
2799       } else {
2800         mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
2801       }
2802       //
2803       // ------------------------------------
2804       // Acquire the latest mvcc number
2805       // ----------------------------------
2806       w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
2807 
2808       // calling the pre CP hook for batch mutation
2809       if (!isInReplay && coprocessorHost != null) {
2810         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2811           new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2812           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2813         if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
2814       }
2815 
2816       // ------------------------------------
2817       // STEP 3. Write back to memstore
2818       // Write to memstore. It is ok to write to memstore
2819       // first without updating the WAL because we do not roll
2820       // forward the memstore MVCC. The MVCC will be moved up when
2821       // the complete operation is done. These changes are not yet
2822       // visible to scanners till we update the MVCC. The MVCC is
2823       // moved only when the sync is complete.
2824       // ----------------------------------
2825       long addedSize = 0;
2826       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2827         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2828             != OperationStatusCode.NOT_RUN) {
2829           continue;
2830         }
2831         doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
2832         addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells, isInReplay);
2833       }
2834 
2835       // ------------------------------------
2836       // STEP 4. Build WAL edit
2837       // ----------------------------------
2838       Durability durability = Durability.USE_DEFAULT;
2839       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2840         // Skip puts that were determined to be invalid during preprocessing
2841         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2842             != OperationStatusCode.NOT_RUN) {
2843           continue;
2844         }
2845         batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2846 
2847         Mutation m = batchOp.getMutation(i);
2848         Durability tmpDur = getEffectiveDurability(m.getDurability());
2849         if (tmpDur.ordinal() > durability.ordinal()) {
2850           durability = tmpDur;
2851         }
2852         if (tmpDur == Durability.SKIP_WAL) {
2853           recordMutationWithoutWal(m.getFamilyCellMap());
2854           continue;
2855         }
2856 
2857         long nonceGroup = batchOp.getNonceGroup(i), nonce = batchOp.getNonce(i);
2858         // In replay, the batch may contain multiple nonces. If so, write WALEdit for each.
2859         // Given how nonces are originally written, these should be contiguous.
2860         // They don't have to be, it will still work, just write more WALEdits than needed.
2861         if (nonceGroup != currentNonceGroup || nonce != currentNonce) {
2862           if (walEdit.size() > 0) {
2863             assert isInReplay;
2864             if (!isInReplay) {
2865               throw new IOException("Multiple nonces per batch and not in replay");
2866             }
2867             // txid should always increase, so having the one from the last call is ok.
2868             // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
2869             walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
2870               this.htableDescriptor.getTableName(), now, m.getClusterIds(),
2871               currentNonceGroup, currentNonce);
2872             txid = this.wal.append(this.htableDescriptor,  this.getRegionInfo(),  walKey,
2873               walEdit, getSequenceId(), true, null);
2874             walEdit = new WALEdit(isInReplay);
2875             walKey = null;
2876           }
2877           currentNonceGroup = nonceGroup;
2878           currentNonce = nonce;
2879         }
2880 
2881         // Add WAL edits by CP
2882         WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
2883         if (fromCP != null) {
2884           for (Cell cell : fromCP.getCells()) {
2885             walEdit.add(cell);
2886           }
2887         }
2888         addFamilyMapToWALEdit(familyMaps[i], walEdit);
2889       }
2890 
2891       // -------------------------
2892       // STEP 5. Append the final edit to WAL. Do not sync wal.
2893       // -------------------------
2894       Mutation mutation = batchOp.getMutation(firstIndex);
2895       if (walEdit.size() > 0) {
2896         // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
2897         walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
2898             this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
2899             mutation.getClusterIds(), currentNonceGroup, currentNonce);
2900         if(isInReplay) {
2901           walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId());
2902         }
2903         txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
2904           getSequenceId(), true, memstoreCells);
2905       }
2906       if(walKey == null){
2907         // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
2908         walKey = this.appendEmptyEdit(this.wal, memstoreCells);
2909       }
2910 
2911       // -------------------------------
2912       // STEP 6. Release row locks, etc.
2913       // -------------------------------
2914       if (locked) {
2915         this.updatesLock.readLock().unlock();
2916         locked = false;
2917       }
2918       releaseRowLocks(acquiredRowLocks);
2919 
2920       // -------------------------
2921       // STEP 7. Sync wal.
2922       // -------------------------
2923       if (txid != 0) {
2924         syncOrDefer(txid, durability);
2925       }
2926 
2927       doRollBackMemstore = false;
2928       // calling the post CP hook for batch mutation
2929       if (!isInReplay && coprocessorHost != null) {
2930         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2931           new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2932           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2933         coprocessorHost.postBatchMutate(miniBatchOp);
2934       }
2935 
2936 
2937       // ------------------------------------------------------------------
2938       // STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
2939       // ------------------------------------------------------------------
2940       if (w != null) {
2941         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
2942         w = null;
2943       }
2944 
2945       // ------------------------------------
2946       // STEP 9. Run coprocessor post hooks. This should be done after the wal is
2947       // synced so that the coprocessor contract is adhered to.
2948       // ------------------------------------
2949       if (!isInReplay && coprocessorHost != null) {
2950         for (int i = firstIndex; i < lastIndexExclusive; i++) {
2951           // only for successful puts
2952           if (batchOp.retCodeDetails[i].getOperationStatusCode()
2953               != OperationStatusCode.SUCCESS) {
2954             continue;
2955           }
2956           Mutation m = batchOp.getMutation(i);
2957           if (m instanceof Put) {
2958             coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
2959           } else {
2960             coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
2961           }
2962         }
2963       }
2964 
2965       success = true;
2966       return addedSize;
2967     } finally {
2968       // if the wal sync was unsuccessful, remove keys from memstore
2969       if (doRollBackMemstore) {
2970         rollbackMemstore(memstoreCells);
2971       }
2972       if (w != null) {
2973         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
2974       }
2975 
2976       if (locked) {
2977         this.updatesLock.readLock().unlock();
2978       }
2979       releaseRowLocks(acquiredRowLocks);
2980 
2981       // See if the column families were consistent through the whole thing.
2982       // if they were then keep them. If they were not then pass a null.
2983       // null will be treated as unknown.
2984       // Total time taken might be involving Puts and Deletes.
2985       // Split the time for puts and deletes based on the total number of Puts and Deletes.
2986 
2987       if (noOfPuts > 0) {
2988         // There were some Puts in the batch.
2989         if (this.metricsRegion != null) {
2990           this.metricsRegion.updatePut();
2991         }
2992       }
2993       if (noOfDeletes > 0) {
2994         // There were some Deletes in the batch.
2995         if (this.metricsRegion != null) {
2996           this.metricsRegion.updateDelete();
2997         }
2998       }
2999       if (!success) {
3000         for (int i = firstIndex; i < lastIndexExclusive; i++) {
3001           if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
3002             batchOp.retCodeDetails[i] = OperationStatus.FAILURE;
3003           }
3004         }
3005       }
3006       if (coprocessorHost != null && !batchOp.isInReplay()) {
3007         // call the coprocessor hook to do any finalization steps
3008         // after the put is done
3009         MiniBatchOperationInProgress<Mutation> miniBatchOp =
3010             new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
3011                 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex,
3012                 lastIndexExclusive);
3013         coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success);
3014       }
3015 
3016       batchOp.nextIndexToProcess = lastIndexExclusive;
3017     }
3018   }
3019 
3020   /**
3021    * Returns effective durability from the passed durability and
3022    * the table descriptor.
3023    */
3024   protected Durability getEffectiveDurability(Durability d) {
3025     return d == Durability.USE_DEFAULT ? this.durability : d;
3026   }
3027 
3028   //TODO, Think that gets/puts and deletes should be refactored a bit so that
3029   //the getting of the lock happens before, so that you would just pass it into
3030   //the methods. So in the case of checkAndMutate you could just do lockRow,
3031   //get, put, unlockRow or something
3032   /**
3033    *
3034    * @throws IOException
3035    * @return true if the new put was executed, false otherwise
3036    */
3037   public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
3038       CompareOp compareOp, ByteArrayComparable comparator, Mutation w,
3039       boolean writeToWAL)
3040   throws IOException{
3041     checkReadOnly();
3042     //TODO, add check for value length or maybe even better move this to the
3043     //client if this becomes a global setting
3044     checkResources();
3045     boolean isPut = w instanceof Put;
3046     if (!isPut && !(w instanceof Delete))
3047       throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must " +
3048           "be Put or Delete");
3049     if (!Bytes.equals(row, w.getRow())) {
3050       throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " +
3051           "getRow must match the passed row");
3052     }
3053 
3054     startRegionOperation();
3055     try {
3056       Get get = new Get(row);
3057       checkFamily(family);
3058       get.addColumn(family, qualifier);
3059 
3060       // Lock row - note that doBatchMutate will relock this row if called
3061       RowLock rowLock = getRowLock(get.getRow());
3062       // wait for all previous transactions to complete (with lock held)
3063       mvcc.waitForPreviousTransactionsComplete();
3064       try {
3065         if (this.getCoprocessorHost() != null) {
3066           Boolean processed = null;
3067           if (w instanceof Put) {
3068             processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family,
3069                 qualifier, compareOp, comparator, (Put) w);
3070           } else if (w instanceof Delete) {
3071             processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family,
3072                 qualifier, compareOp, comparator, (Delete) w);
3073           }
3074           if (processed != null) {
3075             return processed;
3076           }
3077         }
3078         List<Cell> result = get(get, false);
3079 
3080         boolean valueIsNull = comparator.getValue() == null ||
3081           comparator.getValue().length == 0;
3082         boolean matches = false;
3083         if (result.size() == 0 && valueIsNull) {
3084           matches = true;
3085         } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
3086             valueIsNull) {
3087           matches = true;
3088         } else if (result.size() == 1 && !valueIsNull) {
3089           Cell kv = result.get(0);
3090           int compareResult = comparator.compareTo(kv.getValueArray(),
3091               kv.getValueOffset(), kv.getValueLength());
3092           switch (compareOp) {
3093           case LESS:
3094             matches = compareResult < 0;
3095             break;
3096           case LESS_OR_EQUAL:
3097             matches = compareResult <= 0;
3098             break;
3099           case EQUAL:
3100             matches = compareResult == 0;
3101             break;
3102           case NOT_EQUAL:
3103             matches = compareResult != 0;
3104             break;
3105           case GREATER_OR_EQUAL:
3106             matches = compareResult >= 0;
3107             break;
3108           case GREATER:
3109             matches = compareResult > 0;
3110             break;
3111           default:
3112             throw new RuntimeException("Unknown Compare op " + compareOp.name());
3113           }
3114         }
3115         //If matches put the new put or delete the new delete
3116         if (matches) {
3117           // All edits for the given row (across all column families) must
3118           // happen atomically.
3119           doBatchMutate(w);
3120           this.checkAndMutateChecksPassed.increment();
3121           return true;
3122         }
3123         this.checkAndMutateChecksFailed.increment();
3124         return false;
3125       } finally {
3126         rowLock.release();
3127       }
3128     } finally {
3129       closeRegionOperation();
3130     }
3131   }
3132 
3133   //TODO, Think that gets/puts and deletes should be refactored a bit so that
3134   //the getting of the lock happens before, so that you would just pass it into
3135   //the methods. So in the case of checkAndMutate you could just do lockRow,
3136   //get, put, unlockRow or something
3137   /**
3138    *
3139    * @throws IOException
3140    * @return true if the new put was executed, false otherwise
3141    */
3142   public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier,
3143       CompareOp compareOp, ByteArrayComparable comparator, RowMutations rm,
3144       boolean writeToWAL)
3145       throws IOException{
3146     checkReadOnly();
3147     //TODO, add check for value length or maybe even better move this to the
3148     //client if this becomes a global setting
3149     checkResources();
3150 
3151     startRegionOperation();
3152     try {
3153       Get get = new Get(row);
3154       checkFamily(family);
3155       get.addColumn(family, qualifier);
3156 
3157       // Lock row - note that doBatchMutate will relock this row if called
3158       RowLock rowLock = getRowLock(get.getRow());
3159       // wait for all previous transactions to complete (with lock held)
3160       mvcc.waitForPreviousTransactionsComplete();
3161       try {
3162         List<Cell> result = get(get, false);
3163 
3164         boolean valueIsNull = comparator.getValue() == null ||
3165             comparator.getValue().length == 0;
3166         boolean matches = false;
3167         if (result.size() == 0 && valueIsNull) {
3168           matches = true;
3169         } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
3170             valueIsNull) {
3171           matches = true;
3172         } else if (result.size() == 1 && !valueIsNull) {
3173           Cell kv = result.get(0);
3174           int compareResult = comparator.compareTo(kv.getValueArray(),
3175               kv.getValueOffset(), kv.getValueLength());
3176           switch (compareOp) {
3177           case LESS:
3178             matches = compareResult < 0;
3179             break;
3180           case LESS_OR_EQUAL:
3181             matches = compareResult <= 0;
3182             break;
3183           case EQUAL:
3184             matches = compareResult == 0;
3185             break;
3186           case NOT_EQUAL:
3187             matches = compareResult != 0;
3188             break;
3189           case GREATER_OR_EQUAL:
3190             matches = compareResult >= 0;
3191             break;
3192           case GREATER:
3193             matches = compareResult > 0;
3194             break;
3195           default:
3196             throw new RuntimeException("Unknown Compare op " + compareOp.name());
3197           }
3198         }
3199         //If matches put the new put or delete the new delete
3200         if (matches) {
3201           // All edits for the given row (across all column families) must
3202           // happen atomically.
3203           mutateRow(rm);
3204           this.checkAndMutateChecksPassed.increment();
3205           return true;
3206         }
3207         this.checkAndMutateChecksFailed.increment();
3208         return false;
3209       } finally {
3210         rowLock.release();
3211       }
3212     } finally {
3213       closeRegionOperation();
3214     }
3215   }
3216   private void doBatchMutate(Mutation mutation) throws IOException, DoNotRetryIOException {
3217     // Currently this is only called for puts and deletes, so no nonces.
3218     OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation },
3219         HConstants.NO_NONCE, HConstants.NO_NONCE);
3220     if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
3221       throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
3222     } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
3223       throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
3224     }
3225   }
3226 
3227   /**
3228    * Complete taking the snapshot on the region. Writes the region info and adds references to the
3229    * working snapshot directory.
3230    *
3231    * TODO for api consistency, consider adding another version with no {@link ForeignExceptionSnare}
3232    * arg.  (In the future other cancellable HRegion methods could eventually add a
3233    * {@link ForeignExceptionSnare}, or we could do something fancier).
3234    *
3235    * @param desc snapshot description object
3236    * @param exnSnare ForeignExceptionSnare that captures external exceptions in case we need to
3237    *   bail out.  This is allowed to be null and will just be ignored in that case.
3238    * @throws IOException if there is an external or internal error causing the snapshot to fail
3239    */
3240   public void addRegionToSnapshot(SnapshotDescription desc,
3241       ForeignExceptionSnare exnSnare) throws IOException {
3242     Path rootDir = FSUtils.getRootDir(conf);
3243     Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);
3244 
3245     SnapshotManifest manifest = SnapshotManifest.create(conf, getFilesystem(),
3246                                                         snapshotDir, desc, exnSnare);
3247     manifest.addRegion(this);
3248   }
3249 
3250   /**
3251    * Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP} with the
3252    * provided current timestamp.
3253    * @throws IOException
3254    */
3255   void updateCellTimestamps(final Iterable<List<Cell>> cellItr, final byte[] now)
3256       throws IOException {
3257     for (List<Cell> cells: cellItr) {
3258       if (cells == null) continue;
3259       assert cells instanceof RandomAccess;
3260       int listSize = cells.size();
3261       for (int i = 0; i < listSize; i++) {
3262         CellUtil.updateLatestStamp(cells.get(i), now, 0);
3263       }
3264     }
3265   }
3266 
3267   /**
3268    * Possibly rewrite incoming cell tags.
3269    */
3270   void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) {
3271     // Check if we have any work to do and early out otherwise
3272     // Update these checks as more logic is added here
3273 
3274     if (m.getTTL() == Long.MAX_VALUE) {
3275       return;
3276     }
3277 
3278     // From this point we know we have some work to do
3279 
3280     for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
3281       List<Cell> cells = e.getValue();
3282       assert cells instanceof RandomAccess;
3283       int listSize = cells.size();
3284       for (int i = 0; i < listSize; i++) {
3285         Cell cell = cells.get(i);
3286         List<Tag> newTags = new ArrayList<Tag>();
3287         Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
3288           cell.getTagsOffset(), cell.getTagsLength());
3289 
3290         // Carry forward existing tags
3291 
3292         while (tagIterator.hasNext()) {
3293 
3294           // Add any filters or tag specific rewrites here
3295 
3296           newTags.add(tagIterator.next());
3297         }
3298 
3299         // Cell TTL handling
3300 
3301         // Check again if we need to add a cell TTL because early out logic
3302         // above may change when there are more tag based features in core.
3303         if (m.getTTL() != Long.MAX_VALUE) {
3304           // Add a cell TTL tag
3305           newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(m.getTTL())));
3306         }
3307 
3308         // Rewrite the cell with the updated set of tags
3309 
3310         cells.set(i, new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
3311           cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
3312           cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
3313           cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
3314           cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
3315           newTags));
3316       }
3317     }
3318   }
3319 
3320   /*
3321    * Check if resources to support an update.
3322    *
3323    * We throw RegionTooBusyException if above memstore limit
3324    * and expect client to retry using some kind of backoff
3325   */
3326   private void checkResources() throws RegionTooBusyException {
3327     // If catalog region, do not impose resource constraints or block updates.
3328     if (this.getRegionInfo().isMetaRegion()) return;
3329 
3330     if (this.memstoreSize.get() > this.blockingMemStoreSize) {
3331       blockedRequestsCount.increment();
3332       requestFlush();
3333       throw new RegionTooBusyException("Above memstore limit, " +
3334           "regionName=" + (this.getRegionInfo() == null ? "unknown" :
3335           this.getRegionInfo().getRegionNameAsString()) +
3336           ", server=" + (this.getRegionServerServices() == null ? "unknown" :
3337           this.getRegionServerServices().getServerName()) +
3338           ", memstoreSize=" + memstoreSize.get() +
3339           ", blockingMemStoreSize=" + blockingMemStoreSize);
3340     }
3341   }
3342 
3343   /**
3344    * @throws IOException Throws exception if region is in read-only mode.
3345    */
3346   protected void checkReadOnly() throws IOException {
3347     if (this.writestate.isReadOnly()) {
3348       throw new IOException("region is read only");
3349     }
3350   }
3351 
3352   protected void checkReadsEnabled() throws IOException {
3353     if (!this.writestate.readsEnabled) {
3354       throw new IOException ("The region's reads are disabled. Cannot serve the request");
3355     }
3356   }
3357 
3358   public void setReadsEnabled(boolean readsEnabled) {
3359     this.writestate.setReadsEnabled(readsEnabled);
3360   }
3361 
3362   /**
3363    * Add updates first to the wal and then add values to memstore.
3364    * Warning: Assumption is caller has lock on passed in row.
3365    * @param edits Cell updates by column
3366    * @throws IOException
3367    */
3368   private void put(final byte [] row, byte [] family, List<Cell> edits)
3369   throws IOException {
3370     NavigableMap<byte[], List<Cell>> familyMap;
3371     familyMap = new TreeMap<byte[], List<Cell>>(Bytes.BYTES_COMPARATOR);
3372 
3373     familyMap.put(family, edits);
3374     Put p = new Put(row);
3375     p.setFamilyCellMap(familyMap);
3376     doBatchMutate(p);
3377   }
3378 
3379   /**
3380    * Atomically apply the given map of family->edits to the memstore.
3381    * This handles the consistency control on its own, but the caller
3382    * should already have locked updatesLock.readLock(). This also does
3383    * <b>not</b> check the families for validity.
3384    *
3385    * @param familyMap Map of kvs per family
3386    * @param localizedWriteEntry The WriteEntry of the MVCC for this transaction.
3387    *        If null, then this method internally creates a mvcc transaction.
3388    * @param output newly added KVs into memstore
3389    * @param isInReplay true when adding replayed KVs into memstore
3390    * @return the additional memory usage of the memstore caused by the
3391    * new entries.
3392    * @throws IOException
3393    */
3394   private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
3395     long mvccNum, List<Cell> memstoreCells, boolean isInReplay) throws IOException {
3396     long size = 0;
3397 
3398     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
3399       byte[] family = e.getKey();
3400       List<Cell> cells = e.getValue();
3401       assert cells instanceof RandomAccess;
3402       Store store = getStore(family);
3403       int listSize = cells.size();
3404       for (int i=0; i < listSize; i++) {
3405         Cell cell = cells.get(i);
3406         CellUtil.setSequenceId(cell, mvccNum);
3407         Pair<Long, Cell> ret = store.add(cell);
3408         size += ret.getFirst();
3409         memstoreCells.add(ret.getSecond());
3410         if(isInReplay) {
3411           // set memstore newly added cells with replay mvcc number
3412           CellUtil.setSequenceId(ret.getSecond(), mvccNum);
3413         }
3414       }
3415     }
3416 
3417      return size;
3418    }
3419 
3420   /**
3421    * Remove all the keys listed in the map from the memstore. This method is
3422    * called when a Put/Delete has updated memstore but subsequently fails to update
3423    * the wal. This method is then invoked to rollback the memstore.
3424    */
3425   private void rollbackMemstore(List<Cell> memstoreCells) {
3426     int kvsRolledback = 0;
3427 
3428     for (Cell cell : memstoreCells) {
3429       byte[] family = CellUtil.cloneFamily(cell);
3430       Store store = getStore(family);
3431       store.rollback(cell);
3432       kvsRolledback++;
3433     }
3434     LOG.debug("rollbackMemstore rolled back " + kvsRolledback);
3435   }
3436 
3437   /**
3438    * Check the collection of families for validity.
3439    * @throws NoSuchColumnFamilyException if a family does not exist.
3440    */
3441   void checkFamilies(Collection<byte[]> families)
3442   throws NoSuchColumnFamilyException {
3443     for (byte[] family : families) {
3444       checkFamily(family);
3445     }
3446   }
3447 
3448   /**
3449    * During replay, there could exist column families which are removed between region server
3450    * failure and replay
3451    */
3452   private void removeNonExistentColumnFamilyForReplay(
3453       final Map<byte[], List<Cell>> familyMap) {
3454     List<byte[]> nonExistentList = null;
3455     for (byte[] family : familyMap.keySet()) {
3456       if (!this.htableDescriptor.hasFamily(family)) {
3457         if (nonExistentList == null) {
3458           nonExistentList = new ArrayList<byte[]>();
3459         }
3460         nonExistentList.add(family);
3461       }
3462     }
3463     if (nonExistentList != null) {
3464       for (byte[] family : nonExistentList) {
3465         // Perhaps schema was changed between crash and replay
3466         LOG.info("No family for " + Bytes.toString(family) + " omit from reply.");
3467         familyMap.remove(family);
3468       }
3469     }
3470   }
3471 
3472   void checkTimestamps(final Map<byte[], List<Cell>> familyMap,
3473       long now) throws FailedSanityCheckException {
3474     if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
3475       return;
3476     }
3477     long maxTs = now + timestampSlop;
3478     for (List<Cell> kvs : familyMap.values()) {
3479       assert kvs instanceof RandomAccess;
3480       int listSize  = kvs.size();
3481       for (int i=0; i < listSize; i++) {
3482         Cell cell = kvs.get(i);
3483         // see if the user-side TS is out of range. latest = server-side
3484         long ts = cell.getTimestamp();
3485         if (ts != HConstants.LATEST_TIMESTAMP && ts > maxTs) {
3486           throw new FailedSanityCheckException("Timestamp for KV out of range "
3487               + cell + " (too.new=" + timestampSlop + ")");
3488         }
3489       }
3490     }
3491   }
3492 
3493   /**
3494    * Append the given map of family->edits to a WALEdit data structure.
3495    * This does not write to the WAL itself.
3496    * @param familyMap map of family->edits
3497    * @param walEdit the destination entry to append into
3498    */
3499   private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
3500       WALEdit walEdit) {
3501     for (List<Cell> edits : familyMap.values()) {
3502       assert edits instanceof RandomAccess;
3503       int listSize = edits.size();
3504       for (int i=0; i < listSize; i++) {
3505         Cell cell = edits.get(i);
3506         walEdit.add(cell);
3507       }
3508     }
3509   }
3510 
3511   private void requestFlush() {
3512     if (this.rsServices == null) {
3513       return;
3514     }
3515     synchronized (writestate) {
3516       if (this.writestate.isFlushRequested()) {
3517         return;
3518       }
3519       writestate.flushRequested = true;
3520     }
3521     // Make request outside of synchronize block; HBASE-818.
3522     this.rsServices.getFlushRequester().requestFlush(this, false);
3523     if (LOG.isDebugEnabled()) {
3524       LOG.debug("Flush requested on " + this);
3525     }
3526   }
3527 
3528   /*
3529    * @param size
3530    * @return True if size is over the flush threshold
3531    */
3532   private boolean isFlushSize(final long size) {
3533     return size > this.memstoreFlushSize;
3534   }
3535 
3536   /**
3537    * Read the edits put under this region by wal splitting process.  Put
3538    * the recovered edits back up into this region.
3539    *
3540    * <p>We can ignore any wal message that has a sequence ID that's equal to or
3541    * lower than minSeqId.  (Because we know such messages are already
3542    * reflected in the HFiles.)
3543    *
3544    * <p>While this is running we are putting pressure on memory yet we are
3545    * outside of our usual accounting because we are not yet an onlined region
3546    * (this stuff is being run as part of Region initialization).  This means
3547    * that if we're up against global memory limits, we'll not be flagged to flush
3548    * because we are not online. We can't be flushed by usual mechanisms anyways;
3549    * we're not yet online so our relative sequenceids are not yet aligned with
3550    * WAL sequenceids -- not till we come up online, post processing of split
3551    * edits.
3552    *
3553    * <p>But to help relieve memory pressure, at least manage our own heap size
3554    * flushing if are in excess of per-region limits.  Flushing, though, we have
3555    * to be careful and avoid using the regionserver/wal sequenceid.  Its running
3556    * on a different line to whats going on in here in this region context so if we
3557    * crashed replaying these edits, but in the midst had a flush that used the
3558    * regionserver wal with a sequenceid in excess of whats going on in here
3559    * in this region and with its split editlogs, then we could miss edits the
3560    * next time we go to recover. So, we have to flush inline, using seqids that
3561    * make sense in a this single region context only -- until we online.
3562    *
3563    * @param maxSeqIdInStores Any edit found in split editlogs needs to be in excess of
3564    * the maxSeqId for the store to be applied, else its skipped.
3565    * @return the sequence id of the last edit added to this region out of the
3566    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
3567    * @throws UnsupportedEncodingException
3568    * @throws IOException
3569    */
3570   protected long replayRecoveredEditsIfAny(final Path regiondir,
3571       Map<byte[], Long> maxSeqIdInStores,
3572       final CancelableProgressable reporter, final MonitoredTask status)
3573       throws UnsupportedEncodingException, IOException {
3574     long minSeqIdForTheRegion = -1;
3575     for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
3576       if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
3577         minSeqIdForTheRegion = maxSeqIdInStore;
3578       }
3579     }
3580     long seqid = minSeqIdForTheRegion;
3581 
3582     FileSystem fs = this.fs.getFileSystem();
3583     NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
3584     if (LOG.isDebugEnabled()) {
3585       LOG.debug("Found " + (files == null ? 0 : files.size())
3586         + " recovered edits file(s) under " + regiondir);
3587     }
3588 
3589     if (files == null || files.isEmpty()) return seqid;
3590 
3591     for (Path edits: files) {
3592       if (edits == null || !fs.exists(edits)) {
3593         LOG.warn("Null or non-existent edits file: " + edits);
3594         continue;
3595       }
3596       if (isZeroLengthThenDelete(fs, edits)) continue;
3597 
3598       long maxSeqId;
3599       String fileName = edits.getName();
3600       maxSeqId = Math.abs(Long.parseLong(fileName));
3601       if (maxSeqId <= minSeqIdForTheRegion) {
3602         if (LOG.isDebugEnabled()) {
3603           String msg = "Maximum sequenceid for this wal is " + maxSeqId
3604             + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
3605             + ", skipped the whole file, path=" + edits;
3606           LOG.debug(msg);
3607         }
3608         continue;
3609       }
3610 
3611       try {
3612         // replay the edits. Replay can return -1 if everything is skipped, only update
3613         // if seqId is greater
3614         seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
3615       } catch (IOException e) {
3616         boolean skipErrors = conf.getBoolean(
3617             HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
3618             conf.getBoolean(
3619                 "hbase.skip.errors",
3620                 HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS));
3621         if (conf.get("hbase.skip.errors") != null) {
3622           LOG.warn(
3623               "The property 'hbase.skip.errors' has been deprecated. Please use " +
3624               HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
3625         }
3626         if (skipErrors) {
3627           Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
3628           LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
3629               + "=true so continuing. Renamed " + edits +
3630               " as " + p, e);
3631         } else {
3632           throw e;
3633         }
3634       }
3635     }
3636     // The edits size added into rsAccounting during this replaying will not
3637     // be required any more. So just clear it.
3638     if (this.rsAccounting != null) {
3639       this.rsAccounting.clearRegionReplayEditsSize(this.getRegionName());
3640     }
3641     if (seqid > minSeqIdForTheRegion) {
3642       // Then we added some edits to memory. Flush and cleanup split edit files.
3643       internalFlushcache(null, seqid, stores.values(), status);
3644     }
3645     // Now delete the content of recovered edits.  We're done w/ them.
3646     for (Path file: files) {
3647       if (!fs.delete(file, false)) {
3648         LOG.error("Failed delete of " + file);
3649       } else {
3650         LOG.debug("Deleted recovered.edits file=" + file);
3651       }
3652     }
3653     return seqid;
3654   }
3655 
3656   /*
3657    * @param edits File of recovered edits.
3658    * @param maxSeqIdInStores Maximum sequenceid found in each store.  Edits in wal
3659    * must be larger than this to be replayed for each store.
3660    * @param reporter
3661    * @return the sequence id of the last edit added to this region out of the
3662    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
3663    * @throws IOException
3664    */
3665   private long replayRecoveredEdits(final Path edits,
3666       Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
3667     throws IOException {
3668     String msg = "Replaying edits from " + edits;
3669     LOG.info(msg);
3670     MonitoredTask status = TaskMonitor.get().createStatus(msg);
3671     FileSystem fs = this.fs.getFileSystem();
3672 
3673     status.setStatus("Opening recovered edits");
3674     WAL.Reader reader = null;
3675     try {
3676       reader = WALFactory.createReader(fs, edits, conf);
3677       long currentEditSeqId = -1;
3678       long currentReplaySeqId = -1;
3679       long firstSeqIdInLog = -1;
3680       long skippedEdits = 0;
3681       long editsCount = 0;
3682       long intervalEdits = 0;
3683       WAL.Entry entry;
3684       Store store = null;
3685       boolean reported_once = false;
3686       ServerNonceManager ng = this.rsServices == null ? null : this.rsServices.getNonceManager();
3687 
3688       try {
3689         // How many edits seen before we check elapsed time
3690         int interval = this.conf.getInt("hbase.hstore.report.interval.edits",
3691             2000);
3692         // How often to send a progress report (default 1/2 master timeout)
3693         int period = this.conf.getInt("hbase.hstore.report.period", 300000);
3694         long lastReport = EnvironmentEdgeManager.currentTime();
3695 
3696         while ((entry = reader.next()) != null) {
3697           WALKey key = entry.getKey();
3698           WALEdit val = entry.getEdit();
3699 
3700           if (ng != null) { // some test, or nonces disabled
3701             ng.reportOperationFromWal(key.getNonceGroup(), key.getNonce(), key.getWriteTime());
3702           }
3703 
3704           if (reporter != null) {
3705             intervalEdits += val.size();
3706             if (intervalEdits >= interval) {
3707               // Number of edits interval reached
3708               intervalEdits = 0;
3709               long cur = EnvironmentEdgeManager.currentTime();
3710               if (lastReport + period <= cur) {
3711                 status.setStatus("Replaying edits..." +
3712                     " skipped=" + skippedEdits +
3713                     " edits=" + editsCount);
3714                 // Timeout reached
3715                 if(!reporter.progress()) {
3716                   msg = "Progressable reporter failed, stopping replay";
3717                   LOG.warn(msg);
3718                   status.abort(msg);
3719                   throw new IOException(msg);
3720                 }
3721                 reported_once = true;
3722                 lastReport = cur;
3723               }
3724             }
3725           }
3726 
3727           if (firstSeqIdInLog == -1) {
3728             firstSeqIdInLog = key.getLogSeqNum();
3729           }
3730           if (currentEditSeqId > key.getLogSeqNum()) {
3731             // when this condition is true, it means we have a serious defect because we need to
3732             // maintain increasing SeqId for WAL edits per region
3733             LOG.error("Found decreasing SeqId. PreId=" + currentEditSeqId + " key=" + key
3734                 + "; edit=" + val);
3735           } else {
3736             currentEditSeqId = key.getLogSeqNum();
3737           }
3738           currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ?
3739             key.getOrigLogSeqNum() : currentEditSeqId;
3740 
3741           // Start coprocessor replay here. The coprocessor is for each WALEdit
3742           // instead of a KeyValue.
3743           if (coprocessorHost != null) {
3744             status.setStatus("Running pre-WAL-restore hook in coprocessors");
3745             if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
3746               // if bypass this wal entry, ignore it ...
3747               continue;
3748             }
3749           }
3750 
3751           boolean flush = false;
3752           for (Cell cell: val.getCells()) {
3753             // Check this edit is for me. Also, guard against writing the special
3754             // METACOLUMN info such as HBASE::CACHEFLUSH entries
3755             if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY) ||
3756                 !Bytes.equals(key.getEncodedRegionName(),
3757                   this.getRegionInfo().getEncodedNameAsBytes())) {
3758               //this is a special edit, we should handle it
3759               CompactionDescriptor compaction = WALEdit.getCompaction(cell);
3760               if (compaction != null) {
3761                 //replay the compaction
3762                 completeCompactionMarker(compaction);
3763               }
3764 
3765               skippedEdits++;
3766               continue;
3767             }
3768             // Figure which store the edit is meant for.
3769             if (store == null || !CellUtil.matchingFamily(cell, store.getFamily().getName())) {
3770               store = getStore(cell);
3771             }
3772             if (store == null) {
3773               // This should never happen.  Perhaps schema was changed between
3774               // crash and redeploy?
3775               LOG.warn("No family for " + cell);
3776               skippedEdits++;
3777               continue;
3778             }
3779             // Now, figure if we should skip this edit.
3780             if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily()
3781                 .getName())) {
3782               skippedEdits++;
3783               continue;
3784             }
3785             CellUtil.setSequenceId(cell, currentReplaySeqId);
3786 
3787             // Once we are over the limit, restoreEdit will keep returning true to
3788             // flush -- but don't flush until we've played all the kvs that make up
3789             // the WALEdit.
3790             if (!flush) {
3791               flush = restoreEdit(store, cell);
3792             }
3793 
3794             editsCount++;
3795           }
3796           if (flush) {
3797             internalFlushcache(null, currentEditSeqId, stores.values(), status);
3798           }
3799 
3800           if (coprocessorHost != null) {
3801             coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
3802           }
3803         }
3804       } catch (EOFException eof) {
3805         Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
3806         msg = "Encountered EOF. Most likely due to Master failure during " +
3807             "wal splitting, so we have this data in another edit.  " +
3808             "Continuing, but renaming " + edits + " as " + p;
3809         LOG.warn(msg, eof);
3810         status.abort(msg);
3811       } catch (IOException ioe) {
3812         // If the IOE resulted from bad file format,
3813         // then this problem is idempotent and retrying won't help
3814         if (ioe.getCause() instanceof ParseException) {
3815           Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
3816           msg = "File corruption encountered!  " +
3817               "Continuing, but renaming " + edits + " as " + p;
3818           LOG.warn(msg, ioe);
3819           status.setStatus(msg);
3820         } else {
3821           status.abort(StringUtils.stringifyException(ioe));
3822           // other IO errors may be transient (bad network connection,
3823           // checksum exception on one datanode, etc).  throw & retry
3824           throw ioe;
3825         }
3826       }
3827       if (reporter != null && !reported_once) {
3828         reporter.progress();
3829       }
3830       msg = "Applied " + editsCount + ", skipped " + skippedEdits +
3831         ", firstSequenceIdInLog=" + firstSeqIdInLog +
3832         ", maxSequenceIdInLog=" + currentEditSeqId + ", path=" + edits;
3833       status.markComplete(msg);
3834       LOG.debug(msg);
3835       return currentEditSeqId;
3836     } finally {
3837       status.cleanup();
3838       if (reader != null) {
3839          reader.close();
3840       }
3841     }
3842   }
3843 
3844   /**
3845    * Call to complete a compaction. Its for the case where we find in the WAL a compaction
3846    * that was not finished.  We could find one recovering a WAL after a regionserver crash.
3847    * See HBASE-2331.
3848    */
3849   void completeCompactionMarker(CompactionDescriptor compaction)
3850       throws IOException {
3851     Store store = this.getStore(compaction.getFamilyName().toByteArray());
3852     if (store == null) {
3853       LOG.warn("Found Compaction WAL edit for deleted family:" +
3854           Bytes.toString(compaction.getFamilyName().toByteArray()));
3855       return;
3856     }
3857     store.completeCompactionMarker(compaction);
3858   }
3859 
3860   /**
3861    * Used by tests
3862    * @param s Store to add edit too.
3863    * @param cell Cell to add.
3864    * @return True if we should flush.
3865    */
3866   protected boolean restoreEdit(final Store s, final Cell cell) {
3867     long kvSize = s.add(cell).getFirst();
3868     if (this.rsAccounting != null) {
3869       rsAccounting.addAndGetRegionReplayEditsSize(this.getRegionName(), kvSize);
3870     }
3871     return isFlushSize(this.addAndGetGlobalMemstoreSize(kvSize));
3872   }
3873 
3874   /*
3875    * @param fs
3876    * @param p File to check.
3877    * @return True if file was zero-length (and if so, we'll delete it in here).
3878    * @throws IOException
3879    */
3880   private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
3881       throws IOException {
3882     FileStatus stat = fs.getFileStatus(p);
3883     if (stat.getLen() > 0) return false;
3884     LOG.warn("File " + p + " is zero-length, deleting.");
3885     fs.delete(p, false);
3886     return true;
3887   }
3888 
3889   protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
3890     return new HStore(this, family, this.conf);
3891   }
3892 
3893   /**
3894    * Return HStore instance.
3895    * Use with caution.  Exposed for use of fixup utilities.
3896    * @param column Name of column family hosted by this region.
3897    * @return Store that goes with the family on passed <code>column</code>.
3898    * TODO: Make this lookup faster.
3899    */
3900   public Store getStore(final byte[] column) {
3901     return this.stores.get(column);
3902   }
3903 
3904   /**
3905    * Return HStore instance. Does not do any copy: as the number of store is limited, we
3906    *  iterate on the list.
3907    */
3908   private Store getStore(Cell cell) {
3909     for (Map.Entry<byte[], Store> famStore : stores.entrySet()) {
3910       if (Bytes.equals(
3911           cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
3912           famStore.getKey(), 0, famStore.getKey().length)) {
3913         return famStore.getValue();
3914       }
3915     }
3916 
3917     return null;
3918   }
3919 
3920   public Map<byte[], Store> getStores() {
3921     return this.stores;
3922   }
3923 
3924   /**
3925    * Return list of storeFiles for the set of CFs.
3926    * Uses closeLock to prevent the race condition where a region closes
3927    * in between the for loop - closing the stores one by one, some stores
3928    * will return 0 files.
3929    * @return List of storeFiles.
3930    */
3931   public List<String> getStoreFileList(final byte [][] columns)
3932     throws IllegalArgumentException {
3933     List<String> storeFileNames = new ArrayList<String>();
3934     synchronized(closeLock) {
3935       for(byte[] column : columns) {
3936         Store store = this.stores.get(column);
3937         if (store == null) {
3938           throw new IllegalArgumentException("No column family : " +
3939               new String(column) + " available");
3940         }
3941         for (StoreFile storeFile: store.getStorefiles()) {
3942           storeFileNames.add(storeFile.getPath().toString());
3943         }
3944       }
3945     }
3946     return storeFileNames;
3947   }
3948 
3949   //////////////////////////////////////////////////////////////////////////////
3950   // Support code
3951   //////////////////////////////////////////////////////////////////////////////
3952 
3953   /** Make sure this is a valid row for the HRegion */
3954   void checkRow(final byte [] row, String op) throws IOException {
3955     if (!rowIsInRange(getRegionInfo(), row)) {
3956       throw new WrongRegionException("Requested row out of range for " +
3957           op + " on HRegion " + this + ", startKey='" +
3958           Bytes.toStringBinary(getStartKey()) + "', getEndKey()='" +
3959           Bytes.toStringBinary(getEndKey()) + "', row='" +
3960           Bytes.toStringBinary(row) + "'");
3961     }
3962   }
3963 
3964   /**
3965    * Tries to acquire a lock on the given row.
3966    * @param waitForLock if true, will block until the lock is available.
3967    *        Otherwise, just tries to obtain the lock and returns
3968    *        false if unavailable.
3969    * @return the row lock if acquired,
3970    *   null if waitForLock was false and the lock was not acquired
3971    * @throws IOException if waitForLock was true and the lock could not be acquired after waiting
3972    */
3973   public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
3974     startRegionOperation();
3975     try {
3976       return getRowLockInternal(row, waitForLock);
3977     } finally {
3978       closeRegionOperation();
3979     }
3980   }
3981 
3982   /**
3983    * A version of getRowLock(byte[], boolean) to use when a region operation has already been
3984    * started (the calling thread has already acquired the region-close-guard lock).
3985    */
3986   protected RowLock getRowLockInternal(byte[] row, boolean waitForLock) throws IOException {
3987     checkRow(row, "row lock");
3988     HashedBytes rowKey = new HashedBytes(row);
3989     RowLockContext rowLockContext = new RowLockContext(rowKey);
3990 
3991     // loop until we acquire the row lock (unless !waitForLock)
3992     while (true) {
3993       RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
3994       if (existingContext == null) {
3995         // Row is not already locked by any thread, use newly created context.
3996         break;
3997       } else if (existingContext.ownedByCurrentThread()) {
3998         // Row is already locked by current thread, reuse existing context instead.
3999         rowLockContext = existingContext;
4000         break;
4001       } else {
4002         if (!waitForLock) {
4003           return null;
4004         }
4005         try {
4006           // Row is already locked by some other thread, give up or wait for it
4007           if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
4008             throw new IOException("Timed out waiting for lock for row: " + rowKey);
4009           }
4010         } catch (InterruptedException ie) {
4011           LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
4012           InterruptedIOException iie = new InterruptedIOException();
4013           iie.initCause(ie);
4014           throw iie;
4015         }
4016       }
4017     }
4018 
4019     // allocate new lock for this thread
4020     return rowLockContext.newLock();
4021   }
4022 
4023   /**
4024    * Acquires a lock on the given row.
4025    * The same thread may acquire multiple locks on the same row.
4026    * @return the acquired row lock
4027    * @throws IOException if the lock could not be acquired after waiting
4028    */
4029   public RowLock getRowLock(byte[] row) throws IOException {
4030     return getRowLock(row, true);
4031   }
4032 
4033   /**
4034    * If the given list of row locks is not null, releases all locks.
4035    */
4036   public void releaseRowLocks(List<RowLock> rowLocks) {
4037     if (rowLocks != null) {
4038       for (RowLock rowLock : rowLocks) {
4039         rowLock.release();
4040       }
4041       rowLocks.clear();
4042     }
4043   }
4044 
4045   /**
4046    * Determines whether multiple column families are present
4047    * Precondition: familyPaths is not null
4048    *
4049    * @param familyPaths List of Pair<byte[] column family, String hfilePath>
4050    */
4051   private static boolean hasMultipleColumnFamilies(
4052       List<Pair<byte[], String>> familyPaths) {
4053     boolean multipleFamilies = false;
4054     byte[] family = null;
4055     for (Pair<byte[], String> pair : familyPaths) {
4056       byte[] fam = pair.getFirst();
4057       if (family == null) {
4058         family = fam;
4059       } else if (!Bytes.equals(family, fam)) {
4060         multipleFamilies = true;
4061         break;
4062       }
4063     }
4064     return multipleFamilies;
4065   }
4066 
4067 
4068   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
4069                                 boolean assignSeqId) throws IOException {
4070     return bulkLoadHFiles(familyPaths, assignSeqId, null);
4071   }
4072 
4073   /**
4074    * Attempts to atomically load a group of hfiles.  This is critical for loading
4075    * rows with multiple column families atomically.
4076    *
4077    * @param familyPaths List of Pair<byte[] column family, String hfilePath>
4078    * @param bulkLoadListener Internal hooks enabling massaging/preparation of a
4079    * file about to be bulk loaded
4080    * @return true if successful, false if failed recoverably
4081    * @throws IOException if failed unrecoverably.
4082    */
4083   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, boolean assignSeqId,
4084       BulkLoadListener bulkLoadListener) throws IOException {
4085     Preconditions.checkNotNull(familyPaths);
4086     // we need writeLock for multi-family bulk load
4087     startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
4088     try {
4089       this.writeRequestsCount.increment();
4090 
4091       // There possibly was a split that happened between when the split keys
4092       // were gathered and before the HRegion's write lock was taken.  We need
4093       // to validate the HFile region before attempting to bulk load all of them
4094       List<IOException> ioes = new ArrayList<IOException>();
4095       List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();
4096       for (Pair<byte[], String> p : familyPaths) {
4097         byte[] familyName = p.getFirst();
4098         String path = p.getSecond();
4099 
4100         Store store = getStore(familyName);
4101         if (store == null) {
4102           IOException ioe = new org.apache.hadoop.hbase.DoNotRetryIOException(
4103               "No such column family " + Bytes.toStringBinary(familyName));
4104           ioes.add(ioe);
4105         } else {
4106           try {
4107             store.assertBulkLoadHFileOk(new Path(path));
4108           } catch (WrongRegionException wre) {
4109             // recoverable (file doesn't fit in region)
4110             failures.add(p);
4111           } catch (IOException ioe) {
4112             // unrecoverable (hdfs problem)
4113             ioes.add(ioe);
4114           }
4115         }
4116       }
4117 
4118       // validation failed because of some sort of IO problem.
4119       if (ioes.size() != 0) {
4120         IOException e = MultipleIOException.createIOException(ioes);
4121         LOG.error("There were one or more IO errors when checking if the bulk load is ok.", e);
4122         throw e;
4123       }
4124 
4125       // validation failed, bail out before doing anything permanent.
4126       if (failures.size() != 0) {
4127         StringBuilder list = new StringBuilder();
4128         for (Pair<byte[], String> p : failures) {
4129           list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")
4130             .append(p.getSecond());
4131         }
4132         // problem when validating
4133         LOG.warn("There was a recoverable bulk load failure likely due to a" +
4134             " split.  These (family, HFile) pairs were not loaded: " + list);
4135         return false;
4136       }
4137 
4138       long seqId = -1;
4139       // We need to assign a sequential ID that's in between two memstores in order to preserve
4140       // the guarantee that all the edits lower than the highest sequential ID from all the
4141       // HFiles are flushed on disk. See HBASE-10958.  The sequence id returned when we flush is
4142       // guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is
4143       // a sequence id that we can be sure is beyond the last hfile written).
4144       if (assignSeqId) {
4145         FlushResult fs = this.flushcache(true);
4146         if (fs.isFlushSucceeded()) {
4147           seqId = fs.flushSequenceId;
4148         } else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
4149           seqId = fs.flushSequenceId;
4150         } else {
4151           throw new IOException("Could not bulk load with an assigned sequential ID because the " +
4152               "flush didn't run. Reason for not flushing: " + fs.failureReason);
4153         }
4154       }
4155 
4156       for (Pair<byte[], String> p : familyPaths) {
4157         byte[] familyName = p.getFirst();
4158         String path = p.getSecond();
4159         Store store = getStore(familyName);
4160         try {
4161           String finalPath = path;
4162           if(bulkLoadListener != null) {
4163             finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
4164           }
4165           store.bulkLoadHFile(finalPath, seqId);
4166           if(bulkLoadListener != null) {
4167             bulkLoadListener.doneBulkLoad(familyName, path);
4168           }
4169         } catch (IOException ioe) {
4170           // A failure here can cause an atomicity violation that we currently
4171           // cannot recover from since it is likely a failed HDFS operation.
4172 
4173           // TODO Need a better story for reverting partial failures due to HDFS.
4174           LOG.error("There was a partial failure due to IO when attempting to" +
4175               " load " + Bytes.toString(p.getFirst()) + " : "+ p.getSecond(), ioe);
4176           if(bulkLoadListener != null) {
4177             try {
4178               bulkLoadListener.failedBulkLoad(familyName, path);
4179             } catch (Exception ex) {
4180               LOG.error("Error while calling failedBulkLoad for family "+
4181                   Bytes.toString(familyName)+" with path "+path, ex);
4182             }
4183           }
4184           throw ioe;
4185         }
4186       }
4187       return true;
4188     } finally {
4189       closeBulkRegionOperation();
4190     }
4191   }
4192 
4193   @Override
4194   public boolean equals(Object o) {
4195     return o instanceof HRegion && Bytes.equals(this.getRegionName(),
4196                                                 ((HRegion) o).getRegionName());
4197   }
4198 
4199   @Override
4200   public int hashCode() {
4201     return Bytes.hashCode(this.getRegionName());
4202   }
4203 
4204   @Override
4205   public String toString() {
4206     return this.getRegionNameAsString();
4207   }
4208 
4209   /**
4210    * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
4211    */
4212   class RegionScannerImpl implements RegionScanner {
4213     // Package local for testability
4214     KeyValueHeap storeHeap = null;
4215     /** Heap of key-values that are not essential for the provided filters and are thus read
4216      * on demand, if on-demand column family loading is enabled.*/
4217     KeyValueHeap joinedHeap = null;
4218     /**
4219      * If the joined heap data gathering is interrupted due to scan limits, this will
4220      * contain the row for which we are populating the values.*/
4221     protected Cell joinedContinuationRow = null;
4222     // KeyValue indicating that limit is reached when scanning
4223     private final KeyValue KV_LIMIT = new KeyValue();
4224     protected final byte[] stopRow;
4225     private final FilterWrapper filter;
4226     private int batch;
4227     protected int isScan;
4228     private boolean filterClosed = false;
4229     private long readPt;
4230     private long maxResultSize;
4231     protected HRegion region;
4232 
4233     @Override
4234     public HRegionInfo getRegionInfo() {
4235       return region.getRegionInfo();
4236     }
4237 
4238     RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
4239         throws IOException {
4240 
4241       this.region = region;
4242       this.maxResultSize = scan.getMaxResultSize();
4243       if (scan.hasFilter()) {
4244         this.filter = new FilterWrapper(scan.getFilter());
4245       } else {
4246         this.filter = null;
4247       }
4248 
4249       this.batch = scan.getBatch();
4250       if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {
4251         this.stopRow = null;
4252       } else {
4253         this.stopRow = scan.getStopRow();
4254       }
4255       // If we are doing a get, we want to be [startRow,endRow] normally
4256       // it is [startRow,endRow) and if startRow=endRow we get nothing.
4257       this.isScan = scan.isGetScan() ? -1 : 0;
4258 
4259       // synchronize on scannerReadPoints so that nobody calculates
4260       // getSmallestReadPoint, before scannerReadPoints is updated.
4261       IsolationLevel isolationLevel = scan.getIsolationLevel();
4262       synchronized(scannerReadPoints) {
4263         this.readPt = getReadpoint(isolationLevel);
4264         scannerReadPoints.put(this, this.readPt);
4265       }
4266 
4267       // Here we separate all scanners into two lists - scanner that provide data required
4268       // by the filter to operate (scanners list) and all others (joinedScanners list).
4269       List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
4270       List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
4271       if (additionalScanners != null) {
4272         scanners.addAll(additionalScanners);
4273       }
4274 
4275       for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
4276           scan.getFamilyMap().entrySet()) {
4277         Store store = stores.get(entry.getKey());
4278         KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
4279         if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
4280           || this.filter.isFamilyEssential(entry.getKey())) {
4281           scanners.add(scanner);
4282         } else {
4283           joinedScanners.add(scanner);
4284         }
4285       }
4286       initializeKVHeap(scanners, joinedScanners, region);
4287     }
4288 
4289     protected void initializeKVHeap(List<KeyValueScanner> scanners,
4290         List<KeyValueScanner> joinedScanners, HRegion region)
4291         throws IOException {
4292       this.storeHeap = new KeyValueHeap(scanners, region.comparator);
4293       if (!joinedScanners.isEmpty()) {
4294         this.joinedHeap = new KeyValueHeap(joinedScanners, region.comparator);
4295       }
4296     }
4297 
4298     @Override
4299     public long getMaxResultSize() {
4300       return maxResultSize;
4301     }
4302 
4303     @Override
4304     public long getMvccReadPoint() {
4305       return this.readPt;
4306     }
4307 
4308     /**
4309      * Reset both the filter and the old filter.
4310      *
4311      * @throws IOException in case a filter raises an I/O exception.
4312      */
4313     protected void resetFilters() throws IOException {
4314       if (filter != null) {
4315         filter.reset();
4316       }
4317     }
4318 
4319     @Override
4320     public boolean next(List<Cell> outResults)
4321         throws IOException {
4322       // apply the batching limit by default
4323       return next(outResults, batch);
4324     }
4325 
4326     @Override
4327     public synchronized boolean next(List<Cell> outResults, int limit) throws IOException {
4328       if (this.filterClosed) {
4329         throw new UnknownScannerException("Scanner was closed (timed out?) " +
4330             "after we renewed it. Could be caused by a very slow scanner " +
4331             "or a lengthy garbage collection");
4332       }
4333       startRegionOperation(Operation.SCAN);
4334       readRequestsCount.increment();
4335       try {
4336         return nextRaw(outResults, limit);
4337       } finally {
4338         closeRegionOperation(Operation.SCAN);
4339       }
4340     }
4341 
4342     @Override
4343     public boolean nextRaw(List<Cell> outResults)
4344         throws IOException {
4345       return nextRaw(outResults, batch);
4346     }
4347 
4348     @Override
4349     public boolean nextRaw(List<Cell> outResults, int limit) throws IOException {
4350       if (storeHeap == null) {
4351         // scanner is closed
4352         throw new UnknownScannerException("Scanner was closed");
4353       }
4354       boolean returnResult;
4355       if (outResults.isEmpty()) {
4356         // Usually outResults is empty. This is true when next is called
4357         // to handle scan or get operation.
4358         returnResult = nextInternal(outResults, limit);
4359       } else {
4360         List<Cell> tmpList = new ArrayList<Cell>();
4361         returnResult = nextInternal(tmpList, limit);
4362         outResults.addAll(tmpList);
4363       }
4364       resetFilters();
4365       if (isFilterDoneInternal()) {
4366         returnResult = false;
4367       }
4368       return returnResult;
4369     }
4370 
4371     private void populateFromJoinedHeap(List<Cell> results, int limit)
4372         throws IOException {
4373       assert joinedContinuationRow != null;
4374       Cell kv = populateResult(results, this.joinedHeap, limit,
4375           joinedContinuationRow.getRowArray(), joinedContinuationRow.getRowOffset(),
4376           joinedContinuationRow.getRowLength());
4377       if (kv != KV_LIMIT) {
4378         // We are done with this row, reset the continuation.
4379         joinedContinuationRow = null;
4380       }
4381       // As the data is obtained from two independent heaps, we need to
4382       // ensure that result list is sorted, because Result relies on that.
4383       Collections.sort(results, comparator);
4384     }
4385 
4386     /**
4387      * Fetches records with currentRow into results list, until next row or limit (if not -1).
4388      * @param heap KeyValueHeap to fetch data from.It must be positioned on correct row before call.
4389      * @param limit Max amount of KVs to place in result list, -1 means no limit.
4390      * @param currentRow Byte array with key we are fetching.
4391      * @param offset offset for currentRow
4392      * @param length length for currentRow
4393      * @return KV_LIMIT if limit reached, next KeyValue otherwise.
4394      */
4395     private Cell populateResult(List<Cell> results, KeyValueHeap heap, int limit,
4396         byte[] currentRow, int offset, short length) throws IOException {
4397       Cell nextKv;
4398       do {
4399         heap.next(results, limit - results.size());
4400         if (limit > 0 && results.size() == limit) {
4401           return KV_LIMIT;
4402         }
4403         nextKv = heap.peek();
4404       } while (nextKv != null && CellUtil.matchingRow(nextKv, currentRow, offset, length));
4405 
4406       return nextKv;
4407     }
4408 
4409     /*
4410      * @return True if a filter rules the scanner is over, done.
4411      */
4412     @Override
4413     public synchronized boolean isFilterDone() throws IOException {
4414       return isFilterDoneInternal();
4415     }
4416 
4417     private boolean isFilterDoneInternal() throws IOException {
4418       return this.filter != null && this.filter.filterAllRemaining();
4419     }
4420 
4421     private boolean nextInternal(List<Cell> results, int limit)
4422     throws IOException {
4423       if (!results.isEmpty()) {
4424         throw new IllegalArgumentException("First parameter should be an empty list");
4425       }
4426       RpcCallContext rpcCall = RpcServer.getCurrentCall();
4427       // The loop here is used only when at some point during the next we determine
4428       // that due to effects of filters or otherwise, we have an empty row in the result.
4429       // Then we loop and try again. Otherwise, we must get out on the first iteration via return,
4430       // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
4431       // and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow).
4432       while (true) {
4433         if (rpcCall != null) {
4434           // If a user specifies a too-restrictive or too-slow scanner, the
4435           // client might time out and disconnect while the server side
4436           // is still processing the request. We should abort aggressively
4437           // in that case.
4438           long afterTime = rpcCall.disconnectSince();
4439           if (afterTime >= 0) {
4440             throw new CallerDisconnectedException(
4441                 "Aborting on region " + getRegionNameAsString() + ", call " +
4442                     this + " after " + afterTime + " ms, since " +
4443                     "caller disconnected");
4444           }
4445         }
4446 
4447         // Let's see what we have in the storeHeap.
4448         Cell current = this.storeHeap.peek();
4449 
4450         byte[] currentRow = null;
4451         int offset = 0;
4452         short length = 0;
4453         if (current != null) {
4454           currentRow = current.getRowArray();
4455           offset = current.getRowOffset();
4456           length = current.getRowLength();
4457         }
4458         boolean stopRow = isStopRow(currentRow, offset, length);
4459         // Check if we were getting data from the joinedHeap and hit the limit.
4460         // If not, then it's main path - getting results from storeHeap.
4461         if (joinedContinuationRow == null) {
4462           // First, check if we are at a stop row. If so, there are no more results.
4463           if (stopRow) {
4464             if (filter != null && filter.hasFilterRow()) {
4465               filter.filterRowCells(results);
4466             }
4467             return false;
4468           }
4469 
4470           // Check if rowkey filter wants to exclude this row. If so, loop to next.
4471           // Technically, if we hit limits before on this row, we don't need this call.
4472           if (filterRowKey(currentRow, offset, length)) {
4473             boolean moreRows = nextRow(currentRow, offset, length);
4474             if (!moreRows) return false;
4475             results.clear();
4476             continue;
4477           }
4478 
4479           Cell nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset,
4480               length);
4481           // Ok, we are good, let's try to get some results from the main heap.
4482           if (nextKv == KV_LIMIT) {
4483             if (this.filter != null && filter.hasFilterRow()) {
4484               throw new IncompatibleFilterException(
4485                 "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
4486             }
4487             return true; // We hit the limit.
4488           }
4489 
4490           stopRow = nextKv == null ||
4491               isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength());
4492           // save that the row was empty before filters applied to it.
4493           final boolean isEmptyRow = results.isEmpty();
4494 
4495           // We have the part of the row necessary for filtering (all of it, usually).
4496           // First filter with the filterRow(List).
4497           FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
4498           if (filter != null && filter.hasFilterRow()) {
4499             ret = filter.filterRowCellsWithRet(results);
4500           }
4501 
4502           if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) {
4503             results.clear();
4504             boolean moreRows = nextRow(currentRow, offset, length);
4505             if (!moreRows) return false;
4506 
4507             // This row was totally filtered out, if this is NOT the last row,
4508             // we should continue on. Otherwise, nothing else to do.
4509             if (!stopRow) continue;
4510             return false;
4511           }
4512 
4513           // Ok, we are done with storeHeap for this row.
4514           // Now we may need to fetch additional, non-essential data into row.
4515           // These values are not needed for filter to work, so we postpone their
4516           // fetch to (possibly) reduce amount of data loads from disk.
4517           if (this.joinedHeap != null) {
4518             Cell nextJoinedKv = joinedHeap.peek();
4519             // If joinedHeap is pointing to some other row, try to seek to a correct one.
4520             boolean mayHaveData = (nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv,
4521                 currentRow, offset, length))
4522                 || (this.joinedHeap.requestSeek(
4523                     KeyValueUtil.createFirstOnRow(currentRow, offset, length), true, true)
4524                     && joinedHeap.peek() != null && CellUtil.matchingRow(joinedHeap.peek(),
4525                     currentRow, offset, length));
4526             if (mayHaveData) {
4527               joinedContinuationRow = current;
4528               populateFromJoinedHeap(results, limit);
4529             }
4530           }
4531         } else {
4532           // Populating from the joined heap was stopped by limits, populate some more.
4533           populateFromJoinedHeap(results, limit);
4534         }
4535 
4536         // We may have just called populateFromJoinedMap and hit the limits. If that is
4537         // the case, we need to call it again on the next next() invocation.
4538         if (joinedContinuationRow != null) {
4539           return true;
4540         }
4541 
4542         // Finally, we are done with both joinedHeap and storeHeap.
4543         // Double check to prevent empty rows from appearing in result. It could be
4544         // the case when SingleColumnValueExcludeFilter is used.
4545         if (results.isEmpty()) {
4546           boolean moreRows = nextRow(currentRow, offset, length);
4547           if (!moreRows) return false;
4548           if (!stopRow) continue;
4549         }
4550 
4551         // We are done. Return the result.
4552         return !stopRow;
4553       }
4554     }
4555 
4556     /**
4557      * This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combines
4558      * both filterRow & filterRow(List<KeyValue> kvs) functions. While 0.94 code or older, it may
4559      * not implement hasFilterRow as HBase-6429 expects because 0.94 hasFilterRow() only returns
4560      * true when filterRow(List<KeyValue> kvs) is overridden not the filterRow(). Therefore, the
4561      * filterRow() will be skipped.
4562      */
4563     private boolean filterRow() throws IOException {
4564       // when hasFilterRow returns true, filter.filterRow() will be called automatically inside
4565       // filterRowCells(List<Cell> kvs) so we skip that scenario here.
4566       return filter != null && (!filter.hasFilterRow())
4567           && filter.filterRow();
4568     }
4569 
4570     private boolean filterRowKey(byte[] row, int offset, short length) throws IOException {
4571       return filter != null
4572           && filter.filterRowKey(row, offset, length);
4573     }
4574 
4575     protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException {
4576       assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read.";
4577       Cell next;
4578       while ((next = this.storeHeap.peek()) != null &&
4579              CellUtil.matchingRow(next, currentRow, offset, length)) {
4580         this.storeHeap.next(MOCKED_LIST);
4581       }
4582       resetFilters();
4583       // Calling the hook in CP which allows it to do a fast forward
4584       return this.region.getCoprocessorHost() == null
4585           || this.region.getCoprocessorHost()
4586               .postScannerFilterRow(this, currentRow, offset, length);
4587     }
4588 
4589     protected boolean isStopRow(byte[] currentRow, int offset, short length) {
4590       return currentRow == null ||
4591           (stopRow != null &&
4592           comparator.compareRows(stopRow, 0, stopRow.length,
4593             currentRow, offset, length) <= isScan);
4594     }
4595 
4596     @Override
4597     public synchronized void close() {
4598       if (storeHeap != null) {
4599         storeHeap.close();
4600         storeHeap = null;
4601       }
4602       if (joinedHeap != null) {
4603         joinedHeap.close();
4604         joinedHeap = null;
4605       }
4606       // no need to synchronize here.
4607       scannerReadPoints.remove(this);
4608       this.filterClosed = true;
4609     }
4610 
4611     KeyValueHeap getStoreHeapForTesting() {
4612       return storeHeap;
4613     }
4614 
4615     @Override
4616     public synchronized boolean reseek(byte[] row) throws IOException {
4617       if (row == null) {
4618         throw new IllegalArgumentException("Row cannot be null.");
4619       }
4620       boolean result = false;
4621       startRegionOperation();
4622       try {
4623         KeyValue kv = KeyValueUtil.createFirstOnRow(row);
4624         // use request seek to make use of the lazy seek option. See HBASE-5520
4625         result = this.storeHeap.requestSeek(kv, true, true);
4626         if (this.joinedHeap != null) {
4627           result = this.joinedHeap.requestSeek(kv, true, true) || result;
4628         }
4629       } finally {
4630         closeRegionOperation();
4631       }
4632       return result;
4633     }
4634   }
4635 
4636   // Utility methods
4637   /**
4638    * A utility method to create new instances of HRegion based on the
4639    * {@link HConstants#REGION_IMPL} configuration property.
4640    * @param tableDir qualified path of directory where region should be located,
4641    * usually the table directory.
4642    * @param wal The WAL is the outbound log for any updates to the HRegion
4643    * The wal file is a logfile from the previous execution that's
4644    * custom-computed for this HRegion. The HRegionServer computes and sorts the
4645    * appropriate wal info for this HRegion. If there is a previous file
4646    * (implying that the HRegion has been written-to before), then read it from
4647    * the supplied path.
4648    * @param fs is the filesystem.
4649    * @param conf is global configuration settings.
4650    * @param regionInfo - HRegionInfo that describes the region
4651    * is new), then read them from the supplied path.
4652    * @param htd the table descriptor
4653    * @return the new instance
4654    */
4655   static HRegion newHRegion(Path tableDir, WAL wal, FileSystem fs,
4656       Configuration conf, HRegionInfo regionInfo, final HTableDescriptor htd,
4657       RegionServerServices rsServices) {
4658     try {
4659       @SuppressWarnings("unchecked")
4660       Class<? extends HRegion> regionClass =
4661           (Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
4662 
4663       Constructor<? extends HRegion> c =
4664           regionClass.getConstructor(Path.class, WAL.class, FileSystem.class,
4665               Configuration.class, HRegionInfo.class, HTableDescriptor.class,
4666               RegionServerServices.class);
4667 
4668       return c.newInstance(tableDir, wal, fs, conf, regionInfo, htd, rsServices);
4669     } catch (Throwable e) {
4670       // todo: what should I throw here?
4671       throw new IllegalStateException("Could not instantiate a region instance.", e);
4672     }
4673   }
4674 
4675   /**
4676    * Convenience method creating new HRegions. Used by createTable and by the
4677    * bootstrap code in the HMaster constructor.
4678    * Note, this method creates an {@link WAL} for the created region. It
4679    * needs to be closed explicitly.  Use {@link HRegion#getWAL()} to get
4680    * access.  <b>When done with a region created using this method, you will
4681    * need to explicitly close the {@link WAL} it created too; it will not be
4682    * done for you.  Not closing the wal will leave at least a daemon thread
4683    * running.</b>  Call {@link #closeHRegion(HRegion)} and it will do
4684    * necessary cleanup for you.
4685    * @param info Info for region to create.
4686    * @param rootDir Root directory for HBase instance
4687    * @return new HRegion
4688    *
4689    * @throws IOException
4690    */
4691   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4692       final Configuration conf, final HTableDescriptor hTableDescriptor)
4693   throws IOException {
4694     return createHRegion(info, rootDir, conf, hTableDescriptor, null);
4695   }
4696 
4697   /**
4698    * This will do the necessary cleanup a call to
4699    * {@link #createHRegion(HRegionInfo, Path, Configuration, HTableDescriptor)}
4700    * requires.  This method will close the region and then close its
4701    * associated {@link WAL} file.  You can still use it if you call the other createHRegion,
4702    * the one that takes an {@link WAL} instance but don't be surprised by the
4703    * call to the {@link WAL#close()} on the {@link WAL} the
4704    * HRegion was carrying.
4705    * @throws IOException
4706    */
4707   public static void closeHRegion(final HRegion r) throws IOException {
4708     if (r == null) return;
4709     r.close();
4710     if (r.getWAL() == null) return;
4711     r.getWAL().close();
4712   }
4713 
4714   /**
4715    * Convenience method creating new HRegions. Used by createTable.
4716    * The {@link WAL} for the created region needs to be closed explicitly.
4717    * Use {@link HRegion#getWAL()} to get access.
4718    *
4719    * @param info Info for region to create.
4720    * @param rootDir Root directory for HBase instance
4721    * @param wal shared WAL
4722    * @param initialize - true to initialize the region
4723    * @return new HRegion
4724    *
4725    * @throws IOException
4726    */
4727   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4728                                       final Configuration conf,
4729                                       final HTableDescriptor hTableDescriptor,
4730                                       final WAL wal,
4731                                       final boolean initialize)
4732       throws IOException {
4733     return createHRegion(info, rootDir, conf, hTableDescriptor,
4734         wal, initialize, false);
4735   }
4736 
4737   /**
4738    * Convenience method creating new HRegions. Used by createTable.
4739    * The {@link WAL} for the created region needs to be closed
4740    * explicitly, if it is not null.
4741    * Use {@link HRegion#getWAL()} to get access.
4742    *
4743    * @param info Info for region to create.
4744    * @param rootDir Root directory for HBase instance
4745    * @param wal shared WAL
4746    * @param initialize - true to initialize the region
4747    * @param ignoreWAL - true to skip generate new wal if it is null, mostly for createTable
4748    * @return new HRegion
4749    * @throws IOException
4750    */
4751   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4752                                       final Configuration conf,
4753                                       final HTableDescriptor hTableDescriptor,
4754                                       final WAL wal,
4755                                       final boolean initialize, final boolean ignoreWAL)
4756       throws IOException {
4757       Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
4758       return createHRegion(info, rootDir, tableDir, conf, hTableDescriptor, wal, initialize,
4759           ignoreWAL);
4760   }
4761 
4762   /**
4763    * Convenience method creating new HRegions. Used by createTable.
4764    * The {@link WAL} for the created region needs to be closed
4765    * explicitly, if it is not null.
4766    * Use {@link HRegion#getWAL()} to get access.
4767    *
4768    * @param info Info for region to create.
4769    * @param rootDir Root directory for HBase instance
4770    * @param tableDir table directory
4771    * @param wal shared WAL
4772    * @param initialize - true to initialize the region
4773    * @param ignoreWAL - true to skip generate new wal if it is null, mostly for createTable
4774    * @return new HRegion
4775    * @throws IOException
4776    */
4777   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Path tableDir,
4778                                       final Configuration conf,
4779                                       final HTableDescriptor hTableDescriptor,
4780                                       final WAL wal,
4781                                       final boolean initialize, final boolean ignoreWAL)
4782       throws IOException {
4783     LOG.info("creating HRegion " + info.getTable().getNameAsString()
4784         + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
4785         " Table name == " + info.getTable().getNameAsString());
4786     FileSystem fs = FileSystem.get(conf);
4787     HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
4788     WAL effectiveWAL = wal;
4789     if (wal == null && !ignoreWAL) {
4790       // TODO HBASE-11983 There'll be no roller for this wal?
4791       // The WAL subsystem will use the default rootDir rather than the passed in rootDir
4792       // unless I pass along via the conf.
4793       Configuration confForWAL = new Configuration(conf);
4794       confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
4795       effectiveWAL = (new WALFactory(confForWAL,
4796           Collections.<WALActionsListener>singletonList(new MetricsWAL()),
4797           "hregion-" + RandomStringUtils.randomNumeric(8))).
4798             getWAL(info.getEncodedNameAsBytes());
4799     }
4800     HRegion region = HRegion.newHRegion(tableDir,
4801         effectiveWAL, fs, conf, info, hTableDescriptor, null);
4802     if (initialize) {
4803       // If initializing, set the sequenceId. It is also required by WALPerformanceEvaluation when
4804       // verifying the WALEdits.
4805       region.setSequenceId(region.initialize(null));
4806     }
4807     return region;
4808   }
4809 
4810   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4811                                       final Configuration conf,
4812                                       final HTableDescriptor hTableDescriptor,
4813                                       final WAL wal)
4814     throws IOException {
4815     return createHRegion(info, rootDir, conf, hTableDescriptor, wal, true);
4816   }
4817 
4818 
4819   /**
4820    * Open a Region.
4821    * @param info Info for region to be opened.
4822    * @param wal WAL for region to use. This method will call
4823    * WAL#setSequenceNumber(long) passing the result of the call to
4824    * HRegion#getMinSequenceId() to ensure the wal id is properly kept
4825    * up.  HRegionStore does this every time it opens a new region.
4826    * @return new HRegion
4827    *
4828    * @throws IOException
4829    */
4830   public static HRegion openHRegion(final HRegionInfo info,
4831       final HTableDescriptor htd, final WAL wal,
4832       final Configuration conf)
4833   throws IOException {
4834     return openHRegion(info, htd, wal, conf, null, null);
4835   }
4836 
4837   /**
4838    * Open a Region.
4839    * @param info Info for region to be opened
4840    * @param htd the table descriptor
4841    * @param wal WAL for region to use. This method will call
4842    * WAL#setSequenceNumber(long) passing the result of the call to
4843    * HRegion#getMinSequenceId() to ensure the wal id is properly kept
4844    * up.  HRegionStore does this every time it opens a new region.
4845    * @param conf The Configuration object to use.
4846    * @param rsServices An interface we can request flushes against.
4847    * @param reporter An interface we can report progress against.
4848    * @return new HRegion
4849    *
4850    * @throws IOException
4851    */
4852   public static HRegion openHRegion(final HRegionInfo info,
4853     final HTableDescriptor htd, final WAL wal, final Configuration conf,
4854     final RegionServerServices rsServices,
4855     final CancelableProgressable reporter)
4856   throws IOException {
4857     return openHRegion(FSUtils.getRootDir(conf), info, htd, wal, conf, rsServices, reporter);
4858   }
4859 
4860   /**
4861    * Open a Region.
4862    * @param rootDir Root directory for HBase instance
4863    * @param info Info for region to be opened.
4864    * @param htd the table descriptor
4865    * @param wal WAL for region to use. This method will call
4866    * WAL#setSequenceNumber(long) passing the result of the call to
4867    * HRegion#getMinSequenceId() to ensure the wal id is properly kept
4868    * up.  HRegionStore does this every time it opens a new region.
4869    * @param conf The Configuration object to use.
4870    * @return new HRegion
4871    * @throws IOException
4872    */
4873   public static HRegion openHRegion(Path rootDir, final HRegionInfo info,
4874       final HTableDescriptor htd, final WAL wal, final Configuration conf)
4875   throws IOException {
4876     return openHRegion(rootDir, info, htd, wal, conf, null, null);
4877   }
4878 
4879   /**
4880    * Open a Region.
4881    * @param rootDir Root directory for HBase instance
4882    * @param info Info for region to be opened.
4883    * @param htd the table descriptor
4884    * @param wal WAL for region to use. This method will call
4885    * WAL#setSequenceNumber(long) passing the result of the call to
4886    * HRegion#getMinSequenceId() to ensure the wal id is properly kept
4887    * up.  HRegionStore does this every time it opens a new region.
4888    * @param conf The Configuration object to use.
4889    * @param rsServices An interface we can request flushes against.
4890    * @param reporter An interface we can report progress against.
4891    * @return new HRegion
4892    * @throws IOException
4893    */
4894   public static HRegion openHRegion(final Path rootDir, final HRegionInfo info,
4895       final HTableDescriptor htd, final WAL wal, final Configuration conf,
4896       final RegionServerServices rsServices,
4897       final CancelableProgressable reporter)
4898   throws IOException {
4899     FileSystem fs = null;
4900     if (rsServices != null) {
4901       fs = rsServices.getFileSystem();
4902     }
4903     if (fs == null) {
4904       fs = FileSystem.get(conf);
4905     }
4906     return openHRegion(conf, fs, rootDir, info, htd, wal, rsServices, reporter);
4907   }
4908 
4909   /**
4910    * Open a Region.
4911    * @param conf The Configuration object to use.
4912    * @param fs Filesystem to use
4913    * @param rootDir Root directory for HBase instance
4914    * @param info Info for region to be opened.
4915    * @param htd the table descriptor
4916    * @param wal WAL for region to use. This method will call
4917    * WAL#setSequenceNumber(long) passing the result of the call to
4918    * HRegion#getMinSequenceId() to ensure the wal id is properly kept
4919    * up.  HRegionStore does this every time it opens a new region.
4920    * @return new HRegion
4921    * @throws IOException
4922    */
4923   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4924       final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final WAL wal)
4925       throws IOException {
4926     return openHRegion(conf, fs, rootDir, info, htd, wal, null, null);
4927   }
4928 
4929   /**
4930    * Open a Region.
4931    * @param conf The Configuration object to use.
4932    * @param fs Filesystem to use
4933    * @param rootDir Root directory for HBase instance
4934    * @param info Info for region to be opened.
4935    * @param htd the table descriptor
4936    * @param wal WAL for region to use. This method will call
4937    * WAL#setSequenceNumber(long) passing the result of the call to
4938    * HRegion#getMinSequenceId() to ensure the wal id is properly kept
4939    * up.  HRegionStore does this every time it opens a new region.
4940    * @param rsServices An interface we can request flushes against.
4941    * @param reporter An interface we can report progress against.
4942    * @return new HRegion
4943    * @throws IOException
4944    */
4945   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4946       final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final WAL wal,
4947       final RegionServerServices rsServices, final CancelableProgressable reporter)
4948       throws IOException {
4949     Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
4950     return openHRegion(conf, fs, rootDir, tableDir, info, htd, wal, rsServices, reporter);
4951   }
4952 
4953   /**
4954    * Open a Region.
4955    * @param conf The Configuration object to use.
4956    * @param fs Filesystem to use
4957    * @param rootDir Root directory for HBase instance
4958    * @param info Info for region to be opened.
4959    * @param htd the table descriptor
4960    * @param wal WAL for region to use. This method will call
4961    * WAL#setSequenceNumber(long) passing the result of the call to
4962    * HRegion#getMinSequenceId() to ensure the wal id is properly kept
4963    * up.  HRegionStore does this every time it opens a new region.
4964    * @param rsServices An interface we can request flushes against.
4965    * @param reporter An interface we can report progress against.
4966    * @return new HRegion
4967    * @throws IOException
4968    */
4969   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4970       final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd,
4971       final WAL wal, final RegionServerServices rsServices,
4972       final CancelableProgressable reporter)
4973       throws IOException {
4974     if (info == null) throw new NullPointerException("Passed region info is null");
4975     if (LOG.isDebugEnabled()) {
4976       LOG.debug("Opening region: " + info);
4977     }
4978     HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
4979     return r.openHRegion(reporter);
4980   }
4981 
4982 
4983   /**
4984    * Useful when reopening a closed region (normally for unit tests)
4985    * @param other original object
4986    * @param reporter An interface we can report progress against.
4987    * @return new HRegion
4988    * @throws IOException
4989    */
4990   public static HRegion openHRegion(final HRegion other, final CancelableProgressable reporter)
4991       throws IOException {
4992     HRegionFileSystem regionFs = other.getRegionFileSystem();
4993     HRegion r = newHRegion(regionFs.getTableDir(), other.getWAL(), regionFs.getFileSystem(),
4994         other.baseConf, other.getRegionInfo(), other.getTableDesc(), null);
4995     return r.openHRegion(reporter);
4996   }
4997 
4998   /**
4999    * Open HRegion.
5000    * Calls initialize and sets sequenceId.
5001    * @return Returns <code>this</code>
5002    * @throws IOException
5003    */
5004   protected HRegion openHRegion(final CancelableProgressable reporter)
5005   throws IOException {
5006     // Refuse to open the region if we are missing local compression support
5007     checkCompressionCodecs();
5008     // Refuse to open the region if encryption configuration is incorrect or
5009     // codec support is missing
5010     checkEncryption();
5011     // Refuse to open the region if a required class cannot be loaded
5012     checkClassLoading();
5013     this.openSeqNum = initialize(reporter);
5014     this.setSequenceId(openSeqNum);
5015     if (wal != null && getRegionServerServices() != null) {
5016       writeRegionOpenMarker(wal, openSeqNum);
5017     }
5018     return this;
5019   }
5020 
5021   private void checkCompressionCodecs() throws IOException {
5022     for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
5023       CompressionTest.testCompression(fam.getCompression());
5024       CompressionTest.testCompression(fam.getCompactionCompression());
5025     }
5026   }
5027 
5028   private void checkEncryption() throws IOException {
5029     for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
5030       EncryptionTest.testEncryption(conf, fam.getEncryptionType(), fam.getEncryptionKey());
5031     }
5032   }
5033 
5034   private void checkClassLoading() throws IOException {
5035     RegionSplitPolicy.getSplitPolicyClass(this.htableDescriptor, conf);
5036     RegionCoprocessorHost.testTableCoprocessorAttrs(conf, this.htableDescriptor);
5037   }
5038 
5039   /**
5040    * Create a daughter region from given a temp directory with the region data.
5041    * @param hri Spec. for daughter region to open.
5042    * @throws IOException
5043    */
5044   HRegion createDaughterRegionFromSplits(final HRegionInfo hri) throws IOException {
5045     // Move the files from the temporary .splits to the final /table/region directory
5046     fs.commitDaughterRegion(hri);
5047 
5048     // Create the daughter HRegion instance
5049     HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(), fs.getFileSystem(),
5050         this.getBaseConf(), hri, this.getTableDesc(), rsServices);
5051     r.readRequestsCount.set(this.getReadRequestsCount() / 2);
5052     r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
5053     return r;
5054   }
5055 
5056   /**
5057    * Create a merged region given a temp directory with the region data.
5058    * @param region_b another merging region
5059    * @return merged HRegion
5060    * @throws IOException
5061    */
5062   HRegion createMergedRegionFromMerges(final HRegionInfo mergedRegionInfo,
5063       final HRegion region_b) throws IOException {
5064     HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(),
5065         fs.getFileSystem(), this.getBaseConf(), mergedRegionInfo,
5066         this.getTableDesc(), this.rsServices);
5067     r.readRequestsCount.set(this.getReadRequestsCount()
5068         + region_b.getReadRequestsCount());
5069     r.writeRequestsCount.set(this.getWriteRequestsCount()
5070 
5071         + region_b.getWriteRequestsCount());
5072     this.fs.commitMergedRegion(mergedRegionInfo);
5073     return r;
5074   }
5075 
5076   /**
5077    * Inserts a new region's meta information into the passed
5078    * <code>meta</code> region. Used by the HMaster bootstrap code adding
5079    * new table to hbase:meta table.
5080    *
5081    * @param meta hbase:meta HRegion to be updated
5082    * @param r HRegion to add to <code>meta</code>
5083    *
5084    * @throws IOException
5085    */
5086   // TODO remove since only test and merge use this
5087   public static void addRegionToMETA(final HRegion meta, final HRegion r) throws IOException {
5088     meta.checkResources();
5089     // The row key is the region name
5090     byte[] row = r.getRegionName();
5091     final long now = EnvironmentEdgeManager.currentTime();
5092     final List<Cell> cells = new ArrayList<Cell>(2);
5093     cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
5094       HConstants.REGIONINFO_QUALIFIER, now,
5095       r.getRegionInfo().toByteArray()));
5096     // Set into the root table the version of the meta table.
5097     cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
5098       HConstants.META_VERSION_QUALIFIER, now,
5099       Bytes.toBytes(HConstants.META_VERSION)));
5100     meta.put(row, HConstants.CATALOG_FAMILY, cells);
5101   }
5102 
5103   /**
5104    * Computes the Path of the HRegion
5105    *
5106    * @param tabledir qualified path for table
5107    * @param name ENCODED region name
5108    * @return Path of HRegion directory
5109    */
5110   @Deprecated
5111   public static Path getRegionDir(final Path tabledir, final String name) {
5112     return new Path(tabledir, name);
5113   }
5114 
5115   /**
5116    * Computes the Path of the HRegion
5117    *
5118    * @param rootdir qualified path of HBase root directory
5119    * @param info HRegionInfo for the region
5120    * @return qualified path of region directory
5121    */
5122   @Deprecated
5123   public static Path getRegionDir(final Path rootdir, final HRegionInfo info) {
5124     return new Path(
5125       FSUtils.getTableDir(rootdir, info.getTable()), info.getEncodedName());
5126   }
5127 
5128   /**
5129    * Determines if the specified row is within the row range specified by the
5130    * specified HRegionInfo
5131    *
5132    * @param info HRegionInfo that specifies the row range
5133    * @param row row to be checked
5134    * @return true if the row is within the range specified by the HRegionInfo
5135    */
5136   public static boolean rowIsInRange(HRegionInfo info, final byte [] row) {
5137     return ((info.getStartKey().length == 0) ||
5138         (Bytes.compareTo(info.getStartKey(), row) <= 0)) &&
5139         ((info.getEndKey().length == 0) ||
5140             (Bytes.compareTo(info.getEndKey(), row) > 0));
5141   }
5142 
5143   /**
5144    * Merge two HRegions.  The regions must be adjacent and must not overlap.
5145    *
5146    * @return new merged HRegion
5147    * @throws IOException
5148    */
5149   public static HRegion mergeAdjacent(final HRegion srcA, final HRegion srcB)
5150   throws IOException {
5151     HRegion a = srcA;
5152     HRegion b = srcB;
5153 
5154     // Make sure that srcA comes first; important for key-ordering during
5155     // write of the merged file.
5156     if (srcA.getStartKey() == null) {
5157       if (srcB.getStartKey() == null) {
5158         throw new IOException("Cannot merge two regions with null start key");
5159       }
5160       // A's start key is null but B's isn't. Assume A comes before B
5161     } else if ((srcB.getStartKey() == null) ||
5162       (Bytes.compareTo(srcA.getStartKey(), srcB.getStartKey()) > 0)) {
5163       a = srcB;
5164       b = srcA;
5165     }
5166 
5167     if (!(Bytes.compareTo(a.getEndKey(), b.getStartKey()) == 0)) {
5168       throw new IOException("Cannot merge non-adjacent regions");
5169     }
5170     return merge(a, b);
5171   }
5172 
5173   /**
5174    * Merge two regions whether they are adjacent or not.
5175    *
5176    * @param a region a
5177    * @param b region b
5178    * @return new merged region
5179    * @throws IOException
5180    */
5181   public static HRegion merge(final HRegion a, final HRegion b) throws IOException {
5182     if (!a.getRegionInfo().getTable().equals(b.getRegionInfo().getTable())) {
5183       throw new IOException("Regions do not belong to the same table");
5184     }
5185 
5186     FileSystem fs = a.getRegionFileSystem().getFileSystem();
5187     // Make sure each region's cache is empty
5188     a.flushcache(true);
5189     b.flushcache(true);
5190 
5191     // Compact each region so we only have one store file per family
5192     a.compactStores(true);
5193     if (LOG.isDebugEnabled()) {
5194       LOG.debug("Files for region: " + a);
5195       a.getRegionFileSystem().logFileSystemState(LOG);
5196     }
5197     b.compactStores(true);
5198     if (LOG.isDebugEnabled()) {
5199       LOG.debug("Files for region: " + b);
5200       b.getRegionFileSystem().logFileSystemState(LOG);
5201     }
5202 
5203     RegionMergeTransaction rmt = new RegionMergeTransaction(a, b, true);
5204     if (!rmt.prepare(null)) {
5205       throw new IOException("Unable to merge regions " + a + " and " + b);
5206     }
5207     HRegionInfo mergedRegionInfo = rmt.getMergedRegionInfo();
5208     LOG.info("starting merge of regions: " + a + " and " + b
5209         + " into new region " + mergedRegionInfo.getRegionNameAsString()
5210         + " with start key <"
5211         + Bytes.toStringBinary(mergedRegionInfo.getStartKey())
5212         + "> and end key <"
5213         + Bytes.toStringBinary(mergedRegionInfo.getEndKey()) + ">");
5214     HRegion dstRegion;
5215     try {
5216       dstRegion = rmt.execute(null, null);
5217     } catch (IOException ioe) {
5218       rmt.rollback(null, null);
5219       throw new IOException("Failed merging region " + a + " and " + b
5220           + ", and successfully rolled back");
5221     }
5222     dstRegion.compactStores(true);
5223 
5224     if (LOG.isDebugEnabled()) {
5225       LOG.debug("Files for new region");
5226       dstRegion.getRegionFileSystem().logFileSystemState(LOG);
5227     }
5228 
5229     if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
5230       throw new IOException("Merged region " + dstRegion
5231           + " still has references after the compaction, is compaction canceled?");
5232     }
5233 
5234     // Archiving the 'A' region
5235     HFileArchiver.archiveRegion(a.getBaseConf(), fs, a.getRegionInfo());
5236     // Archiving the 'B' region
5237     HFileArchiver.archiveRegion(b.getBaseConf(), fs, b.getRegionInfo());
5238 
5239     LOG.info("merge completed. New region is " + dstRegion);
5240     return dstRegion;
5241   }
5242 
5243   //
5244   // HBASE-880
5245   //
5246   /**
5247    * @param get get object
5248    * @return result
5249    * @throws IOException read exceptions
5250    */
5251   public Result get(final Get get) throws IOException {
5252     checkRow(get.getRow(), "Get");
5253     // Verify families are all valid
5254     if (get.hasFamilies()) {
5255       for (byte [] family: get.familySet()) {
5256         checkFamily(family);
5257       }
5258     } else { // Adding all families to scanner
5259       for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
5260         get.addFamily(family);
5261       }
5262     }
5263     List<Cell> results = get(get, true);
5264     boolean stale = this.getRegionInfo().getReplicaId() != 0;
5265     return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
5266   }
5267 
5268   /*
5269    * Do a get based on the get parameter.
5270    * @param withCoprocessor invoke coprocessor or not. We don't want to
5271    * always invoke cp for this private method.
5272    */
5273   public List<Cell> get(Get get, boolean withCoprocessor)
5274   throws IOException {
5275 
5276     List<Cell> results = new ArrayList<Cell>();
5277 
5278     // pre-get CP hook
5279     if (withCoprocessor && (coprocessorHost != null)) {
5280        if (coprocessorHost.preGet(get, results)) {
5281          return results;
5282        }
5283     }
5284 
5285     Scan scan = new Scan(get);
5286 
5287     RegionScanner scanner = null;
5288     try {
5289       scanner = getScanner(scan);
5290       scanner.next(results);
5291     } finally {
5292       if (scanner != null)
5293         scanner.close();
5294     }
5295 
5296     // post-get CP hook
5297     if (withCoprocessor && (coprocessorHost != null)) {
5298       coprocessorHost.postGet(get, results);
5299     }
5300 
5301     // do after lock
5302     if (this.metricsRegion != null) {
5303       long totalSize = 0L;
5304       for (Cell cell : results) {
5305         totalSize += CellUtil.estimatedSerializedSizeOf(cell);
5306       }
5307       this.metricsRegion.updateGet(totalSize);
5308     }
5309 
5310     return results;
5311   }
5312 
5313   public ClientProtos.RegionLoadStats mutateRow(RowMutations rm) throws IOException {
5314     // Don't need nonces here - RowMutations only supports puts and deletes
5315     return mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
5316   }
5317 
5318   /**
5319    * Perform atomic mutations within the region w/o nonces.
5320    * See {@link #mutateRowsWithLocks(Collection, Collection, long, long)}
5321    */
5322   public ClientProtos.RegionLoadStats mutateRowsWithLocks(Collection<Mutation> mutations,
5323       Collection<byte[]> rowsToLock) throws IOException {
5324     return mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
5325   }
5326 
5327   /**
5328    * Perform atomic mutations within the region.
5329    * @param mutations The list of mutations to perform.
5330    * <code>mutations</code> can contain operations for multiple rows.
5331    * Caller has to ensure that all rows are contained in this region.
5332    * @param rowsToLock Rows to lock
5333    * @param nonceGroup Optional nonce group of the operation (client Id)
5334    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
5335    * If multiple rows are locked care should be taken that
5336    * <code>rowsToLock</code> is sorted in order to avoid deadlocks.
5337    * @throws IOException
5338    */
5339   public ClientProtos.RegionLoadStats mutateRowsWithLocks(Collection<Mutation> mutations,
5340       Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
5341     MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
5342     processRowsWithLocks(proc, -1, nonceGroup, nonce);
5343     return getRegionStats();
5344   }
5345 
5346   /**
5347    * @return the current load statistics for the the region
5348    */
5349   public ClientProtos.RegionLoadStats getRegionStats() {
5350     if (!regionStatsEnabled) {
5351       return null;
5352     }
5353     ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder();
5354     stats.setMemstoreLoad((int) (Math.min(100, (this.memstoreSize.get() * 100) / this
5355         .memstoreFlushSize)));
5356     return stats.build();
5357   }
5358 
5359   /**
5360    * Performs atomic multiple reads and writes on a given row.
5361    *
5362    * @param processor The object defines the reads and writes to a row.
5363    * @param nonceGroup Optional nonce group of the operation (client Id)
5364    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
5365    */
5366   public void processRowsWithLocks(RowProcessor<?,?> processor, long nonceGroup, long nonce)
5367       throws IOException {
5368     processRowsWithLocks(processor, rowProcessorTimeout, nonceGroup, nonce);
5369   }
5370 
5371   /**
5372    * Performs atomic multiple reads and writes on a given row.
5373    *
5374    * @param processor The object defines the reads and writes to a row.
5375    * @param timeout The timeout of the processor.process() execution
5376    *                Use a negative number to switch off the time bound
5377    * @param nonceGroup Optional nonce group of the operation (client Id)
5378    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
5379    */
5380   public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout,
5381       long nonceGroup, long nonce) throws IOException {
5382 
5383     for (byte[] row : processor.getRowsToLock()) {
5384       checkRow(row, "processRowsWithLocks");
5385     }
5386     if (!processor.readOnly()) {
5387       checkReadOnly();
5388     }
5389     checkResources();
5390 
5391     startRegionOperation();
5392     WALEdit walEdit = new WALEdit();
5393 
5394     // 1. Run pre-process hook
5395     try {
5396       processor.preProcess(this, walEdit);
5397     } catch (IOException e) {
5398       closeRegionOperation();
5399       throw e;
5400     }
5401     // Short circuit the read only case
5402     if (processor.readOnly()) {
5403       try {
5404         long now = EnvironmentEdgeManager.currentTime();
5405         doProcessRowWithTimeout(
5406             processor, now, this, null, null, timeout);
5407         processor.postProcess(this, walEdit, true);
5408       } catch (IOException e) {
5409         throw e;
5410       } finally {
5411         closeRegionOperation();
5412       }
5413       return;
5414     }
5415 
5416     MultiVersionConsistencyControl.WriteEntry writeEntry = null;
5417     boolean locked;
5418     boolean walSyncSuccessful = false;
5419     List<RowLock> acquiredRowLocks;
5420     long addedSize = 0;
5421     List<Mutation> mutations = new ArrayList<Mutation>();
5422     List<Cell> memstoreCells = new ArrayList<Cell>();
5423     Collection<byte[]> rowsToLock = processor.getRowsToLock();
5424     long mvccNum = 0;
5425     WALKey walKey = null;
5426     try {
5427       // 2. Acquire the row lock(s)
5428       acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
5429       for (byte[] row : rowsToLock) {
5430         // Attempt to lock all involved rows, throw if any lock times out
5431         acquiredRowLocks.add(getRowLock(row));
5432       }
5433       // 3. Region lock
5434       lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
5435       locked = true;
5436       // Get a mvcc write number
5437       mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
5438 
5439       long now = EnvironmentEdgeManager.currentTime();
5440       try {
5441         // 4. Let the processor scan the rows, generate mutations and add
5442         //    waledits
5443         doProcessRowWithTimeout(
5444             processor, now, this, mutations, walEdit, timeout);
5445 
5446         if (!mutations.isEmpty()) {
5447           // 5. Start mvcc transaction
5448           writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
5449           // 6. Call the preBatchMutate hook
5450           processor.preBatchMutate(this, walEdit);
5451           // 7. Apply to memstore
5452           for (Mutation m : mutations) {
5453             // Handle any tag based cell features
5454             rewriteCellTags(m.getFamilyCellMap(), m);
5455 
5456             for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
5457               Cell cell = cellScanner.current();
5458               CellUtil.setSequenceId(cell, mvccNum);
5459               Store store = getStore(cell);
5460               if (store == null) {
5461                 checkFamily(CellUtil.cloneFamily(cell));
5462                 // unreachable
5463               }
5464               Pair<Long, Cell> ret = store.add(cell);
5465               addedSize += ret.getFirst();
5466               memstoreCells.add(ret.getSecond());
5467             }
5468           }
5469 
5470           long txid = 0;
5471           // 8. Append no sync
5472           if (!walEdit.isEmpty()) {
5473             // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
5474             walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
5475               this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
5476               processor.getClusterIds(), nonceGroup, nonce);
5477             txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
5478               walKey, walEdit, getSequenceId(), true, memstoreCells);
5479           }
5480           if(walKey == null){
5481             // since we use wal sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
5482             // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
5483             walKey = this.appendEmptyEdit(this.wal, memstoreCells);
5484           }
5485           // 9. Release region lock
5486           if (locked) {
5487             this.updatesLock.readLock().unlock();
5488             locked = false;
5489           }
5490 
5491           // 10. Release row lock(s)
5492           releaseRowLocks(acquiredRowLocks);
5493 
5494           // 11. Sync edit log
5495           if (txid != 0) {
5496             syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
5497           }
5498           walSyncSuccessful = true;
5499           // 12. call postBatchMutate hook
5500           processor.postBatchMutate(this);
5501         }
5502       } finally {
5503         if (!mutations.isEmpty() && !walSyncSuccessful) {
5504           LOG.warn("Wal sync failed. Roll back " + mutations.size() +
5505               " memstore keyvalues for row(s):" + StringUtils.byteToHexString(
5506               processor.getRowsToLock().iterator().next()) + "...");
5507           for (Mutation m : mutations) {
5508             for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
5509               Cell cell = cellScanner.current();
5510               getStore(cell).rollback(cell);
5511             }
5512           }
5513         }
5514         // 13. Roll mvcc forward
5515         if (writeEntry != null) {
5516           mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
5517         }
5518         if (locked) {
5519           this.updatesLock.readLock().unlock();
5520         }
5521         // release locks if some were acquired but another timed out
5522         releaseRowLocks(acquiredRowLocks);
5523       }
5524 
5525       // 14. Run post-process hook
5526       processor.postProcess(this, walEdit, walSyncSuccessful);
5527 
5528     } catch (IOException e) {
5529       throw e;
5530     } finally {
5531       closeRegionOperation();
5532       if (!mutations.isEmpty() &&
5533           isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
5534         requestFlush();
5535       }
5536     }
5537   }
5538 
5539   private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
5540                                        final long now,
5541                                        final HRegion region,
5542                                        final List<Mutation> mutations,
5543                                        final WALEdit walEdit,
5544                                        final long timeout) throws IOException {
5545     // Short circuit the no time bound case.
5546     if (timeout < 0) {
5547       try {
5548         processor.process(now, region, mutations, walEdit);
5549       } catch (IOException e) {
5550         LOG.warn("RowProcessor:" + processor.getClass().getName() +
5551             " throws Exception on row(s):" +
5552             Bytes.toStringBinary(
5553               processor.getRowsToLock().iterator().next()) + "...", e);
5554         throw e;
5555       }
5556       return;
5557     }
5558 
5559     // Case with time bound
5560     FutureTask<Void> task =
5561       new FutureTask<Void>(new Callable<Void>() {
5562         @Override
5563         public Void call() throws IOException {
5564           try {
5565             processor.process(now, region, mutations, walEdit);
5566             return null;
5567           } catch (IOException e) {
5568             LOG.warn("RowProcessor:" + processor.getClass().getName() +
5569                 " throws Exception on row(s):" +
5570                 Bytes.toStringBinary(
5571                     processor.getRowsToLock().iterator().next()) + "...", e);
5572             throw e;
5573           }
5574         }
5575       });
5576     rowProcessorExecutor.execute(task);
5577     try {
5578       task.get(timeout, TimeUnit.MILLISECONDS);
5579     } catch (TimeoutException te) {
5580       LOG.error("RowProcessor timeout:" + timeout + " ms on row(s):" +
5581           Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) +
5582           "...");
5583       throw new IOException(te);
5584     } catch (Exception e) {
5585       throw new IOException(e);
5586     }
5587   }
5588 
5589   public Result append(Append append) throws IOException {
5590     return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
5591   }
5592 
5593   // TODO: There's a lot of boiler plate code identical to increment.
5594   // We should refactor append and increment as local get-mutate-put
5595   // transactions, so all stores only go through one code path for puts.
5596   /**
5597    * Perform one or more append operations on a row.
5598    *
5599    * @return new keyvalues after increment
5600    * @throws IOException
5601    */
5602   public Result append(Append append, long nonceGroup, long nonce)
5603       throws IOException {
5604     byte[] row = append.getRow();
5605     checkRow(row, "append");
5606     boolean flush = false;
5607     Durability durability = getEffectiveDurability(append.getDurability());
5608     boolean writeToWAL = durability != Durability.SKIP_WAL;
5609     WALEdit walEdits = null;
5610     List<Cell> allKVs = new ArrayList<Cell>(append.size());
5611     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
5612     long size = 0;
5613     long txid = 0;
5614 
5615     checkReadOnly();
5616     checkResources();
5617     // Lock row
5618     startRegionOperation(Operation.APPEND);
5619     this.writeRequestsCount.increment();
5620     long mvccNum = 0;
5621     WriteEntry w = null;
5622     WALKey walKey = null;
5623     RowLock rowLock = null;
5624     List<Cell> memstoreCells = new ArrayList<Cell>();
5625     boolean doRollBackMemstore = false;
5626     try {
5627       rowLock = getRowLock(row);
5628       try {
5629         lock(this.updatesLock.readLock());
5630         try {
5631           // wait for all prior MVCC transactions to finish - while we hold the row lock
5632           // (so that we are guaranteed to see the latest state)
5633           mvcc.waitForPreviousTransactionsComplete();
5634           if (this.coprocessorHost != null) {
5635             Result r = this.coprocessorHost.preAppendAfterRowLock(append);
5636             if(r!= null) {
5637               return r;
5638             }
5639           }
5640           // now start my own transaction
5641           mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
5642           w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
5643           long now = EnvironmentEdgeManager.currentTime();
5644           // Process each family
5645           for (Map.Entry<byte[], List<Cell>> family : append.getFamilyCellMap().entrySet()) {
5646 
5647             Store store = stores.get(family.getKey());
5648             List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
5649 
5650             // Sort the cells so that they match the order that they
5651             // appear in the Get results. Otherwise, we won't be able to
5652             // find the existing values if the cells are not specified
5653             // in order by the client since cells are in an array list.
5654             Collections.sort(family.getValue(), store.getComparator());
5655             // Get previous values for all columns in this family
5656             Get get = new Get(row);
5657             for (Cell cell : family.getValue()) {
5658               get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));
5659             }
5660             List<Cell> results = get(get, false);
5661             // Iterate the input columns and update existing values if they were
5662             // found, otherwise add new column initialized to the append value
5663 
5664             // Avoid as much copying as possible. We may need to rewrite and
5665             // consolidate tags. Bytes are only copied once.
5666             // Would be nice if KeyValue had scatter/gather logic
5667             int idx = 0;
5668             for (Cell cell : family.getValue()) {
5669               Cell newCell;
5670               Cell oldCell = null;
5671               if (idx < results.size()
5672                   && CellUtil.matchingQualifier(results.get(idx), cell)) {
5673                 oldCell = results.get(idx);
5674                 long ts = Math.max(now, oldCell.getTimestamp());
5675 
5676                 // Process cell tags
5677                 List<Tag> newTags = new ArrayList<Tag>();
5678 
5679                 // Make a union of the set of tags in the old and new KVs
5680 
5681                 if (oldCell.getTagsLength() > 0) {
5682                   Iterator<Tag> i = CellUtil.tagsIterator(oldCell.getTagsArray(),
5683                     oldCell.getTagsOffset(), oldCell.getTagsLength());
5684                   while (i.hasNext()) {
5685                     newTags.add(i.next());
5686                   }
5687                 }
5688                 if (cell.getTagsLength() > 0) {
5689                   Iterator<Tag> i  = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
5690                     cell.getTagsLength());
5691                   while (i.hasNext()) {
5692                     newTags.add(i.next());
5693                   }
5694                 }
5695 
5696                 // Cell TTL handling
5697 
5698                 if (append.getTTL() != Long.MAX_VALUE) {
5699                   // Add the new TTL tag
5700                   newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
5701                 }
5702 
5703                 // Rebuild tags
5704                 byte[] tagBytes = Tag.fromList(newTags);
5705 
5706                 // allocate an empty cell once
5707                 newCell = new KeyValue(row.length, cell.getFamilyLength(),
5708                     cell.getQualifierLength(), ts, KeyValue.Type.Put,
5709                     oldCell.getValueLength() + cell.getValueLength(),
5710                     tagBytes.length);
5711                 // copy in row, family, and qualifier
5712                 System.arraycopy(cell.getRowArray(), cell.getRowOffset(),
5713                   newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());
5714                 System.arraycopy(cell.getFamilyArray(), cell.getFamilyOffset(),
5715                   newCell.getFamilyArray(), newCell.getFamilyOffset(),
5716                   cell.getFamilyLength());
5717                 System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(),
5718                   newCell.getQualifierArray(), newCell.getQualifierOffset(),
5719                   cell.getQualifierLength());
5720                 // copy in the value
5721                 System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(),
5722                   newCell.getValueArray(), newCell.getValueOffset(),
5723                   oldCell.getValueLength());
5724                 System.arraycopy(cell.getValueArray(), cell.getValueOffset(),
5725                   newCell.getValueArray(),
5726                   newCell.getValueOffset() + oldCell.getValueLength(),
5727                   cell.getValueLength());
5728                 // Copy in tag data
5729                 System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(),
5730                   tagBytes.length);
5731                 idx++;
5732               } else {
5733                 // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP
5734                 CellUtil.updateLatestStamp(cell, now);
5735 
5736                 // Cell TTL handling
5737 
5738                 if (append.getTTL() != Long.MAX_VALUE) {
5739                   List<Tag> newTags = new ArrayList<Tag>(1);
5740                   newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
5741                   // Add the new TTL tag
5742                   newCell = new KeyValue(cell.getRowArray(), cell.getRowOffset(),
5743                       cell.getRowLength(),
5744                     cell.getFamilyArray(), cell.getFamilyOffset(),
5745                       cell.getFamilyLength(),
5746                     cell.getQualifierArray(), cell.getQualifierOffset(),
5747                       cell.getQualifierLength(),
5748                     cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
5749                     cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
5750                     newTags);
5751                 } else {
5752                   newCell = cell;
5753                 }
5754               }
5755 
5756               CellUtil.setSequenceId(newCell, mvccNum);
5757               // Give coprocessors a chance to update the new cell
5758               if (coprocessorHost != null) {
5759                 newCell = coprocessorHost.postMutationBeforeWAL(RegionObserver.MutationType.APPEND,
5760                     append, oldCell, newCell);
5761               }
5762               kvs.add(newCell);
5763 
5764               // Append update to WAL
5765               if (writeToWAL) {
5766                 if (walEdits == null) {
5767                   walEdits = new WALEdit();
5768                 }
5769                 walEdits.add(newCell);
5770               }
5771             }
5772 
5773             //store the kvs to the temporary memstore before writing WAL
5774             tempMemstore.put(store, kvs);
5775           }
5776 
5777           //Actually write to Memstore now
5778           for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
5779             Store store = entry.getKey();
5780             if (store.getFamily().getMaxVersions() == 1) {
5781               // upsert if VERSIONS for this CF == 1
5782               size += store.upsert(entry.getValue(), getSmallestReadPoint());
5783               memstoreCells.addAll(entry.getValue());
5784             } else {
5785               // otherwise keep older versions around
5786               for (Cell cell: entry.getValue()) {
5787                 Pair<Long, Cell> ret = store.add(cell);
5788                 size += ret.getFirst();
5789                 memstoreCells.add(ret.getSecond());
5790                 doRollBackMemstore = true;
5791               }
5792             }
5793             allKVs.addAll(entry.getValue());
5794           }
5795 
5796           // Actually write to WAL now
5797           if (writeToWAL) {
5798             // Using default cluster id, as this can only happen in the originating
5799             // cluster. A slave cluster receives the final value (not the delta)
5800             // as a Put.
5801             // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
5802             walKey = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
5803               this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce);
5804             txid = this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits,
5805               this.sequenceId, true, memstoreCells);
5806           } else {
5807             recordMutationWithoutWal(append.getFamilyCellMap());
5808           }
5809           if (walKey == null) {
5810             // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
5811             walKey = this.appendEmptyEdit(this.wal, memstoreCells);
5812           }
5813           size = this.addAndGetGlobalMemstoreSize(size);
5814           flush = isFlushSize(size);
5815         } finally {
5816           this.updatesLock.readLock().unlock();
5817         }
5818       } finally {
5819         rowLock.release();
5820         rowLock = null;
5821       }
5822       // sync the transaction log outside the rowlock
5823       if(txid != 0){
5824         syncOrDefer(txid, durability);
5825       }
5826       doRollBackMemstore = false;
5827     } finally {
5828       if (rowLock != null) {
5829         rowLock.release();
5830       }
5831       // if the wal sync was unsuccessful, remove keys from memstore
5832       if (doRollBackMemstore) {
5833         rollbackMemstore(memstoreCells);
5834       }
5835       if (w != null) {
5836         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
5837       }
5838       closeRegionOperation(Operation.APPEND);
5839     }
5840 
5841     if (this.metricsRegion != null) {
5842       this.metricsRegion.updateAppend();
5843     }
5844 
5845     if (flush) {
5846       // Request a cache flush. Do it outside update lock.
5847       requestFlush();
5848     }
5849 
5850 
5851     return append.isReturnResults() ? Result.create(allKVs) : null;
5852   }
5853 
5854   public Result increment(Increment increment) throws IOException {
5855     return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
5856   }
5857 
5858   // TODO: There's a lot of boiler plate code identical to append.
5859   // We should refactor append and increment as local get-mutate-put
5860   // transactions, so all stores only go through one code path for puts.
5861   /**
5862    * Perform one or more increment operations on a row.
5863    * @return new keyvalues after increment
5864    * @throws IOException
5865    */
5866   public Result increment(Increment increment, long nonceGroup, long nonce)
5867   throws IOException {
5868     byte [] row = increment.getRow();
5869     checkRow(row, "increment");
5870     TimeRange tr = increment.getTimeRange();
5871     boolean flush = false;
5872     Durability durability = getEffectiveDurability(increment.getDurability());
5873     boolean writeToWAL = durability != Durability.SKIP_WAL;
5874     WALEdit walEdits = null;
5875     List<Cell> allKVs = new ArrayList<Cell>(increment.size());
5876     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
5877 
5878     long size = 0;
5879     long txid = 0;
5880 
5881     checkReadOnly();
5882     checkResources();
5883     // Lock row
5884     startRegionOperation(Operation.INCREMENT);
5885     this.writeRequestsCount.increment();
5886     RowLock rowLock = null;
5887     WriteEntry w = null;
5888     WALKey walKey = null;
5889     long mvccNum = 0;
5890     List<Cell> memstoreCells = new ArrayList<Cell>();
5891     boolean doRollBackMemstore = false;
5892     try {
5893       rowLock = getRowLock(row);
5894       try {
5895         lock(this.updatesLock.readLock());
5896         try {
5897           // wait for all prior MVCC transactions to finish - while we hold the row lock
5898           // (so that we are guaranteed to see the latest state)
5899           mvcc.waitForPreviousTransactionsComplete();
5900           if (this.coprocessorHost != null) {
5901             Result r = this.coprocessorHost.preIncrementAfterRowLock(increment);
5902             if (r != null) {
5903               return r;
5904             }
5905           }
5906           // now start my own transaction
5907           mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
5908           w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
5909           long now = EnvironmentEdgeManager.currentTime();
5910           // Process each family
5911           for (Map.Entry<byte [], List<Cell>> family:
5912               increment.getFamilyCellMap().entrySet()) {
5913 
5914             Store store = stores.get(family.getKey());
5915             List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
5916 
5917             // Sort the cells so that they match the order that they
5918             // appear in the Get results. Otherwise, we won't be able to
5919             // find the existing values if the cells are not specified
5920             // in order by the client since cells are in an array list.
5921             Collections.sort(family.getValue(), store.getComparator());
5922             // Get previous values for all columns in this family
5923             Get get = new Get(row);
5924             for (Cell cell: family.getValue()) {
5925               get.addColumn(family.getKey(),  CellUtil.cloneQualifier(cell));
5926             }
5927             get.setTimeRange(tr.getMin(), tr.getMax());
5928             List<Cell> results = get(get, false);
5929 
5930             // Iterate the input columns and update existing values if they were
5931             // found, otherwise add new column initialized to the increment amount
5932             int idx = 0;
5933             for (Cell cell: family.getValue()) {
5934               long amount = Bytes.toLong(CellUtil.cloneValue(cell));
5935               boolean noWriteBack = (amount == 0);
5936               List<Tag> newTags = new ArrayList<Tag>();
5937 
5938               // Carry forward any tags that might have been added by a coprocessor
5939               if (cell.getTagsLength() > 0) {
5940                 Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(),
5941                   cell.getTagsOffset(), cell.getTagsLength());
5942                 while (i.hasNext()) {
5943                   newTags.add(i.next());
5944                 }
5945               }
5946 
5947               Cell c = null;
5948               long ts = now;
5949               if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) {
5950                 c = results.get(idx);
5951                 ts = Math.max(now, c.getTimestamp());
5952                 if(c.getValueLength() == Bytes.SIZEOF_LONG) {
5953                   amount += Bytes.toLong(c.getValueArray(), c.getValueOffset(), Bytes.SIZEOF_LONG);
5954                 } else {
5955                   // throw DoNotRetryIOException instead of IllegalArgumentException
5956                   throw new org.apache.hadoop.hbase.DoNotRetryIOException(
5957                       "Attempted to increment field that isn't 64 bits wide");
5958                 }
5959                 // Carry tags forward from previous version
5960                 if (c.getTagsLength() > 0) {
5961                   Iterator<Tag> i = CellUtil.tagsIterator(c.getTagsArray(),
5962                     c.getTagsOffset(), c.getTagsLength());
5963                   while (i.hasNext()) {
5964                     newTags.add(i.next());
5965                   }
5966                 }
5967                 idx++;
5968               }
5969 
5970               // Append new incremented KeyValue to list
5971               byte[] q = CellUtil.cloneQualifier(cell);
5972               byte[] val = Bytes.toBytes(amount);
5973 
5974               // Add the TTL tag if the mutation carried one
5975               if (increment.getTTL() != Long.MAX_VALUE) {
5976                 newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(increment.getTTL())));
5977               }
5978 
5979               Cell newKV = new KeyValue(row, 0, row.length,
5980                 family.getKey(), 0, family.getKey().length,
5981                 q, 0, q.length,
5982                 ts,
5983                 KeyValue.Type.Put,
5984                 val, 0, val.length,
5985                 newTags);
5986 
5987               CellUtil.setSequenceId(newKV, mvccNum);
5988 
5989               // Give coprocessors a chance to update the new cell
5990               if (coprocessorHost != null) {
5991                 newKV = coprocessorHost.postMutationBeforeWAL(
5992                     RegionObserver.MutationType.INCREMENT, increment, c, newKV);
5993               }
5994               allKVs.add(newKV);
5995 
5996               if (!noWriteBack) {
5997                 kvs.add(newKV);
5998 
5999                 // Prepare WAL updates
6000                 if (writeToWAL) {
6001                   if (walEdits == null) {
6002                     walEdits = new WALEdit();
6003                   }
6004                   walEdits.add(newKV);
6005                 }
6006               }
6007             }
6008 
6009             //store the kvs to the temporary memstore before writing WAL
6010             if (!kvs.isEmpty()) {
6011               tempMemstore.put(store, kvs);
6012             }
6013           }
6014 
6015           //Actually write to Memstore now
6016           if (!tempMemstore.isEmpty()) {
6017             for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
6018               Store store = entry.getKey();
6019               if (store.getFamily().getMaxVersions() == 1) {
6020                 // upsert if VERSIONS for this CF == 1
6021                 size += store.upsert(entry.getValue(), getSmallestReadPoint());
6022                 memstoreCells.addAll(entry.getValue());
6023               } else {
6024                 // otherwise keep older versions around
6025                 for (Cell cell : entry.getValue()) {
6026                   Pair<Long, Cell> ret = store.add(cell);
6027                   size += ret.getFirst();
6028                   memstoreCells.add(ret.getSecond());
6029                   doRollBackMemstore = true;
6030                 }
6031               }
6032             }
6033             size = this.addAndGetGlobalMemstoreSize(size);
6034             flush = isFlushSize(size);
6035           }
6036 
6037           // Actually write to WAL now
6038           if (walEdits != null && !walEdits.isEmpty()) {
6039             if (writeToWAL) {
6040               // Using default cluster id, as this can only happen in the originating
6041               // cluster. A slave cluster receives the final value (not the delta)
6042               // as a Put.
6043               // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
6044               walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
6045                 this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce);
6046               txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
6047                 walKey, walEdits, getSequenceId(), true, memstoreCells);
6048             } else {
6049               recordMutationWithoutWal(increment.getFamilyCellMap());
6050             }
6051           }
6052           if(walKey == null){
6053             // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
6054             walKey = this.appendEmptyEdit(this.wal, memstoreCells);
6055           }
6056         } finally {
6057           this.updatesLock.readLock().unlock();
6058         }
6059       } finally {
6060         rowLock.release();
6061         rowLock = null;
6062       }
6063       // sync the transaction log outside the rowlock
6064       if(txid != 0){
6065         syncOrDefer(txid, durability);
6066       }
6067       doRollBackMemstore = false;
6068     } finally {
6069       if (rowLock != null) {
6070         rowLock.release();
6071       }
6072       // if the wal sync was unsuccessful, remove keys from memstore
6073       if (doRollBackMemstore) {
6074         rollbackMemstore(memstoreCells);
6075       }
6076       if (w != null) {
6077         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
6078       }
6079       closeRegionOperation(Operation.INCREMENT);
6080       if (this.metricsRegion != null) {
6081         this.metricsRegion.updateIncrement();
6082       }
6083     }
6084 
6085     if (flush) {
6086       // Request a cache flush.  Do it outside update lock.
6087       requestFlush();
6088     }
6089 
6090     return Result.create(allKVs);
6091   }
6092 
6093   //
6094   // New HBASE-880 Helpers
6095   //
6096 
6097   private void checkFamily(final byte [] family)
6098   throws NoSuchColumnFamilyException {
6099     if (!this.htableDescriptor.hasFamily(family)) {
6100       throw new NoSuchColumnFamilyException("Column family " +
6101           Bytes.toString(family) + " does not exist in region " + this
6102           + " in table " + this.htableDescriptor);
6103     }
6104   }
6105 
6106   public static final long FIXED_OVERHEAD = ClassSize.align(
6107       ClassSize.OBJECT +
6108       ClassSize.ARRAY +
6109       44 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
6110       (11 * Bytes.SIZEOF_LONG) +
6111       4 * Bytes.SIZEOF_BOOLEAN);
6112 
6113   // woefully out of date - currently missing:
6114   // 1 x HashMap - coprocessorServiceHandlers
6115   // 6 x Counter - numMutationsWithoutWAL, dataInMemoryWithoutWAL,
6116   //   checkAndMutateChecksPassed, checkAndMutateChecksFailed, readRequestsCount,
6117   //   writeRequestsCount
6118   // 1 x HRegion$WriteState - writestate
6119   // 1 x RegionCoprocessorHost - coprocessorHost
6120   // 1 x RegionSplitPolicy - splitPolicy
6121   // 1 x MetricsRegion - metricsRegion
6122   // 1 x MetricsRegionWrapperImpl - metricsRegionWrapper
6123   public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
6124       ClassSize.OBJECT + // closeLock
6125       (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
6126       (3 * ClassSize.ATOMIC_LONG) + // memStoreSize, numPutsWithoutWAL, dataInMemoryWithoutWAL
6127       (2 * ClassSize.CONCURRENT_HASHMAP) +  // lockedRows, scannerReadPoints
6128       WriteState.HEAP_SIZE + // writestate
6129       ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
6130       (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
6131       MultiVersionConsistencyControl.FIXED_SIZE // mvcc
6132       + ClassSize.TREEMAP // maxSeqIdInStores
6133       + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
6134       ;
6135 
6136   @Override
6137   public long heapSize() {
6138     long heapSize = DEEP_OVERHEAD;
6139     for (Store store : this.stores.values()) {
6140       heapSize += store.heapSize();
6141     }
6142     // this does not take into account row locks, recent flushes, mvcc entries, and more
6143     return heapSize;
6144   }
6145 
6146   /*
6147    * This method calls System.exit.
6148    * @param message Message to print out.  May be null.
6149    */
6150   private static void printUsageAndExit(final String message) {
6151     if (message != null && message.length() > 0) System.out.println(message);
6152     System.out.println("Usage: HRegion CATALOG_TABLE_DIR [major_compact]");
6153     System.out.println("Options:");
6154     System.out.println(" major_compact  Pass this option to major compact " +
6155       "passed region.");
6156     System.out.println("Default outputs scan of passed region.");
6157     System.exit(1);
6158   }
6159 
6160   /**
6161    * Registers a new protocol buffer {@link Service} subclass as a coprocessor endpoint to
6162    * be available for handling
6163    * {@link HRegion#execService(com.google.protobuf.RpcController,
6164    *    org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall)}} calls.
6165    *
6166    * <p>
6167    * Only a single instance may be registered per region for a given {@link Service} subclass (the
6168    * instances are keyed on {@link com.google.protobuf.Descriptors.ServiceDescriptor#getFullName()}.
6169    * After the first registration, subsequent calls with the same service name will fail with
6170    * a return value of {@code false}.
6171    * </p>
6172    * @param instance the {@code Service} subclass instance to expose as a coprocessor endpoint
6173    * @return {@code true} if the registration was successful, {@code false}
6174    * otherwise
6175    */
6176   public boolean registerService(Service instance) {
6177     /*
6178      * No stacking of instances is allowed for a single service name
6179      */
6180     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
6181     if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
6182       LOG.error("Coprocessor service "+serviceDesc.getFullName()+
6183           " already registered, rejecting request from "+instance
6184       );
6185       return false;
6186     }
6187 
6188     coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
6189     if (LOG.isDebugEnabled()) {
6190       LOG.debug("Registered coprocessor service: region="+
6191           Bytes.toStringBinary(getRegionName())+" service="+serviceDesc.getFullName());
6192     }
6193     return true;
6194   }
6195 
6196   /**
6197    * Executes a single protocol buffer coprocessor endpoint {@link Service} method using
6198    * the registered protocol handlers.  {@link Service} implementations must be registered via the
6199    * {@link HRegion#registerService(com.google.protobuf.Service)}
6200    * method before they are available.
6201    *
6202    * @param controller an {@code RpcController} implementation to pass to the invoked service
6203    * @param call a {@code CoprocessorServiceCall} instance identifying the service, method,
6204    *     and parameters for the method invocation
6205    * @return a protocol buffer {@code Message} instance containing the method's result
6206    * @throws IOException if no registered service handler is found or an error
6207    *     occurs during the invocation
6208    * @see org.apache.hadoop.hbase.regionserver.HRegion#registerService(com.google.protobuf.Service)
6209    */
6210   public Message execService(RpcController controller, CoprocessorServiceCall call)
6211       throws IOException {
6212     String serviceName = call.getServiceName();
6213     String methodName = call.getMethodName();
6214     if (!coprocessorServiceHandlers.containsKey(serviceName)) {
6215       throw new UnknownProtocolException(null,
6216           "No registered coprocessor service found for name "+serviceName+
6217           " in region "+Bytes.toStringBinary(getRegionName()));
6218     }
6219 
6220     Service service = coprocessorServiceHandlers.get(serviceName);
6221     Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
6222     Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
6223     if (methodDesc == null) {
6224       throw new UnknownProtocolException(service.getClass(),
6225           "Unknown method "+methodName+" called on service "+serviceName+
6226               " in region "+Bytes.toStringBinary(getRegionName()));
6227     }
6228 
6229     Message request = service.getRequestPrototype(methodDesc).newBuilderForType()
6230         .mergeFrom(call.getRequest()).build();
6231 
6232     if (coprocessorHost != null) {
6233       request = coprocessorHost.preEndpointInvocation(service, methodName, request);
6234     }
6235 
6236     final Message.Builder responseBuilder =
6237         service.getResponsePrototype(methodDesc).newBuilderForType();
6238     service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
6239       @Override
6240       public void run(Message message) {
6241         if (message != null) {
6242           responseBuilder.mergeFrom(message);
6243         }
6244       }
6245     });
6246 
6247     if (coprocessorHost != null) {
6248       coprocessorHost.postEndpointInvocation(service, methodName, request, responseBuilder);
6249     }
6250 
6251     return responseBuilder.build();
6252   }
6253 
6254   /*
6255    * Process table.
6256    * Do major compaction or list content.
6257    * @throws IOException
6258    */
6259   private static void processTable(final FileSystem fs, final Path p,
6260       final WALFactory walFactory, final Configuration c,
6261       final boolean majorCompact)
6262   throws IOException {
6263     HRegion region;
6264     FSTableDescriptors fst = new FSTableDescriptors(c);
6265     // Currently expects tables have one region only.
6266     if (FSUtils.getTableName(p).equals(TableName.META_TABLE_NAME)) {
6267       final WAL wal = walFactory.getMetaWAL(
6268           HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
6269       region = HRegion.newHRegion(p, wal, fs, c,
6270         HRegionInfo.FIRST_META_REGIONINFO,
6271           fst.get(TableName.META_TABLE_NAME), null);
6272     } else {
6273       throw new IOException("Not a known catalog table: " + p.toString());
6274     }
6275     try {
6276       region.initialize(null);
6277       if (majorCompact) {
6278         region.compactStores(true);
6279       } else {
6280         // Default behavior
6281         Scan scan = new Scan();
6282         // scan.addFamily(HConstants.CATALOG_FAMILY);
6283         RegionScanner scanner = region.getScanner(scan);
6284         try {
6285           List<Cell> kvs = new ArrayList<Cell>();
6286           boolean done;
6287           do {
6288             kvs.clear();
6289             done = scanner.next(kvs);
6290             if (kvs.size() > 0) LOG.info(kvs);
6291           } while (done);
6292         } finally {
6293           scanner.close();
6294         }
6295       }
6296     } finally {
6297       region.close();
6298     }
6299   }
6300 
6301   boolean shouldForceSplit() {
6302     return this.splitRequest;
6303   }
6304 
6305   byte[] getExplicitSplitPoint() {
6306     return this.explicitSplitPoint;
6307   }
6308 
6309   void forceSplit(byte[] sp) {
6310     // This HRegion will go away after the forced split is successful
6311     // But if a forced split fails, we need to clear forced split.
6312     this.splitRequest = true;
6313     if (sp != null) {
6314       this.explicitSplitPoint = sp;
6315     }
6316   }
6317 
6318   void clearSplit() {
6319     this.splitRequest = false;
6320     this.explicitSplitPoint = null;
6321   }
6322 
6323   /**
6324    * Give the region a chance to prepare before it is split.
6325    */
6326   protected void prepareToSplit() {
6327     // nothing
6328   }
6329 
6330   /**
6331    * Return the splitpoint. null indicates the region isn't splittable
6332    * If the splitpoint isn't explicitly specified, it will go over the stores
6333    * to find the best splitpoint. Currently the criteria of best splitpoint
6334    * is based on the size of the store.
6335    */
6336   public byte[] checkSplit() {
6337     // Can't split META
6338     if (this.getRegionInfo().isMetaTable() ||
6339         TableName.NAMESPACE_TABLE_NAME.equals(this.getRegionInfo().getTable())) {
6340       if (shouldForceSplit()) {
6341         LOG.warn("Cannot split meta region in HBase 0.20 and above");
6342       }
6343       return null;
6344     }
6345 
6346     // Can't split region which is in recovering state
6347     if (this.isRecovering()) {
6348       LOG.info("Cannot split region " + this.getRegionInfo().getEncodedName() + " in recovery.");
6349       return null;
6350     }
6351 
6352     if (!splitPolicy.shouldSplit()) {
6353       return null;
6354     }
6355 
6356     byte[] ret = splitPolicy.getSplitPoint();
6357 
6358     if (ret != null) {
6359       try {
6360         checkRow(ret, "calculated split");
6361       } catch (IOException e) {
6362         LOG.error("Ignoring invalid split", e);
6363         return null;
6364       }
6365     }
6366     return ret;
6367   }
6368 
6369   /**
6370    * @return The priority that this region should have in the compaction queue
6371    */
6372   public int getCompactPriority() {
6373     int count = Integer.MAX_VALUE;
6374     for (Store store : stores.values()) {
6375       count = Math.min(count, store.getCompactPriority());
6376     }
6377     return count;
6378   }
6379 
6380 
6381   /** @return the coprocessor host */
6382   public RegionCoprocessorHost getCoprocessorHost() {
6383     return coprocessorHost;
6384   }
6385 
6386   /** @param coprocessorHost the new coprocessor host */
6387   public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {
6388     this.coprocessorHost = coprocessorHost;
6389   }
6390 
6391   /**
6392    * This method needs to be called before any public call that reads or
6393    * modifies data. It has to be called just before a try.
6394    * #closeRegionOperation needs to be called in the try's finally block
6395    * Acquires a read lock and checks if the region is closing or closed.
6396    * @throws IOException
6397    */
6398   public void startRegionOperation() throws IOException {
6399     startRegionOperation(Operation.ANY);
6400   }
6401 
6402   /**
6403    * @param op The operation is about to be taken on the region
6404    * @throws IOException
6405    */
6406   protected void startRegionOperation(Operation op) throws IOException {
6407     switch (op) {
6408     case GET:  // read operations
6409     case SCAN:
6410       checkReadsEnabled();
6411     case INCREMENT: // write operations
6412     case APPEND:
6413     case SPLIT_REGION:
6414     case MERGE_REGION:
6415     case PUT:
6416     case DELETE:
6417     case BATCH_MUTATE:
6418     case COMPACT_REGION:
6419       // when a region is in recovering state, no read, split or merge is allowed
6420       if (isRecovering() && (this.disallowWritesInRecovering ||
6421               (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) {
6422         throw new RegionInRecoveryException(this.getRegionNameAsString() +
6423           " is recovering; cannot take reads");
6424       }
6425       break;
6426     default:
6427       break;
6428     }
6429     if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION
6430         || op == Operation.COMPACT_REGION) {
6431       // split, merge or compact region doesn't need to check the closing/closed state or lock the
6432       // region
6433       return;
6434     }
6435     if (this.closing.get()) {
6436       throw new NotServingRegionException(getRegionNameAsString() + " is closing");
6437     }
6438     lock(lock.readLock());
6439     if (this.closed.get()) {
6440       lock.readLock().unlock();
6441       throw new NotServingRegionException(getRegionNameAsString() + " is closed");
6442     }
6443     try {
6444       if (coprocessorHost != null) {
6445         coprocessorHost.postStartRegionOperation(op);
6446       }
6447     } catch (Exception e) {
6448       lock.readLock().unlock();
6449       throw new IOException(e);
6450     }
6451   }
6452 
6453   /**
6454    * Closes the lock. This needs to be called in the finally block corresponding
6455    * to the try block of #startRegionOperation
6456    * @throws IOException
6457    */
6458   public void closeRegionOperation() throws IOException {
6459     closeRegionOperation(Operation.ANY);
6460   }
6461 
6462   /**
6463    * Closes the lock. This needs to be called in the finally block corresponding
6464    * to the try block of {@link #startRegionOperation(Operation)}
6465    * @throws IOException
6466    */
6467   public void closeRegionOperation(Operation operation) throws IOException {
6468     lock.readLock().unlock();
6469     if (coprocessorHost != null) {
6470       coprocessorHost.postCloseRegionOperation(operation);
6471     }
6472   }
6473 
6474   /**
6475    * This method needs to be called before any public call that reads or
6476    * modifies stores in bulk. It has to be called just before a try.
6477    * #closeBulkRegionOperation needs to be called in the try's finally block
6478    * Acquires a writelock and checks if the region is closing or closed.
6479    * @throws NotServingRegionException when the region is closing or closed
6480    * @throws RegionTooBusyException if failed to get the lock in time
6481    * @throws InterruptedIOException if interrupted while waiting for a lock
6482    */
6483   private void startBulkRegionOperation(boolean writeLockNeeded)
6484       throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
6485     if (this.closing.get()) {
6486       throw new NotServingRegionException(getRegionNameAsString() + " is closing");
6487     }
6488     if (writeLockNeeded) lock(lock.writeLock());
6489     else lock(lock.readLock());
6490     if (this.closed.get()) {
6491       if (writeLockNeeded) lock.writeLock().unlock();
6492       else lock.readLock().unlock();
6493       throw new NotServingRegionException(getRegionNameAsString() + " is closed");
6494     }
6495   }
6496 
6497   /**
6498    * Closes the lock. This needs to be called in the finally block corresponding
6499    * to the try block of #startRegionOperation
6500    */
6501   private void closeBulkRegionOperation(){
6502     if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
6503     else lock.readLock().unlock();
6504   }
6505 
6506   /**
6507    * Update counters for numer of puts without wal and the size of possible data loss.
6508    * These information are exposed by the region server metrics.
6509    */
6510   private void recordMutationWithoutWal(final Map<byte [], List<Cell>> familyMap) {
6511     numMutationsWithoutWAL.increment();
6512     if (numMutationsWithoutWAL.get() <= 1) {
6513       LOG.info("writing data to region " + this +
6514                " with WAL disabled. Data may be lost in the event of a crash.");
6515     }
6516 
6517     long mutationSize = 0;
6518     for (List<Cell> cells: familyMap.values()) {
6519       assert cells instanceof RandomAccess;
6520       int listSize = cells.size();
6521       for (int i=0; i < listSize; i++) {
6522         Cell cell = cells.get(i);
6523         // TODO we need include tags length also here.
6524         mutationSize += KeyValueUtil.keyLength(cell) + cell.getValueLength();
6525       }
6526     }
6527 
6528     dataInMemoryWithoutWAL.add(mutationSize);
6529   }
6530 
6531   private void lock(final Lock lock)
6532       throws RegionTooBusyException, InterruptedIOException {
6533     lock(lock, 1);
6534   }
6535 
6536   /**
6537    * Try to acquire a lock.  Throw RegionTooBusyException
6538    * if failed to get the lock in time. Throw InterruptedIOException
6539    * if interrupted while waiting for the lock.
6540    */
6541   private void lock(final Lock lock, final int multiplier)
6542       throws RegionTooBusyException, InterruptedIOException {
6543     try {
6544       final long waitTime = Math.min(maxBusyWaitDuration,
6545           busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
6546       if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
6547         throw new RegionTooBusyException(
6548             "failed to get a lock in " + waitTime + " ms. " +
6549                 "regionName=" + (this.getRegionInfo() == null ? "unknown" :
6550                 this.getRegionInfo().getRegionNameAsString()) +
6551                 ", server=" + (this.getRegionServerServices() == null ? "unknown" :
6552                 this.getRegionServerServices().getServerName()));
6553       }
6554     } catch (InterruptedException ie) {
6555       LOG.info("Interrupted while waiting for a lock");
6556       InterruptedIOException iie = new InterruptedIOException();
6557       iie.initCause(ie);
6558       throw iie;
6559     }
6560   }
6561 
6562   /**
6563    * Calls sync with the given transaction ID if the region's table is not
6564    * deferring it.
6565    * @param txid should sync up to which transaction
6566    * @throws IOException If anything goes wrong with DFS
6567    */
6568   private void syncOrDefer(long txid, Durability durability) throws IOException {
6569     if (this.getRegionInfo().isMetaRegion()) {
6570       this.wal.sync(txid);
6571     } else {
6572       switch(durability) {
6573       case USE_DEFAULT:
6574         // do what table defaults to
6575         if (shouldSyncWAL()) {
6576           this.wal.sync(txid);
6577         }
6578         break;
6579       case SKIP_WAL:
6580         // nothing do to
6581         break;
6582       case ASYNC_WAL:
6583         // nothing do to
6584         break;
6585       case SYNC_WAL:
6586       case FSYNC_WAL:
6587         // sync the WAL edit (SYNC and FSYNC treated the same for now)
6588         this.wal.sync(txid);
6589         break;
6590       }
6591     }
6592   }
6593 
6594   /**
6595    * Check whether we should sync the wal from the table's durability settings
6596    */
6597   private boolean shouldSyncWAL() {
6598     return durability.ordinal() >  Durability.ASYNC_WAL.ordinal();
6599   }
6600 
6601   /**
6602    * A mocked list implementaion - discards all updates.
6603    */
6604   private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {
6605 
6606     @Override
6607     public void add(int index, Cell element) {
6608       // do nothing
6609     }
6610 
6611     @Override
6612     public boolean addAll(int index, Collection<? extends Cell> c) {
6613       return false; // this list is never changed as a result of an update
6614     }
6615 
6616     @Override
6617     public KeyValue get(int index) {
6618       throw new UnsupportedOperationException();
6619     }
6620 
6621     @Override
6622     public int size() {
6623       return 0;
6624     }
6625   };
6626 
6627   /**
6628    * Facility for dumping and compacting catalog tables.
6629    * Only does catalog tables since these are only tables we for sure know
6630    * schema on.  For usage run:
6631    * <pre>
6632    *   ./bin/hbase org.apache.hadoop.hbase.regionserver.HRegion
6633    * </pre>
6634    * @throws IOException
6635    */
6636   public static void main(String[] args) throws IOException {
6637     if (args.length < 1) {
6638       printUsageAndExit(null);
6639     }
6640     boolean majorCompact = false;
6641     if (args.length > 1) {
6642       if (!args[1].toLowerCase().startsWith("major")) {
6643         printUsageAndExit("ERROR: Unrecognized option <" + args[1] + ">");
6644       }
6645       majorCompact = true;
6646     }
6647     final Path tableDir = new Path(args[0]);
6648     final Configuration c = HBaseConfiguration.create();
6649     final FileSystem fs = FileSystem.get(c);
6650     final Path logdir = new Path(c.get("hbase.tmp.dir"));
6651     final String logname = "wal" + FSUtils.getTableName(tableDir) + System.currentTimeMillis();
6652 
6653     final Configuration walConf = new Configuration(c);
6654     FSUtils.setRootDir(walConf, logdir);
6655     final WALFactory wals = new WALFactory(walConf, null, logname);
6656     try {
6657       processTable(fs, tableDir, wals, c, majorCompact);
6658     } finally {
6659        wals.close();
6660        // TODO: is this still right?
6661        BlockCache bc = new CacheConfig(c).getBlockCache();
6662        if (bc != null) bc.shutdown();
6663     }
6664   }
6665 
6666   /**
6667    * Gets the latest sequence number that was read from storage when this region was opened.
6668    */
6669   public long getOpenSeqNum() {
6670     return this.openSeqNum;
6671   }
6672 
6673   /**
6674    * Gets max sequence ids of stores that was read from storage when this region was opened. WAL
6675    * Edits with smaller or equal sequence number will be skipped from replay.
6676    */
6677   public Map<byte[], Long> getMaxStoreSeqIdForLogReplay() {
6678     return this.maxSeqIdInStores;
6679   }
6680 
6681   @VisibleForTesting
6682   public long getOldestSeqIdOfStore(byte[] familyName) {
6683     return wal.getEarliestMemstoreSeqNum(getRegionInfo()
6684         .getEncodedNameAsBytes(), familyName);
6685   }
6686 
6687   /**
6688    * @return if a given region is in compaction now.
6689    */
6690   public CompactionState getCompactionState() {
6691     boolean hasMajor = majorInProgress.get() > 0, hasMinor = minorInProgress.get() > 0;
6692     return (hasMajor ? (hasMinor ? CompactionState.MAJOR_AND_MINOR : CompactionState.MAJOR)
6693         : (hasMinor ? CompactionState.MINOR : CompactionState.NONE));
6694   }
6695 
6696   public void reportCompactionRequestStart(boolean isMajor){
6697     (isMajor ? majorInProgress : minorInProgress).incrementAndGet();
6698   }
6699 
6700   public void reportCompactionRequestEnd(boolean isMajor, int numFiles, long filesSizeCompacted) {
6701     int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();
6702 
6703     // metrics
6704     compactionsFinished.incrementAndGet();
6705     compactionNumFilesCompacted.addAndGet(numFiles);
6706     compactionNumBytesCompacted.addAndGet(filesSizeCompacted);
6707 
6708     assert newValue >= 0;
6709   }
6710 
6711   /**
6712    * Do not change this sequence id. See {@link #sequenceId} comment.
6713    * @return sequenceId
6714    */
6715   @VisibleForTesting
6716   public AtomicLong getSequenceId() {
6717     return this.sequenceId;
6718   }
6719 
6720   /**
6721    * sets this region's sequenceId.
6722    * @param value new value
6723    */
6724   private void setSequenceId(long value) {
6725     this.sequenceId.set(value);
6726   }
6727 
6728   /**
6729    * Listener class to enable callers of
6730    * bulkLoadHFile() to perform any necessary
6731    * pre/post processing of a given bulkload call
6732    */
6733   public interface BulkLoadListener {
6734 
6735     /**
6736      * Called before an HFile is actually loaded
6737      * @param family family being loaded to
6738      * @param srcPath path of HFile
6739      * @return final path to be used for actual loading
6740      * @throws IOException
6741      */
6742     String prepareBulkLoad(byte[] family, String srcPath) throws IOException;
6743 
6744     /**
6745      * Called after a successful HFile load
6746      * @param family family being loaded to
6747      * @param srcPath path of HFile
6748      * @throws IOException
6749      */
6750     void doneBulkLoad(byte[] family, String srcPath) throws IOException;
6751 
6752     /**
6753      * Called after a failed HFile load
6754      * @param family family being loaded to
6755      * @param srcPath path of HFile
6756      * @throws IOException
6757      */
6758     void failedBulkLoad(byte[] family, String srcPath) throws IOException;
6759   }
6760 
6761   @VisibleForTesting class RowLockContext {
6762     private final HashedBytes row;
6763     private final CountDownLatch latch = new CountDownLatch(1);
6764     private final Thread thread;
6765     private int lockCount = 0;
6766 
6767     RowLockContext(HashedBytes row) {
6768       this.row = row;
6769       this.thread = Thread.currentThread();
6770     }
6771 
6772     boolean ownedByCurrentThread() {
6773       return thread == Thread.currentThread();
6774     }
6775 
6776     RowLock newLock() {
6777       lockCount++;
6778       return new RowLock(this);
6779     }
6780 
6781     @Override
6782     public String toString() {
6783       Thread t = this.thread;
6784       return "Thread=" + (t == null? "null": t.getName()) + ", row=" + this.row +
6785         ", lockCount=" + this.lockCount;
6786     }
6787 
6788     void releaseLock() {
6789       if (!ownedByCurrentThread()) {
6790         throw new IllegalArgumentException("Lock held by thread: " + thread
6791           + " cannot be released by different thread: " + Thread.currentThread());
6792       }
6793       lockCount--;
6794       if (lockCount == 0) {
6795         // no remaining locks by the thread, unlock and allow other threads to access
6796         RowLockContext existingContext = lockedRows.remove(row);
6797         if (existingContext != this) {
6798           throw new RuntimeException(
6799               "Internal row lock state inconsistent, should not happen, row: " + row);
6800         }
6801         latch.countDown();
6802       }
6803     }
6804   }
6805 
6806   /**
6807    * Row lock held by a given thread.
6808    * One thread may acquire multiple locks on the same row simultaneously.
6809    * The locks must be released by calling release() from the same thread.
6810    */
6811   public static class RowLock {
6812     @VisibleForTesting final RowLockContext context;
6813     private boolean released = false;
6814 
6815     @VisibleForTesting RowLock(RowLockContext context) {
6816       this.context = context;
6817     }
6818 
6819     /**
6820      * Release the given lock.  If there are no remaining locks held by the current thread
6821      * then unlock the row and allow other threads to acquire the lock.
6822      * @throws IllegalArgumentException if called by a different thread than the lock owning thread
6823      */
6824     public void release() {
6825       if (!released) {
6826         context.releaseLock();
6827         released = true;
6828       }
6829     }
6830   }
6831 
6832   /**
6833    * Append a faked WALEdit in order to get a long sequence number and wal syncer will just ignore
6834    * the WALEdit append later.
6835    * @param wal
6836    * @param cells list of Cells inserted into memstore. Those Cells are passed in order to
6837    *        be updated with right mvcc values(their wal sequence number)
6838    * @return Return the key used appending with no sync and no append.
6839    * @throws IOException
6840    */
6841   private WALKey appendEmptyEdit(final WAL wal, List<Cell> cells) throws IOException {
6842     // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
6843     WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(),
6844       WALKey.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
6845     // Call append but with an empty WALEdit.  The returned seqeunce id will not be associated
6846     // with any edit and we can be sure it went in after all outstanding appends.
6847     wal.append(getTableDesc(), getRegionInfo(), key,
6848       WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells);
6849     return key;
6850   }
6851 
6852   /**
6853    * Explictly sync wal
6854    * @throws IOException
6855    */
6856   public void syncWal() throws IOException {
6857     if(this.wal != null) {
6858       this.wal.sync();
6859     }
6860   }
6861 
6862   /**
6863    * {@inheritDoc}
6864    */
6865   @Override
6866   public void onConfigurationChange(Configuration conf) {
6867     // Do nothing for now.
6868   }
6869 
6870   /**
6871    * {@inheritDoc}
6872    */
6873   @Override
6874   public void registerChildren(ConfigurationManager manager) {
6875     configurationManager = Optional.of(manager);
6876     for (Store s : this.stores.values()) {
6877       configurationManager.get().registerObserver(s);
6878     }
6879   }
6880 
6881   /**
6882    * {@inheritDoc}
6883    */
6884   @Override
6885   public void deregisterChildren(ConfigurationManager manager) {
6886     for (Store s : this.stores.values()) {
6887       configurationManager.get().deregisterObserver(s);
6888     }
6889   }
6890 }