/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Constructor;
import java.text.ParseException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.RandomAccess;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import com.google.protobuf.ByteString;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterWrapper;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.Counter;
import org.apache.hadoop.hbase.util.EncryptionTest;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HashedBytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.util.StringUtils;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;

/**
 * HRegion stores data for a certain region of a table.  It stores all columns
 * for each row. A given table consists of one or more HRegions.
 *
 * <p>We maintain multiple HStores for a single HRegion.
 *
 * <p>A Store is a set of rows with some column data; together, the Stores
 * make up all the data for the rows.
 *
 * <p>Each HRegion has a 'startKey' and 'endKey'.
 * <p>The first is inclusive, the second is exclusive (except for
 * the final region).  The endKey of region 0 is the same as the
 * startKey for region 1 (if it exists).  The startKey for the
 * first region is null. The endKey for the final region is null.
 *
 * <p>Locking at the HRegion level serves only one purpose: preventing the
 * region from being closed (and consequently split) while other operations
 * are ongoing. Each row level operation obtains both a row lock and a region
 * read lock for the duration of the operation. While a scanner is being
 * constructed, getScanner holds a read lock. If the scanner is successfully
 * constructed, it holds a read lock until it is closed. A close takes out a
 * write lock and consequently will block for ongoing operations and will block
 * new operations from starting while the close is in progress.
 *
 * <p>An HRegion is defined by its table and its key extent.
 *
 * <p>It consists of at least one Store.  The number of Stores should be
 * configurable, so that data which is accessed together is stored in the same
 * Store.  Right now, we approximate that by building a single Store for
 * each column family.  (This config info will be communicated via the
 * tabledesc.)
 *
 * <p>The HTableDescriptor contains metainfo about the HRegion's table.
 * regionName is a unique identifier for this HRegion. [startKey, endKey)
 * defines the keyspace for this HRegion.
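 *
 * <p>A rough usage sketch (a hedged example, not the canonical API; the exact
 * {@code openHRegion} overload and its arguments vary by deployment):
 *
 * <pre>
 * // Open an existing region, read one row, then close the region.
 * HRegion region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, wal);
 * try {
 *   Result r = region.get(new Get(rowKey));
 * } finally {
 *   region.close();
 * }
 * </pre>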
 */
@InterfaceAudience.Private
public class HRegion implements HeapSize, PropagatingConfigurationObserver {
  public static final Log LOG = LogFactory.getLog(HRegion.class);

  public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
      "hbase.hregion.scan.loadColumnFamiliesOnDemand";

  /**
   * This is the global default value for durability. All tables/mutations not
   * defining a durability or using USE_DEFAULT will default to this value.
   */
  private static final Durability DEFAULT_DURABILITY = Durability.SYNC_WAL;

  final AtomicBoolean closed = new AtomicBoolean(false);
  /* Closing can take some time; use the closing flag if there is stuff we don't
   * want to do while in closing state; e.g. like offer this region up to the
   * master as a region to close if the carrying regionserver is overloaded.
   * Once set, it is never cleared.
   */
  final AtomicBoolean closing = new AtomicBoolean(false);

  /**
   * The max sequence id of flushed data on this region.  Used in rough
   * calculations of whether it is time to flush or not.
   */
  protected volatile long maxFlushedSeqId = -1L;

  /**
   * Region scoped edit sequence Id. Edits to this region are GUARANTEED to appear in the WAL
   * file in this sequence id's order; i.e. edit #2 will be in the WAL after edit #1.
   * Its default value is -1L. This default is used as a marker to indicate
   * that the region hasn't opened yet. Once it is opened, it is set to the derived
   * {@link #openSeqNum}, the largest sequence id of all hfiles opened under this Region.
   *
   * <p>Control of this sequence is handed off to the WAL implementation.  It is responsible
   * for tagging edits with the correct sequence id since it is responsible for getting the
   * edits into the WAL files. It controls updating the sequence id value.  DO NOT UPDATE IT
   * OUTSIDE OF THE WAL.  The value you get will not be what you think it is.
   */
  private final AtomicLong sequenceId = new AtomicLong(-1L);

  /**
   * Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context
   * for startRegionOperation to possibly invoke different checks before any region operations.
   * Not all operations have to be defined here. It's only needed when a special check is needed
   * in startRegionOperation.
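   *
   * <p>A sketch of the typical call pattern inside this class (see the actual
   * call sites for the full error handling):
   * <pre>
   * startRegionOperation(Operation.SCAN);
   * try {
   *   // ... perform the scan ...
   * } finally {
   *   closeRegionOperation();
   * }
   * </pre>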
   */
  public enum Operation {
    ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE,
    REPLAY_BATCH_MUTATE, COMPACT_REGION
  }

  //////////////////////////////////////////////////////////////////////////////
  // Members
  //////////////////////////////////////////////////////////////////////////////

  // map from a locked row to the context for that lock including:
  // - CountDownLatch for threads waiting on that row
  // - the thread that owns the lock (allow reentrancy)
  // - reference count of (reentrant) locks held by the thread
  // - the row itself
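  //
  // A hypothetical sketch of how a caller takes and releases a row lock
  // (values are made up; see getRowLock for the real contract):
  //   RowLock rowLock = region.getRowLock(Bytes.toBytes("row1"));
  //   try {
  //     // ... mutate the row ...
  //   } finally {
  //     rowLock.release();
  //   }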
  private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
      new ConcurrentHashMap<HashedBytes, RowLockContext>();

  protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(
      Bytes.BYTES_RAWCOMPARATOR);

  // TODO: account for each registered handler in HeapSize computation
  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

  public final AtomicLong memstoreSize = new AtomicLong(0);

  // Debug possible data loss due to WAL off
  final Counter numMutationsWithoutWAL = new Counter();
  final Counter dataInMemoryWithoutWAL = new Counter();

  // Debug why CAS operations are taking a while.
  final Counter checkAndMutateChecksPassed = new Counter();
  final Counter checkAndMutateChecksFailed = new Counter();

  // Number of requests
  final Counter readRequestsCount = new Counter();
  final Counter writeRequestsCount = new Counter();

  // Number of requests blocked by memstore size.
  private final Counter blockedRequestsCount = new Counter();

  /**
   * @return the number of blocked requests.
   */
  public long getBlockedRequestsCount() {
    return this.blockedRequestsCount.get();
  }

  // Compaction counters
  final AtomicLong compactionsFinished = new AtomicLong(0L);
  final AtomicLong compactionNumFilesCompacted = new AtomicLong(0L);
  final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L);

  private final WAL wal;
  private final HRegionFileSystem fs;
  protected final Configuration conf;
  private final Configuration baseConf;
  private final KeyValue.KVComparator comparator;
  private final int rowLockWaitDuration;
  static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;

  // The internal wait duration to acquire a lock before read/update
  // from the region. It is not per row. The purpose of this wait time
  // is to avoid waiting a long time while the region is busy, so that
  // we can release the IPC handler soon enough to improve the
  // availability of the region server. It can be adjusted by
  // tuning the configuration "hbase.busy.wait.duration".
  final long busyWaitDuration;
  static final long DEFAULT_BUSY_WAIT_DURATION = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;

  // If updating multiple rows in one call, wait longer,
  // i.e. waiting for busyWaitDuration * # of rows. However,
  // we can limit the max multiplier.
  final int maxBusyWaitMultiplier;

  // Max busy wait duration. There is no point in waiting longer than the RPC
  // purge timeout, when an RPC call will be terminated by the RPC engine.
  final long maxBusyWaitDuration;

  // negative number indicates infinite timeout
  static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
  final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();

  private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;

  /**
   * The sequence ID that was encountered when this region was opened.
   */
  private long openSeqNum = HConstants.NO_SEQNUM;

  /**
   * The default setting for whether to enable on-demand CF loading for
   * scan requests to this region. Requests can override it.
   */
  private boolean isLoadingCfsOnDemandDefault = false;

  private final AtomicInteger majorInProgress = new AtomicInteger(0);
  private final AtomicInteger minorInProgress = new AtomicInteger(0);

  //
  // Context: During replay we want to ensure that we do not lose any data. So, we
  // have to be conservative in how we replay wals. For each store, we calculate
  // the maxSeqId up to which the store was flushed. And, skip the edits which
  // are equal to or lower than maxSeqId for each store.
  // The following map is populated when opening the region
  Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);

  /**
   * Config setting for whether to allow writes when a region is in recovering or not.
   */
  private boolean disallowWritesInRecovering = false;

  // when a region is in recovering state, it can only accept writes, not reads
  private volatile boolean isRecovering = false;

  private volatile Optional<ConfigurationManager> configurationManager;

  /**
   * @return The smallest mvcc readPoint across all the scanners in this
   * region. Writes older than this readPoint are included in every
   * read operation.
   */
  public long getSmallestReadPoint() {
    long minimumReadPoint;
    // We need to ensure that while we are calculating the smallestReadPoint
    // no new RegionScanners can grab a readPoint that we are unaware of.
    // We achieve this by synchronizing on the scannerReadPoints object.
    synchronized(scannerReadPoints) {
      minimumReadPoint = mvcc.memstoreReadPoint();

      for (Long readPoint: this.scannerReadPoints.values()) {
        if (readPoint < minimumReadPoint) {
          minimumReadPoint = readPoint;
        }
      }
    }
    return minimumReadPoint;
  }

  /*
   * Data structure of write state flags used in coordinating flushes,
   * compactions and closes.
   */
  static class WriteState {
    // Set while a memstore flush is happening.
    volatile boolean flushing = false;
    // Set when a flush has been requested.
    volatile boolean flushRequested = false;
    // Number of compactions running.
    volatile int compacting = 0;
    // Gets set in close. If set, cannot compact or flush again.
    volatile boolean writesEnabled = true;
    // Set if region is read-only
    volatile boolean readOnly = false;
    // whether the reads are enabled. This is different from readOnly, because readOnly is
    // static in the lifetime of the region, while readsEnabled is dynamic
    volatile boolean readsEnabled = true;

    /**
     * Set flags that make this region read-only.
     *
     * @param onOff flip value for region r/o setting
     */
    synchronized void setReadOnly(final boolean onOff) {
      this.writesEnabled = !onOff;
      this.readOnly = onOff;
    }

    boolean isReadOnly() {
      return this.readOnly;
    }

    boolean isFlushRequested() {
      return this.flushRequested;
    }

    void setReadsEnabled(boolean readsEnabled) {
      this.readsEnabled = readsEnabled;
    }

    static final long HEAP_SIZE = ClassSize.align(
        ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
  }

  /**
   * Objects from this class are created when flushing to describe all the different states
   * the flush method can end up in. The Result enum describes those states. The sequence id
   * should only be specified if the flush was successful, and the failure message should only
   * be specified if it didn't flush.
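   *
   * <p>A sketch of how a caller typically inspects a result (assuming the
   * region's flush entry point, e.g. {@code flushcache()}, returns one):
   * <pre>
   * FlushResult fr = region.flushcache();
   * if (fr.isFlushSucceeded() &amp;&amp; fr.isCompactionNeeded()) {
   *   // let the compaction machinery take a look at this region
   * }
   * </pre>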
   */
  public static class FlushResult {
    enum Result {
      FLUSHED_NO_COMPACTION_NEEDED,
      FLUSHED_COMPACTION_NEEDED,
      // Special case where a flush didn't run because there's nothing in the memstores. Used when
      // bulk loading to know when we can still load even if a flush didn't happen.
      CANNOT_FLUSH_MEMSTORE_EMPTY,
      CANNOT_FLUSH
      // Be careful adding more to this enum, look at the below methods to make sure
    }

    final Result result;
    final String failureReason;
    final long flushSequenceId;

    /**
     * Convenience constructor to use when the flush is successful; the failure message is set to
     * null.
     * @param result Expecting FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
     * @param flushSequenceId Generated sequence id that comes right after the edits in the
     *                        memstores.
     */
    FlushResult(Result result, long flushSequenceId) {
      this(result, flushSequenceId, null);
      assert result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
          .FLUSHED_COMPACTION_NEEDED;
    }

    /**
     * Convenience constructor to use when we cannot flush.
     * @param result Expecting CANNOT_FLUSH_MEMSTORE_EMPTY or CANNOT_FLUSH.
     * @param failureReason Reason why we couldn't flush.
     */
    FlushResult(Result result, String failureReason) {
      this(result, -1, failureReason);
      assert result == Result.CANNOT_FLUSH_MEMSTORE_EMPTY || result == Result.CANNOT_FLUSH;
    }

    /**
     * Constructor with all the parameters.
     * @param result Any of the Result values.
     * @param flushSequenceId Generated sequence id if the memstores were flushed, else -1.
     * @param failureReason Reason why we couldn't flush, or null.
     */
    FlushResult(Result result, long flushSequenceId, String failureReason) {
      this.result = result;
      this.flushSequenceId = flushSequenceId;
      this.failureReason = failureReason;
    }

    /**
     * Convenience method, the equivalent of checking if result is
     * FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
     * @return true if the memstores were flushed, else false.
     */
    public boolean isFlushSucceeded() {
      return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
          .FLUSHED_COMPACTION_NEEDED;
    }

    /**
     * Convenience method, the equivalent of checking if result is FLUSHED_COMPACTION_NEEDED.
     * @return True if the flush requested a compaction, else false (false doesn't even mean it
     * flushed).
     */
    public boolean isCompactionNeeded() {
      return result == Result.FLUSHED_COMPACTION_NEEDED;
    }
  }

  final WriteState writestate = new WriteState();

  long memstoreFlushSize;
  final long timestampSlop;
  final long rowProcessorTimeout;

  // Last flush time for each Store. Useful when we are flushing column families separately.
  private final ConcurrentMap<Store, Long> lastStoreFlushTimeMap =
      new ConcurrentHashMap<Store, Long>();

  final RegionServerServices rsServices;
  private RegionServerAccounting rsAccounting;
  private long flushCheckInterval;
  // flushPerChanges is to prevent too many changes in memstore
  private long flushPerChanges;
  private long blockingMemStoreSize;
  final long threadWakeFrequency;
  // Used to guard closes
  final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  // Stop updates lock
  private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock();
  private boolean splitRequest;
  private byte[] explicitSplitPoint = null;

  private final MultiVersionConsistencyControl mvcc =
      new MultiVersionConsistencyControl();

  // Coprocessor host
  private RegionCoprocessorHost coprocessorHost;

  private HTableDescriptor htableDescriptor = null;
  private RegionSplitPolicy splitPolicy;
  private FlushPolicy flushPolicy;

  private final MetricsRegion metricsRegion;
  private final MetricsRegionWrapperImpl metricsRegionWrapper;
  private final Durability durability;
  private final boolean regionStatsEnabled;

  /**
   * HRegion constructor. This constructor should only be used for testing and
   * extensions.  Instances of HRegion should be instantiated with the
   * {@link HRegion#createHRegion} or {@link HRegion#openHRegion} method.
   *
   * @param tableDir qualified path of directory where region should be located,
   * usually the table directory.
   * @param wal The WAL is the outbound log for any updates to the HRegion.
   * The wal file is a logfile from the previous execution that's
   * custom-computed for this HRegion. The HRegionServer computes and sorts the
   * appropriate wal info for this HRegion. If there is a previous wal file
   * (implying that the HRegion has been written-to before), then read it from
   * the supplied path.
   * @param fs is the filesystem.
   * @param confParam is global configuration settings.
   * @param regionInfo - HRegionInfo that describes the region
   * @param htd the table descriptor
   * @param rsServices reference to {@link RegionServerServices} or null
   */
  @Deprecated
  public HRegion(final Path tableDir, final WAL wal, final FileSystem fs,
      final Configuration confParam, final HRegionInfo regionInfo,
      final HTableDescriptor htd, final RegionServerServices rsServices) {
    this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
      wal, confParam, htd, rsServices);
  }

  /**
   * HRegion constructor. This constructor should only be used for testing and
   * extensions.  Instances of HRegion should be instantiated with the
   * {@link HRegion#createHRegion} or {@link HRegion#openHRegion} method.
   *
   * @param fs is the filesystem.
   * @param wal The WAL is the outbound log for any updates to the HRegion.
   * The wal file is a logfile from the previous execution that's
   * custom-computed for this HRegion. The HRegionServer computes and sorts the
   * appropriate wal info for this HRegion. If there is a previous wal file
   * (implying that the HRegion has been written-to before), then read it from
   * the supplied path.
   * @param confParam is global configuration settings.
   * @param htd the table descriptor
   * @param rsServices reference to {@link RegionServerServices} or null
   */
  public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration confParam,
      final HTableDescriptor htd, final RegionServerServices rsServices) {
    if (htd == null) {
      throw new IllegalArgumentException("Need table descriptor");
    }

    if (confParam instanceof CompoundConfiguration) {
      throw new IllegalArgumentException("Need original base configuration");
    }

    this.comparator = fs.getRegionInfo().getComparator();
    this.wal = wal;
    this.fs = fs;

    // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
    this.baseConf = confParam;
    this.conf = new CompoundConfiguration()
      .add(confParam)
      .addStringMap(htd.getConfiguration())
      .addBytesMap(htd.getValues());
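    // Precedence is last-added-wins: a key set on the table descriptor (e.g. a
    // hypothetical htd.setConfiguration("hbase.busy.wait.duration", "5000"))
    // overrides the same key coming from the cluster-wide confParam.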
    this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
        DEFAULT_CACHE_FLUSH_INTERVAL);
    this.flushPerChanges = conf.getLong(MEMSTORE_FLUSH_PER_CHANGES, DEFAULT_FLUSH_PER_CHANGES);
    if (this.flushPerChanges > MAX_FLUSH_PER_CHANGES) {
      throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed "
          + MAX_FLUSH_PER_CHANGES);
    }
    this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
        DEFAULT_ROWLOCK_WAIT_DURATION);

    this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
    this.htableDescriptor = htd;
    this.rsServices = rsServices;
    this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
    setHTableSpecificConf();
    this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();

    this.busyWaitDuration = conf.getLong(
      "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
    this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
    if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
      throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
        + busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
        + maxBusyWaitMultiplier + "). Their product should be positive");
    }
    this.maxBusyWaitDuration = conf.getLong("hbase.ipc.client.call.purge.timeout",
      2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);

    /*
     * timestamp.slop provides a server-side constraint on the timestamp. This
     * assumes that you base your TS around currentTimeMillis(). In this case,
     * throw an error to the user if the user-specified TS is newer than now +
     * slop. LATEST_TIMESTAMP == don't use this functionality
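     * For example, with a slop of 1000 ms, a Put carrying a timestamp more than
     * one second ahead of the server clock should be rejected (the sanity checks
     * raise a FailedSanityCheckException).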
     */
    this.timestampSlop = conf.getLong(
        "hbase.hregion.keyvalue.timestamp.slop.millisecs",
        HConstants.LATEST_TIMESTAMP);

    /*
     * Timeout for the process time in processRowsWithLocks().
     * Use -1 to switch off the time bound.
     */
    this.rowProcessorTimeout = conf.getLong(
        "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
    this.durability = htd.getDurability() == Durability.USE_DEFAULT
        ? DEFAULT_DURABILITY
        : htd.getDurability();
    if (rsServices != null) {
      this.rsAccounting = this.rsServices.getRegionServerAccounting();
      // don't initialize coprocessors if not running within a regionserver
      // TODO: revisit if coprocessors should load in other cases
      this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
      this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
      this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper);

      Map<String, HRegion> recoveringRegions = rsServices.getRecoveringRegions();
      String encodedName = getRegionInfo().getEncodedName();
      if (recoveringRegions != null && recoveringRegions.containsKey(encodedName)) {
        this.isRecovering = true;
        recoveringRegions.put(encodedName, this);
      }
    } else {
      this.metricsRegionWrapper = null;
      this.metricsRegion = null;
    }
    if (LOG.isDebugEnabled()) {
      // Write out region name as string and its encoded name.
      LOG.debug("Instantiated " + this);
    }

    // by default, we allow writes against a region when it's in recovering
    this.disallowWritesInRecovering =
        conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING,
          HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG);
    configurationManager = Optional.absent();

    // disable stats tracking for system tables, but check the config for everything else
    this.regionStatsEnabled = htd.getTableName().getNamespaceAsString().equals(
        NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR) ?
          false :
          conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
              HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
  }

  void setHTableSpecificConf() {
    if (this.htableDescriptor == null) return;
    long flushSize = this.htableDescriptor.getMemStoreFlushSize();

    if (flushSize <= 0) {
      flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
        HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
    }
    this.memstoreFlushSize = flushSize;
    this.blockingMemStoreSize = this.memstoreFlushSize *
        conf.getLong("hbase.hregion.memstore.block.multiplier", 2);
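    // e.g. with the default 128 MB flush size and the default multiplier of 2,
    // updates to this region are blocked once its memstore reaches ~256 MB.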
  }

  /**
   * Initialize this region.
   * Used only by tests and SplitTransaction to reopen the region.
   * You should use createHRegion() or openHRegion().
   * @return What the next sequence (edit) id should be.
   * @throws IOException e
   * @deprecated use HRegion.createHRegion() or HRegion.openHRegion()
   */
  @Deprecated
  public long initialize() throws IOException {
    return initialize(null);
  }

  /**
   * Initialize this region.
   *
   * @param reporter Tickle every so often if initialize is taking a while.
   * @return What the next sequence (edit) id should be.
   * @throws IOException e
   */
  private long initialize(final CancelableProgressable reporter) throws IOException {
    MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
    long nextSeqId = -1;
    try {
      nextSeqId = initializeRegionInternals(reporter, status);
      return nextSeqId;
    } finally {
      // nextSeqId will be -1 if the initialization fails.
      // At least it will be 0 otherwise.
      if (nextSeqId == -1) {
        status.abort("Exception during region " + this.getRegionNameAsString()
            + " initialization.");
      }
    }
  }

  private long initializeRegionInternals(final CancelableProgressable reporter,
      final MonitoredTask status) throws IOException {
    if (coprocessorHost != null) {
      status.setStatus("Running coprocessor pre-open hook");
      coprocessorHost.preOpen();
    }

    // Write HRI to a file in case we need to recover hbase:meta
    status.setStatus("Writing region info on filesystem");
    fs.checkRegionInfoOnFilesystem();

    // Initialize all the HStores
    status.setStatus("Initializing all the Stores");
    long maxSeqId = initializeRegionStores(reporter, status);

    this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this));
    this.writestate.flushRequested = false;
    this.writestate.compacting = 0;

    if (this.writestate.writesEnabled) {
      // Remove temporary data left over from old regions
      status.setStatus("Cleaning up temporary data from old regions");
      fs.cleanupTempDir();
    }

    if (this.writestate.writesEnabled) {
      status.setStatus("Cleaning up detritus from prior splits");
      // Get rid of any splits or merges that were lost in-progress.  Clean out
      // these directories here on open.  We may be opening a region that was
      // being split but we crashed in the middle of it all.
      fs.cleanupAnySplitDetritus();
      fs.cleanupMergesDir();
    }

    // Initialize split policy
    this.splitPolicy = RegionSplitPolicy.create(this, conf);

    // Initialize flush policy
    this.flushPolicy = FlushPolicyFactory.create(this, conf);

    long lastFlushTime = EnvironmentEdgeManager.currentTime();
    for (Store store: stores.values()) {
      this.lastStoreFlushTimeMap.put(store, lastFlushTime);
    }

    // Use the maximum of the wal sequenceid or that which was found in stores
    // (particularly if there are no recovered edits, seqid will be -1).
    long nextSeqid = maxSeqId;

    // In distributedLogReplay mode, we don't know the last change sequence number because region
    // is opened before recovery completes. So we add a safety bumper to avoid new sequence number
    // overlaps with used sequence numbers.
    if (this.writestate.writesEnabled) {
      nextSeqid = WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs
          .getRegionDir(), nextSeqid, (this.isRecovering ? (this.flushPerChanges + 10000000) : 1));
    } else {
      nextSeqid++;
    }

    LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() +
      "; next sequenceid=" + nextSeqid);

    // A region can be reopened if it failed a split; reset flags
    this.closing.set(false);
    this.closed.set(false);

    if (coprocessorHost != null) {
      status.setStatus("Running coprocessor post-open hooks");
      coprocessorHost.postOpen();
    }

    status.markComplete("Region opened successfully");
    return nextSeqid;
  }

  private long initializeRegionStores(final CancelableProgressable reporter, MonitoredTask status)
      throws IOException {
    // Load in all the HStores.

    long maxSeqId = -1;
    // initialized to -1 so that we pick up MemstoreTS from column families
    long maxMemstoreTS = -1;

    if (!htableDescriptor.getFamilies().isEmpty()) {
      // initialize the thread pool for opening stores in parallel.
      ThreadPoolExecutor storeOpenerThreadPool =
        getStoreOpenAndCloseThreadPool("StoreOpener-" + this.getRegionInfo().getShortNameToLog());
      CompletionService<HStore> completionService =
        new ExecutorCompletionService<HStore>(storeOpenerThreadPool);

      // initialize each store in parallel
      for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
        status.setStatus("Instantiating store for column family " + family);
        completionService.submit(new Callable<HStore>() {
          @Override
          public HStore call() throws IOException {
            return instantiateHStore(family);
          }
        });
      }
      boolean allStoresOpened = false;
      try {
        for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
          Future<HStore> future = completionService.take();
          HStore store = future.get();
          this.stores.put(store.getColumnFamilyName().getBytes(), store);

          long storeMaxSequenceId = store.getMaxSequenceId();
          maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
              storeMaxSequenceId);
          if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) {
            maxSeqId = storeMaxSequenceId;
          }
          long maxStoreMemstoreTS = store.getMaxMemstoreTS();
          if (maxStoreMemstoreTS > maxMemstoreTS) {
            maxMemstoreTS = maxStoreMemstoreTS;
          }
        }
        allStoresOpened = true;
      } catch (InterruptedException e) {
        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
      } catch (ExecutionException e) {
        throw new IOException(e.getCause());
      } finally {
        storeOpenerThreadPool.shutdownNow();
        if (!allStoresOpened) {
          // something went wrong, close all opened stores
          LOG.error("Could not initialize all stores for the region=" + this);
          for (Store store : this.stores.values()) {
            try {
              store.close();
            } catch (IOException e) {
              LOG.warn(e.getMessage());
            }
          }
        }
      }
    }
    if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
      // Recover any edits if available.
      maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
          this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
    }
    maxSeqId = Math.max(maxSeqId, maxMemstoreTS + 1);
    mvcc.initialize(maxSeqId);
    return maxSeqId;
  }

  private void writeRegionOpenMarker(WAL wal, long openSeqId) throws IOException {
    Map<byte[], List<Path>> storeFiles =
        new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], Store> entry : getStores().entrySet()) {
      Store store = entry.getValue();
      ArrayList<Path> storeFileNames = new ArrayList<Path>();
      for (StoreFile storeFile: store.getStorefiles()) {
        storeFileNames.add(storeFile.getPath());
      }
      storeFiles.put(entry.getKey(), storeFileNames);
    }

    RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
      RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
      getRegionServerServices().getServerName(), storeFiles);
    WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionOpenDesc,
      getSequenceId());
  }

  private void writeRegionCloseMarker(WAL wal) throws IOException {
    Map<byte[], List<Path>> storeFiles =
        new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], Store> entry : getStores().entrySet()) {
      Store store = entry.getValue();
      ArrayList<Path> storeFileNames = new ArrayList<Path>();
      for (StoreFile storeFile: store.getStorefiles()) {
        storeFileNames.add(storeFile.getPath());
      }
      storeFiles.put(entry.getKey(), storeFileNames);
    }

    RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor(
      RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), getSequenceId().get(),
      getRegionServerServices().getServerName(), storeFiles);
    WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc,
      getSequenceId());

    // Store the SeqId in HDFS when a region closes.
    // Checking that the region folder exists is due to many tests which delete the table
    // folder while a table is still online.
    if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) {
      WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(),
        getSequenceId().get(), 0);
    }
  }

  /**
   * @return True if this region has references.
   */
  public boolean hasReferences() {
    for (Store store : this.stores.values()) {
      if (store.hasReferences()) return true;
    }
    return false;
  }

  /**
   * This function returns the HDFS blocks distribution based on the data
   * captured when the HFiles were created.
   * @return The HDFS blocks distribution for the region.
   */
  public HDFSBlocksDistribution getHDFSBlocksDistribution() {
    HDFSBlocksDistribution hdfsBlocksDistribution =
      new HDFSBlocksDistribution();
    synchronized (this.stores) {
      for (Store store : this.stores.values()) {
        for (StoreFile sf : store.getStorefiles()) {
          HDFSBlocksDistribution storeFileBlocksDistribution =
            sf.getHDFSBlockDistribution();
          hdfsBlocksDistribution.add(storeFileBlocksDistribution);
        }
      }
    }
    return hdfsBlocksDistribution;
  }

  /**
   * This is a helper function to compute the HDFS block distribution on demand.
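   * <p>A usage sketch ({@code hostname} is a hypothetical datanode name):
   * <pre>
   * HDFSBlocksDistribution dist =
   *     HRegion.computeHDFSBlocksDistribution(conf, htd, hri);
   * float locality = dist.getBlockLocalityIndex(hostname);
   * </pre>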
   * @param conf configuration
   * @param tableDescriptor HTableDescriptor of the table
   * @param regionInfo the HRegionInfo of the region
   * @return The HDFS blocks distribution for the given region.
   * @throws IOException
   */
  public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
      final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException {
    Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
    return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath);
  }

  /**
   * This is a helper function to compute the HDFS block distribution on demand.
   * @param conf configuration
   * @param tableDescriptor HTableDescriptor of the table
   * @param regionInfo the HRegionInfo of the region
   * @param tablePath the table directory
   * @return The HDFS blocks distribution for the given region.
   * @throws IOException
   */
  public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
      final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo, Path tablePath)
      throws IOException {
    HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
    FileSystem fs = tablePath.getFileSystem(conf);

    HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo);
    for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
      Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family.getNameAsString());
      if (storeFiles == null) continue;

      for (StoreFileInfo storeFileInfo : storeFiles) {
        hdfsBlocksDistribution.add(storeFileInfo.computeHDFSBlocksDistribution(fs));
      }
    }
    return hdfsBlocksDistribution;
  }

  public AtomicLong getMemstoreSize() {
    return memstoreSize;
  }

  /**
   * Increase the size of the memstore in this region and the size of the global
   * memstore.
   * @return the size of the memstore in this region
   */
  public long addAndGetGlobalMemstoreSize(long memStoreSize) {
    if (this.rsAccounting != null) {
      rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
    }
    return this.memstoreSize.addAndGet(memStoreSize);
  }

  /** @return a HRegionInfo object for this region */
  public HRegionInfo getRegionInfo() {
    return this.fs.getRegionInfo();
  }

  /**
   * @return Instance of {@link RegionServerServices} used by this HRegion.
   * Can be null.
   */
  RegionServerServices getRegionServerServices() {
    return this.rsServices;
  }

  /** @return readRequestsCount for this region */
  long getReadRequestsCount() {
    return this.readRequestsCount.get();
  }

  /** @return writeRequestsCount for this region */
  long getWriteRequestsCount() {
    return this.writeRequestsCount.get();
  }

  public MetricsRegion getMetrics() {
    return metricsRegion;
  }

  /** @return true if region is closed */
  public boolean isClosed() {
    return this.closed.get();
  }

  /**
   * @return True if the closing process has started.
   */
  public boolean isClosing() {
    return this.closing.get();
  }

  /**
   * Set the recovering state of the current region.
   */
  public void setRecovering(boolean newState) {
    boolean wasRecovering = this.isRecovering;
    this.isRecovering = newState;
    if (wasRecovering && !isRecovering) {
      // Call only when wal replay is over.
      coprocessorHost.postLogReplay();
    }
  }

  /**
   * @return True if the current region is in recovering state
   */
  public boolean isRecovering() {
    return this.isRecovering;
  }

  /** @return true if region is available (not closed and not closing) */
  public boolean isAvailable() {
    return !isClosed() && !isClosing();
  }

  /** @return true if region is splittable */
  public boolean isSplittable() {
    return isAvailable() && !hasReferences();
  }

  /**
   * @return true if region is mergeable
   */
  public boolean isMergeable() {
    if (!isAvailable()) {
      LOG.debug("Region " + this.getRegionNameAsString()
          + " is not mergeable because it is closing or closed");
      return false;
    }
    if (hasReferences()) {
      LOG.debug("Region " + this.getRegionNameAsString()
          + " is not mergeable because it has references");
      return false;
    }

    return true;
  }

  public boolean areWritesEnabled() {
    synchronized(this.writestate) {
      return this.writestate.writesEnabled;
    }
  }

  public MultiVersionConsistencyControl getMVCC() {
    return mvcc;
  }

  /*
   * Returns the readpoint considering the given IsolationLevel
   */
  public long getReadpoint(IsolationLevel isolationLevel) {
    if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
      // This scan can read even uncommitted transactions
      return Long.MAX_VALUE;
    }
    return mvcc.memstoreReadPoint();
  }

  public boolean isLoadingCfsOnDemandDefault() {
    return this.isLoadingCfsOnDemandDefault;
  }

  /**
   * Close down this HRegion.  Flush the cache, shut down each HStore, don't
   * service any more calls.
   *
   * <p>This method could take some time to execute, so don't call it from a
   * time-sensitive thread.
   *
   * @return Vector of all the storage files that the HRegion's component
   * HStores make use of.  It's a list of all HStoreFile objects. Returns an
   * empty vector if already closed and null if judged that it should not close.
   *
   * @throws IOException e
   */
  public Map<byte[], List<StoreFile>> close() throws IOException {
    return close(false);
  }

  private final Object closeLock = new Object();

  /** Conf key for the periodic flush interval */
  public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL =
      "hbase.regionserver.optionalcacheflushinterval";
  /** Default interval for the memstore flush */
  public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;
  public static final int META_CACHE_FLUSH_INTERVAL = 300000; // 5 minutes

  /** Conf key to force a flush if there are already enough changes for one region in memstore */
  public static final String MEMSTORE_FLUSH_PER_CHANGES =
      "hbase.regionserver.flush.per.changes";
  public static final long DEFAULT_FLUSH_PER_CHANGES = 30000000; // 30 million
  /**
   * The following MAX_FLUSH_PER_CHANGES is large enough because each KeyValue has 20+ bytes of
   * overhead. Therefore, even 1 billion empty KVs occupy at least 20 GB of memstore for a
   * single region.
   */
  public static final long MAX_FLUSH_PER_CHANGES = 1000000000; // 1 billion

  /**
   * Close down this HRegion.  Flush the cache unless the abort parameter is true,
   * shut down each HStore, and don't service any more calls.
   *
   * This method could take some time to execute, so don't call it from a
   * time-sensitive thread.
   *
   * @param abort true if server is aborting (only during testing)
   * @return Vector of all the storage files that the HRegion's component
   * HStores make use of.  It's a list of HStoreFile objects.  Can be null if
   * we are not to close at this time or we are already closed.
   *
   * @throws IOException e
   */
  public Map<byte[], List<StoreFile>> close(final boolean abort) throws IOException {
    // Only allow one thread to close at a time. Serialize them so dual
    // threads attempting to close will run up against each other.
    MonitoredTask status = TaskMonitor.get().createStatus(
        "Closing region " + this +
        (abort ? " due to abort" : ""));

    status.setStatus("Waiting for close lock");
    try {
      synchronized (closeLock) {
        return doClose(abort, status);
      }
    } finally {
      status.cleanup();
    }
  }

  private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
      throws IOException {
    if (isClosed()) {
      LOG.warn("Region " + this + " already closed");
      return null;
    }

    if (coprocessorHost != null) {
      status.setStatus("Running coprocessor pre-close hooks");
      this.coprocessorHost.preClose(abort);
    }

    status.setStatus("Disabling compacts and flushes for region");
    synchronized (writestate) {
      // Disable compacting and flushing by background threads for this
      // region.
      writestate.writesEnabled = false;
      LOG.debug("Closing " + this + ": disabling compactions & flushes");
      waitForFlushesAndCompactions();
    }
    // If we were not just flushing, is it worth doing a preflush... one
    // that will clear out the bulk of the memstore before we put up
    // the close flag?
    if (!abort && worthPreFlushing()) {
      status.setStatus("Pre-flushing region before close");
      LOG.info("Running close preflush of " + this.getRegionNameAsString());
      try {
        internalFlushcache(status);
      } catch (IOException ioe) {
        // Failed to flush the region. Keep going.
        status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
      }
    }

    this.closing.set(true);
    status.setStatus("Disabling writes for close");
    // block waiting for the lock for closing
    lock.writeLock().lock();
    try {
      if (this.isClosed()) {
        status.abort("Already got closed by another process");
        // SplitTransaction handles the null
        return null;
      }
      LOG.debug("Updates disabled for region " + this);
      // Don't flush the cache if we are aborting
      if (!abort) {
        int flushCount = 0;
        while (this.getMemstoreSize().get() > 0) {
          try {
            if (flushCount++ > 0) {
              int actualFlushes = flushCount - 1;
              if (actualFlushes > 5) {
                // If we tried 5 times and are unable to clear memory, abort
                // so we do not lose data
                throw new DroppedSnapshotException("Failed clearing memory after " +
                  actualFlushes + " attempts on region: " + Bytes.toStringBinary(getRegionName()));
              }
              LOG.info("Running extra flush, " + actualFlushes +
                " (carrying snapshot?) " + this);
            }
            internalFlushcache(status);
          } catch (IOException ioe) {
            status.setStatus("Failed flush " + this + ", putting online again");
            synchronized (writestate) {
              writestate.writesEnabled = true;
            }
            // Have to throw to upper layers.  I can't abort the server from here.
            throw ioe;
          }
        }
      }

      Map<byte[], List<StoreFile>> result =
        new TreeMap<byte[], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
      if (!stores.isEmpty()) {
        // initialize the thread pool for closing stores in parallel.
        ThreadPoolExecutor storeCloserThreadPool =
          getStoreOpenAndCloseThreadPool("StoreCloserThread-" + this.getRegionNameAsString());
        CompletionService<Pair<byte[], Collection<StoreFile>>> completionService =
          new ExecutorCompletionService<Pair<byte[], Collection<StoreFile>>>(storeCloserThreadPool);

        // close each store in parallel
        for (final Store store : stores.values()) {
          assert abort || store.getFlushableSize() == 0;
          completionService
              .submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
                @Override
                public Pair<byte[], Collection<StoreFile>> call() throws IOException {
                  return new Pair<byte[], Collection<StoreFile>>(
                    store.getFamily().getName(), store.close());
                }
              });
        }
        try {
          for (int i = 0; i < stores.size(); i++) {
            Future<Pair<byte[], Collection<StoreFile>>> future = completionService.take();
            Pair<byte[], Collection<StoreFile>> storeFiles = future.get();
            List<StoreFile> familyFiles = result.get(storeFiles.getFirst());
            if (familyFiles == null) {
              familyFiles = new ArrayList<StoreFile>();
              result.put(storeFiles.getFirst(), familyFiles);
            }
            familyFiles.addAll(storeFiles.getSecond());
          }
        } catch (InterruptedException e) {
          throw (InterruptedIOException)new InterruptedIOException().initCause(e);
        } catch (ExecutionException e) {
          throw new IOException(e.getCause());
        } finally {
          storeCloserThreadPool.shutdownNow();
        }
      }

      status.setStatus("Writing region close event to WAL");
      if (!abort && wal != null && getRegionServerServices() != null && !writestate.readOnly) {
        writeRegionCloseMarker(wal);
      }

      this.closed.set(true);
      if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get());
      if (coprocessorHost != null) {
        status.setStatus("Running coprocessor post-close hooks");
        this.coprocessorHost.postClose(abort);
      }
      if (this.metricsRegion != null) {
        this.metricsRegion.close();
      }
      if (this.metricsRegionWrapper != null) {
        Closeables.closeQuietly(this.metricsRegionWrapper);
      }
      status.markComplete("Closed");
      LOG.info("Closed " + this);
      return result;
    } finally {
1354       lock.writeLock().unlock();
1355     }
1356   }
1357 
1358   /**
1359    * Wait for all current flushes and compactions of the region to complete.
1360    * <p>
1361    * Exposed for TESTING.
1362    */
1363   public void waitForFlushesAndCompactions() {
1364     synchronized (writestate) {
1365       boolean interrupted = false;
1366       try {
1367         while (writestate.compacting > 0 || writestate.flushing) {
1368           LOG.debug("waiting for " + writestate.compacting + " compactions"
1369             + (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
1370           try {
1371             writestate.wait();
1372           } catch (InterruptedException iex) {
1373             // essentially ignore and propagate the interrupt back up
1374             LOG.warn("Interrupted while waiting");
1375             interrupted = true;
1376           }
1377         }
1378       } finally {
1379         if (interrupted) {
1380           Thread.currentThread().interrupt();
1381         }
1382       }
1383     }
1384   }
1385 
1386   protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
1387       final String threadNamePrefix) {
1388     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1389     int maxThreads = Math.min(numStores,
1390         conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1391             HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
1392     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1393   }
1394 
1395   protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
1396       final String threadNamePrefix) {
1397     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1398     int maxThreads = Math.max(1,
1399         conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1400             HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
1401             / numStores);
1402     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1403   }
1404 
1405   static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
1406       final String threadNamePrefix) {
1407     return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
1408       new ThreadFactory() {
1409         private int count = 1;
1410 
1411         @Override
1412         public Thread newThread(Runnable r) {
1413           return new Thread(r, threadNamePrefix + "-" + count++);
1414         }
1415       });
1416   }
1417 
1418   /**
1419    * @return True if it's worth doing a flush before we put up the close flag.
1420    */
1421   private boolean worthPreFlushing() {
1422     return this.memstoreSize.get() >
1423       this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
1424   }
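
  /*
   * Example: the pre-close flush threshold above can be tuned through
   * configuration. A sketch, assuming the Configuration is the one later used
   * to open the region:
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   // pre-flush before close once the memstore exceeds 16MB instead of 5MB
   *   conf.setLong("hbase.hregion.preclose.flush.size", 16 * 1024 * 1024);
   */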
1425 
1426   //////////////////////////////////////////////////////////////////////////////
1427   // HRegion accessors
1428   //////////////////////////////////////////////////////////////////////////////
1429 
1430   /** @return start key for region */
1431   public byte [] getStartKey() {
1432     return this.getRegionInfo().getStartKey();
1433   }
1434 
1435   /** @return end key for region */
1436   public byte [] getEndKey() {
1437     return this.getRegionInfo().getEndKey();
1438   }
1439 
1440   /** @return region id */
1441   public long getRegionId() {
1442     return this.getRegionInfo().getRegionId();
1443   }
1444 
1445   /** @return region name */
1446   public byte [] getRegionName() {
1447     return this.getRegionInfo().getRegionName();
1448   }
1449 
1450   /** @return region name as string for logging */
1451   public String getRegionNameAsString() {
1452     return this.getRegionInfo().getRegionNameAsString();
1453   }
1454 
1455   /** @return HTableDescriptor for this region */
1456   public HTableDescriptor getTableDesc() {
1457     return this.htableDescriptor;
1458   }
1459 
1460   /** @return WAL in use for this region */
1461   public WAL getWAL() {
1462     return this.wal;
1463   }
1464 
1465   /**
1466    * @return split policy for this region.
1467    */
1468   public RegionSplitPolicy getSplitPolicy() {
1469     return this.splitPolicy;
1470   }
1471 
1472   /**
1473    * A split takes the config from the parent region and passes it to the daughter
1474    * region's constructor. If 'conf' was passed, you would end up using the HTD
1475    * of the parent region in addition to the new daughter HTD. Pass 'baseConf'
1476    * to the daughter regions to avoid this tricky dedupe problem.
1477    * @return Configuration object
1478    */
1479   Configuration getBaseConf() {
1480     return this.baseConf;
1481   }
1482 
1483   /** @return {@link FileSystem} being used by this region */
1484   public FileSystem getFilesystem() {
1485     return fs.getFileSystem();
1486   }
1487 
1488   /** @return the {@link HRegionFileSystem} used by this region */
1489   public HRegionFileSystem getRegionFileSystem() {
1490     return this.fs;
1491   }
1492 
1493   /**
1494    * @return The earliest time a store in the region was flushed. All
1495    *         other stores in the region would have been flushed at or
1496    *         after this time.
1497    */
1498   @VisibleForTesting
1499   public long getEarliestFlushTimeForAllStores() {
1500     return Collections.min(lastStoreFlushTimeMap.values());
1501   }
1502 
1503   /**
1504    * This can be used to determine the last time all files of this region were major compacted.
1505    * @param majorCompactionOnly Only consider HFiles that are the result of a major compaction
1506    * @return the timestamp of the oldest HFile for all stores of this region
1507    */
1508   public long getOldestHfileTs(boolean majorCompactionOnly) throws IOException {
1509     long result = Long.MAX_VALUE;
1510     for (Store store : getStores().values()) {
1511       for (StoreFile file : store.getStorefiles()) {
1512         HFile.Reader reader = file.getReader().getHFileReader();
1513         if (majorCompactionOnly) {
1514           byte[] val = reader.loadFileInfo().get(StoreFile.MAJOR_COMPACTION_KEY);
1515           if (val == null || !Bytes.toBoolean(val)) {
1516             continue;
1517           }
1518         }
1519         result = Math.min(result, reader.getFileContext().getFileCreateTime());
1520       }
1521     }
1522     return result == Long.MAX_VALUE ? 0 : result;
1523   }
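
  /*
   * Example: checking how old a region's files are. A sketch, assuming
   * "region" is an open HRegion:
   *
   *   long oldestMajorTs = region.getOldestHfileTs(true);
   *   if (oldestMajorTs == 0) {
   *     // no store file qualified, e.g. no major-compacted HFiles yet
   *   }
   */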
1524 
1525   //////////////////////////////////////////////////////////////////////////////
1526   // HRegion maintenance.
1527   //
1528   // These methods are meant to be called periodically by the HRegionServer for
1529   // upkeep.
1530   //////////////////////////////////////////////////////////////////////////////
1531 
1532   /** @return Size of the largest HStore. */
1533   public long getLargestHStoreSize() {
1534     long size = 0;
1535     for (Store h : stores.values()) {
1536       long storeSize = h.getSize();
1537       if (storeSize > size) {
1538         size = storeSize;
1539       }
1540     }
1541     return size;
1542   }
1543 
1544   /**
1545    * @return KeyValue Comparator
1546    */
1547   public KeyValue.KVComparator getComparator() {
1548     return this.comparator;
1549   }
1550 
1551   /*
1552    * Do preparation for pending compaction.
1553    * @throws IOException
1554    */
1555   protected void doRegionCompactionPrep() throws IOException {
1556   }
1557 
1558   void triggerMajorCompaction() {
1559     for (Store h : stores.values()) {
1560       h.triggerMajorCompaction();
1561     }
1562   }
1563 
1564   /**
1565    * This is a helper function that compacts all the stores synchronously.
1566    * It is used by utilities and testing.
1567    *
1568    * @param majorCompaction True to force a major compaction regardless of thresholds
1569    * @throws IOException e
1570    */
1571   public void compactStores(final boolean majorCompaction)
1572   throws IOException {
1573     if (majorCompaction) {
1574       this.triggerMajorCompaction();
1575     }
1576     compactStores();
1577   }
1578 
1579   /**
1580    * This is a helper function that compacts all the stores synchronously.
1581    * It is used by utilities and testing.
1582    *
1583    * @throws IOException e
1584    */
1585   public void compactStores() throws IOException {
1586     for (Store s : getStores().values()) {
1587       CompactionContext compaction = s.requestCompaction();
1588       if (compaction != null) {
1589         compact(compaction, s, NoLimitCompactionThroughputController.INSTANCE);
1590       }
1591     }
1592   }
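
  /*
   * Example: forcing a synchronous major compaction from a utility or test.
   * A sketch, assuming "region" is an open HRegion:
   *
   *   region.compactStores(true); // trigger, then run, a major compaction
   */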
1593 
1594   /**
1595    * Called by the compaction thread and after region is opened to compact the
1596    * HStores if necessary.
1597    *
1598    * <p>This operation could block for a long time, so don't call it from a
1599    * time-sensitive thread.
1600    *
1601    * Note that no locking is necessary at this level because compaction only
1602    * conflicts with a region split, and that cannot happen because the region
1603    * server does them sequentially and not in parallel.
1604    *
1605    * @param compaction Compaction details, obtained by requestCompaction()
1606    * @return whether the compaction completed
1607    */
1608   public boolean compact(CompactionContext compaction, Store store,
1609       CompactionThroughputController throughputController) throws IOException {
1610     assert compaction != null && compaction.hasSelection();
1611     assert !compaction.getRequest().getFiles().isEmpty();
1612     if (this.closing.get() || this.closed.get()) {
1613       LOG.debug("Skipping compaction on " + this + " because closing/closed");
1614       store.cancelRequestedCompaction(compaction);
1615       return false;
1616     }
1617     MonitoredTask status = null;
1618     boolean requestNeedsCancellation = true;
1619     // block waiting for the lock for compaction
1620     lock.readLock().lock();
1621     try {
1622       byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
1623       if (stores.get(cf) != store) {
1624         LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this
1625             + " has been re-instantiated, cancel this compaction request. "
1626             + " It may be caused by the rollback of a split transaction");
1627         return false;
1628       }
1629 
1630       status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
1631       if (this.closed.get()) {
1632         String msg = "Skipping compaction on " + this + " because closed";
1633         LOG.debug(msg);
1634         status.abort(msg);
1635         return false;
1636       }
1637       boolean wasStateSet = false;
1638       try {
1639         synchronized (writestate) {
1640           if (writestate.writesEnabled) {
1641             wasStateSet = true;
1642             ++writestate.compacting;
1643           } else {
1644             String msg = "NOT compacting region " + this + ". Writes disabled.";
1645             LOG.info(msg);
1646             status.abort(msg);
1647             return false;
1648           }
1649         }
1650         LOG.info("Starting compaction on " + store + " in region " + this
1651             + (compaction.getRequest().isOffPeak()?" as an off-peak compaction":""));
1652         doRegionCompactionPrep();
1653         try {
1654           status.setStatus("Compacting store " + store);
1655           // We no longer need to cancel the request on the way out of this
1656           // method because Store#compact will clean up unconditionally
1657           requestNeedsCancellation = false;
1658           store.compact(compaction, throughputController);
1659         } catch (InterruptedIOException iioe) {
1660           String msg = "compaction interrupted";
1661           LOG.info(msg, iioe);
1662           status.abort(msg);
1663           return false;
1664         }
1665       } finally {
1666         if (wasStateSet) {
1667           synchronized (writestate) {
1668             --writestate.compacting;
1669             if (writestate.compacting <= 0) {
1670               writestate.notifyAll();
1671             }
1672           }
1673         }
1674       }
1675       status.markComplete("Compaction complete");
1676       return true;
1677     } finally {
1678       try {
1679         if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction);
1680         if (status != null) status.cleanup();
1681       } finally {
1682         lock.readLock().unlock();
1683       }
1684     }
1685   }
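
  /*
   * Example: the request/compact handshake used by compactStores() above. A
   * sketch, assuming "region" is an open HRegion and "store" is one of its
   * Stores:
   *
   *   CompactionContext context = store.requestCompaction();
   *   if (context != null) {
   *     boolean completed = region.compact(context, store,
   *       NoLimitCompactionThroughputController.INSTANCE);
   *   }
   */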
1686 
1687   /**
1688    * Flush all stores.
1689    * <p>
1690    * See {@link #flushcache(boolean)}.
1691    *
1692    * @return whether the flush succeeded and whether the region needs compacting
1693    * @throws IOException
1694    */
1695   public FlushResult flushcache() throws IOException {
1696     return flushcache(true);
1697   }
1698 
1699   /**
1700    * Flush the cache.
1701    *
1702    * When this method is called the cache will be flushed unless:
1703    * <ol>
1704    *   <li>the cache is empty</li>
1705    *   <li>the region is closed</li>
1706    *   <li>a flush is already in progress</li>
1707    *   <li>writes are disabled</li>
1708    * </ol>
1709    *
1710    * <p>This method may block for some time, so it should not be called from a
1711    * time-sensitive thread.
1712    * @param forceFlushAllStores whether we want to flush all stores
1713    * @return whether the flush succeeded and whether the region needs compacting
1714    *
1715    * @throws IOException general io exceptions
1716    * @throws DroppedSnapshotException Thrown when replay of wal is required
1717    * because a Snapshot was not properly persisted.
1718    */
1719   public FlushResult flushcache(boolean forceFlushAllStores) throws IOException {
1720     // fail-fast instead of waiting on the lock
1721     if (this.closing.get()) {
1722       String msg = "Skipping flush on " + this + " because closing";
1723       LOG.debug(msg);
1724       return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1725     }
1726     MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
1727     status.setStatus("Acquiring readlock on region");
1728     // block waiting for the lock for flushing cache
1729     lock.readLock().lock();
1730     try {
1731       if (this.closed.get()) {
1732         String msg = "Skipping flush on " + this + " because closed";
1733         LOG.debug(msg);
1734         status.abort(msg);
1735         return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1736       }
1737       if (coprocessorHost != null) {
1738         status.setStatus("Running coprocessor pre-flush hooks");
1739         coprocessorHost.preFlush();
1740       }
1741       if (numMutationsWithoutWAL.get() > 0) {
1742         numMutationsWithoutWAL.set(0);
1743         dataInMemoryWithoutWAL.set(0);
1744       }
1745       synchronized (writestate) {
1746         if (!writestate.flushing && writestate.writesEnabled) {
1747           this.writestate.flushing = true;
1748         } else {
1749           if (LOG.isDebugEnabled()) {
1750             LOG.debug("NOT flushing memstore for region " + this
1751                 + ", flushing=" + writestate.flushing + ", writesEnabled="
1752                 + writestate.writesEnabled);
1753           }
1754           String msg = "Not flushing since "
1755               + (writestate.flushing ? "already flushing"
1756               : "writes not enabled");
1757           status.abort(msg);
1758           return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1759         }
1760       }
1761 
1762       try {
1763         Collection<Store> specificStoresToFlush =
1764             forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush();
1765         FlushResult fs = internalFlushcache(specificStoresToFlush, status);
1766 
1767         if (coprocessorHost != null) {
1768           status.setStatus("Running post-flush coprocessor hooks");
1769           coprocessorHost.postFlush();
1770         }
1771 
1772         status.markComplete("Flush successful");
1773         return fs;
1774       } finally {
1775         synchronized (writestate) {
1776           writestate.flushing = false;
1777           this.writestate.flushRequested = false;
1778           writestate.notifyAll();
1779         }
1780       }
1781     } finally {
1782       lock.readLock().unlock();
1783       status.cleanup();
1784     }
1785   }
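
  /*
   * Example: flushing all stores and reacting to the outcome. A sketch,
   * assuming "region" is an open HRegion; how the Result is read off the
   * returned FlushResult (field vs. accessor) varies by version:
   *
   *   FlushResult fr = region.flushcache(true);
   *   // e.g. schedule a compaction if fr indicates FLUSHED_COMPACTION_NEEDED
   */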
1786 
1787   /**
1788    * Should the store be flushed because it is old enough?
1789    * <p>
1790    * Every FlushPolicy should call this to determine whether a store is old enough to flush
1791    * (unless it always flushes all stores). Otherwise the {@link #shouldFlush()} method would
1792    * always return true, generating a lot of flush requests.
1793    */
1794   boolean shouldFlushStore(Store store) {
1795     long maxFlushedSeqId =
1796         this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), store
1797             .getFamily().getName()) - 1;
1798     if (maxFlushedSeqId > 0 && maxFlushedSeqId + flushPerChanges < sequenceId.get()) {
1799       if (LOG.isDebugEnabled()) {
1800         LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this
1801             + " will be flushed because its max flushed seqId(" + maxFlushedSeqId
1802             + ") is far away from current(" + sequenceId.get() + "), max allowed is "
1803             + flushPerChanges);
1804       }
1805       return true;
1806     }
1807     if (flushCheckInterval <= 0) {
1808       return false;
1809     }
1810     long now = EnvironmentEdgeManager.currentTime();
1811     if (store.timeOfOldestEdit() < now - flushCheckInterval) {
1812       if (LOG.isDebugEnabled()) {
1813         LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this
1814             + " will be flushed because time of its oldest edit (" + store.timeOfOldestEdit()
1815             + ") is far away from now(" + now + "), max allowed is " + flushCheckInterval);
1816       }
1817       return true;
1818     }
1819     return false;
1820   }
1821 
1822   /**
1823    * Should the memstore be flushed now
1824    * Should the memstore be flushed now?
1825   boolean shouldFlush() {
1826     // This is a rough measure.
1827     if (this.maxFlushedSeqId > 0
1828           && (this.maxFlushedSeqId + this.flushPerChanges < this.sequenceId.get())) {
1829       return true;
1830     }
1831     long modifiedFlushCheckInterval = flushCheckInterval;
1832     if (getRegionInfo().isMetaRegion() &&
1833         getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
1834       modifiedFlushCheckInterval = META_CACHE_FLUSH_INTERVAL;
1835     }
1836     if (modifiedFlushCheckInterval <= 0) { //disabled
1837       return false;
1838     }
1839     long now = EnvironmentEdgeManager.currentTime();
1840     //if we flushed in the recent past, we don't need to do again now
1841     if ((now - getEarliestFlushTimeForAllStores() < modifiedFlushCheckInterval)) {
1842       return false;
1843     }
1844     //since we didn't flush in the recent past, flush now if certain conditions
1845     //are met. Return true on first such memstore hit.
1846     for (Store s : this.getStores().values()) {
1847       if (s.timeOfOldestEdit() < now - modifiedFlushCheckInterval) {
1848         // we have an old enough edit in the memstore, flush
1849         return true;
1850       }
1851     }
1852     return false;
1853   }
1854 
1855   /**
1856    * Flushing all stores.
1857    *
1858    * @see #internalFlushcache(Collection, MonitoredTask)
1859    */
1860   private FlushResult internalFlushcache(MonitoredTask status)
1861       throws IOException {
1862     return internalFlushcache(stores.values(), status);
1863   }
1864 
1865   /**
1866    * Flushing given stores.
1867    *
1868    * @see #internalFlushcache(WAL, long, Collection, MonitoredTask)
1869    */
1870   private FlushResult internalFlushcache(final Collection<Store> storesToFlush,
1871       MonitoredTask status) throws IOException {
1872     return internalFlushcache(this.wal, HConstants.NO_SEQNUM, storesToFlush,
1873         status);
1874   }
1875 
1876   /**
1877    * Flush the memstore. Flushing the memstore is a little tricky. We have a lot
1878    * of updates in the memstore, all of which have also been written to the wal.
1879    * We need to write those updates in the memstore out to disk, while being
1880    * able to process reads/writes as much as possible during the flush
1881    * operation.
1882    * <p>
1883    * This method may block for some time. Every time you call it, we up the
1884    * region's sequence id even if we don't flush; i.e. the returned sequence
1885    * id will be at least one larger than that of the last edit applied to this
1886    * region. The returned id does not refer to an actual edit; it can be used,
1887    * for example, to install a bulk-loaded file just ahead of the last hfile
1888    * that resulted from this flush.
1889    *
1890    * @param wal
1891    *          Null if we're NOT to go via wal.
1892    * @param myseqid
1893    *          The seqid to use if <code>wal</code> is null writing out flush
1894    *          file.
1895    * @param storesToFlush
1896    *          The list of stores to flush.
1897    * @return object describing the flush's state
1898    * @throws IOException
1899    *           general io exceptions
1900    * @throws DroppedSnapshotException
1901    *           Thrown when replay of wal is required because a Snapshot was not
1902    *           properly persisted.
1903    */
1904   protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
1905       final Collection<Store> storesToFlush, MonitoredTask status) throws IOException {
1906     if (this.rsServices != null && this.rsServices.isAborted()) {
1907       // Don't flush when server aborting, it's unsafe
1908       throw new IOException("Aborting flush because server is aborted...");
1909     }
1910     final long startTime = EnvironmentEdgeManager.currentTime();
1911     // If nothing to flush, return, but we need to safely update the region sequence id
1912     if (this.memstoreSize.get() <= 0) {
1913       // Take an update lock because we are about to change the sequence id and we want the
1914       // sequence id to be at the border of the empty memstore.
1915       MultiVersionConsistencyControl.WriteEntry w = null;
1916       this.updatesLock.writeLock().lock();
1917       try {
1918         if (this.memstoreSize.get() <= 0) {
1919           // Presume that if there are still no edits in the memstore, then there are no edits for
1920           // this region out in the WAL subsystem so no need to do any trickery clearing out
1921           // edits in the WAL system. Up the sequence number so the resulting flush id is for
1922           // sure just beyond the last appended region edit (useful as a marker when bulk loading,
1923           // etc.)
1924           // wal can be null replaying edits.
1925           if (wal != null) {
1926             w = mvcc.beginMemstoreInsert();
1927             long flushSeqId = getNextSequenceId(wal);
1928             FlushResult flushResult = new FlushResult(
1929               FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushSeqId, "Nothing to flush");
1930             w.setWriteNumber(flushSeqId);
1931             mvcc.waitForPreviousTransactionsComplete(w);
1932             w = null;
1933             return flushResult;
1934           } else {
1935             return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
1936               "Nothing to flush");
1937           }
1938         }
1939       } finally {
1940         this.updatesLock.writeLock().unlock();
1941         if (w != null) {
1942           mvcc.advanceMemstore(w);
1943         }
1944       }
1945     }
1946 
1947     if (LOG.isInfoEnabled()) {
1948       LOG.info("Started memstore flush for " + this + ", current region memstore size "
1949           + StringUtils.byteDesc(this.memstoreSize.get()) + ", and " + storesToFlush.size() + "/"
1950           + stores.size() + " column families' memstores are being flushed."
1951           + ((wal != null) ? "" : "; wal is null, using passed sequenceid=" + myseqid));
1952       // only log when we are not flushing all stores.
1953       if (this.stores.size() > storesToFlush.size()) {
1954         for (Store store: storesToFlush) {
1955           LOG.info("Flushing Column Family: " + store.getColumnFamilyName()
1956               + " which was occupying "
1957               + StringUtils.byteDesc(store.getMemStoreSize()) + " of memstore.");
1958         }
1959       }
1960     }
1961     // Stop updates while we snapshot the memstores of all of this region's stores. We only
1962     // have to do this for a moment.  It is quick. We also set the memstore size to zero here
1963     // before we allow updates again so its value will represent the size of the updates
1964     // received during the flush.
1965     MultiVersionConsistencyControl.WriteEntry w = null;
1966     // We have to take an update lock during the snapshot, or else a write could end up in
1967     // both the snapshot and the memstore (which would make atomic row updates difficult).
1968     status.setStatus("Obtaining lock to block concurrent updates");
1969     // block waiting for the lock for internal flush
1970     this.updatesLock.writeLock().lock();
1971     status.setStatus("Preparing to flush by snapshotting stores in " +
1972       getRegionInfo().getEncodedName());
1973     long totalFlushableSizeOfFlushableStores = 0;
1974 
1975     Set<byte[]> flushedFamilyNames = new HashSet<byte[]>();
1976     for (Store store: storesToFlush) {
1977       flushedFamilyNames.add(store.getFamily().getName());
1978     }
1979 
1980     List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
1981     TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
1982         Bytes.BYTES_COMPARATOR);
1983     // The sequence id of this flush operation which is used to log FlushMarker and pass to
1984     // createFlushContext to use as the store file's sequence id.
1985     long flushOpSeqId = HConstants.NO_SEQNUM;
1986     // The max flushed sequence id after this flush operation. Used as completeSequenceId which is
1987     // passed to HMaster.
1988     long flushedSeqId = HConstants.NO_SEQNUM;
1989     byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes();
1990 
1991     long trxId = 0;
1992     try {
1993       try {
1994         w = mvcc.beginMemstoreInsert();
1995         if (wal != null) {
1996           if (!wal.startCacheFlush(encodedRegionName, flushedFamilyNames)) {
1997             // This should never happen.
1998             String msg = "Flush will not be started for ["
1999                 + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
2000             status.setStatus(msg);
2001             return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
2002           }
2003           flushOpSeqId = getNextSequenceId(wal);
2004           long oldestUnflushedSeqId = wal.getEarliestMemstoreSeqNum(encodedRegionName);
2005           // no oldestUnflushedSeqId means we flushed all stores.
2006           // or the unflushed stores are all empty.
2007           flushedSeqId = (oldestUnflushedSeqId == HConstants.NO_SEQNUM) ? flushOpSeqId
2008               : oldestUnflushedSeqId - 1;
2009         } else {
2010           // use the provided sequence Id as WAL is not being used for this flush.
2011           flushedSeqId = flushOpSeqId = myseqid;
2012         }
2013 
2014         for (Store s : storesToFlush) {
2015           totalFlushableSizeOfFlushableStores += s.getFlushableSize();
2016           storeFlushCtxs.add(s.createFlushContext(flushOpSeqId));
2017           committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL
2018         }
2019 
2020         // write the snapshot start to WAL
2021         if (wal != null) {
2022           FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
2023             getRegionInfo(), flushOpSeqId, committedFiles);
2024           // no sync. Sync is below where we do not hold the updates lock
2025           trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2026             desc, sequenceId, false);
2027         }
2028 
2029         // Prepare flush (take a snapshot)
2030         for (StoreFlushContext flush : storeFlushCtxs) {
2031           flush.prepare();
2032         }
2033       } catch (IOException ex) {
2034         if (wal != null) {
2035           if (trxId > 0) { // check whether we have already written START_FLUSH to WAL
2036             try {
2037               FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
2038                 getRegionInfo(), flushOpSeqId, committedFiles);
2039               WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2040                 desc, sequenceId, false);
2041             } catch (Throwable t) {
2042               LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
2043                   StringUtils.stringifyException(t));
2044               // ignore this since we will be aborting the RS with DSE.
2045             }
2046           }
2047           // we have called wal.startCacheFlush(), now we have to abort it
2048           wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
2049           throw ex; // let upper layers deal with it.
2050         }
2051       } finally {
2052         this.updatesLock.writeLock().unlock();
2053       }
2054       String s = "Finished memstore snapshotting " + this +
2055         ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSizeOfFlushableStores;
2056       status.setStatus(s);
2057       if (LOG.isTraceEnabled()) LOG.trace(s);
2058       // sync unflushed WAL changes
2059       // see HBASE-8208 for details
2060       if (wal != null) {
2061         try {
2062           wal.sync(); // ensure that flush marker is sync'ed
2063         } catch (IOException ioe) {
2064           LOG.warn("Unexpected exception while wal.sync(), ignoring. Exception: "
2065               + StringUtils.stringifyException(ioe));
2066         }
2067       }
2068 
2069       // wait for all in-progress transactions to commit to WAL before
2070       // we can start the flush. This prevents
2071       // uncommitted transactions from being written into HFiles.
2072       // We have to block before we start the flush, otherwise keys that
2073       // were removed via a rollbackMemstore could be written to Hfiles.
2074       w.setWriteNumber(flushOpSeqId);
2075       mvcc.waitForPreviousTransactionsComplete(w);
2076       // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block
2077       w = null;
2078       s = "Flushing stores of " + this;
2079       status.setStatus(s);
2080       if (LOG.isTraceEnabled()) LOG.trace(s);
2081     } finally {
2082       if (w != null) {
2083         // in case of failure just mark current w as complete
2084         mvcc.advanceMemstore(w);
2085       }
2086     }
2087 
2088     // Any failure from here on out will be catastrophic requiring server
2089     // restart so wal content can be replayed and put back into the memstore.
2090     // Otherwise, the snapshot content, while backed up in the wal, will not
2091     // be part of the running server's current state.
2092     boolean compactionRequested = false;
2093     try {
2094       // A.  Flush memstore to all the HStores.
2095       // Keep a running list of all store files that includes both the old and the
2096       // just-made new flush store file. The new flushed file is still in the
2097       // tmp directory.
2098 
2099       for (StoreFlushContext flush : storeFlushCtxs) {
2100         flush.flushCache(status);
2101       }
2102 
2103       // Switch snapshot (in memstore) -> new hfile (thus causing
2104       // all the store scanners to reset/reseek).
2105       Iterator<Store> it = storesToFlush.iterator();
2106       // stores.values() and storeFlushCtxs have same order
2107       for (StoreFlushContext flush : storeFlushCtxs) {
2108         boolean needsCompaction = flush.commit(status);
2109         if (needsCompaction) {
2110           compactionRequested = true;
2111         }
2112         committedFiles.put(it.next().getFamily().getName(), flush.getCommittedFiles());
2113       }
2114       storeFlushCtxs.clear();
2115 
2116       // Set down the memstore size by amount of flush.
2117       this.addAndGetGlobalMemstoreSize(-totalFlushableSizeOfFlushableStores);
2118 
2119       if (wal != null) {
2120         // write flush marker to WAL. If fail, we should throw DroppedSnapshotException
2121         FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
2122           getRegionInfo(), flushOpSeqId, committedFiles);
2123         WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2124           desc, sequenceId, true);
2125       }
2126     } catch (Throwable t) {
2127       // An exception here means that the snapshot was not persisted.
2128       // The wal needs to be replayed so its content is restored to memstore.
2129       // Currently, only a server restart will do this.
2130     // We used to only catch IOEs but it's possible that we'd get other
2131       // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
2132       // all and sundry.
2133       if (wal != null) {
2134         try {
2135           FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
2136             getRegionInfo(), flushOpSeqId, committedFiles);
2137           WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2138             desc, sequenceId, false);
2139         } catch (Throwable ex) {
2140           LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
2141               StringUtils.stringifyException(ex));
2142           // ignore this since we will be aborting the RS with DSE.
2143         }
2144         wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
2145       }
2146       DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
2147           Bytes.toStringBinary(getRegionName()));
2148       dse.initCause(t);
2149       status.abort("Flush failed: " + StringUtils.stringifyException(t));
2150       throw dse;
2151     }
2152 
2153     // If we get to here, the HStores have been written.
2154     if (wal != null) {
2155       wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
2156     }
2157 
2158     // Record latest flush time
2159     for (Store store: storesToFlush) {
2160       this.lastStoreFlushTimeMap.put(store, startTime);
2161     }
2162 
2163     // Update the oldest unflushed sequence id for region.
2164     this.maxFlushedSeqId = flushedSeqId;
2165 
2166     // C. Finally notify anyone waiting on memstore to clear:
2167     // e.g. checkResources().
2168     synchronized (this) {
2169       notifyAll(); // FindBugs NN_NAKED_NOTIFY
2170     }
2171 
2172     long time = EnvironmentEdgeManager.currentTime() - startTime;
2173     long memstoresize = this.memstoreSize.get();
2174     String msg = "Finished memstore flush of ~"
2175         + StringUtils.byteDesc(totalFlushableSizeOfFlushableStores) + "/"
2176         + totalFlushableSizeOfFlushableStores + ", currentsize="
2177         + StringUtils.byteDesc(memstoresize) + "/" + memstoresize
2178         + " for region " + this + " in " + time + "ms, sequenceid="
2179         + flushOpSeqId +  ", compaction requested=" + compactionRequested
2180         + ((wal == null) ? "; wal=null" : "");
2181     LOG.info(msg);
2182     status.setStatus(msg);
2183 
2184     return new FlushResult(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
2185         FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushOpSeqId);
2186   }
2187 
2188   /**
2189    * Method to safely get the next sequence number.
2190    * @return Next sequence number unassociated with any actual edit.
2191    * @throws IOException
2192    */
2193   private long getNextSequenceId(final WAL wal) throws IOException {
2194     WALKey key = this.appendEmptyEdit(wal, null);
2195     return key.getSequenceId();
2196   }
2197 
2198   //////////////////////////////////////////////////////////////////////////////
2199   // get() methods for client use.
2200   //////////////////////////////////////////////////////////////////////////////
2201   /**
2202    * Return all the data for the row that matches <i>row</i> exactly,
2203    * or the one that immediately precedes it, at or immediately before
2204    * <i>ts</i>.
2205    *
2206    * @param row row key
2207    * @return map of values
2208    * @throws IOException
2209    */
2210   Result getClosestRowBefore(final byte [] row)
2211   throws IOException{
2212     return getClosestRowBefore(row, HConstants.CATALOG_FAMILY);
2213   }
2214 
2215   /**
2216    * Return all the data for the row that matches <i>row</i> exactly,
2217    * or the one that immediately precedes it, at or immediately before
2218    * <i>ts</i>.
2219    *
2220    * @param row row key
2221    * @param family column family to find on
2222    * @return map of values
2223    * @throws IOException read exceptions
2224    */
2225   public Result getClosestRowBefore(final byte [] row, final byte [] family)
2226   throws IOException {
2227     if (coprocessorHost != null) {
2228       Result result = new Result();
2229       if (coprocessorHost.preGetClosestRowBefore(row, family, result)) {
2230         return result;
2231       }
2232     }
2233     // look across all the HStores for this region and determine what the
2234     // closest key is across all column families, since the data may be sparse
2235     checkRow(row, "getClosestRowBefore");
2236     startRegionOperation(Operation.GET);
2237     this.readRequestsCount.increment();
2238     try {
2239       Store store = getStore(family);
2240       // get the closest key. (HStore.getRowKeyAtOrBefore can return null)
2241       Cell key = store.getRowKeyAtOrBefore(row);
2242       Result result = null;
2243       if (key != null) {
2244         Get get = new Get(CellUtil.cloneRow(key));
2245         get.addFamily(family);
2246         result = get(get);
2247       }
2248       if (coprocessorHost != null) {
2249         coprocessorHost.postGetClosestRowBefore(row, family, result);
2250       }
2251       return result;
2252     } finally {
2253       closeRegionOperation(Operation.GET);
2254     }
2255   }
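
  /*
   * Example: fetching the row at or just before a key, as the meta lookup
   * path does. A sketch, assuming "region" is an open HRegion:
   *
   *   Result r = region.getClosestRowBefore(Bytes.toBytes("row-0042"),
   *     HConstants.CATALOG_FAMILY);
   *   if (r != null && !r.isEmpty()) {
   *     // r holds the closest preceding row for that family
   *   }
   */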
2256 
2257   /**
2258    * Return an iterator that scans over the HRegion, returning the indicated
2259    * columns and rows specified by the {@link Scan}.
2260    * <p>
2261    * The returned RegionScanner must be closed by the caller.
2262    *
2263    * @param scan configured {@link Scan}
2264    * @return RegionScanner
2265    * @throws IOException read exceptions
2266    */
2267   public RegionScanner getScanner(Scan scan) throws IOException {
2268    return getScanner(scan, null);
2269   }
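
  /*
   * Example: a caller-managed scan over the region. A sketch, assuming
   * "region" is an open HRegion; the scanner must be closed by the caller:
   *
   *   RegionScanner scanner = region.getScanner(new Scan());
   *   try {
   *     List<Cell> cells = new ArrayList<Cell>();
   *     boolean moreRows;
   *     do {
   *       cells.clear();
   *       moreRows = scanner.next(cells); // one row's worth of cells per call
   *       // process cells
   *     } while (moreRows);
   *   } finally {
   *     scanner.close();
   *   }
   */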
2270 
2271   void prepareScanner(Scan scan) {
2272     if(!scan.hasFamilies()) {
2273       // Adding all families to scanner
2274       for(byte[] family: this.htableDescriptor.getFamiliesKeys()){
2275         scan.addFamily(family);
2276       }
2277     }
2278   }
2279 
2280   protected RegionScanner getScanner(Scan scan,
2281       List<KeyValueScanner> additionalScanners) throws IOException {
2282     startRegionOperation(Operation.SCAN);
2283     try {
2284       // Verify families are all valid
2285       prepareScanner(scan);
2286       if(scan.hasFamilies()) {
2287         for(byte [] family : scan.getFamilyMap().keySet()) {
2288           checkFamily(family);
2289         }
2290       }
2291       return instantiateRegionScanner(scan, additionalScanners);
2292     } finally {
2293       closeRegionOperation(Operation.SCAN);
2294     }
2295   }
2296 
2297   protected RegionScanner instantiateRegionScanner(Scan scan,
2298       List<KeyValueScanner> additionalScanners) throws IOException {
2299     if (scan.isReversed()) {
2300       if (scan.getFilter() != null) {
2301         scan.getFilter().setReversed(true);
2302       }
2303       return new ReversedRegionScannerImpl(scan, additionalScanners, this);
2304     }
2305     return new RegionScannerImpl(scan, additionalScanners, this);
2306   }
2307 
2308   /*
2309    * @param delete The passed delete is modified by this method. WARNING!
2310    */
2311   void prepareDelete(Delete delete) throws IOException {
2312     // Check to see if this is a deleteRow insert
2313     if(delete.getFamilyCellMap().isEmpty()){
2314       for(byte [] family : this.htableDescriptor.getFamiliesKeys()){
2315         // Don't eat the timestamp
2316         delete.addFamily(family, delete.getTimeStamp());
2317       }
2318     } else {
2319       for(byte [] family : delete.getFamilyCellMap().keySet()) {
2320         if(family == null) {
2321           throw new NoSuchColumnFamilyException("Empty family is invalid");
2322         }
2323         checkFamily(family);
2324       }
2325     }
2326   }
2327 
2328   //////////////////////////////////////////////////////////////////////////////
2329   // set() methods for client use.
2330   //////////////////////////////////////////////////////////////////////////////
2331   /**
2332    * @param delete delete object
2333    * @throws IOException read exceptions
2334    */
2335   public void delete(Delete delete)
2336   throws IOException {
2337     checkReadOnly();
2338     checkResources();
2339     startRegionOperation(Operation.DELETE);
2340     try {
2341       delete.getRow();
2342       // All edits for the given row (across all column families) must happen atomically.
2343       doBatchMutate(delete);
2344     } finally {
2345       closeRegionOperation(Operation.DELETE);
2346     }
2347   }
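
  /*
   * Example: deleting a whole row. A sketch, assuming "region" is an open
   * HRegion; with no families specified, prepareDelete() expands the Delete
   * to cover every family in the table:
   *
   *   region.delete(new Delete(Bytes.toBytes("row-0042")));
   */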
2348 
2349   /**
2350    * Row needed by below method.
2351    * Row key used by the method below.
2352   private static final byte [] FOR_UNIT_TESTS_ONLY = Bytes.toBytes("ForUnitTestsOnly");
2353   /**
2354    * This is used only by unit tests. Not required to be a public API.
2355    * @param familyMap map of family to edits for the given family.
2356    * @throws IOException
2357    */
2358   void delete(NavigableMap<byte[], List<Cell>> familyMap,
2359       Durability durability) throws IOException {
2360     Delete delete = new Delete(FOR_UNIT_TESTS_ONLY);
2361     delete.setFamilyCellMap(familyMap);
2362     delete.setDurability(durability);
2363     doBatchMutate(delete);
2364   }
2365 
2366   /**
2367    * Set up correct timestamps in the KVs of the Delete object.
2368    * Caller should have the row and region locks.
2369    * @param mutation
2370    * @param familyMap
2371    * @param byteNow
2372    * @throws IOException
2373    */
2374   void prepareDeleteTimestamps(Mutation mutation, Map<byte[], List<Cell>> familyMap,
2375       byte[] byteNow) throws IOException {
2376     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
2377 
2378       byte[] family = e.getKey();
2379       List<Cell> cells = e.getValue();
2380       assert cells instanceof RandomAccess;
2381 
2382       Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
2383       int listSize = cells.size();
2384       for (int i=0; i < listSize; i++) {
2385         Cell cell = cells.get(i);
2386         //  Check if time is LATEST, change to time of most recent addition if so
2387         //  This is expensive.
2388         if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP && CellUtil.isDeleteType(cell)) {
2389           byte[] qual = CellUtil.cloneQualifier(cell);
2390           if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
2391 
2392           Integer count = kvCount.get(qual);
2393           if (count == null) {
2394             kvCount.put(qual, 1);
2395           } else {
2396             kvCount.put(qual, count + 1);
2397           }
2398           count = kvCount.get(qual);
2399 
2400           Get get = new Get(CellUtil.cloneRow(cell));
2401           get.setMaxVersions(count);
2402           get.addColumn(family, qual);
2403           if (coprocessorHost != null) {
2404             if (!coprocessorHost.prePrepareTimeStampForDeleteVersion(mutation, cell,
2405                 byteNow, get)) {
2406               updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow);
2407             }
2408           } else {
2409             updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow);
2410           }
2411         } else {
2412           CellUtil.updateLatestStamp(cell, byteNow, 0);
2413         }
2414       }
2415     }
2416   }
2417 
2418   void updateDeleteLatestVersionTimeStamp(Cell cell, Get get, int count, byte[] byteNow)
2419       throws IOException {
2420     List<Cell> result = get(get, false);
2421 
2422     if (result.size() < count) {
2423       // Nothing to delete
2424       CellUtil.updateLatestStamp(cell, byteNow, 0);
2425       return;
2426     }
2427     if (result.size() > count) {
2428       throw new RuntimeException("Unexpected size: " + result.size());
2429     }
2430     Cell getCell = result.get(count - 1);
2431     CellUtil.setTimestamp(cell, getCell.getTimestamp());
2432   }
2433 
2434   /**
2435    * @throws IOException
2436    */
2437   public void put(Put put)
2438   throws IOException {
2439     checkReadOnly();
2440 
2441     // Do a rough check that we have resources to accept a write.  The check is
2442     // 'rough' in that between the resource check and the call to obtain a
2443     // read lock, resources may run out.  For now, the thought is that this
2444     // will be extremely rare; we'll deal with it when it happens.
2445     checkResources();
2446     startRegionOperation(Operation.PUT);
2447     try {
2448       // All edits for the given row (across all column families) must happen atomically.
2449       doBatchMutate(put);
2450     } finally {
2451       closeRegionOperation(Operation.PUT);
2452     }
2453   }
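
  /*
   * Example: a single-row put, which routes through doBatchMutate()
   * internally. A sketch, assuming "region" is an open HRegion (whether the
   * cell is added via Put#add or Put#addColumn depends on the client version):
   *
   *   Put p = new Put(Bytes.toBytes("row-0042"));
   *   p.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
   *   region.put(p);
   */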
2454 
2455   /**
2456    * Struct-like class that tracks the progress of a batch operation,
2457    * accumulating status codes and tracking the index at which processing
2458    * is proceeding.
2459    */
2460   private abstract static class BatchOperationInProgress<T> {
2461     T[] operations;
2462     int nextIndexToProcess = 0;
2463     OperationStatus[] retCodeDetails;
2464     WALEdit[] walEditsFromCoprocessors;
2465 
2466     public BatchOperationInProgress(T[] operations) {
2467       this.operations = operations;
2468       this.retCodeDetails = new OperationStatus[operations.length];
2469       this.walEditsFromCoprocessors = new WALEdit[operations.length];
2470       Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
2471     }
2472 
2473     public abstract Mutation getMutation(int index);
2474     public abstract long getNonceGroup(int index);
2475     public abstract long getNonce(int index);
2476     /** This method is potentially expensive and should only be used for non-replay CP path. */
2477     public abstract Mutation[] getMutationsForCoprocs();
2478     public abstract boolean isInReplay();
2479     public abstract long getReplaySequenceId();
2480 
2481     public boolean isDone() {
2482       return nextIndexToProcess == operations.length;
2483     }
2484   }
2485 
2486   private static class MutationBatch extends BatchOperationInProgress<Mutation> {
2487     private long nonceGroup;
2488     private long nonce;
2489     public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) {
2490       super(operations);
2491       this.nonceGroup = nonceGroup;
2492       this.nonce = nonce;
2493     }
2494 
2495     @Override
2496     public Mutation getMutation(int index) {
2497       return this.operations[index];
2498     }
2499 
2500     @Override
2501     public long getNonceGroup(int index) {
2502       return nonceGroup;
2503     }
2504 
2505     @Override
2506     public long getNonce(int index) {
2507       return nonce;
2508     }
2509 
2510     @Override
2511     public Mutation[] getMutationsForCoprocs() {
2512       return this.operations;
2513     }
2514 
2515     @Override
2516     public boolean isInReplay() {
2517       return false;
2518     }
2519 
2520     @Override
2521     public long getReplaySequenceId() {
2522       return 0;
2523     }
2524   }
2525 
2526   private static class ReplayBatch extends BatchOperationInProgress<MutationReplay> {
2527     private long replaySeqId = 0;
2528     public ReplayBatch(MutationReplay[] operations, long seqId) {
2529       super(operations);
2530       this.replaySeqId = seqId;
2531     }
2532 
2533     @Override
2534     public Mutation getMutation(int index) {
2535       return this.operations[index].mutation;
2536     }
2537 
2538     @Override
2539     public long getNonceGroup(int index) {
2540       return this.operations[index].nonceGroup;
2541     }
2542 
2543     @Override
2544     public long getNonce(int index) {
2545       return this.operations[index].nonce;
2546     }
2547 
2548     @Override
2549     public Mutation[] getMutationsForCoprocs() {
2550       assert false;
2551       throw new RuntimeException("Should not be called for replay batch");
2552     }
2553 
2554     @Override
2555     public boolean isInReplay() {
2556       return true;
2557     }
2558 
2559     @Override
2560     public long getReplaySequenceId() {
2561       return this.replaySeqId;
2562     }
2563   }
2564 
2565   /**
2566    * Perform a batch of mutations.
2567    * It supports only Put and Delete mutations and will ignore other types passed.
2568    * @param mutations the list of mutations
2569    * @return an array of OperationStatus which internally contains the
2570    *         OperationStatusCode and the exceptionMessage if any.
2571    * @throws IOException
2572    */
2573   public OperationStatus[] batchMutate(
2574       Mutation[] mutations, long nonceGroup, long nonce) throws IOException {
2575     // As it stands, this is used for 3 things:
2576     //  * batchMutate with a single mutation - put/delete, standalone or from checkAndMutate.
2577     //  * coprocessor calls (see e.g. BulkDeleteEndpoint).
2578     // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd...
2579     return batchMutate(new MutationBatch(mutations, nonceGroup, nonce));
2580   }
2581 
2582   public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
2583     return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
2584   }
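
  /*
   * Example: batching several mutations and checking per-operation status. A
   * sketch, assuming "region" is an open HRegion and put1/delete1 are
   * illustrative mutations:
   *
   *   Mutation[] batch = new Mutation[] { put1, delete1 };
   *   OperationStatus[] statuses = region.batchMutate(batch);
   *   for (OperationStatus s : statuses) {
   *     if (s.getOperationStatusCode() != OperationStatusCode.SUCCESS) {
   *       // handle the failure for this mutation
   *     }
   *   }
   */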
2585 
2586   /**
2587    * Replay a batch of mutations.
2588    * @param mutations mutations to replay.
2589    * @param replaySeqId SeqId for current mutations
2590    * @return an array of OperationStatus which internally contains the
2591    *         OperationStatusCode and the exceptionMessage if any.
2592    * @throws IOException
2593    */
2594   public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId)
2595       throws IOException {
2596     return batchMutate(new ReplayBatch(mutations, replaySeqId));
2597   }
2598 
2599   /**
2600    * Perform a batch of mutations.
2601    * It supports only Put and Delete mutations and will ignore other types passed.
2602    * @param batchOp contains the list of mutations
2603    * @return an array of OperationStatus which internally contains the
2604    *         OperationStatusCode and the exceptionMessage if any.
2605    * @throws IOException
2606    */
2607   OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
2608     boolean initialized = false;
2609     Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
2610     startRegionOperation(op);
2611     try {
2612       while (!batchOp.isDone()) {
2613         if (!batchOp.isInReplay()) {
2614           checkReadOnly();
2615         }
2616         checkResources();
2617 
2618         if (!initialized) {
2619           this.writeRequestsCount.add(batchOp.operations.length);
2620           if (!batchOp.isInReplay()) {
2621             doPreMutationHook(batchOp);
2622           }
2623           initialized = true;
2624         }
2625         long addedSize = doMiniBatchMutation(batchOp);
2626         long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
2627         if (isFlushSize(newSize)) {
2628           requestFlush();
2629         }
2630       }
2631     } finally {
2632       closeRegionOperation(op);
2633     }
2634     return batchOp.retCodeDetails;
2635   }
2636 
2637 
2638   private void doPreMutationHook(BatchOperationInProgress<?> batchOp)
2639       throws IOException {
2640     /* Run coprocessor pre hook outside of locks to avoid deadlock */
2641     WALEdit walEdit = new WALEdit();
2642     if (coprocessorHost != null) {
2643       for (int i = 0 ; i < batchOp.operations.length; i++) {
2644         Mutation m = batchOp.getMutation(i);
2645         if (m instanceof Put) {
2646           if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
2647             // pre hook says skip this Put
2648             // mark as success and skip in doMiniBatchMutation
2649             batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2650           }
2651         } else if (m instanceof Delete) {
2652           Delete curDel = (Delete) m;
2653           if (curDel.getFamilyCellMap().isEmpty()) {
2654             // handle deleting a row case
2655             prepareDelete(curDel);
2656           }
2657           if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {
2658             // pre hook says skip this Delete
2659             // mark as success and skip in doMiniBatchMutation
2660             batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2661           }
2662         } else {
2663           // If a mutation that is not a Put or Delete (e.g. an Append) is passed in
2664           // along with the Puts and Deletes in batchMutate, mark its return code as
2665           // failure so that it will not be considered in doMiniBatchMutation
2666           batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
2667               "Put/Delete mutations only supported in batchMutate() now");
2668         }
2669         if (!walEdit.isEmpty()) {
2670           batchOp.walEditsFromCoprocessors[i] = walEdit;
2671           walEdit = new WALEdit();
2672         }
2673       }
2674     }
2675   }
2676 
2677   @SuppressWarnings("unchecked")
2678   private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp) throws IOException {
2679     boolean isInReplay = batchOp.isInReplay();
2680     // variable to note if all Put items are for the same CF -- metrics related
2681     boolean putsCfSetConsistent = true;
2682     //The set of columnFamilies first seen for Put.
2683     Set<byte[]> putsCfSet = null;
2684     // variable to note if all Delete items are for the same CF -- metrics related
2685     boolean deletesCfSetConsistent = true;
2686     //The set of columnFamilies first seen for Delete.
2687     Set<byte[]> deletesCfSet = null;
2688 
2689     long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
2690     WALEdit walEdit = new WALEdit(isInReplay);
2691     MultiVersionConsistencyControl.WriteEntry w = null;
2692     long txid = 0;
2693     boolean doRollBackMemstore = false;
2694     boolean locked = false;
2695 
2696     /** Keep track of the locks we hold so we can release them in finally clause */
2697     List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
2698     // reference family maps directly so coprocessors can mutate them if desired
2699     Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
2700     List<Cell> memstoreCells = new ArrayList<Cell>();
2701     // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
2702     int firstIndex = batchOp.nextIndexToProcess;
2703     int lastIndexExclusive = firstIndex;
2704     boolean success = false;
2705     int noOfPuts = 0, noOfDeletes = 0;
2706     WALKey walKey = null;
2707     long mvccNum = 0;
2708     try {
2709       // ------------------------------------
2710       // STEP 1. Try to acquire as many locks as we can, and ensure
2711       // we acquire at least one.
2712       // ----------------------------------
2713       int numReadyToWrite = 0;
2714       long now = EnvironmentEdgeManager.currentTime();
2715       while (lastIndexExclusive < batchOp.operations.length) {
2716         Mutation mutation = batchOp.getMutation(lastIndexExclusive);
2717         boolean isPutMutation = mutation instanceof Put;
2718 
2719         Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
2720         // store the family map reference to allow for mutations
2721         familyMaps[lastIndexExclusive] = familyMap;
2722 
2723         // skip anything that "ran" already
2724         if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
2725             != OperationStatusCode.NOT_RUN) {
2726           lastIndexExclusive++;
2727           continue;
2728         }
2729 
2730         try {
2731           if (isPutMutation) {
2732             // Check the families in the put. If bad, skip this one.
2733             if (isInReplay) {
2734               removeNonExistentColumnFamilyForReplay(familyMap);
2735             } else {
2736               checkFamilies(familyMap.keySet());
2737             }
2738             checkTimestamps(mutation.getFamilyCellMap(), now);
2739           } else {
2740             prepareDelete((Delete) mutation);
2741           }
2742         } catch (NoSuchColumnFamilyException nscf) {
2743           LOG.warn("No such column family in batch mutation", nscf);
2744           batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2745               OperationStatusCode.BAD_FAMILY, nscf.getMessage());
2746           lastIndexExclusive++;
2747           continue;
2748         } catch (FailedSanityCheckException fsce) {
2749           LOG.warn("Batch Mutation did not pass sanity check", fsce);
2750           batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2751               OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
2752           lastIndexExclusive++;
2753           continue;
2754         }
2755 
2756         // If we haven't got any rows in our batch, we should block to
2757         // get the next one.
2758         boolean shouldBlock = numReadyToWrite == 0;
2759         RowLock rowLock = null;
2760         try {
2761           rowLock = getRowLockInternal(mutation.getRow(), shouldBlock);
2762         } catch (IOException ioe) {
2763           LOG.warn("Failed getting lock in batch put, row="
2764             + Bytes.toStringBinary(mutation.getRow()), ioe);
2765         }
2766         if (rowLock == null) {
2767           // We failed to grab another lock
2768           assert !shouldBlock : "Should never fail to get lock when blocking";
2769           break; // stop acquiring more rows for this batch
2770         } else {
2771           acquiredRowLocks.add(rowLock);
2772         }
2773 
2774         lastIndexExclusive++;
2775         numReadyToWrite++;
2776 
2777         if (isPutMutation) {
2778           // If column families stay consistent throughout all of the
2779           // individual puts then metrics can be reported as a multiput across
2780           // column families in the first put.
2781           if (putsCfSet == null) {
2782             putsCfSet = mutation.getFamilyCellMap().keySet();
2783           } else {
2784             putsCfSetConsistent = putsCfSetConsistent
2785                 && mutation.getFamilyCellMap().keySet().equals(putsCfSet);
2786           }
2787         } else {
2788           if (deletesCfSet == null) {
2789             deletesCfSet = mutation.getFamilyCellMap().keySet();
2790           } else {
2791             deletesCfSetConsistent = deletesCfSetConsistent
2792                 && mutation.getFamilyCellMap().keySet().equals(deletesCfSet);
2793           }
2794         }
2795       }
2796 
2797       // we should record the timestamp only after we have acquired the rowLock,
2798       // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
2799       now = EnvironmentEdgeManager.currentTime();
2800       byte[] byteNow = Bytes.toBytes(now);
2801 
2802       // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
2803       if (numReadyToWrite <= 0) return 0L;
2804 
2805       // We've now grabbed as many mutations off the list as we can
2806 
2807       // ------------------------------------
2808       // STEP 2. Update any LATEST_TIMESTAMP timestamps
2809       // ----------------------------------
2810       for (int i = firstIndex; !isInReplay && i < lastIndexExclusive; i++) {
2811         // skip invalid
2812         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2813             != OperationStatusCode.NOT_RUN) continue;
2814 
2815         Mutation mutation = batchOp.getMutation(i);
2816         if (mutation instanceof Put) {
2817           updateCellTimestamps(familyMaps[i].values(), byteNow);
2818           noOfPuts++;
2819         } else {
2820           prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
2821           noOfDeletes++;
2822         }
2823         rewriteCellTags(familyMaps[i], mutation);
2824       }
2825 
2826       lock(this.updatesLock.readLock(), numReadyToWrite);
2827       locked = true;
2828       if(isInReplay) {
2829         mvccNum = batchOp.getReplaySequenceId();
2830       } else {
2831         mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
2832       }
2834       // ------------------------------------
2835       // Acquire the latest mvcc number
2836       // ----------------------------------
2837       w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
2838 
2839       // calling the pre CP hook for batch mutation
2840       if (!isInReplay && coprocessorHost != null) {
2841         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2842           new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2843           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2844         if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
2845       }
2846 
2847       // ------------------------------------
2848       // STEP 3. Write back to memstore
2849       // Write to memstore. It is ok to write to memstore
2850       // first without updating the WAL because we do not roll
2851       // forward the memstore MVCC. The MVCC will be moved up when
2852       // the complete operation is done. These changes are not yet
2853       // visible to scanners till we update the MVCC. The MVCC is
2854       // moved only when the sync is complete.
2855       // ----------------------------------
2856       long addedSize = 0;
2857       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2858         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2859             != OperationStatusCode.NOT_RUN) {
2860           continue;
2861         }
2862         doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
2863         addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells, isInReplay);
2864       }
2865 
2866       // ------------------------------------
2867       // STEP 4. Build WAL edit
2868       // ----------------------------------
2869       Durability durability = Durability.USE_DEFAULT;
2870       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2871         // Skip puts that were determined to be invalid during preprocessing
2872         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2873             != OperationStatusCode.NOT_RUN) {
2874           continue;
2875         }
2876         batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2877 
2878         Mutation m = batchOp.getMutation(i);
2879         Durability tmpDur = getEffectiveDurability(m.getDurability());
2880         if (tmpDur.ordinal() > durability.ordinal()) {
2881           durability = tmpDur;
2882         }
2883         if (tmpDur == Durability.SKIP_WAL) {
2884           recordMutationWithoutWal(m.getFamilyCellMap());
2885           continue;
2886         }
2887 
2888         long nonceGroup = batchOp.getNonceGroup(i), nonce = batchOp.getNonce(i);
2889         // In replay, the batch may contain multiple nonces. If so, write a WALEdit for each.
2890         // Given how nonces are originally written, these should be contiguous.
2891         // They don't have to be, though; it will still work, just writing more WALEdits than needed.
2892         if (nonceGroup != currentNonceGroup || nonce != currentNonce) {
2893           if (walEdit.size() > 0) {
2894             assert isInReplay;
2895             if (!isInReplay) {
2896               throw new IOException("Multiple nonces per batch and not in replay");
2897             }
2898             // txid should always increase, so having the one from the last call is ok.
2899             // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
2900             walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
2901               this.htableDescriptor.getTableName(), now, m.getClusterIds(),
2902               currentNonceGroup, currentNonce);
2903             txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey,
2904               walEdit, getSequenceId(), true, null);
2905             walEdit = new WALEdit(isInReplay);
2906             walKey = null;
2907           }
2908           currentNonceGroup = nonceGroup;
2909           currentNonce = nonce;
2910         }
2911 
2912         // Add WAL edits by CP
2913         WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
2914         if (fromCP != null) {
2915           for (Cell cell : fromCP.getCells()) {
2916             walEdit.add(cell);
2917           }
2918         }
2919         addFamilyMapToWALEdit(familyMaps[i], walEdit);
2920       }
2921 
2922       // -------------------------
2923       // STEP 5. Append the final edit to WAL. Do not sync wal.
2924       // -------------------------
2925       Mutation mutation = batchOp.getMutation(firstIndex);
2926       if (walEdit.size() > 0) {
2927         // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
2928         walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
2929             this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
2930             mutation.getClusterIds(), currentNonceGroup, currentNonce);
2931         if(isInReplay) {
2932           walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId());
2933         }
2934         txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
2935           getSequenceId(), true, memstoreCells);
2936       }
2937       if(walKey == null){
2938         // Append a fake WALEdit so that SKIP_WAL updates get an mvcc number assigned
2939         walKey = this.appendEmptyEdit(this.wal, memstoreCells);
2940       }
2941 
2942       // -------------------------------
2943       // STEP 6. Release row locks, etc.
2944       // -------------------------------
2945       if (locked) {
2946         this.updatesLock.readLock().unlock();
2947         locked = false;
2948       }
2949       releaseRowLocks(acquiredRowLocks);
2950 
2951       // -------------------------
2952       // STEP 7. Sync wal.
2953       // -------------------------
2954       if (txid != 0) {
2955         syncOrDefer(txid, durability);
2956       }
2957 
2958       doRollBackMemstore = false;
2959       // calling the post CP hook for batch mutation
2960       if (!isInReplay && coprocessorHost != null) {
2961         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2962           new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2963           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2964         coprocessorHost.postBatchMutate(miniBatchOp);
2965       }
2966 
2967 
2968       // ------------------------------------------------------------------
2969       // STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
2970       // ------------------------------------------------------------------
2971       if (w != null) {
2972         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
2973         w = null;
2974       }
2975 
2976       // ------------------------------------
2977       // STEP 9. Run coprocessor post hooks. This should be done after the wal is
2978       // synced so that the coprocessor contract is adhered to.
2979       // ------------------------------------
2980       if (!isInReplay && coprocessorHost != null) {
2981         for (int i = firstIndex; i < lastIndexExclusive; i++) {
2982           // only for successful puts
2983           if (batchOp.retCodeDetails[i].getOperationStatusCode()
2984               != OperationStatusCode.SUCCESS) {
2985             continue;
2986           }
2987           Mutation m = batchOp.getMutation(i);
2988           if (m instanceof Put) {
2989             coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
2990           } else {
2991             coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
2992           }
2993         }
2994       }
2995 
2996       success = true;
2997       return addedSize;
2998     } finally {
2999       // if the wal sync was unsuccessful, remove keys from memstore
3000       if (doRollBackMemstore) {
3001         rollbackMemstore(memstoreCells);
3002       }
3003       if (w != null) {
3004         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
3005       }
3006 
3007       if (locked) {
3008         this.updatesLock.readLock().unlock();
3009       }
3010       releaseRowLocks(acquiredRowLocks);
3011 
3012       // See if the column families were consistent through the whole thing.
3013       // If they were, then keep them. If they were not, then pass a null;
3014       // null will be treated as unknown.
3015       // The total time taken may involve both Puts and Deletes.
3016       // Split the time between puts and deletes based on the total number of each.
3017 
3018       if (noOfPuts > 0) {
3019         // There were some Puts in the batch.
3020         if (this.metricsRegion != null) {
3021           this.metricsRegion.updatePut();
3022         }
3023       }
3024       if (noOfDeletes > 0) {
3025         // There were some Deletes in the batch.
3026         if (this.metricsRegion != null) {
3027           this.metricsRegion.updateDelete();
3028         }
3029       }
3030       if (!success) {
3031         for (int i = firstIndex; i < lastIndexExclusive; i++) {
3032           if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
3033             batchOp.retCodeDetails[i] = OperationStatus.FAILURE;
3034           }
3035         }
3036       }
3037       if (coprocessorHost != null && !batchOp.isInReplay()) {
3038         // call the coprocessor hook to do any finalization steps
3039         // after the put is done
3040         MiniBatchOperationInProgress<Mutation> miniBatchOp =
3041             new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
3042                 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex,
3043                 lastIndexExclusive);
3044         coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success);
3045       }
3046 
3047       batchOp.nextIndexToProcess = lastIndexExclusive;
3048     }
3049   }
3050 
3051   /**
3052    * Returns effective durability from the passed durability and
3053    * the table descriptor.
3054    */
3055   protected Durability getEffectiveDurability(Durability d) {
3056     return d == Durability.USE_DEFAULT ? this.durability : d;
3057   }
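
       // A worked sketch of the resolution rule (illustrative; assumes the table's
       // default durability, this.durability, is SYNC_WAL):
       //   getEffectiveDurability(Durability.USE_DEFAULT) -> Durability.SYNC_WAL
       //   getEffectiveDurability(Durability.SKIP_WAL)    -> Durability.SKIP_WAL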
3058 
3059   //TODO, Think that gets/puts and deletes should be refactored a bit so that
3060   //the getting of the lock happens before, so that you would just pass it into
3061   //the methods. So in the case of checkAndMutate you could just do lockRow,
3062   //get, put, unlockRow or something
3063   /**
3064    * Atomically applies the given mutation if the current value matches the comparator.
3065    * @throws IOException
3066    * @return true if the new mutation was executed, false otherwise
3067    */
3068   public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
3069       CompareOp compareOp, ByteArrayComparable comparator, Mutation w,
3070       boolean writeToWAL)
3071   throws IOException{
3072     checkReadOnly();
3073     //TODO, add check for value length or maybe even better move this to the
3074     //client if this becomes a global setting
3075     checkResources();
3076     boolean isPut = w instanceof Put;
3077     if (!isPut && !(w instanceof Delete))
3078       throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must " +
3079           "be Put or Delete");
3080     if (!Bytes.equals(row, w.getRow())) {
3081       throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " +
3082           "getRow must match the passed row");
3083     }
3084 
3085     startRegionOperation();
3086     try {
3087       Get get = new Get(row);
3088       checkFamily(family);
3089       get.addColumn(family, qualifier);
3090 
3091       // Lock row - note that doBatchMutate will relock this row if called
3092       RowLock rowLock = getRowLock(get.getRow());
3093       // wait for all previous transactions to complete (with lock held)
3094       mvcc.waitForPreviousTransactionsComplete();
3095       try {
3096         if (this.getCoprocessorHost() != null) {
3097           Boolean processed = null;
3098           if (w instanceof Put) {
3099             processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family,
3100                 qualifier, compareOp, comparator, (Put) w);
3101           } else if (w instanceof Delete) {
3102             processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family,
3103                 qualifier, compareOp, comparator, (Delete) w);
3104           }
3105           if (processed != null) {
3106             return processed;
3107           }
3108         }
3109         List<Cell> result = get(get, false);
3110 
3111         boolean valueIsNull = comparator.getValue() == null ||
3112           comparator.getValue().length == 0;
3113         boolean matches = false;
3114         if (result.size() == 0 && valueIsNull) {
3115           matches = true;
3116         } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
3117             valueIsNull) {
3118           matches = true;
3119         } else if (result.size() == 1 && !valueIsNull) {
3120           Cell kv = result.get(0);
3121           int compareResult = comparator.compareTo(kv.getValueArray(),
3122               kv.getValueOffset(), kv.getValueLength());
3123           switch (compareOp) {
3124           case LESS:
3125             matches = compareResult < 0;
3126             break;
3127           case LESS_OR_EQUAL:
3128             matches = compareResult <= 0;
3129             break;
3130           case EQUAL:
3131             matches = compareResult == 0;
3132             break;
3133           case NOT_EQUAL:
3134             matches = compareResult != 0;
3135             break;
3136           case GREATER_OR_EQUAL:
3137             matches = compareResult >= 0;
3138             break;
3139           case GREATER:
3140             matches = compareResult > 0;
3141             break;
3142           default:
3143             throw new RuntimeException("Unknown Compare op " + compareOp.name());
3144           }
3145         }
3146         // If it matches, apply the new put or delete
3147         if (matches) {
3148           // All edits for the given row (across all column families) must
3149           // happen atomically.
3150           doBatchMutate(w);
3151           this.checkAndMutateChecksPassed.increment();
3152           return true;
3153         }
3154         this.checkAndMutateChecksFailed.increment();
3155         return false;
3156       } finally {
3157         rowLock.release();
3158       }
3159     } finally {
3160       closeRegionOperation();
3161     }
3162   }
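
       // A minimal usage sketch (illustrative only; "region", row, family, qualifier
       // and the expected value are assumptions). CompareOp and BinaryComparator come
       // from the org.apache.hadoop.hbase.filter package. The Put is applied only if
       // the current cell value equals "expected":
       //
       //   Put put = new Put(row);
       //   put.add(family, qualifier, Bytes.toBytes("newValue"));
       //   boolean applied = region.checkAndMutate(row, family, qualifier,
       //       CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("expected")),
       //       put, true);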
3163 
3164   //TODO, Think that gets/puts and deletes should be refactored a bit so that
3165   //the getting of the lock happens before, so that you would just pass it into
3166   //the methods. So in the case of checkAndMutate you could just do lockRow,
3167   //get, put, unlockRow or something
3168   /**
3169    * Atomically applies the given row mutations if the current value matches the comparator.
3170    * @throws IOException
3171    * @return true if the row mutations were executed, false otherwise
3172    */
3173   public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier,
3174       CompareOp compareOp, ByteArrayComparable comparator, RowMutations rm,
3175       boolean writeToWAL)
3176       throws IOException{
3177     checkReadOnly();
3178     //TODO, add check for value length or maybe even better move this to the
3179     //client if this becomes a global setting
3180     checkResources();
3181 
3182     startRegionOperation();
3183     try {
3184       Get get = new Get(row);
3185       checkFamily(family);
3186       get.addColumn(family, qualifier);
3187 
3188       // Lock row - note that doBatchMutate will relock this row if called
3189       RowLock rowLock = getRowLock(get.getRow());
3190       // wait for all previous transactions to complete (with lock held)
3191       mvcc.waitForPreviousTransactionsComplete();
3192       try {
3193         List<Cell> result = get(get, false);
3194 
3195         boolean valueIsNull = comparator.getValue() == null ||
3196             comparator.getValue().length == 0;
3197         boolean matches = false;
3198         if (result.size() == 0 && valueIsNull) {
3199           matches = true;
3200         } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
3201             valueIsNull) {
3202           matches = true;
3203         } else if (result.size() == 1 && !valueIsNull) {
3204           Cell kv = result.get(0);
3205           int compareResult = comparator.compareTo(kv.getValueArray(),
3206               kv.getValueOffset(), kv.getValueLength());
3207           switch (compareOp) {
3208           case LESS:
3209             matches = compareResult < 0;
3210             break;
3211           case LESS_OR_EQUAL:
3212             matches = compareResult <= 0;
3213             break;
3214           case EQUAL:
3215             matches = compareResult == 0;
3216             break;
3217           case NOT_EQUAL:
3218             matches = compareResult != 0;
3219             break;
3220           case GREATER_OR_EQUAL:
3221             matches = compareResult >= 0;
3222             break;
3223           case GREATER:
3224             matches = compareResult > 0;
3225             break;
3226           default:
3227             throw new RuntimeException("Unknown Compare op " + compareOp.name());
3228           }
3229         }
3230         // If it matches, apply the new put or delete
3231         if (matches) {
3232           // All edits for the given row (across all column families) must
3233           // happen atomically.
3234           mutateRow(rm);
3235           this.checkAndMutateChecksPassed.increment();
3236           return true;
3237         }
3238         this.checkAndMutateChecksFailed.increment();
3239         return false;
3240       } finally {
3241         rowLock.release();
3242       }
3243     } finally {
3244       closeRegionOperation();
3245     }
3246   }
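
       // A minimal usage sketch with the same assumptions as the checkAndMutate example
       // above; RowMutations lets a Put and a Delete on the same row be applied
       // atomically when the check passes:
       //
       //   RowMutations rm = new RowMutations(row);
       //   rm.add(new Put(row).add(family, Bytes.toBytes("q1"), Bytes.toBytes("v1")));
       //   rm.add(new Delete(row).deleteColumns(family, Bytes.toBytes("q2")));
       //   boolean applied = region.checkAndRowMutate(row, family, qualifier,
       //       CompareOp.EQUAL, new BinaryComparator(expected), rm, true);
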
3247   private void doBatchMutate(Mutation mutation) throws IOException {
3248     // Currently this is only called for puts and deletes, so no nonces.
3249     OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation },
3250         HConstants.NO_NONCE, HConstants.NO_NONCE);
3251     if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
3252       throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
3253     } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
3254       throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
3255     }
3256   }
3257 
3258   /**
3259    * Complete taking the snapshot on the region. Writes the region info and adds references to the
3260    * working snapshot directory.
3261    *
3262    * TODO for api consistency, consider adding another version with no {@link ForeignExceptionSnare}
3263    * arg.  (In the future other cancellable HRegion methods could eventually add a
3264    * {@link ForeignExceptionSnare}, or we could do something fancier).
3265    *
3266    * @param desc snapshot description object
3267    * @param exnSnare ForeignExceptionSnare that captures external exceptions in case we need to
3268    *   bail out.  This is allowed to be null and will just be ignored in that case.
3269    * @throws IOException if there is an external or internal error causing the snapshot to fail
3270    */
3271   public void addRegionToSnapshot(SnapshotDescription desc,
3272       ForeignExceptionSnare exnSnare) throws IOException {
3273     Path rootDir = FSUtils.getRootDir(conf);
3274     Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);
3275 
3276     SnapshotManifest manifest = SnapshotManifest.create(conf, getFilesystem(),
3277                                                         snapshotDir, desc, exnSnare);
3278     manifest.addRegion(this);
3279   }
3280 
3281   /**
3282    * Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP} with the
3283    * provided current timestamp.
3284    * @throws IOException
3285    */
3286   void updateCellTimestamps(final Iterable<List<Cell>> cellItr, final byte[] now)
3287       throws IOException {
3288     for (List<Cell> cells: cellItr) {
3289       if (cells == null) continue;
3290       assert cells instanceof RandomAccess;
3291       int listSize = cells.size();
3292       for (int i = 0; i < listSize; i++) {
3293         CellUtil.updateLatestStamp(cells.get(i), now, 0);
3294       }
3295     }
3296   }
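
       // For example (illustrative bytes): a Put built without an explicit timestamp
       // leaves its cells at HConstants.LATEST_TIMESTAMP; this pass rewrites them to
       // the server's current time so all cells in the batch share one timestamp:
       //
       //   Put put = new Put(row);
       //   put.add(family, qualifier, value);   // cell ts defaults to LATEST_TIMESTAMP
       //   // after updateCellTimestamps(put.getFamilyCellMap().values(),
       //   //     Bytes.toBytes(now)), each cell's timestamp == now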
3297 
3298   /**
3299    * Possibly rewrite incoming cell tags.
3300    */
3301   void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) {
3302     // Check if we have any work to do and early out otherwise
3303     // Update these checks as more logic is added here
3304 
3305     if (m.getTTL() == Long.MAX_VALUE) {
3306       return;
3307     }
3308 
3309     // From this point we know we have some work to do
3310 
3311     for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
3312       List<Cell> cells = e.getValue();
3313       assert cells instanceof RandomAccess;
3314       int listSize = cells.size();
3315       for (int i = 0; i < listSize; i++) {
3316         Cell cell = cells.get(i);
3317         List<Tag> newTags = new ArrayList<Tag>();
3318         Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
3319           cell.getTagsOffset(), cell.getTagsLength());
3320 
3321         // Carry forward existing tags
3322 
3323         while (tagIterator.hasNext()) {
3324 
3325           // Add any filters or tag specific rewrites here
3326 
3327           newTags.add(tagIterator.next());
3328         }
3329 
3330         // Cell TTL handling
3331 
3332         // Check again if we need to add a cell TTL because early out logic
3333         // above may change when there are more tag based features in core.
3334         if (m.getTTL() != Long.MAX_VALUE) {
3335           // Add a cell TTL tag
3336           newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(m.getTTL())));
3337         }
3338 
3339         // Rewrite the cell with the updated set of tags
3340 
3341         cells.set(i, new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
3342           cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
3343           cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
3344           cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
3345           cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
3346           newTags));
3347       }
3348     }
3349   }
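
       // A minimal sketch of what triggers the rewrite (illustrative values): only a
       // mutation carrying a TTL does any work here.
       //
       //   Put put = new Put(row);
       //   put.setTTL(60000L);                  // 60 second cell-level TTL
       //   put.add(family, qualifier, value);
       //   // on write, each cell is rewritten with a TagType.TTL_TAG_TYPE tag
       //   // holding Bytes.toBytes(60000L)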
3350 
3351   /*
3352    * Check if we have the resources to support an update.
3353    *
3354    * We throw RegionTooBusyException if we are above the memstore limit
3355    * and expect the client to retry using some kind of backoff.
3356    */
3357   private void checkResources() throws RegionTooBusyException {
3358     // If catalog region, do not impose resource constraints or block updates.
3359     if (this.getRegionInfo().isMetaRegion()) return;
3360 
3361     if (this.memstoreSize.get() > this.blockingMemStoreSize) {
3362       blockedRequestsCount.increment();
3363       requestFlush();
3364       throw new RegionTooBusyException("Above memstore limit, " +
3365           "regionName=" + (this.getRegionInfo() == null ? "unknown" :
3366           this.getRegionInfo().getRegionNameAsString()) +
3367           ", server=" + (this.getRegionServerServices() == null ? "unknown" :
3368           this.getRegionServerServices().getServerName()) +
3369           ", memstoreSize=" + memstoreSize.get() +
3370           ", blockingMemStoreSize=" + blockingMemStoreSize);
3371     }
3372   }
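
       // Callers are expected to treat RegionTooBusyException as retryable. A rough
       // caller-side sketch (hypothetical names, exception handling elided):
       //
       //   for (int attempt = 0; attempt < maxAttempts; attempt++) {
       //     try {
       //       region.batchMutate(mutations);
       //       break;
       //     } catch (RegionTooBusyException e) {
       //       Thread.sleep(backoffMillis << attempt);   // exponential backoff
       //     }
       //   }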
3373 
3374   /**
3375    * @throws IOException Throws exception if region is in read-only mode.
3376    */
3377   protected void checkReadOnly() throws IOException {
3378     if (this.writestate.isReadOnly()) {
3379       throw new IOException("region is read only");
3380     }
3381   }
3382 
3383   protected void checkReadsEnabled() throws IOException {
3384     if (!this.writestate.readsEnabled) {
3385       throw new IOException("The region's reads are disabled. Cannot serve the request");
3386     }
3387   }
3388 
3389   public void setReadsEnabled(boolean readsEnabled) {
3390     this.writestate.setReadsEnabled(readsEnabled);
3391   }
3392 
3393   /**
3394    * Add updates first to the wal and then add values to memstore.
3395    * Warning: assumes the caller holds the lock on the passed row.
3396    * @param edits Cell updates by column
3397    * @throws IOException
3398    */
3399   private void put(final byte [] row, byte [] family, List<Cell> edits)
3400   throws IOException {
3401     NavigableMap<byte[], List<Cell>> familyMap;
3402     familyMap = new TreeMap<byte[], List<Cell>>(Bytes.BYTES_COMPARATOR);
3403 
3404     familyMap.put(family, edits);
3405     Put p = new Put(row);
3406     p.setFamilyCellMap(familyMap);
3407     doBatchMutate(p);
3408   }
3409 
3410   /**
3411    * Atomically apply the given map of family->edits to the memstore.
3412    * This handles the consistency control on its own, but the caller
3413    * should already have locked updatesLock.readLock(). This also does
3414    * <b>not</b> check the families for validity.
3415    *
3416    * @param familyMap Map of kvs per family
3417    * @param mvccNum The MVCC for this transaction.
        * @param memstoreCells collects the Cells actually added to the memstore, so
        *   they can be rolled back if a later step fails
3418    * @param isInReplay true when adding replayed KVs into memstore
3419    * @return the additional memory usage of the memstore caused by the
3420    * new entries.
3421    */
3422   private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
3423     long mvccNum, List<Cell> memstoreCells, boolean isInReplay) throws IOException {
3424     long size = 0;
3425 
3426     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
3427       byte[] family = e.getKey();
3428       List<Cell> cells = e.getValue();
3429       assert cells instanceof RandomAccess;
3430       Store store = getStore(family);
3431       int listSize = cells.size();
3432       for (int i=0; i < listSize; i++) {
3433         Cell cell = cells.get(i);
3434         CellUtil.setSequenceId(cell, mvccNum);
3435         Pair<Long, Cell> ret = store.add(cell);
3436         size += ret.getFirst();
3437         memstoreCells.add(ret.getSecond());
3438         if(isInReplay) {
3439           // set memstore newly added cells with replay mvcc number
3440           CellUtil.setSequenceId(ret.getSecond(), mvccNum);
3441         }
3442       }
3443     }
3444 
3445     return size;
3446   }
3447 
3448   /**
3449    * Remove all the keys listed in the map from the memstore. This method is
3450    * called when a Put/Delete has updated the memstore but the subsequent WAL
3451    * update fails; it rolls the memstore back.
3452    */
3453   private void rollbackMemstore(List<Cell> memstoreCells) {
3454     int kvsRolledback = 0;
3455 
3456     for (Cell cell : memstoreCells) {
3457       byte[] family = CellUtil.cloneFamily(cell);
3458       Store store = getStore(family);
3459       store.rollback(cell);
3460       kvsRolledback++;
3461     }
3462     LOG.debug("rollbackMemstore rolled back " + kvsRolledback);
3463   }
3464 
3465   /**
3466    * Check the collection of families for validity.
3467    * @throws NoSuchColumnFamilyException if a family does not exist.
3468    */
3469   void checkFamilies(Collection<byte[]> families)
3470   throws NoSuchColumnFamilyException {
3471     for (byte[] family : families) {
3472       checkFamily(family);
3473     }
3474   }
3475 
3476   /**
3477    * During replay, column families may have been removed between the region
3478    * server failure and the replay.
3479    */
3480   private void removeNonExistentColumnFamilyForReplay(
3481       final Map<byte[], List<Cell>> familyMap) {
3482     List<byte[]> nonExistentList = null;
3483     for (byte[] family : familyMap.keySet()) {
3484       if (!this.htableDescriptor.hasFamily(family)) {
3485         if (nonExistentList == null) {
3486           nonExistentList = new ArrayList<byte[]>();
3487         }
3488         nonExistentList.add(family);
3489       }
3490     }
3491     if (nonExistentList != null) {
3492       for (byte[] family : nonExistentList) {
3493         // Perhaps schema was changed between crash and replay
3494         LOG.info("No family for " + Bytes.toString(family) + " omit from reply.");
3495         familyMap.remove(family);
3496       }
3497     }
3498   }
3499 
3500   void checkTimestamps(final Map<byte[], List<Cell>> familyMap,
3501       long now) throws FailedSanityCheckException {
3502     if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
3503       return;
3504     }
3505     long maxTs = now + timestampSlop;
3506     for (List<Cell> kvs : familyMap.values()) {
3507       assert kvs instanceof RandomAccess;
3508       int listSize  = kvs.size();
3509       for (int i=0; i < listSize; i++) {
3510         Cell cell = kvs.get(i);
3511         // see if the user-side TS is out of range. latest = server-side
3512         long ts = cell.getTimestamp();
3513         if (ts != HConstants.LATEST_TIMESTAMP && ts > maxTs) {
3514           throw new FailedSanityCheckException("Timestamp for KV out of range "
3515               + cell + " (too.new=" + timestampSlop + ")");
3516         }
3517       }
3518     }
3519   }
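
       // A worked example (illustrative numbers; by default timestampSlop is
       // HConstants.LATEST_TIMESTAMP, which disables the check entirely):
       //
       //   timestampSlop = 2000, now = 1000000  =>  maxTs = 1002000
       //   cell ts = 1001500           -> accepted (within slop)
       //   cell ts = 1005000           -> FailedSanityCheckException (too new)
       //   cell ts = LATEST_TIMESTAMP  -> accepted (server fills it in)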
3520 
3521   /**
3522    * Append the given map of family->edits to a WALEdit data structure.
3523    * This does not write to the WAL itself.
3524    * @param familyMap map of family->edits
3525    * @param walEdit the destination entry to append into
3526    */
3527   private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
3528       WALEdit walEdit) {
3529     for (List<Cell> edits : familyMap.values()) {
3530       assert edits instanceof RandomAccess;
3531       int listSize = edits.size();
3532       for (int i=0; i < listSize; i++) {
3533         Cell cell = edits.get(i);
3534         walEdit.add(cell);
3535       }
3536     }
3537   }
3538 
3539   private void requestFlush() {
3540     if (this.rsServices == null) {
3541       return;
3542     }
3543     synchronized (writestate) {
3544       if (this.writestate.isFlushRequested()) {
3545         return;
3546       }
3547       writestate.flushRequested = true;
3548     }
3549     // Make the request outside of the synchronized block; HBASE-818.
3550     this.rsServices.getFlushRequester().requestFlush(this, false);
3551     if (LOG.isDebugEnabled()) {
3552       LOG.debug("Flush requested on " + this);
3553     }
3554   }
3555 
3556   /*
3557    * @param size
3558    * @return True if size is over the flush threshold
3559    */
3560   private boolean isFlushSize(final long size) {
3561     return size > this.memstoreFlushSize;
3562   }
3563 
3564   /**
3565    * Read the edits put under this region by wal splitting process.  Put
3566    * the recovered edits back up into this region.
3567    *
3568    * <p>We can ignore any wal message that has a sequence ID that's equal to or
3569    * lower than minSeqId.  (Because we know such messages are already
3570    * reflected in the HFiles.)
3571    *
3572    * <p>While this is running we are putting pressure on memory yet we are
3573    * outside of our usual accounting because we are not yet an onlined region
3574    * (this stuff is being run as part of Region initialization).  This means
3575    * that if we're up against global memory limits, we'll not be flagged to flush
3576    * because we are not online. We can't be flushed by the usual mechanisms anyway;
3577    * we're not yet online so our relative sequenceids are not yet aligned with
3578    * WAL sequenceids -- not till we come up online, post processing of split
3579    * edits.
3580    *
3581    * <p>But to help relieve memory pressure, at least manage our own heap size
3582    * flushing if are in excess of per-region limits.  Flushing, though, we have
3583    * to be careful and avoid using the regionserver/wal sequenceid.  Its running
3584    * on a different line to whats going on in here in this region context so if we
3585    * crashed replaying these edits, but in the midst had a flush that used the
3586    * regionserver wal with a sequenceid in excess of whats going on in here
3587    * in this region and with its split editlogs, then we could miss edits the
3588    * next time we go to recover. So, we have to flush inline, using seqids that
3589    * make sense in a this single region context only -- until we online.
3590    *
3591    * @param maxSeqIdInStores Any edit found in split editlogs needs to be in excess of
3592    * the maxSeqId for the store to be applied, else it is skipped.
3593    * @return the sequence id of the last edit added to this region out of the
3594    * recovered edits log, or <code>minSeqIdForTheRegion</code> if nothing was added.
3595    * @throws UnsupportedEncodingException
3596    * @throws IOException
3597    */
3598   protected long replayRecoveredEditsIfAny(final Path regiondir,
3599       Map<byte[], Long> maxSeqIdInStores,
3600       final CancelableProgressable reporter, final MonitoredTask status)
3601       throws IOException {
3602     long minSeqIdForTheRegion = -1;
3603     for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
3604       if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
3605         minSeqIdForTheRegion = maxSeqIdInStore;
3606       }
3607     }
3608     long seqid = minSeqIdForTheRegion;
3609 
3610     FileSystem fs = this.fs.getFileSystem();
3611     NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
3612     if (LOG.isDebugEnabled()) {
3613       LOG.debug("Found " + (files == null ? 0 : files.size())
3614         + " recovered edits file(s) under " + regiondir);
3615     }
3616 
3617     if (files == null || files.isEmpty()) return seqid;
3618 
3619     for (Path edits: files) {
3620       if (edits == null || !fs.exists(edits)) {
3621         LOG.warn("Null or non-existent edits file: " + edits);
3622         continue;
3623       }
3624       if (isZeroLengthThenDelete(fs, edits)) continue;
3625 
3626       long maxSeqId;
3627       String fileName = edits.getName();
3628       maxSeqId = Math.abs(Long.parseLong(fileName));
3629       if (maxSeqId <= minSeqIdForTheRegion) {
3630         if (LOG.isDebugEnabled()) {
3631           String msg = "Maximum sequenceid for this wal is " + maxSeqId
3632             + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
3633             + ", skipped the whole file, path=" + edits;
3634           LOG.debug(msg);
3635         }
3636         continue;
3637       }
3638 
3639       try {
3640         // replay the edits. Replay can return -1 if everything is skipped, only update
3641         // if seqId is greater
3642         seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
3643       } catch (IOException e) {
3644         boolean skipErrors = conf.getBoolean(
3645             HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
3646             conf.getBoolean(
3647                 "hbase.skip.errors",
3648                 HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS));
3649         if (conf.get("hbase.skip.errors") != null) {
3650           LOG.warn(
3651               "The property 'hbase.skip.errors' has been deprecated. Please use " +
3652               HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
3653         }
3654         if (skipErrors) {
3655           Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
3656           LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
3657               + "=true so continuing. Renamed " + edits +
3658               " as " + p, e);
3659         } else {
3660           throw e;
3661         }
3662       }
3663     }
3664     // The edits size added into rsAccounting during this replaying will not
3665     // be required any more. So just clear it.
3666     if (this.rsAccounting != null) {
3667       this.rsAccounting.clearRegionReplayEditsSize(this.getRegionName());
3668     }
3669     if (seqid > minSeqIdForTheRegion) {
3670       // Then we added some edits to memory. Flush and cleanup split edit files.
3671       internalFlushcache(null, seqid, stores.values(), status);
3672     }
3673     // Now delete the content of recovered edits.  We're done w/ them.
3674     if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) {
3675       // For debugging data loss issues!
3676       // If this flag is set, make use of the hfile archiving by making recovered.edits a fake
3677       // column family. Have to fake out file type too by casting our recovered.edits as storefiles
3678       String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regiondir).getName();
3679       Set<StoreFile> fakeStoreFiles = new HashSet<StoreFile>(files.size());
3680       for (Path file: files) {
3681         fakeStoreFiles.add(new StoreFile(getRegionFileSystem().getFileSystem(), file, this.conf,
3682           null, null));
3683       }
3684       getRegionFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles);
3685     } else {
3686       for (Path file: files) {
3687         if (!fs.delete(file, false)) {
3688           LOG.error("Failed delete of " + file);
3689         } else {
3690           LOG.debug("Deleted recovered.edits file=" + file);
3691         }
3692       }
3693     }
3694     return seqid;
3695   }
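
       // A worked example of the sequenceid bookkeeping above (illustrative numbers):
       // with maxSeqIdInStores = {cf1=10, cf2=25}, minSeqIdForTheRegion is 10. A
       // recovered edits file named "0000000000000000008" (maxSeqId 8 <= 10) is
       // skipped wholesale; one named "0000000000000000030" is replayed, with each
       // store still filtering out edits at or below its own maxSeqId.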
3696 
3697   /*
3698    * @param edits File of recovered edits.
3699    * @param maxSeqIdInStores Maximum sequenceid found in each store.  Edits in wal
3700    * must be larger than this to be replayed for each store.
3701    * @param reporter
3702    * @return the sequence id of the last edit read from the recovered edits file,
3703    * or -1 if the file held nothing.
3704    * @throws IOException
3705    */
3706   private long replayRecoveredEdits(final Path edits,
3707       Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
3708     throws IOException {
3709     String msg = "Replaying edits from " + edits;
3710     LOG.info(msg);
3711     MonitoredTask status = TaskMonitor.get().createStatus(msg);
3712     FileSystem fs = this.fs.getFileSystem();
3713 
3714     status.setStatus("Opening recovered edits");
3715     WAL.Reader reader = null;
3716     try {
3717       reader = WALFactory.createReader(fs, edits, conf);
3718       long currentEditSeqId = -1;
3719       long currentReplaySeqId = -1;
3720       long firstSeqIdInLog = -1;
3721       long skippedEdits = 0;
3722       long editsCount = 0;
3723       long intervalEdits = 0;
3724       WAL.Entry entry;
3725       Store store = null;
3726       boolean reported_once = false;
3727       ServerNonceManager ng = this.rsServices == null ? null : this.rsServices.getNonceManager();
3728 
3729       try {
3730         // How many edits seen before we check elapsed time
3731         int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
3732         // How often to send a progress report (default 1/2 master timeout)
3733         int period = this.conf.getInt("hbase.hstore.report.period", 300000);
3734         long lastReport = EnvironmentEdgeManager.currentTime();
3735 
3736         while ((entry = reader.next()) != null) {
3737           WALKey key = entry.getKey();
3738           WALEdit val = entry.getEdit();
3739 
3740           if (ng != null) { // ng is null in some tests, or when nonces are disabled
3741             ng.reportOperationFromWal(key.getNonceGroup(), key.getNonce(), key.getWriteTime());
3742           }
3743 
3744           if (reporter != null) {
3745             intervalEdits += val.size();
3746             if (intervalEdits >= interval) {
3747               // Number of edits interval reached
3748               intervalEdits = 0;
3749               long cur = EnvironmentEdgeManager.currentTime();
3750               if (lastReport + period <= cur) {
3751                 status.setStatus("Replaying edits..." +
3752                     " skipped=" + skippedEdits +
3753                     " edits=" + editsCount);
3754                 // Timeout reached
3755                 if(!reporter.progress()) {
3756                   msg = "Progressable reporter failed, stopping replay";
3757                   LOG.warn(msg);
3758                   status.abort(msg);
3759                   throw new IOException(msg);
3760                 }
3761                 reported_once = true;
3762                 lastReport = cur;
3763               }
3764             }
3765           }
3766 
3767           if (firstSeqIdInLog == -1) {
3768             firstSeqIdInLog = key.getLogSeqNum();
3769           }
3770           if (currentEditSeqId > key.getLogSeqNum()) {
3771             // when this condition is true, it means we have a serious defect because we need to
3772             // maintain increasing SeqId for WAL edits per region
3773             LOG.error("Found decreasing SeqId. PreId=" + currentEditSeqId + " key=" + key
3774                 + "; edit=" + val);
3775           } else {
3776             currentEditSeqId = key.getLogSeqNum();
3777           }
3778           currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ?
3779             key.getOrigLogSeqNum() : currentEditSeqId;
3780 
3781           // Start coprocessor replay here. The coprocessor is for each WALEdit
3782           // instead of a KeyValue.
3783           if (coprocessorHost != null) {
3784             status.setStatus("Running pre-WAL-restore hook in coprocessors");
3785             if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
3786               // if bypass this wal entry, ignore it ...
3787               continue;
3788             }
3789           }
3790           // Check this edit is for this region.
3791           if (!Bytes.equals(key.getEncodedRegionName(),
3792               this.getRegionInfo().getEncodedNameAsBytes())) {
3793             skippedEdits++;
3794             continue;
3795           }
3796 
3797           boolean flush = false;
3798           for (Cell cell: val.getCells()) {
3799             // Check this edit is for me. Also, guard against writing the special
3800             // METACOLUMN info such as HBASE::CACHEFLUSH entries
3801             if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
3802               //this is a special edit, we should handle it
3803               CompactionDescriptor compaction = WALEdit.getCompaction(cell);
3804               if (compaction != null) {
3805                 //replay the compaction
3806                 completeCompactionMarker(compaction);
3807               }
3808               skippedEdits++;
3809               continue;
3810             }
3811             // Figure which store the edit is meant for.
3812             if (store == null || !CellUtil.matchingFamily(cell, store.getFamily().getName())) {
3813               store = getStore(cell);
3814             }
3815             if (store == null) {
3816               // This should never happen.  Perhaps schema was changed between
3817               // crash and redeploy?
3818               LOG.warn("No family for " + cell);
3819               skippedEdits++;
3820               continue;
3821             }
3822             // Now, figure if we should skip this edit.
3823             if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily()
3824                 .getName())) {
3825               skippedEdits++;
3826               continue;
3827             }
3828             CellUtil.setSequenceId(cell, currentReplaySeqId);
3829 
3830             // Once we are over the limit, restoreEdit will keep returning true to
3831             // flush -- but don't flush until we've played all the kvs that make up
3832             // the WALEdit.
3833             flush |= restoreEdit(store, cell);
3834             editsCount++;
3835           }
3836           if (flush) {
3837             internalFlushcache(null, currentEditSeqId, stores.values(), status);
3838           }
3839 
3840           if (coprocessorHost != null) {
3841             coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
3842           }
3843         }
3844       } catch (EOFException eof) {
3845         Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
3846         msg = "Encountered EOF. Most likely due to Master failure during " +
3847             "wal splitting, so we have this data in another edit.  " +
3848             "Continuing, but renaming " + edits + " as " + p;
3849         LOG.warn(msg, eof);
3850         status.abort(msg);
3851       } catch (IOException ioe) {
3852         // If the IOE resulted from bad file format,
3853         // then this problem is idempotent and retrying won't help
3854         if (ioe.getCause() instanceof ParseException) {
3855           Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
3856           msg = "File corruption encountered!  " +
3857               "Continuing, but renaming " + edits + " as " + p;
3858           LOG.warn(msg, ioe);
3859           status.setStatus(msg);
3860         } else {
3861           status.abort(StringUtils.stringifyException(ioe));
3862           // other IO errors may be transient (bad network connection,
3863           // checksum exception on one datanode, etc).  throw & retry
3864           throw ioe;
3865         }
3866       }
3867       if (reporter != null && !reported_once) {
3868         reporter.progress();
3869       }
3870       msg = "Applied " + editsCount + ", skipped " + skippedEdits +
3871         ", firstSequenceIdInLog=" + firstSeqIdInLog +
3872         ", maxSequenceIdInLog=" + currentEditSeqId + ", path=" + edits;
3873       status.markComplete(msg);
3874       LOG.debug(msg);
3875       return currentEditSeqId;
3876     } finally {
3877       status.cleanup();
3878       if (reader != null) {
3879          reader.close();
3880       }
3881     }
3882   }
3883 
3884   /**
3885    * Call to complete a compaction. It is for the case where we find in the WAL a compaction
3886    * that was not finished.  We could find one when recovering a WAL after a regionserver crash.
3887    * See HBASE-2331.
3888    */
3889   void completeCompactionMarker(CompactionDescriptor compaction)
3890       throws IOException {
3891     Store store = this.getStore(compaction.getFamilyName().toByteArray());
3892     if (store == null) {
3893       LOG.warn("Found Compaction WAL edit for deleted family:" +
3894           Bytes.toString(compaction.getFamilyName().toByteArray()));
3895       return;
3896     }
3897     store.completeCompactionMarker(compaction);
3898   }
3899 
3900   /**
3901    * Used by tests
3902    * @param s Store to add edit to.
3903    * @param cell Cell to add.
3904    * @return True if we should flush.
3905    */
3906   protected boolean restoreEdit(final Store s, final Cell cell) {
3907     long kvSize = s.add(cell).getFirst();
3908     if (this.rsAccounting != null) {
3909       rsAccounting.addAndGetRegionReplayEditsSize(this.getRegionName(), kvSize);
3910     }
3911     return isFlushSize(this.addAndGetGlobalMemstoreSize(kvSize));
3912   }
3913 
3914   /*
3915    * @param fs
3916    * @param p File to check.
3917    * @return True if file was zero-length (and if so, we'll delete it in here).
3918    * @throws IOException
3919    */
3920   private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
3921       throws IOException {
3922     FileStatus stat = fs.getFileStatus(p);
3923     if (stat.getLen() > 0) return false;
3924     LOG.warn("File " + p + " is zero-length, deleting.");
3925     fs.delete(p, false);
3926     return true;
3927   }
3928 
3929   protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
3930     return new HStore(this, family, this.conf);
3931   }
3932 
3933   /**
3934    * Return HStore instance.
3935    * Use with caution.  Exposed for use of fixup utilities.
3936    * @param column Name of column family hosted by this region.
3937    * @return Store that goes with the family on passed <code>column</code>.
3938    * TODO: Make this lookup faster.
3939    */
3940   public Store getStore(final byte[] column) {
3941     return this.stores.get(column);
3942   }
3943 
3944   /**
3945    * Return HStore instance. Does not make a copy: since the number of stores is limited,
3946    * we iterate over them.
3947    */
3948   private Store getStore(Cell cell) {
3949     for (Map.Entry<byte[], Store> famStore : stores.entrySet()) {
3950       if (Bytes.equals(
3951           cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
3952           famStore.getKey(), 0, famStore.getKey().length)) {
3953         return famStore.getValue();
3954       }
3955     }
3956 
3957     return null;
3958   }
3959 
3960   public Map<byte[], Store> getStores() {
3961     return this.stores;
3962   }
3963 
3964   /**
3965    * Return list of storeFiles for the set of CFs.
3966    * Uses closeLock to prevent the race condition where the region closes
3967    * mid-iteration: with the stores being closed one by one, some stores
3968    * would otherwise report zero files.
3969    * @return List of storeFiles.
3970    */
3971   public List<String> getStoreFileList(final byte [][] columns)
3972     throws IllegalArgumentException {
3973     List<String> storeFileNames = new ArrayList<String>();
3974     synchronized(closeLock) {
3975       for(byte[] column : columns) {
3976         Store store = this.stores.get(column);
3977         if (store == null) {
3978           throw new IllegalArgumentException("No column family : " +
3979               Bytes.toString(column) + " available");
3980         }
3981         for (StoreFile storeFile: store.getStorefiles()) {
3982           storeFileNames.add(storeFile.getPath().toString());
3983         }
3984       }
3985     }
3986     return storeFileNames;
3987   }
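
  // Editor's note: a minimal usage sketch (hypothetical, not part of the
  // original source). Assumes an open HRegion `region` and an existing
  // column family name FAMILY:
  //
  //   byte[][] families = new byte[][] { FAMILY };
  //   List<String> storeFilePaths = region.getStoreFileList(families);
  //   for (String path : storeFilePaths) {
  //     LOG.info("store file: " + path);
  //   }
  //
  // An unknown family name makes getStoreFileList throw IllegalArgumentException.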
3988 
3989   //////////////////////////////////////////////////////////////////////////////
3990   // Support code
3991   //////////////////////////////////////////////////////////////////////////////
3992 
3993   /** Make sure this is a valid row for the HRegion */
3994   void checkRow(final byte [] row, String op) throws IOException {
3995     if (!rowIsInRange(getRegionInfo(), row)) {
3996       throw new WrongRegionException("Requested row out of range for " +
3997           op + " on HRegion " + this + ", startKey='" +
3998           Bytes.toStringBinary(getStartKey()) + "', endKey='" +
3999           Bytes.toStringBinary(getEndKey()) + "', row='" +
4000           Bytes.toStringBinary(row) + "'");
4001     }
4002   }
4003 
4004   /**
4005    * Tries to acquire a lock on the given row.
4006    * @param waitForLock if true, will block until the lock is available.
4007    *        Otherwise, just tries to obtain the lock and returns
4008    *        false if unavailable.
4009    * @return the row lock if acquired,
4010    *   null if waitForLock was false and the lock was not acquired
4011    * @throws IOException if waitForLock was true and the lock could not be acquired after waiting
4012    */
4013   public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
4014     startRegionOperation();
4015     try {
4016       return getRowLockInternal(row, waitForLock);
4017     } finally {
4018       closeRegionOperation();
4019     }
4020   }
4021 
4022   /**
4023    * A version of getRowLock(byte[], boolean) to use when a region operation has already been
4024    * started (the calling thread has already acquired the region-close-guard lock).
4025    */
4026   protected RowLock getRowLockInternal(byte[] row, boolean waitForLock) throws IOException {
4027     checkRow(row, "row lock");
4028     HashedBytes rowKey = new HashedBytes(row);
4029     RowLockContext rowLockContext = new RowLockContext(rowKey);
4030 
4031     // loop until we acquire the row lock (unless !waitForLock)
4032     while (true) {
4033       RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
4034       if (existingContext == null) {
4035         // Row is not already locked by any thread, use newly created context.
4036         break;
4037       } else if (existingContext.ownedByCurrentThread()) {
4038         // Row is already locked by current thread, reuse existing context instead.
4039         rowLockContext = existingContext;
4040         break;
4041       } else {
4042         if (!waitForLock) {
4043           return null;
4044         }
4045         try {
4046           // Row is already locked by some other thread, give up or wait for it
4047           if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
4048             throw new IOException("Timed out waiting for lock for row: " + rowKey);
4049           }
4050         } catch (InterruptedException ie) {
4051           LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
4052           InterruptedIOException iie = new InterruptedIOException();
4053           iie.initCause(ie);
4054           throw iie;
4055         }
4056       }
4057     }
4058 
4059     // allocate new lock for this thread
4060     return rowLockContext.newLock();
4061   }
4062 
4063   /**
4064    * Acquires a lock on the given row.
4065    * The same thread may acquire multiple locks on the same row.
4066    * @return the acquired row lock
4067    * @throws IOException if the lock could not be acquired after waiting
4068    */
4069   public RowLock getRowLock(byte[] row) throws IOException {
4070     return getRowLock(row, true);
4071   }
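
  // Editor's note: a hedged usage sketch (hypothetical caller, not part of the
  // original source). A row lock must always be released, so pair the acquire
  // with a finally block. Assumes an open HRegion `region` and a byte[] `row`
  // within the region's key range:
  //
  //   RowLock lock = region.getRowLock(row, true);   // blocks until acquired
  //   try {
  //     // ... perform work that requires exclusive access to the row ...
  //   } finally {
  //     lock.release();
  //   }
  //
  // With waitForLock == false the call returns null instead of blocking, so the
  // caller must handle the lock-not-acquired case.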
4072 
4073   /**
4074    * If the given list of row locks is not null, releases all locks.
4075    */
4076   public void releaseRowLocks(List<RowLock> rowLocks) {
4077     if (rowLocks != null) {
4078       for (RowLock rowLock : rowLocks) {
4079         rowLock.release();
4080       }
4081       rowLocks.clear();
4082     }
4083   }
4084 
4085   /**
4086    * Determines whether multiple column families are present.
4087    * Precondition: familyPaths is not null.
4088    *
4089    * @param familyPaths List of Pair<byte[] column family, String hfilePath>
4090    */
4091   private static boolean hasMultipleColumnFamilies(
4092       List<Pair<byte[], String>> familyPaths) {
4093     boolean multipleFamilies = false;
4094     byte[] family = null;
4095     for (Pair<byte[], String> pair : familyPaths) {
4096       byte[] fam = pair.getFirst();
4097       if (family == null) {
4098         family = fam;
4099       } else if (!Bytes.equals(family, fam)) {
4100         multipleFamilies = true;
4101         break;
4102       }
4103     }
4104     return multipleFamilies;
4105   }
4106 
4107   /**
4108    * Bulk load a/many HFiles into this region
4109    *
4110    * @param familyPaths A list which maps column families to the location of the HFile to load
4111    *                    into that column family region.
4112    * @param assignSeqId Force a flush, get its sequenceId to preserve the guarantee that all the
4113    *                    edits lower than the highest sequential ID from all the HFiles are flushed
4114    *                    on disk.
4115    * @return true if successful, false if failed recoverably
4116    * @throws IOException if failed unrecoverably.
4117    */
4118   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
4119                                 boolean assignSeqId) throws IOException {
4120     return bulkLoadHFiles(familyPaths, assignSeqId, null);
4121   }
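
  // Editor's note: a minimal, hypothetical invocation sketch (not part of the
  // original source). Assumes FAMILY is an existing column family and
  // "/staging/f1/hfile1" is an HFile already written to the region's filesystem:
  //
  //   List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
  //   familyPaths.add(new Pair<byte[], String>(FAMILY, "/staging/f1/hfile1"));
  //   boolean loaded = region.bulkLoadHFiles(familyPaths, true /* assignSeqId */);
  //   if (!loaded) {
  //     // recoverable failure, e.g. the region split; re-split the HFiles and retry
  //   }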
4122 
4123   /**
4124    * Attempts to atomically load a group of hfiles.  This is critical for loading
4125    * rows with multiple column families atomically.
4126    *
4127    * @param familyPaths      List of Pair<byte[] column family, String hfilePath>
4128    * @param bulkLoadListener Internal hooks enabling massaging/preparation of a
4129    *                         file about to be bulk loaded
4130    * @param assignSeqId      Force a flush, get its sequenceId to preserve the guarantee that
4131    *                         all the edits lower than the highest sequential ID from all the 
4132    *                         HFiles are flushed on disk.
4133    * @return true if successful, false if failed recoverably
4134    * @throws IOException if failed unrecoverably.
4135    */
4136   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, boolean assignSeqId,
4137       BulkLoadListener bulkLoadListener) throws IOException {
4138     long seqId = -1;
4139     Map<byte[], List<Path>> storeFiles = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
4140     Preconditions.checkNotNull(familyPaths);
4141     // we need writeLock for multi-family bulk load
4142     startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
4143     try {
4144       this.writeRequestsCount.increment();
4145 
4146       // A split may have happened between when the split keys were gathered
4147       // and when the HRegion's write lock was taken.  We need to validate that
4148       // each HFile fits this region before attempting to bulk load any of them.
4149       List<IOException> ioes = new ArrayList<IOException>();
4150       List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();
4151       for (Pair<byte[], String> p : familyPaths) {
4152         byte[] familyName = p.getFirst();
4153         String path = p.getSecond();
4154 
4155         Store store = getStore(familyName);
4156         if (store == null) {
4157           IOException ioe = new org.apache.hadoop.hbase.DoNotRetryIOException(
4158               "No such column family " + Bytes.toStringBinary(familyName));
4159           ioes.add(ioe);
4160         } else {
4161           try {
4162             store.assertBulkLoadHFileOk(new Path(path));
4163           } catch (WrongRegionException wre) {
4164             // recoverable (file doesn't fit in region)
4165             failures.add(p);
4166           } catch (IOException ioe) {
4167             // unrecoverable (hdfs problem)
4168             ioes.add(ioe);
4169           }
4170         }
4171       }
4172 
4173       // validation failed because of some sort of IO problem.
4174       if (ioes.size() != 0) {
4175         IOException e = MultipleIOException.createIOException(ioes);
4176         LOG.error("There were one or more IO errors when checking if the bulk load is ok.", e);
4177         throw e;
4178       }
4179 
4180       // validation failed, bail out before doing anything permanent.
4181       if (failures.size() != 0) {
4182         StringBuilder list = new StringBuilder();
4183         for (Pair<byte[], String> p : failures) {
4184           list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")
4185               .append(p.getSecond());
4186         }
4187         // problem when validating
4188         LOG.warn("There was a recoverable bulk load failure likely due to a" +
4189             " split.  These (family, HFile) pairs were not loaded: " + list);
4190         return false;
4191       }
4192 
4193       // We need to assign a sequential ID that's in between two memstores in order to preserve
4194       // the guarantee that all the edits lower than the highest sequential ID from all the
4195       // HFiles are flushed on disk. See HBASE-10958.  The sequence id returned when we flush is
4196       // guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is
4197       // a sequence id that we can be sure is beyond the last hfile written).
4198       if (assignSeqId) {
4199         FlushResult fs = this.flushcache(true);
4200         if (fs.isFlushSucceeded()) {
4201           seqId = fs.flushSequenceId;
4202         } else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
4203           seqId = fs.flushSequenceId;
4204         } else {
4205           throw new IOException("Could not bulk load with an assigned sequential ID because the " +
4206               "flush didn't run. Reason for not flushing: " + fs.failureReason);
4207         }
4208       }
4209 
4210       for (Pair<byte[], String> p : familyPaths) {
4211         byte[] familyName = p.getFirst();
4212         String path = p.getSecond();
4213         Store store = getStore(familyName);
4214         try {
4215           String finalPath = path;
4216           if (bulkLoadListener != null) {
4217             finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
4218           }
4219           store.bulkLoadHFile(finalPath, seqId);
4220           
4221           if (storeFiles.containsKey(familyName)) {
4222             storeFiles.get(familyName).add(new Path(finalPath));
4223           } else {
4224             List<Path> storeFileNames = new ArrayList<Path>();
4225             storeFileNames.add(new Path(finalPath));
4226             storeFiles.put(familyName, storeFileNames);
4227           }
4228           if (bulkLoadListener != null) {
4229             bulkLoadListener.doneBulkLoad(familyName, path);
4230           }
4231         } catch (IOException ioe) {
4232           // A failure here can cause an atomicity violation that we currently
4233           // cannot recover from since it is likely a failed HDFS operation.
4234 
4235           // TODO Need a better story for reverting partial failures due to HDFS.
4236           LOG.error("There was a partial failure due to IO when attempting to" +
4237               " load " + Bytes.toString(p.getFirst()) + " : " + p.getSecond(), ioe);
4238           if (bulkLoadListener != null) {
4239             try {
4240               bulkLoadListener.failedBulkLoad(familyName, path);
4241             } catch (Exception ex) {
4242               LOG.error("Error while calling failedBulkLoad for family " +
4243                   Bytes.toString(familyName) + " with path " + path, ex);
4244             }
4245           }
4246           throw ioe;
4247         }
4248       }
4249 
4250       return true;
4251     } finally {
4252       if (wal != null && !storeFiles.isEmpty()) {
4253         // write a bulk load event marker if any hfiles were loaded, even if not all of them were
4254         try {
4255           WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(
4256               this.getRegionInfo().getTable(),
4257               ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, seqId);
4258           WALUtil.writeBulkLoadMarkerAndSync(wal, this.htableDescriptor, getRegionInfo(),
4259               loadDescriptor, sequenceId);
4260         } catch (IOException ioe) {
4261           if (this.rsServices != null) {
4262             // Have to abort region server because some hfiles have been loaded but we can't write
4263             // the event into WAL
4264             this.rsServices.abort("Failed to write bulk load event into WAL.", ioe);
4265           }
4266         }
4267       }
4268     
4269       closeBulkRegionOperation();
4270     }
4271   }
4272 
4273   @Override
4274   public boolean equals(Object o) {
4275     return o instanceof HRegion && Bytes.equals(this.getRegionName(),
4276                                                 ((HRegion) o).getRegionName());
4277   }
4278 
4279   @Override
4280   public int hashCode() {
4281     return Bytes.hashCode(this.getRegionName());
4282   }
4283 
4284   @Override
4285   public String toString() {
4286     return this.getRegionNameAsString();
4287   }
4288 
4289   /**
4290    * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
4291    */
4292   class RegionScannerImpl implements RegionScanner {
4293     // Package local for testability
4294     KeyValueHeap storeHeap = null;
4295     /** Heap of key-values that are not essential for the provided filters and are thus read
4296      * on demand, if on-demand column family loading is enabled.*/
4297     KeyValueHeap joinedHeap = null;
4298     /**
4299      * If the joined heap data gathering is interrupted due to scan limits, this will
4300      * contain the row for which we are populating the values.*/
4301     protected Cell joinedContinuationRow = null;
4302     // KeyValue indicating that limit is reached when scanning
4303     private final KeyValue KV_LIMIT = new KeyValue();
4304     protected final byte[] stopRow;
4305     private final FilterWrapper filter;
4306     private int batch;
4307     protected int isScan;
4308     private boolean filterClosed = false;
4309     private long readPt;
4310     private long maxResultSize;
4311     protected HRegion region;
4312 
4313     @Override
4314     public HRegionInfo getRegionInfo() {
4315       return region.getRegionInfo();
4316     }
4317 
4318     RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
4319         throws IOException {
4320 
4321       this.region = region;
4322       this.maxResultSize = scan.getMaxResultSize();
4323       if (scan.hasFilter()) {
4324         this.filter = new FilterWrapper(scan.getFilter());
4325       } else {
4326         this.filter = null;
4327       }
4328 
4329       this.batch = scan.getBatch();
4330       if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {
4331         this.stopRow = null;
4332       } else {
4333         this.stopRow = scan.getStopRow();
4334       }
4335       // If we are doing a get, we want the scan to be inclusive [startRow,endRow];
4336       // normally it is [startRow,endRow), so with startRow == endRow we would get nothing.
4337       this.isScan = scan.isGetScan() ? -1 : 0;
4338 
4339       // synchronize on scannerReadPoints so that nobody calculates
4340       // getSmallestReadPoint, before scannerReadPoints is updated.
4341       IsolationLevel isolationLevel = scan.getIsolationLevel();
4342       synchronized(scannerReadPoints) {
4343         this.readPt = getReadpoint(isolationLevel);
4344         scannerReadPoints.put(this, this.readPt);
4345       }
4346 
4347       // Here we separate all scanners into two lists - scanner that provide data required
4348       // by the filter to operate (scanners list) and all others (joinedScanners list).
4349       List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
4350       List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
4351       if (additionalScanners != null) {
4352         scanners.addAll(additionalScanners);
4353       }
4354 
4355       for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
4356           scan.getFamilyMap().entrySet()) {
4357         Store store = stores.get(entry.getKey());
4358         KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
4359         if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
4360           || this.filter.isFamilyEssential(entry.getKey())) {
4361           scanners.add(scanner);
4362         } else {
4363           joinedScanners.add(scanner);
4364         }
4365       }
4366       initializeKVHeap(scanners, joinedScanners, region);
4367     }
4368 
4369     protected void initializeKVHeap(List<KeyValueScanner> scanners,
4370         List<KeyValueScanner> joinedScanners, HRegion region)
4371         throws IOException {
4372       this.storeHeap = new KeyValueHeap(scanners, region.comparator);
4373       if (!joinedScanners.isEmpty()) {
4374         this.joinedHeap = new KeyValueHeap(joinedScanners, region.comparator);
4375       }
4376     }
4377 
4378     @Override
4379     public long getMaxResultSize() {
4380       return maxResultSize;
4381     }
4382 
4383     @Override
4384     public long getMvccReadPoint() {
4385       return this.readPt;
4386     }
4387 
4388     /**
4389      * Reset the filter, if any, so it is ready for the next row.
4390      *
4391      * @throws IOException in case a filter raises an I/O exception.
4392      */
4393     protected void resetFilters() throws IOException {
4394       if (filter != null) {
4395         filter.reset();
4396       }
4397     }
4398 
4399     @Override
4400     public boolean next(List<Cell> outResults)
4401         throws IOException {
4402       // apply the batching limit by default
4403       return next(outResults, batch);
4404     }
4405 
4406     @Override
4407     public synchronized boolean next(List<Cell> outResults, int limit) throws IOException {
4408       if (this.filterClosed) {
4409         throw new UnknownScannerException("Scanner was closed (timed out?) " +
4410             "after we renewed it. Could be caused by a very slow scanner " +
4411             "or a lengthy garbage collection");
4412       }
4413       startRegionOperation(Operation.SCAN);
4414       readRequestsCount.increment();
4415       try {
4416         return nextRaw(outResults, limit);
4417       } finally {
4418         closeRegionOperation(Operation.SCAN);
4419       }
4420     }
4421 
4422     @Override
4423     public boolean nextRaw(List<Cell> outResults)
4424         throws IOException {
4425       return nextRaw(outResults, batch);
4426     }
4427 
4428     @Override
4429     public boolean nextRaw(List<Cell> outResults, int limit) throws IOException {
4430       if (storeHeap == null) {
4431         // scanner is closed
4432         throw new UnknownScannerException("Scanner was closed");
4433       }
4434       boolean returnResult;
4435       if (outResults.isEmpty()) {
4436         // Usually outResults is empty. This is true when next is called
4437         // to handle a scan or get operation.
4438         returnResult = nextInternal(outResults, limit);
4439       } else {
4440         List<Cell> tmpList = new ArrayList<Cell>();
4441         returnResult = nextInternal(tmpList, limit);
4442         outResults.addAll(tmpList);
4443       }
4444       resetFilters();
4445       if (isFilterDoneInternal()) {
4446         returnResult = false;
4447       }
4448       return returnResult;
4449     }
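
    // Editor's note: a hedged sketch of how a caller typically drives a
    // RegionScanner (hypothetical, not part of the original source). Assumes an
    // open HRegion `region` and a configured Scan `scan`:
    //
    //   RegionScanner scanner = region.getScanner(scan);
    //   try {
    //     List<Cell> cells = new ArrayList<Cell>();
    //     boolean moreRows;
    //     do {
    //       cells.clear();
    //       moreRows = scanner.next(cells);   // fills one row, up to the batch limit
    //       // ... consume cells ...
    //     } while (moreRows);
    //   } finally {
    //     scanner.close();
    //   }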
4450 
4451     private void populateFromJoinedHeap(List<Cell> results, int limit)
4452         throws IOException {
4453       assert joinedContinuationRow != null;
4454       Cell kv = populateResult(results, this.joinedHeap, limit,
4455           joinedContinuationRow.getRowArray(), joinedContinuationRow.getRowOffset(),
4456           joinedContinuationRow.getRowLength());
4457       if (kv != KV_LIMIT) {
4458         // We are done with this row, reset the continuation.
4459         joinedContinuationRow = null;
4460       }
4461       // As the data is obtained from two independent heaps, we need to
4462       // ensure that result list is sorted, because Result relies on that.
4463       Collections.sort(results, comparator);
4464     }
4465 
4466     /**
4467      * Fetches records for currentRow into the results list, until the next row or the limit (if not -1).
4468      * @param heap KeyValueHeap to fetch data from. It must be positioned on the correct row before the call.
4469      * @param limit Max amount of KVs to place in result list, -1 means no limit.
4470      * @param currentRow Byte array with key we are fetching.
4471      * @param offset offset for currentRow
4472      * @param length length for currentRow
4473      * @return KV_LIMIT if limit reached, next KeyValue otherwise.
4474      */
4475     private Cell populateResult(List<Cell> results, KeyValueHeap heap, int limit,
4476         byte[] currentRow, int offset, short length) throws IOException {
4477       Cell nextKv;
4478       do {
4479         heap.next(results, limit - results.size());
4480         if (limit > 0 && results.size() == limit) {
4481           return KV_LIMIT;
4482         }
4483         nextKv = heap.peek();
4484       } while (nextKv != null && CellUtil.matchingRow(nextKv, currentRow, offset, length));
4485 
4486       return nextKv;
4487     }
4488 
4489     /*
4490      * @return True if a filter rules that the scanner is done.
4491      */
4492     @Override
4493     public synchronized boolean isFilterDone() throws IOException {
4494       return isFilterDoneInternal();
4495     }
4496 
4497     private boolean isFilterDoneInternal() throws IOException {
4498       return this.filter != null && this.filter.filterAllRemaining();
4499     }
4500 
4501     private boolean nextInternal(List<Cell> results, int limit)
4502     throws IOException {
4503       if (!results.isEmpty()) {
4504         throw new IllegalArgumentException("First parameter should be an empty list");
4505       }
4506       RpcCallContext rpcCall = RpcServer.getCurrentCall();
4507       // The loop here is used only when, at some point during next(), we determine
4508       // that due to effects of filters or otherwise we have an empty row in the result.
4509       // Then we loop and try again. Otherwise, we must get out on the first iteration via return:
4510       // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
4511       // and joinedHeap has no more data to read for the last row, if set, joinedContinuationRow).
4512       while (true) {
4513         if (rpcCall != null) {
4514           // If a user specifies a too-restrictive or too-slow scanner, the
4515           // client might time out and disconnect while the server side
4516           // is still processing the request. We should abort aggressively
4517           // in that case.
4518           long afterTime = rpcCall.disconnectSince();
4519           if (afterTime >= 0) {
4520             throw new CallerDisconnectedException(
4521                 "Aborting on region " + getRegionNameAsString() + ", call " +
4522                     this + " after " + afterTime + " ms, since " +
4523                     "caller disconnected");
4524           }
4525         }
4526 
4527         // Let's see what we have in the storeHeap.
4528         Cell current = this.storeHeap.peek();
4529 
4530         byte[] currentRow = null;
4531         int offset = 0;
4532         short length = 0;
4533         if (current != null) {
4534           currentRow = current.getRowArray();
4535           offset = current.getRowOffset();
4536           length = current.getRowLength();
4537         }
4538         boolean stopRow = isStopRow(currentRow, offset, length);
4539         // Check if we were getting data from the joinedHeap and hit the limit.
4540         // If not, then it's main path - getting results from storeHeap.
4541         if (joinedContinuationRow == null) {
4542           // First, check if we are at a stop row. If so, there are no more results.
4543           if (stopRow) {
4544             if (filter != null && filter.hasFilterRow()) {
4545               filter.filterRowCells(results);
4546             }
4547             return false;
4548           }
4549 
4550           // Check if rowkey filter wants to exclude this row. If so, loop to next.
4551           // Technically, if we hit limits before on this row, we don't need this call.
4552           if (filterRowKey(currentRow, offset, length)) {
4553             boolean moreRows = nextRow(currentRow, offset, length);
4554             if (!moreRows) return false;
4555             results.clear();
4556             continue;
4557           }
4558 
4559           Cell nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset,
4560               length);
4561           // Ok, we are good, let's try to get some results from the main heap.
4562           if (nextKv == KV_LIMIT) {
4563             if (this.filter != null && filter.hasFilterRow()) {
4564               throw new IncompatibleFilterException(
4565                 "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
4566             }
4567             return true; // We hit the limit.
4568           }
4569 
4570           stopRow = nextKv == null ||
4571               isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength());
4572           // save whether the row was empty before filters were applied to it.
4573           final boolean isEmptyRow = results.isEmpty();
4574 
4575           // We have the part of the row necessary for filtering (all of it, usually).
4576           // First filter with the filterRow(List).
4577           FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
4578           if (filter != null && filter.hasFilterRow()) {
4579             ret = filter.filterRowCellsWithRet(results);
4580           }
4581 
4582           if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) {
4583             results.clear();
4584             boolean moreRows = nextRow(currentRow, offset, length);
4585             if (!moreRows) return false;
4586 
4587             // This row was totally filtered out, if this is NOT the last row,
4588             // we should continue on. Otherwise, nothing else to do.
4589             if (!stopRow) continue;
4590             return false;
4591           }
4592 
4593           // Ok, we are done with storeHeap for this row.
4594           // Now we may need to fetch additional, non-essential data into row.
4595           // These values are not needed for filter to work, so we postpone their
4596           // fetch to (possibly) reduce amount of data loads from disk.
4597           if (this.joinedHeap != null) {
4598             Cell nextJoinedKv = joinedHeap.peek();
4599             // If joinedHeap is pointing to some other row, try to seek to a correct one.
4600             boolean mayHaveData = (nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv,
4601                 currentRow, offset, length))
4602                 || (this.joinedHeap.requestSeek(
4603                     KeyValueUtil.createFirstOnRow(currentRow, offset, length), true, true)
4604                     && joinedHeap.peek() != null && CellUtil.matchingRow(joinedHeap.peek(),
4605                     currentRow, offset, length));
4606             if (mayHaveData) {
4607               joinedContinuationRow = current;
4608               populateFromJoinedHeap(results, limit);
4609             }
4610           }
4611         } else {
4612           // Populating from the joined heap was stopped by limits, populate some more.
4613           populateFromJoinedHeap(results, limit);
4614         }
4615 
4616         // We may have just called populateFromJoinedHeap and hit the limits. If that is
4617         // the case, we need to call it again on the next next() invocation.
4618         if (joinedContinuationRow != null) {
4619           return true;
4620         }
4621 
4622         // Finally, we are done with both joinedHeap and storeHeap.
4623         // Double check to prevent empty rows from appearing in result. It could be
4624         // the case when SingleColumnValueExcludeFilter is used.
4625         if (results.isEmpty()) {
4626           boolean moreRows = nextRow(currentRow, offset, length);
4627           if (!moreRows) return false;
4628           if (!stopRow) continue;
4629         }
4630 
4631         // We are done. Return the result.
4632         return !stopRow;
4633       }
4634     }
4635 
4636     /**
4637      * This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combines
4638      * both filterRow & filterRow(List<KeyValue> kvs) functions. Code from 0.94 or older may
4639      * not implement hasFilterRow as HBASE-6429 expects, because in 0.94 hasFilterRow() returns
4640      * true only when filterRow(List<KeyValue> kvs) is overridden, not when filterRow() is.
4641      * In that case filterRow() would otherwise be skipped.
4642      */
4643     private boolean filterRow() throws IOException {
4644       // when hasFilterRow returns true, filter.filterRow() will be called automatically inside
4645       // filterRowCells(List<Cell> kvs) so we skip that scenario here.
4646       return filter != null && (!filter.hasFilterRow())
4647           && filter.filterRow();
4648     }
4649 
4650     private boolean filterRowKey(byte[] row, int offset, short length) throws IOException {
4651       return filter != null
4652           && filter.filterRowKey(row, offset, length);
4653     }
4654 
4655     protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException {
4656       assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read.";
4657       Cell next;
4658       while ((next = this.storeHeap.peek()) != null &&
4659              CellUtil.matchingRow(next, currentRow, offset, length)) {
4660         this.storeHeap.next(MOCKED_LIST);
4661       }
4662       resetFilters();
4663       // Calling the hook in CP which allows it to do a fast forward
4664       return this.region.getCoprocessorHost() == null
4665           || this.region.getCoprocessorHost()
4666               .postScannerFilterRow(this, currentRow, offset, length);
4667     }
4668 
4669     protected boolean isStopRow(byte[] currentRow, int offset, short length) {
4670       return currentRow == null ||
4671           (stopRow != null &&
4672           comparator.compareRows(stopRow, 0, stopRow.length,
4673             currentRow, offset, length) <= isScan);
4674     }
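
    // Editor's note: a worked example of the isScan offset above (illustrative,
    // not part of the original source). compareRows(stopRow, currentRow) <= isScan
    // means:
    //   - for a scan, isScan == 0:  stop once stopRow <= currentRow, i.e. the
    //     stop row itself is excluded ([startRow, stopRow));
    //   - for a get,  isScan == -1: stop only once stopRow < currentRow, i.e. the
    //     stop row itself is still returned ([startRow, stopRow]).
    // E.g. with stopRow = "b": a scan stops before row "b"; a get on "b" returns "b".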
4675 
4676     @Override
4677     public synchronized void close() {
4678       if (storeHeap != null) {
4679         storeHeap.close();
4680         storeHeap = null;
4681       }
4682       if (joinedHeap != null) {
4683         joinedHeap.close();
4684         joinedHeap = null;
4685       }
4686       // no need to synchronize here.
4687       scannerReadPoints.remove(this);
4688       this.filterClosed = true;
4689     }
4690 
4691     KeyValueHeap getStoreHeapForTesting() {
4692       return storeHeap;
4693     }
4694 
4695     @Override
4696     public synchronized boolean reseek(byte[] row) throws IOException {
4697       if (row == null) {
4698         throw new IllegalArgumentException("Row cannot be null.");
4699       }
4700       boolean result = false;
4701       startRegionOperation();
4702       try {
4703         KeyValue kv = KeyValueUtil.createFirstOnRow(row);
4704         // use request seek to make use of the lazy seek option. See HBASE-5520
4705         result = this.storeHeap.requestSeek(kv, true, true);
4706         if (this.joinedHeap != null) {
4707           result = this.joinedHeap.requestSeek(kv, true, true) || result;
4708         }
4709       } finally {
4710         closeRegionOperation();
4711       }
4712       return result;
4713     }
4714   }
4715 
4716   // Utility methods
4717   /**
4718    * A utility method to create new instances of HRegion based on the
4719    * {@link HConstants#REGION_IMPL} configuration property.
4720    * @param tableDir qualified path of directory where region should be located,
4721    * usually the table directory.
4722    * @param wal The WAL is the outbound log for any updates to the HRegion
4723    * The wal file is a logfile from the previous execution that's
4724    * custom-computed for this HRegion. The HRegionServer computes and sorts the
4725    * appropriate wal info for this HRegion. If there is a previous file
4726    * (implying that the HRegion has been written-to before), then read it from
4727    * the supplied path.
4728    * @param fs is the filesystem.
4729    * @param conf is global configuration settings.
4730    * @param regionInfo HRegionInfo that describes the region
4732    * @param htd the table descriptor
4733    * @return the new instance
4734    */
4735   static HRegion newHRegion(Path tableDir, WAL wal, FileSystem fs,
4736       Configuration conf, HRegionInfo regionInfo, final HTableDescriptor htd,
4737       RegionServerServices rsServices) {
4738     try {
4739       @SuppressWarnings("unchecked")
4740       Class<? extends HRegion> regionClass =
4741           (Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
4742 
4743       Constructor<? extends HRegion> c =
4744           regionClass.getConstructor(Path.class, WAL.class, FileSystem.class,
4745               Configuration.class, HRegionInfo.class, HTableDescriptor.class,
4746               RegionServerServices.class);
4747 
4748       return c.newInstance(tableDir, wal, fs, conf, regionInfo, htd, rsServices);
4749     } catch (Throwable e) {
4750       // todo: what should I throw here?
4751       throw new IllegalStateException("Could not instantiate a region instance.", e);
4752     }
4753   }
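
  // Editor's note: a hedged configuration sketch (hypothetical, not part of the
  // original source). newHRegion instantiates whatever class HConstants.REGION_IMPL
  // names, so a custom subclass with the matching constructor can be plugged in:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   // MyRegion is a hypothetical subclass of HRegion declaring the constructor
  //   // (Path, WAL, FileSystem, Configuration, HRegionInfo, HTableDescriptor,
  //   //  RegionServerServices)
  //   conf.setClass(HConstants.REGION_IMPL, MyRegion.class, HRegion.class);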
4754 
4755   /**
4756    * Convenience method creating new HRegions. Used by createTable.
4757    *
4758    * @param info Info for region to create.
4759    * @param rootDir Root directory for HBase instance
4760    * @param wal shared WAL
4761    * @param initialize - true to initialize the region
4762    * @return new HRegion
4763    * @throws IOException
4764    */
4765   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4766                                       final Configuration conf,
4767                                       final HTableDescriptor hTableDescriptor,
4768                                       final WAL wal,
4769                                       final boolean initialize)
4770       throws IOException {
4771     LOG.info("creating HRegion for table " + info.getTable().getNameAsString()
4772         + ", HTD == " + hTableDescriptor
4773         + ", RootDir = " + rootDir);
4774     FileSystem fs = FileSystem.get(conf);
4775     Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
4776     HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
4777     HRegion region = HRegion.newHRegion(tableDir,
4778         wal, fs, conf, info, hTableDescriptor, null);
4779     if (initialize) {
4780       // If initializing, set the sequenceId. It is also required by WALPerformanceEvaluation when
4781       // verifying the WALEdits.
4782       region.setSequenceId(region.initialize(null));
4783     }
4784     return region;
4785   }
4786 
4787   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4788                                       final Configuration conf,
4789                                       final HTableDescriptor hTableDescriptor,
4790                                       final WAL wal)
4791     throws IOException {
4792     return createHRegion(info, rootDir, conf, hTableDescriptor, wal, true);
4793   }
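
  // Editor's note: a minimal creation sketch (hypothetical, e.g. for a unit
  // test; not part of the original source). Assumes a Configuration `conf`, a
  // root Path `rootDir`, an HTableDescriptor `htd`, an HRegionInfo `info`, and
  // a WAL `wal` already exist:
  //
  //   HRegion region = HRegion.createHRegion(info, rootDir, conf, htd, wal);
  //   try {
  //     // ... use the region: puts, gets, scans ...
  //   } finally {
  //     region.close();
  //   }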
4794 
4795 
4796   /**
4797    * Open a Region.
4798    * @param info Info for region to be opened.
4799    * @param wal WAL for region to use. This method will call
4800    * WAL#setSequenceNumber(long) passing the result of the call to
4801    * HRegion#getMinSequenceId() to ensure the wal id is properly kept
4802    * up.  HRegionStore does this every time it opens a new region.
4803    * @return new HRegion
4804    *
4805    * @throws IOException
4806    */
4807   public static HRegion openHRegion(final HRegionInfo info,
4808       final HTableDescriptor htd, final WAL wal,
4809       final Configuration conf)
4810   throws IOException {
4811     return openHRegion(info, htd, wal, conf, null, null);
4812   }
4813 
4814   /**
4815    * Open a Region.
4816    * @param info Info for region to be opened
4817    * @param htd the table descriptor
4818    * @param wal WAL for region to use. This method will call
4819    * WAL#setSequenceNumber(long) passing the result of the call to
4820    * HRegion#getMinSequenceId() to ensure the wal id is properly kept
4821    * up.  HRegionStore does this every time it opens a new region.
4822    * @param conf The Configuration object to use.
4823    * @param rsServices An interface we can request flushes against.
4824    * @param reporter An interface we can report progress against.
4825    * @return new HRegion
4826    *
4827    * @throws IOException
4828    */
4829   public static HRegion openHRegion(final HRegionInfo info,
4830     final HTableDescriptor htd, final WAL wal, final Configuration conf,
4831     final RegionServerServices rsServices,
4832     final CancelableProgressable reporter)
4833   throws IOException {
4834     return openHRegion(FSUtils.getRootDir(conf), info, htd, wal, conf, rsServices, reporter);
4835   }
4836 
4837   /**
4838    * Open a Region.
4839    * @param rootDir Root directory for HBase instance
4840    * @param info Info for region to be opened.
4841    * @param htd the table descriptor
4842    * @param wal WAL for region to use. This method will call
4843    * WAL#setSequenceNumber(long) passing the result of the call to
4844    * HRegion#getMinSequenceId() to ensure the wal id is properly kept
4845    * up.  HRegionStore does this every time it opens a new region.
4846    * @param conf The Configuration object to use.
4847    * @return new HRegion
4848    * @throws IOException
4849    */
4850   public static HRegion openHRegion(Path rootDir, final HRegionInfo info,
4851       final HTableDescriptor htd, final WAL wal, final Configuration conf)
4852   throws IOException {
4853     return openHRegion(rootDir, info, htd, wal, conf, null, null);
4854   }
4855 
4856   /**
4857    * Open a Region.
4858    * @param rootDir Root directory for HBase instance
4859    * @param info Info for region to be opened.
4860    * @param htd the table descriptor
4861    * @param wal WAL for region to use. This method will call
4862    * WAL#setSequenceNumber(long) passing the result of the call to
4863    * HRegion#getMinSequenceId() to ensure the wal id is properly kept
4864    * up.  HRegionStore does this every time it opens a new region.
4865    * @param conf The Configuration object to use.
4866    * @param rsServices An interface we can request flushes against.
4867    * @param reporter An interface we can report progress against.
4868    * @return new HRegion
4869    * @throws IOException
4870    */
4871   public static HRegion openHRegion(final Path rootDir, final HRegionInfo info,
4872       final HTableDescriptor htd, final WAL wal, final Configuration conf,
4873       final RegionServerServices rsServices,
4874       final CancelableProgressable reporter)
4875   throws IOException {
4876     FileSystem fs = null;
4877     if (rsServices != null) {
4878       fs = rsServices.getFileSystem();
4879     }
4880     if (fs == null) {
4881       fs = FileSystem.get(conf);
4882     }
4883     return openHRegion(conf, fs, rootDir, info, htd, wal, rsServices, reporter);
4884   }
4885 
4886   /**
4887    * Open a Region.
4888    * @param conf The Configuration object to use.
4889    * @param fs Filesystem to use
4890    * @param rootDir Root directory for HBase instance
4891    * @param info Info for region to be opened.
4892    * @param htd the table descriptor
4893    * @param wal WAL for region to use. This method will call
4894    * WAL#setSequenceNumber(long) passing the result of the call to
4895    * HRegion#getMinSequenceId() to ensure the wal id is properly kept
4896    * up.  HRegionStore does this every time it opens a new region.
4897    * @return new HRegion
4898    * @throws IOException
4899    */
4900   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4901       final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final WAL wal)
4902       throws IOException {
4903     return openHRegion(conf, fs, rootDir, info, htd, wal, null, null);
4904   }
4905 
4906   /**
4907    * Open a Region.
4908    * @param conf The Configuration object to use.
4909    * @param fs Filesystem to use
4910    * @param rootDir Root directory for HBase instance
4911    * @param info Info for region to be opened.
4912    * @param htd the table descriptor
4913    * @param wal WAL for region to use. This method will call
4914    * WAL#setSequenceNumber(long) passing the result of the call to
4915    * HRegion#getMinSequenceId() to ensure the wal id is properly kept
4916    * up.  HRegionStore does this every time it opens a new region.
4917    * @param rsServices An interface we can request flushes against.
4918    * @param reporter An interface we can report progress against.
4919    * @return new HRegion
4920    * @throws IOException
4921    */
4922   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4923       final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final WAL wal,
4924       final RegionServerServices rsServices, final CancelableProgressable reporter)
4925       throws IOException {
4926     Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
4927     return openHRegion(conf, fs, rootDir, tableDir, info, htd, wal, rsServices, reporter);
4928   }
4929 
4930   /**
4931    * Open a Region.
4932    * @param conf The Configuration object to use.
4933    * @param fs Filesystem to use
4934    * @param rootDir Root directory for HBase instance
4935    * @param info Info for region to be opened.
4936    * @param htd the table descriptor
4937    * @param wal WAL for region to use. This method will call
4938    * WAL#setSequenceNumber(long) passing the result of the call to
4939    * HRegion#getMinSequenceId() to ensure the wal id is properly kept
4940    * up.  HRegionStore does this every time it opens a new region.
4941    * @param rsServices An interface we can request flushes against.
4942    * @param reporter An interface we can report progress against.
4943    * @return new HRegion
4944    * @throws IOException
4945    */
4946   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4947       final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd,
4948       final WAL wal, final RegionServerServices rsServices,
4949       final CancelableProgressable reporter)
4950       throws IOException {
4951     if (info == null) throw new NullPointerException("Passed region info is null");
4952     if (LOG.isDebugEnabled()) {
4953       LOG.debug("Opening region: " + info);
4954     }
4955     HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
4956     return r.openHRegion(reporter);
4957   }
4958 
4959 
4960   /**
4961    * Useful when reopening a closed region (normally for unit tests)
4962    * @param other original object
4963    * @param reporter An interface we can report progress against.
4964    * @return new HRegion
4965    * @throws IOException
4966    */
4967   public static HRegion openHRegion(final HRegion other, final CancelableProgressable reporter)
4968       throws IOException {
4969     HRegionFileSystem regionFs = other.getRegionFileSystem();
4970     HRegion r = newHRegion(regionFs.getTableDir(), other.getWAL(), regionFs.getFileSystem(),
4971         other.baseConf, other.getRegionInfo(), other.getTableDesc(), null);
4972     return r.openHRegion(reporter);
4973   }
4974 
4975   /**
4976    * Open HRegion.
4977    * Calls initialize and sets sequenceId.
4978    * @return Returns <code>this</code>
4979    * @throws IOException
4980    */
4981   protected HRegion openHRegion(final CancelableProgressable reporter)
4982   throws IOException {
4983     // Refuse to open the region if we are missing local compression support
4984     checkCompressionCodecs();
4985     // Refuse to open the region if encryption configuration is incorrect or
4986     // codec support is missing
4987     checkEncryption();
4988     // Refuse to open the region if a required class cannot be loaded
4989     checkClassLoading();
4990     this.openSeqNum = initialize(reporter);
4991     this.setSequenceId(openSeqNum);
4992     if (wal != null && getRegionServerServices() != null) {
4993       writeRegionOpenMarker(wal, openSeqNum);
4994     }
4995     return this;
4996   }
4997 
4998   private void checkCompressionCodecs() throws IOException {
4999     for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
5000       CompressionTest.testCompression(fam.getCompression());
5001       CompressionTest.testCompression(fam.getCompactionCompression());
5002     }
5003   }
5004 
5005   private void checkEncryption() throws IOException {
5006     for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
5007       EncryptionTest.testEncryption(conf, fam.getEncryptionType(), fam.getEncryptionKey());
5008     }
5009   }
5010 
5011   private void checkClassLoading() throws IOException {
5012     RegionSplitPolicy.getSplitPolicyClass(this.htableDescriptor, conf);
5013     RegionCoprocessorHost.testTableCoprocessorAttrs(conf, this.htableDescriptor);
5014   }
5015 
5016   /**
5017    * Create a daughter region from a given temp directory containing the region data.
5018    * @param hri Spec. for daughter region to open.
5019    * @throws IOException
5020    */
5021   HRegion createDaughterRegionFromSplits(final HRegionInfo hri) throws IOException {
5022     // Move the files from the temporary .splits to the final /table/region directory
5023     fs.commitDaughterRegion(hri);
5024 
5025     // Create the daughter HRegion instance
5026     HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(), fs.getFileSystem(),
5027         this.getBaseConf(), hri, this.getTableDesc(), rsServices);
5028     r.readRequestsCount.set(this.getReadRequestsCount() / 2);
5029     r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
5030     return r;
5031   }
5032 
5033   /**
5034    * Create a merged region given a temp directory with the region data.
5035    * @param region_b another merging region
5036    * @return merged HRegion
5037    * @throws IOException
5038    */
5039   HRegion createMergedRegionFromMerges(final HRegionInfo mergedRegionInfo,
5040       final HRegion region_b) throws IOException {
5041     HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(),
5042         fs.getFileSystem(), this.getBaseConf(), mergedRegionInfo,
5043         this.getTableDesc(), this.rsServices);
5044     r.readRequestsCount.set(this.getReadRequestsCount()
5045         + region_b.getReadRequestsCount());
5046     r.writeRequestsCount.set(this.getWriteRequestsCount()
5047         + region_b.getWriteRequestsCount());
5049     this.fs.commitMergedRegion(mergedRegionInfo);
5050     return r;
5051   }
5052 
5053   /**
5054    * Inserts a new region's meta information into the passed
5055    * <code>meta</code> region. Used by the HMaster bootstrap code when adding
5056    * a new table to the hbase:meta table.
5057    *
5058    * @param meta hbase:meta HRegion to be updated
5059    * @param r HRegion to add to <code>meta</code>
5060    *
5061    * @throws IOException
5062    */
5063   // TODO remove since only test and merge use this
5064   public static void addRegionToMETA(final HRegion meta, final HRegion r) throws IOException {
5065     meta.checkResources();
5066     // The row key is the region name
5067     byte[] row = r.getRegionName();
5068     final long now = EnvironmentEdgeManager.currentTime();
5069     final List<Cell> cells = new ArrayList<Cell>(2);
5070     cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
5071       HConstants.REGIONINFO_QUALIFIER, now,
5072       r.getRegionInfo().toByteArray()));
5073     // Record the version of the meta table.
5074     cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
5075       HConstants.META_VERSION_QUALIFIER, now,
5076       Bytes.toBytes(HConstants.META_VERSION)));
5077     meta.put(row, HConstants.CATALOG_FAMILY, cells);
5078   }
5079 
5080   /**
5081    * Computes the Path of the HRegion
5082    *
5083    * @param tabledir qualified path for table
5084    * @param name ENCODED region name
5085    * @return Path of HRegion directory
5086    */
5087   @Deprecated
5088   public static Path getRegionDir(final Path tabledir, final String name) {
5089     return new Path(tabledir, name);
5090   }
5091 
5092   /**
5093    * Computes the Path of the HRegion
5094    *
5095    * @param rootdir qualified path of HBase root directory
5096    * @param info HRegionInfo for the region
5097    * @return qualified path of region directory
5098    */
5099   @Deprecated
5100   @VisibleForTesting
5101   public static Path getRegionDir(final Path rootdir, final HRegionInfo info) {
5102     return new Path(
5103       FSUtils.getTableDir(rootdir, info.getTable()), info.getEncodedName());
5104   }
5105 
5106   /**
5107    * Determines if the specified row is within the row range specified by the
5108    * specified HRegionInfo
5109    *
5110    * @param info HRegionInfo that specifies the row range
5111    * @param row row to be checked
5112    * @return true if the row is within the range specified by the HRegionInfo
5113    */
5114   public static boolean rowIsInRange(HRegionInfo info, final byte [] row) {
5115     return ((info.getStartKey().length == 0) ||
5116         (Bytes.compareTo(info.getStartKey(), row) <= 0)) &&
5117         ((info.getEndKey().length == 0) ||
5118             (Bytes.compareTo(info.getEndKey(), row) > 0));
5119   }
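
  // Editor's note: a worked example (illustrative, not part of the original
  // source). For a region with startKey = "b" and endKey = "d":
  //   rowIsInRange(info, Bytes.toBytes("b")) -> true   (start key is inclusive)
  //   rowIsInRange(info, Bytes.toBytes("c")) -> true
  //   rowIsInRange(info, Bytes.toBytes("d")) -> false  (end key is exclusive)
  // Empty start/end keys match everything below/above, so the first and last
  // regions of a table have open-ended ranges.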
5120 
5121   /**
5122    * Merge two HRegions.  The regions must be adjacent and must not overlap.
5123    *
5124    * @return new merged HRegion
5125    * @throws IOException
5126    */
5127   public static HRegion mergeAdjacent(final HRegion srcA, final HRegion srcB)
5128   throws IOException {
5129     HRegion a = srcA;
5130     HRegion b = srcB;
5131 
5132     // Make sure that srcA comes first; important for key-ordering during
5133     // write of the merged file.
5134     if (srcA.getStartKey() == null) {
5135       if (srcB.getStartKey() == null) {
5136         throw new IOException("Cannot merge two regions with null start key");
5137       }
5138       // A's start key is null but B's isn't. Assume A comes before B
5139     } else if ((srcB.getStartKey() == null) ||
5140       (Bytes.compareTo(srcA.getStartKey(), srcB.getStartKey()) > 0)) {
5141       a = srcB;
5142       b = srcA;
5143     }
5144 
5145     if (Bytes.compareTo(a.getEndKey(), b.getStartKey()) != 0) {
5146       throw new IOException("Cannot merge non-adjacent regions");
5147     }
5148     return merge(a, b);
5149   }
5150 
5151   /**
5152    * Merge two regions whether they are adjacent or not.
5153    *
5154    * @param a region a
5155    * @param b region b
5156    * @return new merged region
5157    * @throws IOException
5158    */
5159   public static HRegion merge(final HRegion a, final HRegion b) throws IOException {
5160     if (!a.getRegionInfo().getTable().equals(b.getRegionInfo().getTable())) {
5161       throw new IOException("Regions do not belong to the same table");
5162     }
5163 
5164     FileSystem fs = a.getRegionFileSystem().getFileSystem();
5165     // Make sure each region's cache is empty
5166     a.flushcache(true);
5167     b.flushcache(true);
5168 
5169     // Compact each region so we only have one store file per family
5170     a.compactStores(true);
5171     if (LOG.isDebugEnabled()) {
5172       LOG.debug("Files for region: " + a);
5173       a.getRegionFileSystem().logFileSystemState(LOG);
5174     }
5175     b.compactStores(true);
5176     if (LOG.isDebugEnabled()) {
5177       LOG.debug("Files for region: " + b);
5178       b.getRegionFileSystem().logFileSystemState(LOG);
5179     }
5180 
5181     RegionMergeTransaction rmt = new RegionMergeTransaction(a, b, true);
5182     if (!rmt.prepare(null)) {
5183       throw new IOException("Unable to merge regions " + a + " and " + b);
5184     }
5185     HRegionInfo mergedRegionInfo = rmt.getMergedRegionInfo();
5186     LOG.info("starting merge of regions: " + a + " and " + b
5187         + " into new region " + mergedRegionInfo.getRegionNameAsString()
5188         + " with start key <"
5189         + Bytes.toStringBinary(mergedRegionInfo.getStartKey())
5190         + "> and end key <"
5191         + Bytes.toStringBinary(mergedRegionInfo.getEndKey()) + ">");
5192     HRegion dstRegion;
5193     try {
5194       dstRegion = rmt.execute(null, null);
5195     } catch (IOException ioe) {
5196       rmt.rollback(null, null);
5197       throw new IOException("Failed merging regions " + a + " and " + b
5198           + "; rolled back successfully", ioe);
5199     }
5200     dstRegion.compactStores(true);
5201 
5202     if (LOG.isDebugEnabled()) {
5203       LOG.debug("Files for new region");
5204       dstRegion.getRegionFileSystem().logFileSystemState(LOG);
5205     }
5206 
5207     if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
5208       throw new IOException("Merged region " + dstRegion
5209           + " still has references after the compaction, is compaction canceled?");
5210     }
5211 
5212     // Archiving the 'A' region
5213     HFileArchiver.archiveRegion(a.getBaseConf(), fs, a.getRegionInfo());
5214     // Archiving the 'B' region
5215     HFileArchiver.archiveRegion(b.getBaseConf(), fs, b.getRegionInfo());
5216 
5217     LOG.info("merge completed. New region is " + dstRegion);
5218     return dstRegion;
5219   }
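  // Illustrative sketch (not part of the original source): merging two
  // adjacent regions via mergeAdjacent(). The region variables are
  // hypothetical; adjacency means the end key of one region equals the
  // start key of the other, e.g. regions covering [a,b) and [b,c).
  //
  //   HRegion left  = ...; // covers [a,b)
  //   HRegion right = ...; // covers [b,c)
  //   HRegion merged = HRegion.mergeAdjacent(left, right); // covers [a,c)
  //
  // merge() itself skips the adjacency check, so callers must be certain
  // the combined key range is what they intend.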
5220 
5221   //
5222   // HBASE-880
5223   //
5224   /**
5225    * @param get get object
5226    * @return result
5227    * @throws IOException read exceptions
5228    */
5229   public Result get(final Get get) throws IOException {
5230     checkRow(get.getRow(), "Get");
5231     // Verify families are all valid
5232     if (get.hasFamilies()) {
5233       for (byte [] family: get.familySet()) {
5234         checkFamily(family);
5235       }
5236     } else { // Adding all families to scanner
5237       for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
5238         get.addFamily(family);
5239       }
5240     }
5241     List<Cell> results = get(get, true);
5242     boolean stale = this.getRegionInfo().getReplicaId() != 0;
5243     return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
5244   }
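  // Illustrative sketch (not part of the original source): a typical Get
  // issued against a region. The row, family and qualifier below are
  // hypothetical placeholders.
  //
  //   Get get = new Get(Bytes.toBytes("row1"));
  //   get.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"));
  //   Result result = region.get(get);
  //
  // If no family is specified, the method above adds every family of the
  // table before delegating to get(Get, boolean).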
5245 
5246   /*
5247    * Do a get based on the get parameter.
5248    * @param withCoprocessor invoke coprocessor or not. We don't want to
5249    * always invoke cp for this internal method.
5250    */
5251   public List<Cell> get(Get get, boolean withCoprocessor)
5252   throws IOException {
5253 
5254     List<Cell> results = new ArrayList<Cell>();
5255 
5256     // pre-get CP hook
5257     if (withCoprocessor && (coprocessorHost != null)) {
5258        if (coprocessorHost.preGet(get, results)) {
5259          return results;
5260        }
5261     }
5262 
5263     Scan scan = new Scan(get);
5264 
5265     RegionScanner scanner = null;
5266     try {
5267       scanner = getScanner(scan);
5268       scanner.next(results);
5269     } finally {
5270       if (scanner != null)
5271         scanner.close();
5272     }
5273 
5274     // post-get CP hook
5275     if (withCoprocessor && (coprocessorHost != null)) {
5276       coprocessorHost.postGet(get, results);
5277     }
5278 
5279     // do after lock
5280     if (this.metricsRegion != null) {
5281       long totalSize = 0L;
5282       for (Cell cell : results) {
5283         totalSize += CellUtil.estimatedSerializedSizeOf(cell);
5284       }
5285       this.metricsRegion.updateGet(totalSize);
5286     }
5287 
5288     return results;
5289   }
5290 
5291   public void mutateRow(RowMutations rm) throws IOException {
5292     // Don't need nonces here - RowMutations only supports puts and deletes
5293     mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
5294   }
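  // Illustrative sketch (not part of the original source): applying a put
  // and a delete to the same row atomically. Row, family and qualifiers
  // are hypothetical.
  //
  //   RowMutations rm = new RowMutations(Bytes.toBytes("row1"));
  //   Put p = new Put(Bytes.toBytes("row1"));
  //   p.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("v1"));
  //   rm.add(p);
  //   Delete d = new Delete(Bytes.toBytes("row1"));
  //   d.deleteColumns(Bytes.toBytes("cf"), Bytes.toBytes("q2"));
  //   rm.add(d);
  //   region.mutateRow(rm);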
5295 
5296   /**
5297    * Perform atomic mutations within the region w/o nonces.
5298    * See {@link #mutateRowsWithLocks(Collection, Collection, long, long)}
5299    */
5300   public void mutateRowsWithLocks(Collection<Mutation> mutations,
5301       Collection<byte[]> rowsToLock) throws IOException {
5302     mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
5303   }
5304 
5305   /**
5306    * Perform atomic mutations within the region.
5307    * @param mutations The list of mutations to perform.
5308    * <code>mutations</code> can contain operations for multiple rows.
5309    * Caller has to ensure that all rows are contained in this region.
5310    * @param rowsToLock Rows to lock. If multiple rows are locked, care should
5311    * be taken that <code>rowsToLock</code> is sorted in order to avoid
5312    * deadlocks.
5313    * @param nonceGroup Optional nonce group of the operation (client Id)
5314    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
5315    * @throws IOException
5316    */
5317   public void mutateRowsWithLocks(Collection<Mutation> mutations,
5318       Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
5319     MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
5320     processRowsWithLocks(proc, -1, nonceGroup, nonce);
5321   }
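  // Illustrative sketch (not part of the original source): sorting the row
  // set before locking, as recommended above, so concurrent callers acquire
  // row locks in a consistent order. The mutations variable is hypothetical.
  //
  //   byte[][] rows = { Bytes.toBytes("r2"), Bytes.toBytes("r1") };
  //   Arrays.sort(rows, Bytes.BYTES_COMPARATOR);
  //   region.mutateRowsWithLocks(mutations, Arrays.asList(rows));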
5322 
5323   /**
5324    * @return the current load statistics for the region
5325    */
5326   public ClientProtos.RegionLoadStats getRegionStats() {
5327     if (!regionStatsEnabled) {
5328       return null;
5329     }
5330     ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder();
5331     stats.setMemstoreLoad((int) (Math.min(100, (this.memstoreSize.get() * 100) / this
5332         .memstoreFlushSize)));
5333     stats.setHeapOccupancy((int) (rsServices.getHeapMemoryManager().getHeapOccupancyPercent() * 100));
5334     return stats.build();
5335   }
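  // Worked example (illustrative): with a 128 MB flush size and 32 MB
  // currently in the memstore, memstoreLoad = min(100, 32 * 100 / 128) = 25.
  // heapOccupancy converts the manager's 0.0-1.0 fraction to a percentage,
  // so an occupancy of 0.42 is reported as 42.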
5336 
5337   /**
5338    * Performs atomic multiple reads and writes on a given row.
5339    *
5340    * @param processor The object that defines the reads and writes to a row.
5341    * @param nonceGroup Optional nonce group of the operation (client Id)
5342    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
5343    */
5344   public void processRowsWithLocks(RowProcessor<?,?> processor, long nonceGroup, long nonce)
5345       throws IOException {
5346     processRowsWithLocks(processor, rowProcessorTimeout, nonceGroup, nonce);
5347   }
5348 
5349   /**
5350    * Performs atomic multiple reads and writes on a given row.
5351    *
5352    * @param processor The object that defines the reads and writes to a row.
5353    * @param timeout The timeout of the processor.process() execution
5354    *                Use a negative number to switch off the time bound
5355    * @param nonceGroup Optional nonce group of the operation (client Id)
5356    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
5357    */
5358   public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout,
5359       long nonceGroup, long nonce) throws IOException {
5360 
5361     for (byte[] row : processor.getRowsToLock()) {
5362       checkRow(row, "processRowsWithLocks");
5363     }
5364     if (!processor.readOnly()) {
5365       checkReadOnly();
5366     }
5367     checkResources();
5368 
5369     startRegionOperation();
5370     WALEdit walEdit = new WALEdit();
5371 
5372     // 1. Run pre-process hook
5373     try {
5374       processor.preProcess(this, walEdit);
5375     } catch (IOException e) {
5376       closeRegionOperation();
5377       throw e;
5378     }
5379     // Short circuit the read only case
5380     if (processor.readOnly()) {
5381       try {
5382         long now = EnvironmentEdgeManager.currentTime();
5383         doProcessRowWithTimeout(
5384             processor, now, this, null, null, timeout);
5385         processor.postProcess(this, walEdit, true);
5386       } finally {
5387         closeRegionOperation();
5388       }
5389       return;
5390     }
5391 
5392     MultiVersionConsistencyControl.WriteEntry writeEntry = null;
5393     boolean locked;
5394     boolean walSyncSuccessful = false;
5395     List<RowLock> acquiredRowLocks;
5396     long addedSize = 0;
5397     List<Mutation> mutations = new ArrayList<Mutation>();
5398     List<Cell> memstoreCells = new ArrayList<Cell>();
5399     Collection<byte[]> rowsToLock = processor.getRowsToLock();
5400     long mvccNum = 0;
5401     WALKey walKey = null;
5402     try {
5403       // 2. Acquire the row lock(s)
5404       acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
5405       for (byte[] row : rowsToLock) {
5406         // Attempt to lock all involved rows, throw if any lock times out
5407         acquiredRowLocks.add(getRowLock(row));
5408       }
5409       // 3. Region lock
5410       lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
5411       locked = true;
5412       // Get a mvcc write number
5413       mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
5414 
5415       long now = EnvironmentEdgeManager.currentTime();
5416       try {
5417         // 4. Let the processor scan the rows, generate mutations and add
5418         //    waledits
5419         doProcessRowWithTimeout(
5420             processor, now, this, mutations, walEdit, timeout);
5421 
5422         if (!mutations.isEmpty()) {
5423           // 5. Start mvcc transaction
5424           writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
5425           // 6. Call the preBatchMutate hook
5426           processor.preBatchMutate(this, walEdit);
5427           // 7. Apply to memstore
5428           for (Mutation m : mutations) {
5429             // Handle any tag based cell features
5430             rewriteCellTags(m.getFamilyCellMap(), m);
5431 
5432             for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
5433               Cell cell = cellScanner.current();
5434               CellUtil.setSequenceId(cell, mvccNum);
5435               Store store = getStore(cell);
5436               if (store == null) {
5437                 checkFamily(CellUtil.cloneFamily(cell));
5438                 // unreachable
5439               }
5440               Pair<Long, Cell> ret = store.add(cell);
5441               addedSize += ret.getFirst();
5442               memstoreCells.add(ret.getSecond());
5443             }
5444           }
5445 
5446           long txid = 0;
5447           // 8. Append no sync
5448           if (!walEdit.isEmpty()) {
5449             // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
5450             walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
5451               this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
5452               processor.getClusterIds(), nonceGroup, nonce);
5453             txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
5454               walKey, walEdit, getSequenceId(), true, memstoreCells);
5455           }
5456           if(walKey == null){
5457             // since we use wal sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
5458             // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
5459             walKey = this.appendEmptyEdit(this.wal, memstoreCells);
5460           }
5461           // 9. Release region lock
5462           if (locked) {
5463             this.updatesLock.readLock().unlock();
5464             locked = false;
5465           }
5466 
5467           // 10. Release row lock(s)
5468           releaseRowLocks(acquiredRowLocks);
5469 
5470           // 11. Sync edit log
5471           if (txid != 0) {
5472             syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
5473           }
5474           walSyncSuccessful = true;
5475           // 12. call postBatchMutate hook
5476           processor.postBatchMutate(this);
5477         }
5478       } finally {
5479         if (!mutations.isEmpty() && !walSyncSuccessful) {
5480           LOG.warn("Wal sync failed. Roll back " + mutations.size() +
5481               " memstore keyvalues for row(s):" + StringUtils.byteToHexString(
5482               processor.getRowsToLock().iterator().next()) + "...");
5483           for (Mutation m : mutations) {
5484             for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
5485               Cell cell = cellScanner.current();
5486               getStore(cell).rollback(cell);
5487             }
5488           }
5489         }
5490         // 13. Roll mvcc forward
5491         if (writeEntry != null) {
5492           mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
5493         }
5494         if (locked) {
5495           this.updatesLock.readLock().unlock();
5496         }
5497         // release locks if some were acquired but another timed out
5498         releaseRowLocks(acquiredRowLocks);
5499       }
5500 
5501       // 14. Run post-process hook
5502       processor.postProcess(this, walEdit, walSyncSuccessful);
5503 
5504     } finally {
5505       closeRegionOperation();
5506       if (!mutations.isEmpty() &&
5507           isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
5508         requestFlush();
5509       }
5510     }
5511   }
5512 
5513   private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
5514                                        final long now,
5515                                        final HRegion region,
5516                                        final List<Mutation> mutations,
5517                                        final WALEdit walEdit,
5518                                        final long timeout) throws IOException {
5519     // Short circuit the no time bound case.
5520     if (timeout < 0) {
5521       try {
5522         processor.process(now, region, mutations, walEdit);
5523       } catch (IOException e) {
5524         LOG.warn("RowProcessor:" + processor.getClass().getName() +
5525             " throws Exception on row(s):" +
5526             Bytes.toStringBinary(
5527               processor.getRowsToLock().iterator().next()) + "...", e);
5528         throw e;
5529       }
5530       return;
5531     }
5532 
5533     // Case with time bound
5534     FutureTask<Void> task =
5535       new FutureTask<Void>(new Callable<Void>() {
5536         @Override
5537         public Void call() throws IOException {
5538           try {
5539             processor.process(now, region, mutations, walEdit);
5540             return null;
5541           } catch (IOException e) {
5542             LOG.warn("RowProcessor:" + processor.getClass().getName() +
5543                 " throws Exception on row(s):" +
5544                 Bytes.toStringBinary(
5545                     processor.getRowsToLock().iterator().next()) + "...", e);
5546             throw e;
5547           }
5548         }
5549       });
5550     rowProcessorExecutor.execute(task);
5551     try {
5552       task.get(timeout, TimeUnit.MILLISECONDS);
5553     } catch (TimeoutException te) {
5554       LOG.error("RowProcessor timeout:" + timeout + " ms on row(s):" +
5555           Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) +
5556           "...");
5557       throw new IOException(te);
5558     } catch (Exception e) {
5559       throw new IOException(e);
5560     }
5561   }
5562 
5563   public Result append(Append append) throws IOException {
5564     return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
5565   }
5566 
5567   // TODO: There's a lot of boilerplate code identical to increment.
5568   // We should refactor append and increment as local get-mutate-put
5569   // transactions, so all stores only go through one code path for puts.
5570   /**
5571    * Perform one or more append operations on a row.
5572    *
5573    * @return new keyvalues after append
5574    * @throws IOException
5575    */
5576   public Result append(Append append, long nonceGroup, long nonce)
5577       throws IOException {
5578     byte[] row = append.getRow();
5579     checkRow(row, "append");
5580     boolean flush = false;
5581     Durability durability = getEffectiveDurability(append.getDurability());
5582     boolean writeToWAL = durability != Durability.SKIP_WAL;
5583     WALEdit walEdits = null;
5584     List<Cell> allKVs = new ArrayList<Cell>(append.size());
5585     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
5586     long size = 0;
5587     long txid = 0;
5588 
5589     checkReadOnly();
5590     checkResources();
5591     // Lock row
5592     startRegionOperation(Operation.APPEND);
5593     this.writeRequestsCount.increment();
5594     long mvccNum = 0;
5595     WriteEntry w = null;
5596     WALKey walKey = null;
5597     RowLock rowLock = null;
5598     List<Cell> memstoreCells = new ArrayList<Cell>();
5599     boolean doRollBackMemstore = false;
5600     try {
5601       rowLock = getRowLock(row);
5602       try {
5603         lock(this.updatesLock.readLock());
5604         try {
5605           // wait for all prior MVCC transactions to finish - while we hold the row lock
5606           // (so that we are guaranteed to see the latest state)
5607           mvcc.waitForPreviousTransactionsComplete();
5608           if (this.coprocessorHost != null) {
5609             Result r = this.coprocessorHost.preAppendAfterRowLock(append);
5610             if(r!= null) {
5611               return r;
5612             }
5613           }
5614           // now start my own transaction
5615           mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
5616           w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
5617           long now = EnvironmentEdgeManager.currentTime();
5618           // Process each family
5619           for (Map.Entry<byte[], List<Cell>> family : append.getFamilyCellMap().entrySet()) {
5620 
5621             Store store = stores.get(family.getKey());
5622             List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
5623 
5624             // Sort the cells so that they match the order that they
5625             // appear in the Get results. Otherwise, we won't be able to
5626             // find the existing values if the cells are not specified
5627             // in order by the client since cells are in an array list.
5628             Collections.sort(family.getValue(), store.getComparator());
5629             // Get previous values for all columns in this family
5630             Get get = new Get(row);
5631             for (Cell cell : family.getValue()) {
5632               get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));
5633             }
5634             List<Cell> results = get(get, false);
5635             // Iterate the input columns and update existing values if they were
5636             // found, otherwise add new column initialized to the append value
5637 
5638             // Avoid as much copying as possible. We may need to rewrite and
5639             // consolidate tags. Bytes are only copied once.
5640             // Would be nice if KeyValue had scatter/gather logic
5641             int idx = 0;
5642             for (Cell cell : family.getValue()) {
5643               Cell newCell;
5644               Cell oldCell = null;
5645               if (idx < results.size()
5646                   && CellUtil.matchingQualifier(results.get(idx), cell)) {
5647                 oldCell = results.get(idx);
5648                 long ts = Math.max(now, oldCell.getTimestamp());
5649 
5650                 // Process cell tags
5651                 List<Tag> newTags = new ArrayList<Tag>();
5652 
5653                 // Make a union of the set of tags in the old and new KVs
5654 
5655                 if (oldCell.getTagsLength() > 0) {
5656                   Iterator<Tag> i = CellUtil.tagsIterator(oldCell.getTagsArray(),
5657                     oldCell.getTagsOffset(), oldCell.getTagsLength());
5658                   while (i.hasNext()) {
5659                     newTags.add(i.next());
5660                   }
5661                 }
5662                 if (cell.getTagsLength() > 0) {
5663                   Iterator<Tag> i  = CellUtil.tagsIterator(cell.getTagsArray(), 
5664                     cell.getTagsOffset(), cell.getTagsLength());
5665                   while (i.hasNext()) {
5666                     newTags.add(i.next());
5667                   }
5668                 }
5669 
5670                 // Cell TTL handling
5671 
5672                 if (append.getTTL() != Long.MAX_VALUE) {
5673                   // Add the new TTL tag
5674                   newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
5675                 }
5676 
5677                 // Rebuild tags
5678                 byte[] tagBytes = Tag.fromList(newTags);
5679 
5680                 // allocate a new cell once, sized to hold both values plus tags; filled in below
5681                 newCell = new KeyValue(row.length, cell.getFamilyLength(),
5682                     cell.getQualifierLength(), ts, KeyValue.Type.Put,
5683                     oldCell.getValueLength() + cell.getValueLength(),
5684                     tagBytes.length);
5685                 // copy in row, family, and qualifier
5686                 System.arraycopy(cell.getRowArray(), cell.getRowOffset(),
5687                   newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());
5688                 System.arraycopy(cell.getFamilyArray(), cell.getFamilyOffset(),
5689                   newCell.getFamilyArray(), newCell.getFamilyOffset(),
5690                   cell.getFamilyLength());
5691                 System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(),
5692                   newCell.getQualifierArray(), newCell.getQualifierOffset(),
5693                   cell.getQualifierLength());
5694                 // copy in the value
5695                 System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(),
5696                   newCell.getValueArray(), newCell.getValueOffset(),
5697                   oldCell.getValueLength());
5698                 System.arraycopy(cell.getValueArray(), cell.getValueOffset(),
5699                   newCell.getValueArray(),
5700                   newCell.getValueOffset() + oldCell.getValueLength(),
5701                   cell.getValueLength());
5702                 // Copy in tag data
5703                 System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(),
5704                   tagBytes.length);
5705                 idx++;
5706               } else {
5707                 // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP
5708                 CellUtil.updateLatestStamp(cell, now);
5709 
5710                 // Cell TTL handling
5711 
5712                 if (append.getTTL() != Long.MAX_VALUE) {
5713                   List<Tag> newTags = new ArrayList<Tag>(1);
5714                   newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
5715                   // Add the new TTL tag
5716                   newCell = new KeyValue(cell.getRowArray(), cell.getRowOffset(),
5717                       cell.getRowLength(),
5718                     cell.getFamilyArray(), cell.getFamilyOffset(),
5719                       cell.getFamilyLength(),
5720                     cell.getQualifierArray(), cell.getQualifierOffset(),
5721                       cell.getQualifierLength(),
5722                     cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
5723                     cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
5724                     newTags);
5725                 } else {
5726                   newCell = cell;
5727                 }
5728               }
5729 
5730               CellUtil.setSequenceId(newCell, mvccNum);
5731               // Give coprocessors a chance to update the new cell
5732               if (coprocessorHost != null) {
5733                 newCell = coprocessorHost.postMutationBeforeWAL(RegionObserver.MutationType.APPEND,
5734                     append, oldCell, newCell);
5735               }
5736               kvs.add(newCell);
5737 
5738               // Append update to WAL
5739               if (writeToWAL) {
5740                 if (walEdits == null) {
5741                   walEdits = new WALEdit();
5742                 }
5743                 walEdits.add(newCell);
5744               }
5745             }
5746 
5747             //store the kvs to the temporary memstore before writing WAL
5748             tempMemstore.put(store, kvs);
5749           }
5750 
5751           //Actually write to Memstore now
5752           for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
5753             Store store = entry.getKey();
5754             if (store.getFamily().getMaxVersions() == 1) {
5755               // upsert if VERSIONS for this CF == 1
5756               size += store.upsert(entry.getValue(), getSmallestReadPoint());
5757               memstoreCells.addAll(entry.getValue());
5758             } else {
5759               // otherwise keep older versions around
5760               for (Cell cell: entry.getValue()) {
5761                 Pair<Long, Cell> ret = store.add(cell);
5762                 size += ret.getFirst();
5763                 memstoreCells.add(ret.getSecond());
5764                 doRollBackMemstore = true;
5765               }
5766             }
5767             allKVs.addAll(entry.getValue());
5768           }
5769 
5770           // Actually write to WAL now
5771           if (writeToWAL) {
5772             // Using default cluster id, as this can only happen in the originating
5773             // cluster. A slave cluster receives the final value (not the delta)
5774             // as a Put.
5775             // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
5776             walKey = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
5777               this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce);
5778             txid = this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits,
5779               this.sequenceId, true, memstoreCells);
5780           } else {
5781             recordMutationWithoutWal(append.getFamilyCellMap());
5782           }
5783           if (walKey == null) {
5784             // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
5785             walKey = this.appendEmptyEdit(this.wal, memstoreCells);
5786           }
5787           size = this.addAndGetGlobalMemstoreSize(size);
5788           flush = isFlushSize(size);
5789         } finally {
5790           this.updatesLock.readLock().unlock();
5791         }
5792       } finally {
5793         rowLock.release();
5794         rowLock = null;
5795       }
5796       // sync the transaction log outside the rowlock
5797       if(txid != 0){
5798         syncOrDefer(txid, durability);
5799       }
5800       doRollBackMemstore = false;
5801     } finally {
5802       if (rowLock != null) {
5803         rowLock.release();
5804       }
5805       // if the wal sync was unsuccessful, remove keys from memstore
5806       if (doRollBackMemstore) {
5807         rollbackMemstore(memstoreCells);
5808       }
5809       if (w != null) {
5810         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
5811       }
5812       closeRegionOperation(Operation.APPEND);
5813     }
5814 
5815     if (this.metricsRegion != null) {
5816       this.metricsRegion.updateAppend();
5817     }
5818 
5819     if (flush) {
5820       // Request a cache flush. Do it outside update lock.
5821       requestFlush();
5822     }
5823 
5824 
5825     return append.isReturnResults() ? Result.create(allKVs) : null;
5826   }
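  // Illustrative sketch (not part of the original source): appending bytes
  // to an existing column value. Names are hypothetical placeholders.
  //
  //   Append a = new Append(Bytes.toBytes("row1"));
  //   a.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("-suffix"));
  //   Result r = region.append(a); // value becomes oldValue + "-suffix"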
5827 
5828   public Result increment(Increment increment) throws IOException {
5829     return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
5830   }
5831 
5832   // TODO: There's a lot of boilerplate code identical to append.
5833   // We should refactor append and increment as local get-mutate-put
5834   // transactions, so all stores only go through one code path for puts.
5835   /**
5836    * Perform one or more increment operations on a row.
5837    * @return new keyvalues after increment
5838    * @throws IOException
5839    */
5840   public Result increment(Increment increment, long nonceGroup, long nonce)
5841   throws IOException {
5842     byte [] row = increment.getRow();
5843     checkRow(row, "increment");
5844     TimeRange tr = increment.getTimeRange();
5845     boolean flush = false;
5846     Durability durability = getEffectiveDurability(increment.getDurability());
5847     boolean writeToWAL = durability != Durability.SKIP_WAL;
5848     WALEdit walEdits = null;
5849     List<Cell> allKVs = new ArrayList<Cell>(increment.size());
5850     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
5851 
5852     long size = 0;
5853     long txid = 0;
5854 
5855     checkReadOnly();
5856     checkResources();
5857     // Lock row
5858     startRegionOperation(Operation.INCREMENT);
5859     this.writeRequestsCount.increment();
5860     RowLock rowLock = null;
5861     WriteEntry w = null;
5862     WALKey walKey = null;
5863     long mvccNum = 0;
5864     List<Cell> memstoreCells = new ArrayList<Cell>();
5865     boolean doRollBackMemstore = false;
5866     try {
5867       rowLock = getRowLock(row);
5868       try {
5869         lock(this.updatesLock.readLock());
5870         try {
5871           // wait for all prior MVCC transactions to finish - while we hold the row lock
5872           // (so that we are guaranteed to see the latest state)
5873           mvcc.waitForPreviousTransactionsComplete();
5874           if (this.coprocessorHost != null) {
5875             Result r = this.coprocessorHost.preIncrementAfterRowLock(increment);
5876             if (r != null) {
5877               return r;
5878             }
5879           }
5880           // now start my own transaction
5881           mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
5882           w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
5883           long now = EnvironmentEdgeManager.currentTime();
5884           // Process each family
5885           for (Map.Entry<byte [], List<Cell>> family:
5886               increment.getFamilyCellMap().entrySet()) {
5887 
5888             Store store = stores.get(family.getKey());
5889             List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
5890 
5891             // Sort the cells so that they match the order that they
5892             // appear in the Get results. Otherwise, we won't be able to
5893             // find the existing values if the cells are not specified
5894             // in order by the client since cells are in an array list.
5895             Collections.sort(family.getValue(), store.getComparator());
5896             // Get previous values for all columns in this family
5897             Get get = new Get(row);
5898             for (Cell cell: family.getValue()) {
5899               get.addColumn(family.getKey(),  CellUtil.cloneQualifier(cell));
5900             }
5901             get.setTimeRange(tr.getMin(), tr.getMax());
5902             List<Cell> results = get(get, false);
5903 
5904             // Iterate the input columns and update existing values if they were
5905             // found, otherwise add new column initialized to the increment amount
5906             int idx = 0;
5907             for (Cell cell: family.getValue()) {
5908               long amount = Bytes.toLong(CellUtil.cloneValue(cell));
5909               boolean noWriteBack = (amount == 0);
5910               List<Tag> newTags = new ArrayList<Tag>();
5911 
5912               // Carry forward any tags that might have been added by a coprocessor
5913               if (cell.getTagsLength() > 0) {
5914                 Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(),
5915                   cell.getTagsOffset(), cell.getTagsLength());
5916                 while (i.hasNext()) {
5917                   newTags.add(i.next());
5918                 }
5919               }
5920 
5921               Cell c = null;
5922               long ts = now;
5923               if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) {
5924                 c = results.get(idx);
5925                 ts = Math.max(now, c.getTimestamp());
5926                 if(c.getValueLength() == Bytes.SIZEOF_LONG) {
5927                   amount += Bytes.toLong(c.getValueArray(), c.getValueOffset(), Bytes.SIZEOF_LONG);
5928                 } else {
5929                   // throw DoNotRetryIOException instead of IllegalArgumentException
5930                   throw new org.apache.hadoop.hbase.DoNotRetryIOException(
5931                       "Attempted to increment field that isn't 64 bits wide");
5932                 }
5933                 // Carry tags forward from previous version
5934                 if (c.getTagsLength() > 0) {
5935                   Iterator<Tag> i = CellUtil.tagsIterator(c.getTagsArray(),
5936                     c.getTagsOffset(), c.getTagsLength());
5937                   while (i.hasNext()) {
5938                     newTags.add(i.next());
5939                   }
5940                 }
5941                 idx++;
5942               }
5943 
5944               // Append new incremented KeyValue to list
5945               byte[] q = CellUtil.cloneQualifier(cell);
5946               byte[] val = Bytes.toBytes(amount);
5947 
5948               // Add the TTL tag if the mutation carried one
5949               if (increment.getTTL() != Long.MAX_VALUE) {
5950                 newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(increment.getTTL())));
5951               }
5952 
5953               Cell newKV = new KeyValue(row, 0, row.length,
5954                 family.getKey(), 0, family.getKey().length,
5955                 q, 0, q.length,
5956                 ts,
5957                 KeyValue.Type.Put,
5958                 val, 0, val.length,
5959                 newTags);
5960 
5961               CellUtil.setSequenceId(newKV, mvccNum);
5962 
5963               // Give coprocessors a chance to update the new cell
5964               if (coprocessorHost != null) {
5965                 newKV = coprocessorHost.postMutationBeforeWAL(
5966                     RegionObserver.MutationType.INCREMENT, increment, c, newKV);
5967               }
5968               allKVs.add(newKV);
5969 
5970               if (!noWriteBack) {
5971                 kvs.add(newKV);
5972 
5973                 // Prepare WAL updates
5974                 if (writeToWAL) {
5975                   if (walEdits == null) {
5976                     walEdits = new WALEdit();
5977                   }
5978                   walEdits.add(newKV);
5979                 }
5980               }
5981             }
5982 
5983             //store the kvs to the temporary memstore before writing WAL
5984             if (!kvs.isEmpty()) {
5985               tempMemstore.put(store, kvs);
5986             }
5987           }
5988 
5989           //Actually write to Memstore now
5990           if (!tempMemstore.isEmpty()) {
5991             for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
5992               Store store = entry.getKey();
5993               if (store.getFamily().getMaxVersions() == 1) {
5994                 // upsert if VERSIONS for this CF == 1
5995                 size += store.upsert(entry.getValue(), getSmallestReadPoint());
5996                 memstoreCells.addAll(entry.getValue());
5997               } else {
5998                 // otherwise keep older versions around
5999                 for (Cell cell : entry.getValue()) {
6000                   Pair<Long, Cell> ret = store.add(cell);
6001                   size += ret.getFirst();
6002                   memstoreCells.add(ret.getSecond());
6003                   doRollBackMemstore = true;
6004                 }
6005               }
6006             }
6007             size = this.addAndGetGlobalMemstoreSize(size);
6008             flush = isFlushSize(size);
6009           }
6010 
6011           // Actually write to WAL now
6012           if (walEdits != null && !walEdits.isEmpty()) {
6013             if (writeToWAL) {
6014               // Using default cluster id, as this can only happen in the originating
6015               // cluster. A slave cluster receives the final value (not the delta)
6016               // as a Put.
6017               // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
6018               walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
6019                 this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce);
6020               txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
6021                 walKey, walEdits, getSequenceId(), true, memstoreCells);
6022             } else {
6023               recordMutationWithoutWal(increment.getFamilyCellMap());
6024             }
6025           }
6026           if(walKey == null){
6027             // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
6028             walKey = this.appendEmptyEdit(this.wal, memstoreCells);
6029           }
6030         } finally {
6031           this.updatesLock.readLock().unlock();
6032         }
6033       } finally {
6034         rowLock.release();
6035         rowLock = null;
6036       }
6037       // sync the transaction log outside the rowlock
6038       if(txid != 0){
6039         syncOrDefer(txid, durability);
6040       }
6041       doRollBackMemstore = false;
6042     } finally {
6043       if (rowLock != null) {
6044         rowLock.release();
6045       }
6046       // if the wal sync was unsuccessful, remove keys from memstore
6047       if (doRollBackMemstore) {
6048         rollbackMemstore(memstoreCells);
6049       }
6050       if (w != null) {
6051         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
6052       }
6053       closeRegionOperation(Operation.INCREMENT);
6054       if (this.metricsRegion != null) {
6055         this.metricsRegion.updateIncrement();
6056       }
6057     }
6058 
6059     if (flush) {
6060       // Request a cache flush.  Do it outside update lock.
6061       requestFlush();
6062     }
6063 
6064     return Result.create(allKVs);
6065   }
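  // Illustrative sketch (not part of the original source): incrementing a
  // 64-bit counter column. Names are hypothetical placeholders.
  //
  //   Increment inc = new Increment(Bytes.toBytes("row1"));
  //   inc.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("hits"), 1L);
  //   Result r = region.increment(inc);
  //   long newValue = Bytes.toLong(r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("hits")));
  //
  // Note the existing value must be exactly 8 bytes wide or the call fails
  // with DoNotRetryIOException, as enforced above.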
6066 
6067   //
6068   // New HBASE-880 Helpers
6069   //
6070 
6071   private void checkFamily(final byte [] family)
6072   throws NoSuchColumnFamilyException {
6073     if (!this.htableDescriptor.hasFamily(family)) {
6074       throw new NoSuchColumnFamilyException("Column family " +
6075           Bytes.toString(family) + " does not exist in region " + this
6076           + " in table " + this.htableDescriptor);
6077     }
6078   }
6079 
6080   public static final long FIXED_OVERHEAD = ClassSize.align(
6081       ClassSize.OBJECT +
6082       ClassSize.ARRAY +
6083       44 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
6084       (11 * Bytes.SIZEOF_LONG) +
6085       4 * Bytes.SIZEOF_BOOLEAN);
6086 
6087   // woefully out of date - currently missing:
6088   // 1 x HashMap - coprocessorServiceHandlers
6089   // 6 x Counter - numMutationsWithoutWAL, dataInMemoryWithoutWAL,
6090   //   checkAndMutateChecksPassed, checkAndMutateChecksFailed, readRequestsCount,
6091   //   writeRequestsCount
6092   // 1 x HRegion$WriteState - writestate
6093   // 1 x RegionCoprocessorHost - coprocessorHost
6094   // 1 x RegionSplitPolicy - splitPolicy
6095   // 1 x MetricsRegion - metricsRegion
6096   // 1 x MetricsRegionWrapperImpl - metricsRegionWrapper
6097   public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
6098       ClassSize.OBJECT + // closeLock
6099       (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
6100       (3 * ClassSize.ATOMIC_LONG) + // memStoreSize, numPutsWithoutWAL, dataInMemoryWithoutWAL
6101       (2 * ClassSize.CONCURRENT_HASHMAP) +  // lockedRows, scannerReadPoints
6102       WriteState.HEAP_SIZE + // writestate
6103       ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
6104       (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
6105       MultiVersionConsistencyControl.FIXED_SIZE // mvcc
6106       + ClassSize.TREEMAP // maxSeqIdInStores
6107       + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
6108       ;
6109 
6110   @Override
6111   public long heapSize() {
6112     long heapSize = DEEP_OVERHEAD;
6113     for (Store store : this.stores.values()) {
6114       heapSize += store.heapSize();
6115     }
6116     // this does not take into account row locks, recent flushes, mvcc entries, and more
6117     return heapSize;
6118   }
6119 
6120   /*
6121    * This method calls System.exit.
6122    * @param message Message to print out.  May be null.
6123    */
6124   private static void printUsageAndExit(final String message) {
6125     if (message != null && message.length() > 0) System.out.println(message);
6126     System.out.println("Usage: HRegion CATALOG_TABLE_DIR [major_compact]");
6127     System.out.println("Options:");
6128     System.out.println(" major_compact  Pass this option to major compact " +
6129       "passed region.");
6130     System.out.println("Default outputs scan of passed region.");
6131     System.exit(1);
6132   }
6133 
6134   /**
6135    * Registers a new protocol buffer {@link Service} subclass as a coprocessor endpoint to
6136    * be available for handling
6137    * {@link HRegion#execService(com.google.protobuf.RpcController,
6138    *    org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall)} calls.
6139    *
6140    * <p>
6141    * Only a single instance may be registered per region for a given {@link Service} subclass (the
6142    * instances are keyed on {@link com.google.protobuf.Descriptors.ServiceDescriptor#getFullName()}).
6143    * After the first registration, subsequent calls with the same service name will fail with
6144    * a return value of {@code false}.
6145    * </p>
6146    * @param instance the {@code Service} subclass instance to expose as a coprocessor endpoint
6147    * @return {@code true} if the registration was successful, {@code false}
6148    * otherwise
6149    */
6150   public boolean registerService(Service instance) {
6151     /*
6152      * No stacking of instances is allowed for a single service name
6153      */
6154     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
6155     if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
6156       LOG.error("Coprocessor service "+serviceDesc.getFullName()+
6157           " already registered, rejecting request from "+instance
6158       );
6159       return false;
6160     }
6161 
6162     coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
6163     if (LOG.isDebugEnabled()) {
6164       LOG.debug("Registered coprocessor service: region="+
6165           Bytes.toStringBinary(getRegionName())+" service="+serviceDesc.getFullName());
6166     }
6167     return true;
6168   }
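  // Illustrative sketch (not part of the original source): exposing a
  // coprocessor endpoint. MyService here stands for a hypothetical
  // protobuf-generated com.google.protobuf.Service implementation.
  //
  //   Service endpoint = ...; // instance of the generated MyService
  //   if (!region.registerService(endpoint)) {
  //     // a service with the same descriptor full name was already registered
  //   }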
6169 
6170   /**
6171    * Executes a single protocol buffer coprocessor endpoint {@link Service} method using
6172    * the registered protocol handlers.  {@link Service} implementations must be registered via the
6173    * {@link HRegion#registerService(com.google.protobuf.Service)}
6174    * method before they are available.
6175    *
6176    * @param controller an {@code RpcController} implementation to pass to the invoked service
6177    * @param call a {@code CoprocessorServiceCall} instance identifying the service, method,
6178    *     and parameters for the method invocation
6179    * @return a protocol buffer {@code Message} instance containing the method's result
6180    * @throws IOException if no registered service handler is found or an error
6181    *     occurs during the invocation
6182    * @see org.apache.hadoop.hbase.regionserver.HRegion#registerService(com.google.protobuf.Service)
6183    */
6184   public Message execService(RpcController controller, CoprocessorServiceCall call)
6185       throws IOException {
6186     String serviceName = call.getServiceName();
6187     String methodName = call.getMethodName();
6188     if (!coprocessorServiceHandlers.containsKey(serviceName)) {
6189       throw new UnknownProtocolException(null,
6190           "No registered coprocessor service found for name "+serviceName+
6191           " in region "+Bytes.toStringBinary(getRegionName()));
6192     }
6193 
6194     Service service = coprocessorServiceHandlers.get(serviceName);
6195     Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
6196     Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
6197     if (methodDesc == null) {
6198       throw new UnknownProtocolException(service.getClass(),
6199           "Unknown method "+methodName+" called on service "+serviceName+
6200               " in region "+Bytes.toStringBinary(getRegionName()));
6201     }
6202 
6203     Message request = service.getRequestPrototype(methodDesc).newBuilderForType()
6204         .mergeFrom(call.getRequest()).build();
6205 
6206     if (coprocessorHost != null) {
6207       request = coprocessorHost.preEndpointInvocation(service, methodName, request);
6208     }
6209 
6210     final Message.Builder responseBuilder =
6211         service.getResponsePrototype(methodDesc).newBuilderForType();
6212     service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
6213       @Override
6214       public void run(Message message) {
6215         if (message != null) {
6216           responseBuilder.mergeFrom(message);
6217         }
6218       }
6219     });
6220 
6221     if (coprocessorHost != null) {
6222       coprocessorHost.postEndpointInvocation(service, methodName, request, responseBuilder);
6223     }
6224 
6225     return responseBuilder.build();
6226   }
6227 
6228   /*
6229    * Process table.
6230    * Do major compaction or list content.
6231    * @throws IOException
6232    */
6233   private static void processTable(final FileSystem fs, final Path p,
6234       final WALFactory walFactory, final Configuration c,
6235       final boolean majorCompact)
6236   throws IOException {
6237     HRegion region;
6238     FSTableDescriptors fst = new FSTableDescriptors(c);
6239     // Currently expects tables have one region only.
6240     if (FSUtils.getTableName(p).equals(TableName.META_TABLE_NAME)) {
6241       final WAL wal = walFactory.getMetaWAL(
6242           HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
6243       region = HRegion.newHRegion(p, wal, fs, c,
6244         HRegionInfo.FIRST_META_REGIONINFO,
6245           fst.get(TableName.META_TABLE_NAME), null);
6246     } else {
6247       throw new IOException("Not a known catalog table: " + p.toString());
6248     }
6249     try {
6250       region.initialize(null);
6251       if (majorCompact) {
6252         region.compactStores(true);
6253       } else {
6254         // Default behavior
6255         Scan scan = new Scan();
6256         // scan.addFamily(HConstants.CATALOG_FAMILY);
6257         RegionScanner scanner = region.getScanner(scan);
6258         try {
6259           List<Cell> kvs = new ArrayList<Cell>();
6260           boolean done;
6261           do {
6262             kvs.clear();
6263             done = scanner.next(kvs);
6264             if (kvs.size() > 0) LOG.info(kvs);
6265           } while (done);
6266         } finally {
6267           scanner.close();
6268         }
6269       }
6270     } finally {
6271       region.close();
6272     }
6273   }
6274 
6275   boolean shouldForceSplit() {
6276     return this.splitRequest;
6277   }
6278 
6279   byte[] getExplicitSplitPoint() {
6280     return this.explicitSplitPoint;
6281   }
6282 
6283   void forceSplit(byte[] sp) {
6284     // This HRegion will go away after the forced split is successful
6285     // But if a forced split fails, we need to clear forced split.
6286     this.splitRequest = true;
6287     if (sp != null) {
6288       this.explicitSplitPoint = sp;
6289     }
6290   }
6291 
6292   void clearSplit() {
6293     this.splitRequest = false;
6294     this.explicitSplitPoint = null;
6295   }
6296 
6297   /**
6298    * Give the region a chance to prepare before it is split.
6299    */
6300   protected void prepareToSplit() {
6301     // nothing
6302   }
6303 
6304   /**
6305    * Return the splitpoint. null indicates the region isn't splittable.
6306    * If the splitpoint isn't explicitly specified, it will go over the stores
6307    * to find the best splitpoint. Currently the criterion for the best
6308    * splitpoint is the size of the store.
6309    */
6310   public byte[] checkSplit() {
6311     // Can't split META
6312     if (this.getRegionInfo().isMetaTable() ||
6313         TableName.NAMESPACE_TABLE_NAME.equals(this.getRegionInfo().getTable())) {
6314       if (shouldForceSplit()) {
6315         LOG.warn("Cannot split meta region in HBase 0.20 and above");
6316       }
6317       return null;
6318     }
6319 
6320     // Can't split region which is in recovering state
6321     if (this.isRecovering()) {
6322       LOG.info("Cannot split region " + this.getRegionInfo().getEncodedName() + " in recovery.");
6323       return null;
6324     }
6325 
6326     if (!splitPolicy.shouldSplit()) {
6327       return null;
6328     }
6329 
6330     byte[] ret = splitPolicy.getSplitPoint();
6331 
6332     if (ret != null) {
6333       try {
6334         checkRow(ret, "calculated split");
6335       } catch (IOException e) {
6336         LOG.error("Ignoring invalid split", e);
6337         return null;
6338       }
6339     }
6340     return ret;
6341   }
6342 
6343   /**
6344    * @return The priority that this region should have in the compaction queue
6345    */
6346   public int getCompactPriority() {
6347     int count = Integer.MAX_VALUE;
6348     for (Store store : stores.values()) {
6349       count = Math.min(count, store.getCompactPriority());
6350     }
6351     return count;
6352   }
6353 
6354 
6355   /** @return the coprocessor host */
6356   public RegionCoprocessorHost getCoprocessorHost() {
6357     return coprocessorHost;
6358   }
6359 
6360   /** @param coprocessorHost the new coprocessor host */
6361   public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {
6362     this.coprocessorHost = coprocessorHost;
6363   }
6364 
6365   /**
6366    * This method needs to be called before any public call that reads or
6367    * modifies data. It has to be called just before a try.
6368    * #closeRegionOperation needs to be called in the try's finally block.
6369    * Acquires a read lock and checks if the region is closing or closed.
6370    * @throws IOException
6371    */
6372   public void startRegionOperation() throws IOException {
6373     startRegionOperation(Operation.ANY);
6374   }
6375 
6376   /**
6377    * @param op The operation is about to be taken on the region
6378    * @throws IOException
6379    */
6380   protected void startRegionOperation(Operation op) throws IOException {
6381     switch (op) {
6382     case GET:  // read operations
6383     case SCAN:
6384       checkReadsEnabled();
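      // fall through: read operations also go through the recovering-state check below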
6385     case INCREMENT: // write operations
6386     case APPEND:
6387     case SPLIT_REGION:
6388     case MERGE_REGION:
6389     case PUT:
6390     case DELETE:
6391     case BATCH_MUTATE:
6392     case COMPACT_REGION:
6393       // when a region is in recovering state, no read, split or merge is allowed
6394       if (isRecovering() && (this.disallowWritesInRecovering ||
6395               (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) {
6396         throw new RegionInRecoveryException(this.getRegionNameAsString() +
6397           " is recovering; cannot take reads");
6398       }
6399       break;
6400     default:
6401       break;
6402     }
6403     if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION
6404         || op == Operation.COMPACT_REGION) {
6405       // split, merge or compact region doesn't need to check the closing/closed state or lock the
6406       // region
6407       return;
6408     }
6409     if (this.closing.get()) {
6410       throw new NotServingRegionException(getRegionNameAsString() + " is closing");
6411     }
6412     lock(lock.readLock());
6413     if (this.closed.get()) {
6414       lock.readLock().unlock();
6415       throw new NotServingRegionException(getRegionNameAsString() + " is closed");
6416     }
6417     try {
6418       if (coprocessorHost != null) {
6419         coprocessorHost.postStartRegionOperation(op);
6420       }
6421     } catch (Exception e) {
6422       lock.readLock().unlock();
6423       throw new IOException(e);
6424     }
6425   }
6426 
6427   /**
6428    * Closes the lock. This needs to be called in the finally block corresponding
6429    * to the try block of #startRegionOperation
6430    * @throws IOException
6431    */
6432   public void closeRegionOperation() throws IOException {
6433     closeRegionOperation(Operation.ANY);
6434   }
6435 
6436   /**
6437    * Closes the lock. This needs to be called in the finally block corresponding
6438    * to the try block of {@link #startRegionOperation(Operation)}
6439    * @throws IOException
6440    */
6441   public void closeRegionOperation(Operation operation) throws IOException {
6442     lock.readLock().unlock();
6443     if (coprocessorHost != null) {
6444       coprocessorHost.postCloseRegionOperation(operation);
6445     }
6446   }
6447 
6448   /**
6449    * This method needs to be called before any public call that reads or
6450    * modifies stores in bulk. It has to be called just before a try.
6451    * #closeBulkRegionOperation needs to be called in the try's finally block.
6452    * Acquires a write lock and checks if the region is closing or closed.
6453    * @throws NotServingRegionException when the region is closing or closed
6454    * @throws RegionTooBusyException if failed to get the lock in time
6455    * @throws InterruptedIOException if interrupted while waiting for a lock
6456    */
6457   private void startBulkRegionOperation(boolean writeLockNeeded)
6458       throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
6459     if (this.closing.get()) {
6460       throw new NotServingRegionException(getRegionNameAsString() + " is closing");
6461     }
6462     if (writeLockNeeded) lock(lock.writeLock());
6463     else lock(lock.readLock());
6464     if (this.closed.get()) {
6465       if (writeLockNeeded) lock.writeLock().unlock();
6466       else lock.readLock().unlock();
6467       throw new NotServingRegionException(getRegionNameAsString() + " is closed");
6468     }
6469   }
6470 
6471   /**
6472    * Closes the lock. This needs to be called in the finally block corresponding
6473    * to the try block of #startRegionOperation
6474    */
6475   private void closeBulkRegionOperation(){
6476     if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
6477     else lock.readLock().unlock();
6478   }
6479 
6480   /**
6481    * Update counters for the number of puts without wal and the size of possible data loss.
6482    * This information is exposed by the region server metrics.
6483    */
6484   private void recordMutationWithoutWal(final Map<byte [], List<Cell>> familyMap) {
6485     numMutationsWithoutWAL.increment();
6486     if (numMutationsWithoutWAL.get() <= 1) {
6487       LOG.info("writing data to region " + this +
6488                " with WAL disabled. Data may be lost in the event of a crash.");
6489     }
6490 
6491     long mutationSize = 0;
6492     for (List<Cell> cells: familyMap.values()) {
6493       assert cells instanceof RandomAccess;
6494       int listSize = cells.size();
6495       for (int i=0; i < listSize; i++) {
6496         Cell cell = cells.get(i);
6497         // TODO we need include tags length also here.
6498         mutationSize += KeyValueUtil.keyLength(cell) + cell.getValueLength();
6499       }
6500     }
6501 
6502     dataInMemoryWithoutWAL.add(mutationSize);
6503   }
6504 
6505   private void lock(final Lock lock)
6506       throws RegionTooBusyException, InterruptedIOException {
6507     lock(lock, 1);
6508   }
6509 
6510   /**
6511    * Try to acquire a lock.  Throw RegionTooBusyException
6512    * if failed to get the lock in time. Throw InterruptedIOException
6513    * if interrupted while waiting for the lock.
6514    */
6515   private void lock(final Lock lock, final int multiplier)
6516       throws RegionTooBusyException, InterruptedIOException {
6517     try {
6518       final long waitTime = Math.min(maxBusyWaitDuration,
6519           busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
6520       if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
6521         throw new RegionTooBusyException(
6522             "failed to get a lock in " + waitTime + " ms. " +
6523                 "regionName=" + (this.getRegionInfo() == null ? "unknown" :
6524                 this.getRegionInfo().getRegionNameAsString()) +
6525                 ", server=" + (this.getRegionServerServices() == null ? "unknown" :
6526                 this.getRegionServerServices().getServerName()));
6527       }
6528     } catch (InterruptedException ie) {
6529       LOG.info("Interrupted while waiting for a lock");
6530       InterruptedIOException iie = new InterruptedIOException();
6531       iie.initCause(ie);
6532       throw iie;
6533     }
6534   }
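  // Worked example (illustrative): with busyWaitDuration = 2000 ms,
  // maxBusyWaitMultiplier = 3 and maxBusyWaitDuration = 5000 ms, a call with
  // multiplier 10 waits min(5000, 2000 * min(10, 3)) = 5000 ms before giving
  // up with RegionTooBusyException.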
6535 
6536   /**
6537    * Calls sync with the given transaction ID if the region's table is not
6538    * deferring it.
6539    * @param txid the transaction ID to sync up to
6540    * @throws IOException if anything goes wrong with DFS
6541    */
6542   private void syncOrDefer(long txid, Durability durability) throws IOException {
6543     if (this.getRegionInfo().isMetaRegion()) {
6544       this.wal.sync(txid);
6545     } else {
6546       switch(durability) {
6547       case USE_DEFAULT:
6548         // do what table defaults to
6549         if (shouldSyncWAL()) {
6550           this.wal.sync(txid);
6551         }
6552         break;
6553       case SKIP_WAL:
6554         // nothing to do
6555         break;
6556       case ASYNC_WAL:
6557         // nothing to do
6558         break;
6559       case SYNC_WAL:
6560       case FSYNC_WAL:
6561         // sync the WAL edit (SYNC and FSYNC treated the same for now)
6562         this.wal.sync(txid);
6563         break;
6564       }
6565     }
6566   }
6567 
6568   /**
6569    * Check whether we should sync the wal from the table's durability settings
6570    */
6571   private boolean shouldSyncWAL() {
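    // Durability constants are declared in the order USE_DEFAULT, SKIP_WAL,
    // ASYNC_WAL, SYNC_WAL, FSYNC_WAL, so an ordinal greater than ASYNC_WAL's
    // means SYNC_WAL or FSYNC_WAL, i.e. a sync is required.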
6572     return durability.ordinal() > Durability.ASYNC_WAL.ordinal();
6573   }
6574 
6575   /**
6576    * A mocked list implementation - discards all updates.
6577    */
6578   private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {
6579 
6580     @Override
6581     public void add(int index, Cell element) {
6582       // do nothing
6583     }
6584 
6585     @Override
6586     public boolean addAll(int index, Collection<? extends Cell> c) {
6587       return false; // this list is never changed as a result of an update
6588     }
6589 
6590     @Override
6591     public KeyValue get(int index) {
6592       throw new UnsupportedOperationException();
6593     }
6594 
6595     @Override
6596     public int size() {
6597       return 0;
6598     }
6599   };
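  // Illustrative note (not in the original source): MOCKED_LIST can be handed
  // to code that requires a non-null List<Cell> sink when results should be
  // discarded; add(...) and addAll(...) silently drop updates, size() stays 0,
  // and get(...) always throws UnsupportedOperationException.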
6600 
6601   /**
6602    * Facility for dumping and compacting catalog tables.
6603    * Only does catalog tables, since these are the only tables whose schema
6604    * we know for sure.  For usage run:
6605    * <pre>
6606    *   ./bin/hbase org.apache.hadoop.hbase.regionserver.HRegion tableDir [major_compact]
6607    * </pre>
6608    * @throws IOException
6609    */
6610   public static void main(String[] args) throws IOException {
6611     if (args.length < 1) {
6612       printUsageAndExit(null);
6613     }
6614     boolean majorCompact = false;
6615     if (args.length > 1) {
6616       if (!args[1].toLowerCase().startsWith("major")) {
6617         printUsageAndExit("ERROR: Unrecognized option <" + args[1] + ">");
6618       }
6619       majorCompact = true;
6620     }
6621     final Path tableDir = new Path(args[0]);
6622     final Configuration c = HBaseConfiguration.create();
6623     final FileSystem fs = FileSystem.get(c);
6624     final Path logdir = new Path(c.get("hbase.tmp.dir"));
6625     final String logname = "wal" + FSUtils.getTableName(tableDir) + System.currentTimeMillis();
6626 
6627     final Configuration walConf = new Configuration(c);
6628     FSUtils.setRootDir(walConf, logdir);
6629     final WALFactory wals = new WALFactory(walConf, null, logname);
6630     try {
6631       processTable(fs, tableDir, wals, c, majorCompact);
6632     } finally {
6633        wals.close();
6634        // TODO: is this still right?
6635        BlockCache bc = new CacheConfig(c).getBlockCache();
6636        if (bc != null) bc.shutdown();
6637     }
6638   }
6639 
6640   /**
6641    * Gets the latest sequence number that was read from storage when this region was opened.
6642    */
6643   public long getOpenSeqNum() {
6644     return this.openSeqNum;
6645   }
6646 
6647   /**
6648    * Gets the max sequence ids of the stores, as read from storage when this region was opened.
6649    * WAL edits with a smaller or equal sequence number will be skipped during replay.
6650    */
6651   public Map<byte[], Long> getMaxStoreSeqIdForLogReplay() {
6652     return this.maxSeqIdInStores;
6653   }
6654 
6655   @VisibleForTesting
6656   public long getOldestSeqIdOfStore(byte[] familyName) {
6657     return wal.getEarliestMemstoreSeqNum(getRegionInfo()
6658         .getEncodedNameAsBytes(), familyName);
6659   }
6660 
6661   /**
6662    * @return the current compaction state of this region (NONE, MINOR, MAJOR or MAJOR_AND_MINOR).
6663    */
6664   public CompactionState getCompactionState() {
6665     boolean hasMajor = majorInProgress.get() > 0, hasMinor = minorInProgress.get() > 0;
6666     return (hasMajor ? (hasMinor ? CompactionState.MAJOR_AND_MINOR : CompactionState.MAJOR)
6667         : (hasMinor ? CompactionState.MINOR : CompactionState.NONE));
6668   }
6669 
6670   public void reportCompactionRequestStart(boolean isMajor) {
6671     (isMajor ? majorInProgress : minorInProgress).incrementAndGet();
6672   }
6673 
6674   public void reportCompactionRequestEnd(boolean isMajor, int numFiles, long filesSizeCompacted) {
6675     int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();
6676 
6677     // metrics
6678     compactionsFinished.incrementAndGet();
6679     compactionNumFilesCompacted.addAndGet(numFiles);
6680     compactionNumBytesCompacted.addAndGet(filesSizeCompacted);
6681 
6682     assert newValue >= 0;
6683   }
6684 
6685   /**
6686    * Do not change this sequence id. See {@link #sequenceId} comment.
6687    * @return sequenceId
6688    */
6689   @VisibleForTesting
6690   public AtomicLong getSequenceId() {
6691     return this.sequenceId;
6692   }
6693 
6694   /**
6695    * Sets this region's sequenceId.
6696    * @param value new value
6697    */
6698   private void setSequenceId(long value) {
6699     this.sequenceId.set(value);
6700   }
6701 
6702   /**
6703    * Listener interface to enable callers of
6704    * bulkLoadHFile() to perform any necessary
6705    * pre/post processing of a given bulk load call.
6706    */
6707   public interface BulkLoadListener {
6708 
6709     /**
6710      * Called before an HFile is actually loaded
6711      * @param family family being loaded to
6712      * @param srcPath path of HFile
6713      * @return final path to be used for actual loading
6714      * @throws IOException
6715      */
6716     String prepareBulkLoad(byte[] family, String srcPath) throws IOException;
6717 
6718     /**
6719      * Called after a successful HFile load
6720      * @param family family being loaded to
6721      * @param srcPath path of HFile
6722      * @throws IOException
6723      */
6724     void doneBulkLoad(byte[] family, String srcPath) throws IOException;
6725 
6726     /**
6727      * Called after a failed HFile load
6728      * @param family family being loaded to
6729      * @param srcPath path of HFile
6730      * @throws IOException
6731      */
6732     void failedBulkLoad(byte[] family, String srcPath) throws IOException;
6733   }
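  // A minimal sketch of a BulkLoadListener implementation (illustrative only;
  // the class name and logging behavior are assumptions, not part of the
  // original source):
  //
  //   class LoggingBulkLoadListener implements BulkLoadListener {
  //     @Override
  //     public String prepareBulkLoad(byte[] family, String srcPath) {
  //       return srcPath; // load the file from where it already sits
  //     }
  //     @Override
  //     public void doneBulkLoad(byte[] family, String srcPath) {
  //       LOG.info("bulk loaded " + srcPath);
  //     }
  //     @Override
  //     public void failedBulkLoad(byte[] family, String srcPath) {
  //       LOG.warn("bulk load failed for " + srcPath);
  //     }
  //   }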
6734 
6735   @VisibleForTesting class RowLockContext {
6736     private final HashedBytes row;
6737     private final CountDownLatch latch = new CountDownLatch(1);
6738     private final Thread thread;
6739     private int lockCount = 0;
6740 
6741     RowLockContext(HashedBytes row) {
6742       this.row = row;
6743       this.thread = Thread.currentThread();
6744     }
6745 
6746     boolean ownedByCurrentThread() {
6747       return thread == Thread.currentThread();
6748     }
6749 
6750     RowLock newLock() {
6751       lockCount++;
6752       return new RowLock(this);
6753     }
6754 
6755     @Override
6756     public String toString() {
6757       Thread t = this.thread;
6758       return "Thread=" + (t == null? "null": t.getName()) + ", row=" + this.row +
6759         ", lockCount=" + this.lockCount;
6760     }
6761 
6762     void releaseLock() {
6763       if (!ownedByCurrentThread()) {
6764         throw new IllegalArgumentException("Lock held by thread: " + thread
6765           + " cannot be released by different thread: " + Thread.currentThread());
6766       }
6767       lockCount--;
6768       if (lockCount == 0) {
6769         // no remaining locks by the thread, unlock and allow other threads to access
6770         RowLockContext existingContext = lockedRows.remove(row);
6771         if (existingContext != this) {
6772           throw new RuntimeException(
6773               "Internal row lock state inconsistent, should not happen, row: " + row);
6774         }
6775         latch.countDown();
6776       }
6777     }
6778   }
6779 
6780   /**
6781    * Row lock held by a given thread.
6782    * One thread may acquire multiple locks on the same row simultaneously.
6783    * The locks must be released by calling release() from the same thread.
6784    */
6785   public static class RowLock {
6786     @VisibleForTesting final RowLockContext context;
6787     private boolean released = false;
6788 
6789     @VisibleForTesting RowLock(RowLockContext context) {
6790       this.context = context;
6791     }
6792 
6793     /**
6794      * Release the given lock.  If there are no remaining locks held by the current thread
6795      * then unlock the row and allow other threads to acquire the lock.
6796      * @throws IllegalArgumentException if called by a different thread than the lock owning thread
6797      */
6798     public void release() {
6799       if (!released) {
6800         context.releaseLock();
6801         released = true;
6802       }
6803     }
6804   }
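  // A minimal usage sketch (illustrative; assumes a RowLock obtained from the
  // enclosing region, e.g. via getRowLock(row); the exact acquisition API may
  // vary by version):
  //
  //   RowLock rowLock = region.getRowLock(row);
  //   try {
  //     // ... work that must happen while the row is locked ...
  //   } finally {
  //     rowLock.release();
  //   }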
6805 
6806   /**
6807    * Append a faked WALEdit in order to get a long sequence number; the WAL syncer will just
6808    * ignore the WALEdit append later.
6809    * @param wal the WAL to append to
6810    * @param cells list of Cells inserted into memstore. Those Cells are passed in order to
6811    *        be updated with the right mvcc values (their WAL sequence number)
6812    * @return the key used for the append; no sync is requested and the edit is empty.
6813    * @throws IOException
6814    */
6815   private WALKey appendEmptyEdit(final WAL wal, List<Cell> cells) throws IOException {
6816     // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
6817     WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(),
6818       WALKey.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
6819     // Call append but with an empty WALEdit.  The returned sequence id will not be associated
6820     // with any edit and we can be sure it went in after all outstanding appends.
6821     wal.append(getTableDesc(), getRegionInfo(), key,
6822       WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells);
6823     return key;
6824   }
6825 
6826   /**
6827    * Explicitly sync the WAL.
6828    * @throws IOException
6829    */
6830   public void syncWal() throws IOException {
6831     if (this.wal != null) {
6832       this.wal.sync();
6833     }
6834   }
6835 
6836   /**
6837    * {@inheritDoc}
6838    */
6839   @Override
6840   public void onConfigurationChange(Configuration conf) {
6841     // Do nothing for now.
6842   }
6843 
6844   /**
6845    * {@inheritDoc}
6846    */
6847   @Override
6848   public void registerChildren(ConfigurationManager manager) {
6849     configurationManager = Optional.of(manager);
6850     for (Store s : this.stores.values()) {
6851       configurationManager.get().registerObserver(s);
6852     }
6853   }
6854 
6855   /**
6856    * {@inheritDoc}
6857    */
6858   @Override
6859   public void deregisterChildren(ConfigurationManager manager) {
6860     for (Store s : this.stores.values()) {
6861       configurationManager.get().deregisterObserver(s);
6862     }
6863   }
6864 }