1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.io.UnsupportedEncodingException;
25  import java.lang.reflect.Constructor;
26  import java.text.ParseException;
27  import java.util.AbstractList;
28  import java.util.ArrayList;
29  import java.util.Arrays;
30  import java.util.Collection;
31  import java.util.Collections;
32  import java.util.HashMap;
33  import java.util.Iterator;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.NavigableMap;
37  import java.util.NavigableSet;
38  import java.util.RandomAccess;
39  import java.util.Set;
40  import java.util.TreeMap;
41  import java.util.concurrent.Callable;
42  import java.util.concurrent.CompletionService;
43  import java.util.concurrent.ConcurrentHashMap;
44  import java.util.concurrent.ConcurrentSkipListMap;
45  import java.util.concurrent.CountDownLatch;
46  import java.util.concurrent.ExecutionException;
47  import java.util.concurrent.ExecutorCompletionService;
48  import java.util.concurrent.ExecutorService;
49  import java.util.concurrent.Executors;
50  import java.util.concurrent.Future;
51  import java.util.concurrent.FutureTask;
52  import java.util.concurrent.ThreadFactory;
53  import java.util.concurrent.ThreadPoolExecutor;
54  import java.util.concurrent.TimeUnit;
55  import java.util.concurrent.TimeoutException;
56  import java.util.concurrent.atomic.AtomicBoolean;
57  import java.util.concurrent.atomic.AtomicInteger;
58  import java.util.concurrent.atomic.AtomicLong;
59  import java.util.concurrent.locks.Lock;
60  import java.util.concurrent.locks.ReentrantReadWriteLock;
61  
62  import org.apache.commons.lang.RandomStringUtils;
63  import org.apache.commons.logging.Log;
64  import org.apache.commons.logging.LogFactory;
65  import org.apache.hadoop.hbase.classification.InterfaceAudience;
66  import org.apache.hadoop.conf.Configuration;
67  import org.apache.hadoop.fs.FileStatus;
68  import org.apache.hadoop.fs.FileSystem;
69  import org.apache.hadoop.fs.Path;
70  import org.apache.hadoop.hbase.Cell;
71  import org.apache.hadoop.hbase.CellScanner;
72  import org.apache.hadoop.hbase.CellUtil;
73  import org.apache.hadoop.hbase.CompoundConfiguration;
74  import org.apache.hadoop.hbase.DoNotRetryIOException;
75  import org.apache.hadoop.hbase.DroppedSnapshotException;
76  import org.apache.hadoop.hbase.HBaseConfiguration;
77  import org.apache.hadoop.hbase.HColumnDescriptor;
78  import org.apache.hadoop.hbase.HConstants;
79  import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
80  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
81  import org.apache.hadoop.hbase.HRegionInfo;
82  import org.apache.hadoop.hbase.HTableDescriptor;
83  import org.apache.hadoop.hbase.KeyValue;
84  import org.apache.hadoop.hbase.KeyValueUtil;
85  import org.apache.hadoop.hbase.NotServingRegionException;
86  import org.apache.hadoop.hbase.RegionTooBusyException;
87  import org.apache.hadoop.hbase.TableName;
88  import org.apache.hadoop.hbase.UnknownScannerException;
89  import org.apache.hadoop.hbase.backup.HFileArchiver;
90  import org.apache.hadoop.hbase.client.Append;
91  import org.apache.hadoop.hbase.client.Delete;
92  import org.apache.hadoop.hbase.client.Durability;
93  import org.apache.hadoop.hbase.client.Get;
94  import org.apache.hadoop.hbase.client.Increment;
95  import org.apache.hadoop.hbase.client.IsolationLevel;
96  import org.apache.hadoop.hbase.client.Mutation;
97  import org.apache.hadoop.hbase.client.Put;
98  import org.apache.hadoop.hbase.client.Result;
99  import org.apache.hadoop.hbase.client.RowMutations;
100 import org.apache.hadoop.hbase.client.Scan;
101 import org.apache.hadoop.hbase.conf.ConfigurationManager;
102 import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
103 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
104 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
105 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
106 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
107 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
108 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
109 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
110 import org.apache.hadoop.hbase.filter.FilterWrapper;
111 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
112 import org.apache.hadoop.hbase.io.HeapSize;
113 import org.apache.hadoop.hbase.io.TimeRange;
114 import org.apache.hadoop.hbase.io.hfile.BlockCache;
115 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
116 import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
117 import org.apache.hadoop.hbase.ipc.RpcCallContext;
118 import org.apache.hadoop.hbase.ipc.RpcServer;
119 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
120 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
121 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
122 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
123 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
124 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
125 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
126 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
127 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
128 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
129 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
130 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
131 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
132 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
133 import org.apache.hadoop.hbase.wal.WAL;
134 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
135 import org.apache.hadoop.hbase.wal.WALFactory;
136 import org.apache.hadoop.hbase.wal.WALKey;
137 import org.apache.hadoop.hbase.wal.WALSplitter;
138 import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
139 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
140 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
141 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
142 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
143 import org.apache.hadoop.hbase.util.Bytes;
144 import org.apache.hadoop.hbase.util.CancelableProgressable;
145 import org.apache.hadoop.hbase.util.ClassSize;
146 import org.apache.hadoop.hbase.util.CompressionTest;
147 import org.apache.hadoop.hbase.util.Counter;
148 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
149 import org.apache.hadoop.hbase.util.FSTableDescriptors;
150 import org.apache.hadoop.hbase.util.FSUtils;
151 import org.apache.hadoop.hbase.util.HashedBytes;
152 import org.apache.hadoop.hbase.util.Pair;
153 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
154 import org.apache.hadoop.hbase.util.Threads;
155 import org.apache.hadoop.io.MultipleIOException;
156 import org.apache.hadoop.util.StringUtils;
157 
158 import com.google.common.annotations.VisibleForTesting;
159 import com.google.common.base.Optional;
160 import com.google.common.base.Preconditions;
161 import com.google.common.collect.Lists;
162 import com.google.common.collect.Maps;
163 import com.google.common.io.Closeables;
164 import com.google.protobuf.Descriptors;
165 import com.google.protobuf.Message;
166 import com.google.protobuf.RpcCallback;
167 import com.google.protobuf.RpcController;
168 import com.google.protobuf.Service;
169 
170 /**
171  * HRegion stores data for a certain region of a table.  It stores all columns
172  * for each row. A given table consists of one or more HRegions.
173  *
174  * <p>We maintain multiple HStores for a single HRegion.
175  *
176  * <p>A Store is a set of rows with some column data; together,
177  * they make up all the data for the rows.
178  *
179  * <p>Each HRegion has a 'startKey' and 'endKey'.
180  * <p>The first is inclusive, the second is exclusive (except for
181  * the final region).  The endKey of region 0 is the same as
182  * startKey for region 1 (if it exists).  The startKey for the
183  * first region is null. The endKey for the final region is null.
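     * <p>For example (a hypothetical illustration, not taken from this code),
     * a table split into three regions could have key ranges like:
     * <pre>
     *   region 0: startKey=null,  endKey="bbb"   (first region; start is open)
     *   region 1: startKey="bbb", endKey="ppp"   (startKey == endKey of region 0)
     *   region 2: startKey="ppp", endKey=null    (final region; end is open)
     * </pre>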
184  *
185  * <p>Locking at the HRegion level serves only one purpose: preventing the
186  * region from being closed (and consequently split) while other operations
187  * are ongoing. Each row level operation obtains both a row lock and a region
188  * read lock for the duration of the operation. While a scanner is being
189  * constructed, getScanner holds a read lock. If the scanner is successfully
190  * constructed, it holds a read lock until it is closed. A close takes out a
191  * write lock and consequently will block for ongoing operations and will block
192  * new operations from starting while the close is in progress.
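     * <p>A minimal caller-side sketch of that protocol (a hedged illustration;
     * the names follow this class, but treat it as a sketch rather than a
     * prescribed call sequence):
     * <pre>
     *   region.startRegionOperation(Operation.PUT); // takes the region read lock
     *   try {
     *     RowLock rowLock = region.getRowLock(row); // row-level lock
     *     try {
     *       // apply the mutation here
     *     } finally {
     *       rowLock.release();
     *     }
     *   } finally {
     *     region.closeRegionOperation(); // releases the region read lock
     *   }
     * </pre>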
193  *
194  * <p>An HRegion is defined by its table and its key extent.
195  *
196  * <p>It consists of at least one Store.  The number of Stores should be
197  * configurable, so that data which is accessed together is stored in the same
198  * Store.  Right now, we approximate that by building a single Store for
199  * each column family.  (This config info will be communicated via the
200  * tabledesc.)
201  *
202  * <p>The HTableDescriptor contains metainfo about the HRegion's table.
203  * regionName is a unique identifier for this HRegion. [startKey, endKey)
204  * defines the keyspace for this HRegion.
205  */
206 @InterfaceAudience.Private
207 public class HRegion implements HeapSize, PropagatingConfigurationObserver { // , Writable{
208   public static final Log LOG = LogFactory.getLog(HRegion.class);
209 
210   public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
211       "hbase.hregion.scan.loadColumnFamiliesOnDemand";
212 
213   /**
214    * This is the global default value for durability. All tables/mutations not
215    * defining a durability or using USE_DEFAULT will default to this value.
216    */
217   private static final Durability DEFAULT_DURABILITY = Durability.SYNC_WAL;
218 
219   final AtomicBoolean closed = new AtomicBoolean(false);
220   /* Closing can take some time; use the closing flag if there is stuff we don't
221    * want to do while in closing state; e.g. offering this region up to the
222    * master as a region to close if the carrying regionserver is overloaded.
223    * Once set, it is never cleared.
224    */
225   final AtomicBoolean closing = new AtomicBoolean(false);
226 
227   /**
228    * The sequence id of the last flush on this region.  Used when doing some rough
229    * calculations on whether it is time to flush or not.
230    */
231   protected volatile long lastFlushSeqId = -1L;
232 
233   /**
234    * Region scoped edit sequence Id. Edits to this region are GUARANTEED to appear in the WAL
235    * file in this sequence id's order; i.e. edit #2 will be in the WAL after edit #1.
236    * Its default value is -1L. This default is used as a marker to indicate
237    * that the region hasn't opened yet. Once it is opened, it is set to the derived
238    * {@link #openSeqNum}, the largest sequence id of all hfiles opened under this Region.
239    *
240    * <p>Control of this sequence is handed off to the WAL implementation.  It is responsible
241    * for tagging edits with the correct sequence id since it is responsible for getting the
242    * edits into the WAL files. It controls updating the sequence id value.  DO NOT UPDATE IT
243    * OUTSIDE OF THE WAL.  The value you get will not be what you think it is.
244    */
245   private final AtomicLong sequenceId = new AtomicLong(-1L);
246 
247   /**
248    * Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context for
249    * startRegionOperation to possibly invoke different checks before any region operations. Not all
250    * operations have to be defined here. It's only needed when a special check is needed in
251    * startRegionOperation
252    */
253   public enum Operation {
254     ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE,
255     REPLAY_BATCH_MUTATE, COMPACT_REGION
256   }
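      // A hedged usage sketch (caller-side, not part of this class): wrapping
      // region work in startRegionOperation ensures the region cannot be
      // closed out from under the operation.
      //
      //   region.startRegionOperation(Operation.SCAN);
      //   try {
      //     // perform the scan against the region
      //   } finally {
      //     region.closeRegionOperation();
      //   }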
257 
258   //////////////////////////////////////////////////////////////////////////////
259   // Members
260   //////////////////////////////////////////////////////////////////////////////
261 
262   // map from a locked row to the context for that lock including:
263   // - CountDownLatch for threads waiting on that row
264   // - the thread that owns the lock (allow reentrancy)
265   // - reference count of (reentrant) locks held by the thread
266   // - the row itself
267   private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
268       new ConcurrentHashMap<HashedBytes, RowLockContext>();
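      // Reentrancy sketch (hypothetical caller): per the context fields above,
      // the same thread may take the same row lock more than once and the
      // context just tracks a reference count.
      //
      //   RowLock first = region.getRowLock(row);   // count = 1
      //   RowLock second = region.getRowLock(row);  // same thread, count = 2
      //   second.release();                         // count = 1, row still locked
      //   first.release();                          // count = 0, waiters released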
269 
270   protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(
271       Bytes.BYTES_RAWCOMPARATOR);
272 
273   // TODO: account for each registered handler in HeapSize computation
274   private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
275 
276   public final AtomicLong memstoreSize = new AtomicLong(0);
277 
278   // Debug possible data loss due to WAL off
279   final Counter numMutationsWithoutWAL = new Counter();
280   final Counter dataInMemoryWithoutWAL = new Counter();
281 
282   // Debug why CAS operations are taking a while.
283   final Counter checkAndMutateChecksPassed = new Counter();
284   final Counter checkAndMutateChecksFailed = new Counter();
285 
286   //Number of requests
287   final Counter readRequestsCount = new Counter();
288   final Counter writeRequestsCount = new Counter();
289 
290   // Compaction counters
291   final AtomicLong compactionsFinished = new AtomicLong(0L);
292   final AtomicLong compactionNumFilesCompacted = new AtomicLong(0L);
293   final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L);
294 
295 
296   private final WAL wal;
297   private final HRegionFileSystem fs;
298   protected final Configuration conf;
299   private final Configuration baseConf;
300   private final KeyValue.KVComparator comparator;
301   private final int rowLockWaitDuration;
302   static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
303 
304   // The internal wait duration to acquire a lock before read/update
305   // from the region. It is not per row. The purpose of this wait time
306   // is to avoid waiting a long time while the region is busy, so that
307   // we can release the IPC handler soon enough to improve the
308   // availability of the region server. It can be adjusted by
309   // tuning configuration "hbase.busy.wait.duration".
310   final long busyWaitDuration;
311   static final long DEFAULT_BUSY_WAIT_DURATION = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
312 
313   // If updating multiple rows in one call, wait longer,
314   // i.e. waiting for busyWaitDuration * # of rows. However,
315   // we can limit the max multiplier.
316   final int maxBusyWaitMultiplier;
317 
318   // Max busy wait duration. There is no point to wait longer than the RPC
319   // purge timeout, when a RPC call will be terminated by the RPC engine.
320   final long maxBusyWaitDuration;
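      // Worked sketch of the intended behavior (illustrative numbers): with
      // busyWaitDuration = 1000 ms and maxBusyWaitMultiplier = 2, a batch of 5
      // rows waits about 1000 * min(5, 2) = 2000 ms, and never longer than
      // maxBusyWaitDuration.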
321 
322   // negative number indicates infinite timeout
323   static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
324   final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();
325 
326   private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
327 
328   /**
329    * The sequence ID that was encountered when this region was opened.
330    */
331   private long openSeqNum = HConstants.NO_SEQNUM;
332 
333   /**
334    * The default setting for whether to enable on-demand CF loading for
335    * scan requests to this region. Requests can override it.
336    */
337   private boolean isLoadingCfsOnDemandDefault = false;
338 
339   private final AtomicInteger majorInProgress = new AtomicInteger(0);
340   private final AtomicInteger minorInProgress = new AtomicInteger(0);
341 
342   //
343   // Context: During replay we want to ensure that we do not lose any data. So, we
344   // have to be conservative in how we replay wals. For each store, we calculate
345   // the maxSeqId up to which the store was flushed. And, skip the edits which
346   // are equal to or lower than maxSeqId for each store.
347   // The following map is populated when opening the region
348   Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
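      // Worked example: if the store for family "cf" (a hypothetical name) was
      // last flushed at sequence id 100, maxSeqIdInStores maps "cf" -> 100 and
      // replay skips any WAL edit for "cf" with sequence id <= 100, since that
      // edit is already persisted in an hfile.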
349 
350   /**
351    * Config setting for whether to allow writes when a region is in recovering or not.
352    */
353   private boolean disallowWritesInRecovering = false;
354 
355   // when a region is in recovering state, it can only accept writes, not reads
356   private volatile boolean isRecovering = false;
357 
358   private volatile Optional<ConfigurationManager> configurationManager;
359 
360   /**
361    * @return The smallest mvcc readPoint across all the scanners in this
362    * region. Writes older than this readPoint are included in every
363    * read operation.
364    */
365   public long getSmallestReadPoint() {
366     long minimumReadPoint;
367     // We need to ensure that while we are calculating the smallestReadPoint
368     // no new RegionScanners can grab a readPoint that we are unaware of.
369     // We achieve this by synchronizing on the scannerReadPoints object.
370     synchronized(scannerReadPoints) {
371       minimumReadPoint = mvcc.memstoreReadPoint();
372 
373       for (Long readPoint: this.scannerReadPoints.values()) {
374         if (readPoint < minimumReadPoint) {
375           minimumReadPoint = readPoint;
376         }
377       }
378     }
379     return minimumReadPoint;
380   }
381   /*
382    * Data structure of write state flags used for coordinating flushes,
383    * compactions and closes.
384    */
385   static class WriteState {
386     // Set while a memstore flush is happening.
387     volatile boolean flushing = false;
388     // Set when a flush has been requested.
389     volatile boolean flushRequested = false;
390     // Number of compactions running.
391     volatile int compacting = 0;
392     // Cleared in close. Once cleared, we can no longer compact or flush.
393     volatile boolean writesEnabled = true;
394     // Set if region is read-only
395     volatile boolean readOnly = false;
396     // whether the reads are enabled. This is different from readOnly, because readOnly is
397     // static in the lifetime of the region, while readsEnabled is dynamic
398     volatile boolean readsEnabled = true;
399 
400     /**
401      * Set flags that make this region read-only.
402      *
403      * @param onOff flip value for region r/o setting
404      */
405     synchronized void setReadOnly(final boolean onOff) {
406       this.writesEnabled = !onOff;
407       this.readOnly = onOff;
408     }
409 
410     boolean isReadOnly() {
411       return this.readOnly;
412     }
413 
414     boolean isFlushRequested() {
415       return this.flushRequested;
416     }
417 
418     void setReadsEnabled(boolean readsEnabled) {
419       this.readsEnabled = readsEnabled;
420     }
421 
422     static final long HEAP_SIZE = ClassSize.align(
423         ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
424   }
425 
426   /**
427    * Objects from this class are created when flushing to describe all the different states
428    * that the flush ends up in. The Result enum describes those states. The sequence id should only
429    * be specified if the flush was successful, and the failure message should only be specified
430    * if it didn't flush.
431    */
432   public static class FlushResult {
433     enum Result {
434       FLUSHED_NO_COMPACTION_NEEDED,
435       FLUSHED_COMPACTION_NEEDED,
436       // Special case where a flush didn't run because there's nothing in the memstores. Used when
437       // bulk loading to know when we can still load even if a flush didn't happen.
438       CANNOT_FLUSH_MEMSTORE_EMPTY,
439       CANNOT_FLUSH
440       // Be careful adding more to this enum, look at the below methods to make sure
441     }
442 
443     final Result result;
444     final String failureReason;
445     final long flushSequenceId;
446 
447     /**
448      * Convenience constructor to use when the flush is successful; the failure message is set to
449      * null.
450      * @param result Expecting FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
451      * @param flushSequenceId Generated sequence id that comes right after the edits in the
452      *                        memstores.
453      */
454     FlushResult(Result result, long flushSequenceId) {
455       this(result, flushSequenceId, null);
456       assert result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
457           .FLUSHED_COMPACTION_NEEDED;
458     }
459 
460     /**
461      * Convenience constructor to use when we cannot flush.
462      * @param result Expecting CANNOT_FLUSH_MEMSTORE_EMPTY or CANNOT_FLUSH.
463      * @param failureReason Reason why we couldn't flush.
464      */
465     FlushResult(Result result, String failureReason) {
466       this(result, -1, failureReason);
467       assert result == Result.CANNOT_FLUSH_MEMSTORE_EMPTY || result == Result.CANNOT_FLUSH;
468     }
469 
470     /**
471      * Constructor with all the parameters.
472      * @param result Any of the Result enum values.
473      * @param flushSequenceId Generated sequence id if the memstores were flushed else -1.
474      * @param failureReason Reason why we couldn't flush, or null.
475      */
476     FlushResult(Result result, long flushSequenceId, String failureReason) {
477       this.result = result;
478       this.flushSequenceId = flushSequenceId;
479       this.failureReason = failureReason;
480     }
481 
482     /**
483      * Convenience method, the equivalent of checking if result is
484      * FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
485      * @return true if the memstores were flushed, else false.
486      */
487     public boolean isFlushSucceeded() {
488       return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
489           .FLUSHED_COMPACTION_NEEDED;
490     }
491 
492     /**
493      * Convenience method, the equivalent of checking if result is FLUSHED_COMPACTION_NEEDED.
494      * @return True if the flush requested a compaction, else false (doesn't even mean it flushed).
495      */
496     public boolean isCompactionNeeded() {
497       return result == Result.FLUSHED_COMPACTION_NEEDED;
498     }
499   }
500 
501   final WriteState writestate = new WriteState();
502 
503   long memstoreFlushSize;
504   final long timestampSlop;
505   final long rowProcessorTimeout;
506   private volatile long lastFlushTime;
507   final RegionServerServices rsServices;
508   private RegionServerAccounting rsAccounting;
509   private long flushCheckInterval;
510   // flushPerChanges is to prevent too many changes in memstore
511   private long flushPerChanges;
512   private long blockingMemStoreSize;
513   final long threadWakeFrequency;
514   // Used to guard closes
515   final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
516 
517   // Stop updates lock
518   private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock();
519   private boolean splitRequest;
520   private byte[] explicitSplitPoint = null;
521 
522   private final MultiVersionConsistencyControl mvcc =
523       new MultiVersionConsistencyControl();
524 
525   // Coprocessor host
526   private RegionCoprocessorHost coprocessorHost;
527 
528   private HTableDescriptor htableDescriptor = null;
529   private RegionSplitPolicy splitPolicy;
530 
531   private final MetricsRegion metricsRegion;
532   private final MetricsRegionWrapperImpl metricsRegionWrapper;
533   private final Durability durability;
534 
535   /**
536    * HRegion constructor. This constructor should only be used for testing and
537    * extensions.  Instances of HRegion should be instantiated with the
538    * {@link HRegion#createHRegion} or {@link HRegion#openHRegion} method.
539    *
540    * @param tableDir qualified path of directory where region should be located,
541    * usually the table directory.
542    * @param wal The WAL is the outbound log for any updates to the HRegion
543    * The wal file is a logfile from the previous execution that's
544    * custom-computed for this HRegion. The HRegionServer computes and sorts the
545    * appropriate wal info for this HRegion. If there is a previous wal file
546    * (implying that the HRegion has been written-to before), then read it from
547    * the supplied path.
548    * @param fs is the filesystem.
549    * @param confParam is global configuration settings.
550    * @param regionInfo - HRegionInfo that describes the region
552    * @param htd the table descriptor
553    * @param rsServices reference to {@link RegionServerServices} or null
554    */
555   @Deprecated
556   public HRegion(final Path tableDir, final WAL wal, final FileSystem fs,
557       final Configuration confParam, final HRegionInfo regionInfo,
558       final HTableDescriptor htd, final RegionServerServices rsServices) {
559     this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
560       wal, confParam, htd, rsServices);
561   }
562 
563   /**
564    * HRegion constructor. This constructor should only be used for testing and
565    * extensions.  Instances of HRegion should be instantiated with the
566    * {@link HRegion#createHRegion} or {@link HRegion#openHRegion} method.
567    *
568    * @param fs is the filesystem.
569    * @param wal The WAL is the outbound log for any updates to the HRegion
570    * The wal file is a logfile from the previous execution that's
571    * custom-computed for this HRegion. The HRegionServer computes and sorts the
572    * appropriate wal info for this HRegion. If there is a previous wal file
573    * (implying that the HRegion has been written-to before), then read it from
574    * the supplied path.
575    * @param confParam is global configuration settings.
576    * @param htd the table descriptor
577    * @param rsServices reference to {@link RegionServerServices} or null
578    */
579   public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration confParam,
580       final HTableDescriptor htd, final RegionServerServices rsServices) {
581     if (htd == null) {
582       throw new IllegalArgumentException("Need table descriptor");
583     }
584 
585     if (confParam instanceof CompoundConfiguration) {
586       throw new IllegalArgumentException("Need original base configuration");
587     }
588 
589     this.comparator = fs.getRegionInfo().getComparator();
590     this.wal = wal;
591     this.fs = fs;
592 
593     // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
594     this.baseConf = confParam;
595     this.conf = new CompoundConfiguration()
596       .add(confParam)
597       .addStringMap(htd.getConfiguration())
598       .addBytesMap(htd.getValues());
599     this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
600         DEFAULT_CACHE_FLUSH_INTERVAL);
601     this.flushPerChanges = conf.getLong(MEMSTORE_FLUSH_PER_CHANGES, DEFAULT_FLUSH_PER_CHANGES);
602     if (this.flushPerChanges > MAX_FLUSH_PER_CHANGES) {
603       throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed "
604           + MAX_FLUSH_PER_CHANGES);
605     }
606 
607     this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
608                     DEFAULT_ROWLOCK_WAIT_DURATION);
609 
610     this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
611     this.htableDescriptor = htd;
612     this.rsServices = rsServices;
613     this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
614     setHTableSpecificConf();
615     this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
616 
617     this.busyWaitDuration = conf.getLong(
618       "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
619     this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
620     if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
621       throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
622         + busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
623         + maxBusyWaitMultiplier + "). Their product should be positive");
624     }
625     this.maxBusyWaitDuration = conf.getLong("hbase.ipc.client.call.purge.timeout",
626       2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
627 
628     /*
629      * timestamp.slop provides a server-side constraint on the timestamp. This
630      * assumes that you base your TS around currentTimeMillis(). In this case,
631      * throw an error to the user if the user-specified TS is newer than now +
632      * slop. LATEST_TIMESTAMP == don't use this functionality
633      */
634     this.timestampSlop = conf.getLong(
635         "hbase.hregion.keyvalue.timestamp.slop.millisecs",
636         HConstants.LATEST_TIMESTAMP);
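        // Worked example of the constraint described above (illustrative
        // values): with hbase.hregion.keyvalue.timestamp.slop.millisecs set to
        // 2000, a mutation stamped later than now + 2000 ms is rejected as a
        // sanity-check failure; the default, LATEST_TIMESTAMP, disables the
        // check.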
637 
638     /*
639      * Timeout for the process time in processRowsWithLocks().
640      * Use -1 to switch off time bound.
641      */
642     this.rowProcessorTimeout = conf.getLong(
643         "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
644     this.durability = htd.getDurability() == Durability.USE_DEFAULT
645         ? DEFAULT_DURABILITY
646         : htd.getDurability();
647     if (rsServices != null) {
648       this.rsAccounting = this.rsServices.getRegionServerAccounting();
649       // don't initialize coprocessors if not running within a regionserver
650       // TODO: revisit if coprocessors should load in other cases
651       this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
652       this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
653       this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper);
654 
655       Map<String, HRegion> recoveringRegions = rsServices.getRecoveringRegions();
656       String encodedName = getRegionInfo().getEncodedName();
657       if (recoveringRegions != null && recoveringRegions.containsKey(encodedName)) {
658         this.isRecovering = true;
659         recoveringRegions.put(encodedName, this);
660       }
661     } else {
662       this.metricsRegionWrapper = null;
663       this.metricsRegion = null;
664     }
665     if (LOG.isDebugEnabled()) {
666       // Write out region name as string and its encoded name.
667       LOG.debug("Instantiated " + this);
668     }
669 
670     // by default, we allow writes against a region when it's in recovering
671     this.disallowWritesInRecovering =
672         conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING,
673           HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG);
674     configurationManager = Optional.absent();
675   }
676 
677   void setHTableSpecificConf() {
678     if (this.htableDescriptor == null) return;
679     long flushSize = this.htableDescriptor.getMemStoreFlushSize();
680 
681     if (flushSize <= 0) {
682       flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
683         HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
684     }
685     this.memstoreFlushSize = flushSize;
686     this.blockingMemStoreSize = this.memstoreFlushSize *
687         conf.getLong("hbase.hregion.memstore.block.multiplier", 2);
688   }
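      // Worked example (illustrative numbers): with a 128 MB memstore flush
      // size and the default block multiplier of 2, updates to this region
      // start blocking once its memstore reaches 256 MB.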
689 
690   /**
691    * Initialize this region.
692    * Used only by tests and SplitTransaction to reopen the region.
693    * You should use createHRegion() or openHRegion().
694    * @return What the next sequence (edit) id should be.
695    * @throws IOException e
696    * @deprecated use HRegion.createHRegion() or HRegion.openHRegion()
697    */
698   @Deprecated
699   public long initialize() throws IOException {
700     return initialize(null);
701   }
702 
703   /**
704    * Initialize this region.
705    *
706    * @param reporter Tickle every so often if initialize is taking a while.
707    * @return What the next sequence (edit) id should be.
708    * @throws IOException e
709    */
710   private long initialize(final CancelableProgressable reporter) throws IOException {
711     MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
712     long nextSeqId = -1;
713     try {
714       nextSeqId = initializeRegionInternals(reporter, status);
715       return nextSeqId;
716     } finally {
717       // nextSeqId will be -1 if the initialization fails.
718       // Otherwise it will be at least 0.
719       if (nextSeqId == -1) {
720         status
721             .abort("Exception during region " + this.getRegionNameAsString() + " initialization.");
722       }
723     }
724   }
725 
726   private long initializeRegionInternals(final CancelableProgressable reporter,
727       final MonitoredTask status) throws IOException, UnsupportedEncodingException {
728     if (coprocessorHost != null) {
729       status.setStatus("Running coprocessor pre-open hook");
730       coprocessorHost.preOpen();
731     }
732 
733     // Write HRI to a file in case we need to recover hbase:meta
734     status.setStatus("Writing region info on filesystem");
735     fs.checkRegionInfoOnFilesystem();
736 
737 
738 
739     // Initialize all the HStores
740     status.setStatus("Initializing all the Stores");
741     long maxSeqId = initializeRegionStores(reporter, status);
742 
743     this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this));
744     this.writestate.flushRequested = false;
745     this.writestate.compacting = 0;
746 
747     if (this.writestate.writesEnabled) {
748       // Remove temporary data left over from old regions
749       status.setStatus("Cleaning up temporary data from old regions");
750       fs.cleanupTempDir();
751     }
752 
753     if (this.writestate.writesEnabled) {
754       status.setStatus("Cleaning up detritus from prior splits");
755       // Get rid of any splits or merges that were lost in-progress.  Clean out
756       // these directories here on open.  We may be opening a region that was
757       // being split but we crashed in the middle of it all.
758       fs.cleanupAnySplitDetritus();
759       fs.cleanupMergesDir();
760     }
761 
762     // Initialize split policy
763     this.splitPolicy = RegionSplitPolicy.create(this, conf);
764 
765     this.lastFlushTime = EnvironmentEdgeManager.currentTime();
766     // Use maximum of wal sequenceid or that which was found in stores
767     // (particularly if no recovered edits, seqid will be -1).
768     long nextSeqid = maxSeqId + 1;
769     if (this.isRecovering) {
770       // In distributedLogReplay mode, we don't know the last change sequence number because the
771       // region is opened before recovery completes. So we add a safety bumper to avoid new
772       // sequence numbers overlapping sequence numbers that are already in use.
773       nextSeqid = WALSplitter.writeRegionOpenSequenceIdFile(this.fs.getFileSystem(),
774             this.fs.getRegionDir(), nextSeqid, (this.flushPerChanges + 10000000));
775     }
776 
777     LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() +
778       "; next sequenceid=" + nextSeqid);
779 
780     // A region can be reopened if a split failed; reset flags
781     this.closing.set(false);
782     this.closed.set(false);
783 
784     if (coprocessorHost != null) {
785       status.setStatus("Running coprocessor post-open hooks");
786       coprocessorHost.postOpen();
787     }
788 
789     status.markComplete("Region opened successfully");
790     return nextSeqid;
791   }
792 
793   private long initializeRegionStores(final CancelableProgressable reporter, MonitoredTask status)
794       throws IOException, UnsupportedEncodingException {
795     // Load in all the HStores.
796 
797     long maxSeqId = -1;
798     // initialized to -1 so that we pick up MemstoreTS from column families
799     long maxMemstoreTS = -1;
800 
801     if (!htableDescriptor.getFamilies().isEmpty()) {
802       // initialize the thread pool for opening stores in parallel.
803       ThreadPoolExecutor storeOpenerThreadPool =
804         getStoreOpenAndCloseThreadPool("StoreOpener-" + this.getRegionInfo().getShortNameToLog());
805       CompletionService<HStore> completionService =
806         new ExecutorCompletionService<HStore>(storeOpenerThreadPool);
807 
808       // initialize each store in parallel
809       for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
810         status.setStatus("Instantiating store for column family " + family);
811         completionService.submit(new Callable<HStore>() {
812           @Override
813           public HStore call() throws IOException {
814             return instantiateHStore(family);
815           }
816         });
817       }
818       boolean allStoresOpened = false;
819       try {
820         for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
821           Future<HStore> future = completionService.take();
822           HStore store = future.get();
823           this.stores.put(store.getColumnFamilyName().getBytes(), store);
824 
825           long storeMaxSequenceId = store.getMaxSequenceId();
826           maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
827               storeMaxSequenceId);
828           if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) {
829             maxSeqId = storeMaxSequenceId;
830           }
831           long maxStoreMemstoreTS = store.getMaxMemstoreTS();
832           if (maxStoreMemstoreTS > maxMemstoreTS) {
833             maxMemstoreTS = maxStoreMemstoreTS;
834           }
835         }
836         allStoresOpened = true;
837       } catch (InterruptedException e) {
838         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
839       } catch (ExecutionException e) {
840         throw new IOException(e.getCause());
841       } finally {
842         storeOpenerThreadPool.shutdownNow();
843         if (!allStoresOpened) {
844           // something went wrong, close all opened stores
845           LOG.error("Could not initialize all stores for the region=" + this);
846           for (Store store : this.stores.values()) {
847             try {
848               store.close();
849             } catch (IOException e) {
850               LOG.warn(e.getMessage());
851             }
852           }
853         }
854       }
855     }
856     if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
857       // Recover any edits if available.
858       maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
859           this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
860     }
861     maxSeqId = Math.max(maxSeqId, maxMemstoreTS + 1);
862     mvcc.initialize(maxSeqId);
863     return maxSeqId;
864   }
865 
866   private void writeRegionOpenMarker(WAL wal, long openSeqId) throws IOException {
867     Map<byte[], List<Path>> storeFiles =
868         new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
869     for (Map.Entry<byte[], Store> entry : getStores().entrySet()) {
870       Store store = entry.getValue();
871       ArrayList<Path> storeFileNames = new ArrayList<Path>();
872       for (StoreFile storeFile: store.getStorefiles()) {
873         storeFileNames.add(storeFile.getPath());
874       }
875       storeFiles.put(entry.getKey(), storeFileNames);
876     }
877 
878     RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
879       RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
880       getRegionServerServices().getServerName(), storeFiles);
881     WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionOpenDesc,
882       getSequenceId());
883   }
884 
885   private void writeRegionCloseMarker(WAL wal) throws IOException {
886     Map<byte[], List<Path>> storeFiles =
887         new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
888     for (Map.Entry<byte[], Store> entry : getStores().entrySet()) {
889       Store store = entry.getValue();
890       ArrayList<Path> storeFileNames = new ArrayList<Path>();
891       for (StoreFile storeFile: store.getStorefiles()) {
892         storeFileNames.add(storeFile.getPath());
893       }
894       storeFiles.put(entry.getKey(), storeFileNames);
895     }
896 
897     RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor(
898       RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), getSequenceId().get(),
899       getRegionServerServices().getServerName(), storeFiles);
900     WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc,
901       getSequenceId());
902   }
903 
904   /**
905    * @return True if this region has references.
906    */
907   public boolean hasReferences() {
908     for (Store store : this.stores.values()) {
909       if (store.hasReferences()) return true;
910     }
911     return false;
912   }
913 
914   /**
915    * This function will return the HDFS blocks distribution based on the data
916    * captured when the HFile is created.
917    * @return The HDFS blocks distribution for the region.
918    */
919   public HDFSBlocksDistribution getHDFSBlocksDistribution() {
920     HDFSBlocksDistribution hdfsBlocksDistribution =
921       new HDFSBlocksDistribution();
922     synchronized (this.stores) {
923       for (Store store : this.stores.values()) {
924         for (StoreFile sf : store.getStorefiles()) {
925           HDFSBlocksDistribution storeFileBlocksDistribution =
926             sf.getHDFSBlockDistribution();
927           hdfsBlocksDistribution.add(storeFileBlocksDistribution);
928         }
929       }
930     }
931     return hdfsBlocksDistribution;
932   }
933 
934   /**
935    * This is a helper function to compute HDFS block distribution on demand
936    * @param conf configuration
937    * @param tableDescriptor HTableDescriptor of the table
938    * @param regionInfo the HRegionInfo of the region
939    * @return The HDFS blocks distribution for the given region.
940    * @throws IOException
941    */
942   public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
943       final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException {
944     Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
945     return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath);
946   }
947 
948   /**
949    * This is a helper function to compute HDFS block distribution on demand
950    * @param conf configuration
951    * @param tableDescriptor HTableDescriptor of the table
952    * @param regionInfo the HRegionInfo of the region
953    * @param tablePath the table directory
954    * @return The HDFS blocks distribution for the given region.
955    * @throws IOException
956    */
957   public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
958       final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo,  Path tablePath)
959       throws IOException {
960     HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
961     FileSystem fs = tablePath.getFileSystem(conf);
962 
963     HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo);
964     for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
965       Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family.getNameAsString());
966       if (storeFiles == null) continue;
967 
968       for (StoreFileInfo storeFileInfo : storeFiles) {
969         hdfsBlocksDistribution.add(storeFileInfo.computeHDFSBlocksDistribution(fs));
970       }
971     }
972     return hdfsBlocksDistribution;
973   }
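      // A hedged caller-side sketch of computing region locality on demand
      // (getBlockLocalityIndex is the standard HDFSBlocksDistribution accessor;
      // "serverHostname" is a hypothetical variable):
      //
      //   HDFSBlocksDistribution dist =
      //       HRegion.computeHDFSBlocksDistribution(conf, htd, regionInfo);
      //   float locality = dist.getBlockLocalityIndex(serverHostname);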
974 
975   public AtomicLong getMemstoreSize() {
976     return memstoreSize;
977   }
978 
979   /**
980    * Increase the size of the memstore in this region and the size of the global
981    * memstore.
982    * @return the size of memstore in this region
983    */
984   public long addAndGetGlobalMemstoreSize(long memStoreSize) {
985     if (this.rsAccounting != null) {
986       rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
987     }
988     return this.memstoreSize.addAndGet(memStoreSize);
989   }
990 
991   /** @return an HRegionInfo object for this region */
992   public HRegionInfo getRegionInfo() {
993     return this.fs.getRegionInfo();
994   }
995 
996   /**
997    * @return Instance of {@link RegionServerServices} used by this HRegion.
998    * Can be null.
999    */
1000   RegionServerServices getRegionServerServices() {
1001     return this.rsServices;
1002   }
1003 
1004   /** @return readRequestsCount for this region */
1005   long getReadRequestsCount() {
1006     return this.readRequestsCount.get();
1007   }
1008 
1009   /** @return writeRequestsCount for this region */
1010   long getWriteRequestsCount() {
1011     return this.writeRequestsCount.get();
1012   }
1013 
1014   public MetricsRegion getMetrics() {
1015     return metricsRegion;
1016   }
1017 
1018   /** @return true if region is closed */
1019   public boolean isClosed() {
1020     return this.closed.get();
1021   }
1022 
1023   /**
1024    * @return True if closing process has started.
1025    */
1026   public boolean isClosing() {
1027     return this.closing.get();
1028   }
1029 
1030   /**
1031    * Set the recovering state of the current region.
1032    */
1033   public void setRecovering(boolean newState) {
1034     boolean wasRecovering = this.isRecovering;
1035     this.isRecovering = newState;
1036     if (wasRecovering && !isRecovering) {
1037       // Call only when wal replay is over.
1038       coprocessorHost.postLogReplay();
1039     }
1040   }
1041 
1042   /**
1043    * @return True if the current region is in recovering state
1044    */
1045   public boolean isRecovering() {
1046     return this.isRecovering;
1047   }
1048 
1049   /** @return true if region is available (not closed and not closing) */
1050   public boolean isAvailable() {
1051     return !isClosed() && !isClosing();
1052   }
1053 
1054   /** @return true if region is splittable */
1055   public boolean isSplittable() {
1056     return isAvailable() && !hasReferences();
1057   }
1058 
1059   /**
1060    * @return true if region is mergeable
1061    */
1062   public boolean isMergeable() {
1063     if (!isAvailable()) {
1064       LOG.debug("Region " + this.getRegionNameAsString()
1065           + " is not mergeable because it is closing or closed");
1066       return false;
1067     }
1068     if (hasReferences()) {
1069       LOG.debug("Region " + this.getRegionNameAsString()
1070           + " is not mergeable because it has references");
1071       return false;
1072     }
1073 
1074     return true;
1075   }
1076 
1077   public boolean areWritesEnabled() {
1078     synchronized(this.writestate) {
1079       return this.writestate.writesEnabled;
1080     }
1081   }
1082 
1083   public MultiVersionConsistencyControl getMVCC() {
1084     return mvcc;
1085   }
1086 
1087   /*
1088    * Returns readpoint considering given IsolationLevel
1089    */
1090   public long getReadpoint(IsolationLevel isolationLevel) {
1091     if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
1092       // This scan can read even uncommitted transactions
1093       return Long.MAX_VALUE;
1094     }
1095     return mvcc.memstoreReadPoint();
1096   }
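       // Usage sketch (hypothetical caller): a READ_UNCOMMITTED scan can see
       // every write, while READ_COMMITTED scans are bounded by the current
       // mvcc read point.
       //
       //   long readPt = region.getReadpoint(scan.getIsolationLevel());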
1097 
1098   public boolean isLoadingCfsOnDemandDefault() {
1099     return this.isLoadingCfsOnDemandDefault;
1100   }
1101 
1102   /**
1103    * Close down this HRegion.  Flush the cache, shut down each HStore, don't
1104    * service any more calls.
1105    *
1106    * <p>This method could take some time to execute, so don't call it from a
1107    * time-sensitive thread.
1108    *
1109    * @return Map of all the storage files that the HRegion's component
1110    * HStores make use of, keyed by column family.  Returns an empty map
1111    * if already closed and null if judged that it should not close.
1112    *
1113    * @throws IOException e
1114    */
1115   public Map<byte[], List<StoreFile>> close() throws IOException {
1116     return close(false);
1117   }
1118 
1119   private final Object closeLock = new Object();
1120 
1121   /** Conf key for the periodic flush interval */
1122   public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL =
1123       "hbase.regionserver.optionalcacheflushinterval";
1124   /** Default interval for the memstore flush */
1125   public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;
1126 
1127   /** Conf key to force a flush if there are already enough changes for one region in memstore */
1128   public static final String MEMSTORE_FLUSH_PER_CHANGES =
1129       "hbase.regionserver.flush.per.changes";
1130   public static final long DEFAULT_FLUSH_PER_CHANGES = 30000000; // 30 million
1131   /**
1132    * The following MAX_FLUSH_PER_CHANGES is large enough because each KeyValue has 20+ bytes of
1133    * overhead. Therefore, even 1 billion empty KVs occupy at least 20GB of memstore for a single region.
1134    */
1135   public static final long MAX_FLUSH_PER_CHANGES = 1000000000; // 1G
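       // Worked arithmetic behind the bound above: 1,000,000,000 KVs at 20+
       // bytes of per-KV overhead is at least ~20 GB of memstore for one
       // region, so the constructor rejects any flush.per.changes setting
       // above MAX_FLUSH_PER_CHANGES.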
1136 
1137   /**
1138    * Close down this HRegion.  Flush the cache unless the abort parameter is true,
1139    * shut down each HStore, and don't service any more calls.
1140    *
1141    * This method could take some time to execute, so don't call it from a
1142    * time-sensitive thread.
1143    *
1144    * @param abort true if server is aborting (only during testing)
1145    * @return Map of all the storage files that the HRegion's component
1146    * HStores make use of, keyed by column family.  Can be null if
1147    * we are not to close at this time or we are already closed.
1148    *
1149    * @throws IOException e
1150    */
1151   public Map<byte[], List<StoreFile>> close(final boolean abort) throws IOException {
1152     // Only allow one thread to close at a time. Serialize them so dual
1153     // threads attempting to close will run up against each other.
1154     MonitoredTask status = TaskMonitor.get().createStatus(
1155         "Closing region " + this +
1156         (abort ? " due to abort" : ""));
1157 
1158     status.setStatus("Waiting for close lock");
1159     try {
1160       synchronized (closeLock) {
1161         return doClose(abort, status);
1162       }
1163     } finally {
1164       status.cleanup();
1165     }
1166   }
1167 
1168   private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
1169       throws IOException {
1170     if (isClosed()) {
1171       LOG.warn("Region " + this + " already closed");
1172       return null;
1173     }
1174 
1175     if (coprocessorHost != null) {
1176       status.setStatus("Running coprocessor pre-close hooks");
1177       this.coprocessorHost.preClose(abort);
1178     }
1179 
1180     status.setStatus("Disabling compacts and flushes for region");
1181     synchronized (writestate) {
1182       // Disable compacting and flushing by background threads for this
1183       // region.
1184       writestate.writesEnabled = false;
1185       LOG.debug("Closing " + this + ": disabling compactions & flushes");
1186       waitForFlushesAndCompactions();
1187     }
1188     // If we were not just flushing, is it worth doing a preflush...one
1189     // that will clear out the bulk of the memstore before we put up
1190     // the close flag?
1191     if (!abort && worthPreFlushing()) {
1192       status.setStatus("Pre-flushing region before close");
1193       LOG.info("Running close preflush of " + this.getRegionNameAsString());
1194       try {
1195         internalFlushcache(status);
1196       } catch (IOException ioe) {
1197         // Failed to flush the region. Keep going.
1198         status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
1199       }
1200     }
1201 
1202     this.closing.set(true);
1203     status.setStatus("Disabling writes for close");
1204     // block waiting for the lock for closing
1205     lock.writeLock().lock();
1206     try {
1207       if (this.isClosed()) {
1208         status.abort("Already got closed by another process");
1209         // SplitTransaction handles the null
1210         return null;
1211       }
1212       LOG.debug("Updates disabled for region " + this);
1213       // Don't flush the cache if we are aborting
1214       if (!abort) {
1215         int flushCount = 0;
1216         while (this.getMemstoreSize().get() > 0) {
1217           try {
1218             if (flushCount++ > 0) {
1219               int actualFlushes = flushCount - 1;
1220               if (actualFlushes > 5) {
1221                 // If we tried 5 times and are unable to clear memory, abort
1222                 // so we do not lose data
1223                 throw new DroppedSnapshotException("Failed clearing memory after " +
1224                   actualFlushes + " attempts on region: " + Bytes.toStringBinary(getRegionName()));
1225               }
1226               LOG.info("Running extra flush, " + actualFlushes +
1227                 " (carrying snapshot?) " + this);
1228             }
1229             internalFlushcache(status);
1230           } catch (IOException ioe) {
1231             status.setStatus("Failed flush " + this + ", putting online again");
1232             synchronized (writestate) {
1233               writestate.writesEnabled = true;
1234             }
1235             // Have to throw to upper layers.  I can't abort server from here.
1236             throw ioe;
1237           }
1238         }
1239       }
1240 
1241       Map<byte[], List<StoreFile>> result =
1242         new TreeMap<byte[], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
1243       if (!stores.isEmpty()) {
1244         // initialize the thread pool for closing stores in parallel.
1245         ThreadPoolExecutor storeCloserThreadPool =
1246           getStoreOpenAndCloseThreadPool("StoreCloserThread-" + this.getRegionNameAsString());
1247         CompletionService<Pair<byte[], Collection<StoreFile>>> completionService =
1248           new ExecutorCompletionService<Pair<byte[], Collection<StoreFile>>>(storeCloserThreadPool);
1249 
1250         // close each store in parallel
1251         for (final Store store : stores.values()) {
1252           assert abort || store.getFlushableSize() == 0;
1253           completionService
1254               .submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
1255                 @Override
1256                 public Pair<byte[], Collection<StoreFile>> call() throws IOException {
1257                   return new Pair<byte[], Collection<StoreFile>>(
1258                     store.getFamily().getName(), store.close());
1259                 }
1260               });
1261         }
1262         try {
1263           for (int i = 0; i < stores.size(); i++) {
1264             Future<Pair<byte[], Collection<StoreFile>>> future = completionService.take();
1265             Pair<byte[], Collection<StoreFile>> storeFiles = future.get();
1266             List<StoreFile> familyFiles = result.get(storeFiles.getFirst());
1267             if (familyFiles == null) {
1268               familyFiles = new ArrayList<StoreFile>();
1269               result.put(storeFiles.getFirst(), familyFiles);
1270             }
1271             familyFiles.addAll(storeFiles.getSecond());
1272           }
1273         } catch (InterruptedException e) {
1274           throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1275         } catch (ExecutionException e) {
1276           throw new IOException(e.getCause());
1277         } finally {
1278           storeCloserThreadPool.shutdownNow();
1279         }
1280       }
1281 
1282       status.setStatus("Writing region close event to WAL");
1283       if (!abort && wal != null && getRegionServerServices() != null) {
1284         writeRegionCloseMarker(wal);
1285       }
1286 
1287       this.closed.set(true);
1288       if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get());
1289       if (coprocessorHost != null) {
1290         status.setStatus("Running coprocessor post-close hooks");
1291         this.coprocessorHost.postClose(abort);
1292       }
1293       if (this.metricsRegion != null) {
1294         this.metricsRegion.close();
1295       }
1296       if (this.metricsRegionWrapper != null) {
1297         Closeables.closeQuietly(this.metricsRegionWrapper);
1298       }
1299       status.markComplete("Closed");
1300       LOG.info("Closed " + this);
1301       return result;
1302     } finally {
1303       lock.writeLock().unlock();
1304     }
1305   }
1306 
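  // A minimal caller-side sketch (not part of HRegion; "region" and the
  // logging are hypothetical) of consuming the map returned by the close
  // path above, keyed by family name:
  //
  //   Map<byte[], List<StoreFile>> closedFiles = region.close(false);
  //   if (closedFiles != null) {
  //     for (Map.Entry<byte[], List<StoreFile>> entry : closedFiles.entrySet()) {
  //       LOG.info("Family " + Bytes.toString(entry.getKey()) + " closed with "
  //           + entry.getValue().size() + " store file(s)");
  //     }
  //   }
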
1307   /**
1308    * Wait for all current flushes and compactions of the region to complete.
1309    * <p>
1310    * Exposed for TESTING.
1311    */
1312   public void waitForFlushesAndCompactions() {
1313     synchronized (writestate) {
1314       boolean interrupted = false;
1315       try {
1316         while (writestate.compacting > 0 || writestate.flushing) {
1317           LOG.debug("waiting for " + writestate.compacting + " compactions"
1318             + (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
1319           try {
1320             writestate.wait();
1321           } catch (InterruptedException iex) {
1322             // essentially ignore and propagate the interrupt back up
1323             LOG.warn("Interrupted while waiting");
1324             interrupted = true;
1325           }
1326         }
1327       } finally {
1328         if (interrupted) {
1329           Thread.currentThread().interrupt();
1330         }
1331       }
1332     }
1333   }
1334 
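  // The method above is the standard guarded-wait idiom: re-check the
  // condition in a loop, remember an interrupt instead of abandoning the
  // wait, and restore the thread's interrupt flag on the way out. A
  // standalone sketch of the same pattern ("monitor" and "done" are
  // hypothetical):
  //
  //   synchronized (monitor) {
  //     boolean interrupted = false;
  //     try {
  //       while (!done) {
  //         try {
  //           monitor.wait();
  //         } catch (InterruptedException e) {
  //           interrupted = true;  // defer; keep waiting on the condition
  //         }
  //       }
  //     } finally {
  //       if (interrupted) {
  //         Thread.currentThread().interrupt();
  //       }
  //     }
  //   }
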
1335   protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
1336       final String threadNamePrefix) {
1337     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1338     int maxThreads = Math.min(numStores,
1339         conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1340             HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
1341     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1342   }
1343 
1344   protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
1345       final String threadNamePrefix) {
1346     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1347     int maxThreads = Math.max(1,
1348         conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1349             HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
1350             / numStores);
1351     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1352   }
1353 
1354   static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
1355       final String threadNamePrefix) {
1356     return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
1357       new ThreadFactory() {
1358         private int count = 1;
1359 
1360         @Override
1361         public Thread newThread(Runnable r) {
1362           return new Thread(r, threadNamePrefix + "-" + count++);
1363         }
1364       });
1365   }
1366 
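  // Sizing note for the two pools above (numbers hypothetical): with
  // HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX configured to 16 and a table
  // of 4 families, the store-level pool gets min(4, 16) = 4 threads while
  // each store's file-level pool gets max(1, 16 / 4) = 4 threads, so the
  // combined thread budget stays near the configured maximum:
  //
  //   int configuredMax = conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
  //       HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX);
  //   int storeThreads = Math.min(numStores, configuredMax);      // per region
  //   int fileThreads = Math.max(1, configuredMax / numStores);   // per store
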
1367   /**
1368    * @return True if it's worth doing a flush before we put up the close flag.
1369    */
1370   private boolean worthPreFlushing() {
1371     return this.memstoreSize.get() >
1372       this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
1373   }
1374 
1375   //////////////////////////////////////////////////////////////////////////////
1376   // HRegion accessors
1377   //////////////////////////////////////////////////////////////////////////////
1378 
1379   /** @return start key for region */
1380   public byte [] getStartKey() {
1381     return this.getRegionInfo().getStartKey();
1382   }
1383 
1384   /** @return end key for region */
1385   public byte [] getEndKey() {
1386     return this.getRegionInfo().getEndKey();
1387   }
1388 
1389   /** @return region id */
1390   public long getRegionId() {
1391     return this.getRegionInfo().getRegionId();
1392   }
1393 
1394   /** @return region name */
1395   public byte [] getRegionName() {
1396     return this.getRegionInfo().getRegionName();
1397   }
1398 
1399   /** @return region name as string for logging */
1400   public String getRegionNameAsString() {
1401     return this.getRegionInfo().getRegionNameAsString();
1402   }
1403 
1404   /** @return HTableDescriptor for this region */
1405   public HTableDescriptor getTableDesc() {
1406     return this.htableDescriptor;
1407   }
1408 
1409   /** @return WAL in use for this region */
1410   public WAL getWAL() {
1411     return this.wal;
1412   }
1413 
1414   /**
1415    * A split takes the config from the parent region & passes it to the daughter
1416    * region's constructor. If 'conf' was passed, you would end up using the HTD
1417    * of the parent region in addition to the new daughter HTD. Pass 'baseConf'
1418    * to the daughter regions to avoid this tricky dedupe problem.
1419    * @return Configuration object
1420    */
1421   Configuration getBaseConf() {
1422     return this.baseConf;
1423   }
1424 
1425   /** @return {@link FileSystem} being used by this region */
1426   public FileSystem getFilesystem() {
1427     return fs.getFileSystem();
1428   }
1429 
1430   /** @return the {@link HRegionFileSystem} used by this region */
1431   public HRegionFileSystem getRegionFileSystem() {
1432     return this.fs;
1433   }
1434 
1435   /** @return the last time the region was flushed */
1436   public long getLastFlushTime() {
1437     return this.lastFlushTime;
1438   }
1439 
1440   //////////////////////////////////////////////////////////////////////////////
1441   // HRegion maintenance.
1442   //
1443   // These methods are meant to be called periodically by the HRegionServer for
1444   // upkeep.
1445   //////////////////////////////////////////////////////////////////////////////
1446 
1447   /** @return size of the largest HStore. */
1448   public long getLargestHStoreSize() {
1449     long size = 0;
1450     for (Store h : stores.values()) {
1451       long storeSize = h.getSize();
1452       if (storeSize > size) {
1453         size = storeSize;
1454       }
1455     }
1456     return size;
1457   }
1458 
1459   /**
1460    * @return KeyValue Comparator
1461    */
1462   public KeyValue.KVComparator getComparator() {
1463     return this.comparator;
1464   }
1465 
1466   /*
1467    * Do preparation for pending compaction.
1468    * @throws IOException
1469    */
1470   protected void doRegionCompactionPrep() throws IOException {
1471   }
1472 
1473   void triggerMajorCompaction() {
1474     for (Store h : stores.values()) {
1475       h.triggerMajorCompaction();
1476     }
1477   }
1478 
1479   /**
1480    * This is a helper function that compacts all the stores synchronously.
1481    * It is used by utilities and testing.
1482    *
1483    * @param majorCompaction True to force a major compaction regardless of thresholds
1484    * @throws IOException e
1485    */
1486   public void compactStores(final boolean majorCompaction)
1487   throws IOException {
1488     if (majorCompaction) {
1489       this.triggerMajorCompaction();
1490     }
1491     compactStores();
1492   }
1493 
1494   /**
1495    * This is a helper function that compacts all the stores synchronously.
1496    * It is used by utilities and testing.
1497    *
1498    * @throws IOException e
1499    */
1500   public void compactStores() throws IOException {
1501     for (Store s : getStores().values()) {
1502       CompactionContext compaction = s.requestCompaction();
1503       if (compaction != null) {
1504         compact(compaction, s);
1505       }
1506     }
1507   }
1508 
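  // Usage sketch (hypothetical, e.g. from a utility or test): flush and then
  // force a synchronous major compaction of every store in the region.
  //
  //   region.flushcache();           // persist the memstore first, if desired
  //   region.compactStores(true);    // trigger and run a major compaction inline
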
1509   /*
1510    * Called by compaction thread and after region is opened to compact the
1511    * HStores if necessary.
1512    *
1513    * <p>This operation could block for a long time, so don't call it from a
1514    * time-sensitive thread.
1515    *
1516    * Note that no locking is necessary at this level because compaction only
1517    * conflicts with a region split, and that cannot happen because the region
1518    * server does them sequentially and not in parallel.
1519    *
1520    * @param compaction Compaction details, obtained by requestCompaction()
1521    * @return whether the compaction completed
1522    * @throws IOException e
1523    */
1524   public boolean compact(CompactionContext compaction, Store store) throws IOException {
1525     assert compaction != null && compaction.hasSelection();
1526     assert !compaction.getRequest().getFiles().isEmpty();
1527     if (this.closing.get() || this.closed.get()) {
1528       LOG.debug("Skipping compaction on " + this + " because closing/closed");
1529       store.cancelRequestedCompaction(compaction);
1530       return false;
1531     }
1532     MonitoredTask status = null;
1533     boolean didPerformCompaction = false;
1534     // block waiting for the lock for compaction
1535     lock.readLock().lock();
1536     try {
1537       byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
1538       if (stores.get(cf) != store) {
1539         LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this
1540             + " has been re-instantiated, cancelling this compaction request. "
1541             + "It may be caused by the rollback of a split transaction");
1542         return false;
1543       }
1544 
1545       status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
1546       if (this.closed.get()) {
1547         String msg = "Skipping compaction on " + this + " because closed";
1548         LOG.debug(msg);
1549         status.abort(msg);
1550         return false;
1551       }
1552       boolean wasStateSet = false;
1553       try {
1554         synchronized (writestate) {
1555           if (writestate.writesEnabled) {
1556             wasStateSet = true;
1557             ++writestate.compacting;
1558           } else {
1559             String msg = "NOT compacting region " + this + ". Writes disabled.";
1560             LOG.info(msg);
1561             status.abort(msg);
1562             return false;
1563           }
1564         }
1565         LOG.info("Starting compaction on " + store + " in region " + this
1566             + (compaction.getRequest().isOffPeak()?" as an off-peak compaction":""));
1567         doRegionCompactionPrep();
1568         try {
1569           status.setStatus("Compacting store " + store);
1570           didPerformCompaction = true;
1571           store.compact(compaction);
1572         } catch (InterruptedIOException iioe) {
1573           String msg = "compaction interrupted";
1574           LOG.info(msg, iioe);
1575           status.abort(msg);
1576           return false;
1577         }
1578       } finally {
1579         if (wasStateSet) {
1580           synchronized (writestate) {
1581             --writestate.compacting;
1582             if (writestate.compacting <= 0) {
1583               writestate.notifyAll();
1584             }
1585           }
1586         }
1587       }
1588       status.markComplete("Compaction complete");
1589       return true;
1590     } finally {
1591       try {
1592         if (!didPerformCompaction) store.cancelRequestedCompaction(compaction);
1593         if (status != null) status.cleanup();
1594       } finally {
1595         lock.readLock().unlock();
1596       }
1597     }
1598   }
1599 
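  // The request/execute pairing expected by compact(), as a caller might
  // drive it for a single store (a hypothetical sketch; compactStores()
  // earlier in this class does the same for every store):
  //
  //   CompactionContext context = store.requestCompaction();
  //   if (context != null) {
  //     boolean completed = region.compact(context, store);
  //     // if compact() bails out early it cancels the request itself,
  //     // so no further cleanup is needed here
  //   }
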
1600   /**
1601    * Flush the cache.
1602    *
1603    * When this method is called the cache will be flushed unless:
1604    * <ol>
1605    *   <li>the cache is empty</li>
1606    *   <li>the region is closed</li>
1607    *   <li>a flush is already in progress</li>
1608    *   <li>writes are disabled</li>
1609    * </ol>
1610    *
1611    * <p>This method may block for some time, so it should not be called from a
1612    * time-sensitive thread.
1613    *
1614    * @return FlushResult indicating whether the flush succeeded and whether the region needs compacting
1615    *
1616    * @throws IOException general io exceptions
1617    * @throws DroppedSnapshotException Thrown when replay of wal is required
1618    * because a Snapshot was not properly persisted.
1619    */
1620   public FlushResult flushcache() throws IOException {
1621     // fail-fast instead of waiting on the lock
1622     if (this.closing.get()) {
1623       String msg = "Skipping flush on " + this + " because closing";
1624       LOG.debug(msg);
1625       return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1626     }
1627     MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
1628     status.setStatus("Acquiring readlock on region");
1629     // block waiting for the lock for flushing cache
1630     lock.readLock().lock();
1631     try {
1632       if (this.closed.get()) {
1633         String msg = "Skipping flush on " + this + " because closed";
1634         LOG.debug(msg);
1635         status.abort(msg);
1636         return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1637       }
1638       if (coprocessorHost != null) {
1639         status.setStatus("Running coprocessor pre-flush hooks");
1640         coprocessorHost.preFlush();
1641       }
1642       if (numMutationsWithoutWAL.get() > 0) {
1643         numMutationsWithoutWAL.set(0);
1644         dataInMemoryWithoutWAL.set(0);
1645       }
1646       synchronized (writestate) {
1647         if (!writestate.flushing && writestate.writesEnabled) {
1648           this.writestate.flushing = true;
1649         } else {
1650           if (LOG.isDebugEnabled()) {
1651             LOG.debug("NOT flushing memstore for region " + this
1652                 + ", flushing=" + writestate.flushing + ", writesEnabled="
1653                 + writestate.writesEnabled);
1654           }
1655           String msg = "Not flushing since "
1656               + (writestate.flushing ? "already flushing"
1657               : "writes not enabled");
1658           status.abort(msg);
1659           return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1660         }
1661       }
1662       try {
1663         FlushResult fs = internalFlushcache(status);
1664 
1665         if (coprocessorHost != null) {
1666           status.setStatus("Running post-flush coprocessor hooks");
1667           coprocessorHost.postFlush();
1668         }
1669 
1670         status.markComplete("Flush successful");
1671         return fs;
1672       } finally {
1673         synchronized (writestate) {
1674           writestate.flushing = false;
1675           this.writestate.flushRequested = false;
1676           writestate.notifyAll();
1677         }
1678       }
1679     } finally {
1680       lock.readLock().unlock();
1681       status.cleanup();
1682     }
1683   }
1684 
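  // A caller-side sketch (hypothetical; the accessor name on FlushResult is
  // assumed) of acting on the result returned by flushcache():
  //
  //   FlushResult result = region.flushcache();
  //   if (result.isCompactionNeeded()) {
  //     region.compactStores();   // flush succeeded and stores want compacting
  //   }
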
1685   /**
1686    * Should the memstore be flushed now?
1687    */
1688   boolean shouldFlush() {
1689     // This is a rough measure.
1690     if (this.lastFlushSeqId > 0
1691           && (this.lastFlushSeqId + this.flushPerChanges < this.sequenceId.get())) {
1692       return true;
1693     }
1694     if (flushCheckInterval <= 0) { //disabled
1695       return false;
1696     }
1697     long now = EnvironmentEdgeManager.currentTime();
1698     // if we flushed in the recent past, we don't need to do it again now
1699     if ((now - getLastFlushTime() < flushCheckInterval)) {
1700       return false;
1701     }
1702     // since we didn't flush in the recent past, flush now if certain conditions
1703     // are met. Return true on the first such memstore hit.
1704     for (Store s : this.getStores().values()) {
1705       if (s.timeOfOldestEdit() < now - flushCheckInterval) {
1706         // we have an old enough edit in the memstore, flush
1707         return true;
1708       }
1709     }
1710     return false;
1711   }
1712 
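  // Worked example of the sequence-id heuristic above (numbers hypothetical):
  // with lastFlushSeqId = 100, flushPerChanges = 30000000 and a current
  // sequenceId of 30000200, we have 100 + 30000000 = 30000100 < 30000200, so
  // shouldFlush() returns true even before flushCheckInterval has elapsed.
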
1713   /**
1714    * Flush the memstore. Flushing the memstore is a little tricky. We have a lot of updates in the
1715    * memstore, all of which have also been written to the wal. We need to write those updates in the
1716    * memstore out to disk, while being able to process reads/writes as much as possible during the
1717    * flush operation.
1718    * <p>This method may block for some time.  Every time you call it, we up the region's
1719    * sequence id even if we don't flush; i.e. the returned sequence id will be at least one larger
1720    * than that of the last edit applied to this region. The returned id does not refer to an actual edit.
1721    * The returned id can be used, for example, to install a bulk-loaded file just ahead of the last
1722    * hfile that resulted from this flush.
1723    * @return object describing the flush's state
1724    *
1725    * @throws IOException general io exceptions
1726    * @throws DroppedSnapshotException Thrown when replay of wal is required
1727    * because a Snapshot was not properly persisted.
1728    */
1729   protected FlushResult internalFlushcache(MonitoredTask status)
1730       throws IOException {
1731     return internalFlushcache(this.wal, -1, status);
1732   }
1733 
1734   /**
1735    * @param wal Null if we're NOT to go via wal.
1736    * @param myseqid The seqid to use if <code>wal</code> is null writing out flush file.
1737    * @return object describing the flush's state
1738    * @throws IOException
1739    * @see #internalFlushcache(MonitoredTask)
1740    */
1741   protected FlushResult internalFlushcache(
1742       final WAL wal, final long myseqid, MonitoredTask status) throws IOException {
1743     if (this.rsServices != null && this.rsServices.isAborted()) {
1744       // Don't flush when server aborting, it's unsafe
1745       throw new IOException("Aborting flush because server is aborted...");
1746     }
1747     final long startTime = EnvironmentEdgeManager.currentTime();
1748     // If nothing to flush, return, but we need to safely update the region sequence id
1749     if (this.memstoreSize.get() <= 0) {
1750       // Take an update lock because we are about to change the sequence id and we want the sequence id
1751       // to be at the border of the empty memstore.
1752       MultiVersionConsistencyControl.WriteEntry w = null;
1753       this.updatesLock.writeLock().lock();
1754       try {
1755         if (this.memstoreSize.get() <= 0) {
1756           // Presume that if there are still no edits in the memstore, then there are no edits for
1757           // this region out in the WAL subsystem so no need to do any trickery clearing out
1758           // edits in the WAL system. Up the sequence number so the resulting flush id is for
1759           // sure just beyond the last appended region edit (useful as a marker when bulk loading,
1760           // etc.)
1761           // wal can be null replaying edits.
1762           if (wal != null) {
1763             w = mvcc.beginMemstoreInsert();
1764             long flushSeqId = getNextSequenceId(wal);
1765             FlushResult flushResult = new FlushResult(
1766               FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushSeqId, "Nothing to flush");
1767             w.setWriteNumber(flushSeqId);
1768             mvcc.waitForPreviousTransactionsComplete(w);
1769             w = null;
1770             return flushResult;
1771           } else {
1772             return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
1773               "Nothing to flush");
1774           }
1775         }
1776       } finally {
1777         this.updatesLock.writeLock().unlock();
1778         if (w != null) {
1779           mvcc.advanceMemstore(w);
1780         }
1781       }
1782     }
1783 
1784     LOG.info("Started memstore flush for " + this +
1785       ", current region memstore size " +
1786       StringUtils.byteDesc(this.memstoreSize.get()) +
1787       ((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid));
1788 
1789     // Stop updates while we snapshot the memstores of this region's stores. We only have
1790     // to do this for a moment.  It is quick. We also set the memstore size to zero here before we
1791     // allow updates again so its value will represent the size of the updates received
1792     // during flush
1793     MultiVersionConsistencyControl.WriteEntry w = null;
1794 
1795     // We have to take an update lock during snapshot, or else a write could end up in both snapshot
1796     // and memstore (which would make atomic row operations difficult)
1797     status.setStatus("Obtaining lock to block concurrent updates");
1798     // block waiting for the lock for internal flush
1799     this.updatesLock.writeLock().lock();
1800     long totalFlushableSize = 0;
1801     status.setStatus("Preparing to flush by snapshotting stores in " +
1802       getRegionInfo().getEncodedName());
1803     List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
1804     TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
1805         Bytes.BYTES_COMPARATOR);
1806     long flushSeqId = -1L;
1807 
1808     long trxId = 0;
1809     try {
1810       try {
1811         w = mvcc.beginMemstoreInsert();
1812         if (wal != null) {
1813           if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) {
1814             // This should never happen.
1815             String msg = "Flush will not be started for ["
1816                 + this.getRegionInfo().getEncodedName() + "] because the WAL is closing.";
1817             status.setStatus(msg);
1818             return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1819           }
1820           // Get a sequence id that we can use to denote the flush. It will be one beyond the last
1821           // edit that made it into the hfile (the below does not add an edit, it just asks the
1822           // WAL system to return the next sequence id).
1823           flushSeqId = getNextSequenceId(wal);
1824         } else {
1825           // use the provided sequence Id as WAL is not being used for this flush.
1826           flushSeqId = myseqid;
1827         }
1828 
1829         for (Store s : stores.values()) {
1830           totalFlushableSize += s.getFlushableSize();
1831           storeFlushCtxs.add(s.createFlushContext(flushSeqId));
1832           committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL
1833         }
1834 
1835         // write the snapshot start to WAL
1836         if (wal != null) {
1837           FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
1838             getRegionInfo(), flushSeqId, committedFiles);
1839           trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
1840             desc, sequenceId, false); // no sync. Sync is below where we do not hold the updates lock
1841         }
1842 
1843         // Prepare flush (take a snapshot)
1844         for (StoreFlushContext flush : storeFlushCtxs) {
1845           flush.prepare();
1846         }
1847       } catch (IOException ex) {
1848         if (wal != null) {
1849           if (trxId > 0) { // check whether we have already written START_FLUSH to WAL
1850             try {
1851               FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
1852                 getRegionInfo(), flushSeqId, committedFiles);
1853               WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
1854                 desc, sequenceId, false);
1855             } catch (Throwable t) {
1856               LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
1857                   StringUtils.stringifyException(t));
1858               // ignore this since we will be aborting the RS with DSE.
1859             }
1860           }
1861           // we have called wal.startCacheFlush(), now we have to abort it
1862           wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
1863           throw ex; // let upper layers deal with it.
1864         }
1865       } finally {
1866         this.updatesLock.writeLock().unlock();
1867       }
1868       String s = "Finished memstore snapshotting " + this +
1869         ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize;
1870       status.setStatus(s);
1871       if (LOG.isTraceEnabled()) LOG.trace(s);
1872       // sync unflushed WAL changes
1873       // see HBASE-8208 for details
1874       if (wal != null) {
1875         try {
1876           wal.sync(); // ensure that flush marker is sync'ed
1877         } catch (IOException ioe) {
1878           LOG.warn("Unexpected exception while wal.sync(), ignoring. Exception: "
1879               + StringUtils.stringifyException(ioe));
1880         }
1881       }
1882 
1883       // wait for all in-progress transactions to commit to WAL before
1884       // we can start the flush. This prevents
1885       // uncommitted transactions from being written into HFiles.
1886       // We have to block before we start the flush, otherwise keys that
1887       // were removed via a rollbackMemstore could be written to Hfiles.
1888       w.setWriteNumber(flushSeqId);
1889       mvcc.waitForPreviousTransactionsComplete(w);
1890       // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block
1891       w = null;
1892       s = "Flushing stores of " + this;
1893       status.setStatus(s);
1894       if (LOG.isTraceEnabled()) LOG.trace(s);
1895     } finally {
1896       if (w != null) {
1897         // in case of failure just mark current w as complete
1898         mvcc.advanceMemstore(w);
1899       }
1900     }
1901 
1902     // Any failure from here on out will be catastrophic requiring server
1903     // restart so wal content can be replayed and put back into the memstore.
1904     // Otherwise, the snapshot content, while backed up in the wal, will not
1905     // be part of the current running server's state.
1906     boolean compactionRequested = false;
1907     try {
1908       // A.  Flush memstore to all the HStores.
1909       // Keep running vector of all store files that includes both old and the
1910       // just-made new flush store file. The new flushed file is still in the
1911       // tmp directory.
1912 
1913       for (StoreFlushContext flush : storeFlushCtxs) {
1914         flush.flushCache(status);
1915       }
1916 
1917       // Switch snapshot (in memstore) -> new hfile (thus causing
1918       // all the store scanners to reset/reseek).
1919       Iterator<Store> it = stores.values().iterator(); // stores.values() and storeFlushCtxs have
1920       // same order
1921       for (StoreFlushContext flush : storeFlushCtxs) {
1922         boolean needsCompaction = flush.commit(status);
1923         if (needsCompaction) {
1924           compactionRequested = true;
1925         }
1926         committedFiles.put(it.next().getFamily().getName(), flush.getCommittedFiles());
1927       }
1928       storeFlushCtxs.clear();
1929 
1930       // Set down the memstore size by amount of flush.
1931       this.addAndGetGlobalMemstoreSize(-totalFlushableSize);
1932 
1933       if (wal != null) {
1934         // write flush marker to WAL. If fail, we should throw DroppedSnapshotException
1935         FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
1936           getRegionInfo(), flushSeqId, committedFiles);
1937         WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
1938           desc, sequenceId, true);
1939       }
1940     } catch (Throwable t) {
1941       // An exception here means that the snapshot was not persisted.
1942       // The wal needs to be replayed so its content is restored to memstore.
1943       // Currently, only a server restart will do this.
1944       // We used to only catch IOEs but it's possible that we'd get other
1945       // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
1946       // all and sundry.
1947       if (wal != null) {
1948         try {
1949           FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
1950             getRegionInfo(), flushSeqId, committedFiles);
1951           WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
1952             desc, sequenceId, false);
1953         } catch (Throwable ex) {
1954           LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
1955               StringUtils.stringifyException(ex));
1956           // ignore this since we will be aborting the RS with DSE.
1957         }
1958         wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
1959       }
1960       DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
1961           Bytes.toStringBinary(getRegionName()));
1962       dse.initCause(t);
1963       status.abort("Flush failed: " + StringUtils.stringifyException(t));
1964       throw dse;
1965     }
1966 
1967     // If we get to here, the HStores have been written.
1968     if (wal != null) {
1969       wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
1970     }
1971 
1972     // Record latest flush time
1973     this.lastFlushTime = EnvironmentEdgeManager.currentTime();
1974 
1975     // Update the last flushed sequence id for region. TODO: This is dup'd inside the WAL/FSHlog.
1976     this.lastFlushSeqId = flushSeqId;
1977 
1978     // C. Finally notify anyone waiting on memstore to clear:
1979     // e.g. checkResources().
1980     synchronized (this) {
1981       notifyAll(); // FindBugs NN_NAKED_NOTIFY
1982     }
1983 
1984     long time = EnvironmentEdgeManager.currentTime() - startTime;
1985     long memstoresize = this.memstoreSize.get();
1986     String msg = "Finished memstore flush of ~" +
1987       StringUtils.byteDesc(totalFlushableSize) + "/" + totalFlushableSize +
1988       ", currentsize=" +
1989       StringUtils.byteDesc(memstoresize) + "/" + memstoresize +
1990       " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId +
1991       ", compaction requested=" + compactionRequested +
1992       ((wal == null)? "; wal=null": "");
1993     LOG.info(msg);
1994     status.setStatus(msg);
1995 
1996     return new FlushResult(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
1997         FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushSeqId);
1998   }
1999 
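  // Summary of the WAL marker protocol implemented by internalFlushcache()
  // above (a descriptive sketch only, no additional behavior):
  //
  //   under updatesLock.writeLock():
  //     write START_FLUSH marker (unsynced) and snapshot every store
  //   release the lock, then wal.sync() and wait for older mvcc transactions
  //   flush each snapshot to tmp hfiles and commit them into place
  //   on success: write COMMIT_FLUSH marker (synced), completeCacheFlush()
  //   on failure: best-effort ABORT_FLUSH marker, abortCacheFlush(), and a
  //     DroppedSnapshotException so the server restarts and replays the WAL
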
2000   /**
2001    * Method to safely get the next sequence number.
2002    * @return Next sequence number unassociated with any actual edit.
2003    * @throws IOException
2004    */
2005   private long getNextSequenceId(final WAL wal) throws IOException {
2006     WALKey key = this.appendEmptyEdit(wal, null);
2007     return key.getSequenceId();
2008   }
2009 
2010   //////////////////////////////////////////////////////////////////////////////
2011   // get() methods for client use.
2012   //////////////////////////////////////////////////////////////////////////////
2013   /**
2014    * Return all the data for the row that matches <i>row</i> exactly,
2015    * or the one that immediately precedes it, at or immediately before
2016    * <i>ts</i>.
2017    *
2018    * @param row row key
2019    * @return map of values
2020    * @throws IOException
2021    */
2022   Result getClosestRowBefore(final byte [] row)
2023   throws IOException {
2024     return getClosestRowBefore(row, HConstants.CATALOG_FAMILY);
2025   }
2026 
2027   /**
2028    * Return all the data for the row that matches <i>row</i> exactly,
2029    * or the one that immediately precedes it, at or immediately before
2030    * <i>ts</i>.
2031    *
2032    * @param row row key
2033    * @param family column family to find on
2034    * @return map of values
2035    * @throws IOException read exceptions
2036    */
2037   public Result getClosestRowBefore(final byte [] row, final byte [] family)
2038   throws IOException {
2039     if (coprocessorHost != null) {
2040       Result result = new Result();
2041       if (coprocessorHost.preGetClosestRowBefore(row, family, result)) {
2042         return result;
2043       }
2044     }
2045     // look across all the HStores for this region and determine what the
2046     // closest key is across all column families, since the data may be sparse
2047     checkRow(row, "getClosestRowBefore");
2048     startRegionOperation(Operation.GET);
2049     this.readRequestsCount.increment();
2050     try {
2051       Store store = getStore(family);
2052       // get the closest key. (HStore.getRowKeyAtOrBefore can return null)
2053       Cell key = store.getRowKeyAtOrBefore(row);
2054       Result result = null;
2055       if (key != null) {
2056         Get get = new Get(CellUtil.cloneRow(key));
2057         get.addFamily(family);
2058         result = get(get);
2059       }
2060       if (coprocessorHost != null) {
2061         coprocessorHost.postGetClosestRowBefore(row, family, result);
2062       }
2063       return result;
2064     } finally {
2065       closeRegionOperation(Operation.GET);
2066     }
2067   }
2068 
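  // Usage sketch (hypothetical): locate the newest row at or before a key,
  // e.g. when resolving which meta row covers a user row.
  //
  //   Result r = region.getClosestRowBefore(rowKey, HConstants.CATALOG_FAMILY);
  //   if (r != null && !r.isEmpty()) {
  //     byte[] foundRow = r.getRow();   // the matching or preceding row key
  //   }
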
2069   /**
2070    * Return an iterator that scans over the HRegion, returning the indicated
2071    * columns and rows specified by the {@link Scan}.
2072    * <p>
2073    * This Iterator must be closed by the caller.
2074    *
2075    * @param scan configured {@link Scan}
2076    * @return RegionScanner
2077    * @throws IOException read exceptions
2078    */
2079   public RegionScanner getScanner(Scan scan) throws IOException {
2080     return getScanner(scan, null);
2081   }
2082 
2083   void prepareScanner(Scan scan) throws IOException {
2084     if(!scan.hasFamilies()) {
2085       // Adding all families to scanner
2086       for(byte[] family: this.htableDescriptor.getFamiliesKeys()){
2087         scan.addFamily(family);
2088       }
2089     }
2090   }
2091 
2092   protected RegionScanner getScanner(Scan scan,
2093       List<KeyValueScanner> additionalScanners) throws IOException {
2094     startRegionOperation(Operation.SCAN);
2095     try {
2096       // Verify families are all valid
2097       prepareScanner(scan);
2098       if(scan.hasFamilies()) {
2099         for(byte [] family : scan.getFamilyMap().keySet()) {
2100           checkFamily(family);
2101         }
2102       }
2103       return instantiateRegionScanner(scan, additionalScanners);
2104     } finally {
2105       closeRegionOperation(Operation.SCAN);
2106     }
2107   }
2108 
2109   protected RegionScanner instantiateRegionScanner(Scan scan,
2110       List<KeyValueScanner> additionalScanners) throws IOException {
2111     if (scan.isReversed()) {
2112       if (scan.getFilter() != null) {
2113         scan.getFilter().setReversed(true);
2114       }
2115       return new ReversedRegionScannerImpl(scan, additionalScanners, this);
2116     }
2117     return new RegionScannerImpl(scan, additionalScanners, this);
2118   }
2119 
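  // Scanner usage sketch (hypothetical caller code). Per the javadoc above,
  // the caller owns the scanner and must close it; next() fills the supplied
  // list with one row's cells and reports whether more rows remain.
  //
  //   RegionScanner scanner = region.getScanner(new Scan());
  //   try {
  //     List<Cell> cells = new ArrayList<Cell>();
  //     boolean moreRows;
  //     do {
  //       cells.clear();
  //       moreRows = scanner.next(cells);
  //       // process one row's cells here
  //     } while (moreRows);
  //   } finally {
  //     scanner.close();
  //   }
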
2120   /*
2121    * @param delete The passed delete is modified by this method. WARNING!
2122    */
2123   void prepareDelete(Delete delete) throws IOException {
2124     // Check to see if this is a deleteRow insert
2125     if(delete.getFamilyCellMap().isEmpty()){
2126       for(byte [] family : this.htableDescriptor.getFamiliesKeys()){
2127         // Don't eat the timestamp
2128         delete.deleteFamily(family, delete.getTimeStamp());
2129       }
2130     } else {
2131       for(byte [] family : delete.getFamilyCellMap().keySet()) {
2132         if(family == null) {
2133           throw new NoSuchColumnFamilyException("Empty family is invalid");
2134         }
2135         checkFamily(family);
2136       }
2137     }
2138   }
2139 
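  // Semantics illustrated (hypothetical): a Delete carrying no cells is
  // expanded by prepareDelete() into a whole-row delete across every family,
  // preserving the Delete's own timestamp.
  //
  //   Delete wholeRow = new Delete(rowKey);   // empty family cell map
  //   region.delete(wholeRow);                // deletes all families of the row
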
2140   //////////////////////////////////////////////////////////////////////////////
2141   // set() methods for client use.
2142   //////////////////////////////////////////////////////////////////////////////
2143   /**
2144    * @param delete delete object
2145    * @throws IOException read exceptions
2146    */
2147   public void delete(Delete delete)
2148   throws IOException {
2149     checkReadOnly();
2150     checkResources();
2151     startRegionOperation(Operation.DELETE);
2152     try {
2153       delete.getRow();
2154       // All edits for the given row (across all column families) must happen atomically.
2155       doBatchMutate(delete);
2156     } finally {
2157       closeRegionOperation(Operation.DELETE);
2158     }
2159   }
2160 
2161   /**
2162    * Row key needed by the method below.
2163    */
2164   private static final byte [] FOR_UNIT_TESTS_ONLY = Bytes.toBytes("ForUnitTestsOnly");
2165   /**
2166    * This is used only by unit tests. Not required to be a public API.
2167    * @param familyMap map of family to edits for the given family.
2168    * @throws IOException
2169    */
2170   void delete(NavigableMap<byte[], List<Cell>> familyMap,
2171       Durability durability) throws IOException {
2172     Delete delete = new Delete(FOR_UNIT_TESTS_ONLY);
2173     delete.setFamilyCellMap(familyMap);
2174     delete.setDurability(durability);
2175     doBatchMutate(delete);
2176   }
2177 
2178   /**
2179    * Setup correct timestamps in the KVs in Delete object.
2180    * Caller should have the row and region locks.
2181    * @param mutation
2182    * @param familyMap
2183    * @param byteNow
2184    * @throws IOException
2185    */
2186   void prepareDeleteTimestamps(Mutation mutation, Map<byte[], List<Cell>> familyMap,
2187       byte[] byteNow) throws IOException {
2188     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
2189 
2190       byte[] family = e.getKey();
2191       List<Cell> cells = e.getValue();
2192       assert cells instanceof RandomAccess;
2193 
2194       Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
2195       int listSize = cells.size();
2196       for (int i=0; i < listSize; i++) {
2197         Cell cell = cells.get(i);
2198         //  Check if time is LATEST, change to time of most recent addition if so
2199         //  This is expensive.
2200         if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP && CellUtil.isDeleteType(cell)) {
2201           byte[] qual = CellUtil.cloneQualifier(cell);
2202           if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
2203 
2204           Integer count = kvCount.get(qual);
2205           if (count == null) {
2206             kvCount.put(qual, 1);
2207           } else {
2208             kvCount.put(qual, count + 1);
2209           }
2210           count = kvCount.get(qual);
2211 
2212           Get get = new Get(CellUtil.cloneRow(cell));
2213           get.setMaxVersions(count);
2214           get.addColumn(family, qual);
2215           if (coprocessorHost != null) {
2216             if (!coprocessorHost.prePrepareTimeStampForDeleteVersion(mutation, cell,
2217                 byteNow, get)) {
2218               updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow);
2219             }
2220           } else {
2221             updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow);
2222           }
2223         } else {
2224           CellUtil.updateLatestStamp(cell, byteNow, 0);
2225         }
2226       }
2227     }
2228   }
2229 
2230   void updateDeleteLatestVersionTimeStamp(Cell cell, Get get, int count, byte[] byteNow)
2231       throws IOException {
2232     List<Cell> result = get(get, false);
2233 
2234     if (result.size() < count) {
2235       // Nothing to delete
2236       CellUtil.updateLatestStamp(cell, byteNow, 0);
2237       return;
2238     }
2239     if (result.size() > count) {
2240       throw new RuntimeException("Unexpected size: " + result.size());
2241     }
2242     Cell getCell = result.get(count - 1);
2243     CellUtil.setTimestamp(cell, getCell.getTimestamp());
2244   }
2245 
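  // Example of the timestamp fix-up above (values hypothetical): if a column
  // has versions at t=5 and t=9 and a client issues a single "delete latest
  // version" with LATEST_TIMESTAMP, the Get fetches the newest version and
  // the delete cell's timestamp is rewritten to 9, so exactly that version
  // is masked instead of everything up to an arbitrary "now".
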
2246   /**
2247    * @throws IOException
2248    */
2249   public void put(Put put)
2250   throws IOException {
2251     checkReadOnly();
2252 
2253     // Do a rough check that we have resources to accept a write.  The check is
2254     // 'rough' in that between the resource check and the call to obtain a
2255     // read lock, resources may run out.  For now, the thought is that this
2256     // will be extremely rare; we'll deal with it when it happens.
2257     checkResources();
2258     startRegionOperation(Operation.PUT);
2259     try {
2260       // All edits for the given row (across all column families) must happen atomically.
2261       doBatchMutate(put);
2262     } finally {
2263       closeRegionOperation(Operation.PUT);
2264     }
2265   }
2266 
2267   /**
2268    * Struct-like class that tracks the progress of a batch operation,
2269    * accumulating status codes and tracking the index at which processing
2270    * is proceeding.
2271    */
2272   private abstract static class BatchOperationInProgress<T> {
2273     T[] operations;
2274     int nextIndexToProcess = 0;
2275     OperationStatus[] retCodeDetails;
2276     WALEdit[] walEditsFromCoprocessors;
2277 
2278     public BatchOperationInProgress(T[] operations) {
2279       this.operations = operations;
2280       this.retCodeDetails = new OperationStatus[operations.length];
2281       this.walEditsFromCoprocessors = new WALEdit[operations.length];
2282       Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
2283     }
2284 
2285     public abstract Mutation getMutation(int index);
2286     public abstract long getNonceGroup(int index);
2287     public abstract long getNonce(int index);
2288     /** This method is potentially expensive and should only be used for the non-replay CP path. */
2289     public abstract Mutation[] getMutationsForCoprocs();
2290     public abstract boolean isInReplay();
2291     public abstract long getReplaySequenceId();
2292 
2293     public boolean isDone() {
2294       return nextIndexToProcess == operations.length;
2295     }
2296   }
2297 
2298   private static class MutationBatch extends BatchOperationInProgress<Mutation> {
2299     private long nonceGroup;
2300     private long nonce;
2301     public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) {
2302       super(operations);
2303       this.nonceGroup = nonceGroup;
2304       this.nonce = nonce;
2305     }
2306 
2307     @Override
2308     public Mutation getMutation(int index) {
2309       return this.operations[index];
2310     }
2311 
2312     @Override
2313     public long getNonceGroup(int index) {
2314       return nonceGroup;
2315     }
2316 
2317     @Override
2318     public long getNonce(int index) {
2319       return nonce;
2320     }
2321 
2322     @Override
2323     public Mutation[] getMutationsForCoprocs() {
2324       return this.operations;
2325     }
2326 
2327     @Override
2328     public boolean isInReplay() {
2329       return false;
2330     }
2331 
2332     @Override
2333     public long getReplaySequenceId() {
2334       return 0;
2335     }
2336   }
2337 
2338   private static class ReplayBatch extends BatchOperationInProgress<MutationReplay> {
2339     private long replaySeqId = 0;
2340     public ReplayBatch(MutationReplay[] operations, long seqId) {
2341       super(operations);
2342       this.replaySeqId = seqId;
2343     }
2344 
2345     @Override
2346     public Mutation getMutation(int index) {
2347       return this.operations[index].mutation;
2348     }
2349 
2350     @Override
2351     public long getNonceGroup(int index) {
2352       return this.operations[index].nonceGroup;
2353     }
2354 
2355     @Override
2356     public long getNonce(int index) {
2357       return this.operations[index].nonce;
2358     }
2359 
2360     @Override
2361     public Mutation[] getMutationsForCoprocs() {
2362       assert false;
2363       throw new RuntimeException("Should not be called for replay batch");
2364     }
2365 
2366     @Override
2367     public boolean isInReplay() {
2368       return true;
2369     }
2370 
2371     @Override
2372     public long getReplaySequenceId() {
2373       return this.replaySeqId;
2374     }
2375   }
2376 
2377   /**
2378    * Perform a batch of mutations.
2379    * It supports only Put and Delete mutations and will ignore other types passed.
2380    * @param mutations the list of mutations
2381    * @return an array of OperationStatus which internally contains the
2382    *         OperationStatusCode and the exceptionMessage if any.
2383    * @throws IOException
2384    */
2385   public OperationStatus[] batchMutate(
2386       Mutation[] mutations, long nonceGroup, long nonce) throws IOException {
2387     // As it stands, this is used for two things:
2388     //  * batchMutate with single mutation - put/delete, separate or from checkAndMutate.
2389     //  * coprocessor calls (see ex. BulkDeleteEndpoint).
2390     // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd...
2391     return batchMutate(new MutationBatch(mutations, nonceGroup, nonce));
2392   }
2393 
2394   public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
2395     return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
2396   }
2397 
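  // Caller sketch (hypothetical; getExceptionMsg() is assumed from
  // OperationStatus) checking the per-operation results of a batch:
  //
  //   OperationStatus[] statuses = region.batchMutate(new Mutation[] { put, delete });
  //   for (int i = 0; i < statuses.length; i++) {
  //     if (statuses[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
  //       LOG.warn("Mutation " + i + " failed: " + statuses[i].getExceptionMsg());
  //     }
  //   }
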
2398   /**
2399    * Replay a batch of mutations.
2400    * @param mutations mutations to replay.
2401    * @param replaySeqId SeqId for current mutations
2402    * @return an array of OperationStatus which internally contains the
2403    *         OperationStatusCode and the exceptionMessage if any.
2404    * @throws IOException
2405    */
2406   public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId)
2407       throws IOException {
2408     return batchMutate(new ReplayBatch(mutations, replaySeqId));
2409   }
2410 
2411   /**
2412    * Perform a batch of mutations.
2413    * It supports only Put and Delete mutations and will ignore other types passed.
2414    * @param batchOp the batch of mutations, with progress state
2415    * @return an array of OperationStatus which internally contains the
2416    *         OperationStatusCode and the exceptionMessage if any.
2417    * @throws IOException
2418    */
2419   OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
2420     boolean initialized = false;
2421     while (!batchOp.isDone()) {
2422       if (!batchOp.isInReplay()) {
2423         checkReadOnly();
2424       }
2425       checkResources();
2426 
2427       long newSize;
2428       Operation op = Operation.BATCH_MUTATE;
2429       if (batchOp.isInReplay()) op = Operation.REPLAY_BATCH_MUTATE;
2430       startRegionOperation(op);
2431 
2432       try {
2433         if (!initialized) {
2434           this.writeRequestsCount.add(batchOp.operations.length);
2435           if (!batchOp.isInReplay()) {
2436             doPreMutationHook(batchOp);
2437           }
2438           initialized = true;
2439         }
2440         long addedSize = doMiniBatchMutation(batchOp);
2441         newSize = this.addAndGetGlobalMemstoreSize(addedSize);
2442       } finally {
2443         closeRegionOperation(op);
2444       }
2445       if (isFlushSize(newSize)) {
2446         requestFlush();
2447       }
2448     }
2449     return batchOp.retCodeDetails;
2450   }
2451 
2452 
2453   private void doPreMutationHook(BatchOperationInProgress<?> batchOp)
2454       throws IOException {
2455     /* Run coprocessor pre hook outside of locks to avoid deadlock */
2456     WALEdit walEdit = new WALEdit();
2457     if (coprocessorHost != null) {
2458       for (int i = 0 ; i < batchOp.operations.length; i++) {
2459         Mutation m = batchOp.getMutation(i);
2460         if (m instanceof Put) {
2461           if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
2462             // pre hook says skip this Put
2463             // mark as success and skip in doMiniBatchMutation
2464             batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2465           }
2466         } else if (m instanceof Delete) {
2467           Delete curDel = (Delete) m;
2468           if (curDel.getFamilyCellMap().isEmpty()) {
2469             // handle deleting a row case
2470             prepareDelete(curDel);
2471           }
2472           if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {
2473             // pre hook says skip this Delete
2474             // mark as success and skip in doMiniBatchMutation
2475             batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2476           }
2477         } else {
2478           // If Append mutations are passed along with the Puts and Deletes in batchMutate,
2479           // mark the operation return code as failure so that it will not be considered in
2480           // doMiniBatchMutation
2481           batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
2482               "Put/Delete mutations only supported in batchMutate() now");
2483         }
2484         if (!walEdit.isEmpty()) {
2485           batchOp.walEditsFromCoprocessors[i] = walEdit;
2486           walEdit = new WALEdit();
2487         }
2488       }
2489     }
2490   }
2491 
2492   @SuppressWarnings("unchecked")
2493   private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp) throws IOException {
2494     boolean isInReplay = batchOp.isInReplay();
2495     // variable to note if all Put items are for the same CF -- metrics related
2496     boolean putsCfSetConsistent = true;
2497     //The set of columnFamilies first seen for Put.
2498     Set<byte[]> putsCfSet = null;
2499     // variable to note if all Delete items are for the same CF -- metrics related
2500     boolean deletesCfSetConsistent = true;
2501     //The set of columnFamilies first seen for Delete.
2502     Set<byte[]> deletesCfSet = null;
2503 
2504     long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
2505     WALEdit walEdit = new WALEdit(isInReplay);
2506     MultiVersionConsistencyControl.WriteEntry w = null;
2507     long txid = 0;
2508     boolean doRollBackMemstore = false;
2509     boolean locked = false;
2510 
2511     /** Keep track of the locks we hold so we can release them in finally clause */
2512     List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
2513     // reference family maps directly so coprocessors can mutate them if desired
2514     Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
2515     List<Cell> memstoreCells = new ArrayList<Cell>();
2516     // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
2517     int firstIndex = batchOp.nextIndexToProcess;
2518     int lastIndexExclusive = firstIndex;
2519     boolean success = false;
2520     int noOfPuts = 0, noOfDeletes = 0;
2521     WALKey walKey = null;
2522     long mvccNum = 0;
2523     try {
2524       // ------------------------------------
2525       // STEP 1. Try to acquire as many locks as we can, and ensure
2526       // we acquire at least one.
2527       // ----------------------------------
2528       int numReadyToWrite = 0;
2529       long now = EnvironmentEdgeManager.currentTime();
2530       while (lastIndexExclusive < batchOp.operations.length) {
2531         Mutation mutation = batchOp.getMutation(lastIndexExclusive);
2532         boolean isPutMutation = mutation instanceof Put;
2533 
2534         Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
2535         // store the family map reference to allow for mutations
2536         familyMaps[lastIndexExclusive] = familyMap;
2537 
2538         // skip anything that "ran" already
2539         if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
2540             != OperationStatusCode.NOT_RUN) {
2541           lastIndexExclusive++;
2542           continue;
2543         }
2544 
2545         try {
2546           if (isPutMutation) {
2547             // Check the families in the put. If bad, skip this one.
2548             if (isInReplay) {
2549               removeNonExistentColumnFamilyForReplay(familyMap);
2550             } else {
2551               checkFamilies(familyMap.keySet());
2552             }
2553             checkTimestamps(mutation.getFamilyCellMap(), now);
2554           } else {
2555             prepareDelete((Delete) mutation);
2556           }
2557         } catch (NoSuchColumnFamilyException nscf) {
2558           LOG.warn("No such column family in batch mutation", nscf);
2559           batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2560               OperationStatusCode.BAD_FAMILY, nscf.getMessage());
2561           lastIndexExclusive++;
2562           continue;
2563         } catch (FailedSanityCheckException fsce) {
2564           LOG.warn("Batch Mutation did not pass sanity check", fsce);
2565           batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2566               OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
2567           lastIndexExclusive++;
2568           continue;
2569         }
2570 
2571         // If we haven't got any rows in our batch, we should block to
2572         // get the next one.
2573         boolean shouldBlock = numReadyToWrite == 0;
2574         RowLock rowLock = null;
2575         try {
2576           rowLock = getRowLock(mutation.getRow(), shouldBlock);
2577         } catch (IOException ioe) {
2578           LOG.warn("Failed getting lock in batch put, row="
2579             + Bytes.toStringBinary(mutation.getRow()), ioe);
2580         }
2581         if (rowLock == null) {
2582           // We failed to grab another lock
2583           assert !shouldBlock : "Should never fail to get lock when blocking";
2584           break; // stop acquiring more rows for this batch
2585         } else {
2586           acquiredRowLocks.add(rowLock);
2587         }
2588 
2589         lastIndexExclusive++;
2590         numReadyToWrite++;
2591 
2592         if (isPutMutation) {
2593           // If Column Families stay consistent throughout all of the
2594           // individual puts, then metrics can be reported as a multiput across
2595           // column families in the first put.
2596           if (putsCfSet == null) {
2597             putsCfSet = mutation.getFamilyCellMap().keySet();
2598           } else {
2599             putsCfSetConsistent = putsCfSetConsistent
2600                 && mutation.getFamilyCellMap().keySet().equals(putsCfSet);
2601           }
2602         } else {
2603           if (deletesCfSet == null) {
2604             deletesCfSet = mutation.getFamilyCellMap().keySet();
2605           } else {
2606             deletesCfSetConsistent = deletesCfSetConsistent
2607                 && mutation.getFamilyCellMap().keySet().equals(deletesCfSet);
2608           }
2609         }
2610       }
2611 
2612       // we should record the timestamp only after we have acquired the rowLock;
2613       // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
2614       now = EnvironmentEdgeManager.currentTime();
2615       byte[] byteNow = Bytes.toBytes(now);
2616 
2617       // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
2618       if (numReadyToWrite <= 0) return 0L;
2619 
2620       // We've now grabbed as many mutations off the list as we can
2621 
2622       // ------------------------------------
2623       // STEP 2. Update any LATEST_TIMESTAMP timestamps
2624       // ----------------------------------
2625       for (int i = firstIndex; !isInReplay && i < lastIndexExclusive; i++) {
2626         // skip invalid
2627         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2628             != OperationStatusCode.NOT_RUN) continue;
2629 
2630         Mutation mutation = batchOp.getMutation(i);
2631         if (mutation instanceof Put) {
2632           updateCellTimestamps(familyMaps[i].values(), byteNow);
2633           noOfPuts++;
2634         } else {
2635           prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
2636           noOfDeletes++;
2637         }
2638       }
2639 
2640       lock(this.updatesLock.readLock(), numReadyToWrite);
2641       locked = true;
2642       if(isInReplay) {
2643         mvccNum = batchOp.getReplaySequenceId();
2644       } else {
2645         mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
2646       }
2647       //
2648       // ------------------------------------
2649       // Acquire the latest mvcc number
2650       // ----------------------------------
2651       w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
2652 
2653       // calling the pre CP hook for batch mutation
2654       if (!isInReplay && coprocessorHost != null) {
2655         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2656           new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2657           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2658         if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
2659       }
2660 
2661       // ------------------------------------
2662       // STEP 3. Write back to memstore
2663       // Write to memstore. It is ok to write to memstore
2664       // first without updating the WAL because we do not roll
2665       // forward the memstore MVCC. The MVCC will be moved up when
2666       // the complete operation is done. These changes are not yet
2667       // visible to scanners till we update the MVCC. The MVCC is
2668       // moved only when the sync is complete.
2669       // ----------------------------------
2670       long addedSize = 0;
2671       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2672         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2673             != OperationStatusCode.NOT_RUN) {
2674           continue;
2675         }
2676         doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
2677         addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells);
2678       }
2679 
2680       // ------------------------------------
2681       // STEP 4. Build WAL edit
2682       // ----------------------------------
2683       Durability durability = Durability.USE_DEFAULT;
2684       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2685         // Skip mutations that were determined to be invalid during preprocessing
2686         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2687             != OperationStatusCode.NOT_RUN) {
2688           continue;
2689         }
2690         batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2691 
2692         Mutation m = batchOp.getMutation(i);
2693         Durability tmpDur = getEffectiveDurability(m.getDurability());
2694         if (tmpDur.ordinal() > durability.ordinal()) {
2695           durability = tmpDur;
2696         }
2697         if (tmpDur == Durability.SKIP_WAL) {
2698           recordMutationWithoutWal(m.getFamilyCellMap());
2699           continue;
2700         }
2701 
2702         long nonceGroup = batchOp.getNonceGroup(i), nonce = batchOp.getNonce(i);
2703         // In replay, the batch may contain multiple nonces. If so, write WALEdit for each.
2704         // Given how nonces are originally written, these should be contiguous.
2705         // They don't have to be; it will still work, we just write more WALEdits than needed.
2706         if (nonceGroup != currentNonceGroup || nonce != currentNonce) {
2707           if (walEdit.size() > 0) {
2708             assert isInReplay;
2709             if (!isInReplay) {
2710               throw new IOException("Multiple nonces per batch and not in replay");
2711             }
2712             // txid should always increase, so having the one from the last call is ok.
2713             // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
2714             walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
2715               this.htableDescriptor.getTableName(), now, m.getClusterIds(),
2716               currentNonceGroup, currentNonce);
2717             txid = this.wal.append(this.htableDescriptor,  this.getRegionInfo(),  walKey,
2718               walEdit, getSequenceId(), true, null);
2719             walEdit = new WALEdit(isInReplay);
2720             walKey = null;
2721           }
2722           currentNonceGroup = nonceGroup;
2723           currentNonce = nonce;
2724         }
2725 
2726         // Add WAL edits by CP
2727         WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
2728         if (fromCP != null) {
2729           for (Cell cell : fromCP.getCells()) {
2730             walEdit.add(cell);
2731           }
2732         }
2733         addFamilyMapToWALEdit(familyMaps[i], walEdit);
2734       }
2735 
2736       // -------------------------
2737       // STEP 5. Append the final edit to WAL. Do not sync wal.
2738       // -------------------------
2739       Mutation mutation = batchOp.getMutation(firstIndex);
2740       if (walEdit.size() > 0) {
2741         // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
2742         walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
2743             this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
2744             mutation.getClusterIds(), currentNonceGroup, currentNonce);
2745         if(isInReplay) {
2746           walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId());
2747         }
2748         txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
2749           getSequenceId(), true, memstoreCells);
2750       }
2751       if(walKey == null){
2752         // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
2753         walKey = this.appendEmptyEdit(this.wal, memstoreCells);
2754       }
2755 
2756       // -------------------------------
2757       // STEP 6. Release row locks, etc.
2758       // -------------------------------
2759       if (locked) {
2760         this.updatesLock.readLock().unlock();
2761         locked = false;
2762       }
2763       releaseRowLocks(acquiredRowLocks);
2764 
2765       // -------------------------
2766       // STEP 7. Sync wal.
2767       // -------------------------
2768       if (txid != 0) {
2769         syncOrDefer(txid, durability);
2770       }
2771 
2772       doRollBackMemstore = false;
2773       // calling the post CP hook for batch mutation
2774       if (!isInReplay && coprocessorHost != null) {
2775         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2776           new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2777           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2778         coprocessorHost.postBatchMutate(miniBatchOp);
2779       }
2780 
2781       // ------------------------------------------------------------------
2782       // STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
2783       // ------------------------------------------------------------------
2784       if (w != null) {
2785         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
2786         w = null;
2787       }
2788 
2789       // ------------------------------------
2790       // STEP 9. Run coprocessor post hooks. This should be done after the wal is
2791       // synced so that the coprocessor contract is adhered to.
2792       // ------------------------------------
2793       if (!isInReplay && coprocessorHost != null) {
2794         for (int i = firstIndex; i < lastIndexExclusive; i++) {
2795           // only for successful puts
2796           if (batchOp.retCodeDetails[i].getOperationStatusCode()
2797               != OperationStatusCode.SUCCESS) {
2798             continue;
2799           }
2800           Mutation m = batchOp.getMutation(i);
2801           if (m instanceof Put) {
2802             coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
2803           } else {
2804             coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
2805           }
2806         }
2807       }
2808 
2809       success = true;
2810       return addedSize;
2811     } finally {
2812 
2813       // if the wal sync was unsuccessful, remove keys from memstore
2814       if (doRollBackMemstore) {
2815         rollbackMemstore(memstoreCells);
2816       }
2817       if (w != null) {
2818         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
2819       }
2820 
2821       if (locked) {
2822         this.updatesLock.readLock().unlock();
2823       }
2824       releaseRowLocks(acquiredRowLocks);
2825 
2826       // See if the column families were consistent through the whole thing.
2827       // If they were, then keep them. If they were not, then pass a null;
2828       // null will be treated as unknown.
2829       // The total time taken might involve both Puts and Deletes.
2830       // Split the time for puts and deletes based on the total number of Puts and Deletes.
2831 
2832       if (noOfPuts > 0) {
2833         // There were some Puts in the batch.
2834         if (this.metricsRegion != null) {
2835           this.metricsRegion.updatePut();
2836         }
2837       }
2838       if (noOfDeletes > 0) {
2839         // There were some Deletes in the batch.
2840         if (this.metricsRegion != null) {
2841           this.metricsRegion.updateDelete();
2842         }
2843       }
2844       if (!success) {
2845         for (int i = firstIndex; i < lastIndexExclusive; i++) {
2846           if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
2847             batchOp.retCodeDetails[i] = OperationStatus.FAILURE;
2848           }
2849         }
2850       }
2851       if (coprocessorHost != null && !batchOp.isInReplay()) {
2852         // call the coprocessor hook to do any finalization steps
2853         // after the put is done
2854         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2855             new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2856                 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex,
2857                 lastIndexExclusive);
2858         coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success);
2859       }
2860 
2861       batchOp.nextIndexToProcess = lastIndexExclusive;
2862     }
2863   }
2864 
2865   /**
2866    * Returns effective durability from the passed durability and
2867    * the table descriptor.
2868    */
2869   protected Durability getEffectiveDurability(Durability d) {
2870     return d == Durability.USE_DEFAULT ? this.durability : d;
2871   }
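  /*
   * Worked example of the resolution rule above (a sketch; the table-level
   * default this.durability is assumed to be SYNC_WAL here):
   *
   *   getEffectiveDurability(Durability.USE_DEFAULT); // -> SYNC_WAL, falls back to the table default
   *   getEffectiveDurability(Durability.SKIP_WAL);    // -> SKIP_WAL, the mutation's own setting wins
   */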
2872 
2873   //TODO, Think that gets/puts and deletes should be refactored a bit so that
2874   //the getting of the lock happens before, so that you would just pass it into
2875   //the methods. So in the case of checkAndMutate you could just do lockRow,
2876   //get, put, unlockRow or something
2877   /**
2878    * Atomically checks whether the expected value matches and, if so, applies the mutation.
2879    * @throws IOException
2880    * @return true if the new put was executed, false otherwise
2881    */
2882   public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
2883       CompareOp compareOp, ByteArrayComparable comparator, Mutation w,
2884       boolean writeToWAL)
2885   throws IOException{
2886     checkReadOnly();
2887     //TODO, add check for value length or maybe even better move this to the
2888     //client if this becomes a global setting
2889     checkResources();
2890     boolean isPut = w instanceof Put;
2891     if (!isPut && !(w instanceof Delete))
2892       throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must " +
2893           "be Put or Delete");
2894     if (!Bytes.equals(row, w.getRow())) {
2895       throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " +
2896           "getRow must match the passed row");
2897     }
2898 
2899     startRegionOperation();
2900     try {
2901       Get get = new Get(row);
2902       checkFamily(family);
2903       get.addColumn(family, qualifier);
2904 
2905       // Lock row - note that doBatchMutate will relock this row if called
2906       RowLock rowLock = getRowLock(get.getRow());
2907       // wait for all previous transactions to complete (with lock held)
2908       mvcc.waitForPreviousTransactionsComplete();
2909       try {
2910         if (this.getCoprocessorHost() != null) {
2911           Boolean processed = null;
2912           if (w instanceof Put) {
2913             processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family,
2914                 qualifier, compareOp, comparator, (Put) w);
2915           } else if (w instanceof Delete) {
2916             processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family,
2917                 qualifier, compareOp, comparator, (Delete) w);
2918           }
2919           if (processed != null) {
2920             return processed;
2921           }
2922         }
2923         List<Cell> result = get(get, false);
2924 
2925         boolean valueIsNull = comparator.getValue() == null ||
2926           comparator.getValue().length == 0;
2927         boolean matches = false;
2928         if (result.size() == 0 && valueIsNull) {
2929           matches = true;
2930         } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
2931             valueIsNull) {
2932           matches = true;
2933         } else if (result.size() == 1 && !valueIsNull) {
2934           Cell kv = result.get(0);
2935           int compareResult = comparator.compareTo(kv.getValueArray(),
2936               kv.getValueOffset(), kv.getValueLength());
2937           switch (compareOp) {
2938           case LESS:
2939             matches = compareResult < 0;
2940             break;
2941           case LESS_OR_EQUAL:
2942             matches = compareResult <= 0;
2943             break;
2944           case EQUAL:
2945             matches = compareResult == 0;
2946             break;
2947           case NOT_EQUAL:
2948             matches = compareResult != 0;
2949             break;
2950           case GREATER_OR_EQUAL:
2951             matches = compareResult >= 0;
2952             break;
2953           case GREATER:
2954             matches = compareResult > 0;
2955             break;
2956           default:
2957             throw new RuntimeException("Unknown Compare op " + compareOp.name());
2958           }
2959         }
2960         // If it matches, apply the new put or the new delete
2961         if (matches) {
2962           // All edits for the given row (across all column families) must
2963           // happen atomically.
2964           doBatchMutate(w);
2965           this.checkAndMutateChecksPassed.increment();
2966           return true;
2967         }
2968         this.checkAndMutateChecksFailed.increment();
2969         return false;
2970       } finally {
2971         rowLock.release();
2972       }
2973     } finally {
2974       closeRegionOperation();
2975     }
2976   }
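  /*
   * Hedged client-side usage sketch for the check-and-mutate path above. This is
   * not part of this class; "table" is an assumed org.apache.hadoop.hbase.client.HTable
   * handle and all other names are illustrative:
   *
   *   Put put = new Put(row);
   *   put.add(family, qualifier, newValue);
   *   // Applies the Put only if the current cell value equals expectedValue;
   *   // server-side this funnels into checkAndMutate with an EQUAL comparison.
   *   boolean applied = table.checkAndPut(row, family, qualifier, expectedValue, put);
   */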
2977 
2978   //TODO, Think that gets/puts and deletes should be refactored a bit so that
2979   //the getting of the lock happens before, so that you would just pass it into
2980   //the methods. So in the case of checkAndMutate you could just do lockRow,
2981   //get, put, unlockRow or something
2982   /**
2983    * Atomically checks whether the expected value matches and, if so, applies the row mutations.
2984    * @throws IOException
2985    * @return true if the row mutations were executed, false otherwise
2986    */
2987   public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier,
2988       CompareOp compareOp, ByteArrayComparable comparator, RowMutations rm,
2989       boolean writeToWAL)
2990       throws IOException{
2991     checkReadOnly();
2992     //TODO, add check for value length or maybe even better move this to the
2993     //client if this becomes a global setting
2994     checkResources();
2995 
2996     startRegionOperation();
2997     try {
2998       Get get = new Get(row);
2999       checkFamily(family);
3000       get.addColumn(family, qualifier);
3001 
3002       // Lock row - note that doBatchMutate will relock this row if called
3003       RowLock rowLock = getRowLock(get.getRow());
3004       // wait for all previous transactions to complete (with lock held)
3005       mvcc.waitForPreviousTransactionsComplete();
3006       try {
3007         List<Cell> result = get(get, false);
3008 
3009         boolean valueIsNull = comparator.getValue() == null ||
3010             comparator.getValue().length == 0;
3011         boolean matches = false;
3012         if (result.size() == 0 && valueIsNull) {
3013           matches = true;
3014         } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
3015             valueIsNull) {
3016           matches = true;
3017         } else if (result.size() == 1 && !valueIsNull) {
3018           Cell kv = result.get(0);
3019           int compareResult = comparator.compareTo(kv.getValueArray(),
3020               kv.getValueOffset(), kv.getValueLength());
3021           switch (compareOp) {
3022           case LESS:
3023             matches = compareResult < 0;
3024             break;
3025           case LESS_OR_EQUAL:
3026             matches = compareResult <= 0;
3027             break;
3028           case EQUAL:
3029             matches = compareResult == 0;
3030             break;
3031           case NOT_EQUAL:
3032             matches = compareResult != 0;
3033             break;
3034           case GREATER_OR_EQUAL:
3035             matches = compareResult >= 0;
3036             break;
3037           case GREATER:
3038             matches = compareResult > 0;
3039             break;
3040           default:
3041             throw new RuntimeException("Unknown Compare op " + compareOp.name());
3042           }
3043         }
3044         // If it matches, apply the row mutations
3045         if (matches) {
3046           // All edits for the given row (across all column families) must
3047           // happen atomically.
3048           mutateRow(rm);
3049           this.checkAndMutateChecksPassed.increment();
3050           return true;
3051         }
3052         this.checkAndMutateChecksFailed.increment();
3053         return false;
3054       } finally {
3055         rowLock.release();
3056       }
3057     } finally {
3058       closeRegionOperation();
3059     }
3060   }
3061   private void doBatchMutate(Mutation mutation) throws IOException, DoNotRetryIOException {
3062     // Currently this is only called for puts and deletes, so no nonces.
3063     OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation },
3064         HConstants.NO_NONCE, HConstants.NO_NONCE);
3065     if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
3066       throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
3067     } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
3068       throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
3069     }
3070   }
3071 
3072   /**
3073    * Complete taking the snapshot on the region. Writes the region info and adds references to the
3074    * working snapshot directory.
3075    *
3076    * TODO for api consistency, consider adding another version with no {@link ForeignExceptionSnare}
3077    * arg.  (In the future other cancellable HRegion methods could eventually add a
3078    * {@link ForeignExceptionSnare}, or we could do something fancier).
3079    *
3080    * @param desc snapshot description object
3081    * @param exnSnare ForeignExceptionSnare that captures external exceptions in case we need to
3082    *   bail out.  This is allowed to be null and will just be ignored in that case.
3083    * @throws IOException if there is an external or internal error causing the snapshot to fail
3084    */
3085   public void addRegionToSnapshot(SnapshotDescription desc,
3086       ForeignExceptionSnare exnSnare) throws IOException {
3087     Path rootDir = FSUtils.getRootDir(conf);
3088     Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);
3089 
3090     SnapshotManifest manifest = SnapshotManifest.create(conf, getFilesystem(),
3091                                                         snapshotDir, desc, exnSnare);
3092     manifest.addRegion(this);
3093   }
3094 
3095   /**
3096    * Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP} with the
3097    * provided current timestamp.
3098    * @throws IOException
3099    */
3100   void updateCellTimestamps(final Iterable<List<Cell>> cellItr, final byte[] now)
3101       throws IOException {
3102     for (List<Cell> cells: cellItr) {
3103       if (cells == null) continue;
3104       assert cells instanceof RandomAccess;
3105       int listSize = cells.size();
3106       for (int i = 0; i < listSize; i++) {
3107         CellUtil.updateLatestStamp(cells.get(i), now, 0);
3108       }
3109     }
3110   }
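  /*
   * A minimal sketch of when the replacement above happens: a client Put that
   * sets no explicit timestamp carries HConstants.LATEST_TIMESTAMP, and STEP 2
   * of the batch-mutate path calls updateCellTimestamps with the "now" captured
   * after the row locks are taken ("p" and its arguments are illustrative):
   *
   *   Put p = new Put(row);
   *   p.add(family, qualifier, value);  // cell timestamp == HConstants.LATEST_TIMESTAMP
   *   // after updateCellTimestamps(..., byteNow), the stored timestamp == now
   */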
3111 
3112   /*
3113    * Check whether we have the resources to support an update.
3114    *
3115    * We throw RegionTooBusyException if we are above the memstore limit
3116    * and expect the client to retry using some kind of backoff.
3117    */
3118   private void checkResources()
3119     throws RegionTooBusyException {
3120     // If catalog region, do not impose resource constraints or block updates.
3121     if (this.getRegionInfo().isMetaRegion()) return;
3122 
3123     if (this.memstoreSize.get() > this.blockingMemStoreSize) {
3124       requestFlush();
3125       throw new RegionTooBusyException("Above memstore limit, " +
3126           "regionName=" + (this.getRegionInfo() == null ? "unknown" :
3127           this.getRegionInfo().getRegionNameAsString()) +
3128           ", server=" + (this.getRegionServerServices() == null ? "unknown" :
3129           this.getRegionServerServices().getServerName()) +
3130           ", memstoreSize=" + memstoreSize.get() +
3131           ", blockingMemStoreSize=" + blockingMemStoreSize);
3132     }
3133   }
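  /*
   * A hedged sketch of the client-side backoff the comment above anticipates.
   * In practice the HBase client retries internally; "table", "maxRetries" and
   * "backoffMs" are illustrative names:
   *
   *   for (int attempt = 0; attempt < maxRetries; attempt++) {
   *     try {
   *       table.put(put);
   *       break;
   *     } catch (RegionTooBusyException e) {
   *       Thread.sleep(backoffMs << attempt);  // exponential backoff, then retry
   *     }
   *   }
   */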
3134 
3135   /**
3136    * @throws IOException Throws exception if region is in read-only mode.
3137    */
3138   protected void checkReadOnly() throws IOException {
3139     if (this.writestate.isReadOnly()) {
3140       throw new IOException("region is read only");
3141     }
3142   }
3143 
3144   protected void checkReadsEnabled() throws IOException {
3145     if (!this.writestate.readsEnabled) {
3146       throw new IOException("The region's reads are disabled. Cannot serve the request");
3147     }
3148   }
3149 
3150   public void setReadsEnabled(boolean readsEnabled) {
3151     this.writestate.setReadsEnabled(readsEnabled);
3152   }
3153 
3154   /**
3155    * Add updates first to the wal and then add values to memstore.
3156    * Warning: Assumption is caller has lock on passed in row.
3157    * @param edits Cell updates by column
3158    * @throws IOException
3159    */
3160   private void put(final byte [] row, byte [] family, List<Cell> edits)
3161   throws IOException {
3162     NavigableMap<byte[], List<Cell>> familyMap;
3163     familyMap = new TreeMap<byte[], List<Cell>>(Bytes.BYTES_COMPARATOR);
3164 
3165     familyMap.put(family, edits);
3166     Put p = new Put(row);
3167     p.setFamilyCellMap(familyMap);
3168     doBatchMutate(p);
3169   }
3170 
3171   /**
3172    * Atomically apply the given map of family->edits to the memstore.
3173    * This handles the consistency control on its own, but the caller
3174    * should already have locked updatesLock.readLock(). This also does
3175    * <b>not</b> check the families for validity.
3176    *
3177    * @param familyMap Map of kvs per family
3178    * @param localizedWriteEntry The WriteEntry of the MVCC for this transaction.
3179    *        If null, then this method internally creates a mvcc transaction.
3180    * @param output newly added KVs into memstore
3181    * @return the additional memory usage of the memstore caused by the
3182    * new entries.
3183    * @throws IOException
3184    */
3185   private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
3186     long mvccNum, List<Cell> memstoreCells) throws IOException {
3187     long size = 0;
3188 
3189     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
3190       byte[] family = e.getKey();
3191       List<Cell> cells = e.getValue();
3192       assert cells instanceof RandomAccess;
3193       Store store = getStore(family);
3194       int listSize = cells.size();
3195       for (int i=0; i < listSize; i++) {
3196         Cell cell = cells.get(i);
3197         CellUtil.setSequenceId(cell, mvccNum);
3198         Pair<Long, Cell> ret = store.add(cell);
3199         size += ret.getFirst();
3200         memstoreCells.add(ret.getSecond());
3201       }
3202     }
3203 
3204     return size;
3205   }
3206 
3207   /**
3208    * Remove all the keys listed in the map from the memstore. This method is
3209    * called when a Put/Delete has updated memstore but subsequently fails to update
3210    * the wal. This method is then invoked to rollback the memstore.
3211    */
3212   private void rollbackMemstore(List<Cell> memstoreCells) {
3213     int kvsRolledback = 0;
3214 
3215     for (Cell cell : memstoreCells) {
3216       byte[] family = CellUtil.cloneFamily(cell);
3217       Store store = getStore(family);
3218       store.rollback(cell);
3219       kvsRolledback++;
3220     }
3221     LOG.debug("rollbackMemstore rolled back " + kvsRolledback);
3222   }
3223 
3224   /**
3225    * Check the collection of families for validity.
3226    * @throws NoSuchColumnFamilyException if a family does not exist.
3227    */
3228   void checkFamilies(Collection<byte[]> families)
3229   throws NoSuchColumnFamilyException {
3230     for (byte[] family : families) {
3231       checkFamily(family);
3232     }
3233   }
3234 
3235   /**
3236    * During replay, there could exist column families that were removed between region server
3237    * failure and replay.
3238    */
3239   private void removeNonExistentColumnFamilyForReplay(
3240       final Map<byte[], List<Cell>> familyMap) {
3241     List<byte[]> nonExistentList = null;
3242     for (byte[] family : familyMap.keySet()) {
3243       if (!this.htableDescriptor.hasFamily(family)) {
3244         if (nonExistentList == null) {
3245           nonExistentList = new ArrayList<byte[]>();
3246         }
3247         nonExistentList.add(family);
3248       }
3249     }
3250     if (nonExistentList != null) {
3251       for (byte[] family : nonExistentList) {
3252         // Perhaps schema was changed between crash and replay
3253         LOG.info("No family for " + Bytes.toString(family) + ", omitting from replay.");
3254         familyMap.remove(family);
3255       }
3256     }
3257   }
3258 
3259   void checkTimestamps(final Map<byte[], List<Cell>> familyMap,
3260       long now) throws FailedSanityCheckException {
3261     if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
3262       return;
3263     }
3264     long maxTs = now + timestampSlop;
3265     for (List<Cell> kvs : familyMap.values()) {
3266       assert kvs instanceof RandomAccess;
3267       int listSize  = kvs.size();
3268       for (int i=0; i < listSize; i++) {
3269         Cell cell = kvs.get(i);
3270         // see if the user-side TS is out of range. latest = server-side
3271         long ts = cell.getTimestamp();
3272         if (ts != HConstants.LATEST_TIMESTAMP && ts > maxTs) {
3273           throw new FailedSanityCheckException("Timestamp for KV out of range "
3274               + cell + " (too.new=" + timestampSlop + ")");
3275         }
3276       }
3277     }
3278   }
3279 
3280   /**
3281    * Append the given map of family->edits to a WALEdit data structure.
3282    * This does not write to the WAL itself.
3283    * @param familyMap map of family->edits
3284    * @param walEdit the destination entry to append into
3285    */
3286   private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
3287       WALEdit walEdit) {
3288     for (List<Cell> edits : familyMap.values()) {
3289       assert edits instanceof RandomAccess;
3290       int listSize = edits.size();
3291       for (int i=0; i < listSize; i++) {
3292         Cell cell = edits.get(i);
3293         walEdit.add(cell);
3294       }
3295     }
3296   }
3297 
3298   private void requestFlush() {
3299     if (this.rsServices == null) {
3300       return;
3301     }
3302     synchronized (writestate) {
3303       if (this.writestate.isFlushRequested()) {
3304         return;
3305       }
3306       writestate.flushRequested = true;
3307     }
3308     // Make request outside of synchronize block; HBASE-818.
3309     this.rsServices.getFlushRequester().requestFlush(this);
3310     if (LOG.isDebugEnabled()) {
3311       LOG.debug("Flush requested on " + this);
3312     }
3313   }
3314 
3315   /*
3316    * @param size
3317    * @return True if size is over the flush threshold
3318    */
3319   private boolean isFlushSize(final long size) {
3320     return size > this.memstoreFlushSize;
3321   }
3322 
3323   /**
3324    * Read the edits put under this region by the wal splitting process.  Put
3325    * the recovered edits back into this region.
3326    *
3327    * <p>We can ignore any wal message that has a sequence ID that's equal to or
3328    * lower than minSeqId.  (Because we know such messages are already
3329    * reflected in the HFiles.)
3330    *
3331    * <p>While this is running we are putting pressure on memory yet we are
3332    * outside of our usual accounting because we are not yet an onlined region
3333    * (this stuff is being run as part of Region initialization).  This means
3334    * that if we're up against global memory limits, we'll not be flagged to flush
3335    * because we are not online. We can't be flushed by usual mechanisms anyways;
3336    * we're not yet online so our relative sequenceids are not yet aligned with
3337    * WAL sequenceids -- not till we come up online, post processing of split
3338    * edits.
3339    *
3340    * <p>But to help relieve memory pressure, we at least manage our own heap
3341    * size, flushing if we are in excess of per-region limits.  When flushing,
3342    * though, we have to be careful to avoid using the regionserver/wal
3343    * sequenceid.  It runs on a different line from what is going on here in
3344    * this region context, so if we crashed replaying these edits but in the
3345    * midst had a flush that used the regionserver wal with a sequenceid in
3346    * excess of what this region and its split editlogs have seen, then we
3347    * could miss edits the next time we go to recover.  So we have to flush
3348    * inline, using seqids that make sense in this single region context only -- until we online.
3349    *
3350    * @param maxSeqIdInStores Any edit found in split editlogs needs to be in excess of
3351    * the maxSeqId for the store to be applied; otherwise it is skipped.
3352    * @return the sequence id of the last edit added to this region out of the
3353    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
3354    * @throws UnsupportedEncodingException
3355    * @throws IOException
3356    */
3357   protected long replayRecoveredEditsIfAny(final Path regiondir,
3358       Map<byte[], Long> maxSeqIdInStores,
3359       final CancelableProgressable reporter, final MonitoredTask status)
3360       throws UnsupportedEncodingException, IOException {
3361     long minSeqIdForTheRegion = -1;
3362     for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
3363       if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
3364         minSeqIdForTheRegion = maxSeqIdInStore;
3365       }
3366     }
3367     long seqid = minSeqIdForTheRegion;
3368 
3369     FileSystem fs = this.fs.getFileSystem();
3370     NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
3371     if (LOG.isDebugEnabled()) {
3372       LOG.debug("Found " + (files == null ? 0 : files.size())
3373         + " recovered edits file(s) under " + regiondir);
3374     }
3375 
3376     if (files == null || files.isEmpty()) return seqid;
3377 
3378     for (Path edits: files) {
3379       if (edits == null || !fs.exists(edits)) {
3380         LOG.warn("Null or non-existent edits file: " + edits);
3381         continue;
3382       }
3383       if (isZeroLengthThenDelete(fs, edits)) continue;
3384 
3385       long maxSeqId;
3386       String fileName = edits.getName();
3387       maxSeqId = Math.abs(Long.parseLong(fileName));
3388       if (maxSeqId <= minSeqIdForTheRegion) {
3389         if (LOG.isDebugEnabled()) {
3390           String msg = "Maximum sequenceid for this wal is " + maxSeqId
3391             + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
3392             + ", skipped the whole file, path=" + edits;
3393           LOG.debug(msg);
3394         }
3395         continue;
3396       }
3397 
3398       try {
3399         // Replay the edits. Replay can return -1 if everything is skipped; only update if the returned seqId is greater.
3400         seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
3401       } catch (IOException e) {
3402         boolean skipErrors = conf.getBoolean(
3403             HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
3404             conf.getBoolean(
3405                 "hbase.skip.errors",
3406                 HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS));
3407         if (conf.get("hbase.skip.errors") != null) {
3408           LOG.warn(
3409               "The property 'hbase.skip.errors' has been deprecated. Please use " +
3410               HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
3411         }
3412         if (skipErrors) {
3413           Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
3414           LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
3415               + "=true so continuing. Renamed " + edits +
3416               " as " + p, e);
3417         } else {
3418           throw e;
3419         }
3420       }
3421     }
3422     // The edits size added into rsAccounting during this replaying will not
3423     // be required any more. So just clear it.
3424     if (this.rsAccounting != null) {
3425       this.rsAccounting.clearRegionReplayEditsSize(this.getRegionName());
3426     }
3427     if (seqid > minSeqIdForTheRegion) {
3428       // Then we added some edits to memory. Flush and cleanup split edit files.
3429       internalFlushcache(null, seqid, status);
3430     }
3431     // Now delete the content of recovered edits.  We're done w/ them.
3432     for (Path file: files) {
3433       if (!fs.delete(file, false)) {
3434         LOG.error("Failed delete of " + file);
3435       } else {
3436         LOG.debug("Deleted recovered.edits file=" + file);
3437       }
3438     }
3439     return seqid;
3440   }
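  /*
   * Worked example of the skip rule above: a recovered-edits file is named for
   * the highest sequence id it contains, e.g. "0000000000000001234" parses to
   * maxSeqId 1234.  If minSeqIdForTheRegion (the smallest max-flushed seqid
   * across the stores) is >= 1234, every edit in that file is already reflected
   * in the HFiles, so the whole file is skipped.
   */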
3441 
3442   /*
3443    * @param edits File of recovered edits.
3444    * @param maxSeqIdInStores Maximum sequenceid found in each store.  Edits in wal
3445    * must be larger than this to be replayed for each store.
3446    * @param reporter
3447    * @return the sequence id of the last edit added to this region out of the
3448    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
3449    * @throws IOException
3450    */
3451   private long replayRecoveredEdits(final Path edits,
3452       Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
3453     throws IOException {
3454     String msg = "Replaying edits from " + edits;
3455     LOG.info(msg);
3456     MonitoredTask status = TaskMonitor.get().createStatus(msg);
3457     FileSystem fs = this.fs.getFileSystem();
3458 
3459     status.setStatus("Opening recovered edits");
3460     WAL.Reader reader = null;
3461     try {
3462       reader = WALFactory.createReader(fs, edits, conf);
3463       long currentEditSeqId = -1;
3464       long currentReplaySeqId = -1;
3465       long firstSeqIdInLog = -1;
3466       long skippedEdits = 0;
3467       long editsCount = 0;
3468       long intervalEdits = 0;
3469       WAL.Entry entry;
3470       Store store = null;
3471       boolean reported_once = false;
3472       ServerNonceManager ng = this.rsServices == null ? null : this.rsServices.getNonceManager();
3473 
3474       try {
3475         // How many edits seen before we check elapsed time
3476         int interval = this.conf.getInt("hbase.hstore.report.interval.edits",
3477             2000);
3478         // How often to send a progress report (default 1/2 master timeout)
3479         int period = this.conf.getInt("hbase.hstore.report.period", 300000);
3480         long lastReport = EnvironmentEdgeManager.currentTime();
3481 
3482         while ((entry = reader.next()) != null) {
3483           WALKey key = entry.getKey();
3484           WALEdit val = entry.getEdit();
3485 
3486           if (ng != null) { // ng is null in some tests, or when nonces are disabled
3487             ng.reportOperationFromWal(key.getNonceGroup(), key.getNonce(), key.getWriteTime());
3488           }
3489 
3490           if (reporter != null) {
3491             intervalEdits += val.size();
3492             if (intervalEdits >= interval) {
3493               // Number of edits interval reached
3494               intervalEdits = 0;
3495               long cur = EnvironmentEdgeManager.currentTime();
3496               if (lastReport + period <= cur) {
3497                 status.setStatus("Replaying edits..." +
3498                     " skipped=" + skippedEdits +
3499                     " edits=" + editsCount);
3500                 // Timeout reached
3501                 if(!reporter.progress()) {
3502                   msg = "Progressable reporter failed, stopping replay";
3503                   LOG.warn(msg);
3504                   status.abort(msg);
3505                   throw new IOException(msg);
3506                 }
3507                 reported_once = true;
3508                 lastReport = cur;
3509               }
3510             }
3511           }
3512 
3513           // Start coprocessor replay here. The coprocessor is for each WALEdit
3514           // instead of a KeyValue.
3515           if (coprocessorHost != null) {
3516             status.setStatus("Running pre-WAL-restore hook in coprocessors");
3517             if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
3518               // if bypass this wal entry, ignore it ...
3519               continue;
3520             }
3521           }
3522 
3523           if (firstSeqIdInLog == -1) {
3524             firstSeqIdInLog = key.getLogSeqNum();
3525           }
3526           currentEditSeqId = key.getLogSeqNum();
3527           currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ?
3528             key.getOrigLogSeqNum() : currentEditSeqId;
3529           boolean flush = false;
3530           for (Cell cell: val.getCells()) {
3531             // Check this edit is for me. Also, guard against writing the special
3532             // METACOLUMN info such as HBASE::CACHEFLUSH entries
3533             if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY) ||
3534                 !Bytes.equals(key.getEncodedRegionName(),
3535                   this.getRegionInfo().getEncodedNameAsBytes())) {
3536               //this is a special edit, we should handle it
3537               CompactionDescriptor compaction = WALEdit.getCompaction(cell);
3538               if (compaction != null) {
3539                 //replay the compaction
3540                 completeCompactionMarker(compaction);
3541               }
3542 
3543               skippedEdits++;
3544               continue;
3545             }
3546             // Figure which store the edit is meant for.
3547             if (store == null || !CellUtil.matchingFamily(cell, store.getFamily().getName())) {
3548               store = getStore(cell);
3549             }
3550             if (store == null) {
3551               // This should never happen.  Perhaps schema was changed between
3552               // crash and redeploy?
3553               LOG.warn("No family for " + cell);
3554               skippedEdits++;
3555               continue;
3556             }
3557             // Now, figure if we should skip this edit.
3558             if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily()
3559                 .getName())) {
3560               skippedEdits++;
3561               continue;
3562             }
3563             CellUtil.setSequenceId(cell, currentReplaySeqId);
3564             // Once we are over the limit, restoreEdit will keep returning true to
3565             // flush -- but don't flush until we've played all the kvs that make up
3566             // the WALEdit.
3567             flush = restoreEdit(store, cell);
3568             editsCount++;
3569           }
3570           if (flush) internalFlushcache(null, currentEditSeqId, status);
3571 
3572           if (coprocessorHost != null) {
3573             coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
3574           }
3575         }
3576       } catch (EOFException eof) {
3577         Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
3578         msg = "Encountered EOF. Most likely due to Master failure during " +
3579             "wal splitting, so we have this data in another edit.  " +
3580             "Continuing, but renaming " + edits + " as " + p;
3581         LOG.warn(msg, eof);
3582         status.abort(msg);
3583       } catch (IOException ioe) {
3584         // If the IOE resulted from bad file format,
3585         // then this problem is idempotent and retrying won't help
3586         if (ioe.getCause() instanceof ParseException) {
3587           Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
3588           msg = "File corruption encountered!  " +
3589               "Continuing, but renaming " + edits + " as " + p;
3590           LOG.warn(msg, ioe);
3591           status.setStatus(msg);
3592         } else {
3593           status.abort(StringUtils.stringifyException(ioe));
3594           // other IO errors may be transient (bad network connection,
3595           // checksum exception on one datanode, etc).  throw & retry
3596           throw ioe;
3597         }
3598       }
3599       if (reporter != null && !reported_once) {
3600         reporter.progress();
3601       }
3602       msg = "Applied " + editsCount + ", skipped " + skippedEdits +
3603         ", firstSequenceIdInLog=" + firstSeqIdInLog +
3604         ", maxSequenceIdInLog=" + currentEditSeqId + ", path=" + edits;
3605       status.markComplete(msg);
3606       LOG.debug(msg);
3607       return currentEditSeqId;
3608     } finally {
3609       status.cleanup();
3610       if (reader != null) {
3611          reader.close();
3612       }
3613     }
3614   }
3615 
3616   /**
3617    * Call to complete a compaction. It's for the case where we find in the WAL a compaction
3618    * that was not finished.  We could find one when recovering a WAL after a regionserver crash.
3619    * See HBASE-2331.
3620    */
3621   void completeCompactionMarker(CompactionDescriptor compaction)
3622       throws IOException {
3623     Store store = this.getStore(compaction.getFamilyName().toByteArray());
3624     if (store == null) {
3625       LOG.warn("Found Compaction WAL edit for deleted family:" +
3626           Bytes.toString(compaction.getFamilyName().toByteArray()));
3627       return;
3628     }
3629     store.completeCompactionMarker(compaction);
3630   }
3631 
3632   /**
3633    * Used by tests
3634    * @param s Store to add the edit to.
3635    * @param cell Cell to add.
3636    * @return True if we should flush.
3637    */
3638   protected boolean restoreEdit(final Store s, final Cell cell) {
3639     long kvSize = s.add(cell).getFirst();
3640     if (this.rsAccounting != null) {
3641       rsAccounting.addAndGetRegionReplayEditsSize(this.getRegionName(), kvSize);
3642     }
3643     return isFlushSize(this.addAndGetGlobalMemstoreSize(kvSize));
3644   }
3645 
3646   /*
3647    * @param fs
3648    * @param p File to check.
3649    * @return True if file was zero-length (and if so, we'll delete it in here).
3650    * @throws IOException
3651    */
3652   private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
3653       throws IOException {
3654     FileStatus stat = fs.getFileStatus(p);
3655     if (stat.getLen() > 0) return false;
3656     LOG.warn("File " + p + " is zero-length, deleting.");
3657     fs.delete(p, false);
3658     return true;
3659   }
3660 
3661   protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
3662     return new HStore(this, family, this.conf);
3663   }
3664 
3665   /**
3666    * Return HStore instance.
3667    * Use with caution.  Exposed for use of fixup utilities.
3668    * @param column Name of column family hosted by this region.
3669    * @return Store that goes with the family on passed <code>column</code>.
3670    * TODO: Make this lookup faster.
3671    */
3672   public Store getStore(final byte[] column) {
3673     return this.stores.get(column);
3674   }
3675 
3676   /**
3677    * Return HStore instance. Does not do any copy: as the number of stores is limited, we
3678    * iterate over the list.
3679    */
3680   private Store getStore(Cell cell) {
3681     for (Map.Entry<byte[], Store> famStore : stores.entrySet()) {
3682       if (Bytes.equals(
3683           cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
3684           famStore.getKey(), 0, famStore.getKey().length)) {
3685         return famStore.getValue();
3686       }
3687     }
3688 
3689     return null;
3690   }
3691 
3692   public Map<byte[], Store> getStores() {
3693     return this.stores;
3694   }
3695 
3696   /**
3697    * Return list of storeFiles for the set of CFs.
3698    * Uses closeLock to prevent the race condition where a region closes
3699    * in the middle of the for loop; if the stores were closed one by one
3700    * mid-iteration, some stores would return 0 files.
3701    * @return List of storeFiles.
3702    */
3703   public List<String> getStoreFileList(final byte [][] columns)
3704     throws IllegalArgumentException {
3705     List<String> storeFileNames = new ArrayList<String>();
3706     synchronized(closeLock) {
3707       for(byte[] column : columns) {
3708         Store store = this.stores.get(column);
3709         if (store == null) {
3710           throw new IllegalArgumentException("No column family: " +
3711               Bytes.toString(column) + " available");
3712         }
3713         for (StoreFile storeFile: store.getStorefiles()) {
3714           storeFileNames.add(storeFile.getPath().toString());
3715         }
3716       }
3717     }
3718     return storeFileNames;
3719   }
3720 
3721   //////////////////////////////////////////////////////////////////////////////
3722   // Support code
3723   //////////////////////////////////////////////////////////////////////////////
3724 
3725   /** Make sure this is a valid row for the HRegion */
3726   void checkRow(final byte [] row, String op) throws IOException {
3727     if (!rowIsInRange(getRegionInfo(), row)) {
3728       throw new WrongRegionException("Requested row out of range for " +
3729           op + " on HRegion " + this + ", startKey='" +
3730           Bytes.toStringBinary(getStartKey()) + "', getEndKey()='" +
3731           Bytes.toStringBinary(getEndKey()) + "', row='" +
3732           Bytes.toStringBinary(row) + "'");
3733     }
3734   }
3735 
3736   /**
3737    * Tries to acquire a lock on the given row.
3738    * @param waitForLock if true, will block until the lock is available.
3739    *        Otherwise, just tries to obtain the lock and returns
3740    *        false if unavailable.
3741    * @return the row lock if acquired,
3742    *   null if waitForLock was false and the lock was not acquired
3743    * @throws IOException if waitForLock was true and the lock could not be acquired after waiting
3744    */
3745   public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
3746     checkRow(row, "row lock");
3747     startRegionOperation();
3748     try {
3749       HashedBytes rowKey = new HashedBytes(row);
3750       RowLockContext rowLockContext = new RowLockContext(rowKey);
3751 
3752       // loop until we acquire the row lock (unless !waitForLock)
3753       while (true) {
3754         RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
3755         if (existingContext == null) {
3756           // Row is not already locked by any thread, use newly created context.
3757           break;
3758         } else if (existingContext.ownedByCurrentThread()) {
3759           // Row is already locked by current thread, reuse existing context instead.
3760           rowLockContext = existingContext;
3761           break;
3762         } else {
3763           if (!waitForLock) {
3764             return null;
3765           }
3766           try {
3767             // Row is already locked by some other thread, give up or wait for it
3768             if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
3769               throw new IOException("Timed out waiting for lock for row: " + rowKey);
3770             }
3771           } catch (InterruptedException ie) {
3772             LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
3773             InterruptedIOException iie = new InterruptedIOException();
3774             iie.initCause(ie);
3775             throw iie;
3776           }
3777         }
3778       }
3779 
3780       // allocate new lock for this thread
3781       return rowLockContext.newLock();
3782     } finally {
3783       closeRegionOperation();
3784     }
3785   }
3786 
3787   /**
3788    * Acquires a lock on the given row.
3789    * The same thread may acquire multiple locks on the same row.
3790    * @return the acquired row lock
3791    * @throws IOException if the lock could not be acquired after waiting
3792    */
3793   public RowLock getRowLock(byte[] row) throws IOException {
3794     return getRowLock(row, true);
3795   }
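  /*
   * Typical usage pattern for the row-lock API above (a sketch; the work done
   * under the lock is elided):
   *
   *   RowLock lock = region.getRowLock(row);  // blocks until acquired, throws on timeout
   *   try {
   *     // ... read or mutate the row while holding the lock ...
   *   } finally {
   *     lock.release();  // always release, as checkAndMutate above does
   *   }
   */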
3796 
3797   /**
3798    * If the given list of row locks is not null, releases all locks.
3799    */
3800   public void releaseRowLocks(List<RowLock> rowLocks) {
3801     if (rowLocks != null) {
3802       for (RowLock rowLock : rowLocks) {
3803         rowLock.release();
3804       }
3805       rowLocks.clear();
3806     }
3807   }
3808 
3809   /**
3810    * Determines whether multiple column families are present.
3811    * Precondition: familyPaths is not null.
3812    *
3813    * @param familyPaths List of Pair<byte[] column family, String hfilePath>
3814    */
3815   private static boolean hasMultipleColumnFamilies(
3816       List<Pair<byte[], String>> familyPaths) {
3817     boolean multipleFamilies = false;
3818     byte[] family = null;
3819     for (Pair<byte[], String> pair : familyPaths) {
3820       byte[] fam = pair.getFirst();
3821       if (family == null) {
3822         family = fam;
3823       } else if (!Bytes.equals(family, fam)) {
3824         multipleFamilies = true;
3825         break;
3826       }
3827     }
3828     return multipleFamilies;
3829   }
3830 
3831 
3832   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
3833                                 boolean assignSeqId) throws IOException {
3834     return bulkLoadHFiles(familyPaths, assignSeqId, null);
3835   }
3836 
3837   /**
3838    * Attempts to atomically load a group of hfiles.  This is critical for loading
3839    * rows with multiple column families atomically.
3840    *
3841    * @param familyPaths List of Pair<byte[] column family, String hfilePath>
3842    * @param bulkLoadListener Internal hooks enabling massaging/preparation of a
3843    * file about to be bulk loaded
3844    * @return true if successful, false if failed recoverably
3845    * @throws IOException if failed unrecoverably.
3846    */
3847   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, boolean assignSeqId,
3848       BulkLoadListener bulkLoadListener) throws IOException {
3849     Preconditions.checkNotNull(familyPaths);
3850     // we need writeLock for multi-family bulk load
3851     startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
3852     try {
3853       this.writeRequestsCount.increment();
3854 
3855       // A split may have happened between when the split keys were gathered
3856       // and when the HRegion's write lock was taken.  We need to validate that
3857       // each HFile fits its region before attempting to bulk load any of them.
3858       List<IOException> ioes = new ArrayList<IOException>();
3859       List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();
3860       for (Pair<byte[], String> p : familyPaths) {
3861         byte[] familyName = p.getFirst();
3862         String path = p.getSecond();
3863 
3864         Store store = getStore(familyName);
3865         if (store == null) {
3866           IOException ioe = new org.apache.hadoop.hbase.DoNotRetryIOException(
3867               "No such column family " + Bytes.toStringBinary(familyName));
3868           ioes.add(ioe);
3869         } else {
3870           try {
3871             store.assertBulkLoadHFileOk(new Path(path));
3872           } catch (WrongRegionException wre) {
3873             // recoverable (file doesn't fit in region)
3874             failures.add(p);
3875           } catch (IOException ioe) {
3876             // unrecoverable (hdfs problem)
3877             ioes.add(ioe);
3878           }
3879         }
3880       }
3881 
3882       // validation failed because of some sort of IO problem.
3883       if (ioes.size() != 0) {
3884         IOException e = MultipleIOException.createIOException(ioes);
3885         LOG.error("There were one or more IO errors when checking if the bulk load is ok.", e);
3886         throw e;
3887       }
3888 
3889       // validation failed, bail out before doing anything permanent.
3890       if (failures.size() != 0) {
3891         StringBuilder list = new StringBuilder();
3892         for (Pair<byte[], String> p : failures) {
3893           list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")
3894             .append(p.getSecond());
3895         }
3896         // problem when validating
3897         LOG.warn("There was a recoverable bulk load failure likely due to a" +
3898             " split.  These (family, HFile) pairs were not loaded: " + list);
3899         return false;
3900       }
3901 
3902       long seqId = -1;
3903       // We need to assign a sequential ID that's in between two memstores in order to preserve
3904       // the guarantee that all the edits lower than the highest sequential ID from all the
3905       // HFiles are flushed on disk. See HBASE-10958.  The sequence id returned when we flush is
3906       // guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is
3907       // a sequence id that we can be sure is beyond the last hfile written).
3908       if (assignSeqId) {
3909         FlushResult fs = this.flushcache();
3910         if (fs.isFlushSucceeded()) {
3911           seqId = fs.flushSequenceId;
3912         } else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
3913           seqId = fs.flushSequenceId;
3914         } else {
3915           throw new IOException("Could not bulk load with an assigned sequential ID because the " +
3916               "flush didn't run. Reason for not flushing: " + fs.failureReason);
3917         }
3918       }
3919 
3920       for (Pair<byte[], String> p : familyPaths) {
3921         byte[] familyName = p.getFirst();
3922         String path = p.getSecond();
3923         Store store = getStore(familyName);
3924         try {
3925           String finalPath = path;
3926           if(bulkLoadListener != null) {
3927             finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
3928           }
3929           store.bulkLoadHFile(finalPath, seqId);
3930           if(bulkLoadListener != null) {
3931             bulkLoadListener.doneBulkLoad(familyName, path);
3932           }
3933         } catch (IOException ioe) {
3934           // A failure here can cause an atomicity violation that we currently
3935           // cannot recover from since it is likely a failed HDFS operation.
3936 
3937           // TODO Need a better story for reverting partial failures due to HDFS.
3938           LOG.error("There was a partial failure due to IO when attempting to" +
3939               " load " + Bytes.toString(p.getFirst()) + " : "+ p.getSecond(), ioe);
3940           if(bulkLoadListener != null) {
3941             try {
3942               bulkLoadListener.failedBulkLoad(familyName, path);
3943             } catch (Exception ex) {
3944               LOG.error("Error while calling failedBulkLoad for family "+
3945                   Bytes.toString(familyName)+" with path "+path, ex);
3946             }
3947           }
3948           throw ioe;
3949         }
3950       }
3951       return true;
3952     } finally {
3953       closeBulkRegionOperation();
3954     }
3955   }
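
  /*
   * A minimal caller-side sketch of the bulk load path above. This is a hedged example:
   * it assumes the enclosing bulkLoadHFiles(familyPaths, assignSeqId) overload, and the
   * staging path below is hypothetical.
   *
   *   List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
   *   familyPaths.add(new Pair<byte[], String>(Bytes.toBytes("cf"),
   *       "/staging/cf/hfile1"));  // hypothetical pre-staged HFile for family "cf"
   *   // assignSeqId=true triggers the flush above so the HFile sorts after current edits
   *   boolean loaded = region.bulkLoadHFiles(familyPaths, true);
   *   if (!loaded) {
   *     // recoverable failure (e.g. a concurrent split); re-split the HFiles and retry
   *   }
   */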
3956 
3957   @Override
3958   public boolean equals(Object o) {
3959     return o instanceof HRegion && Bytes.equals(this.getRegionName(),
3960                                                 ((HRegion) o).getRegionName());
3961   }
3962 
3963   @Override
3964   public int hashCode() {
3965     return Bytes.hashCode(this.getRegionName());
3966   }
3967 
3968   @Override
3969   public String toString() {
3970     return this.getRegionNameAsString();
3971   }
3972 
3973   /**
3974    * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
3975    */
3976   class RegionScannerImpl implements RegionScanner {
3977     // Package local for testability
3978     KeyValueHeap storeHeap = null;
3979     /** Heap of key-values that are not essential for the provided filters and are thus read
3980      * on demand, if on-demand column family loading is enabled.*/
3981     KeyValueHeap joinedHeap = null;
3982     /**
3983      * If the joined heap data gathering is interrupted due to scan limits, this will
3984      * contain the row for which we are populating the values.*/
3985     protected Cell joinedContinuationRow = null;
3986     // KeyValue indicating that limit is reached when scanning
3987     private final KeyValue KV_LIMIT = new KeyValue();
3988     protected final byte[] stopRow;
3989     private final FilterWrapper filter;
3990     private int batch;
3991     protected int isScan;
3992     private boolean filterClosed = false;
3993     private long readPt;
3994     private long maxResultSize;
3995     protected HRegion region;
3996 
3997     @Override
3998     public HRegionInfo getRegionInfo() {
3999       return region.getRegionInfo();
4000     }
4001 
4002     RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
4003         throws IOException {
4004 
4005       this.region = region;
4006       this.maxResultSize = scan.getMaxResultSize();
4007       if (scan.hasFilter()) {
4008         this.filter = new FilterWrapper(scan.getFilter());
4009       } else {
4010         this.filter = null;
4011       }
4012 
4013       this.batch = scan.getBatch();
4014       if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {
4015         this.stopRow = null;
4016       } else {
4017         this.stopRow = scan.getStopRow();
4018       }
4019       // If we are doing a get, we want the range to be [startRow,endRow]; normally
4020       // it is [startRow,endRow), and if startRow=endRow we would get nothing.
4021       this.isScan = scan.isGetScan() ? -1 : 0;
4022 
4023       // synchronize on scannerReadPoints so that nobody calculates
4024       // getSmallestReadPoint before scannerReadPoints is updated.
4025       IsolationLevel isolationLevel = scan.getIsolationLevel();
4026       synchronized(scannerReadPoints) {
4027         this.readPt = getReadpoint(isolationLevel);
4028         scannerReadPoints.put(this, this.readPt);
4029       }
4030 
4031       // Here we separate all scanners into two lists - scanners that provide data required
4032       // by the filter to operate (scanners list) and all others (joinedScanners list).
4033       List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
4034       List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
4035       if (additionalScanners != null) {
4036         scanners.addAll(additionalScanners);
4037       }
4038 
4039       for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
4040           scan.getFamilyMap().entrySet()) {
4041         Store store = stores.get(entry.getKey());
4042         KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
4043         if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
4044           || this.filter.isFamilyEssential(entry.getKey())) {
4045           scanners.add(scanner);
4046         } else {
4047           joinedScanners.add(scanner);
4048         }
4049       }
4050       initializeKVHeap(scanners, joinedScanners, region);
4051     }
4052 
4053     protected void initializeKVHeap(List<KeyValueScanner> scanners,
4054         List<KeyValueScanner> joinedScanners, HRegion region)
4055         throws IOException {
4056       this.storeHeap = new KeyValueHeap(scanners, region.comparator);
4057       if (!joinedScanners.isEmpty()) {
4058         this.joinedHeap = new KeyValueHeap(joinedScanners, region.comparator);
4059       }
4060     }
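
    /*
     * A hedged sketch of how the joinedHeap gets populated (the family names are assumed
     * for illustration). With on-demand column family loading enabled, only the family the
     * filter declares essential feeds the storeHeap; the other family lands in the
     * joinedScanners list and is read only for rows the filter accepts:
     *
     *   Scan scan = new Scan();
     *   scan.setLoadColumnFamiliesOnDemand(true);
     *   // SingleColumnValueFilter treats only its own family as essential
     *   scan.setFilter(new SingleColumnValueFilter(Bytes.toBytes("essential"),
     *       Bytes.toBytes("q"), CompareFilter.CompareOp.EQUAL, Bytes.toBytes("v")));
     *   RegionScanner scanner = region.getScanner(scan);
     */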
4061 
4062     @Override
4063     public long getMaxResultSize() {
4064       return maxResultSize;
4065     }
4066 
4067     @Override
4068     public long getMvccReadPoint() {
4069       return this.readPt;
4070     }
4071 
4072     /**
4073      * Reset the filter, if one is set.
4074      *
4075      * @throws IOException in case a filter raises an I/O exception.
4076      */
4077     protected void resetFilters() throws IOException {
4078       if (filter != null) {
4079         filter.reset();
4080       }
4081     }
4082 
4083     @Override
4084     public boolean next(List<Cell> outResults)
4085         throws IOException {
4086       // apply the batching limit by default
4087       return next(outResults, batch);
4088     }
4089 
4090     @Override
4091     public synchronized boolean next(List<Cell> outResults, int limit) throws IOException {
4092       if (this.filterClosed) {
4093         throw new UnknownScannerException("Scanner was closed (timed out?) " +
4094             "after we renewed it. Could be caused by a very slow scanner " +
4095             "or a lengthy garbage collection");
4096       }
4097       startRegionOperation(Operation.SCAN);
4098       readRequestsCount.increment();
4099       try {
4100         return nextRaw(outResults, limit);
4101       } finally {
4102         closeRegionOperation(Operation.SCAN);
4103       }
4104     }
4105 
4106     @Override
4107     public boolean nextRaw(List<Cell> outResults)
4108         throws IOException {
4109       return nextRaw(outResults, batch);
4110     }
4111 
4112     @Override
4113     public boolean nextRaw(List<Cell> outResults, int limit) throws IOException {
4114       if (storeHeap == null) {
4115         // scanner is closed
4116         throw new UnknownScannerException("Scanner was closed");
4117       }
4118       boolean returnResult;
4119       if (outResults.isEmpty()) {
4120         // Usually outResults is empty. This is true when next is called
4121         // to handle a scan or get operation.
4122         returnResult = nextInternal(outResults, limit);
4123       } else {
4124         List<Cell> tmpList = new ArrayList<Cell>();
4125         returnResult = nextInternal(tmpList, limit);
4126         outResults.addAll(tmpList);
4127       }
4128       resetFilters();
4129       if (isFilterDoneInternal()) {
4130         returnResult = false;
4131       }
4132       return returnResult;
4133     }
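
    /*
     * A typical drain loop over this scanner (a sketch; the scanner is assumed to come from
     * HRegion#getScanner(Scan), and error handling is elided). next() returns true while
     * more rows remain, filling the passed list with one row's cells per call:
     *
     *   List<Cell> cells = new ArrayList<Cell>();
     *   boolean moreRows;
     *   do {
     *     moreRows = scanner.next(cells);
     *     // process this row's cells
     *     cells.clear();
     *   } while (moreRows);
     *   scanner.close();
     */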
4134 
4135     private void populateFromJoinedHeap(List<Cell> results, int limit)
4136         throws IOException {
4137       assert joinedContinuationRow != null;
4138       Cell kv = populateResult(results, this.joinedHeap, limit,
4139           joinedContinuationRow.getRowArray(), joinedContinuationRow.getRowOffset(),
4140           joinedContinuationRow.getRowLength());
4141       if (kv != KV_LIMIT) {
4142         // We are done with this row, reset the continuation.
4143         joinedContinuationRow = null;
4144       }
4145       // As the data is obtained from two independent heaps, we need to
4146       // ensure that the result list is sorted, because Result relies on that.
4147       Collections.sort(results, comparator);
4148     }
4149 
4150     /**
4151      * Fetches records for currentRow into the results list, until the next row or limit (if not -1).
4152      * @param heap KeyValueHeap to fetch data from. It must be positioned on the correct row before the call.
4153      * @param limit Max number of KVs to place in the result list, -1 means no limit.
4154      * @param currentRow Byte array with key we are fetching.
4155      * @param offset offset for currentRow
4156      * @param length length for currentRow
4157      * @return KV_LIMIT if limit reached, next KeyValue otherwise.
4158      */
4159     private Cell populateResult(List<Cell> results, KeyValueHeap heap, int limit,
4160         byte[] currentRow, int offset, short length) throws IOException {
4161       Cell nextKv;
4162       do {
4163         heap.next(results, limit - results.size());
4164         if (limit > 0 && results.size() == limit) {
4165           return KV_LIMIT;
4166         }
4167         nextKv = heap.peek();
4168       } while (nextKv != null && CellUtil.matchingRow(nextKv, currentRow, offset, length));
4169 
4170       return nextKv;
4171     }
4172 
4173     /*
4174      * @return True if a filter has ruled that the scanner is done.
4175      */
4176     @Override
4177     public synchronized boolean isFilterDone() throws IOException {
4178       return isFilterDoneInternal();
4179     }
4180 
4181     private boolean isFilterDoneInternal() throws IOException {
4182       return this.filter != null && this.filter.filterAllRemaining();
4183     }
4184 
4185     private boolean nextInternal(List<Cell> results, int limit)
4186     throws IOException {
4187       if (!results.isEmpty()) {
4188         throw new IllegalArgumentException("First parameter should be an empty list");
4189       }
4190       RpcCallContext rpcCall = RpcServer.getCurrentCall();
4191       // The loop here is used only when, at some point during next(), we determine
4192       // that due to effects of filters or otherwise, we have an empty row in the result.
4193       // Then we loop and try again. Otherwise, we must get out on the first iteration via return:
4194       // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
4195       // and joinedHeap has no more data to read for the last row, joinedContinuationRow, if set).
4196       while (true) {
4197         if (rpcCall != null) {
4198           // If a user specifies a too-restrictive or too-slow scanner, the
4199           // client might time out and disconnect while the server side
4200           // is still processing the request. We should abort aggressively
4201           // in that case.
4202           long afterTime = rpcCall.disconnectSince();
4203           if (afterTime >= 0) {
4204             throw new CallerDisconnectedException(
4205                 "Aborting on region " + getRegionNameAsString() + ", call " +
4206                     this + " after " + afterTime + " ms, since " +
4207                     "caller disconnected");
4208           }
4209         }
4210 
4211         // Let's see what we have in the storeHeap.
4212         Cell current = this.storeHeap.peek();
4213 
4214         byte[] currentRow = null;
4215         int offset = 0;
4216         short length = 0;
4217         if (current != null) {
4218           currentRow = current.getRowArray();
4219           offset = current.getRowOffset();
4220           length = current.getRowLength();
4221         }
4222         boolean stopRow = isStopRow(currentRow, offset, length);
4223         // Check if we were getting data from the joinedHeap and hit the limit.
4224         // If not, then it's the main path - getting results from storeHeap.
4225         if (joinedContinuationRow == null) {
4226           // First, check if we are at a stop row. If so, there are no more results.
4227           if (stopRow) {
4228             if (filter != null && filter.hasFilterRow()) {
4229               filter.filterRowCells(results);
4230             }
4231             return false;
4232           }
4233 
4234           // Check if rowkey filter wants to exclude this row. If so, loop to next.
4235           // Technically, if we hit limits before on this row, we don't need this call.
4236           if (filterRowKey(currentRow, offset, length)) {
4237             boolean moreRows = nextRow(currentRow, offset, length);
4238             if (!moreRows) return false;
4239             results.clear();
4240             continue;
4241           }
4242 
4243           Cell nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset,
4244               length);
4245           // Ok, we are good, let's try to get some results from the main heap.
4246           if (nextKv == KV_LIMIT) {
4247             if (this.filter != null && filter.hasFilterRow()) {
4248               throw new IncompatibleFilterException(
4249                 "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
4250             }
4251             return true; // We hit the limit.
4252           }
4253 
4254           stopRow = nextKv == null ||
4255               isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength());
4256           // Save whether the row was empty before filters were applied to it.
4257           final boolean isEmptyRow = results.isEmpty();
4258 
4259           // We have the part of the row necessary for filtering (all of it, usually).
4260           // First filter with the filterRow(List).
4261           FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
4262           if (filter != null && filter.hasFilterRow()) {
4263             ret = filter.filterRowCellsWithRet(results);
4264           }
4265 
4266           if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) {
4267             results.clear();
4268             boolean moreRows = nextRow(currentRow, offset, length);
4269             if (!moreRows) return false;
4270 
4271             // This row was totally filtered out, if this is NOT the last row,
4272             // we should continue on. Otherwise, nothing else to do.
4273             if (!stopRow) continue;
4274             return false;
4275           }
4276 
4277           // Ok, we are done with storeHeap for this row.
4278           // Now we may need to fetch additional, non-essential data for the row.
4279           // These values are not needed for the filter to work, so we postpone their
4280           // fetch to (possibly) reduce the amount of data loaded from disk.
4281           if (this.joinedHeap != null) {
4282             Cell nextJoinedKv = joinedHeap.peek();
4283             // If joinedHeap is pointing to some other row, try to seek to a correct one.
4284             boolean mayHaveData = (nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv,
4285                 currentRow, offset, length))
4286                 || (this.joinedHeap.requestSeek(
4287                     KeyValueUtil.createFirstOnRow(currentRow, offset, length), true, true)
4288                     && joinedHeap.peek() != null && CellUtil.matchingRow(joinedHeap.peek(),
4289                     currentRow, offset, length));
4290             if (mayHaveData) {
4291               joinedContinuationRow = current;
4292               populateFromJoinedHeap(results, limit);
4293             }
4294           }
4295         } else {
4296           // Populating from the joined heap was stopped by limits, populate some more.
4297           populateFromJoinedHeap(results, limit);
4298         }
4299 
4300         // We may have just called populateFromJoinedHeap and hit the limits. If that is
4301         // the case, we need to call it again on the next next() invocation.
4302         if (joinedContinuationRow != null) {
4303           return true;
4304         }
4305 
4306         // Finally, we are done with both joinedHeap and storeHeap.
4307         // Double check to prevent empty rows from appearing in result. It could be
4308         // the case when SingleColumnValueExcludeFilter is used.
4309         if (results.isEmpty()) {
4310           boolean moreRows = nextRow(currentRow, offset, length);
4311           if (!moreRows) return false;
4312           if (!stopRow) continue;
4313         }
4314 
4315         // We are done. Return the result.
4316         return !stopRow;
4317       }
4318     }
4319 
4320     /**
4321      * This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combines
4322      * both the filterRow() and filterRow(List<KeyValue> kvs) functions. With 0.94 code or older, a
4323      * filter may not implement hasFilterRow() as HBASE-6429 expects, because the 0.94 hasFilterRow()
4324      * only returns true when filterRow(List<KeyValue> kvs) is overridden, not filterRow(). In that
4325      * case, filterRow() would be skipped.
4326      */
4327     private boolean filterRow() throws IOException {
4328       // when hasFilterRow returns true, filter.filterRow() will be called automatically inside
4329       // filterRowCells(List<Cell> kvs) so we skip that scenario here.
4330       return filter != null && (!filter.hasFilterRow())
4331           && filter.filterRow();
4332     }
4333 
4334     private boolean filterRowKey(byte[] row, int offset, short length) throws IOException {
4335       return filter != null
4336           && filter.filterRowKey(row, offset, length);
4337     }
4338 
4339     protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException {
4340       assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read.";
4341       Cell next;
4342       while ((next = this.storeHeap.peek()) != null &&
4343              CellUtil.matchingRow(next, currentRow, offset, length)) {
4344         this.storeHeap.next(MOCKED_LIST);
4345       }
4346       resetFilters();
4347       // Call the hook in the coprocessor, which allows it to do a fast forward
4348       return this.region.getCoprocessorHost() == null
4349           || this.region.getCoprocessorHost()
4350               .postScannerFilterRow(this, currentRow, offset, length);
4351     }
4352 
4353     protected boolean isStopRow(byte[] currentRow, int offset, short length) {
4354       return currentRow == null ||
4355           (stopRow != null &&
4356           comparator.compareRows(stopRow, 0, stopRow.length,
4357             currentRow, offset, length) <= isScan);
4358     }
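
    /*
     * Worked example of the isScan offset used above: compareRows(stopRow, currentRow) <= isScan
     * decides when to stop. For a scan, isScan == 0, so we stop as soon as currentRow >= stopRow
     * (exclusive end). For a get, isScan == -1, so we stop only when currentRow > stopRow, which
     * makes the single requested row (startRow == stopRow) inclusive.
     */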
4359 
4360     @Override
4361     public synchronized void close() {
4362       if (storeHeap != null) {
4363         storeHeap.close();
4364         storeHeap = null;
4365       }
4366       if (joinedHeap != null) {
4367         joinedHeap.close();
4368         joinedHeap = null;
4369       }
4370       // no need to synchronize here.
4371       scannerReadPoints.remove(this);
4372       this.filterClosed = true;
4373     }
4374 
4375     KeyValueHeap getStoreHeapForTesting() {
4376       return storeHeap;
4377     }
4378 
4379     @Override
4380     public synchronized boolean reseek(byte[] row) throws IOException {
4381       if (row == null) {
4382         throw new IllegalArgumentException("Row cannot be null.");
4383       }
4384       boolean result = false;
4385       startRegionOperation();
4386       try {
4387         KeyValue kv = KeyValueUtil.createFirstOnRow(row);
4388         // use request seek to make use of the lazy seek option. See HBASE-5520
4389         result = this.storeHeap.requestSeek(kv, true, true);
4390         if (this.joinedHeap != null) {
4391           result = this.joinedHeap.requestSeek(kv, true, true) || result;
4392         }
4393       } finally {
4394         closeRegionOperation();
4395       }
4396       return result;
4397     }
4398   }
4399 
4400   // Utility methods
4401   /**
4402    * A utility method to create new instances of HRegion based on the
4403    * {@link HConstants#REGION_IMPL} configuration property.
4404    * @param tableDir qualified path of directory where region should be located,
4405    * usually the table directory.
4406    * @param wal The WAL is the outbound log for any updates to the HRegion
4407    * The wal file is a logfile from the previous execution that's
4408    * custom-computed for this HRegion. The HRegionServer computes and sorts the
4409    * appropriate wal info for this HRegion. If there is a previous file
4410    * (implying that the HRegion has been written-to before), then read it from
4411    * the supplied path.
4412    * @param fs is the filesystem.
4413    * @param conf is global configuration settings.
4414    * @param regionInfo HRegionInfo that describes the region
4416    * @param htd the table descriptor
4417    * @return the new instance
4418    */
4419   static HRegion newHRegion(Path tableDir, WAL wal, FileSystem fs,
4420       Configuration conf, HRegionInfo regionInfo, final HTableDescriptor htd,
4421       RegionServerServices rsServices) {
4422     try {
4423       @SuppressWarnings("unchecked")
4424       Class<? extends HRegion> regionClass =
4425           (Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
4426 
4427       Constructor<? extends HRegion> c =
4428           regionClass.getConstructor(Path.class, WAL.class, FileSystem.class,
4429               Configuration.class, HRegionInfo.class, HTableDescriptor.class,
4430               RegionServerServices.class);
4431 
4432       return c.newInstance(tableDir, wal, fs, conf, regionInfo, htd, rsServices);
4433     } catch (Throwable e) {
4434       // todo: what should I throw here?
4435       throw new IllegalStateException("Could not instantiate a region instance.", e);
4436     }
4437   }
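
  /*
   * A hedged sketch of plugging in a custom region implementation via HConstants.REGION_IMPL
   * (MyRegion is a hypothetical subclass; it must declare the seven-argument constructor
   * reflected on above):
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setClass(HConstants.REGION_IMPL, MyRegion.class, HRegion.class);
   *   // newHRegion(...) will now reflectively instantiate MyRegion
   */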
4438 
4439   /**
4440    * Convenience method creating new HRegions. Used by createTable and by the
4441    * bootstrap code in the HMaster constructor.
4442    * Note, this method creates a {@link WAL} for the created region. It
4443    * needs to be closed explicitly.  Use {@link HRegion#getWAL()} to get
4444    * access.  <b>When done with a region created using this method, you will
4445    * need to explicitly close the {@link WAL} it created too; it will not be
4446    * done for you.  Not closing the wal will leave at least a daemon thread
4447    * running.</b>  Call {@link #closeHRegion(HRegion)} and it will do the
4448    * necessary cleanup for you.
4449    * @param info Info for region to create.
4450    * @param rootDir Root directory for HBase instance
4451    * @return new HRegion
4452    *
4453    * @throws IOException
4454    */
4455   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4456       final Configuration conf, final HTableDescriptor hTableDescriptor)
4457   throws IOException {
4458     return createHRegion(info, rootDir, conf, hTableDescriptor, null);
4459   }
4460 
4461   /**
4462    * This will do the necessary cleanup a call to
4463    * {@link #createHRegion(HRegionInfo, Path, Configuration, HTableDescriptor)}
4464    * requires.  This method will close the region and then close its
4465    * associated {@link WAL} file.  You can still use it if you call the other createHRegion,
4466    * the one that takes a {@link WAL} instance but don't be surprised by the
4467    * call to the {@link WAL#close()} on the {@link WAL} the
4468    * HRegion was carrying.
4469    * @throws IOException
4470    */
4471   public static void closeHRegion(final HRegion r) throws IOException {
4472     if (r == null) return;
4473     r.close();
4474     if (r.getWAL() == null) return;
4475     r.getWAL().close();
4476   }
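
  /*
   * Lifecycle sketch for the createHRegion/closeHRegion pair (a hedged example assuming an
   * existing HRegionInfo info, root Path rootDir, Configuration conf and HTableDescriptor htd):
   *
   *   HRegion region = HRegion.createHRegion(info, rootDir, conf, htd);
   *   try {
   *     // use the region...
   *   } finally {
   *     // closes the region and the private WAL createHRegion made for it
   *     HRegion.closeHRegion(region);
   *   }
   */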
4477 
4478   /**
4479    * Convenience method creating new HRegions. Used by createTable.
4480    * The {@link WAL} for the created region needs to be closed explicitly.
4481    * Use {@link HRegion#getWAL()} to get access.
4482    *
4483    * @param info Info for region to create.
4484    * @param rootDir Root directory for HBase instance
4485    * @param wal shared WAL
4486    * @param initialize - true to initialize the region
4487    * @return new HRegion
4488    *
4489    * @throws IOException
4490    */
4491   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4492                                       final Configuration conf,
4493                                       final HTableDescriptor hTableDescriptor,
4494                                       final WAL wal,
4495                                       final boolean initialize)
4496       throws IOException {
4497     return createHRegion(info, rootDir, conf, hTableDescriptor,
4498         wal, initialize, false);
4499   }
4500 
4501   /**
4502    * Convenience method creating new HRegions. Used by createTable.
4503    * The {@link WAL} for the created region needs to be closed
4504    * explicitly, if it is not null.
4505    * Use {@link HRegion#getWAL()} to get access.
4506    *
4507    * @param info Info for region to create.
4508    * @param rootDir Root directory for HBase instance
4509    * @param wal shared WAL
4510    * @param initialize - true to initialize the region
4511    * @param ignoreWAL true to skip generating a new WAL if it is null, mostly for createTable
4512    * @return new HRegion
4513    * @throws IOException
4514    */
4515   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4516                                       final Configuration conf,
4517                                       final HTableDescriptor hTableDescriptor,
4518                                       final WAL wal,
4519                                       final boolean initialize, final boolean ignoreWAL)
4520       throws IOException {
4521     Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
4522     return createHRegion(info, rootDir, tableDir, conf, hTableDescriptor, wal, initialize,
4523         ignoreWAL);
4524   }
4525 
4526   /**
4527    * Convenience method creating new HRegions. Used by createTable.
4528    * The {@link WAL} for the created region needs to be closed
4529    * explicitly, if it is not null.
4530    * Use {@link HRegion#getWAL()} to get access.
4531    *
4532    * @param info Info for region to create.
4533    * @param rootDir Root directory for HBase instance
4534    * @param tableDir table directory
4535    * @param wal shared WAL
4536    * @param initialize - true to initialize the region
4537    * @param ignoreWAL true to skip generating a new WAL if it is null, mostly for createTable
4538    * @return new HRegion
4539    * @throws IOException
4540    */
4541   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Path tableDir,
4542                                       final Configuration conf,
4543                                       final HTableDescriptor hTableDescriptor,
4544                                       final WAL wal,
4545                                       final boolean initialize, final boolean ignoreWAL)
4546       throws IOException {
4547     LOG.info("creating HRegion " + info.getTable().getNameAsString()
4548         + " HTD == " + hTableDescriptor + " RootDir = " + rootDir);
4550     FileSystem fs = FileSystem.get(conf);
4551     HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
4552     WAL effectiveWAL = wal;
4553     if (wal == null && !ignoreWAL) {
4554       // TODO HBASE-11983 There'll be no roller for this wal?
4555       effectiveWAL = (new WALFactory(conf,
4556           Collections.<WALActionsListener>singletonList(new MetricsWAL()),
4557           "hregion-" + RandomStringUtils.random(8))).getWAL(info.getEncodedNameAsBytes());
4558     }
4559     HRegion region = HRegion.newHRegion(tableDir,
4560         effectiveWAL, fs, conf, info, hTableDescriptor, null);
4561     if (initialize) {
4562       // If initializing, set the sequenceId. It is also required by WALPerformanceEvaluation when
4563       // verifying the WALEdits.
4564       region.setSequenceId(region.initialize(null));
4565     }
4566     return region;
4567   }
4568 
4569   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4570                                       final Configuration conf,
4571                                       final HTableDescriptor hTableDescriptor,
4572                                       final WAL wal)
4573     throws IOException {
4574     return createHRegion(info, rootDir, conf, hTableDescriptor, wal, true);
4575   }
4576 
4577 
4578   /**
4579    * Open a Region.
4580    * @param info Info for region to be opened.
4581    * @param wal WAL for region to use. This method will call
4582    * WAL#setSequenceNumber(long) passing the result of the call to
4583    * HRegion#getMinSequenceId() to ensure the wal id is properly kept
4584    * up to date. HRegionServer does this every time it opens a new region.
4585    * @return new HRegion
4586    *
4587    * @throws IOException
4588    */
4589   public static HRegion openHRegion(final HRegionInfo info,
4590       final HTableDescriptor htd, final WAL wal,
4591       final Configuration conf)
4592   throws IOException {
4593     return openHRegion(info, htd, wal, conf, null, null);
4594   }
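
  /*
   * A minimal open sketch (hedged: assumes a region already on disk plus existing info, htd,
   * wal and conf instances; server-managed opens should pass RegionServerServices and a
   * progress reporter via the longer overloads below):
   *
   *   HRegion region = HRegion.openHRegion(info, htd, wal, conf);
   *   try {
   *     // read from / write to the region...
   *   } finally {
   *     region.close();
   *   }
   */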
4595 
4596   /**
4597    * Open a Region.
4598    * @param info Info for region to be opened
4599    * @param htd the table descriptor
4600    * @param wal WAL for region to use. This method will call
4601    * WAL#setSequenceNumber(long) passing the result of the call to
4602    * HRegion#getMinSequenceId() to ensure the wal id is properly kept
4603    * up to date. HRegionServer does this every time it opens a new region.
4604    * @param conf The Configuration object to use.
4605    * @param rsServices An interface we can request flushes against.
4606    * @param reporter An interface we can report progress against.
4607    * @return new HRegion
4608    *
4609    * @throws IOException
4610    */
4611   public static HRegion openHRegion(final HRegionInfo info,
4612     final HTableDescriptor htd, final WAL wal, final Configuration conf,
4613     final RegionServerServices rsServices,
4614     final CancelableProgressable reporter)
4615   throws IOException {
4616     return openHRegion(FSUtils.getRootDir(conf), info, htd, wal, conf, rsServices, reporter);
4617   }
4618 
4619   /**
4620    * Open a Region.
4621    * @param rootDir Root directory for HBase instance
4622    * @param info Info for region to be opened.
4623    * @param htd the table descriptor
4624    * @param wal WAL for region to use. This method will call
4625    * WAL#setSequenceNumber(long) passing the result of the call to
4626    * HRegion#getMinSequenceId() to ensure the wal id is properly kept
4627    * up to date. HRegionServer does this every time it opens a new region.
4628    * @param conf The Configuration object to use.
4629    * @return new HRegion
4630    * @throws IOException
4631    */
4632   public static HRegion openHRegion(Path rootDir, final HRegionInfo info,
4633       final HTableDescriptor htd, final WAL wal, final Configuration conf)
4634   throws IOException {
4635     return openHRegion(rootDir, info, htd, wal, conf, null, null);
4636   }
4637 
4638   /**
4639    * Open a Region.
4640    * @param rootDir Root directory for HBase instance
4641    * @param info Info for region to be opened.
4642    * @param htd the table descriptor
4643    * @param wal WAL for region to use. This method will call
4644    * WAL#setSequenceNumber(long) passing the result of the call to
4645    * HRegion#getMinSequenceId() to ensure the wal id is properly kept
4646    * up to date. HRegionServer does this every time it opens a new region.
4647    * @param conf The Configuration object to use.
4648    * @param rsServices An interface we can request flushes against.
4649    * @param reporter An interface we can report progress against.
4650    * @return new HRegion
4651    * @throws IOException
4652    */
4653   public static HRegion openHRegion(final Path rootDir, final HRegionInfo info,
4654       final HTableDescriptor htd, final WAL wal, final Configuration conf,
4655       final RegionServerServices rsServices,
4656       final CancelableProgressable reporter)
4657   throws IOException {
4658     FileSystem fs = null;
4659     if (rsServices != null) {
4660       fs = rsServices.getFileSystem();
4661     }
4662     if (fs == null) {
4663       fs = FileSystem.get(conf);
4664     }
4665     return openHRegion(conf, fs, rootDir, info, htd, wal, rsServices, reporter);
4666   }
4667 
4668   /**
4669    * Open a Region.
4670    * @param conf The Configuration object to use.
4671    * @param fs Filesystem to use
4672    * @param rootDir Root directory for HBase instance
4673    * @param info Info for region to be opened.
4674    * @param htd the table descriptor
4675    * @param wal WAL for region to use. This method will call
4676    * WAL#setSequenceNumber(long) passing the result of the call to
4677    * HRegion#getMinSequenceId() to ensure the wal id is properly kept
4678    * up to date. HRegionServer does this every time it opens a new region.
4679    * @return new HRegion
4680    * @throws IOException
4681    */
4682   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4683       final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final WAL wal)
4684       throws IOException {
4685     return openHRegion(conf, fs, rootDir, info, htd, wal, null, null);
4686   }
4687 
4688   /**
4689    * Open a Region.
4690    * @param conf The Configuration object to use.
4691    * @param fs Filesystem to use
4692    * @param rootDir Root directory for HBase instance
4693    * @param info Info for region to be opened.
4694    * @param htd the table descriptor
4695    * @param wal WAL for region to use. This method will call
4696    * WAL#setSequenceNumber(long) passing the result of the call to
4697    * HRegion#getMinSequenceId() to ensure the wal id is properly kept
4698    * up to date. HRegionServer does this every time it opens a new region.
4699    * @param rsServices An interface we can request flushes against.
4700    * @param reporter An interface we can report progress against.
4701    * @return new HRegion
4702    * @throws IOException
4703    */
4704   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4705       final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final WAL wal,
4706       final RegionServerServices rsServices, final CancelableProgressable reporter)
4707       throws IOException {
4708     Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
4709     return openHRegion(conf, fs, rootDir, tableDir, info, htd, wal, rsServices, reporter);
4710   }
4711 
4712   /**
4713    * Open a Region.
4714    * @param conf The Configuration object to use.
4715    * @param fs Filesystem to use
4716    * @param rootDir Root directory for HBase instance
4717    * @param info Info for region to be opened.
4718    * @param htd the table descriptor
4719    * @param wal WAL for region to use. This method will call
4720    * WAL#setSequenceNumber(long) passing the result of the call to
4721    * HRegion#getMinSequenceId() to ensure the wal id is properly kept
4722    * up to date. HRegionServer does this every time it opens a new region.
4723    * @param rsServices An interface we can request flushes against.
4724    * @param reporter An interface we can report progress against.
4725    * @return new HRegion
4726    * @throws IOException
4727    */
4728   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4729       final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd,
4730       final WAL wal, final RegionServerServices rsServices,
4731       final CancelableProgressable reporter)
4732       throws IOException {
4733     if (info == null) throw new NullPointerException("Passed region info is null");
4734     if (LOG.isDebugEnabled()) {
4735       LOG.debug("Opening region: " + info);
4736     }
4737     HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
4738     return r.openHRegion(reporter);
4739   }
4740 
4741 
4742   /**
4743    * Useful when reopening a closed region (normally for unit tests)
4744    * @param other original object
4745    * @param reporter An interface we can report progress against.
4746    * @return new HRegion
4747    * @throws IOException
4748    */
4749   public static HRegion openHRegion(final HRegion other, final CancelableProgressable reporter)
4750       throws IOException {
4751     HRegionFileSystem regionFs = other.getRegionFileSystem();
4752     HRegion r = newHRegion(regionFs.getTableDir(), other.getWAL(), regionFs.getFileSystem(),
4753         other.baseConf, other.getRegionInfo(), other.getTableDesc(), null);
4754     return r.openHRegion(reporter);
4755   }
4756 
4757   /**
4758    * Open HRegion.
4759    * Calls initialize and sets sequenceId.
4760    * @return Returns <code>this</code>
4761    * @throws IOException
4762    */
4763   protected HRegion openHRegion(final CancelableProgressable reporter)
4764   throws IOException {
4765     checkCompressionCodecs();
4766 
4767     this.openSeqNum = initialize(reporter);
4768     this.setSequenceId(openSeqNum);
4769     if (wal != null && getRegionServerServices() != null) {
4770       writeRegionOpenMarker(wal, openSeqNum);
4771     }
4772     return this;
4773   }
4774 
4775   private void checkCompressionCodecs() throws IOException {
4776     for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
4777       CompressionTest.testCompression(fam.getCompression());
4778       CompressionTest.testCompression(fam.getCompactionCompression());
4779     }
4780   }
4781 
4782   /**
4783    * Create a daughter region from a given temp directory with the region data.
4784    * @param hri Spec. for daughter region to open.
4785    * @throws IOException
4786    */
4787   HRegion createDaughterRegionFromSplits(final HRegionInfo hri) throws IOException {
4788     // Move the files from the temporary .splits to the final /table/region directory
4789     fs.commitDaughterRegion(hri);
4790 
4791     // Create the daughter HRegion instance
4792     HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(), fs.getFileSystem(),
4793         this.getBaseConf(), hri, this.getTableDesc(), rsServices);
4794     r.readRequestsCount.set(this.getReadRequestsCount() / 2);
4795     r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
4796     return r;
4797   }
4798 
4799   /**
4800    * Create a merged region given a temp directory with the region data.
4801    * @param region_b another merging region
4802    * @return merged HRegion
4803    * @throws IOException
4804    */
4805   HRegion createMergedRegionFromMerges(final HRegionInfo mergedRegionInfo,
4806       final HRegion region_b) throws IOException {
4807     HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(),
4808         fs.getFileSystem(), this.getBaseConf(), mergedRegionInfo,
4809         this.getTableDesc(), this.rsServices);
4810     r.readRequestsCount.set(this.getReadRequestsCount()
4811         + region_b.getReadRequestsCount());
4812     r.writeRequestsCount.set(this.getWriteRequestsCount()
4813         + region_b.getWriteRequestsCount());
4814     this.fs.commitMergedRegion(mergedRegionInfo);
4815     return r;
4816   }
4817 
4818   /**
4819    * Inserts a new region's meta information into the passed
4820    * <code>meta</code> region. Used by the HMaster bootstrap code adding
4821    * new table to hbase:meta table.
4822    *
4823    * @param meta hbase:meta HRegion to be updated
4824    * @param r HRegion to add to <code>meta</code>
4825    *
4826    * @throws IOException
4827    */
4828   // TODO remove since only test and merge use this
4829   public static void addRegionToMETA(final HRegion meta, final HRegion r) throws IOException {
4830     meta.checkResources();
4831     // The row key is the region name
4832     byte[] row = r.getRegionName();
4833     final long now = EnvironmentEdgeManager.currentTime();
4834     final List<Cell> cells = new ArrayList<Cell>(2);
4835     cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
4836       HConstants.REGIONINFO_QUALIFIER, now,
4837       r.getRegionInfo().toByteArray()));
4838     // Set the version of the meta table into the root table.
4839     cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
4840       HConstants.META_VERSION_QUALIFIER, now,
4841       Bytes.toBytes(HConstants.META_VERSION)));
4842     meta.put(row, HConstants.CATALOG_FAMILY, cells);
4843   }
4844 
4845   /**
4846    * Computes the Path of the HRegion
4847    *
4848    * @param tabledir qualified path for table
4849    * @param name ENCODED region name
4850    * @return Path of HRegion directory
4851    */
4852   @Deprecated
4853   public static Path getRegionDir(final Path tabledir, final String name) {
4854     return new Path(tabledir, name);
4855   }
4856 
4857   /**
4858    * Computes the Path of the HRegion
4859    *
4860    * @param rootdir qualified path of HBase root directory
4861    * @param info HRegionInfo for the region
4862    * @return qualified path of region directory
4863    */
4864   @Deprecated
4865   public static Path getRegionDir(final Path rootdir, final HRegionInfo info) {
4866     return new Path(
4867       FSUtils.getTableDir(rootdir, info.getTable()), info.getEncodedName());
4868   }
4869 
4870   /**
4871    * Determines if the specified row is within the row range specified by the
4872    * specified HRegionInfo
4873    *
4874    * @param info HRegionInfo that specifies the row range
4875    * @param row row to be checked
4876    * @return true if the row is within the range specified by the HRegionInfo
4877    */
4878   public static boolean rowIsInRange(HRegionInfo info, final byte [] row) {
4879     return ((info.getStartKey().length == 0) ||
4880         (Bytes.compareTo(info.getStartKey(), row) <= 0)) &&
4881         ((info.getEndKey().length == 0) ||
4882             (Bytes.compareTo(info.getEndKey(), row) > 0));
4883   }
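
  /*
   * Worked example for rowIsInRange: with startKey = "b" and endKey = "d", rows "b" and "c"
   * are in range while "a" and "d" are not (the end key is exclusive). A zero-length start
   * or end key matches everything on that side, which is how the first and last regions of
   * a table cover the keyspace ends.
   */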
4884 
4885   /**
4886    * Merge two HRegions.  The regions must be adjacent and must not overlap.
4887    *
4888    * @return new merged HRegion
4889    * @throws IOException
4890    */
4891   public static HRegion mergeAdjacent(final HRegion srcA, final HRegion srcB)
4892   throws IOException {
4893     HRegion a = srcA;
4894     HRegion b = srcB;
4895 
4896     // Make sure that srcA comes first; important for key-ordering during
4897     // write of the merged file.
4898     if (srcA.getStartKey() == null) {
4899       if (srcB.getStartKey() == null) {
4900         throw new IOException("Cannot merge two regions with null start key");
4901       }
4902       // A's start key is null but B's isn't. Assume A comes before B
4903     } else if ((srcB.getStartKey() == null) ||
4904       (Bytes.compareTo(srcA.getStartKey(), srcB.getStartKey()) > 0)) {
4905       a = srcB;
4906       b = srcA;
4907     }
4908 
4909     if (Bytes.compareTo(a.getEndKey(), b.getStartKey()) != 0) {
4910       throw new IOException("Cannot merge non-adjacent regions");
4911     }
4912     return merge(a, b);
4913   }
4914 
4915   /**
4916    * Merge two regions whether they are adjacent or not.
4917    *
4918    * @param a region a
4919    * @param b region b
4920    * @return new merged region
4921    * @throws IOException
4922    */
4923   public static HRegion merge(final HRegion a, final HRegion b) throws IOException {
4924     if (!a.getRegionInfo().getTable().equals(b.getRegionInfo().getTable())) {
4925       throw new IOException("Regions do not belong to the same table");
4926     }
4927 
4928     FileSystem fs = a.getRegionFileSystem().getFileSystem();
4929     // Make sure each region's cache is empty
4930     a.flushcache();
4931     b.flushcache();
4932 
4933     // Compact each region so we only have one store file per family
4934     a.compactStores(true);
4935     if (LOG.isDebugEnabled()) {
4936       LOG.debug("Files for region: " + a);
4937       a.getRegionFileSystem().logFileSystemState(LOG);
4938     }
4939     b.compactStores(true);
4940     if (LOG.isDebugEnabled()) {
4941       LOG.debug("Files for region: " + b);
4942       b.getRegionFileSystem().logFileSystemState(LOG);
4943     }
4944 
4945     RegionMergeTransaction rmt = new RegionMergeTransaction(a, b, true);
4946     if (!rmt.prepare(null)) {
4947       throw new IOException("Unable to merge regions " + a + " and " + b);
4948     }
4949     HRegionInfo mergedRegionInfo = rmt.getMergedRegionInfo();
4950     LOG.info("starting merge of regions: " + a + " and " + b
4951         + " into new region " + mergedRegionInfo.getRegionNameAsString()
4952         + " with start key <"
4953         + Bytes.toStringBinary(mergedRegionInfo.getStartKey())
4954         + "> and end key <"
4955         + Bytes.toStringBinary(mergedRegionInfo.getEndKey()) + ">");
4956     HRegion dstRegion;
4957     try {
4958       dstRegion = rmt.execute(null, null);
4959     } catch (IOException ioe) {
4960       rmt.rollback(null, null);
4961       throw new IOException("Failed merging region " + a + " and " + b
4962           + "; successfully rolled back", ioe);
4963     }
4964     dstRegion.compactStores(true);
4965 
4966     if (LOG.isDebugEnabled()) {
4967       LOG.debug("Files for new region");
4968       dstRegion.getRegionFileSystem().logFileSystemState(LOG);
4969     }
4970 
4971     if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
4972       throw new IOException("Merged region " + dstRegion
4973           + " still has references after the compaction; was the compaction canceled?");
4974     }
4975 
4976     // Archiving the 'A' region
4977     HFileArchiver.archiveRegion(a.getBaseConf(), fs, a.getRegionInfo());
4978     // Archiving the 'B' region
4979     HFileArchiver.archiveRegion(b.getBaseConf(), fs, b.getRegionInfo());
4980 
4981     LOG.info("merge completed. New region is " + dstRegion);
4982     return dstRegion;
4983   }
4984 
4985   //
4986   // HBASE-880
4987   //
4988   /**
4989    * @param get get object
4990    * @return result
4991    * @throws IOException read exceptions
4992    */
4993   public Result get(final Get get) throws IOException {
4994     checkRow(get.getRow(), "Get");
4995     // Verify families are all valid
4996     if (get.hasFamilies()) {
4997       for (byte [] family: get.familySet()) {
4998         checkFamily(family);
4999       }
5000     } else { // Adding all families to scanner
5001       for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
5002         get.addFamily(family);
5003       }
5004     }
5005     List<Cell> results = get(get, true);
5006     boolean stale = this.getRegionInfo().getReplicaId() != 0;
5007     return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
5008   }
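
  /*
   * Usage sketch for the Get path (a hedged example; the row, family and qualifier names
   * are illustrative):
   *
   *   Get get = new Get(Bytes.toBytes("row1"));
   *   get.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"));  // omit to fetch all families
   *   Result result = region.get(get);
   *   byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("q"));
   */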
5009 
5010   /*
5011    * Do a get based on the get parameter.
5012    * @param withCoprocessor invoke coprocessor or not. We don't want to
5013    * always invoke the coprocessor for this method.
5014    */
5015   public List<Cell> get(Get get, boolean withCoprocessor)
5016   throws IOException {
5017 
5018     List<Cell> results = new ArrayList<Cell>();
5019 
5020     // pre-get CP hook
5021     if (withCoprocessor && (coprocessorHost != null)) {
5022        if (coprocessorHost.preGet(get, results)) {
5023          return results;
5024        }
5025     }
5026 
5027     Scan scan = new Scan(get);
5028 
5029     RegionScanner scanner = null;
5030     try {
5031       scanner = getScanner(scan);
5032       scanner.next(results);
5033     } finally {
5034       if (scanner != null) {
5035         scanner.close();
5036       }
5036     }
5037 
5038     // post-get CP hook
5039     if (withCoprocessor && (coprocessorHost != null)) {
5040       coprocessorHost.postGet(get, results);
5041     }
5042 
5043     // do after lock
5044     if (this.metricsRegion != null) {
5045       long totalSize = 0L;
5046       for (Cell cell : results) {
5047         totalSize += CellUtil.estimatedSerializedSizeOf(cell);
5048       }
5049       this.metricsRegion.updateGet(totalSize);
5050     }
5051 
5052     return results;
5053   }
5054 
5055   public void mutateRow(RowMutations rm) throws IOException {
5056     // Don't need nonces here - RowMutations only supports puts and deletes
5057     mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
5058   }
5059 
5060   /**
5061    * Perform atomic mutations within the region w/o nonces.
5062    * See {@link #mutateRowsWithLocks(Collection, Collection, long, long)}
5063    */
5064   public void mutateRowsWithLocks(Collection<Mutation> mutations,
5065       Collection<byte[]> rowsToLock) throws IOException {
5066     mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
5067   }
5068 
5069   /**
5070    * Perform atomic mutations within the region.
5071    * @param mutations The list of mutations to perform.
5072    * <code>mutations</code> can contain operations for multiple rows.
5073    * Caller has to ensure that all rows are contained in this region.
5074    * @param rowsToLock Rows to lock. If multiple rows are locked, care should be taken that
5075    * <code>rowsToLock</code> is sorted in order to avoid deadlocks.
5076    * @param nonceGroup Optional nonce group of the operation (client Id)
5077    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
5079    * @throws IOException
5080    */
5081   public void mutateRowsWithLocks(Collection<Mutation> mutations,
5082       Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
5083     MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
5084     processRowsWithLocks(proc, -1, nonceGroup, nonce);
5085   }
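
  /*
   * Atomic multi-row sketch (a hedged example; rows and values are illustrative, and
   * Put#addColumn is assumed available in this code line). Note that rowsToLock should be
   * sorted to avoid deadlocks, per the javadoc above:
   *
   *   byte[] rowA = Bytes.toBytes("a");
   *   byte[] rowB = Bytes.toBytes("b");
   *   Put put = new Put(rowA);
   *   put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
   *   Delete delete = new Delete(rowB);
   *   region.mutateRowsWithLocks(Arrays.<Mutation>asList(put, delete),
   *       Arrays.asList(rowA, rowB));
   */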
5086 
5087   /**
5088    * Performs atomic multiple reads and writes on a given row.
5089    *
5090    * @param processor The object defines the reads and writes to a row.
5091    * @param nonceGroup Optional nonce group of the operation (client Id)
5092    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
5093    */
5094   public void processRowsWithLocks(RowProcessor<?,?> processor, long nonceGroup, long nonce)
5095       throws IOException {
5096     processRowsWithLocks(processor, rowProcessorTimeout, nonceGroup, nonce);
5097   }
5098 
5099   /**
5100    * Performs atomic multiple reads and writes on a given row.
5101    *
5102    * @param processor The object defines the reads and writes to a row.
5103    * @param timeout The timeout of the processor.process() execution
5104    *                Use a negative number to switch off the time bound
5105    * @param nonceGroup Optional nonce group of the operation (client Id)
5106    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
5107    */
5108   public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout,
5109       long nonceGroup, long nonce) throws IOException {
5110 
5111     for (byte[] row : processor.getRowsToLock()) {
5112       checkRow(row, "processRowsWithLocks");
5113     }
5114     if (!processor.readOnly()) {
5115       checkReadOnly();
5116     }
5117     checkResources();
5118 
5119     startRegionOperation();
5120     WALEdit walEdit = new WALEdit();
5121 
5122     // 1. Run pre-process hook
5123     try {
5124       processor.preProcess(this, walEdit);
5125     } catch (IOException e) {
5126       closeRegionOperation();
5127       throw e;
5128     }
5129     // Short circuit the read only case
5130     if (processor.readOnly()) {
5131       try {
5132         long now = EnvironmentEdgeManager.currentTime();
5133         doProcessRowWithTimeout(
5134             processor, now, this, null, null, timeout);
5135         processor.postProcess(this, walEdit, true);
5136       } finally {
5139         closeRegionOperation();
5140       }
5141       return;
5142     }
5143 
5144     MultiVersionConsistencyControl.WriteEntry writeEntry = null;
5145     boolean locked;
5146     boolean walSyncSuccessful = false;
5147     List<RowLock> acquiredRowLocks;
5148     long addedSize = 0;
5149     List<Mutation> mutations = new ArrayList<Mutation>();
5150     List<Cell> memstoreCells = new ArrayList<Cell>();
5151     Collection<byte[]> rowsToLock = processor.getRowsToLock();
5152     long mvccNum = 0;
5153     WALKey walKey = null;
5154     try {
5155       // 2. Acquire the row lock(s)
5156       acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
5157       for (byte[] row : rowsToLock) {
5158         // Attempt to lock all involved rows, throw if any lock times out
5159         acquiredRowLocks.add(getRowLock(row));
5160       }
5161       // 3. Region lock
5162       lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
5163       locked = true;
5164       // Get a mvcc write number
5165       mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
5166 
5167       long now = EnvironmentEdgeManager.currentTime();
5168       try {
5169         // 4. Let the processor scan the rows, generate mutations and add
5170         //    waledits
5171         doProcessRowWithTimeout(
5172             processor, now, this, mutations, walEdit, timeout);
5173 
5174         if (!mutations.isEmpty()) {
5175           // 5. Start mvcc transaction
5176           writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
5177           // 6. Call the preBatchMutate hook
5178           processor.preBatchMutate(this, walEdit);
5179           // 7. Apply to memstore
5180           for (Mutation m : mutations) {
5181             for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
5182               Cell cell = cellScanner.current();
5183               CellUtil.setSequenceId(cell, mvccNum);
5184               Store store = getStore(cell);
5185               if (store == null) {
5186                 checkFamily(CellUtil.cloneFamily(cell));
5187                 // unreachable
5188               }
5189               Pair<Long, Cell> ret = store.add(cell);
5190               addedSize += ret.getFirst();
5191               memstoreCells.add(ret.getSecond());
5192             }
5193           }
5194 
5195           long txid = 0;
5196           // 8. Append no sync
5197           if (!walEdit.isEmpty()) {
5198             // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
5199             walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
5200               this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
5201               processor.getClusterIds(), nonceGroup, nonce);
5202             txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
5203               walKey, walEdit, getSequenceId(), true, memstoreCells);
5204           }
5205           if (walKey == null) {
5206             // since we use wal sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
5207             // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
5208             walKey = this.appendEmptyEdit(this.wal, memstoreCells);
5209           }
5210 
5211           // 9. Release region lock
5212           if (locked) {
5213             this.updatesLock.readLock().unlock();
5214             locked = false;
5215           }
5216 
5217           // 10. Release row lock(s)
5218           releaseRowLocks(acquiredRowLocks);
5219 
5220           // 11. Sync edit log
5221           if (txid != 0) {
5222             syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
5223           }
5224           walSyncSuccessful = true;
5225           // 12. call postBatchMutate hook
5226           processor.postBatchMutate(this);
5227         }
5228       } finally {
5229         if (!mutations.isEmpty() && !walSyncSuccessful) {
5230           LOG.warn("WAL sync failed. Rolling back " + mutations.size() +
5231               " memstore keyvalues for row(s):" + StringUtils.byteToHexString(
5232               processor.getRowsToLock().iterator().next()) + "...");
5233           for (Mutation m : mutations) {
5234             for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
5235               Cell cell = cellScanner.current();
5236               getStore(cell).rollback(cell);
5237             }
5238           }
5239         }
5240         // 13. Roll mvcc forward
5241         if (writeEntry != null) {
5242           mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
5243         }
5244         if (locked) {
5245           this.updatesLock.readLock().unlock();
5246         }
5247         // release locks if some were acquired but another timed out
5248         releaseRowLocks(acquiredRowLocks);
5249       }
5250 
5251       // 14. Run post-process hook
5252       processor.postProcess(this, walEdit, walSyncSuccessful);
5253 
5254     } catch (IOException e) {
5255       throw e;
5256     } finally {
5257       closeRegionOperation();
5258       if (!mutations.isEmpty() &&
5259           isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
5260         requestFlush();
5261       }
5262     }
5263   }
5264 
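       /**
        * Runs {@link RowProcessor#process} for the caller: inline when {@code timeout} is
        * negative, otherwise on {@link #rowProcessorExecutor} with the call bounded to
        * {@code timeout} milliseconds; a timeout or executor failure is rethrown as an
        * IOException.
        */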
5265   private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
5266                                        final long now,
5267                                        final HRegion region,
5268                                        final List<Mutation> mutations,
5269                                        final WALEdit walEdit,
5270                                        final long timeout) throws IOException {
5271     // Short circuit the no time bound case.
5272     if (timeout < 0) {
5273       try {
5274         processor.process(now, region, mutations, walEdit);
5275       } catch (IOException e) {
5276         LOG.warn("RowProcessor:" + processor.getClass().getName() +
5277             " throws Exception on row(s):" +
5278             Bytes.toStringBinary(
5279               processor.getRowsToLock().iterator().next()) + "...", e);
5280         throw e;
5281       }
5282       return;
5283     }
5284 
5285     // Case with time bound
5286     FutureTask<Void> task =
5287       new FutureTask<Void>(new Callable<Void>() {
5288         @Override
5289         public Void call() throws IOException {
5290           try {
5291             processor.process(now, region, mutations, walEdit);
5292             return null;
5293           } catch (IOException e) {
5294             LOG.warn("RowProcessor:" + processor.getClass().getName() +
5295                 " throws Exception on row(s):" +
5296                 Bytes.toStringBinary(
5297                     processor.getRowsToLock().iterator().next()) + "...", e);
5298             throw e;
5299           }
5300         }
5301       });
5302     rowProcessorExecutor.execute(task);
5303     try {
5304       task.get(timeout, TimeUnit.MILLISECONDS);
5305     } catch (TimeoutException te) {
5306       LOG.error("RowProcessor timeout:" + timeout + " ms on row(s):" +
5307           Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) +
5308           "...");
5309       throw new IOException(te);
5310     } catch (Exception e) {
5311       throw new IOException(e);
5312     }
5313   }
5314 
5315   public Result append(Append append) throws IOException {
5316     return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
5317   }
5318 
5319   // TODO: There's a lot of boilerplate code identical
5320   // to increment... See how to better unify that.
5321   /**
5322    * Perform one or more append operations on a row.
5323    *
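        * <p>A minimal client-side sketch of how this path is typically reached (hedged:
        * {@code table} is an assumed {@code Table} handle, not part of this class):</p>
        * <pre>
        *   Append a = new Append(Bytes.toBytes("row1"));
        *   a.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("-suffix"));
        *   Result r = table.append(a);
        * </pre>
        *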
5324    * @return new keyvalues after the append
5325    * @throws IOException
5326    */
5327   public Result append(Append append, long nonceGroup, long nonce)
5328       throws IOException {
5329     byte[] row = append.getRow();
5330     checkRow(row, "append");
5331     boolean flush = false;
5332     Durability durability = getEffectiveDurability(append.getDurability());
5333     boolean writeToWAL = durability != Durability.SKIP_WAL;
5334     WALEdit walEdits = null;
5335     List<Cell> allKVs = new ArrayList<Cell>(append.size());
5336     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
5337 
5338     long size = 0;
5339     long txid = 0;
5340 
5341     checkReadOnly();
5342     checkResources();
5343     // Lock row
5344     startRegionOperation(Operation.APPEND);
5345     this.writeRequestsCount.increment();
5346     long mvccNum = 0;
5347     WriteEntry w = null;
5348     WALKey walKey = null;
5349     RowLock rowLock = null;
5350     List<Cell> memstoreCells = new ArrayList<Cell>();
5351     boolean doRollBackMemstore = false;
5352     try {
5353       rowLock = getRowLock(row);
5354       try {
5355         lock(this.updatesLock.readLock());
5356         try {
5357           // wait for all prior MVCC transactions to finish - while we hold the row lock
5358           // (so that we are guaranteed to see the latest state)
5359           mvcc.waitForPreviousTransactionsComplete();
5360           if (this.coprocessorHost != null) {
5361             Result r = this.coprocessorHost.preAppendAfterRowLock(append);
5362             if (r != null) {
5363               return r;
5364             }
5365           }
5366           // now start my own transaction
5367           mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
5368           w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
5369           long now = EnvironmentEdgeManager.currentTime();
5370           // Process each family
5371           for (Map.Entry<byte[], List<Cell>> family : append.getFamilyCellMap().entrySet()) {
5372 
5373             Store store = stores.get(family.getKey());
5374             List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
5375 
5376             // Sort the cells so that they match the order that they
5377             // appear in the Get results. Otherwise, we won't be able to
5378             // find the existing values if the cells are not specified
5379             // in order by the client since cells are in an array list.
5380             Collections.sort(family.getValue(), store.getComparator());
5381             // Get previous values for all columns in this family
5382             Get get = new Get(row);
5383             for (Cell cell : family.getValue()) {
5384               get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));
5385             }
5386             List<Cell> results = get(get, false);
5387             // Iterate the input columns and update existing values if they were
5388             // found, otherwise add new column initialized to the append value
5389 
5390             // Avoid as much copying as possible. Every byte is copied at most
5391             // once.
5392             // Would be nice if KeyValue had scatter/gather logic
5393             int idx = 0;
5394             for (Cell cell : family.getValue()) {
5395               Cell newCell;
5396               Cell oldCell = null;
5397               if (idx < results.size()
5398                   && CellUtil.matchingQualifier(results.get(idx), cell)) {
5399                 oldCell = results.get(idx);
5400                 long ts = Math.max(now, oldCell.getTimestamp());
5401                 // allocate an empty kv once
5402                 newCell = new KeyValue(row.length, cell.getFamilyLength(),
5403                     cell.getQualifierLength(), ts, KeyValue.Type.Put,
5404                     oldCell.getValueLength() + cell.getValueLength(),
5405                     oldCell.getTagsLength() + cell.getTagsLength());
5406                 // copy in the value
5407                 System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(),
5408                     newCell.getValueArray(), newCell.getValueOffset(),
5409                     oldCell.getValueLength());
5410                 System.arraycopy(cell.getValueArray(), cell.getValueOffset(),
5411                     newCell.getValueArray(),
5412                     newCell.getValueOffset() + oldCell.getValueLength(),
5413                     cell.getValueLength());
5414                 // copy in the tags
5415                 System.arraycopy(oldCell.getTagsArray(), oldCell.getTagsOffset(),
5416                     newCell.getTagsArray(), newCell.getTagsOffset(), oldCell.getTagsLength());
5417                 System.arraycopy(cell.getTagsArray(), cell.getTagsOffset(), newCell.getTagsArray(),
5418                     newCell.getTagsOffset() + oldCell.getTagsLength(), cell.getTagsLength());
5419                 // copy in row, family, and qualifier
5420                 System.arraycopy(cell.getRowArray(), cell.getRowOffset(),
5421                     newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());
5422                 System.arraycopy(cell.getFamilyArray(), cell.getFamilyOffset(),
5423                     newCell.getFamilyArray(), newCell.getFamilyOffset(),
5424                     cell.getFamilyLength());
5425                 System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(),
5426                     newCell.getQualifierArray(), newCell.getQualifierOffset(),
5427                     cell.getQualifierLength());
5428                 idx++;
5429               } else {
5430                 // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP,
5431                 // so only need to update the timestamp to 'now'
5432                 CellUtil.updateLatestStamp(cell, now);
5433                 newCell = cell;
5434               }
5435               CellUtil.setSequenceId(newCell, mvccNum);
5436               // Give coprocessors a chance to update the new cell
5437               if (coprocessorHost != null) {
5438                 newCell = coprocessorHost.postMutationBeforeWAL(RegionObserver.MutationType.APPEND,
5439                     append, oldCell, newCell);
5440               }
5441               kvs.add(newCell);
5442 
5443               // Append update to WAL
5444               if (writeToWAL) {
5445                 if (walEdits == null) {
5446                   walEdits = new WALEdit();
5447                 }
5448                 walEdits.add(newCell);
5449               }
5450             }
5451 
5452             //store the kvs to the temporary memstore before writing WAL
5453             tempMemstore.put(store, kvs);
5454           }
5455 
5456           //Actually write to Memstore now
5457           for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
5458             Store store = entry.getKey();
5459             if (store.getFamily().getMaxVersions() == 1) {
5460               // upsert if VERSIONS for this CF == 1
5461               size += store.upsert(entry.getValue(), getSmallestReadPoint());
5462               memstoreCells.addAll(entry.getValue());
5463             } else {
5464               // otherwise keep older versions around
5465               for (Cell cell: entry.getValue()) {
5466                 Pair<Long, Cell> ret = store.add(cell);
5467                 size += ret.getFirst();
5468                 memstoreCells.add(ret.getSecond());
5469                 doRollBackMemstore = true;
5470               }
5471             }
5472             allKVs.addAll(entry.getValue());
5473           }
5474 
5475           // Actually write to WAL now
5476           if (writeToWAL) {
5477             // Using default cluster id, as this can only happen in the originating
5478             // cluster. A slave cluster receives the final value (not the delta)
5479             // as a Put.
5480             // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
5481             walKey = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
5482               this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce);
5483             txid = this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits,
5484               this.sequenceId, true, memstoreCells);
5485           } else {
5486             recordMutationWithoutWal(append.getFamilyCellMap());
5487           }
5488           if (walKey == null) {
5489             // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
5490             walKey = this.appendEmptyEdit(this.wal, memstoreCells);
5491           }
5492 
5493           size = this.addAndGetGlobalMemstoreSize(size);
5494           flush = isFlushSize(size);
5495         } finally {
5496           this.updatesLock.readLock().unlock();
5497         }
5498       } finally {
5499         rowLock.release();
5500         rowLock = null;
5501       }
5502       // sync the transaction log outside the rowlock
5503       if (txid != 0) {
5504         syncOrDefer(txid, durability);
5505       }
5506       doRollBackMemstore = false;
5507     } finally {
5508       if (rowLock != null) {
5509         rowLock.release();
5510       }
5511       // if the wal sync was unsuccessful, remove keys from memstore
5512       if (doRollBackMemstore) {
5513         rollbackMemstore(memstoreCells);
5514       }
5515       if (w != null) {
5516         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
5517       }
5518       closeRegionOperation(Operation.APPEND);
5519     }
5520 
5521     if (this.metricsRegion != null) {
5522       this.metricsRegion.updateAppend();
5523     }
5524 
5525     if (flush) {
5526       // Request a cache flush. Do it outside update lock.
5527       requestFlush();
5528     }
5529 
5530 
5531     return append.isReturnResults() ? Result.create(allKVs) : null;
5532   }
5533 
5534   public Result increment(Increment increment) throws IOException {
5535     return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
5536   }
5537 
5538   /**
5539    * Perform one or more increment operations on a row.
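        * <p>A minimal client-side sketch (hedged: {@code table} is an assumed {@code Table}
        * handle, not part of this class):</p>
        * <pre>
        *   Increment inc = new Increment(Bytes.toBytes("row1"));
        *   inc.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("counter"), 1L);
        *   Result r = table.increment(inc);
        * </pre>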
5540    * @return new keyvalues after increment
5541    * @throws IOException
5542    */
5543   public Result increment(Increment increment, long nonceGroup, long nonce)
5544   throws IOException {
5545     byte [] row = increment.getRow();
5546     checkRow(row, "increment");
5547     TimeRange tr = increment.getTimeRange();
5548     boolean flush = false;
5549     Durability durability = getEffectiveDurability(increment.getDurability());
5550     boolean writeToWAL = durability != Durability.SKIP_WAL;
5551     WALEdit walEdits = null;
5552     List<Cell> allKVs = new ArrayList<Cell>(increment.size());
5553     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
5554 
5555     long size = 0;
5556     long txid = 0;
5557 
5558     checkReadOnly();
5559     checkResources();
5560     // Lock row
5561     startRegionOperation(Operation.INCREMENT);
5562     this.writeRequestsCount.increment();
5563     RowLock rowLock = null;
5564     WriteEntry w = null;
5565     WALKey walKey = null;
5566     long mvccNum = 0;
5567     List<Cell> memstoreCells = new ArrayList<Cell>();
5568     boolean doRollBackMemstore = false;
5569     try {
5570       rowLock = getRowLock(row);
5571       try {
5572         lock(this.updatesLock.readLock());
5573         try {
5574           // wait for all prior MVCC transactions to finish - while we hold the row lock
5575           // (so that we are guaranteed to see the latest state)
5576           mvcc.waitForPreviousTransactionsComplete();
5577           if (this.coprocessorHost != null) {
5578             Result r = this.coprocessorHost.preIncrementAfterRowLock(increment);
5579             if (r != null) {
5580               return r;
5581             }
5582           }
5583           // now start my own transaction
5584           mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
5585           w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
5586           long now = EnvironmentEdgeManager.currentTime();
5587           // Process each family
5588           for (Map.Entry<byte [], List<Cell>> family:
5589               increment.getFamilyCellMap().entrySet()) {
5590 
5591             Store store = stores.get(family.getKey());
5592             List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
5593 
5594             // Sort the cells so that they match the order that they
5595             // appear in the Get results. Otherwise, we won't be able to
5596             // find the existing values if the cells are not specified
5597             // in order by the client since cells are in an array list.
5598             Collections.sort(family.getValue(), store.getComparator());
5599             // Get previous values for all columns in this family
5600             Get get = new Get(row);
5601             for (Cell cell: family.getValue()) {
5602               get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));
5603             }
5604             get.setTimeRange(tr.getMin(), tr.getMax());
5605             List<Cell> results = get(get, false);
5606 
5607             // Iterate the input columns and update existing values if they were
5608             // found, otherwise add new column initialized to the increment amount
5609             int idx = 0;
5610             for (Cell kv: family.getValue()) {
5611               long amount = Bytes.toLong(CellUtil.cloneValue(kv));
5612               boolean noWriteBack = (amount == 0);
5613 
5614               Cell c = null;
5615               long ts = now;
5616               if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), kv)) {
5617                 c = results.get(idx);
5618                 ts = Math.max(now, c.getTimestamp());
5619                 if (c.getValueLength() == Bytes.SIZEOF_LONG) {
5620                   amount += Bytes.toLong(c.getValueArray(), c.getValueOffset(), Bytes.SIZEOF_LONG);
5621                 } else {
5622                   // throw DoNotRetryIOException instead of IllegalArgumentException
5623                   throw new org.apache.hadoop.hbase.DoNotRetryIOException(
5624                       "Attempted to increment field that isn't 64 bits wide");
5625                 }
5626                 idx++;
5627               }
5628 
5629               // Append new incremented KeyValue to list
5630               byte[] q = CellUtil.cloneQualifier(kv);
5631               byte[] val = Bytes.toBytes(amount);
5632               int oldCellTagsLen = (c == null) ? 0 : c.getTagsLength();
5633               int incCellTagsLen = kv.getTagsLength();
5634               Cell newKV = new KeyValue(row.length, family.getKey().length, q.length, ts,
5635                   KeyValue.Type.Put, val.length, oldCellTagsLen + incCellTagsLen);
5636               System.arraycopy(row, 0, newKV.getRowArray(), newKV.getRowOffset(), row.length);
5637               System.arraycopy(family.getKey(), 0, newKV.getFamilyArray(), newKV.getFamilyOffset(),
5638                   family.getKey().length);
5639               System.arraycopy(q, 0, newKV.getQualifierArray(), newKV.getQualifierOffset(), q.length);
5640               // copy in the value
5641               System.arraycopy(val, 0, newKV.getValueArray(), newKV.getValueOffset(), val.length);
5642               // copy tags
5643               if (oldCellTagsLen > 0) {
5644                 System.arraycopy(c.getTagsArray(), c.getTagsOffset(), newKV.getTagsArray(),
5645                     newKV.getTagsOffset(), oldCellTagsLen);
5646               }
5647               if (incCellTagsLen > 0) {
5648                 System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(),
5649                     newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
5650               }
5651               CellUtil.setSequenceId(newKV, mvccNum);
5652               // Give coprocessors a chance to update the new cell
5653               if (coprocessorHost != null) {
5654                 newKV = coprocessorHost.postMutationBeforeWAL(
5655                     RegionObserver.MutationType.INCREMENT, increment, c, newKV);
5656               }
5657               allKVs.add(newKV);
5658 
5659               if (!noWriteBack) {
5660                 kvs.add(newKV);
5661 
5662                 // Prepare WAL updates
5663                 if (writeToWAL) {
5664                   if (walEdits == null) {
5665                     walEdits = new WALEdit();
5666                   }
5667                   walEdits.add(newKV);
5668                 }
5669               }
5670             }
5671 
5672             //store the kvs to the temporary memstore before writing WAL
5673             if (!kvs.isEmpty()) {
5674               tempMemstore.put(store, kvs);
5675             }
5676           }
5677 
5678           //Actually write to Memstore now
5679           if (!tempMemstore.isEmpty()) {
5680             for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
5681               Store store = entry.getKey();
5682               if (store.getFamily().getMaxVersions() == 1) {
5683                 // upsert if VERSIONS for this CF == 1
5684                 size += store.upsert(entry.getValue(), getSmallestReadPoint());
5685                 memstoreCells.addAll(entry.getValue());
5686               } else {
5687                 // otherwise keep older versions around
5688                 for (Cell cell : entry.getValue()) {
5689                   Pair<Long, Cell> ret = store.add(cell);
5690                   size += ret.getFirst();
5691                   memstoreCells.add(ret.getSecond());
5692                   doRollBackMemstore = true;
5693                 }
5694               }
5695             }
5696             size = this.addAndGetGlobalMemstoreSize(size);
5697             flush = isFlushSize(size);
5698           }
5699 
5700           // Actually write to WAL now
5701           if (walEdits != null && !walEdits.isEmpty()) {
5702             if (writeToWAL) {
5703               // Using default cluster id, as this can only happen in the originating
5704               // cluster. A slave cluster receives the final value (not the delta)
5705               // as a Put.
5706               // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
5707               walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
5708                 this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce);
5709               txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
5710                 walKey, walEdits, getSequenceId(), true, memstoreCells);
5711             } else {
5712               recordMutationWithoutWal(increment.getFamilyCellMap());
5713             }
5714           }
5715           if (walKey == null) {
5716             // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
5717             walKey = this.appendEmptyEdit(this.wal, memstoreCells);
5718           }
5719         } finally {
5720           this.updatesLock.readLock().unlock();
5721         }
5722       } finally {
5723         rowLock.release();
5724         rowLock = null;
5725       }
5726       // sync the transaction log outside the rowlock
5727       if (txid != 0) {
5728         syncOrDefer(txid, durability);
5729       }
5730       doRollBackMemstore = false;
5731     } finally {
5732       if (rowLock != null) {
5733         rowLock.release();
5734       }
5735       // if the wal sync was unsuccessful, remove keys from memstore
5736       if (doRollBackMemstore) {
5737         rollbackMemstore(memstoreCells);
5738       }
5739       if (w != null) {
5740         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
5741       }
5742       closeRegionOperation(Operation.INCREMENT);
5743       if (this.metricsRegion != null) {
5744         this.metricsRegion.updateIncrement();
5745       }
5746     }
5747 
5748     if (flush) {
5749       // Request a cache flush.  Do it outside update lock.
5750       requestFlush();
5751     }
5752 
5753     return Result.create(allKVs);
5754   }
5755 
5756   //
5757   // New HBASE-880 Helpers
5758   //
5759 
5760   private void checkFamily(final byte [] family)
5761   throws NoSuchColumnFamilyException {
5762     if (!this.htableDescriptor.hasFamily(family)) {
5763       throw new NoSuchColumnFamilyException("Column family " +
5764           Bytes.toString(family) + " does not exist in region " + this
5765           + " in table " + this.htableDescriptor);
5766     }
5767   }
5768 
5769   public static final long FIXED_OVERHEAD = ClassSize.align(
5770       ClassSize.OBJECT +
5771       ClassSize.ARRAY +
5772       41 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
5773       (12 * Bytes.SIZEOF_LONG) +
5774       4 * Bytes.SIZEOF_BOOLEAN);
5775 
5776   // woefully out of date - currently missing:
5777   // 1 x HashMap - coprocessorServiceHandlers
5778   // 6 x Counter - numMutationsWithoutWAL, dataInMemoryWithoutWAL,
5779   //   checkAndMutateChecksPassed, checkAndMutateChecksFailed, readRequestsCount,
5780   //   writeRequestsCount
5781   // 1 x HRegion$WriteState - writestate
5782   // 1 x RegionCoprocessorHost - coprocessorHost
5783   // 1 x RegionSplitPolicy - splitPolicy
5784   // 1 x MetricsRegion - metricsRegion
5785   // 1 x MetricsRegionWrapperImpl - metricsRegionWrapper
5786   public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
5787       ClassSize.OBJECT + // closeLock
5788       (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
5789       (3 * ClassSize.ATOMIC_LONG) + // memStoreSize, numPutsWithoutWAL, dataInMemoryWithoutWAL
5790       (2 * ClassSize.CONCURRENT_HASHMAP) +  // lockedRows, scannerReadPoints
5791       WriteState.HEAP_SIZE + // writestate
5792       ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
5793       (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
5794       MultiVersionConsistencyControl.FIXED_SIZE // mvcc
5795       + ClassSize.TREEMAP // maxSeqIdInStores
5796       + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
5797       ;
5798 
5799   @Override
5800   public long heapSize() {
5801     long heapSize = DEEP_OVERHEAD;
5802     for (Store store : this.stores.values()) {
5803       heapSize += store.heapSize();
5804     }
5805     // this does not take into account row locks, recent flushes, mvcc entries, and more
5806     return heapSize;
5807   }
5808 
5809   /*
5810    * This method calls System.exit.
5811    * @param message Message to print out.  May be null.
5812    */
5813   private static void printUsageAndExit(final String message) {
5814     if (message != null && message.length() > 0) System.out.println(message);
5815     System.out.println("Usage: HRegion CATALOG_TABLE_DIR [major_compact]");
5816     System.out.println("Options:");
5817     System.out.println(" major_compact  Pass this option to major compact " +
5818       "passed region.");
5819     System.out.println("Default outputs scan of passed region.");
5820     System.exit(1);
5821   }
5822 
5823   /**
5824    * Registers a new protocol buffer {@link Service} subclass as a coprocessor endpoint to
5825    * be available for handling
5826    * {@link HRegion#execService(com.google.protobuf.RpcController,
5827    *    org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall)}} calls.
5828    *
5829    * <p>
5830    * Only a single instance may be registered per region for a given {@link Service} subclass (the
5831    * instances are keyed on {@link com.google.protobuf.Descriptors.ServiceDescriptor#getFullName()}).
5832    * After the first registration, subsequent calls with the same service name will fail with
5833    * a return value of {@code false}.
5834    * </p>
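        * <p>A minimal registration sketch ({@code MyEndpointImpl} is a hypothetical subclass
        * of a protobuf-generated {@code Service}, named here only for illustration):</p>
        * <pre>
        *   Service impl = new MyEndpointImpl();
        *   boolean registered = region.registerService(impl);
        *   // a second registration under the same service name would return false
        * </pre>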
5835    * @param instance the {@code Service} subclass instance to expose as a coprocessor endpoint
5836    * @return {@code true} if the registration was successful, {@code false}
5837    * otherwise
5838    */
5839   public boolean registerService(Service instance) {
5840     /*
5841      * No stacking of instances is allowed for a single service name
5842      */
5843     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
5844     if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
5845       LOG.error("Coprocessor service "+serviceDesc.getFullName()+
5846           " already registered, rejecting request from "+instance
5847       );
5848       return false;
5849     }
5850 
5851     coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
5852     if (LOG.isDebugEnabled()) {
5853       LOG.debug("Registered coprocessor service: region="+
5854           Bytes.toStringBinary(getRegionName())+" service="+serviceDesc.getFullName());
5855     }
5856     return true;
5857   }
5858 
5859   /**
5860    * Executes a single protocol buffer coprocessor endpoint {@link Service} method using
5861    * the registered protocol handlers.  {@link Service} implementations must be registered via the
5862    * {@link HRegion#registerService(com.google.protobuf.Service)}
5863    * method before they are available.
5864    *
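        * <p>A hedged request-construction sketch (values are illustrative; the builder fields
        * follow the {@code CoprocessorServiceCall} protobuf consumed by this method):</p>
        * <pre>
        *   CoprocessorServiceCall call = CoprocessorServiceCall.newBuilder()
        *       .setRow(ByteString.copyFrom(row))
        *       .setServiceName("MyService")   // must match a registered service
        *       .setMethodName("myMethod")
        *       .setRequest(request.toByteString())
        *       .build();
        *   Message response = region.execService(controller, call);
        * </pre>
        *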
5865    * @param controller an {@code RpcController} implementation to pass to the invoked service
5866    * @param call a {@code CoprocessorServiceCall} instance identifying the service, method,
5867    *     and parameters for the method invocation
5868    * @return a protocol buffer {@code Message} instance containing the method's result
5869    * @throws IOException if no registered service handler is found or an error
5870    *     occurs during the invocation
5871    * @see org.apache.hadoop.hbase.regionserver.HRegion#registerService(com.google.protobuf.Service)
5872    */
5873   public Message execService(RpcController controller, CoprocessorServiceCall call)
5874       throws IOException {
5875     String serviceName = call.getServiceName();
5876     String methodName = call.getMethodName();
5877     if (!coprocessorServiceHandlers.containsKey(serviceName)) {
5878       throw new UnknownProtocolException(null,
5879           "No registered coprocessor service found for name "+serviceName+
5880           " in region "+Bytes.toStringBinary(getRegionName()));
5881     }
5882 
5883     Service service = coprocessorServiceHandlers.get(serviceName);
5884     Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
5885     Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
5886     if (methodDesc == null) {
5887       throw new UnknownProtocolException(service.getClass(),
5888           "Unknown method "+methodName+" called on service "+serviceName+
5889               " in region "+Bytes.toStringBinary(getRegionName()));
5890     }
5891 
5892     Message request = service.getRequestPrototype(methodDesc).newBuilderForType()
5893         .mergeFrom(call.getRequest()).build();
5894 
5895     if (coprocessorHost != null) {
5896       request = coprocessorHost.preEndpointInvocation(service, methodName, request);
5897     }
5898 
5899     final Message.Builder responseBuilder =
5900         service.getResponsePrototype(methodDesc).newBuilderForType();
5901     service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
5902       @Override
5903       public void run(Message message) {
5904         if (message != null) {
5905           responseBuilder.mergeFrom(message);
5906         }
5907       }
5908     });
5909 
5910     if (coprocessorHost != null) {
5911       coprocessorHost.postEndpointInvocation(service, methodName, request, responseBuilder);
5912     }
5913 
5914     return responseBuilder.build();
5915   }
5916 
5917   /*
5918    * Process table.
5919    * Do major compaction or list content.
5920    * @throws IOException
5921    */
5922   private static void processTable(final FileSystem fs, final Path p,
5923       final WALFactory walFactory, final Configuration c,
5924       final boolean majorCompact)
5925   throws IOException {
5926     HRegion region;
5927     FSTableDescriptors fst = new FSTableDescriptors(c);
5928     // Currently expects tables to have one region only.
5929     if (FSUtils.getTableName(p).equals(TableName.META_TABLE_NAME)) {
5930       final WAL wal = walFactory.getMetaWAL(
5931           HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
5932       region = HRegion.newHRegion(p, wal, fs, c,
5933         HRegionInfo.FIRST_META_REGIONINFO,
5934           fst.get(TableName.META_TABLE_NAME), null);
5935     } else {
5936       throw new IOException("Not a known catalog table: " + p.toString());
5937     }
5938     try {
5939       region.initialize(null);
5940       if (majorCompact) {
5941         region.compactStores(true);
5942       } else {
5943         // Default behavior
5944         Scan scan = new Scan();
5945         // scan.addFamily(HConstants.CATALOG_FAMILY);
5946         RegionScanner scanner = region.getScanner(scan);
5947         try {
5948           List<Cell> kvs = new ArrayList<Cell>();
5949           boolean done;
5950           do {
5951             kvs.clear();
5952             done = scanner.next(kvs);
5953             if (kvs.size() > 0) LOG.info(kvs);
5954           } while (done);
5955         } finally {
5956           scanner.close();
5957         }
5958       }
5959     } finally {
5960       region.close();
5961     }
5962   }
5963 
5964   boolean shouldForceSplit() {
5965     return this.splitRequest;
5966   }
5967 
5968   byte[] getExplicitSplitPoint() {
5969     return this.explicitSplitPoint;
5970   }
5971 
5972   void forceSplit(byte[] sp) {
5973     // This HRegion will go away after the forced split is successful
5974     // But if a forced split fails, we need to clear forced split.
5975     this.splitRequest = true;
5976     if (sp != null) {
5977       this.explicitSplitPoint = sp;
5978     }
5979   }
5980 
5981   void clearSplit() {
5982     this.splitRequest = false;
5983     this.explicitSplitPoint = null;
5984   }
5985 
5986   /**
5987    * Give the region a chance to prepare before it is split.
5988    */
5989   protected void prepareToSplit() {
5990     // nothing
5991   }
5992 
5993   /**
5994    * Return the split point. A null return indicates the region isn't splittable.
5995    * If the split point isn't explicitly specified, this method goes over the stores
5996    * to find the best split point. Currently the criterion for the best split point
5997    * is based on the size of the store.
5998    */
5999   public byte[] checkSplit() {
6000     // Can't split META
6001     if (this.getRegionInfo().isMetaTable() ||
6002         TableName.NAMESPACE_TABLE_NAME.equals(this.getRegionInfo().getTable())) {
6003       if (shouldForceSplit()) {
6004         LOG.warn("Cannot split meta region in HBase 0.20 and above");
6005       }
6006       return null;
6007     }
6008 
6009     // Can't split region which is in recovering state
6010     if (this.isRecovering()) {
6011       LOG.info("Cannot split region " + this.getRegionInfo().getEncodedName() + " in recovery.");
6012       return null;
6013     }
6014 
6015     if (!splitPolicy.shouldSplit()) {
6016       return null;
6017     }
6018 
6019     byte[] ret = splitPolicy.getSplitPoint();
6020 
6021     if (ret != null) {
6022       try {
6023         checkRow(ret, "calculated split");
6024       } catch (IOException e) {
6025         LOG.error("Ignoring invalid split", e);
6026         return null;
6027       }
6028     }
6029     return ret;
6030   }
6031 
6032   /**
6033    * @return The priority that this region should have in the compaction queue
6034    */
6035   public int getCompactPriority() {
6036     int count = Integer.MAX_VALUE;
6037     for (Store store : stores.values()) {
6038       count = Math.min(count, store.getCompactPriority());
6039     }
6040     return count;
6041   }
6042 
6043 
6044   /** @return the coprocessor host */
6045   public RegionCoprocessorHost getCoprocessorHost() {
6046     return coprocessorHost;
6047   }
6048 
6049   /** @param coprocessorHost the new coprocessor host */
6050   public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {
6051     this.coprocessorHost = coprocessorHost;
6052   }
6053 
6054   /**
6055    * This method needs to be called before any public call that reads or
6056    * modifies data. It has to be called just before a try block;
6057    * #closeRegionOperation needs to be called in the try's finally block.
6058    * Acquires a read lock and checks if the region is closing or closed.
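        * <p>The expected call pattern, per the contract above:</p>
        * <pre>
        *   region.startRegionOperation();
        *   try {
        *     // read or mutate region data
        *   } finally {
        *     region.closeRegionOperation();
        *   }
        * </pre>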
6059    * @throws IOException
6060    */
6061   public void startRegionOperation() throws IOException {
6062     startRegionOperation(Operation.ANY);
6063   }
6064 
6065   /**
6066    * @param op The operation is about to be taken on the region
6067    * @throws IOException
6068    */
6069   protected void startRegionOperation(Operation op) throws IOException {
6070     switch (op) {
6071     case GET:  // read operations
6072     case SCAN:
6073       checkReadsEnabled();
6074     case INCREMENT: // write operations
6075     case APPEND:
6076     case SPLIT_REGION:
6077     case MERGE_REGION:
6078     case PUT:
6079     case DELETE:
6080     case BATCH_MUTATE:
6081     case COMPACT_REGION:
6082       // when a region is in recovering state, no read, split or merge is allowed
6083       if (isRecovering() && (this.disallowWritesInRecovering ||
6084               (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) {
6085         throw new RegionInRecoveryException(this.getRegionNameAsString() +
6086           " is recovering; cannot take the " + op + " operation");
6087       }
6088       break;
6089     default:
6090       break;
6091     }
6092     if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION
6093         || op == Operation.COMPACT_REGION) {
6094       // split, merge or compact region doesn't need to check the closing/closed state or lock the
6095       // region
6096       return;
6097     }
6098     if (this.closing.get()) {
6099       throw new NotServingRegionException(getRegionNameAsString() + " is closing");
6100     }
6101     lock(lock.readLock());
6102     if (this.closed.get()) {
6103       lock.readLock().unlock();
6104       throw new NotServingRegionException(getRegionNameAsString() + " is closed");
6105     }
6106     try {
6107       if (coprocessorHost != null) {
6108         coprocessorHost.postStartRegionOperation(op);
6109       }
6110     } catch (Exception e) {
6111       lock.readLock().unlock();
6112       throw new IOException(e);
6113     }
6114   }
6115 
6116   /**
6117    * Closes the lock. This needs to be called in the finally block corresponding
6118    * to the try block of {@link #startRegionOperation()}.
6119    * @throws IOException
6120    */
6121   public void closeRegionOperation() throws IOException {
6122     closeRegionOperation(Operation.ANY);
6123   }
6124 
6125   /**
6126    * Closes the lock. This needs to be called in the finally block corresponding
6127    * to the try block of {@link #startRegionOperation(Operation)}
6128    * @throws IOException
6129    */
6130   public void closeRegionOperation(Operation operation) throws IOException {
6131     lock.readLock().unlock();
6132     if (coprocessorHost != null) {
6133       coprocessorHost.postCloseRegionOperation(operation);
6134     }
6135   }
6136 
6137   /**
6138    * This method needs to be called before any public call that reads or
6139    * modifies stores in bulk. It has to be called just before a try block;
6140    * #closeBulkRegionOperation needs to be called in the try's finally block.
6141    * Acquires a write lock and checks if the region is closing or closed.
6142    * @throws NotServingRegionException when the region is closing or closed
6143    * @throws RegionTooBusyException if failed to get the lock in time
6144    * @throws InterruptedIOException if interrupted while waiting for a lock
6145    */
6146   private void startBulkRegionOperation(boolean writeLockNeeded)
6147       throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
6148     if (this.closing.get()) {
6149       throw new NotServingRegionException(getRegionNameAsString() + " is closing");
6150     }
6151     if (writeLockNeeded) lock(lock.writeLock());
6152     else lock(lock.readLock());
6153     if (this.closed.get()) {
6154       if (writeLockNeeded) lock.writeLock().unlock();
6155       else lock.readLock().unlock();
6156       throw new NotServingRegionException(getRegionNameAsString() + " is closed");
6157     }
6158   }
6159 
6160   /**
6161    * Closes the lock. This needs to be called in the finally block corresponding
6162    * to the try block of #startBulkRegionOperation
6163    */
6164   private void closeBulkRegionOperation(){
6165     if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
6166     else lock.readLock().unlock();
6167   }
6168 
6169   /**
6170    * Update counters for the number of mutations without WAL and the size of possible data loss.
6171    * This information is exposed by the region server metrics.
6172    */
6173   private void recordMutationWithoutWal(final Map<byte [], List<Cell>> familyMap) {
6174     numMutationsWithoutWAL.increment();
6175     if (numMutationsWithoutWAL.get() <= 1) {
6176       LOG.info("writing data to region " + this +
6177                " with WAL disabled. Data may be lost in the event of a crash.");
6178     }
6179 
6180     long mutationSize = 0;
6181     for (List<Cell> cells: familyMap.values()) {
6182       assert cells instanceof RandomAccess;
6183       int listSize = cells.size();
6184       for (int i=0; i < listSize; i++) {
6185         Cell cell = cells.get(i);
6186         // TODO we need to include the tags length here too.
6187         mutationSize += KeyValueUtil.keyLength(cell) + cell.getValueLength();
6188       }
6189     }
6190 
6191     dataInMemoryWithoutWAL.add(mutationSize);
6192   }
6193 
6194   private void lock(final Lock lock)
6195       throws RegionTooBusyException, InterruptedIOException {
6196     lock(lock, 1);
6197   }
6198 
6199   /**
6200    * Try to acquire a lock.  Throw RegionTooBusyException
6201    * if failed to get the lock in time. Throw InterruptedIOException
6202    * if interrupted while waiting for the lock.
6203    */
6204   private void lock(final Lock lock, final int multiplier)
6205       throws RegionTooBusyException, InterruptedIOException {
6206     try {
6207       final long waitTime = Math.min(maxBusyWaitDuration,
6208           busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
6209       if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
6210         throw new RegionTooBusyException(
6211             "failed to get a lock in " + waitTime + " ms. " +
6212                 "regionName=" + (this.getRegionInfo() == null ? "unknown" :
6213                 this.getRegionInfo().getRegionNameAsString()) +
6214                 ", server=" + (this.getRegionServerServices() == null ? "unknown" :
6215                 this.getRegionServerServices().getServerName()));
6216       }
6217     } catch (InterruptedException ie) {
6218       LOG.info("Interrupted while waiting for a lock");
6219       InterruptedIOException iie = new InterruptedIOException();
6220       iie.initCause(ie);
6221       throw iie;
6222     }
6223   }
6224 
6225   /**
6226    * Calls sync with the given transaction ID if the region's table is not
6227    * deferring it.
6228    * @param txid should sync up to which transaction
6229    * @throws IOException If anything goes wrong with DFS
6230    */
6231   private void syncOrDefer(long txid, Durability durability) throws IOException {
6232     if (this.getRegionInfo().isMetaRegion()) {
6233       this.wal.sync(txid);
6234     } else {
6235       switch(durability) {
6236       case USE_DEFAULT:
6237         // do what table defaults to
6238         if (shouldSyncWAL()) {
6239           this.wal.sync(txid);
6240         }
6241         break;
6242       case SKIP_WAL:
6243         // nothing to do
6244         break;
6245       case ASYNC_WAL:
6246         // nothing to do
6247         break;
6248       case SYNC_WAL:
6249       case FSYNC_WAL:
6250         // sync the WAL edit (SYNC and FSYNC treated the same for now)
6251         this.wal.sync(txid);
6252         break;
6253       }
6254     }
6255   }
6256 
6257   /**
6258    * Check whether we should sync the wal from the table's durability settings
6259    */
6260   private boolean shouldSyncWAL() {
6261     return durability.ordinal() >  Durability.ASYNC_WAL.ordinal();
6262   }
6263 
6264   /**
6265    * A mocked list implementation - discards all updates.
6266    */
6267   private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {
6268 
6269     @Override
6270     public void add(int index, Cell element) {
6271       // do nothing
6272     }
6273 
6274     @Override
6275     public boolean addAll(int index, Collection<? extends Cell> c) {
6276       return false; // this list is never changed as a result of an update
6277     }
6278 
6279     @Override
6280     public KeyValue get(int index) {
6281       throw new UnsupportedOperationException();
6282     }
6283 
6284     @Override
6285     public int size() {
6286       return 0;
6287     }
6288   };
6289 
6290   /**
6291    * Facility for dumping and compacting catalog tables.
6292    * Only does catalog tables since these are the only tables whose schema
6293    * we know for sure.  For usage run:
6294    * <pre>
6295    *   ./bin/hbase org.apache.hadoop.hbase.regionserver.HRegion
6296    * </pre>
6297    * @throws IOException
6298    */
6299   public static void main(String[] args) throws IOException {
6300     if (args.length < 1) {
6301       printUsageAndExit(null);
6302     }
6303     boolean majorCompact = false;
6304     if (args.length > 1) {
6305       if (!args[1].toLowerCase().startsWith("major")) {
6306         printUsageAndExit("ERROR: Unrecognized option <" + args[1] + ">");
6307       }
6308       majorCompact = true;
6309     }
6310     final Path tableDir = new Path(args[0]);
6311     final Configuration c = HBaseConfiguration.create();
6312     final FileSystem fs = FileSystem.get(c);
6313     final Path logdir = new Path(c.get("hbase.tmp.dir"));
6314     final String logname = "wal" + FSUtils.getTableName(tableDir) + System.currentTimeMillis();
6315 
6316     final Configuration walConf = new Configuration(c);
6317     FSUtils.setRootDir(walConf, logdir);
6318     final WALFactory wals = new WALFactory(walConf, null, logname);
6319     try {
6320       processTable(fs, tableDir, wals, c, majorCompact);
6321     } finally {
6322        wals.close();
6323        // TODO: is this still right?
6324        BlockCache bc = new CacheConfig(c).getBlockCache();
6325        if (bc != null) bc.shutdown();
6326     }
6327   }
6328 
6329   /**
6330    * Gets the latest sequence number that was read from storage when this region was opened.
6331    */
6332   public long getOpenSeqNum() {
6333     return this.openSeqNum;
6334   }
6335 
6336   /**
6337    * Gets the max sequence ids of stores that were read from storage when this region was opened.
6338    * WAL edits with a smaller or equal sequence number will be skipped during replay.
6339    */
6340   public Map<byte[], Long> getMaxStoreSeqIdForLogReplay() {
6341     return this.maxSeqIdInStores;
6342   }
6343 
6344   /**
6345    * @return the current compaction state of this region.
6346    */
6347   public CompactionState getCompactionState() {
6348     boolean hasMajor = majorInProgress.get() > 0, hasMinor = minorInProgress.get() > 0;
6349     return (hasMajor ? (hasMinor ? CompactionState.MAJOR_AND_MINOR : CompactionState.MAJOR)
6350         : (hasMinor ? CompactionState.MINOR : CompactionState.NONE));
6351   }
6352 
6353   public void reportCompactionRequestStart(boolean isMajor){
6354     (isMajor ? majorInProgress : minorInProgress).incrementAndGet();
6355   }
6356 
6357   public void reportCompactionRequestEnd(boolean isMajor, int numFiles, long filesSizeCompacted) {
6358     int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();
6359 
6360     // metrics
6361     compactionsFinished.incrementAndGet();
6362     compactionNumFilesCompacted.addAndGet(numFiles);
6363     compactionNumBytesCompacted.addAndGet(filesSizeCompacted);
6364 
6365     assert newValue >= 0;
6366   }
6367 
6368   /**
6369    * Do not change this sequence id. See {@link #sequenceId} comment.
6370    * @return sequenceId
6371    */
6372   @VisibleForTesting
6373   public AtomicLong getSequenceId() {
6374     return this.sequenceId;
6375   }
6376 
6377   /**
6378    * Sets this region's sequenceId.
6379    * @param value new value
6380    */
6381   private void setSequenceId(long value) {
6382     this.sequenceId.set(value);
6383   }
6384 
6385   /**
6386    * Listener class to enable callers of
6387    * bulkLoadHFile() to perform any necessary
6388    * pre/post processing of a given bulkload call
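        * <p>A minimal implementation sketch (method bodies are illustrative only):</p>
        * <pre>
        *   BulkLoadListener listener = new BulkLoadListener() {
        *     public String prepareBulkLoad(byte[] family, String srcPath) {
        *       return srcPath; // load from the original location
        *     }
        *     public void doneBulkLoad(byte[] family, String srcPath) { }
        *     public void failedBulkLoad(byte[] family, String srcPath) { }
        *   };
        * </pre>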
6389    */
6390   public interface BulkLoadListener {
6391 
6392     /**
6393      * Called before an HFile is actually loaded
6394      * @param family family being loaded to
6395      * @param srcPath path of HFile
6396      * @return final path to be used for actual loading
6397      * @throws IOException
6398      */
6399     String prepareBulkLoad(byte[] family, String srcPath) throws IOException;
6400 
6401     /**
6402      * Called after a successful HFile load
6403      * @param family family being loaded to
6404      * @param srcPath path of HFile
6405      * @throws IOException
6406      */
6407     void doneBulkLoad(byte[] family, String srcPath) throws IOException;
6408 
6409     /**
6410      * Called after a failed HFile load
6411      * @param family family being loaded to
6412      * @param srcPath path of HFile
6413      * @throws IOException
6414      */
6415     void failedBulkLoad(byte[] family, String srcPath) throws IOException;
6416   }
6417 
6418   @VisibleForTesting class RowLockContext {
6419     private final HashedBytes row;
6420     private final CountDownLatch latch = new CountDownLatch(1);
6421     private final Thread thread;
6422     private int lockCount = 0;
6423 
6424     RowLockContext(HashedBytes row) {
6425       this.row = row;
6426       this.thread = Thread.currentThread();
6427     }
6428 
6429     boolean ownedByCurrentThread() {
6430       return thread == Thread.currentThread();
6431     }
6432 
6433     RowLock newLock() {
6434       lockCount++;
6435       return new RowLock(this);
6436     }
6437 
6438     @Override
6439     public String toString() {
6440       Thread t = this.thread;
6441       return "Thread=" + (t == null? "null": t.getName()) + ", row=" + this.row +
6442         ", lockCount=" + this.lockCount;
6443     }
6444 
6445     void releaseLock() {
6446       if (!ownedByCurrentThread()) {
6447         throw new IllegalArgumentException("Lock held by thread: " + thread
6448           + " cannot be released by different thread: " + Thread.currentThread());
6449       }
6450       lockCount--;
6451       if (lockCount == 0) {
6452         // no remaining locks by the thread, unlock and allow other threads to access
6453         RowLockContext existingContext = lockedRows.remove(row);
6454         if (existingContext != this) {
6455           throw new RuntimeException(
6456               "Internal row lock state inconsistent, should not happen, row: " + row);
6457         }
6458         latch.countDown();
6459       }
6460     }
6461   }
6462 
6463   /**
6464    * Row lock held by a given thread.
6465    * One thread may acquire multiple locks on the same row simultaneously.
6466    * The locks must be released by calling release() from the same thread.
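        * <p>Typical usage, mirroring how this class uses row locks internally:</p>
        * <pre>
        *   RowLock rowLock = region.getRowLock(row);
        *   try {
        *     // mutate the row while holding the lock
        *   } finally {
        *     rowLock.release();
        *   }
        * </pre>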
6467    */
6468   public static class RowLock {
6469     @VisibleForTesting final RowLockContext context;
6470     private boolean released = false;
6471 
6472     @VisibleForTesting RowLock(RowLockContext context) {
6473       this.context = context;
6474     }
6475 
6476     /**
6477      * Release the given lock.  If there are no remaining locks held by the current thread
6478      * then unlock the row and allow other threads to acquire the lock.
6479      * @throws IllegalArgumentException if called by a different thread than the lock owning thread
6480      */
6481     public void release() {
6482       if (!released) {
6483         context.releaseLock();
6484         released = true;
6485       }
6486     }
6487   }
6488 
6489   /**
6490    * Append a faked WALEdit in order to get a long sequence number; the WAL syncer will just
6491    * ignore the WALEdit append later.
6492    * @param wal the WAL to append the empty edit to
6493    * @param cells list of Cells inserted into memstore. Those Cells are passed in order to
6494    *        be updated with the right mvcc values (their wal sequence number)
6495    * @return the key used for the no-sync, empty-edit append.
6496    * @throws IOException
6497    */
6498   private WALKey appendEmptyEdit(final WAL wal, List<Cell> cells) throws IOException {
6499     // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
6500     WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(),
6501       WALKey.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
6502     // Call append but with an empty WALEdit.  The returned sequence id will not be associated
6503     // with any edit and we can be sure it went in after all outstanding appends.
6504     wal.append(getTableDesc(), getRegionInfo(), key,
6505       WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells);
6506     return key;
6507   }
6508 
6509   /**
6510    * Explicitly sync the WAL
6511    * @throws IOException
6512    */
6513   public void syncWal() throws IOException {
6514     if (this.wal != null) {
6515       this.wal.sync();
6516     }
6517   }
6518 
6519   /**
6520    * {@inheritDoc}
6521    */
6522   @Override
6523   public void onConfigurationChange(Configuration conf) {
6524     // Do nothing for now.
6525   }
6526 
6527   /**
6528    * {@inheritDoc}
6529    */
6530   @Override
6531   public void registerChildren(ConfigurationManager manager) {
6532     configurationManager = Optional.of(manager);
6533     for (Store s : this.stores.values()) {
6534       configurationManager.get().registerObserver(s);
6535     }
6536   }
6537 
6538   /**
6539    * {@inheritDoc}
6540    */
6541   @Override
6542   public void deregisterChildren(ConfigurationManager manager) {
6543     for (Store s : this.stores.values()) {
6544       configurationManager.get().deregisterObserver(s);
6545     }
6546   }
6547 }