View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.io.UnsupportedEncodingException;
25  import java.lang.reflect.Constructor;
26  import java.text.ParseException;
27  import java.util.AbstractList;
28  import java.util.ArrayList;
29  import java.util.Arrays;
30  import java.util.Collection;
31  import java.util.Collections;
32  import java.util.HashMap;
33  import java.util.Iterator;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.NavigableMap;
37  import java.util.NavigableSet;
38  import java.util.Set;
39  import java.util.TreeMap;
40  import java.util.concurrent.Callable;
41  import java.util.concurrent.CompletionService;
42  import java.util.concurrent.ConcurrentHashMap;
43  import java.util.concurrent.ConcurrentSkipListMap;
44  import java.util.concurrent.CountDownLatch;
45  import java.util.concurrent.ExecutionException;
46  import java.util.concurrent.ExecutorCompletionService;
47  import java.util.concurrent.ExecutorService;
48  import java.util.concurrent.Executors;
49  import java.util.concurrent.Future;
50  import java.util.concurrent.FutureTask;
51  import java.util.concurrent.ThreadFactory;
52  import java.util.concurrent.ThreadPoolExecutor;
53  import java.util.concurrent.TimeUnit;
54  import java.util.concurrent.TimeoutException;
55  import java.util.concurrent.atomic.AtomicBoolean;
56  import java.util.concurrent.atomic.AtomicInteger;
57  import java.util.concurrent.atomic.AtomicLong;
58  import java.util.concurrent.locks.Lock;
59  import java.util.concurrent.locks.ReentrantReadWriteLock;
60  
61  import org.apache.commons.logging.Log;
62  import org.apache.commons.logging.LogFactory;
63  import org.apache.hadoop.classification.InterfaceAudience;
64  import org.apache.hadoop.conf.Configuration;
65  import org.apache.hadoop.fs.FileStatus;
66  import org.apache.hadoop.fs.FileSystem;
67  import org.apache.hadoop.fs.Path;
68  import org.apache.hadoop.hbase.Cell;
69  import org.apache.hadoop.hbase.CellScanner;
70  import org.apache.hadoop.hbase.CellUtil;
71  import org.apache.hadoop.hbase.CompoundConfiguration;
72  import org.apache.hadoop.hbase.DoNotRetryIOException;
73  import org.apache.hadoop.hbase.DroppedSnapshotException;
74  import org.apache.hadoop.hbase.HBaseConfiguration;
75  import org.apache.hadoop.hbase.HColumnDescriptor;
76  import org.apache.hadoop.hbase.HConstants;
77  import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
78  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
79  import org.apache.hadoop.hbase.HRegionInfo;
80  import org.apache.hadoop.hbase.HTableDescriptor;
81  import org.apache.hadoop.hbase.KeyValue;
82  import org.apache.hadoop.hbase.KeyValueUtil;
83  import org.apache.hadoop.hbase.NotServingRegionException;
84  import org.apache.hadoop.hbase.RegionTooBusyException;
85  import org.apache.hadoop.hbase.TableName;
86  import org.apache.hadoop.hbase.UnknownScannerException;
87  import org.apache.hadoop.hbase.backup.HFileArchiver;
88  import org.apache.hadoop.hbase.client.Append;
89  import org.apache.hadoop.hbase.client.Delete;
90  import org.apache.hadoop.hbase.client.Durability;
91  import org.apache.hadoop.hbase.client.Get;
92  import org.apache.hadoop.hbase.client.Increment;
93  import org.apache.hadoop.hbase.client.IsolationLevel;
94  import org.apache.hadoop.hbase.client.Mutation;
95  import org.apache.hadoop.hbase.client.Put;
96  import org.apache.hadoop.hbase.client.Result;
97  import org.apache.hadoop.hbase.client.RowMutations;
98  import org.apache.hadoop.hbase.client.Scan;
99  import org.apache.hadoop.hbase.coprocessor.RegionObserver;
100 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
101 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
102 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
103 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
104 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
105 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
106 import org.apache.hadoop.hbase.filter.FilterWrapper;
107 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
108 import org.apache.hadoop.hbase.io.HeapSize;
109 import org.apache.hadoop.hbase.io.TimeRange;
110 import org.apache.hadoop.hbase.io.hfile.BlockCache;
111 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
112 import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
113 import org.apache.hadoop.hbase.ipc.RpcCallContext;
114 import org.apache.hadoop.hbase.ipc.RpcServer;
115 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
116 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
117 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
118 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
119 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
120 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
121 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
122 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
123 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
124 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
125 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
126 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
127 import org.apache.hadoop.hbase.regionserver.wal.HLog;
128 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
129 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
130 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
131 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.MutationReplay;
132 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
133 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
134 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
135 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
136 import org.apache.hadoop.hbase.util.Bytes;
137 import org.apache.hadoop.hbase.util.CancelableProgressable;
138 import org.apache.hadoop.hbase.util.ClassSize;
139 import org.apache.hadoop.hbase.util.CompressionTest;
140 import org.apache.hadoop.hbase.util.Counter;
141 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
142 import org.apache.hadoop.hbase.util.FSUtils;
143 import org.apache.hadoop.hbase.util.HashedBytes;
144 import org.apache.hadoop.hbase.util.Pair;
145 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
146 import org.apache.hadoop.hbase.util.Threads;
147 import org.apache.hadoop.io.MultipleIOException;
148 import org.apache.hadoop.util.StringUtils;
149 
150 import com.google.common.annotations.VisibleForTesting;
151 import com.google.common.base.Preconditions;
152 import com.google.common.collect.Lists;
153 import com.google.common.collect.Maps;
154 import com.google.common.io.Closeables;
155 import com.google.protobuf.Descriptors;
156 import com.google.protobuf.Message;
157 import com.google.protobuf.RpcCallback;
158 import com.google.protobuf.RpcController;
159 import com.google.protobuf.Service;
160 
161 /**
162  * HRegion stores data for a certain region of a table.  It stores all columns
163  * for each row. A given table consists of one or more HRegions.
164  *
165  * <p>We maintain multiple HStores for a single HRegion.
166  *
167  * <p>An Store is a set of rows with some column data; together,
168  * they make up all the data for the rows.
169  *
170  * <p>Each HRegion has a 'startKey' and 'endKey'.
171  * <p>The first is inclusive, the second is exclusive (except for
172  * the final region)  The endKey of region 0 is the same as
173  * startKey for region 1 (if it exists).  The startKey for the
174  * first region is null. The endKey for the final region is null.
175  *
176  * <p>Locking at the HRegion level serves only one purpose: preventing the
177  * region from being closed (and consequently split) while other operations
178  * are ongoing. Each row level operation obtains both a row lock and a region
179  * read lock for the duration of the operation. While a scanner is being
180  * constructed, getScanner holds a read lock. If the scanner is successfully
181  * constructed, it holds a read lock until it is closed. A close takes out a
182  * write lock and consequently will block for ongoing operations and will block
183  * new operations from starting while the close is in progress.
184  *
185  * <p>An HRegion is defined by its table and its key extent.
186  *
187  * <p>It consists of at least one Store.  The number of Stores should be
188  * configurable, so that data which is accessed together is stored in the same
189  * Store.  Right now, we approximate that by building a single Store for
190  * each column family.  (This config info will be communicated via the
191  * tabledesc.)
192  *
193  * <p>The HTableDescriptor contains metainfo about the HRegion's table.
194  * regionName is a unique identifier for this HRegion. (startKey, endKey]
195  * defines the keyspace for this HRegion.
196  */
197 @InterfaceAudience.Private
198 public class HRegion implements HeapSize { // , Writable{
199   public static final Log LOG = LogFactory.getLog(HRegion.class);
200 
201   public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
202       "hbase.hregion.scan.loadColumnFamiliesOnDemand";
203 
204   /**
205    * This is the global default value for durability. All tables/mutations not
206    * defining a durability or using USE_DEFAULT will default to this value.
207    */
208   private static final Durability DEFAULT_DURABLITY = Durability.SYNC_WAL;
209 
210   final AtomicBoolean closed = new AtomicBoolean(false);
211   /* Closing can take some time; use the closing flag if there is stuff we don't
212    * want to do while in closing state; e.g. like offer this region up to the
213    * master as a region to close if the carrying regionserver is overloaded.
214    * Once set, it is never cleared.
215    */
216   final AtomicBoolean closing = new AtomicBoolean(false);
217 
218   /**
219    * The sequence id of the last flush on this region.  Used doing some rough calculations on
220    * whether time to flush or not.
221    */
222   protected volatile long lastFlushSeqId = -1L;
223 
224   /**
225    * Region scoped edit sequence Id. Edits to this region are GUARANTEED to appear in the WAL/HLog
226    * file in this sequence id's order; i.e. edit #2 will be in the WAL after edit #1.
227    * Its default value is -1L. This default is used as a marker to indicate
228    * that the region hasn't opened yet. Once it is opened, it is set to the derived
229    * {@link #openSeqNum}, the largest sequence id of all hfiles opened under this Region.
230    *
231    * <p>Control of this sequence is handed off to the WAL/HLog implementation.  It is responsible
232    * for tagging edits with the correct sequence id since it is responsible for getting the
233    * edits into the WAL files. It controls updating the sequence id value.  DO NOT UPDATE IT
234    * OUTSIDE OF THE WAL.  The value you get will not be what you think it is.
235    */
236   private final AtomicLong sequenceId = new AtomicLong(-1L);
237 
238   /**
239    * Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context for
240    * startRegionOperation to possibly invoke different checks before any region operations. Not all
241    * operations have to be defined here. It's only needed when a special check is need in
242    * startRegionOperation
243    */
244   public enum Operation {
245     ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE,
246     REPLAY_BATCH_MUTATE, COMPACT_REGION
247   }
248 
249   //////////////////////////////////////////////////////////////////////////////
250   // Members
251   //////////////////////////////////////////////////////////////////////////////
252 
253   // map from a locked row to the context for that lock including:
254   // - CountDownLatch for threads waiting on that row
255   // - the thread that owns the lock (allow reentrancy)
256   // - reference count of (reentrant) locks held by the thread
257   // - the row itself
258   private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
259       new ConcurrentHashMap<HashedBytes, RowLockContext>();
260 
261   protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(
262       Bytes.BYTES_RAWCOMPARATOR);
263 
264   // TODO: account for each registered handler in HeapSize computation
265   private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
266 
267   public final AtomicLong memstoreSize = new AtomicLong(0);
268 
269   // Debug possible data loss due to WAL off
270   final Counter numMutationsWithoutWAL = new Counter();
271   final Counter dataInMemoryWithoutWAL = new Counter();
272 
273   // Debug why CAS operations are taking a while.
274   final Counter checkAndMutateChecksPassed = new Counter();
275   final Counter checkAndMutateChecksFailed = new Counter();
276 
277   //Number of requests
278   final Counter readRequestsCount = new Counter();
279   final Counter writeRequestsCount = new Counter();
280 
281   // Compaction counters
282   final AtomicLong compactionsFinished = new AtomicLong(0L);
283   final AtomicLong compactionNumFilesCompacted = new AtomicLong(0L);
284   final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L);
285 
286 
287   private final HLog log;
288   private final HRegionFileSystem fs;
289   protected final Configuration conf;
290   private final Configuration baseConf;
291   private final KeyValue.KVComparator comparator;
292   private final int rowLockWaitDuration;
293   static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
294 
295   // The internal wait duration to acquire a lock before read/update
296   // from the region. It is not per row. The purpose of this wait time
297   // is to avoid waiting a long time while the region is busy, so that
298   // we can release the IPC handler soon enough to improve the
299   // availability of the region server. It can be adjusted by
300   // tuning configuration "hbase.busy.wait.duration".
301   final long busyWaitDuration;
302   static final long DEFAULT_BUSY_WAIT_DURATION = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
303 
304   // If updating multiple rows in one call, wait longer,
305   // i.e. waiting for busyWaitDuration * # of rows. However,
306   // we can limit the max multiplier.
307   final int maxBusyWaitMultiplier;
308 
309   // Max busy wait duration. There is no point to wait longer than the RPC
310   // purge timeout, when a RPC call will be terminated by the RPC engine.
311   final long maxBusyWaitDuration;
312 
313   // negative number indicates infinite timeout
314   static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
315   final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();
316 
317   private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
318 
319   /**
320    * The sequence ID that was encountered when this region was opened.
321    */
322   private long openSeqNum = HConstants.NO_SEQNUM;
323 
324   /**
325    * The default setting for whether to enable on-demand CF loading for
326    * scan requests to this region. Requests can override it.
327    */
328   private boolean isLoadingCfsOnDemandDefault = false;
329 
330   private final AtomicInteger majorInProgress = new AtomicInteger(0);
331   private final AtomicInteger minorInProgress = new AtomicInteger(0);
332 
333   //
334   // Context: During replay we want to ensure that we do not lose any data. So, we
335   // have to be conservative in how we replay logs. For each store, we calculate
336   // the maxSeqId up to which the store was flushed. And, skip the edits which
337   // are equal to or lower than maxSeqId for each store.
338   // The following map is populated when opening the region
339   Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
340 
341   /**
342    * Config setting for whether to allow writes when a region is in recovering or not.
343    */
344   private boolean disallowWritesInRecovering = false;
345 
346   // when a region is in recovering state, it can only accept writes not reads
347   private volatile boolean isRecovering = false;
348 
349   /**
350    * @return The smallest mvcc readPoint across all the scanners in this
351    * region. Writes older than this readPoint, are included  in every
352    * read operation.
353    */
354   public long getSmallestReadPoint() {
355     long minimumReadPoint;
356     // We need to ensure that while we are calculating the smallestReadPoint
357     // no new RegionScanners can grab a readPoint that we are unaware of.
358     // We achieve this by synchronizing on the scannerReadPoints object.
359     synchronized(scannerReadPoints) {
360       minimumReadPoint = mvcc.memstoreReadPoint();
361 
362       for (Long readPoint: this.scannerReadPoints.values()) {
363         if (readPoint < minimumReadPoint) {
364           minimumReadPoint = readPoint;
365         }
366       }
367     }
368     return minimumReadPoint;
369   }
370   /*
371    * Data structure of write state flags used coordinating flushes,
372    * compactions and closes.
373    */
374   static class WriteState {
375     // Set while a memstore flush is happening.
376     volatile boolean flushing = false;
377     // Set when a flush has been requested.
378     volatile boolean flushRequested = false;
379     // Number of compactions running.
380     volatile int compacting = 0;
381     // Gets set in close. If set, cannot compact or flush again.
382     volatile boolean writesEnabled = true;
383     // Set if region is read-only
384     volatile boolean readOnly = false;
385     // whether the reads are enabled. This is different than readOnly, because readOnly is
386     // static in the lifetime of the region, while readsEnabled is dynamic
387     volatile boolean readsEnabled = true;
388 
389     /**
390      * Set flags that make this region read-only.
391      *
392      * @param onOff flip value for region r/o setting
393      */
394     synchronized void setReadOnly(final boolean onOff) {
395       this.writesEnabled = !onOff;
396       this.readOnly = onOff;
397     }
398 
399     boolean isReadOnly() {
400       return this.readOnly;
401     }
402 
403     boolean isFlushRequested() {
404       return this.flushRequested;
405     }
406 
407     void setReadsEnabled(boolean readsEnabled) {
408       this.readsEnabled = readsEnabled;
409     }
410 
411     static final long HEAP_SIZE = ClassSize.align(
412         ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
413   }
414 
415   /**
416    * Objects from this class are created when flushing to describe all the different states that
417    * that method ends up in. The Result enum describes those states. The sequence id should only
418    * be specified if the flush was successful, and the failure message should only be specified
419    * if it didn't flush.
420    */
421   public static class FlushResult {
422     enum Result {
423       FLUSHED_NO_COMPACTION_NEEDED,
424       FLUSHED_COMPACTION_NEEDED,
425       // Special case where a flush didn't run because there's nothing in the memstores. Used when
426       // bulk loading to know when we can still load even if a flush didn't happen.
427       CANNOT_FLUSH_MEMSTORE_EMPTY,
428       CANNOT_FLUSH
429       // Be careful adding more to this enum, look at the below methods to make sure
430     }
431 
432     final Result result;
433     final String failureReason;
434     final long flushSequenceId;
435 
436     /**
437      * Convenience constructor to use when the flush is successful, the failure message is set to
438      * null.
439      * @param result Expecting FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
440      * @param flushSequenceId Generated sequence id that comes right after the edits in the
441      *                        memstores.
442      */
443     FlushResult(Result result, long flushSequenceId) {
444       this(result, flushSequenceId, null);
445       assert result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
446           .FLUSHED_COMPACTION_NEEDED;
447     }
448 
449     /**
450      * Convenience constructor to use when we cannot flush.
451      * @param result Expecting CANNOT_FLUSH_MEMSTORE_EMPTY or CANNOT_FLUSH.
452      * @param failureReason Reason why we couldn't flush.
453      */
454     FlushResult(Result result, String failureReason) {
455       this(result, -1, failureReason);
456       assert result == Result.CANNOT_FLUSH_MEMSTORE_EMPTY || result == Result.CANNOT_FLUSH;
457     }
458 
459     /**
460      * Constructor with all the parameters.
461      * @param result Any of the Result.
462      * @param flushSequenceId Generated sequence id if the memstores were flushed else -1.
463      * @param failureReason Reason why we couldn't flush, or null.
464      */
465     FlushResult(Result result, long flushSequenceId, String failureReason) {
466       this.result = result;
467       this.flushSequenceId = flushSequenceId;
468       this.failureReason = failureReason;
469     }
470 
471     /**
472      * Convenience method, the equivalent of checking if result is
473      * FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_NO_COMPACTION_NEEDED.
474      * @return true if the memstores were flushed, else false.
475      */
476     public boolean isFlushSucceeded() {
477       return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
478           .FLUSHED_COMPACTION_NEEDED;
479     }
480 
481     /**
482      * Convenience method, the equivalent of checking if result is FLUSHED_COMPACTION_NEEDED.
483      * @return True if the flush requested a compaction, else false (doesn't even mean it flushed).
484      */
485     public boolean isCompactionNeeded() {
486       return result == Result.FLUSHED_COMPACTION_NEEDED;
487     }
488   }
489 
490   final WriteState writestate = new WriteState();
491 
492   long memstoreFlushSize;
493   final long timestampSlop;
494   final long rowProcessorTimeout;
495   private volatile long lastFlushTime;
496   final RegionServerServices rsServices;
497   private RegionServerAccounting rsAccounting;
498   private long flushCheckInterval;
499   // flushPerChanges is to prevent too many changes in memstore
500   private long flushPerChanges;
501   private long blockingMemStoreSize;
502   final long threadWakeFrequency;
503   // Used to guard closes
504   final ReentrantReadWriteLock lock =
505     new ReentrantReadWriteLock();
506 
507   // Stop updates lock
508   private final ReentrantReadWriteLock updatesLock =
509     new ReentrantReadWriteLock();
510   private boolean splitRequest;
511   private byte[] explicitSplitPoint = null;
512 
513   private final MultiVersionConsistencyControl mvcc =
514       new MultiVersionConsistencyControl();
515 
516   // Coprocessor host
517   private RegionCoprocessorHost coprocessorHost;
518 
519   private HTableDescriptor htableDescriptor = null;
520   private RegionSplitPolicy splitPolicy;
521 
522   private final MetricsRegion metricsRegion;
523   private final MetricsRegionWrapperImpl metricsRegionWrapper;
524   private final Durability durability;
525 
526   /**
527    * HRegion constructor. This constructor should only be used for testing and
528    * extensions.  Instances of HRegion should be instantiated with the
529    * {@link HRegion#createHRegion} or {@link HRegion#openHRegion} method.
530    *
531    * @param tableDir qualified path of directory where region should be located,
532    * usually the table directory.
533    * @param log The HLog is the outbound log for any updates to the HRegion
534    * (There's a single HLog for all the HRegions on a single HRegionServer.)
535    * The log file is a logfile from the previous execution that's
536    * custom-computed for this HRegion. The HRegionServer computes and sorts the
537    * appropriate log info for this HRegion. If there is a previous log file
538    * (implying that the HRegion has been written-to before), then read it from
539    * the supplied path.
540    * @param fs is the filesystem.
541    * @param confParam is global configuration settings.
542    * @param regionInfo - HRegionInfo that describes the region
543    * is new), then read them from the supplied path.
544    * @param htd the table descriptor
545    * @param rsServices reference to {@link RegionServerServices} or null
546    */
547   @Deprecated
548   public HRegion(final Path tableDir, final HLog log, final FileSystem fs,
549       final Configuration confParam, final HRegionInfo regionInfo,
550       final HTableDescriptor htd, final RegionServerServices rsServices) {
551     this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
552       log, confParam, htd, rsServices);
553   }
554 
555   /**
556    * HRegion constructor. This constructor should only be used for testing and
557    * extensions.  Instances of HRegion should be instantiated with the
558    * {@link HRegion#createHRegion} or {@link HRegion#openHRegion} method.
559    *
560    * @param fs is the filesystem.
561    * @param log The HLog is the outbound log for any updates to the HRegion
562    * (There's a single HLog for all the HRegions on a single HRegionServer.)
563    * The log file is a logfile from the previous execution that's
564    * custom-computed for this HRegion. The HRegionServer computes and sorts the
565    * appropriate log info for this HRegion. If there is a previous log file
566    * (implying that the HRegion has been written-to before), then read it from
567    * the supplied path.
568    * @param confParam is global configuration settings.
569    * @param htd the table descriptor
570    * @param rsServices reference to {@link RegionServerServices} or null
571    */
572   public HRegion(final HRegionFileSystem fs, final HLog log, final Configuration confParam,
573       final HTableDescriptor htd, final RegionServerServices rsServices) {
574     if (htd == null) {
575       throw new IllegalArgumentException("Need table descriptor");
576     }
577 
578     if (confParam instanceof CompoundConfiguration) {
579       throw new IllegalArgumentException("Need original base configuration");
580     }
581 
582     this.comparator = fs.getRegionInfo().getComparator();
583     this.log = log;
584     this.fs = fs;
585 
586     // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
587     this.baseConf = confParam;
588     this.conf = new CompoundConfiguration()
589       .add(confParam)
590       .addStringMap(htd.getConfiguration())
591       .addWritableMap(htd.getValues());
592     this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
593         DEFAULT_CACHE_FLUSH_INTERVAL);
594     this.flushPerChanges = conf.getLong(MEMSTORE_FLUSH_PER_CHANGES, DEFAULT_FLUSH_PER_CHANGES);
595     if (this.flushPerChanges > MAX_FLUSH_PER_CHANGES) {
596       throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed "
597           + MAX_FLUSH_PER_CHANGES);
598     }
599 
600     this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
601                     DEFAULT_ROWLOCK_WAIT_DURATION);
602 
603     this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
604     this.htableDescriptor = htd;
605     this.rsServices = rsServices;
606     this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
607     setHTableSpecificConf();
608     this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
609 
610     this.busyWaitDuration = conf.getLong(
611       "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
612     this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
613     if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
614       throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
615         + busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
616         + maxBusyWaitMultiplier + "). Their product should be positive");
617     }
618     this.maxBusyWaitDuration = conf.getLong("hbase.ipc.client.call.purge.timeout",
619       2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
620 
621     /*
622      * timestamp.slop provides a server-side constraint on the timestamp. This
623      * assumes that you base your TS around currentTimeMillis(). In this case,
624      * throw an error to the user if the user-specified TS is newer than now +
625      * slop. LATEST_TIMESTAMP == don't use this functionality
626      */
627     this.timestampSlop = conf.getLong(
628         "hbase.hregion.keyvalue.timestamp.slop.millisecs",
629         HConstants.LATEST_TIMESTAMP);
630 
631     /**
632      * Timeout for the process time in processRowsWithLocks().
633      * Use -1 to switch off time bound.
634      */
635     this.rowProcessorTimeout = conf.getLong(
636         "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
637     this.durability = htd.getDurability() == Durability.USE_DEFAULT
638         ? DEFAULT_DURABLITY
639         : htd.getDurability();
640     if (rsServices != null) {
641       this.rsAccounting = this.rsServices.getRegionServerAccounting();
642       // don't initialize coprocessors if not running within a regionserver
643       // TODO: revisit if coprocessors should load in other cases
644       this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
645       this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
646       this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper);
647 
648       Map<String, HRegion> recoveringRegions = rsServices.getRecoveringRegions();
649       String encodedName = getRegionInfo().getEncodedName();
650       if (recoveringRegions != null && recoveringRegions.containsKey(encodedName)) {
651         this.isRecovering = true;
652         recoveringRegions.put(encodedName, this);
653       }
654     } else {
655       this.metricsRegionWrapper = null;
656       this.metricsRegion = null;
657     }
658     if (LOG.isDebugEnabled()) {
659       // Write out region name as string and its encoded name.
660       LOG.debug("Instantiated " + this);
661     }
662 
663     // by default, we allow writes against a region when it's in recovering
664     this.disallowWritesInRecovering =
665         conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING,
666           HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG);
667   }
668 
669   void setHTableSpecificConf() {
670     if (this.htableDescriptor == null) return;
671     long flushSize = this.htableDescriptor.getMemStoreFlushSize();
672 
673     if (flushSize <= 0) {
674       flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
675         HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
676     }
677     this.memstoreFlushSize = flushSize;
678     this.blockingMemStoreSize = this.memstoreFlushSize *
679         conf.getLong("hbase.hregion.memstore.block.multiplier", 2);
680   }
681 
682   /**
683    * Initialize this region.
684    * Used only by tests and SplitTransaction to reopen the region.
685    * You should use createHRegion() or openHRegion()
686    * @return What the next sequence (edit) id should be.
687    * @throws IOException e
688    * @deprecated use HRegion.createHRegion() or HRegion.openHRegion()
689    */
690   @Deprecated
691   public long initialize() throws IOException {
692     return initialize(null);
693   }
694 
695   /**
696    * Initialize this region.
697    *
698    * @param reporter Tickle every so often if initialize is taking a while.
699    * @return What the next sequence (edit) id should be.
700    * @throws IOException e
701    */
702   private long initialize(final CancelableProgressable reporter) throws IOException {
703     MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
704     long nextSeqId = -1;
705     try {
706       nextSeqId = initializeRegionInternals(reporter, status);
707       return nextSeqId;
708     } finally {
709       // nextSeqid will be -1 if the initialization fails.
710       // At least it will be 0 otherwise.
711       if (nextSeqId == -1) {
712         status
713             .abort("Exception during region " + this.getRegionNameAsString() + " initialization.");
714       }
715     }
716   }
717 
718   private long initializeRegionInternals(final CancelableProgressable reporter,
719       final MonitoredTask status) throws IOException, UnsupportedEncodingException {
720     if (coprocessorHost != null) {
721       status.setStatus("Running coprocessor pre-open hook");
722       coprocessorHost.preOpen();
723     }
724 
725     // Write HRI to a file in case we need to recover hbase:meta
726     status.setStatus("Writing region info on filesystem");
727     fs.checkRegionInfoOnFilesystem();
728 
729 
730 
731     // Initialize all the HStores
732     status.setStatus("Initializing all the Stores");
733     long maxSeqId = initializeRegionStores(reporter, status);
734 
735     this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this));
736     this.writestate.flushRequested = false;
737     this.writestate.compacting = 0;
738 
739     if (this.writestate.writesEnabled) {
740       // Remove temporary data left over from old regions
741       status.setStatus("Cleaning up temporary data from old regions");
742       fs.cleanupTempDir();
743     }
744 
745     if (this.writestate.writesEnabled) {
746       status.setStatus("Cleaning up detritus from prior splits");
747       // Get rid of any splits or merges that were lost in-progress.  Clean out
748       // these directories here on open.  We may be opening a region that was
749       // being split but we crashed in the middle of it all.
750       fs.cleanupAnySplitDetritus();
751       fs.cleanupMergesDir();
752     }
753 
754     // Initialize split policy
755     this.splitPolicy = RegionSplitPolicy.create(this, conf);
756 
757     this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
758     // Use maximum of log sequenceid or that which was found in stores
759     // (particularly if no recovered edits, seqid will be -1).
760     long nextSeqid = maxSeqId + 1;
761     if (this.isRecovering) {
762       // In distributedLogReplay mode, we don't know the last change sequence number because region
763       // is opened before recovery completes. So we add a safety bumper to avoid new sequence number
764       // overlaps used sequence numbers
765       nextSeqid += this.flushPerChanges + 10000000; // add another extra 10million
766     }
767 
768     LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() +
769       "; next sequenceid=" + nextSeqid);
770 
771     // A region can be reopened if failed a split; reset flags
772     this.closing.set(false);
773     this.closed.set(false);
774 
775     if (coprocessorHost != null) {
776       status.setStatus("Running coprocessor post-open hooks");
777       coprocessorHost.postOpen();
778     }
779 
780     status.markComplete("Region opened successfully");
781     return nextSeqid;
782   }
783 
784   private long initializeRegionStores(final CancelableProgressable reporter, MonitoredTask status)
785       throws IOException, UnsupportedEncodingException {
786     // Load in all the HStores.
787 
788     long maxSeqId = -1;
789     // initialized to -1 so that we pick up MemstoreTS from column families
790     long maxMemstoreTS = -1;
791 
792     if (!htableDescriptor.getFamilies().isEmpty()) {
793       // initialize the thread pool for opening stores in parallel.
794       ThreadPoolExecutor storeOpenerThreadPool =
795         getStoreOpenAndCloseThreadPool("StoreOpener-" + this.getRegionInfo().getShortNameToLog());
796       CompletionService<HStore> completionService =
797         new ExecutorCompletionService<HStore>(storeOpenerThreadPool);
798 
799       // initialize each store in parallel
800       for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
801         status.setStatus("Instantiating store for column family " + family);
802         completionService.submit(new Callable<HStore>() {
803           @Override
804           public HStore call() throws IOException {
805             return instantiateHStore(family);
806           }
807         });
808       }
809       boolean allStoresOpened = false;
810       try {
811         for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
812           Future<HStore> future = completionService.take();
813           HStore store = future.get();
814           this.stores.put(store.getColumnFamilyName().getBytes(), store);
815 
816           long storeMaxSequenceId = store.getMaxSequenceId();
817           maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
818               storeMaxSequenceId);
819           if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) {
820             maxSeqId = storeMaxSequenceId;
821           }
822           long maxStoreMemstoreTS = store.getMaxMemstoreTS();
823           if (maxStoreMemstoreTS > maxMemstoreTS) {
824             maxMemstoreTS = maxStoreMemstoreTS;
825           }
826         }
827         allStoresOpened = true;
828       } catch (InterruptedException e) {
829         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
830       } catch (ExecutionException e) {
831         throw new IOException(e.getCause());
832       } finally {
833         storeOpenerThreadPool.shutdownNow();
834         if (!allStoresOpened) {
835           // something went wrong, close all opened stores
836           LOG.error("Could not initialize all stores for the region=" + this);
837           for (Store store : this.stores.values()) {
838             try {
839               store.close();
840             } catch (IOException e) {
841               LOG.warn(e.getMessage());
842             }
843           }
844         }
845       }
846     }
847     if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
848       // Recover any edits if available.
849       maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
850           this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
851     }
852     maxSeqId = Math.max(maxSeqId, maxMemstoreTS + 1);
853     mvcc.initialize(maxSeqId);
854     return maxSeqId;
855   }
856 
857   private void writeRegionOpenMarker(HLog log, long openSeqId) throws IOException {
858     Map<byte[], List<Path>> storeFiles
859     = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
860     for (Map.Entry<byte[], Store> entry : getStores().entrySet()) {
861       Store store = entry.getValue();
862       ArrayList<Path> storeFileNames = new ArrayList<Path>();
863       for (StoreFile storeFile: store.getStorefiles()) {
864         storeFileNames.add(storeFile.getPath());
865       }
866       storeFiles.put(entry.getKey(), storeFileNames);
867     }
868 
869     RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
870       RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
871       getRegionServerServices().getServerName(), storeFiles);
872     HLogUtil.writeRegionEventMarker(log, getTableDesc(), getRegionInfo(), regionOpenDesc,
873       getSequenceId());
874   }
875 
876   private void writeRegionCloseMarker(HLog log) throws IOException {
877     Map<byte[], List<Path>> storeFiles
878     = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
879     for (Map.Entry<byte[], Store> entry : getStores().entrySet()) {
880       Store store = entry.getValue();
881       ArrayList<Path> storeFileNames = new ArrayList<Path>();
882       for (StoreFile storeFile: store.getStorefiles()) {
883         storeFileNames.add(storeFile.getPath());
884       }
885       storeFiles.put(entry.getKey(), storeFileNames);
886     }
887 
888     RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor(
889       RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), getSequenceId().get(),
890       getRegionServerServices().getServerName(), storeFiles);
891     HLogUtil.writeRegionEventMarker(log, getTableDesc(), getRegionInfo(), regionEventDesc,
892       getSequenceId());
893   }
894 
895   /**
896    * @return True if this region has references.
897    */
898   public boolean hasReferences() {
899     for (Store store : this.stores.values()) {
900       if (store.hasReferences()) return true;
901     }
902     return false;
903   }
904 
905   /**
906    * This function will return the HDFS blocks distribution based on the data
907    * captured when HFile is created
908    * @return The HDFS blocks distribution for the region.
909    */
910   public HDFSBlocksDistribution getHDFSBlocksDistribution() {
911     HDFSBlocksDistribution hdfsBlocksDistribution =
912       new HDFSBlocksDistribution();
913     synchronized (this.stores) {
914       for (Store store : this.stores.values()) {
915         for (StoreFile sf : store.getStorefiles()) {
916           HDFSBlocksDistribution storeFileBlocksDistribution =
917             sf.getHDFSBlockDistribution();
918           hdfsBlocksDistribution.add(storeFileBlocksDistribution);
919         }
920       }
921     }
922     return hdfsBlocksDistribution;
923   }
924 
925   /**
926    * This is a helper function to compute HDFS block distribution on demand
927    * @param conf configuration
928    * @param tableDescriptor HTableDescriptor of the table
929    * @param regionInfo encoded name of the region
930    * @return The HDFS blocks distribution for the given region.
931    * @throws IOException
932    */
933   public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
934       final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException {
935     Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
936     return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath);
937   }
938 
939   /**
940    * This is a helper function to compute HDFS block distribution on demand
941    * @param conf configuration
942    * @param tableDescriptor HTableDescriptor of the table
943    * @param regionInfo encoded name of the region
944    * @param tablePath the table directory
945    * @return The HDFS blocks distribution for the given region.
946    * @throws IOException
947    */
948   public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
949       final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo,  Path tablePath)
950       throws IOException {
951     HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
952     FileSystem fs = tablePath.getFileSystem(conf);
953 
954     HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo);
955     for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
956       Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family.getNameAsString());
957       if (storeFiles == null) continue;
958 
959       for (StoreFileInfo storeFileInfo : storeFiles) {
960         hdfsBlocksDistribution.add(storeFileInfo.computeHDFSBlocksDistribution(fs));
961       }
962     }
963     return hdfsBlocksDistribution;
964   }
965 
966   public AtomicLong getMemstoreSize() {
967     return memstoreSize;
968   }
969 
970   /**
971    * Increase the size of mem store in this region and the size of global mem
972    * store
973    * @return the size of memstore in this region
974    */
975   public long addAndGetGlobalMemstoreSize(long memStoreSize) {
976     if (this.rsAccounting != null) {
977       rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
978     }
979     return this.memstoreSize.addAndGet(memStoreSize);
980   }
981 
982   /** @return a HRegionInfo object for this region */
983   public HRegionInfo getRegionInfo() {
984     return this.fs.getRegionInfo();
985   }
986 
987   /**
988    * @return Instance of {@link RegionServerServices} used by this HRegion.
989    * Can be null.
990    */
991   RegionServerServices getRegionServerServices() {
992     return this.rsServices;
993   }
994 
995   /** @return readRequestsCount for this region */
996   long getReadRequestsCount() {
997     return this.readRequestsCount.get();
998   }
999 
1000   /** @return writeRequestsCount for this region */
1001   long getWriteRequestsCount() {
1002     return this.writeRequestsCount.get();
1003   }
1004 
1005   public MetricsRegion getMetrics() {
1006     return metricsRegion;
1007   }
1008 
1009   /** @return true if region is closed */
1010   public boolean isClosed() {
1011     return this.closed.get();
1012   }
1013 
1014   /**
1015    * @return True if closing process has started.
1016    */
1017   public boolean isClosing() {
1018     return this.closing.get();
1019   }
1020 
1021   /**
1022    * Reset recovering state of current region
1023    */
1024   public void setRecovering(boolean newState) {
1025     boolean wasRecovering = this.isRecovering;
1026     this.isRecovering = newState;
1027     if (wasRecovering && !isRecovering) {
1028       // Call only when log replay is over.
1029       coprocessorHost.postLogReplay();
1030     }
1031   }
1032 
1033   /**
1034    * @return True if current region is in recovering
1035    */
1036   public boolean isRecovering() {
1037     return this.isRecovering;
1038   }
1039 
1040   /** @return true if region is available (not closed and not closing) */
1041   public boolean isAvailable() {
1042     return !isClosed() && !isClosing();
1043   }
1044 
1045   /** @return true if region is splittable */
1046   public boolean isSplittable() {
1047     return isAvailable() && !hasReferences();
1048   }
1049 
1050   /**
1051    * @return true if region is mergeable
1052    */
1053   public boolean isMergeable() {
1054     if (!isAvailable()) {
1055       LOG.debug("Region " + this.getRegionNameAsString()
1056           + " is not mergeable because it is closing or closed");
1057       return false;
1058     }
1059     if (hasReferences()) {
1060       LOG.debug("Region " + this.getRegionNameAsString()
1061           + " is not mergeable because it has references");
1062       return false;
1063     }
1064 
1065     return true;
1066   }
1067 
1068   public boolean areWritesEnabled() {
1069     synchronized(this.writestate) {
1070       return this.writestate.writesEnabled;
1071     }
1072   }
1073 
1074    public MultiVersionConsistencyControl getMVCC() {
1075      return mvcc;
1076    }
1077 
1078    /*
1079     * Returns readpoint considering given IsolationLevel
1080     */
1081    public long getReadpoint(IsolationLevel isolationLevel) {
1082      if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
1083        // This scan can read even uncommitted transactions
1084        return Long.MAX_VALUE;
1085      }
1086      return mvcc.memstoreReadPoint();
1087    }
1088 
1089    public boolean isLoadingCfsOnDemandDefault() {
1090      return this.isLoadingCfsOnDemandDefault;
1091    }
1092 
1093   /**
1094    * Close down this HRegion.  Flush the cache, shut down each HStore, don't
1095    * service any more calls.
1096    *
1097    * <p>This method could take some time to execute, so don't call it from a
1098    * time-sensitive thread.
1099    *
1100    * @return Vector of all the storage files that the HRegion's component
1101    * HStores make use of.  It's a list of all HStoreFile objects. Returns empty
1102    * vector if already closed and null if judged that it should not close.
1103    *
1104    * @throws IOException e
1105    */
1106   public Map<byte[], List<StoreFile>> close() throws IOException {
1107     return close(false);
1108   }
1109 
1110   private final Object closeLock = new Object();
1111 
1112   /** Conf key for the periodic flush interval */
1113   public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL =
1114       "hbase.regionserver.optionalcacheflushinterval";
1115   /** Default interval for the memstore flush */
1116   public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;
1117 
1118   /** Conf key to force a flush if there are already enough changes for one region in memstore */
1119   public static final String MEMSTORE_FLUSH_PER_CHANGES =
1120       "hbase.regionserver.flush.per.changes";
1121   public static final long DEFAULT_FLUSH_PER_CHANGES = 30000000; // 30 millions
1122   /**
1123    * The following MAX_FLUSH_PER_CHANGES is large enough because each KeyValue has 20+ bytes
1124    * overhead. Therefore, even 1G empty KVs occupy at least 20GB memstore size for a single region
1125    */
1126   public static final long MAX_FLUSH_PER_CHANGES = 1000000000; // 1G
1127 
1128   /**
1129    * Close down this HRegion.  Flush the cache unless abort parameter is true,
1130    * Shut down each HStore, don't service any more calls.
1131    *
1132    * This method could take some time to execute, so don't call it from a
1133    * time-sensitive thread.
1134    *
1135    * @param abort true if server is aborting (only during testing)
1136    * @return Vector of all the storage files that the HRegion's component
1137    * HStores make use of.  It's a list of HStoreFile objects.  Can be null if
1138    * we are not to close at this time or we are already closed.
1139    *
1140    * @throws IOException e
1141    */
1142   public Map<byte[], List<StoreFile>> close(final boolean abort) throws IOException {
1143     // Only allow one thread to close at a time. Serialize them so dual
1144     // threads attempting to close will run up against each other.
1145     MonitoredTask status = TaskMonitor.get().createStatus(
1146         "Closing region " + this +
1147         (abort ? " due to abort" : ""));
1148 
1149     status.setStatus("Waiting for close lock");
1150     try {
1151       synchronized (closeLock) {
1152         return doClose(abort, status);
1153       }
1154     } finally {
1155       status.cleanup();
1156     }
1157   }
1158 
1159   private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
1160       throws IOException {
1161     if (isClosed()) {
1162       LOG.warn("Region " + this + " already closed");
1163       return null;
1164     }
1165 
1166     if (coprocessorHost != null) {
1167       status.setStatus("Running coprocessor pre-close hooks");
1168       this.coprocessorHost.preClose(abort);
1169     }
1170 
1171     status.setStatus("Disabling compacts and flushes for region");
1172     synchronized (writestate) {
1173       // Disable compacting and flushing by background threads for this
1174       // region.
1175       writestate.writesEnabled = false;
1176       LOG.debug("Closing " + this + ": disabling compactions & flushes");
1177       waitForFlushesAndCompactions();
1178     }
1179     // If we were not just flushing, is it worth doing a preflush...one
1180     // that will clear out of the bulk of the memstore before we put up
1181     // the close flag?
1182     if (!abort && worthPreFlushing()) {
1183       status.setStatus("Pre-flushing region before close");
1184       LOG.info("Running close preflush of " + this.getRegionNameAsString());
1185       try {
1186         internalFlushcache(status);
1187       } catch (IOException ioe) {
1188         // Failed to flush the region. Keep going.
1189         status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
1190       }
1191     }
1192 
1193     this.closing.set(true);
1194     status.setStatus("Disabling writes for close");
1195     // block waiting for the lock for closing
1196     lock.writeLock().lock();
1197     try {
1198       if (this.isClosed()) {
1199         status.abort("Already got closed by another process");
1200         // SplitTransaction handles the null
1201         return null;
1202       }
1203       LOG.debug("Updates disabled for region " + this);
1204       // Don't flush the cache if we are aborting
1205       if (!abort) {
1206         int flushCount = 0;
1207         while (this.getMemstoreSize().get() > 0) {
1208           try {
1209             if (flushCount++ > 0) {
1210               int actualFlushes = flushCount - 1;
1211               if (actualFlushes > 5) {
1212                 // If we tried 5 times and are unable to clear memory, abort
1213                 // so we do not lose data
1214                 throw new DroppedSnapshotException("Failed clearing memory after " +
1215                   actualFlushes + " attempts on region: " + Bytes.toStringBinary(getRegionName()));
1216               }
1217               LOG.info("Running extra flush, " + actualFlushes +
1218                 " (carrying snapshot?) " + this);
1219             }
1220             internalFlushcache(status);
1221           } catch (IOException ioe) {
1222             status.setStatus("Failed flush " + this + ", putting online again");
1223             synchronized (writestate) {
1224               writestate.writesEnabled = true;
1225             }
1226             // Have to throw to upper layers.  I can't abort server from here.
1227             throw ioe;
1228           }
1229         }
1230       }
1231 
1232       Map<byte[], List<StoreFile>> result =
1233         new TreeMap<byte[], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
1234       if (!stores.isEmpty()) {
1235         // initialize the thread pool for closing stores in parallel.
1236         ThreadPoolExecutor storeCloserThreadPool =
1237           getStoreOpenAndCloseThreadPool("StoreCloserThread-" + this.getRegionNameAsString());
1238         CompletionService<Pair<byte[], Collection<StoreFile>>> completionService =
1239           new ExecutorCompletionService<Pair<byte[], Collection<StoreFile>>>(storeCloserThreadPool);
1240 
1241         // close each store in parallel
1242         for (final Store store : stores.values()) {
1243           assert abort || store.getFlushableSize() == 0;
1244           completionService
1245               .submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
1246                 @Override
1247                 public Pair<byte[], Collection<StoreFile>> call() throws IOException {
1248                   return new Pair<byte[], Collection<StoreFile>>(
1249                     store.getFamily().getName(), store.close());
1250                 }
1251               });
1252         }
1253         try {
1254           for (int i = 0; i < stores.size(); i++) {
1255             Future<Pair<byte[], Collection<StoreFile>>> future = completionService.take();
1256             Pair<byte[], Collection<StoreFile>> storeFiles = future.get();
1257             List<StoreFile> familyFiles = result.get(storeFiles.getFirst());
1258             if (familyFiles == null) {
1259               familyFiles = new ArrayList<StoreFile>();
1260               result.put(storeFiles.getFirst(), familyFiles);
1261             }
1262             familyFiles.addAll(storeFiles.getSecond());
1263           }
1264         } catch (InterruptedException e) {
1265           throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1266         } catch (ExecutionException e) {
1267           throw new IOException(e.getCause());
1268         } finally {
1269           storeCloserThreadPool.shutdownNow();
1270         }
1271       }
1272 
1273       status.setStatus("Writing region close event to WAL");
1274       if (!abort  && log != null && getRegionServerServices() != null) {
1275         writeRegionCloseMarker(log);
1276       }
1277 
1278       this.closed.set(true);
1279       if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get());
1280       if (coprocessorHost != null) {
1281         status.setStatus("Running coprocessor post-close hooks");
1282         this.coprocessorHost.postClose(abort);
1283       }
1284       if ( this.metricsRegion != null) {
1285         this.metricsRegion.close();
1286       }
1287       if ( this.metricsRegionWrapper != null) {
1288         Closeables.closeQuietly(this.metricsRegionWrapper);
1289       }
1290       status.markComplete("Closed");
1291       LOG.info("Closed " + this);
1292       return result;
1293     } finally {
1294       lock.writeLock().unlock();
1295     }
1296   }
1297 
1298   /**
1299    * Wait for all current flushes and compactions of the region to complete.
1300    * <p>
1301    * Exposed for TESTING.
1302    */
1303   public void waitForFlushesAndCompactions() {
1304     synchronized (writestate) {
1305       boolean interrupted = false;
1306       try {
1307         while (writestate.compacting > 0 || writestate.flushing) {
1308           LOG.debug("waiting for " + writestate.compacting + " compactions"
1309             + (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
1310           try {
1311             writestate.wait();
1312           } catch (InterruptedException iex) {
1313             // essentially ignore and propagate the interrupt back up
1314             LOG.warn("Interrupted while waiting");
1315             interrupted = true;
1316           }
1317         }
1318       } finally {
1319         if (interrupted) {
1320           Thread.currentThread().interrupt();
1321         }
1322       }
1323     }
1324   }
1325 
1326   protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
1327       final String threadNamePrefix) {
1328     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1329     int maxThreads = Math.min(numStores,
1330         conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1331             HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
1332     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1333   }
1334 
1335   protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
1336       final String threadNamePrefix) {
1337     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1338     int maxThreads = Math.max(1,
1339         conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1340             HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
1341             / numStores);
1342     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1343   }
1344 
1345   static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
1346       final String threadNamePrefix) {
1347     return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
1348       new ThreadFactory() {
1349         private int count = 1;
1350 
1351         @Override
1352         public Thread newThread(Runnable r) {
1353           return new Thread(r, threadNamePrefix + "-" + count++);
1354         }
1355       });
1356   }
1357 
1358    /**
1359     * @return True if its worth doing a flush before we put up the close flag.
1360     */
1361   private boolean worthPreFlushing() {
1362     return this.memstoreSize.get() >
1363       this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
1364   }
1365 
1366   //////////////////////////////////////////////////////////////////////////////
1367   // HRegion accessors
1368   //////////////////////////////////////////////////////////////////////////////
1369 
1370   /** @return start key for region */
1371   public byte [] getStartKey() {
1372     return this.getRegionInfo().getStartKey();
1373   }
1374 
1375   /** @return end key for region */
1376   public byte [] getEndKey() {
1377     return this.getRegionInfo().getEndKey();
1378   }
1379 
1380   /** @return region id */
1381   public long getRegionId() {
1382     return this.getRegionInfo().getRegionId();
1383   }
1384 
1385   /** @return region name */
1386   public byte [] getRegionName() {
1387     return this.getRegionInfo().getRegionName();
1388   }
1389 
1390   /** @return region name as string for logging */
1391   public String getRegionNameAsString() {
1392     return this.getRegionInfo().getRegionNameAsString();
1393   }
1394 
1395   /** @return HTableDescriptor for this region */
1396   public HTableDescriptor getTableDesc() {
1397     return this.htableDescriptor;
1398   }
1399 
1400   /** @return HLog in use for this region */
1401   public HLog getLog() {
1402     return this.log;
1403   }
1404 
1405   /**
1406    * A split takes the config from the parent region & passes it to the daughter
1407    * region's constructor. If 'conf' was passed, you would end up using the HTD
1408    * of the parent region in addition to the new daughter HTD. Pass 'baseConf'
1409    * to the daughter regions to avoid this tricky dedupe problem.
1410    * @return Configuration object
1411    */
1412   Configuration getBaseConf() {
1413     return this.baseConf;
1414   }
1415 
1416   /** @return {@link FileSystem} being used by this region */
1417   public FileSystem getFilesystem() {
1418     return fs.getFileSystem();
1419   }
1420 
1421   /** @return the {@link HRegionFileSystem} used by this region */
1422   public HRegionFileSystem getRegionFileSystem() {
1423     return this.fs;
1424   }
1425 
1426   /** @return the last time the region was flushed */
1427   public long getLastFlushTime() {
1428     return this.lastFlushTime;
1429   }
1430 
1431   //////////////////////////////////////////////////////////////////////////////
1432   // HRegion maintenance.
1433   //
1434   // These methods are meant to be called periodically by the HRegionServer for
1435   // upkeep.
1436   //////////////////////////////////////////////////////////////////////////////
1437 
1438   /** @return returns size of largest HStore. */
1439   public long getLargestHStoreSize() {
1440     long size = 0;
1441     for (Store h : stores.values()) {
1442       long storeSize = h.getSize();
1443       if (storeSize > size) {
1444         size = storeSize;
1445       }
1446     }
1447     return size;
1448   }
1449 
1450   /**
1451    * @return KeyValue Comparator
1452    */
1453   public KeyValue.KVComparator getComparator() {
1454     return this.comparator;
1455   }
1456 
1457   /*
1458    * Do preparation for pending compaction.
1459    * @throws IOException
1460    */
1461   protected void doRegionCompactionPrep() throws IOException {
1462   }
1463 
1464   void triggerMajorCompaction() {
1465     for (Store h : stores.values()) {
1466       h.triggerMajorCompaction();
1467     }
1468   }
1469 
1470   /**
1471    * This is a helper function that compact all the stores synchronously
1472    * It is used by utilities and testing
1473    *
1474    * @param majorCompaction True to force a major compaction regardless of thresholds
1475    * @throws IOException e
1476    */
1477   public void compactStores(final boolean majorCompaction)
1478   throws IOException {
1479     if (majorCompaction) {
1480       this.triggerMajorCompaction();
1481     }
1482     compactStores();
1483   }
1484 
1485   /**
1486    * This is a helper function that compact all the stores synchronously
1487    * It is used by utilities and testing
1488    *
1489    * @throws IOException e
1490    */
1491   public void compactStores() throws IOException {
1492     for (Store s : getStores().values()) {
1493       CompactionContext compaction = s.requestCompaction();
1494       if (compaction != null) {
1495         compact(compaction, s);
1496       }
1497     }
1498   }
1499 
1500   /*
1501    * Called by compaction thread and after region is opened to compact the
1502    * HStores if necessary.
1503    *
1504    * <p>This operation could block for a long time, so don't call it from a
1505    * time-sensitive thread.
1506    *
1507    * Note that no locking is necessary at this level because compaction only
1508    * conflicts with a region split, and that cannot happen because the region
1509    * server does them sequentially and not in parallel.
1510    *
1511    * @param cr Compaction details, obtained by requestCompaction()
1512    * @return whether the compaction completed
1513    * @throws IOException e
1514    */
1515   public boolean compact(CompactionContext compaction, Store store) throws IOException {
1516     assert compaction != null && compaction.hasSelection();
1517     assert !compaction.getRequest().getFiles().isEmpty();
1518     if (this.closing.get() || this.closed.get()) {
1519       LOG.debug("Skipping compaction on " + this + " because closing/closed");
1520       store.cancelRequestedCompaction(compaction);
1521       return false;
1522     }
1523     MonitoredTask status = null;
1524     boolean didPerformCompaction = false;
1525     // block waiting for the lock for compaction
1526     lock.readLock().lock();
1527     try {
1528       byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
1529       if (stores.get(cf) != store) {
1530         LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this
1531             + " has been re-instantiated, cancel this compaction request. "
1532             + " It may be caused by the roll back of split transaction");
1533         return false;
1534       }
1535 
1536       status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
1537       if (this.closed.get()) {
1538         String msg = "Skipping compaction on " + this + " because closed";
1539         LOG.debug(msg);
1540         status.abort(msg);
1541         return false;
1542       }
1543       boolean wasStateSet = false;
1544       try {
1545         synchronized (writestate) {
1546           if (writestate.writesEnabled) {
1547             wasStateSet = true;
1548             ++writestate.compacting;
1549           } else {
1550             String msg = "NOT compacting region " + this + ". Writes disabled.";
1551             LOG.info(msg);
1552             status.abort(msg);
1553             return false;
1554           }
1555         }
1556         LOG.info("Starting compaction on " + store + " in region " + this
1557             + (compaction.getRequest().isOffPeak()?" as an off-peak compaction":""));
1558         doRegionCompactionPrep();
1559         try {
1560           status.setStatus("Compacting store " + store);
1561           didPerformCompaction = true;
1562           store.compact(compaction);
1563         } catch (InterruptedIOException iioe) {
1564           String msg = "compaction interrupted";
1565           LOG.info(msg, iioe);
1566           status.abort(msg);
1567           return false;
1568         }
1569       } finally {
1570         if (wasStateSet) {
1571           synchronized (writestate) {
1572             --writestate.compacting;
1573             if (writestate.compacting <= 0) {
1574               writestate.notifyAll();
1575             }
1576           }
1577         }
1578       }
1579       status.markComplete("Compaction complete");
1580       return true;
1581     } finally {
1582       try {
1583         if (!didPerformCompaction) store.cancelRequestedCompaction(compaction);
1584         if (status != null) status.cleanup();
1585       } finally {
1586         lock.readLock().unlock();
1587       }
1588     }
1589   }
1590 
1591   /**
1592    * Flush the cache.
1593    *
1594    * When this method is called the cache will be flushed unless:
1595    * <ol>
1596    *   <li>the cache is empty</li>
1597    *   <li>the region is closed.</li>
1598    *   <li>a flush is already in progress</li>
1599    *   <li>writes are disabled</li>
1600    * </ol>
1601    *
1602    * <p>This method may block for some time, so it should not be called from a
1603    * time-sensitive thread.
1604    *
1605    * @return true if the region needs compacting
1606    *
1607    * @throws IOException general io exceptions
1608    * @throws DroppedSnapshotException Thrown when replay of hlog is required
1609    * because a Snapshot was not properly persisted.
1610    */
1611   public FlushResult flushcache() throws IOException {
1612     // fail-fast instead of waiting on the lock
1613     if (this.closing.get()) {
1614       String msg = "Skipping flush on " + this + " because closing";
1615       LOG.debug(msg);
1616       return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1617     }
1618     MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
1619     status.setStatus("Acquiring readlock on region");
1620     // block waiting for the lock for flushing cache
1621     lock.readLock().lock();
1622     try {
1623       if (this.closed.get()) {
1624         String msg = "Skipping flush on " + this + " because closed";
1625         LOG.debug(msg);
1626         status.abort(msg);
1627         return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1628       }
1629       if (coprocessorHost != null) {
1630         status.setStatus("Running coprocessor pre-flush hooks");
1631         coprocessorHost.preFlush();
1632       }
1633       if (numMutationsWithoutWAL.get() > 0) {
1634         numMutationsWithoutWAL.set(0);
1635         dataInMemoryWithoutWAL.set(0);
1636       }
1637       synchronized (writestate) {
1638         if (!writestate.flushing && writestate.writesEnabled) {
1639           this.writestate.flushing = true;
1640         } else {
1641           if (LOG.isDebugEnabled()) {
1642             LOG.debug("NOT flushing memstore for region " + this
1643                 + ", flushing=" + writestate.flushing + ", writesEnabled="
1644                 + writestate.writesEnabled);
1645           }
1646           String msg = "Not flushing since "
1647               + (writestate.flushing ? "already flushing"
1648               : "writes not enabled");
1649           status.abort(msg);
1650           return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1651         }
1652       }
1653       try {
1654         FlushResult fs = internalFlushcache(status);
1655 
1656         if (coprocessorHost != null) {
1657           status.setStatus("Running post-flush coprocessor hooks");
1658           coprocessorHost.postFlush();
1659         }
1660 
1661         status.markComplete("Flush successful");
1662         return fs;
1663       } finally {
1664         synchronized (writestate) {
1665           writestate.flushing = false;
1666           this.writestate.flushRequested = false;
1667           writestate.notifyAll();
1668         }
1669       }
1670     } finally {
1671       lock.readLock().unlock();
1672       status.cleanup();
1673     }
1674   }
1675 
1676   /**
1677    * Should the memstore be flushed now
1678    */
1679   boolean shouldFlush() {
1680     // This is a rough measure.
1681     if (this.lastFlushSeqId > 0
1682           && (this.lastFlushSeqId + this.flushPerChanges < this.sequenceId.get())) {
1683       return true;
1684     }
1685     if (flushCheckInterval <= 0) { //disabled
1686       return false;
1687     }
1688     long now = EnvironmentEdgeManager.currentTimeMillis();
1689     //if we flushed in the recent past, we don't need to do again now
1690     if ((now - getLastFlushTime() < flushCheckInterval)) {
1691       return false;
1692     }
1693     //since we didn't flush in the recent past, flush now if certain conditions
1694     //are met. Return true on first such memstore hit.
1695     for (Store s : this.getStores().values()) {
1696       if (s.timeOfOldestEdit() < now - flushCheckInterval) {
1697         // we have an old enough edit in the memstore, flush
1698         return true;
1699       }
1700     }
1701     return false;
1702   }
1703 
1704   /**
1705    * Flush the memstore. Flushing the memstore is a little tricky. We have a lot of updates in the
1706    * memstore, all of which have also been written to the log. We need to write those updates in the
1707    * memstore out to disk, while being able to process reads/writes as much as possible during the
1708    * flush operation.
1709    * <p>This method may block for some time.  Every time you call it, we up the regions
1710    * sequence id even if we don't flush; i.e. the returned region id will be at least one larger
1711    * than the last edit applied to this region. The returned id does not refer to an actual edit.
1712    * The returned id can be used for say installing a bulk loaded file just ahead of the last hfile
1713    * that was the result of this flush, etc.
1714    * @return object describing the flush's state
1715    *
1716    * @throws IOException general io exceptions
1717    * @throws DroppedSnapshotException Thrown when replay of hlog is required
1718    * because a Snapshot was not properly persisted.
1719    */
1720   protected FlushResult internalFlushcache(MonitoredTask status)
1721       throws IOException {
1722     return internalFlushcache(this.log, -1, status);
1723   }
1724 
1725   /**
1726    * @param wal Null if we're NOT to go via hlog/wal.
1727    * @param myseqid The seqid to use if <code>wal</code> is null writing out flush file.
1728    * @return object describing the flush's state
1729    * @throws IOException
1730    * @see #internalFlushcache(MonitoredTask)
1731    */
1732   protected FlushResult internalFlushcache(
1733       final HLog wal, final long myseqid, MonitoredTask status)
1734   throws IOException {
1735     if (this.rsServices != null && this.rsServices.isAborted()) {
1736       // Don't flush when server aborting, it's unsafe
1737       throw new IOException("Aborting flush because server is aborted...");
1738     }
1739     final long startTime = EnvironmentEdgeManager.currentTimeMillis();
1740     // If nothing to flush, return, but we need to safely update the region sequence id
1741     if (this.memstoreSize.get() <= 0) {
1742       // Take an update lock because am about to change the sequence id and we want the sequence id
1743       // to be at the border of the empty memstore.
1744       MultiVersionConsistencyControl.WriteEntry w = null;
1745       this.updatesLock.writeLock().lock();
1746       try {
1747         if (this.memstoreSize.get() <= 0) {
1748           // Presume that if there are still no edits in the memstore, then there are no edits for
1749           // this region out in the WAL/HLog subsystem so no need to do any trickery clearing out
1750           // edits in the WAL system. Up the sequence number so the resulting flush id is for
1751           // sure just beyond the last appended region edit (useful as a marker when bulk loading,
1752           // etc.)
1753           // wal can be null replaying edits.
1754           try {
1755             if (wal != null) {
1756               w = mvcc.beginMemstoreInsert();
1757               long flushSeqId = getNextSequenceId(wal);
1758               FlushResult flushResult = new FlushResult(
1759                   FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushSeqId, "Nothing to flush");
1760               w.setWriteNumber(flushSeqId);
1761               mvcc.waitForPreviousTransactionsComplete(w);
1762               w = null;
1763               return flushResult;
1764             } else {
1765               return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
1766                   "Nothing to flush");
1767             }
1768 
1769           } finally {
1770             this.updatesLock.writeLock().unlock();
1771           }
1772         }
1773       } finally {
1774         if (w != null) {
1775           mvcc.advanceMemstore(w);
1776         }
1777       }
1778     }
1779 
1780     LOG.info("Started memstore flush for " + this +
1781       ", current region memstore size " +
1782       StringUtils.byteDesc(this.memstoreSize.get()) +
1783       ((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid));
1784 
1785     // Stop updates while we snapshot the memstore of all of these regions' stores. We only have
1786     // to do this for a moment.  It is quick. We also set the memstore size to zero here before we
1787     // allow updates again so its value will represent the size of the updates received
1788     // during flush
1789     MultiVersionConsistencyControl.WriteEntry w = null;
1790 
1791     // We have to take an update lock during snapshot, or else a write could end up in both snapshot
1792     // and memstore (makes it difficult to do atomic rows then)
1793     status.setStatus("Obtaining lock to block concurrent updates");
1794     // block waiting for the lock for internal flush
1795     this.updatesLock.writeLock().lock();
1796     long totalFlushableSize = 0;
1797     status.setStatus("Preparing to flush by snapshotting stores in " +
1798       getRegionInfo().getEncodedName());
1799     List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
1800     TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
1801         Bytes.BYTES_COMPARATOR);
1802     long flushSeqId = -1L;
1803 
1804     long trxId = 0;
1805     try {
1806       try {
1807         w = mvcc.beginMemstoreInsert();
1808         if (wal != null) {
1809           if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) {
1810             // This should never happen.
1811             String msg = "Flush will not be started for ["
1812                 + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
1813             status.setStatus(msg);
1814             return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1815           }
1816           // Get a sequence id that we can use to denote the flush. It will be one beyond the last
1817           // edit that made it into the hfile (the below does not add an edit, it just asks the
1818           // WAL system to return next sequence edit).
1819           flushSeqId = getNextSequenceId(wal);
1820         } else {
1821           // use the provided sequence Id as WAL is not being used for this flush.
1822           flushSeqId = myseqid;
1823         }
1824 
1825         for (Store s : stores.values()) {
1826           totalFlushableSize += s.getFlushableSize();
1827           storeFlushCtxs.add(s.createFlushContext(flushSeqId));
1828           committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL
1829         }
1830 
1831         // write the snapshot start to WAL
1832         if (wal != null) {
1833           FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
1834             getRegionInfo(), flushSeqId, committedFiles);
1835           trxId = HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
1836             desc, sequenceId, false); // no sync. Sync is below where we do not hold the updates lock
1837         }
1838 
1839         // Prepare flush (take a snapshot)
1840         for (StoreFlushContext flush : storeFlushCtxs) {
1841           flush.prepare();
1842         }
1843       } catch (IOException ex) {
1844         if (wal != null) {
1845           if (trxId > 0) { // check whether we have already written START_FLUSH to WAL
1846             try {
1847               FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
1848                 getRegionInfo(), flushSeqId, committedFiles);
1849               HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
1850                 desc, sequenceId, false);
1851             } catch (Throwable t) {
1852               LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
1853                   StringUtils.stringifyException(t));
1854               // ignore this since we will be aborting the RS with DSE.
1855             }
1856           }
1857           // we have called wal.startCacheFlush(), now we have to abort it
1858           wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
1859           throw ex; // let upper layers deal with it.
1860         }
1861       } finally {
1862         this.updatesLock.writeLock().unlock();
1863       }
1864       String s = "Finished memstore snapshotting " + this +
1865         ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize;
1866       status.setStatus(s);
1867       if (LOG.isTraceEnabled()) LOG.trace(s);
1868       // sync unflushed WAL changes
1869       // see HBASE-8208 for details
1870       if (wal != null) {
1871         try {
1872           wal.sync(); // ensure that flush marker is sync'ed
1873         } catch (IOException ioe) {
1874           LOG.warn("Unexpected exception while log.sync(), ignoring. Exception: "
1875               + StringUtils.stringifyException(ioe));
1876         }
1877       }
1878 
1879       // wait for all in-progress transactions to commit to HLog before
1880       // we can start the flush. This prevents
1881       // uncommitted transactions from being written into HFiles.
1882       // We have to block before we start the flush, otherwise keys that
1883       // were removed via a rollbackMemstore could be written to Hfiles.
1884       w.setWriteNumber(flushSeqId);
1885       mvcc.waitForPreviousTransactionsComplete(w);
1886       // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block
1887       w = null;
1888       s = "Flushing stores of " + this;
1889       status.setStatus(s);
1890       if (LOG.isTraceEnabled()) LOG.trace(s);
1891     } finally {
1892       if (w != null) {
1893         // in case of failure just mark current w as complete
1894         mvcc.advanceMemstore(w);
1895       }
1896     }
1897 
1898     // Any failure from here on out will be catastrophic requiring server
1899     // restart so hlog content can be replayed and put back into the memstore.
1900     // Otherwise, the snapshot content while backed up in the hlog, it will not
1901     // be part of the current running servers state.
1902     boolean compactionRequested = false;
1903     try {
1904       // A.  Flush memstore to all the HStores.
1905       // Keep running vector of all store files that includes both old and the
1906       // just-made new flush store file. The new flushed file is still in the
1907       // tmp directory.
1908 
1909       for (StoreFlushContext flush : storeFlushCtxs) {
1910         flush.flushCache(status);
1911       }
1912 
1913       // Switch snapshot (in memstore) -> new hfile (thus causing
1914       // all the store scanners to reset/reseek).
1915       Iterator<Store> it = stores.values().iterator(); // stores.values() and storeFlushCtxs have
1916       // same order
1917       for (StoreFlushContext flush : storeFlushCtxs) {
1918         boolean needsCompaction = flush.commit(status);
1919         if (needsCompaction) {
1920           compactionRequested = true;
1921         }
1922         committedFiles.put(it.next().getFamily().getName(), flush.getCommittedFiles());
1923       }
1924       storeFlushCtxs.clear();
1925 
1926       // Set down the memstore size by amount of flush.
1927       this.addAndGetGlobalMemstoreSize(-totalFlushableSize);
1928 
1929       if (wal != null) {
1930         // write flush marker to WAL. If fail, we should throw DroppedSnapshotException
1931         FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
1932           getRegionInfo(), flushSeqId, committedFiles);
1933         HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
1934           desc, sequenceId, true);
1935       }
1936     } catch (Throwable t) {
1937       // An exception here means that the snapshot was not persisted.
1938       // The hlog needs to be replayed so its content is restored to memstore.
1939       // Currently, only a server restart will do this.
1940       // We used to only catch IOEs but its possible that we'd get other
1941       // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
1942       // all and sundry.
1943       if (wal != null) {
1944         try {
1945           FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
1946             getRegionInfo(), flushSeqId, committedFiles);
1947           HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
1948             desc, sequenceId, false);
1949         } catch (Throwable ex) {
1950           LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
1951               StringUtils.stringifyException(ex));
1952           // ignore this since we will be aborting the RS with DSE.
1953         }
1954         wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
1955       }
1956       DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
1957           Bytes.toStringBinary(getRegionName()));
1958       dse.initCause(t);
1959       status.abort("Flush failed: " + StringUtils.stringifyException(t));
1960       throw dse;
1961     }
1962 
1963     // If we get to here, the HStores have been written.
1964     if (wal != null) {
1965       wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
1966     }
1967 
1968     // Record latest flush time
1969     this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
1970 
1971     // Update the last flushed sequence id for region. TODO: This is dup'd inside the WAL/FSHlog.
1972     this.lastFlushSeqId = flushSeqId;
1973 
1974     // C. Finally notify anyone waiting on memstore to clear:
1975     // e.g. checkResources().
1976     synchronized (this) {
1977       notifyAll(); // FindBugs NN_NAKED_NOTIFY
1978     }
1979 
1980     long time = EnvironmentEdgeManager.currentTimeMillis() - startTime;
1981     long memstoresize = this.memstoreSize.get();
1982     String msg = "Finished memstore flush of ~" +
1983       StringUtils.byteDesc(totalFlushableSize) + "/" + totalFlushableSize +
1984       ", currentsize=" +
1985       StringUtils.byteDesc(memstoresize) + "/" + memstoresize +
1986       " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId +
1987       ", compaction requested=" + compactionRequested +
1988       ((wal == null)? "; wal=null": "");
1989     LOG.info(msg);
1990     status.setStatus(msg);
1991 
1992     return new FlushResult(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
1993         FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushSeqId);
1994   }
1995 
1996   /**
1997    * Method to safely get the next sequence number.
1998    * @return Next sequence number unassociated with any actual edit.
1999    * @throws IOException
2000    */
2001   private long getNextSequenceId(final HLog wal) throws IOException {
2002     HLogKey key = this.appendNoSyncNoAppend(wal, null);
2003     return key.getSequenceNumber();
2004   }
2005 
2006   //////////////////////////////////////////////////////////////////////////////
2007   // get() methods for client use.
2008   //////////////////////////////////////////////////////////////////////////////
2009   /**
2010    * Return all the data for the row that matches <i>row</i> exactly,
2011    * or the one that immediately preceeds it, at or immediately before
2012    * <i>ts</i>.
2013    *
2014    * @param row row key
2015    * @return map of values
2016    * @throws IOException
2017    */
2018   Result getClosestRowBefore(final byte [] row)
2019   throws IOException{
2020     return getClosestRowBefore(row, HConstants.CATALOG_FAMILY);
2021   }
2022 
2023   /**
2024    * Return all the data for the row that matches <i>row</i> exactly,
2025    * or the one that immediately precedes it, at or immediately before
2026    * <i>ts</i>.
2027    *
2028    * @param row row key
2029    * @param family column family to find on
2030    * @return map of values
2031    * @throws IOException read exceptions
2032    */
2033   public Result getClosestRowBefore(final byte [] row, final byte [] family)
2034   throws IOException {
2035     if (coprocessorHost != null) {
2036       Result result = new Result();
2037       if (coprocessorHost.preGetClosestRowBefore(row, family, result)) {
2038         return result;
2039       }
2040     }
2041     // look across all the HStores for this region and determine what the
2042     // closest key is across all column families, since the data may be sparse
2043     checkRow(row, "getClosestRowBefore");
2044     startRegionOperation(Operation.GET);
2045     this.readRequestsCount.increment();
2046     try {
2047       Store store = getStore(family);
2048       // get the closest key. (HStore.getRowKeyAtOrBefore can return null)
2049       KeyValue key = store.getRowKeyAtOrBefore(row);
2050       Result result = null;
2051       if (key != null) {
2052         Get get = new Get(CellUtil.cloneRow(key));
2053         get.addFamily(family);
2054         result = get(get);
2055       }
2056       if (coprocessorHost != null) {
2057         coprocessorHost.postGetClosestRowBefore(row, family, result);
2058       }
2059       return result;
2060     } finally {
2061       closeRegionOperation(Operation.GET);
2062     }
2063   }
2064 
2065   /**
2066    * Return an iterator that scans over the HRegion, returning the indicated
2067    * columns and rows specified by the {@link Scan}.
2068    * <p>
2069    * This Iterator must be closed by the caller.
2070    *
2071    * @param scan configured {@link Scan}
2072    * @return RegionScanner
2073    * @throws IOException read exceptions
2074    */
2075   public RegionScanner getScanner(Scan scan) throws IOException {
2076    return getScanner(scan, null);
2077   }
2078 
2079   void prepareScanner(Scan scan) throws IOException {
2080     if(!scan.hasFamilies()) {
2081       // Adding all families to scanner
2082       for(byte[] family: this.htableDescriptor.getFamiliesKeys()){
2083         scan.addFamily(family);
2084       }
2085     }
2086   }
2087 
2088   protected RegionScanner getScanner(Scan scan,
2089       List<KeyValueScanner> additionalScanners) throws IOException {
2090     startRegionOperation(Operation.SCAN);
2091     try {
2092       // Verify families are all valid
2093       prepareScanner(scan);
2094       if(scan.hasFamilies()) {
2095         for(byte [] family : scan.getFamilyMap().keySet()) {
2096           checkFamily(family);
2097         }
2098       }
2099       return instantiateRegionScanner(scan, additionalScanners);
2100     } finally {
2101       closeRegionOperation(Operation.SCAN);
2102     }
2103   }
2104 
2105   protected RegionScanner instantiateRegionScanner(Scan scan,
2106       List<KeyValueScanner> additionalScanners) throws IOException {
2107     if (scan.isReversed()) {
2108       if (scan.getFilter() != null) {
2109         scan.getFilter().setReversed(true);
2110       }
2111       return new ReversedRegionScannerImpl(scan, additionalScanners, this);
2112     }
2113     return new RegionScannerImpl(scan, additionalScanners, this);
2114   }
2115 
2116   /*
2117    * @param delete The passed delete is modified by this method. WARNING!
2118    */
2119   void prepareDelete(Delete delete) throws IOException {
2120     // Check to see if this is a deleteRow insert
2121     if(delete.getFamilyCellMap().isEmpty()){
2122       for(byte [] family : this.htableDescriptor.getFamiliesKeys()){
2123         // Don't eat the timestamp
2124         delete.deleteFamily(family, delete.getTimeStamp());
2125       }
2126     } else {
2127       for(byte [] family : delete.getFamilyCellMap().keySet()) {
2128         if(family == null) {
2129           throw new NoSuchColumnFamilyException("Empty family is invalid");
2130         }
2131         checkFamily(family);
2132       }
2133     }
2134   }
2135 
2136   //////////////////////////////////////////////////////////////////////////////
2137   // set() methods for client use.
2138   //////////////////////////////////////////////////////////////////////////////
2139   /**
2140    * @param delete delete object
2141    * @throws IOException read exceptions
2142    */
2143   public void delete(Delete delete)
2144   throws IOException {
2145     checkReadOnly();
2146     checkResources();
2147     startRegionOperation(Operation.DELETE);
2148     try {
2149       delete.getRow();
2150       // All edits for the given row (across all column families) must happen atomically.
2151       doBatchMutate(delete);
2152     } finally {
2153       closeRegionOperation(Operation.DELETE);
2154     }
2155   }
2156 
2157   /**
2158    * Row needed by below method.
2159    */
2160   private static final byte [] FOR_UNIT_TESTS_ONLY = Bytes.toBytes("ForUnitTestsOnly");
2161   /**
2162    * This is used only by unit tests. Not required to be a public API.
2163    * @param familyMap map of family to edits for the given family.
2164    * @throws IOException
2165    */
2166   void delete(NavigableMap<byte[], List<Cell>> familyMap,
2167       Durability durability) throws IOException {
2168     Delete delete = new Delete(FOR_UNIT_TESTS_ONLY);
2169     delete.setFamilyCellMap(familyMap);
2170     delete.setDurability(durability);
2171     doBatchMutate(delete);
2172   }
2173 
2174   /**
2175    * Setup correct timestamps in the KVs in Delete object.
2176    * Caller should have the row and region locks.
2177    * @param mutation
2178    * @param familyMap
2179    * @param byteNow
2180    * @throws IOException
2181    */
2182   void prepareDeleteTimestamps(Mutation mutation, Map<byte[], List<Cell>> familyMap,
2183       byte[] byteNow) throws IOException {
2184     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
2185 
2186       byte[] family = e.getKey();
2187       List<Cell> cells = e.getValue();
2188       Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
2189 
2190       for (Cell cell: cells) {
2191         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
2192         //  Check if time is LATEST, change to time of most recent addition if so
2193         //  This is expensive.
2194         if (kv.isLatestTimestamp() && CellUtil.isDeleteType(kv)) {
2195           byte[] qual = CellUtil.cloneQualifier(kv);
2196           if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
2197 
2198           Integer count = kvCount.get(qual);
2199           if (count == null) {
2200             kvCount.put(qual, 1);
2201           } else {
2202             kvCount.put(qual, count + 1);
2203           }
2204           count = kvCount.get(qual);
2205 
2206           Get get = new Get(CellUtil.cloneRow(kv));
2207           get.setMaxVersions(count);
2208           get.addColumn(family, qual);
2209           if (coprocessorHost != null) {
2210             if (!coprocessorHost.prePrepareTimeStampForDeleteVersion(mutation, cell,
2211                 byteNow, get)) {
2212               updateDeleteLatestVersionTimeStamp(kv, get, count, byteNow);
2213             }
2214           } else {
2215             updateDeleteLatestVersionTimeStamp(kv, get, count, byteNow);
2216           }
2217         } else {
2218           kv.updateLatestStamp(byteNow);
2219         }
2220       }
2221     }
2222   }
2223 
2224   void updateDeleteLatestVersionTimeStamp(KeyValue kv, Get get, int count, byte[] byteNow)
2225       throws IOException {
2226     List<Cell> result = get(get, false);
2227 
2228     if (result.size() < count) {
2229       // Nothing to delete
2230       kv.updateLatestStamp(byteNow);
2231       return;
2232     }
2233     if (result.size() > count) {
2234       throw new RuntimeException("Unexpected size: " + result.size());
2235     }
2236     KeyValue getkv = KeyValueUtil.ensureKeyValue(result.get(count - 1));
2237     Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(), getkv.getBuffer(),
2238         getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
2239   }
2240 
2241   /**
2242    * @throws IOException
2243    */
2244   public void put(Put put)
2245   throws IOException {
2246     checkReadOnly();
2247 
2248     // Do a rough check that we have resources to accept a write.  The check is
2249     // 'rough' in that between the resource check and the call to obtain a
2250     // read lock, resources may run out.  For now, the thought is that this
2251     // will be extremely rare; we'll deal with it when it happens.
2252     checkResources();
2253     startRegionOperation(Operation.PUT);
2254     try {
2255       // All edits for the given row (across all column families) must happen atomically.
2256       doBatchMutate(put);
2257     } finally {
2258       closeRegionOperation(Operation.PUT);
2259     }
2260   }
2261 
2262   /**
2263    * Struct-like class that tracks the progress of a batch operation,
2264    * accumulating status codes and tracking the index at which processing
2265    * is proceeding.
2266    */
2267   private abstract static class BatchOperationInProgress<T> {
2268     T[] operations;
2269     int nextIndexToProcess = 0;
2270     OperationStatus[] retCodeDetails;
2271     WALEdit[] walEditsFromCoprocessors;
2272 
2273     public BatchOperationInProgress(T[] operations) {
2274       this.operations = operations;
2275       this.retCodeDetails = new OperationStatus[operations.length];
2276       this.walEditsFromCoprocessors = new WALEdit[operations.length];
2277       Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
2278     }
2279 
2280     public abstract Mutation getMutation(int index);
2281     public abstract long getNonceGroup(int index);
2282     public abstract long getNonce(int index);
2283     /** This method is potentially expensive and should only be used for non-replay CP path. */
2284     public abstract Mutation[] getMutationsForCoprocs();
2285     public abstract boolean isInReplay();
2286     public abstract long getReplaySequenceId();
2287 
2288     public boolean isDone() {
2289       return nextIndexToProcess == operations.length;
2290     }
2291   }
2292 
2293   private static class MutationBatch extends BatchOperationInProgress<Mutation> {
2294     private long nonceGroup;
2295     private long nonce;
2296     public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) {
2297       super(operations);
2298       this.nonceGroup = nonceGroup;
2299       this.nonce = nonce;
2300     }
2301 
2302     @Override
2303     public Mutation getMutation(int index) {
2304       return this.operations[index];
2305     }
2306 
2307     @Override
2308     public long getNonceGroup(int index) {
2309       return nonceGroup;
2310     }
2311 
2312     @Override
2313     public long getNonce(int index) {
2314       return nonce;
2315     }
2316 
2317     @Override
2318     public Mutation[] getMutationsForCoprocs() {
2319       return this.operations;
2320     }
2321 
2322     @Override
2323     public boolean isInReplay() {
2324       return false;
2325     }
2326 
2327     @Override
2328     public long getReplaySequenceId() {
2329       return 0;
2330     }
2331   }
2332 
2333   private static class ReplayBatch extends BatchOperationInProgress<HLogSplitter.MutationReplay> {
2334     private long replaySeqId = 0;
2335     public ReplayBatch(MutationReplay[] operations, long seqId) {
2336       super(operations);
2337       this.replaySeqId = seqId;
2338     }
2339 
2340     @Override
2341     public Mutation getMutation(int index) {
2342       return this.operations[index].mutation;
2343     }
2344 
2345     @Override
2346     public long getNonceGroup(int index) {
2347       return this.operations[index].nonceGroup;
2348     }
2349 
2350     @Override
2351     public long getNonce(int index) {
2352       return this.operations[index].nonce;
2353     }
2354 
2355     @Override
2356     public Mutation[] getMutationsForCoprocs() {
2357       assert false;
2358       throw new RuntimeException("Should not be called for replay batch");
2359     }
2360 
2361     @Override
2362     public boolean isInReplay() {
2363       return true;
2364     }
2365 
2366     @Override
2367     public long getReplaySequenceId() {
2368       return this.replaySeqId;
2369     }
2370   }
2371 
2372   /**
2373    * Perform a batch of mutations.
2374    * It supports only Put and Delete mutations and will ignore other types passed.
2375    * @param mutations the list of mutations
2376    * @return an array of OperationStatus which internally contains the
2377    *         OperationStatusCode and the exceptionMessage if any.
2378    * @throws IOException
2379    */
2380   public OperationStatus[] batchMutate(
2381       Mutation[] mutations, long nonceGroup, long nonce) throws IOException {
2382     // As it stands, this is used for 3 things
2383     //  * batchMutate with single mutation - put/delete, separate or from checkAndMutate.
2384     //  * coprocessor calls (see ex. BulkDeleteEndpoint).
2385     // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd...
2386     return batchMutate(new MutationBatch(mutations, nonceGroup, nonce));
2387   }
2388 
2389   public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
2390     return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
2391   }
2392 
2393   /**
2394    * Replay a batch of mutations.
2395    * @param mutations mutations to replay.
2396    * @param replaySeqId SeqId for current mutations
2397    * @return an array of OperationStatus which internally contains the
2398    *         OperationStatusCode and the exceptionMessage if any.
2399    * @throws IOException
2400    */
2401   public OperationStatus[] batchReplay(HLogSplitter.MutationReplay[] mutations, long replaySeqId)
2402       throws IOException {
2403     return batchMutate(new ReplayBatch(mutations, replaySeqId));
2404   }
2405 
2406   /**
2407    * Perform a batch of mutations.
2408    * It supports only Put and Delete mutations and will ignore other types passed.
2409    * @param mutations the list of mutations
2410    * @return an array of OperationStatus which internally contains the
2411    *         OperationStatusCode and the exceptionMessage if any.
2412    * @throws IOException
2413    */
2414   OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
2415     boolean initialized = false;
2416     while (!batchOp.isDone()) {
2417       if (!batchOp.isInReplay()) {
2418         checkReadOnly();
2419       }
2420       checkResources();
2421 
2422       long newSize;
2423       Operation op = Operation.BATCH_MUTATE;
2424       if (batchOp.isInReplay()) op = Operation.REPLAY_BATCH_MUTATE;
2425       startRegionOperation(op);
2426 
2427       try {
2428         if (!initialized) {
2429           this.writeRequestsCount.add(batchOp.operations.length);
2430           if (!batchOp.isInReplay()) {
2431             doPreMutationHook(batchOp);
2432           }
2433           initialized = true;
2434         }
2435         long addedSize = doMiniBatchMutation(batchOp);
2436         newSize = this.addAndGetGlobalMemstoreSize(addedSize);
2437       } finally {
2438         closeRegionOperation(op);
2439       }
2440       if (isFlushSize(newSize)) {
2441         requestFlush();
2442       }
2443     }
2444     return batchOp.retCodeDetails;
2445   }
2446 
2447 
2448   private void doPreMutationHook(BatchOperationInProgress<?> batchOp)
2449       throws IOException {
2450     /* Run coprocessor pre hook outside of locks to avoid deadlock */
2451     WALEdit walEdit = new WALEdit();
2452     if (coprocessorHost != null) {
2453       for (int i = 0 ; i < batchOp.operations.length; i++) {
2454         Mutation m = batchOp.getMutation(i);
2455         if (m instanceof Put) {
2456           if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
2457             // pre hook says skip this Put
2458             // mark as success and skip in doMiniBatchMutation
2459             batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2460           }
2461         } else if (m instanceof Delete) {
2462           Delete curDel = (Delete) m;
2463           if (curDel.getFamilyCellMap().isEmpty()) {
2464             // handle deleting a row case
2465             prepareDelete(curDel);
2466           }
2467           if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {
2468             // pre hook says skip this Delete
2469             // mark as success and skip in doMiniBatchMutation
2470             batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2471           }
2472         } else {
2473           // In case of passing Append mutations along with the Puts and Deletes in batchMutate
2474           // mark the operation return code as failure so that it will not be considered in
2475           // the doMiniBatchMutation
2476           batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
2477               "Put/Delete mutations only supported in batchMutate() now");
2478         }
2479         if (!walEdit.isEmpty()) {
2480           batchOp.walEditsFromCoprocessors[i] = walEdit;
2481           walEdit = new WALEdit();
2482         }
2483       }
2484     }
2485   }
2486 
2487   @SuppressWarnings("unchecked")
2488   private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp) throws IOException {
2489     boolean isInReplay = batchOp.isInReplay();
2490     // variable to note if all Put items are for the same CF -- metrics related
2491     boolean putsCfSetConsistent = true;
2492     //The set of columnFamilies first seen for Put.
2493     Set<byte[]> putsCfSet = null;
2494     // variable to note if all Delete items are for the same CF -- metrics related
2495     boolean deletesCfSetConsistent = true;
2496     //The set of columnFamilies first seen for Delete.
2497     Set<byte[]> deletesCfSet = null;
2498 
2499     long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
2500     WALEdit walEdit = new WALEdit(isInReplay);
2501     MultiVersionConsistencyControl.WriteEntry w = null;
2502     long txid = 0;
2503     boolean doRollBackMemstore = false;
2504     boolean locked = false;
2505 
2506     /** Keep track of the locks we hold so we can release them in finally clause */
2507     List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
2508     // reference family maps directly so coprocessors can mutate them if desired
2509     Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
2510     List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
2511     // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
2512     int firstIndex = batchOp.nextIndexToProcess;
2513     int lastIndexExclusive = firstIndex;
2514     boolean success = false;
2515     int noOfPuts = 0, noOfDeletes = 0;
2516     HLogKey walKey = null;
2517     long mvccNum = 0;
2518     try {
2519       // ------------------------------------
2520       // STEP 1. Try to acquire as many locks as we can, and ensure
2521       // we acquire at least one.
2522       // ----------------------------------
2523       int numReadyToWrite = 0;
2524       long now = EnvironmentEdgeManager.currentTimeMillis();
2525       while (lastIndexExclusive < batchOp.operations.length) {
2526         Mutation mutation = batchOp.getMutation(lastIndexExclusive);
2527         boolean isPutMutation = mutation instanceof Put;
2528 
2529         Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
2530         // store the family map reference to allow for mutations
2531         familyMaps[lastIndexExclusive] = familyMap;
2532 
2533         // skip anything that "ran" already
2534         if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
2535             != OperationStatusCode.NOT_RUN) {
2536           lastIndexExclusive++;
2537           continue;
2538         }
2539 
2540         try {
2541           if (isPutMutation) {
2542             // Check the families in the put. If bad, skip this one.
2543             if (isInReplay) {
2544               removeNonExistentColumnFamilyForReplay(familyMap);
2545             } else {
2546               checkFamilies(familyMap.keySet());
2547             }
2548             checkTimestamps(mutation.getFamilyCellMap(), now);
2549           } else {
2550             prepareDelete((Delete) mutation);
2551           }
2552         } catch (NoSuchColumnFamilyException nscf) {
2553           LOG.warn("No such column family in batch mutation", nscf);
2554           batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2555               OperationStatusCode.BAD_FAMILY, nscf.getMessage());
2556           lastIndexExclusive++;
2557           continue;
2558         } catch (FailedSanityCheckException fsce) {
2559           LOG.warn("Batch Mutation did not pass sanity check", fsce);
2560           batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2561               OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
2562           lastIndexExclusive++;
2563           continue;
2564         }
2565 
2566         // If we haven't got any rows in our batch, we should block to
2567         // get the next one.
2568         boolean shouldBlock = numReadyToWrite == 0;
2569         RowLock rowLock = null;
2570         try {
2571           rowLock = getRowLock(mutation.getRow(), shouldBlock);
2572         } catch (IOException ioe) {
2573           LOG.warn("Failed getting lock in batch put, row="
2574             + Bytes.toStringBinary(mutation.getRow()), ioe);
2575         }
2576         if (rowLock == null) {
2577           // We failed to grab another lock
2578           assert !shouldBlock : "Should never fail to get lock when blocking";
2579           break; // stop acquiring more rows for this batch
2580         } else {
2581           acquiredRowLocks.add(rowLock);
2582         }
2583 
2584         lastIndexExclusive++;
2585         numReadyToWrite++;
2586 
2587         if (isPutMutation) {
2588           // If Column Families stay consistent through out all of the
2589           // individual puts then metrics can be reported as a mutliput across
2590           // column families in the first put.
2591           if (putsCfSet == null) {
2592             putsCfSet = mutation.getFamilyCellMap().keySet();
2593           } else {
2594             putsCfSetConsistent = putsCfSetConsistent
2595                 && mutation.getFamilyCellMap().keySet().equals(putsCfSet);
2596           }
2597         } else {
2598           if (deletesCfSet == null) {
2599             deletesCfSet = mutation.getFamilyCellMap().keySet();
2600           } else {
2601             deletesCfSetConsistent = deletesCfSetConsistent
2602                 && mutation.getFamilyCellMap().keySet().equals(deletesCfSet);
2603           }
2604         }
2605       }
2606 
2607       // we should record the timestamp only after we have acquired the rowLock,
2608       // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
2609       now = EnvironmentEdgeManager.currentTimeMillis();
2610       byte[] byteNow = Bytes.toBytes(now);
2611 
2612       // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
2613       if (numReadyToWrite <= 0) return 0L;
2614 
2615       // We've now grabbed as many mutations off the list as we can
2616 
2617       // ------------------------------------
2618       // STEP 2. Update any LATEST_TIMESTAMP timestamps
2619       // ----------------------------------
2620       for (int i = firstIndex; !isInReplay && i < lastIndexExclusive; i++) {
2621         // skip invalid
2622         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2623             != OperationStatusCode.NOT_RUN) continue;
2624 
2625         Mutation mutation = batchOp.getMutation(i);
2626         if (mutation instanceof Put) {
2627           updateKVTimestamps(familyMaps[i].values(), byteNow);
2628           noOfPuts++;
2629         } else {
2630           prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
2631           noOfDeletes++;
2632         }
2633       }
2634 
2635       lock(this.updatesLock.readLock(), numReadyToWrite);
2636       locked = true;
2637       if(isInReplay) {
2638         mvccNum = batchOp.getReplaySequenceId();
2639       } else {
2640         mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
2641       }
2642       //
2643       // ------------------------------------
2644       // Acquire the latest mvcc number
2645       // ----------------------------------
2646       w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
2647 
2648       // calling the pre CP hook for batch mutation
2649       if (!isInReplay && coprocessorHost != null) {
2650         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2651           new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2652           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2653         if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
2654       }
2655 
2656       // ------------------------------------
2657       // STEP 3. Write back to memstore
2658       // Write to memstore. It is ok to write to memstore
2659       // first without updating the HLog because we do not roll
2660       // forward the memstore MVCC. The MVCC will be moved up when
2661       // the complete operation is done. These changes are not yet
2662       // visible to scanners till we update the MVCC. The MVCC is
2663       // moved only when the sync is complete.
2664       // ----------------------------------
2665       long addedSize = 0;
2666       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2667         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2668             != OperationStatusCode.NOT_RUN) {
2669           continue;
2670         }
2671         doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
2672         addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells);
2673       }
2674 
2675       // ------------------------------------
2676       // STEP 4. Build WAL edit
2677       // ----------------------------------
2678       Durability durability = Durability.USE_DEFAULT;
2679       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2680         // Skip puts that were determined to be invalid during preprocessing
2681         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2682             != OperationStatusCode.NOT_RUN) {
2683           continue;
2684         }
2685         batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2686 
2687         Mutation m = batchOp.getMutation(i);
2688         Durability tmpDur = getEffectiveDurability(m.getDurability());
2689         if (tmpDur.ordinal() > durability.ordinal()) {
2690           durability = tmpDur;
2691         }
2692         if (tmpDur == Durability.SKIP_WAL) {
2693           recordMutationWithoutWal(m.getFamilyCellMap());
2694           continue;
2695         }
2696 
2697         long nonceGroup = batchOp.getNonceGroup(i), nonce = batchOp.getNonce(i);
2698         // In replay, the batch may contain multiple nonces. If so, write WALEdit for each.
2699         // Given how nonces are originally written, these should be contiguous.
2700         // They don't have to be, it will still work, just write more WALEdits than needed.
2701         if (nonceGroup != currentNonceGroup || nonce != currentNonce) {
2702           if (walEdit.size() > 0) {
2703             assert isInReplay;
2704             if (!isInReplay) {
2705               throw new IOException("Multiple nonces per batch and not in replay");
2706             }
2707             // txid should always increase, so having the one from the last call is ok.
2708             walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
2709               this.htableDescriptor.getTableName(), now, m.getClusterIds(),
2710               currentNonceGroup, currentNonce);
2711             txid = this.log.appendNoSync(this.htableDescriptor,  this.getRegionInfo(),  walKey,
2712               walEdit, getSequenceId(), true, null);
2713             walEdit = new WALEdit(isInReplay);
2714             walKey = null;
2715           }
2716           currentNonceGroup = nonceGroup;
2717           currentNonce = nonce;
2718         }
2719 
2720         // Add WAL edits by CP
2721         WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
2722         if (fromCP != null) {
2723           for (KeyValue kv : fromCP.getKeyValues()) {
2724             walEdit.add(kv);
2725           }
2726         }
2727         addFamilyMapToWALEdit(familyMaps[i], walEdit);
2728       }
2729 
2730       // -------------------------
2731       // STEP 5. Append the final edit to WAL. Do not sync wal.
2732       // -------------------------
2733       Mutation mutation = batchOp.getMutation(firstIndex);
2734       if (walEdit.size() > 0) {
2735         walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
2736             this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now,
2737             mutation.getClusterIds(), currentNonceGroup, currentNonce);
2738         if(isInReplay) {
2739           walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId());
2740         }
2741         txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
2742           getSequenceId(), true, memstoreCells);
2743       }
2744       if(walKey == null){
2745         // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
2746         walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
2747       }
2748 
2749       // -------------------------------
2750       // STEP 6. Release row locks, etc.
2751       // -------------------------------
2752       if (locked) {
2753         this.updatesLock.readLock().unlock();
2754         locked = false;
2755       }
2756       releaseRowLocks(acquiredRowLocks);
2757 
2758       // -------------------------
2759       // STEP 7. Sync wal.
2760       // -------------------------
2761       if (txid != 0) {
2762         syncOrDefer(txid, durability);
2763       }
2764 
2765       doRollBackMemstore = false;
2766       // calling the post CP hook for batch mutation
2767       if (!isInReplay && coprocessorHost != null) {
2768         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2769           new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2770           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2771         coprocessorHost.postBatchMutate(miniBatchOp);
2772       }
2773 
2774       // ------------------------------------------------------------------
2775       // STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
2776       // ------------------------------------------------------------------
2777       if (w != null) {
2778         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
2779         w = null;
2780       }
2781 
2782       // ------------------------------------
2783       // STEP 9. Run coprocessor post hooks. This should be done after the wal is
2784       // synced so that the coprocessor contract is adhered to.
2785       // ------------------------------------
2786       if (!isInReplay && coprocessorHost != null) {
2787         for (int i = firstIndex; i < lastIndexExclusive; i++) {
2788           // only for successful puts
2789           if (batchOp.retCodeDetails[i].getOperationStatusCode()
2790               != OperationStatusCode.SUCCESS) {
2791             continue;
2792           }
2793           Mutation m = batchOp.getMutation(i);
2794           if (m instanceof Put) {
2795             coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
2796           } else {
2797             coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
2798           }
2799         }
2800       }
2801 
2802       success = true;
2803       return addedSize;
2804     } finally {
2805 
2806       // if the wal sync was unsuccessful, remove keys from memstore
2807       if (doRollBackMemstore) {
2808         rollbackMemstore(memstoreCells);
2809       }
2810       if (w != null) {
2811         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
2812       }
2813 
2814       if (locked) {
2815         this.updatesLock.readLock().unlock();
2816       }
2817       releaseRowLocks(acquiredRowLocks);
2818 
2819       // See if the column families were consistent through the whole thing.
2820       // if they were then keep them. If they were not then pass a null.
2821       // null will be treated as unknown.
2822       // Total time taken might be involving Puts and Deletes.
2823       // Split the time for puts and deletes based on the total number of Puts and Deletes.
2824 
2825       if (noOfPuts > 0) {
2826         // There were some Puts in the batch.
2827         if (this.metricsRegion != null) {
2828           this.metricsRegion.updatePut();
2829         }
2830       }
2831       if (noOfDeletes > 0) {
2832         // There were some Deletes in the batch.
2833         if (this.metricsRegion != null) {
2834           this.metricsRegion.updateDelete();
2835         }
2836       }
2837       if (!success) {
2838         for (int i = firstIndex; i < lastIndexExclusive; i++) {
2839           if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
2840             batchOp.retCodeDetails[i] = OperationStatus.FAILURE;
2841           }
2842         }
2843       }
2844       if (coprocessorHost != null && !batchOp.isInReplay()) {
2845         // call the coprocessor hook to do any finalization steps
2846         // after the put is done
2847         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2848             new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2849                 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex,
2850                 lastIndexExclusive);
2851         coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success);
2852       }
2853 
2854       batchOp.nextIndexToProcess = lastIndexExclusive;
2855     }
2856   }
2857 
2858   /**
2859    * Returns effective durability from the passed durability and
2860    * the table descriptor.
2861    */
2862   protected Durability getEffectiveDurability(Durability d) {
2863     return d == Durability.USE_DEFAULT ? this.durability : d;
2864   }
2865 
2866   //TODO, Think that gets/puts and deletes should be refactored a bit so that
2867   //the getting of the lock happens before, so that you would just pass it into
2868   //the methods. So in the case of checkAndMutate you could just do lockRow,
2869   //get, put, unlockRow or something
2870   /**
2871    *
2872    * @throws IOException
2873    * @return true if the new put was executed, false otherwise
2874    */
2875   public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
2876       CompareOp compareOp, ByteArrayComparable comparator, Mutation w,
2877       boolean writeToWAL)
2878   throws IOException{
2879     checkReadOnly();
2880     //TODO, add check for value length or maybe even better move this to the
2881     //client if this becomes a global setting
2882     checkResources();
2883     boolean isPut = w instanceof Put;
2884     if (!isPut && !(w instanceof Delete))
2885       throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must " +
2886           "be Put or Delete");
2887     if (!Bytes.equals(row, w.getRow())) {
2888       throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " +
2889           "getRow must match the passed row");
2890     }
2891 
2892     startRegionOperation();
2893     try {
2894       Get get = new Get(row);
2895       checkFamily(family);
2896       get.addColumn(family, qualifier);
2897 
2898       // Lock row - note that doBatchMutate will relock this row if called
2899       RowLock rowLock = getRowLock(get.getRow());
2900       // wait for all previous transactions to complete (with lock held)
2901       mvcc.waitForPreviousTransactionsComplete();
2902       try {
2903         if (this.getCoprocessorHost() != null) {
2904           Boolean processed = null;
2905           if (w instanceof Put) {
2906             processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family,
2907                 qualifier, compareOp, comparator, (Put) w);
2908           } else if (w instanceof Delete) {
2909             processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family,
2910                 qualifier, compareOp, comparator, (Delete) w);
2911           }
2912           if (processed != null) {
2913             return processed;
2914           }
2915         }
2916         List<Cell> result = get(get, false);
2917 
2918         boolean valueIsNull = comparator.getValue() == null ||
2919           comparator.getValue().length == 0;
2920         boolean matches = false;
2921         if (result.size() == 0 && valueIsNull) {
2922           matches = true;
2923         } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
2924             valueIsNull) {
2925           matches = true;
2926         } else if (result.size() == 1 && !valueIsNull) {
2927           Cell kv = result.get(0);
2928           int compareResult = comparator.compareTo(kv.getValueArray(),
2929               kv.getValueOffset(), kv.getValueLength());
2930           switch (compareOp) {
2931           case LESS:
2932             matches = compareResult < 0;
2933             break;
2934           case LESS_OR_EQUAL:
2935             matches = compareResult <= 0;
2936             break;
2937           case EQUAL:
2938             matches = compareResult == 0;
2939             break;
2940           case NOT_EQUAL:
2941             matches = compareResult != 0;
2942             break;
2943           case GREATER_OR_EQUAL:
2944             matches = compareResult >= 0;
2945             break;
2946           case GREATER:
2947             matches = compareResult > 0;
2948             break;
2949           default:
2950             throw new RuntimeException("Unknown Compare op " + compareOp.name());
2951           }
2952         }
2953         //If matches put the new put or delete the new delete
2954         if (matches) {
2955           // All edits for the given row (across all column families) must
2956           // happen atomically.
2957           doBatchMutate(w);
2958           this.checkAndMutateChecksPassed.increment();
2959           return true;
2960         }
2961         this.checkAndMutateChecksFailed.increment();
2962         return false;
2963       } finally {
2964         rowLock.release();
2965       }
2966     } finally {
2967       closeRegionOperation();
2968     }
2969   }
2970 
2971   private void doBatchMutate(Mutation mutation) throws IOException, DoNotRetryIOException {
2972     // Currently this is only called for puts and deletes, so no nonces.
2973     OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation },
2974         HConstants.NO_NONCE, HConstants.NO_NONCE);
2975     if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
2976       throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
2977     } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
2978       throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
2979     }
2980   }
2981 
2982   /**
2983    * Complete taking the snapshot on the region. Writes the region info and adds references to the
2984    * working snapshot directory.
2985    *
2986    * TODO for api consistency, consider adding another version with no {@link ForeignExceptionSnare}
2987    * arg.  (In the future other cancellable HRegion methods could eventually add a
2988    * {@link ForeignExceptionSnare}, or we could do something fancier).
2989    *
2990    * @param desc snapshot description object
2991    * @param exnSnare ForeignExceptionSnare that captures external exceptions in case we need to
2992    *   bail out.  This is allowed to be null and will just be ignored in that case.
2993    * @throws IOException if there is an external or internal error causing the snapshot to fail
2994    */
2995   public void addRegionToSnapshot(SnapshotDescription desc,
2996       ForeignExceptionSnare exnSnare) throws IOException {
2997     Path rootDir = FSUtils.getRootDir(conf);
2998     Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);
2999 
3000     SnapshotManifest manifest = SnapshotManifest.create(conf, getFilesystem(),
3001                                                         snapshotDir, desc, exnSnare);
3002     manifest.addRegion(this);
3003   }
3004 
3005   /**
3006    * Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP} with the
3007    * provided current timestamp.
3008    */
3009   void updateKVTimestamps(final Iterable<List<Cell>> keyLists, final byte[] now) {
3010     for (List<Cell> cells: keyLists) {
3011       if (cells == null) continue;
3012       for (Cell cell : cells) {
3013         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
3014         kv.updateLatestStamp(now);
3015       }
3016     }
3017   }
3018 
3019   /*
3020    * Check if resources to support an update.
3021    *
3022    * We throw RegionTooBusyException if above memstore limit
3023    * and expect client to retry using some kind of backoff
3024   */
3025   private void checkResources()
3026     throws RegionTooBusyException {
3027     // If catalog region, do not impose resource constraints or block updates.
3028     if (this.getRegionInfo().isMetaRegion()) return;
3029 
3030     if (this.memstoreSize.get() > this.blockingMemStoreSize) {
3031       requestFlush();
3032       throw new RegionTooBusyException("Above memstore limit, " +
3033           "regionName=" + (this.getRegionInfo() == null ? "unknown" :
3034           this.getRegionInfo().getRegionNameAsString()) +
3035           ", server=" + (this.getRegionServerServices() == null ? "unknown" :
3036           this.getRegionServerServices().getServerName()) +
3037           ", memstoreSize=" + memstoreSize.get() +
3038           ", blockingMemStoreSize=" + blockingMemStoreSize);
3039     }
3040   }
3041 
3042   /**
3043    * @throws IOException Throws exception if region is in read-only mode.
3044    */
3045   protected void checkReadOnly() throws IOException {
3046     if (this.writestate.isReadOnly()) {
3047       throw new IOException("region is read only");
3048     }
3049   }
3050 
3051   protected void checkReadsEnabled() throws IOException {
3052     if (!this.writestate.readsEnabled) {
3053       throw new IOException ("The region's reads are disabled. Cannot serve the request");
3054     }
3055   }
3056 
3057   public void setReadsEnabled(boolean readsEnabled) {
3058     this.writestate.setReadsEnabled(readsEnabled);
3059   }
3060 
3061   /**
3062    * Add updates first to the hlog and then add values to memstore.
3063    * Warning: Assumption is caller has lock on passed in row.
3064    * @param edits Cell updates by column
3065    * @throws IOException
3066    */
3067   private void put(final byte [] row, byte [] family, List<Cell> edits)
3068   throws IOException {
3069     NavigableMap<byte[], List<Cell>> familyMap;
3070     familyMap = new TreeMap<byte[], List<Cell>>(Bytes.BYTES_COMPARATOR);
3071 
3072     familyMap.put(family, edits);
3073     Put p = new Put(row);
3074     p.setFamilyCellMap(familyMap);
3075     doBatchMutate(p);
3076   }
3077 
3078   /**
3079    * Atomically apply the given map of family->edits to the memstore.
3080    * This handles the consistency control on its own, but the caller
3081    * should already have locked updatesLock.readLock(). This also does
3082    * <b>not</b> check the families for validity.
3083    *
3084    * @param familyMap Map of kvs per family
3085    * @param localizedWriteEntry The WriteEntry of the MVCC for this transaction.
3086    *        If null, then this method internally creates a mvcc transaction.
3087    * @param output newly added KVs into memstore
3088    * @return the additional memory usage of the memstore caused by the
3089    * new entries.
3090    */
3091   private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
3092     long mvccNum, List<KeyValue> memstoreCells) {
3093     long size = 0;
3094 
3095     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
3096       byte[] family = e.getKey();
3097       List<Cell> cells = e.getValue();
3098 
3099       Store store = getStore(family);
3100       for (Cell cell: cells) {
3101         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
3102         kv.setSequenceId(mvccNum);
3103         Pair<Long, Cell> ret = store.add(kv);
3104         size += ret.getFirst();
3105         memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
3106       }
3107     }
3108 
3109      return size;
3110    }
3111 
3112   /**
3113    * Remove all the keys listed in the map from the memstore. This method is
3114    * called when a Put/Delete has updated memstore but subsequently fails to update
3115    * the wal. This method is then invoked to rollback the memstore.
3116    */
3117   private void rollbackMemstore(List<KeyValue> memstoreCells) {
3118     int kvsRolledback = 0;
3119 
3120     for (KeyValue kv : memstoreCells) {
3121       byte[] family = kv.getFamily();
3122       Store store = getStore(family);
3123       store.rollback(kv);
3124       kvsRolledback++;
3125     }
3126     LOG.debug("rollbackMemstore rolled back " + kvsRolledback);
3127   }
3128 
3129   /**
3130    * Check the collection of families for validity.
3131    * @throws NoSuchColumnFamilyException if a family does not exist.
3132    */
3133   void checkFamilies(Collection<byte[]> families)
3134   throws NoSuchColumnFamilyException {
3135     for (byte[] family : families) {
3136       checkFamily(family);
3137     }
3138   }
3139 
3140   /**
3141    * During replay, there could exist column families which are removed between region server
3142    * failure and replay
3143    */
3144   private void removeNonExistentColumnFamilyForReplay(
3145       final Map<byte[], List<Cell>> familyMap) {
3146     List<byte[]> nonExistentList = null;
3147     for (byte[] family : familyMap.keySet()) {
3148       if (!this.htableDescriptor.hasFamily(family)) {
3149         if (nonExistentList == null) {
3150           nonExistentList = new ArrayList<byte[]>();
3151         }
3152         nonExistentList.add(family);
3153       }
3154     }
3155     if (nonExistentList != null) {
3156       for (byte[] family : nonExistentList) {
3157         // Perhaps schema was changed between crash and replay
3158         LOG.info("No family for " + Bytes.toString(family) + " omit from reply.");
3159         familyMap.remove(family);
3160       }
3161     }
3162   }
3163 
3164   void checkTimestamps(final Map<byte[], List<Cell>> familyMap,
3165       long now) throws FailedSanityCheckException {
3166     if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
3167       return;
3168     }
3169     long maxTs = now + timestampSlop;
3170     for (List<Cell> kvs : familyMap.values()) {
3171       for (Cell cell : kvs) {
3172         // see if the user-side TS is out of range. latest = server-side
3173         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
3174         if (!kv.isLatestTimestamp() && kv.getTimestamp() > maxTs) {
3175           throw new FailedSanityCheckException("Timestamp for KV out of range "
3176               + cell + " (too.new=" + timestampSlop + ")");
3177         }
3178       }
3179     }
3180   }
3181 
3182   /**
3183    * Append the given map of family->edits to a WALEdit data structure.
3184    * This does not write to the HLog itself.
3185    * @param familyMap map of family->edits
3186    * @param walEdit the destination entry to append into
3187    */
3188   private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
3189       WALEdit walEdit) {
3190     for (List<Cell> edits : familyMap.values()) {
3191       for (Cell cell : edits) {
3192         walEdit.add(KeyValueUtil.ensureKeyValue(cell));
3193       }
3194     }
3195   }
3196 
3197   private void requestFlush() {
3198     if (this.rsServices == null) {
3199       return;
3200     }
3201     synchronized (writestate) {
3202       if (this.writestate.isFlushRequested()) {
3203         return;
3204       }
3205       writestate.flushRequested = true;
3206     }
3207     // Make request outside of synchronize block; HBASE-818.
3208     this.rsServices.getFlushRequester().requestFlush(this);
3209     if (LOG.isDebugEnabled()) {
3210       LOG.debug("Flush requested on " + this);
3211     }
3212   }
3213 
3214   /*
3215    * @param size
3216    * @return True if size is over the flush threshold
3217    */
3218   private boolean isFlushSize(final long size) {
3219     return size > this.memstoreFlushSize;
3220   }
3221 
3222   /**
3223    * Read the edits log put under this region by wal log splitting process.  Put
3224    * the recovered edits back up into this region.
3225    *
3226    * <p>We can ignore any log message that has a sequence ID that's equal to or
3227    * lower than minSeqId.  (Because we know such log messages are already
3228    * reflected in the HFiles.)
3229    *
3230    * <p>While this is running we are putting pressure on memory yet we are
3231    * outside of our usual accounting because we are not yet an onlined region
3232    * (this stuff is being run as part of Region initialization).  This means
3233    * that if we're up against global memory limits, we'll not be flagged to flush
3234    * because we are not online. We can't be flushed by usual mechanisms anyways;
3235    * we're not yet online so our relative sequenceids are not yet aligned with
3236    * HLog sequenceids -- not till we come up online, post processing of split
3237    * edits.
3238    *
3239    * <p>But to help relieve memory pressure, at least manage our own heap size
3240    * flushing if are in excess of per-region limits.  Flushing, though, we have
3241    * to be careful and avoid using the regionserver/hlog sequenceid.  Its running
3242    * on a different line to whats going on in here in this region context so if we
3243    * crashed replaying these edits, but in the midst had a flush that used the
3244    * regionserver log with a sequenceid in excess of whats going on in here
3245    * in this region and with its split editlogs, then we could miss edits the
3246    * next time we go to recover. So, we have to flush inline, using seqids that
3247    * make sense in a this single region context only -- until we online.
3248    *
3249    * @param maxSeqIdInStores Any edit found in split editlogs needs to be in excess of
3250    * the maxSeqId for the store to be applied, else its skipped.
3251    * @return the sequence id of the last edit added to this region out of the
3252    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
3253    * @throws UnsupportedEncodingException
3254    * @throws IOException
3255    */
3256   protected long replayRecoveredEditsIfAny(final Path regiondir,
3257       Map<byte[], Long> maxSeqIdInStores,
3258       final CancelableProgressable reporter, final MonitoredTask status)
3259       throws UnsupportedEncodingException, IOException {
3260     long minSeqIdForTheRegion = -1;
3261     for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
3262       if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
3263         minSeqIdForTheRegion = maxSeqIdInStore;
3264       }
3265     }
3266     long seqid = minSeqIdForTheRegion;
3267 
3268     FileSystem fs = this.fs.getFileSystem();
3269     NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regiondir);
3270     if (LOG.isDebugEnabled()) {
3271       LOG.debug("Found " + (files == null ? 0 : files.size())
3272         + " recovered edits file(s) under " + regiondir);
3273     }
3274 
3275     if (files == null || files.isEmpty()) return seqid;
3276 
3277     for (Path edits: files) {
3278       if (edits == null || !fs.exists(edits)) {
3279         LOG.warn("Null or non-existent edits file: " + edits);
3280         continue;
3281       }
3282       if (isZeroLengthThenDelete(fs, edits)) continue;
3283 
3284       long maxSeqId;
3285       String fileName = edits.getName();
3286       maxSeqId = Math.abs(Long.parseLong(fileName));
3287       if (maxSeqId <= minSeqIdForTheRegion) {
3288         if (LOG.isDebugEnabled()) {
3289           String msg = "Maximum sequenceid for this log is " + maxSeqId
3290             + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
3291             + ", skipped the whole file, path=" + edits;
3292           LOG.debug(msg);
3293         }
3294         continue;
3295       }
3296 
3297       try {
3298         // replay the edits. Replay can return -1 if everything is skipped, only update if seqId is greater
3299         seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
3300       } catch (IOException e) {
3301         boolean skipErrors = conf.getBoolean(
3302             HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
3303             conf.getBoolean(
3304                 "hbase.skip.errors",
3305                 HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS));
3306         if (conf.get("hbase.skip.errors") != null) {
3307           LOG.warn(
3308               "The property 'hbase.skip.errors' has been deprecated. Please use " +
3309               HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
3310         }
3311         if (skipErrors) {
3312           Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
3313           LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
3314               + "=true so continuing. Renamed " + edits +
3315               " as " + p, e);
3316         } else {
3317           throw e;
3318         }
3319       }
3320     }
3321     // The edits size added into rsAccounting during this replaying will not
3322     // be required any more. So just clear it.
3323     if (this.rsAccounting != null) {
3324       this.rsAccounting.clearRegionReplayEditsSize(this.getRegionName());
3325     }
3326     if (seqid > minSeqIdForTheRegion) {
3327       // Then we added some edits to memory. Flush and cleanup split edit files.
3328       internalFlushcache(null, seqid, status);
3329     }
3330     // Now delete the content of recovered edits.  We're done w/ them.
3331     for (Path file: files) {
3332       if (!fs.delete(file, false)) {
3333         LOG.error("Failed delete of " + file);
3334       } else {
3335         LOG.debug("Deleted recovered.edits file=" + file);
3336       }
3337     }
3338     return seqid;
3339   }
3340 
3341   /*
3342    * @param edits File of recovered edits.
3343    * @param maxSeqIdInStores Maximum sequenceid found in each store.  Edits in log
3344    * must be larger than this to be replayed for each store.
3345    * @param reporter
3346    * @return the sequence id of the last edit added to this region out of the
3347    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
3348    * @throws IOException
3349    */
3350   private long replayRecoveredEdits(final Path edits,
3351       Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
3352     throws IOException {
3353     String msg = "Replaying edits from " + edits;
3354     LOG.info(msg);
3355     MonitoredTask status = TaskMonitor.get().createStatus(msg);
3356     FileSystem fs = this.fs.getFileSystem();
3357 
3358     status.setStatus("Opening logs");
3359     HLog.Reader reader = null;
3360     try {
3361       reader = HLogFactory.createReader(fs, edits, conf);
3362       long currentEditSeqId = -1;
3363       long currentReplaySeqId = -1;
3364       long firstSeqIdInLog = -1;
3365       long skippedEdits = 0;
3366       long editsCount = 0;
3367       long intervalEdits = 0;
3368       HLog.Entry entry;
3369       Store store = null;
3370       boolean reported_once = false;
3371       ServerNonceManager ng = this.rsServices == null ? null : this.rsServices.getNonceManager();
3372 
3373       try {
3374         // How many edits seen before we check elapsed time
3375         int interval = this.conf.getInt("hbase.hstore.report.interval.edits",
3376             2000);
3377         // How often to send a progress report (default 1/2 master timeout)
3378         int period = this.conf.getInt("hbase.hstore.report.period", 300000);
3379         long lastReport = EnvironmentEdgeManager.currentTimeMillis();
3380 
3381         while ((entry = reader.next()) != null) {
3382           HLogKey key = entry.getKey();
3383           WALEdit val = entry.getEdit();
3384 
3385           if (ng != null) { // some test, or nonces disabled
3386             ng.reportOperationFromWal(key.getNonceGroup(), key.getNonce(), key.getWriteTime());
3387           }
3388 
3389           if (reporter != null) {
3390             intervalEdits += val.size();
3391             if (intervalEdits >= interval) {
3392               // Number of edits interval reached
3393               intervalEdits = 0;
3394               long cur = EnvironmentEdgeManager.currentTimeMillis();
3395               if (lastReport + period <= cur) {
3396                 status.setStatus("Replaying edits..." +
3397                     " skipped=" + skippedEdits +
3398                     " edits=" + editsCount);
3399                 // Timeout reached
3400                 if(!reporter.progress()) {
3401                   msg = "Progressable reporter failed, stopping replay";
3402                   LOG.warn(msg);
3403                   status.abort(msg);
3404                   throw new IOException(msg);
3405                 }
3406                 reported_once = true;
3407                 lastReport = cur;
3408               }
3409             }
3410           }
3411 
3412           // Start coprocessor replay here. The coprocessor is for each WALEdit
3413           // instead of a KeyValue.
3414           if (coprocessorHost != null) {
3415             status.setStatus("Running pre-WAL-restore hook in coprocessors");
3416             if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
3417               // if bypass this log entry, ignore it ...
3418               continue;
3419             }
3420           }
3421 
3422           if (firstSeqIdInLog == -1) {
3423             firstSeqIdInLog = key.getLogSeqNum();
3424           }
3425           currentEditSeqId = key.getLogSeqNum();
3426           currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ?
3427             key.getOrigLogSeqNum() : currentEditSeqId;
3428           boolean flush = false;
3429           for (KeyValue kv: val.getKeyValues()) {
3430             // Check this edit is for me. Also, guard against writing the special
3431             // METACOLUMN info such as HBASE::CACHEFLUSH entries
3432             if (CellUtil.matchingFamily(kv, WALEdit.METAFAMILY) ||
3433                 !Bytes.equals(key.getEncodedRegionName(),
3434                   this.getRegionInfo().getEncodedNameAsBytes())) {
3435               //this is a special edit, we should handle it
3436               CompactionDescriptor compaction = WALEdit.getCompaction(kv);
3437               if (compaction != null) {
3438                 //replay the compaction
3439                 completeCompactionMarker(compaction);
3440               }
3441 
3442               skippedEdits++;
3443               continue;
3444             }
3445             // Figure which store the edit is meant for.
3446             if (store == null || !CellUtil.matchingFamily(kv, store.getFamily().getName())) {
3447               store = getStore(kv);
3448             }
3449             if (store == null) {
3450               // This should never happen.  Perhaps schema was changed between
3451               // crash and redeploy?
3452               LOG.warn("No family for " + kv);
3453               skippedEdits++;
3454               continue;
3455             }
3456             // Now, figure if we should skip this edit.
3457             if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily()
3458                 .getName())) {
3459               skippedEdits++;
3460               continue;
3461             }
3462             kv.setSequenceId(currentReplaySeqId);
3463             // Once we are over the limit, restoreEdit will keep returning true to
3464             // flush -- but don't flush until we've played all the kvs that make up
3465             // the WALEdit.
3466             flush = restoreEdit(store, kv);
3467             editsCount++;
3468           }
3469           if (flush) internalFlushcache(null, currentEditSeqId, status);
3470 
3471           if (coprocessorHost != null) {
3472             coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
3473           }
3474         }
3475       } catch (EOFException eof) {
3476         Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
3477         msg = "Encountered EOF. Most likely due to Master failure during " +
3478             "log splitting, so we have this data in another edit.  " +
3479             "Continuing, but renaming " + edits + " as " + p;
3480         LOG.warn(msg, eof);
3481         status.abort(msg);
3482       } catch (IOException ioe) {
3483         // If the IOE resulted from bad file format,
3484         // then this problem is idempotent and retrying won't help
3485         if (ioe.getCause() instanceof ParseException) {
3486           Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
3487           msg = "File corruption encountered!  " +
3488               "Continuing, but renaming " + edits + " as " + p;
3489           LOG.warn(msg, ioe);
3490           status.setStatus(msg);
3491         } else {
3492           status.abort(StringUtils.stringifyException(ioe));
3493           // other IO errors may be transient (bad network connection,
3494           // checksum exception on one datanode, etc).  throw & retry
3495           throw ioe;
3496         }
3497       }
3498       if (reporter != null && !reported_once) {
3499         reporter.progress();
3500       }
3501       msg = "Applied " + editsCount + ", skipped " + skippedEdits +
3502         ", firstSequenceIdInLog=" + firstSeqIdInLog +
3503         ", maxSequenceIdInLog=" + currentEditSeqId + ", path=" + edits;
3504       status.markComplete(msg);
3505       LOG.debug(msg);
3506       return currentEditSeqId;
3507     } finally {
3508       status.cleanup();
3509       if (reader != null) {
3510          reader.close();
3511       }
3512     }
3513   }
3514 
3515   /**
3516    * Call to complete a compaction. Its for the case where we find in the WAL a compaction
3517    * that was not finished.  We could find one recovering a WAL after a regionserver crash.
3518    * See HBASE-2331.
3519    */
3520   void completeCompactionMarker(CompactionDescriptor compaction)
3521       throws IOException {
3522     Store store = this.getStore(compaction.getFamilyName().toByteArray());
3523     if (store == null) {
3524       LOG.warn("Found Compaction WAL edit for deleted family:" +
3525           Bytes.toString(compaction.getFamilyName().toByteArray()));
3526       return;
3527     }
3528     store.completeCompactionMarker(compaction);
3529   }
3530 
3531   /**
3532    * Used by tests
3533    * @param s Store to add edit too.
3534    * @param kv KeyValue to add.
3535    * @return True if we should flush.
3536    */
3537   protected boolean restoreEdit(final Store s, final KeyValue kv) {
3538     long kvSize = s.add(kv).getFirst();
3539     if (this.rsAccounting != null) {
3540       rsAccounting.addAndGetRegionReplayEditsSize(this.getRegionName(), kvSize);
3541     }
3542     return isFlushSize(this.addAndGetGlobalMemstoreSize(kvSize));
3543   }
3544 
3545   /*
3546    * @param fs
3547    * @param p File to check.
3548    * @return True if file was zero-length (and if so, we'll delete it in here).
3549    * @throws IOException
3550    */
3551   private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
3552       throws IOException {
3553     FileStatus stat = fs.getFileStatus(p);
3554     if (stat.getLen() > 0) return false;
3555     LOG.warn("File " + p + " is zero-length, deleting.");
3556     fs.delete(p, false);
3557     return true;
3558   }
3559 
3560   protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
3561     return new HStore(this, family, this.conf);
3562   }
3563 
3564   /**
3565    * Return HStore instance.
3566    * Use with caution.  Exposed for use of fixup utilities.
3567    * @param column Name of column family hosted by this region.
3568    * @return Store that goes with the family on passed <code>column</code>.
3569    * TODO: Make this lookup faster.
3570    */
3571   public Store getStore(final byte[] column) {
3572     return this.stores.get(column);
3573   }
3574 
3575   /**
3576    * Return HStore instance. Does not do any copy: as the number of store is limited, we
3577    *  iterate on the list.
3578    */
3579   private Store getStore(Cell cell) {
3580     for (Map.Entry<byte[], Store> famStore : stores.entrySet()) {
3581       if (Bytes.equals(
3582           cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
3583           famStore.getKey(), 0, famStore.getKey().length)) {
3584         return famStore.getValue();
3585       }
3586     }
3587 
3588     return null;
3589   }
3590 
3591   public Map<byte[], Store> getStores() {
3592     return this.stores;
3593   }
3594 
3595   /**
3596    * Return list of storeFiles for the set of CFs.
3597    * Uses closeLock to prevent the race condition where a region closes
3598    * in between the for loop - closing the stores one by one, some stores
3599    * will return 0 files.
3600    * @return List of storeFiles.
3601    */
3602   public List<String> getStoreFileList(final byte [][] columns)
3603     throws IllegalArgumentException {
3604     List<String> storeFileNames = new ArrayList<String>();
3605     synchronized(closeLock) {
3606       for(byte[] column : columns) {
3607         Store store = this.stores.get(column);
3608         if (store == null) {
3609           throw new IllegalArgumentException("No column family : " +
3610               new String(column) + " available");
3611         }
3612         for (StoreFile storeFile: store.getStorefiles()) {
3613           storeFileNames.add(storeFile.getPath().toString());
3614         }
3615       }
3616     }
3617     return storeFileNames;
3618   }
3619 
3620   //////////////////////////////////////////////////////////////////////////////
3621   // Support code
3622   //////////////////////////////////////////////////////////////////////////////
3623 
3624   /** Make sure this is a valid row for the HRegion */
3625   void checkRow(final byte [] row, String op) throws IOException {
3626     if (!rowIsInRange(getRegionInfo(), row)) {
3627       throw new WrongRegionException("Requested row out of range for " +
3628           op + " on HRegion " + this + ", startKey='" +
3629           Bytes.toStringBinary(getStartKey()) + "', getEndKey()='" +
3630           Bytes.toStringBinary(getEndKey()) + "', row='" +
3631           Bytes.toStringBinary(row) + "'");
3632     }
3633   }
3634 
3635   /**
3636    * Tries to acquire a lock on the given row.
3637    * @param waitForLock if true, will block until the lock is available.
3638    *        Otherwise, just tries to obtain the lock and returns
3639    *        false if unavailable.
3640    * @return the row lock if acquired,
3641    *   null if waitForLock was false and the lock was not acquired
3642    * @throws IOException if waitForLock was true and the lock could not be acquired after waiting
3643    */
3644   public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
3645     checkRow(row, "row lock");
3646     startRegionOperation();
3647     try {
3648       HashedBytes rowKey = new HashedBytes(row);
3649       RowLockContext rowLockContext = new RowLockContext(rowKey);
3650 
3651       // loop until we acquire the row lock (unless !waitForLock)
3652       while (true) {
3653         RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
3654         if (existingContext == null) {
3655           // Row is not already locked by any thread, use newly created context.
3656           break;
3657         } else if (existingContext.ownedByCurrentThread()) {
3658           // Row is already locked by current thread, reuse existing context instead.
3659           rowLockContext = existingContext;
3660           break;
3661         } else {
3662           // Row is already locked by some other thread, give up or wait for it
3663           if (!waitForLock) {
3664             return null;
3665           }
3666           try {
3667             if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
3668               throw new IOException("Timed out waiting for lock for row: " + rowKey);
3669             }
3670           } catch (InterruptedException ie) {
3671             LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
3672             InterruptedIOException iie = new InterruptedIOException();
3673             iie.initCause(ie);
3674             throw iie;
3675           }
3676         }
3677       }
3678 
3679       // allocate new lock for this thread
3680       return rowLockContext.newLock();
3681     } finally {
3682       closeRegionOperation();
3683     }
3684   }
3685 
3686   /**
3687    * Acquires a lock on the given row.
3688    * The same thread may acquire multiple locks on the same row.
3689    * @return the acquired row lock
3690    * @throws IOException if the lock could not be acquired after waiting
3691    */
3692   public RowLock getRowLock(byte[] row) throws IOException {
3693     return getRowLock(row, true);
3694   }
3695 
3696   /**
3697    * If the given list of row locks is not null, releases all locks.
3698    */
3699   public void releaseRowLocks(List<RowLock> rowLocks) {
3700     if (rowLocks != null) {
3701       for (RowLock rowLock : rowLocks) {
3702         rowLock.release();
3703       }
3704       rowLocks.clear();
3705     }
3706   }
3707 
3708   /**
3709    * Determines whether multiple column families are present
3710    * Precondition: familyPaths is not null
3711    *
3712    * @param familyPaths List of Pair<byte[] column family, String hfilePath>
3713    */
3714   private static boolean hasMultipleColumnFamilies(
3715       List<Pair<byte[], String>> familyPaths) {
3716     boolean multipleFamilies = false;
3717     byte[] family = null;
3718     for (Pair<byte[], String> pair : familyPaths) {
3719       byte[] fam = pair.getFirst();
3720       if (family == null) {
3721         family = fam;
3722       } else if (!Bytes.equals(family, fam)) {
3723         multipleFamilies = true;
3724         break;
3725       }
3726     }
3727     return multipleFamilies;
3728   }
3729 
3730 
3731   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
3732                                 boolean assignSeqId) throws IOException {
3733     return bulkLoadHFiles(familyPaths, assignSeqId, null);
3734   }
3735 
3736   /**
3737    * Attempts to atomically load a group of hfiles.  This is critical for loading
3738    * rows with multiple column families atomically.
3739    *
3740    * @param familyPaths List of Pair<byte[] column family, String hfilePath>
3741    * @param bulkLoadListener Internal hooks enabling massaging/preparation of a
3742    * file about to be bulk loaded
3743    * @return true if successful, false if failed recoverably
3744    * @throws IOException if failed unrecoverably.
3745    */
3746   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, boolean assignSeqId,
3747       BulkLoadListener bulkLoadListener) throws IOException {
3748     Preconditions.checkNotNull(familyPaths);
3749     // we need writeLock for multi-family bulk load
3750     startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
3751     try {
3752       this.writeRequestsCount.increment();
3753 
3754       // There possibly was a split that happened between when the split keys
3755       // were gathered and before the HRegion's write lock was taken.  We need
3756       // to validate the HFile region before attempting to bulk load all of them
3757       List<IOException> ioes = new ArrayList<IOException>();
3758       List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();
3759       for (Pair<byte[], String> p : familyPaths) {
3760         byte[] familyName = p.getFirst();
3761         String path = p.getSecond();
3762 
3763         Store store = getStore(familyName);
3764         if (store == null) {
3765           IOException ioe = new org.apache.hadoop.hbase.DoNotRetryIOException(
3766               "No such column family " + Bytes.toStringBinary(familyName));
3767           ioes.add(ioe);
3768         } else {
3769           try {
3770             store.assertBulkLoadHFileOk(new Path(path));
3771           } catch (WrongRegionException wre) {
3772             // recoverable (file doesn't fit in region)
3773             failures.add(p);
3774           } catch (IOException ioe) {
3775             // unrecoverable (hdfs problem)
3776             ioes.add(ioe);
3777           }
3778         }
3779       }
3780 
3781       // validation failed because of some sort of IO problem.
3782       if (ioes.size() != 0) {
3783         IOException e = MultipleIOException.createIOException(ioes);
3784         LOG.error("There were one or more IO errors when checking if the bulk load is ok.", e);
3785         throw e;
3786       }
3787 
3788       // validation failed, bail out before doing anything permanent.
3789       if (failures.size() != 0) {
3790         StringBuilder list = new StringBuilder();
3791         for (Pair<byte[], String> p : failures) {
3792           list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")
3793             .append(p.getSecond());
3794         }
3795         // problem when validating
3796         LOG.warn("There was a recoverable bulk load failure likely due to a" +
3797             " split.  These (family, HFile) pairs were not loaded: " + list);
3798         return false;
3799       }
3800 
3801       long seqId = -1;
3802       // We need to assign a sequential ID that's in between two memstores in order to preserve
3803       // the guarantee that all the edits lower than the highest sequential ID from all the
3804       // HFiles are flushed on disk. See HBASE-10958.  The sequence id returned when we flush is
3805       // guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is
3806       // a sequence id that we can be sure is beyond the last hfile written).
3807       if (assignSeqId) {
3808         FlushResult fs = this.flushcache();
3809         if (fs.isFlushSucceeded()) {
3810           seqId = fs.flushSequenceId;
3811         } else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
3812           seqId = fs.flushSequenceId;
3813         } else {
3814           throw new IOException("Could not bulk load with an assigned sequential ID because the " +
3815               "flush didn't run. Reason for not flushing: " + fs.failureReason);
3816         }
3817       }
3818 
3819       for (Pair<byte[], String> p : familyPaths) {
3820         byte[] familyName = p.getFirst();
3821         String path = p.getSecond();
3822         Store store = getStore(familyName);
3823         try {
3824           String finalPath = path;
3825           if(bulkLoadListener != null) {
3826             finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
3827           }
3828           store.bulkLoadHFile(finalPath, seqId);
3829           if(bulkLoadListener != null) {
3830             bulkLoadListener.doneBulkLoad(familyName, path);
3831           }
3832         } catch (IOException ioe) {
3833           // A failure here can cause an atomicity violation that we currently
3834           // cannot recover from since it is likely a failed HDFS operation.
3835 
3836           // TODO Need a better story for reverting partial failures due to HDFS.
3837           LOG.error("There was a partial failure due to IO when attempting to" +
3838               " load " + Bytes.toString(p.getFirst()) + " : "+ p.getSecond(), ioe);
3839           if(bulkLoadListener != null) {
3840             try {
3841               bulkLoadListener.failedBulkLoad(familyName, path);
3842             } catch (Exception ex) {
3843               LOG.error("Error while calling failedBulkLoad for family "+
3844                   Bytes.toString(familyName)+" with path "+path, ex);
3845             }
3846           }
3847           throw ioe;
3848         }
3849       }
3850       return true;
3851     } finally {
3852       closeBulkRegionOperation();
3853     }
3854   }
3855 
3856   @Override
3857   public boolean equals(Object o) {
3858     return o instanceof HRegion && Bytes.equals(this.getRegionName(),
3859                                                 ((HRegion) o).getRegionName());
3860   }
3861 
3862   @Override
3863   public int hashCode() {
3864     return Bytes.hashCode(this.getRegionName());
3865   }
3866 
3867   @Override
3868   public String toString() {
3869     return this.getRegionNameAsString();
3870   }
3871 
3872   /**
3873    * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
3874    */
3875   class RegionScannerImpl implements RegionScanner {
3876     // Package local for testability
3877     KeyValueHeap storeHeap = null;
3878     /** Heap of key-values that are not essential for the provided filters and are thus read
3879      * on demand, if on-demand column family loading is enabled.*/
3880     KeyValueHeap joinedHeap = null;
3881     /**
3882      * If the joined heap data gathering is interrupted due to scan limits, this will
3883      * contain the row for which we are populating the values.*/
3884     protected Cell joinedContinuationRow = null;
3885     // KeyValue indicating that limit is reached when scanning
3886     private final KeyValue KV_LIMIT = new KeyValue();
3887     protected final byte[] stopRow;
3888     private final FilterWrapper filter;
3889     private int batch;
3890     protected int isScan;
3891     private boolean filterClosed = false;
3892     private long readPt;
3893     private long maxResultSize;
3894     protected HRegion region;
3895 
3896     @Override
3897     public HRegionInfo getRegionInfo() {
3898       return region.getRegionInfo();
3899     }
3900 
3901     RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
3902         throws IOException {
3903 
3904       this.region = region;
3905       this.maxResultSize = scan.getMaxResultSize();
3906       if (scan.hasFilter()) {
3907         this.filter = new FilterWrapper(scan.getFilter());
3908       } else {
3909         this.filter = null;
3910       }
3911 
3912       this.batch = scan.getBatch();
3913       if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {
3914         this.stopRow = null;
3915       } else {
3916         this.stopRow = scan.getStopRow();
3917       }
3918       // If we are doing a get, we want to be [startRow,endRow] normally
3919       // it is [startRow,endRow) and if startRow=endRow we get nothing.
3920       this.isScan = scan.isGetScan() ? -1 : 0;
3921 
3922       // synchronize on scannerReadPoints so that nobody calculates
3923       // getSmallestReadPoint, before scannerReadPoints is updated.
3924       IsolationLevel isolationLevel = scan.getIsolationLevel();
3925       synchronized(scannerReadPoints) {
3926         this.readPt = getReadpoint(isolationLevel);
3927         scannerReadPoints.put(this, this.readPt);
3928       }
3929 
3930       // Here we separate all scanners into two lists - scanner that provide data required
3931       // by the filter to operate (scanners list) and all others (joinedScanners list).
3932       List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
3933       List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
3934       if (additionalScanners != null) {
3935         scanners.addAll(additionalScanners);
3936       }
3937 
3938       for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
3939           scan.getFamilyMap().entrySet()) {
3940         Store store = stores.get(entry.getKey());
3941         KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
3942         if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
3943           || this.filter.isFamilyEssential(entry.getKey())) {
3944           scanners.add(scanner);
3945         } else {
3946           joinedScanners.add(scanner);
3947         }
3948       }
3949       initializeKVHeap(scanners, joinedScanners, region);
3950     }
3951 
3952     protected void initializeKVHeap(List<KeyValueScanner> scanners,
3953         List<KeyValueScanner> joinedScanners, HRegion region)
3954         throws IOException {
3955       this.storeHeap = new KeyValueHeap(scanners, region.comparator);
3956       if (!joinedScanners.isEmpty()) {
3957         this.joinedHeap = new KeyValueHeap(joinedScanners, region.comparator);
3958       }
3959     }
3960 
3961     @Override
3962     public long getMaxResultSize() {
3963       return maxResultSize;
3964     }
3965 
3966     @Override
3967     public long getMvccReadPoint() {
3968       return this.readPt;
3969     }
3970 
3971     /**
3972      * Reset both the filter and the old filter.
3973      *
3974      * @throws IOException in case a filter raises an I/O exception.
3975      */
3976     protected void resetFilters() throws IOException {
3977       if (filter != null) {
3978         filter.reset();
3979       }
3980     }
3981 
3982     @Override
3983     public boolean next(List<Cell> outResults)
3984         throws IOException {
3985       // apply the batching limit by default
3986       return next(outResults, batch);
3987     }
3988 
3989     @Override
3990     public synchronized boolean next(List<Cell> outResults, int limit) throws IOException {
3991       if (this.filterClosed) {
3992         throw new UnknownScannerException("Scanner was closed (timed out?) " +
3993             "after we renewed it. Could be caused by a very slow scanner " +
3994             "or a lengthy garbage collection");
3995       }
3996       startRegionOperation(Operation.SCAN);
3997       readRequestsCount.increment();
3998       try {
3999         return nextRaw(outResults, limit);
4000       } finally {
4001         closeRegionOperation(Operation.SCAN);
4002       }
4003     }
4004 
4005     @Override
4006     public boolean nextRaw(List<Cell> outResults)
4007         throws IOException {
4008       return nextRaw(outResults, batch);
4009     }
4010 
4011     @Override
4012     public boolean nextRaw(List<Cell> outResults, int limit) throws IOException {
4013       boolean returnResult;
4014       if (outResults.isEmpty()) {
4015         // Usually outResults is empty. This is true when next is called
4016         // to handle scan or get operation.
4017         returnResult = nextInternal(outResults, limit);
4018       } else {
4019         List<Cell> tmpList = new ArrayList<Cell>();
4020         returnResult = nextInternal(tmpList, limit);
4021         outResults.addAll(tmpList);
4022       }
4023       resetFilters();
4024       if (isFilterDoneInternal()) {
4025         returnResult = false;
4026       }
4027       if (region != null && region.metricsRegion != null) {
4028         long totalSize = 0;
4029         for(Cell c:outResults) {
4030           // TODO clean up.  Find way to remove this ensureKeyValue
4031           KeyValue kv = KeyValueUtil.ensureKeyValue(c);
4032           totalSize += kv.getLength();
4033         }
4034         region.metricsRegion.updateScanNext(totalSize);
4035       }
4036       return returnResult;
4037     }
4038 
4039 
4040     private void populateFromJoinedHeap(List<Cell> results, int limit)
4041         throws IOException {
4042       assert joinedContinuationRow != null;
4043       Cell kv = populateResult(results, this.joinedHeap, limit,
4044           joinedContinuationRow.getRowArray(), joinedContinuationRow.getRowOffset(),
4045           joinedContinuationRow.getRowLength());
4046       if (kv != KV_LIMIT) {
4047         // We are done with this row, reset the continuation.
4048         joinedContinuationRow = null;
4049       }
4050       // As the data is obtained from two independent heaps, we need to
4051       // ensure that result list is sorted, because Result relies on that.
4052       Collections.sort(results, comparator);
4053     }
4054 
4055     /**
4056      * Fetches records with currentRow into results list, until next row or limit (if not -1).
4057      * @param heap KeyValueHeap to fetch data from.It must be positioned on correct row before call.
4058      * @param limit Max amount of KVs to place in result list, -1 means no limit.
4059      * @param currentRow Byte array with key we are fetching.
4060      * @param offset offset for currentRow
4061      * @param length length for currentRow
4062      * @return KV_LIMIT if limit reached, next KeyValue otherwise.
4063      */
4064     private Cell populateResult(List<Cell> results, KeyValueHeap heap, int limit,
4065         byte[] currentRow, int offset, short length) throws IOException {
4066       Cell nextKv;
4067       do {
4068         heap.next(results, limit - results.size());
4069         if (limit > 0 && results.size() == limit) {
4070           return KV_LIMIT;
4071         }
4072         nextKv = heap.peek();
4073       } while (nextKv != null && CellUtil.matchingRow(nextKv, currentRow, offset, length));
4074 
4075       return nextKv;
4076     }
4077 
4078     /*
4079      * @return True if a filter rules the scanner is over, done.
4080      */
4081     @Override
4082     public synchronized boolean isFilterDone() throws IOException {
4083       return isFilterDoneInternal();
4084     }
4085 
4086     private boolean isFilterDoneInternal() throws IOException {
4087       return this.filter != null && this.filter.filterAllRemaining();
4088     }
4089 
4090     private boolean nextInternal(List<Cell> results, int limit)
4091     throws IOException {
4092       if (!results.isEmpty()) {
4093         throw new IllegalArgumentException("First parameter should be an empty list");
4094       }
4095       RpcCallContext rpcCall = RpcServer.getCurrentCall();
4096       // The loop here is used only when at some point during the next we determine
4097       // that due to effects of filters or otherwise, we have an empty row in the result.
4098       // Then we loop and try again. Otherwise, we must get out on the first iteration via return,
4099       // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
4100       // and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow).
4101       while (true) {
4102         if (rpcCall != null) {
4103           // If a user specifies a too-restrictive or too-slow scanner, the
4104           // client might time out and disconnect while the server side
4105           // is still processing the request. We should abort aggressively
4106           // in that case.
4107           long afterTime = rpcCall.disconnectSince();
4108           if (afterTime >= 0) {
4109             throw new CallerDisconnectedException(
4110                 "Aborting on region " + getRegionNameAsString() + ", call " +
4111                     this + " after " + afterTime + " ms, since " +
4112                     "caller disconnected");
4113           }
4114         }
4115 
4116         // Let's see what we have in the storeHeap.
4117         Cell current = this.storeHeap.peek();
4118 
4119         byte[] currentRow = null;
4120         int offset = 0;
4121         short length = 0;
4122         if (current != null) {
4123           currentRow = current.getRowArray();
4124           offset = current.getRowOffset();
4125           length = current.getRowLength();
4126         }
4127         boolean stopRow = isStopRow(currentRow, offset, length);
4128         // Check if we were getting data from the joinedHeap and hit the limit.
4129         // If not, then it's main path - getting results from storeHeap.
4130         if (joinedContinuationRow == null) {
4131           // First, check if we are at a stop row. If so, there are no more results.
4132           if (stopRow) {
4133             if (filter != null && filter.hasFilterRow()) {
4134               filter.filterRowCells(results);
4135             }
4136             return false;
4137           }
4138 
4139           // Check if rowkey filter wants to exclude this row. If so, loop to next.
4140           // Technically, if we hit limits before on this row, we don't need this call.
4141           if (filterRowKey(currentRow, offset, length)) {
4142             boolean moreRows = nextRow(currentRow, offset, length);
4143             if (!moreRows) return false;
4144             results.clear();
4145             continue;
4146           }
4147 
4148           Cell nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset,
4149               length);
4150           // Ok, we are good, let's try to get some results from the main heap.
4151           if (nextKv == KV_LIMIT) {
4152             if (this.filter != null && filter.hasFilterRow()) {
4153               throw new IncompatibleFilterException(
4154                 "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
4155             }
4156             return true; // We hit the limit.
4157           }
4158 
4159           stopRow = nextKv == null ||
4160               isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength());
4161           // save that the row was empty before filters applied to it.
4162           final boolean isEmptyRow = results.isEmpty();
4163 
4164           // We have the part of the row necessary for filtering (all of it, usually).
4165           // First filter with the filterRow(List).
4166           FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
4167           if (filter != null && filter.hasFilterRow()) {
4168             ret = filter.filterRowCellsWithRet(results);
4169           }
4170 
4171           if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) {
4172             results.clear();
4173             boolean moreRows = nextRow(currentRow, offset, length);
4174             if (!moreRows) return false;
4175 
4176             // This row was totally filtered out, if this is NOT the last row,
4177             // we should continue on. Otherwise, nothing else to do.
4178             if (!stopRow) continue;
4179             return false;
4180           }
4181 
4182           // Ok, we are done with storeHeap for this row.
4183           // Now we may need to fetch additional, non-essential data into row.
4184           // These values are not needed for filter to work, so we postpone their
4185           // fetch to (possibly) reduce amount of data loads from disk.
4186           if (this.joinedHeap != null) {
4187             Cell nextJoinedKv = joinedHeap.peek();
4188             // If joinedHeap is pointing to some other row, try to seek to a correct one.
4189             boolean mayHaveData = (nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv,
4190                 currentRow, offset, length))
4191                 || (this.joinedHeap.requestSeek(
4192                     KeyValueUtil.createFirstOnRow(currentRow, offset, length), true, true)
4193                     && joinedHeap.peek() != null && CellUtil.matchingRow(joinedHeap.peek(),
4194                     currentRow, offset, length));
4195             if (mayHaveData) {
4196               joinedContinuationRow = current;
4197               populateFromJoinedHeap(results, limit);
4198             }
4199           }
4200         } else {
4201           // Populating from the joined heap was stopped by limits, populate some more.
4202           populateFromJoinedHeap(results, limit);
4203         }
4204 
4205         // We may have just called populateFromJoinedMap and hit the limits. If that is
4206         // the case, we need to call it again on the next next() invocation.
4207         if (joinedContinuationRow != null) {
4208           return true;
4209         }
4210 
4211         // Finally, we are done with both joinedHeap and storeHeap.
4212         // Double check to prevent empty rows from appearing in result. It could be
4213         // the case when SingleColumnValueExcludeFilter is used.
4214         if (results.isEmpty()) {
4215           boolean moreRows = nextRow(currentRow, offset, length);
4216           if (!moreRows) return false;
4217           if (!stopRow) continue;
4218         }
4219 
4220         // We are done. Return the result.
4221         return !stopRow;
4222       }
4223     }
4224 
4225     /**
4226      * This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combines
4227      * both filterRow & filterRow(List<KeyValue> kvs) functions. While 0.94 code or older, it may
4228      * not implement hasFilterRow as HBase-6429 expects because 0.94 hasFilterRow() only returns
4229      * true when filterRow(List<KeyValue> kvs) is overridden not the filterRow(). Therefore, the
4230      * filterRow() will be skipped.
4231      */
4232     private boolean filterRow() throws IOException {
4233       // when hasFilterRow returns true, filter.filterRow() will be called automatically inside
4234       // filterRowCells(List<Cell> kvs) so we skip that scenario here.
4235       return filter != null && (!filter.hasFilterRow())
4236           && filter.filterRow();
4237     }
4238 
4239     private boolean filterRowKey(byte[] row, int offset, short length) throws IOException {
4240       return filter != null
4241           && filter.filterRowKey(row, offset, length);
4242     }
4243 
4244     protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException {
4245       assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read.";
4246       Cell next;
4247       while ((next = this.storeHeap.peek()) != null &&
4248              CellUtil.matchingRow(next, currentRow, offset, length)) {
4249         this.storeHeap.next(MOCKED_LIST);
4250       }
4251       resetFilters();
4252       // Calling the hook in CP which allows it to do a fast forward
4253       return this.region.getCoprocessorHost() == null
4254           || this.region.getCoprocessorHost()
4255               .postScannerFilterRow(this, currentRow, offset, length);
4256     }
4257 
4258     protected boolean isStopRow(byte[] currentRow, int offset, short length) {
4259       return currentRow == null ||
4260           (stopRow != null &&
4261           comparator.compareRows(stopRow, 0, stopRow.length,
4262             currentRow, offset, length) <= isScan);
4263     }
4264 
4265     @Override
4266     public synchronized void close() {
4267       if (storeHeap != null) {
4268         storeHeap.close();
4269         storeHeap = null;
4270       }
4271       if (joinedHeap != null) {
4272         joinedHeap.close();
4273         joinedHeap = null;
4274       }
4275       // no need to synchronize here.
4276       scannerReadPoints.remove(this);
4277       this.filterClosed = true;
4278     }
4279 
4280     KeyValueHeap getStoreHeapForTesting() {
4281       return storeHeap;
4282     }
4283 
4284     @Override
4285     public synchronized boolean reseek(byte[] row) throws IOException {
4286       if (row == null) {
4287         throw new IllegalArgumentException("Row cannot be null.");
4288       }
4289       boolean result = false;
4290       startRegionOperation();
4291       try {
4292         KeyValue kv = KeyValueUtil.createFirstOnRow(row);
4293         // use request seek to make use of the lazy seek option. See HBASE-5520
4294         result = this.storeHeap.requestSeek(kv, true, true);
4295         if (this.joinedHeap != null) {
4296           result = this.joinedHeap.requestSeek(kv, true, true) || result;
4297         }
4298       } finally {
4299         closeRegionOperation();
4300       }
4301       return result;
4302     }
4303   }
4304 
4305   // Utility methods
4306   /**
4307    * A utility method to create new instances of HRegion based on the
4308    * {@link HConstants#REGION_IMPL} configuration property.
4309    * @param tableDir qualified path of directory where region should be located,
4310    * usually the table directory.
4311    * @param log The HLog is the outbound log for any updates to the HRegion
4312    * (There's a single HLog for all the HRegions on a single HRegionServer.)
4313    * The log file is a logfile from the previous execution that's
4314    * custom-computed for this HRegion. The HRegionServer computes and sorts the
4315    * appropriate log info for this HRegion. If there is a previous log file
4316    * (implying that the HRegion has been written-to before), then read it from
4317    * the supplied path.
4318    * @param fs is the filesystem.
4319    * @param conf is global configuration settings.
4320    * @param regionInfo - HRegionInfo that describes the region
4321    * is new), then read them from the supplied path.
4322    * @param htd the table descriptor
4323    * @return the new instance
4324    */
4325   static HRegion newHRegion(Path tableDir, HLog log, FileSystem fs,
4326       Configuration conf, HRegionInfo regionInfo, final HTableDescriptor htd,
4327       RegionServerServices rsServices) {
4328     try {
4329       @SuppressWarnings("unchecked")
4330       Class<? extends HRegion> regionClass =
4331           (Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
4332 
4333       Constructor<? extends HRegion> c =
4334           regionClass.getConstructor(Path.class, HLog.class, FileSystem.class,
4335               Configuration.class, HRegionInfo.class, HTableDescriptor.class,
4336               RegionServerServices.class);
4337 
4338       return c.newInstance(tableDir, log, fs, conf, regionInfo, htd, rsServices);
4339     } catch (Throwable e) {
4340       // todo: what should I throw here?
4341       throw new IllegalStateException("Could not instantiate a region instance.", e);
4342     }
4343   }
4344 
4345   /**
4346    * Convenience method creating new HRegions. Used by createTable and by the
4347    * bootstrap code in the HMaster constructor.
4348    * Note, this method creates an {@link HLog} for the created region. It
4349    * needs to be closed explicitly.  Use {@link HRegion#getLog()} to get
4350    * access.  <b>When done with a region created using this method, you will
4351    * need to explicitly close the {@link HLog} it created too; it will not be
4352    * done for you.  Not closing the log will leave at least a daemon thread
4353    * running.</b>  Call {@link #closeHRegion(HRegion)} and it will do
4354    * necessary cleanup for you.
4355    * @param info Info for region to create.
4356    * @param rootDir Root directory for HBase instance
4357    * @return new HRegion
4358    *
4359    * @throws IOException
4360    */
4361   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4362       final Configuration conf, final HTableDescriptor hTableDescriptor)
4363   throws IOException {
4364     return createHRegion(info, rootDir, conf, hTableDescriptor, null);
4365   }
4366 
4367   /**
4368    * This will do the necessary cleanup a call to
4369    * {@link #createHRegion(HRegionInfo, Path, Configuration, HTableDescriptor)}
4370    * requires.  This method will close the region and then close its
4371    * associated {@link HLog} file.  You use it if you call the other createHRegion,
4372    * the one that takes an {@link HLog} instance but don't be surprised by the
4373    * call to the {@link HLog#closeAndDelete()} on the {@link HLog} the
4374    * HRegion was carrying.
4375    * @throws IOException
4376    */
4377   public static void closeHRegion(final HRegion r) throws IOException {
4378     if (r == null) return;
4379     r.close();
4380     if (r.getLog() == null) return;
4381     r.getLog().closeAndDelete();
4382   }
4383 
4384   /**
4385    * Convenience method creating new HRegions. Used by createTable.
4386    * The {@link HLog} for the created region needs to be closed explicitly.
4387    * Use {@link HRegion#getLog()} to get access.
4388    *
4389    * @param info Info for region to create.
4390    * @param rootDir Root directory for HBase instance
4391    * @param hlog shared HLog
4392    * @param initialize - true to initialize the region
4393    * @return new HRegion
4394    *
4395    * @throws IOException
4396    */
4397   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4398                                       final Configuration conf,
4399                                       final HTableDescriptor hTableDescriptor,
4400                                       final HLog hlog,
4401                                       final boolean initialize)
4402       throws IOException {
4403     return createHRegion(info, rootDir, conf, hTableDescriptor,
4404         hlog, initialize, false);
4405   }
4406 
4407   /**
4408    * Convenience method creating new HRegions. Used by createTable.
4409    * The {@link HLog} for the created region needs to be closed
4410    * explicitly, if it is not null.
4411    * Use {@link HRegion#getLog()} to get access.
4412    *
4413    * @param info Info for region to create.
4414    * @param rootDir Root directory for HBase instance
4415    * @param hlog shared HLog
4416    * @param initialize - true to initialize the region
4417    * @param ignoreHLog - true to skip generate new hlog if it is null, mostly for createTable
4418    * @return new HRegion
4419    * @throws IOException
4420    */
4421   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4422                                       final Configuration conf,
4423                                       final HTableDescriptor hTableDescriptor,
4424                                       final HLog hlog,
4425                                       final boolean initialize, final boolean ignoreHLog)
4426       throws IOException {
4427       Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
4428       return createHRegion(info, rootDir, tableDir, conf, hTableDescriptor, hlog, initialize, ignoreHLog);
4429   }
4430 
4431   /**
4432    * Convenience method creating new HRegions. Used by createTable.
4433    * The {@link HLog} for the created region needs to be closed
4434    * explicitly, if it is not null.
4435    * Use {@link HRegion#getLog()} to get access.
4436    *
4437    * @param info Info for region to create.
4438    * @param rootDir Root directory for HBase instance
4439    * @param tableDir table directory
4440    * @param hlog shared HLog
4441    * @param initialize - true to initialize the region
4442    * @param ignoreHLog - true to skip generate new hlog if it is null, mostly for createTable
4443    * @return new HRegion
4444    * @throws IOException
4445    */
4446   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Path tableDir,
4447                                       final Configuration conf,
4448                                       final HTableDescriptor hTableDescriptor,
4449                                       final HLog hlog,
4450                                       final boolean initialize, final boolean ignoreHLog)
4451       throws IOException {
4452     LOG.info("creating HRegion " + info.getTable().getNameAsString()
4453         + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
4454         " Table name == " + info.getTable().getNameAsString());
4455     FileSystem fs = FileSystem.get(conf);
4456     HRegionFileSystem rfs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
4457     HLog effectiveHLog = hlog;
4458     if (hlog == null && !ignoreHLog) {
4459       effectiveHLog = HLogFactory.createHLog(fs, rfs.getRegionDir(),
4460                                              HConstants.HREGION_LOGDIR_NAME, conf);
4461     }
4462     HRegion region = HRegion.newHRegion(tableDir,
4463         effectiveHLog, fs, conf, info, hTableDescriptor, null);
4464     if (initialize) {
4465       // If initializing, set the sequenceId. It is also required by HLogPerformanceEvaluation when
4466       // verifying the WALEdits.
4467       region.setSequenceId(region.initialize(null));
4468     }
4469     return region;
4470   }
4471 
4472   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4473                                       final Configuration conf,
4474                                       final HTableDescriptor hTableDescriptor,
4475                                       final HLog hlog)
4476     throws IOException {
4477     return createHRegion(info, rootDir, conf, hTableDescriptor, hlog, true);
4478   }
4479 
4480 
4481   /**
4482    * Open a Region.
4483    * @param info Info for region to be opened.
4484    * @param wal HLog for region to use. This method will call
4485    * HLog#setSequenceNumber(long) passing the result of the call to
4486    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4487    * up.  HRegionStore does this every time it opens a new region.
4488    * @return new HRegion
4489    *
4490    * @throws IOException
4491    */
4492   public static HRegion openHRegion(final HRegionInfo info,
4493       final HTableDescriptor htd, final HLog wal,
4494       final Configuration conf)
4495   throws IOException {
4496     return openHRegion(info, htd, wal, conf, null, null);
4497   }
4498 
4499   /**
4500    * Open a Region.
4501    * @param info Info for region to be opened
4502    * @param htd the table descriptor
4503    * @param wal HLog for region to use. This method will call
4504    * HLog#setSequenceNumber(long) passing the result of the call to
4505    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4506    * up.  HRegionStore does this every time it opens a new region.
4507    * @param conf The Configuration object to use.
4508    * @param rsServices An interface we can request flushes against.
4509    * @param reporter An interface we can report progress against.
4510    * @return new HRegion
4511    *
4512    * @throws IOException
4513    */
4514   public static HRegion openHRegion(final HRegionInfo info,
4515     final HTableDescriptor htd, final HLog wal, final Configuration conf,
4516     final RegionServerServices rsServices,
4517     final CancelableProgressable reporter)
4518   throws IOException {
4519     return openHRegion(FSUtils.getRootDir(conf), info, htd, wal, conf, rsServices, reporter);
4520   }
4521 
4522   /**
4523    * Open a Region.
4524    * @param rootDir Root directory for HBase instance
4525    * @param info Info for region to be opened.
4526    * @param htd the table descriptor
4527    * @param wal HLog for region to use. This method will call
4528    * HLog#setSequenceNumber(long) passing the result of the call to
4529    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4530    * up.  HRegionStore does this every time it opens a new region.
4531    * @param conf The Configuration object to use.
4532    * @return new HRegion
4533    * @throws IOException
4534    */
4535   public static HRegion openHRegion(Path rootDir, final HRegionInfo info,
4536       final HTableDescriptor htd, final HLog wal, final Configuration conf)
4537   throws IOException {
4538     return openHRegion(rootDir, info, htd, wal, conf, null, null);
4539   }
4540 
4541   /**
4542    * Open a Region.
4543    * @param rootDir Root directory for HBase instance
4544    * @param info Info for region to be opened.
4545    * @param htd the table descriptor
4546    * @param wal HLog for region to use. This method will call
4547    * HLog#setSequenceNumber(long) passing the result of the call to
4548    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4549    * up.  HRegionStore does this every time it opens a new region.
4550    * @param conf The Configuration object to use.
4551    * @param rsServices An interface we can request flushes against.
4552    * @param reporter An interface we can report progress against.
4553    * @return new HRegion
4554    * @throws IOException
4555    */
4556   public static HRegion openHRegion(final Path rootDir, final HRegionInfo info,
4557       final HTableDescriptor htd, final HLog wal, final Configuration conf,
4558       final RegionServerServices rsServices,
4559       final CancelableProgressable reporter)
4560   throws IOException {
4561     FileSystem fs = null;
4562     if (rsServices != null) {
4563       fs = rsServices.getFileSystem();
4564     }
4565     if (fs == null) {
4566       fs = FileSystem.get(conf);
4567     }
4568     return openHRegion(conf, fs, rootDir, info, htd, wal, rsServices, reporter);
4569   }
4570 
4571   /**
4572    * Open a Region.
4573    * @param conf The Configuration object to use.
4574    * @param fs Filesystem to use
4575    * @param rootDir Root directory for HBase instance
4576    * @param info Info for region to be opened.
4577    * @param htd the table descriptor
4578    * @param wal HLog for region to use. This method will call
4579    * HLog#setSequenceNumber(long) passing the result of the call to
4580    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4581    * up.  HRegionStore does this every time it opens a new region.
4582    * @return new HRegion
4583    * @throws IOException
4584    */
4585   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4586       final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal)
4587       throws IOException {
4588     return openHRegion(conf, fs, rootDir, info, htd, wal, null, null);
4589   }
4590 
4591   /**
4592    * Open a Region.
4593    * @param conf The Configuration object to use.
4594    * @param fs Filesystem to use
4595    * @param rootDir Root directory for HBase instance
4596    * @param info Info for region to be opened.
4597    * @param htd the table descriptor
4598    * @param wal HLog for region to use. This method will call
4599    * HLog#setSequenceNumber(long) passing the result of the call to
4600    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4601    * up.  HRegionStore does this every time it opens a new region.
4602    * @param rsServices An interface we can request flushes against.
4603    * @param reporter An interface we can report progress against.
4604    * @return new HRegion
4605    * @throws IOException
4606    */
4607   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4608       final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
4609       final RegionServerServices rsServices, final CancelableProgressable reporter)
4610       throws IOException {
4611     Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
4612     return openHRegion(conf, fs, rootDir, tableDir, info, htd, wal, rsServices, reporter);
4613   }
4614 
4615   /**
4616    * Open a Region.
4617    * @param conf The Configuration object to use.
4618    * @param fs Filesystem to use
4619    * @param rootDir Root directory for HBase instance
4620    * @param info Info for region to be opened.
4621    * @param htd the table descriptor
4622    * @param wal HLog for region to use. This method will call
4623    * HLog#setSequenceNumber(long) passing the result of the call to
4624    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4625    * up.  HRegionStore does this every time it opens a new region.
4626    * @param rsServices An interface we can request flushes against.
4627    * @param reporter An interface we can report progress against.
4628    * @return new HRegion
4629    * @throws IOException
4630    */
4631   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4632       final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
4633       final RegionServerServices rsServices, final CancelableProgressable reporter)
4634       throws IOException {
4635     if (info == null) throw new NullPointerException("Passed region info is null");
4636     if (LOG.isDebugEnabled()) {
4637       LOG.debug("Opening region: " + info);
4638     }
4639     HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
4640     return r.openHRegion(reporter);
4641   }
4642 
4643 
4644   /**
4645    * Useful when reopening a closed region (normally for unit tests)
4646    * @param other original object
4647    * @param reporter An interface we can report progress against.
4648    * @return new HRegion
4649    * @throws IOException
4650    */
4651   public static HRegion openHRegion(final HRegion other, final CancelableProgressable reporter)
4652       throws IOException {
4653     HRegionFileSystem regionFs = other.getRegionFileSystem();
4654     HRegion r = newHRegion(regionFs.getTableDir(), other.getLog(), regionFs.getFileSystem(),
4655         other.baseConf, other.getRegionInfo(), other.getTableDesc(), null);
4656     return r.openHRegion(reporter);
4657   }
4658 
4659   /**
4660    * Open HRegion.
4661    * Calls initialize and sets sequenceId.
4662    * @return Returns <code>this</code>
4663    * @throws IOException
4664    */
4665   protected HRegion openHRegion(final CancelableProgressable reporter)
4666   throws IOException {
4667     checkCompressionCodecs();
4668 
4669     this.openSeqNum = initialize(reporter);
4670     this.setSequenceId(openSeqNum);
4671     if (log != null && getRegionServerServices() != null) {
4672       writeRegionOpenMarker(log, openSeqNum);
4673     }
4674     return this;
4675   }
4676 
4677   private void checkCompressionCodecs() throws IOException {
4678     for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
4679       CompressionTest.testCompression(fam.getCompression());
4680       CompressionTest.testCompression(fam.getCompactionCompression());
4681     }
4682   }
4683 
4684   /**
4685    * Create a daughter region from given a temp directory with the region data.
4686    * @param hri Spec. for daughter region to open.
4687    * @throws IOException
4688    */
4689   HRegion createDaughterRegionFromSplits(final HRegionInfo hri) throws IOException {
4690     // Move the files from the temporary .splits to the final /table/region directory
4691     fs.commitDaughterRegion(hri);
4692 
4693     // Create the daughter HRegion instance
4694     HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(), fs.getFileSystem(),
4695         this.getBaseConf(), hri, this.getTableDesc(), rsServices);
4696     r.readRequestsCount.set(this.getReadRequestsCount() / 2);
4697     r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
4698     return r;
4699   }
4700 
4701   /**
4702    * Create a merged region given a temp directory with the region data.
4703    * @param region_b another merging region
4704    * @return merged HRegion
4705    * @throws IOException
4706    */
4707   HRegion createMergedRegionFromMerges(final HRegionInfo mergedRegionInfo,
4708       final HRegion region_b) throws IOException {
4709     HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(),
4710         fs.getFileSystem(), this.getBaseConf(), mergedRegionInfo,
4711         this.getTableDesc(), this.rsServices);
4712     r.readRequestsCount.set(this.getReadRequestsCount()
4713         + region_b.getReadRequestsCount());
4714     r.writeRequestsCount.set(this.getWriteRequestsCount()
4715         + region_b.getWriteRequestsCount());
4716     this.fs.commitMergedRegion(mergedRegionInfo);
4717     return r;
4718   }
4719 
4720   /**
4721    * Inserts a new region's meta information into the passed
4722    * <code>meta</code> region. Used by the HMaster bootstrap code adding
4723    * new table to hbase:meta table.
4724    *
4725    * @param meta hbase:meta HRegion to be updated
4726    * @param r HRegion to add to <code>meta</code>
4727    *
4728    * @throws IOException
4729    */
4730   // TODO remove since only test and merge use this
4731   public static void addRegionToMETA(final HRegion meta, final HRegion r) throws IOException {
4732     meta.checkResources();
4733     // The row key is the region name
4734     byte[] row = r.getRegionName();
4735     final long now = EnvironmentEdgeManager.currentTimeMillis();
4736     final List<Cell> cells = new ArrayList<Cell>(2);
4737     cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
4738       HConstants.REGIONINFO_QUALIFIER, now,
4739       r.getRegionInfo().toByteArray()));
4740     // Set into the root table the version of the meta table.
4741     cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
4742       HConstants.META_VERSION_QUALIFIER, now,
4743       Bytes.toBytes(HConstants.META_VERSION)));
4744     meta.put(row, HConstants.CATALOG_FAMILY, cells);
4745   }
4746 
4747   /**
4748    * Computes the Path of the HRegion
4749    *
4750    * @param tabledir qualified path for table
4751    * @param name ENCODED region name
4752    * @return Path of HRegion directory
4753    */
4754   @Deprecated
4755   public static Path getRegionDir(final Path tabledir, final String name) {
4756     return new Path(tabledir, name);
4757   }
4758 
4759   /**
4760    * Computes the Path of the HRegion
4761    *
4762    * @param rootdir qualified path of HBase root directory
4763    * @param info HRegionInfo for the region
4764    * @return qualified path of region directory
4765    */
4766   @Deprecated
4767   public static Path getRegionDir(final Path rootdir, final HRegionInfo info) {
4768     return new Path(
4769       FSUtils.getTableDir(rootdir, info.getTable()), info.getEncodedName());
4770   }
4771 
4772   /**
4773    * Determines if the specified row is within the row range specified by the
4774    * specified HRegionInfo
4775    *
4776    * @param info HRegionInfo that specifies the row range
4777    * @param row row to be checked
4778    * @return true if the row is within the range specified by the HRegionInfo
4779    */
4780   public static boolean rowIsInRange(HRegionInfo info, final byte [] row) {
4781     return ((info.getStartKey().length == 0) ||
4782         (Bytes.compareTo(info.getStartKey(), row) <= 0)) &&
4783         ((info.getEndKey().length == 0) ||
4784             (Bytes.compareTo(info.getEndKey(), row) > 0));
4785   }
4786 
4787   /**
4788    * Merge two HRegions.  The regions must be adjacent and must not overlap.
4789    *
4790    * @return new merged HRegion
4791    * @throws IOException
4792    */
4793   public static HRegion mergeAdjacent(final HRegion srcA, final HRegion srcB)
4794   throws IOException {
4795     HRegion a = srcA;
4796     HRegion b = srcB;
4797 
4798     // Make sure that srcA comes first; important for key-ordering during
4799     // write of the merged file.
4800     if (srcA.getStartKey() == null) {
4801       if (srcB.getStartKey() == null) {
4802         throw new IOException("Cannot merge two regions with null start key");
4803       }
4804       // A's start key is null but B's isn't. Assume A comes before B
4805     } else if ((srcB.getStartKey() == null) ||
4806       (Bytes.compareTo(srcA.getStartKey(), srcB.getStartKey()) > 0)) {
4807       a = srcB;
4808       b = srcA;
4809     }
4810 
4811     if (!(Bytes.compareTo(a.getEndKey(), b.getStartKey()) == 0)) {
4812       throw new IOException("Cannot merge non-adjacent regions");
4813     }
4814     return merge(a, b);
4815   }
4816 
4817   /**
4818    * Merge two regions whether they are adjacent or not.
4819    *
4820    * @param a region a
4821    * @param b region b
4822    * @return new merged region
4823    * @throws IOException
4824    */
4825   public static HRegion merge(final HRegion a, final HRegion b) throws IOException {
4826     if (!a.getRegionInfo().getTable().equals(b.getRegionInfo().getTable())) {
4827       throw new IOException("Regions do not belong to the same table");
4828     }
4829 
4830     FileSystem fs = a.getRegionFileSystem().getFileSystem();
4831     // Make sure each region's cache is empty
4832     a.flushcache();
4833     b.flushcache();
4834 
4835     // Compact each region so we only have one store file per family
4836     a.compactStores(true);
4837     if (LOG.isDebugEnabled()) {
4838       LOG.debug("Files for region: " + a);
4839       a.getRegionFileSystem().logFileSystemState(LOG);
4840     }
4841     b.compactStores(true);
4842     if (LOG.isDebugEnabled()) {
4843       LOG.debug("Files for region: " + b);
4844       b.getRegionFileSystem().logFileSystemState(LOG);
4845     }
4846 
4847     RegionMergeTransaction rmt = new RegionMergeTransaction(a, b, true);
4848     if (!rmt.prepare(null)) {
4849       throw new IOException("Unable to merge regions " + a + " and " + b);
4850     }
4851     HRegionInfo mergedRegionInfo = rmt.getMergedRegionInfo();
4852     LOG.info("starting merge of regions: " + a + " and " + b
4853         + " into new region " + mergedRegionInfo.getRegionNameAsString()
4854         + " with start key <"
4855         + Bytes.toStringBinary(mergedRegionInfo.getStartKey())
4856         + "> and end key <"
4857         + Bytes.toStringBinary(mergedRegionInfo.getEndKey()) + ">");
4858     HRegion dstRegion;
4859     try {
4860       dstRegion = rmt.execute(null, null);
4861     } catch (IOException ioe) {
4862       rmt.rollback(null, null);
4863       throw new IOException("Failed merging region " + a + " and " + b
4864           + ", and successfully rolled back");
4865     }
4866     dstRegion.compactStores(true);
4867 
4868     if (LOG.isDebugEnabled()) {
4869       LOG.debug("Files for new region");
4870       dstRegion.getRegionFileSystem().logFileSystemState(LOG);
4871     }
4872 
4873     if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
4874       throw new IOException("Merged region " + dstRegion
4875           + " still has references after the compaction, is compaction canceled?");
4876     }
4877 
4878     // Archiving the 'A' region
4879     HFileArchiver.archiveRegion(a.getBaseConf(), fs, a.getRegionInfo());
4880     // Archiving the 'B' region
4881     HFileArchiver.archiveRegion(b.getBaseConf(), fs, b.getRegionInfo());
4882 
4883     LOG.info("merge completed. New region is " + dstRegion);
4884     return dstRegion;
4885   }
4886 
4887   //
4888   // HBASE-880
4889   //
4890   /**
4891    * @param get get object
4892    * @return result
4893    * @throws IOException read exceptions
4894    */
4895   public Result get(final Get get) throws IOException {
4896     checkRow(get.getRow(), "Get");
4897     // Verify families are all valid
4898     if (get.hasFamilies()) {
4899       for (byte [] family: get.familySet()) {
4900         checkFamily(family);
4901       }
4902     } else { // Adding all families to scanner
4903       for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
4904         get.addFamily(family);
4905       }
4906     }
4907     List<Cell> results = get(get, true);
4908     boolean stale = this.getRegionInfo().getReplicaId() != 0;
4909     return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
4910   }
4911 
4912   /*
4913    * Do a get based on the get parameter.
4914    * @param withCoprocessor invoke coprocessor or not. We don't want to
4915    * always invoke cp for this private method.
4916    */
4917   public List<Cell> get(Get get, boolean withCoprocessor)
4918   throws IOException {
4919 
4920     List<Cell> results = new ArrayList<Cell>();
4921 
4922     // pre-get CP hook
4923     if (withCoprocessor && (coprocessorHost != null)) {
4924        if (coprocessorHost.preGet(get, results)) {
4925          return results;
4926        }
4927     }
4928 
4929     Scan scan = new Scan(get);
4930 
4931     RegionScanner scanner = null;
4932     try {
4933       scanner = getScanner(scan);
4934       scanner.next(results);
4935     } finally {
4936       if (scanner != null)
4937         scanner.close();
4938     }
4939 
4940     // post-get CP hook
4941     if (withCoprocessor && (coprocessorHost != null)) {
4942       coprocessorHost.postGet(get, results);
4943     }
4944 
4945     // do after lock
4946     if (this.metricsRegion != null) {
4947       long totalSize = 0l;
4948       for (Cell kv : results) {
4949         totalSize += KeyValueUtil.ensureKeyValue(kv).getLength();
4950       }
4951       this.metricsRegion.updateGet(totalSize);
4952     }
4953 
4954     return results;
4955   }
4956 
4957   public void mutateRow(RowMutations rm) throws IOException {
4958     // Don't need nonces here - RowMutations only supports puts and deletes
4959     mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
4960   }
4961 
4962   /**
4963    * Perform atomic mutations within the region w/o nonces.
4964    * See {@link #mutateRowsWithLocks(Collection, Collection, long, long)}
4965    */
4966   public void mutateRowsWithLocks(Collection<Mutation> mutations,
4967       Collection<byte[]> rowsToLock) throws IOException {
4968     mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
4969   }
4970 
4971   /**
4972    * Perform atomic mutations within the region.
4973    * @param mutations The list of mutations to perform.
4974    * <code>mutations</code> can contain operations for multiple rows.
4975    * Caller has to ensure that all rows are contained in this region.
4976    * @param rowsToLock Rows to lock
4977    * @param nonceGroup Optional nonce group of the operation (client Id)
4978    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
4979    * If multiple rows are locked care should be taken that
4980    * <code>rowsToLock</code> is sorted in order to avoid deadlocks.
4981    * @throws IOException
4982    */
4983   public void mutateRowsWithLocks(Collection<Mutation> mutations,
4984       Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
4985     MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
4986     processRowsWithLocks(proc, -1, nonceGroup, nonce);
4987   }
4988 
4989   /**
4990    * Performs atomic multiple reads and writes on a given row.
4991    *
4992    * @param processor The object defines the reads and writes to a row.
4993    * @param nonceGroup Optional nonce group of the operation (client Id)
4994    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
4995    */
4996   public void processRowsWithLocks(RowProcessor<?,?> processor, long nonceGroup, long nonce)
4997       throws IOException {
4998     processRowsWithLocks(processor, rowProcessorTimeout, nonceGroup, nonce);
4999   }
5000 
5001   /**
5002    * Performs atomic multiple reads and writes on a given row.
5003    *
5004    * @param processor The object defines the reads and writes to a row.
5005    * @param timeout The timeout of the processor.process() execution
5006    *                Use a negative number to switch off the time bound
5007    * @param nonceGroup Optional nonce group of the operation (client Id)
5008    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
5009    */
5010   public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout,
5011       long nonceGroup, long nonce) throws IOException {
5012 
5013     for (byte[] row : processor.getRowsToLock()) {
5014       checkRow(row, "processRowsWithLocks");
5015     }
5016     if (!processor.readOnly()) {
5017       checkReadOnly();
5018     }
5019     checkResources();
5020 
5021     startRegionOperation();
5022     WALEdit walEdit = new WALEdit();
5023 
5024     // 1. Run pre-process hook
5025     try {
5026       processor.preProcess(this, walEdit);
5027     } catch (IOException e) {
5028       closeRegionOperation();
5029       throw e;
5030     }
5031     // Short circuit the read only case
5032     if (processor.readOnly()) {
5033       try {
5034         long now = EnvironmentEdgeManager.currentTimeMillis();
5035         doProcessRowWithTimeout(
5036             processor, now, this, null, null, timeout);
5037         processor.postProcess(this, walEdit, true);
5038       } catch (IOException e) {
5039         throw e;
5040       } finally {
5041         closeRegionOperation();
5042       }
5043       return;
5044     }
5045 
5046     MultiVersionConsistencyControl.WriteEntry writeEntry = null;
5047     boolean locked;
5048     boolean walSyncSuccessful = false;
5049     List<RowLock> acquiredRowLocks;
5050     long addedSize = 0;
5051     List<Mutation> mutations = new ArrayList<Mutation>();
5052     List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
5053     Collection<byte[]> rowsToLock = processor.getRowsToLock();
5054     long mvccNum = 0;
5055     HLogKey walKey = null;
5056     try {
5057       // 2. Acquire the row lock(s)
5058       acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
5059       for (byte[] row : rowsToLock) {
5060         // Attempt to lock all involved rows, throw if any lock times out
5061         acquiredRowLocks.add(getRowLock(row));
5062       }
5063       // 3. Region lock
5064       lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
5065       locked = true;
5066       // Get a mvcc write number
5067       mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
5068 
5069       long now = EnvironmentEdgeManager.currentTimeMillis();
5070       try {
5071         // 4. Let the processor scan the rows, generate mutations and add
5072         //    waledits
5073         doProcessRowWithTimeout(
5074             processor, now, this, mutations, walEdit, timeout);
5075 
5076         if (!mutations.isEmpty()) {
5077           // 5. Start mvcc transaction
5078           writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
5079           // 6. Call the preBatchMutate hook
5080           processor.preBatchMutate(this, walEdit);
5081           // 7. Apply to memstore
5082           for (Mutation m : mutations) {
5083             for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
5084               KeyValue kv = KeyValueUtil.ensureKeyValue(cellScanner.current());
5085               kv.setSequenceId(mvccNum);
5086               Store store = getStore(kv);
5087               if (store == null) {
5088                 checkFamily(CellUtil.cloneFamily(kv));
5089                 // unreachable
5090               }
5091               Pair<Long, Cell> ret = store.add(kv);
5092               addedSize += ret.getFirst();
5093               memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
5094             }
5095           }
5096 
5097           long txid = 0;
5098           // 8. Append no sync
5099           if (!walEdit.isEmpty()) {
5100             walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
5101               this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now,
5102               processor.getClusterIds(), nonceGroup, nonce);
5103             txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(),
5104               walKey, walEdit, getSequenceId(), true, memstoreCells);
5105           }
5106           if(walKey == null){
5107             // since we use log sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
5108             // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
5109             walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
5110           }
5111 
5112           // 9. Release region lock
5113           if (locked) {
5114             this.updatesLock.readLock().unlock();
5115             locked = false;
5116           }
5117 
5118           // 10. Release row lock(s)
5119           releaseRowLocks(acquiredRowLocks);
5120 
5121           // 11. Sync edit log
5122           if (txid != 0) {
5123             syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
5124           }
5125           walSyncSuccessful = true;
5126           // 12. call postBatchMutate hook
5127           processor.postBatchMutate(this);
5128         }
5129       } finally {
5130         if (!mutations.isEmpty() && !walSyncSuccessful) {
5131           LOG.warn("Wal sync failed. Roll back " + mutations.size() +
5132               " memstore keyvalues for row(s):" + StringUtils.byteToHexString(
5133               processor.getRowsToLock().iterator().next()) + "...");
5134           for (Mutation m : mutations) {
5135             for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
5136               KeyValue kv = KeyValueUtil.ensureKeyValue(cellScanner.current());
5137               getStore(kv).rollback(kv);
5138             }
5139           }
5140         }
5141         // 13. Roll mvcc forward
5142         if (writeEntry != null) {
5143           mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
5144         }
5145         if (locked) {
5146           this.updatesLock.readLock().unlock();
5147         }
5148         // release locks if some were acquired but another timed out
5149         releaseRowLocks(acquiredRowLocks);
5150       }
5151 
5152       // 14. Run post-process hook
5153       processor.postProcess(this, walEdit, walSyncSuccessful);
5154 
5155     } catch (IOException e) {
5156       throw e;
5157     } finally {
5158       closeRegionOperation();
5159       if (!mutations.isEmpty() &&
5160           isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
5161         requestFlush();
5162       }
5163     }
5164   }
5165 
5166   private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
5167                                        final long now,
5168                                        final HRegion region,
5169                                        final List<Mutation> mutations,
5170                                        final WALEdit walEdit,
5171                                        final long timeout) throws IOException {
5172     // Short circuit the no time bound case.
5173     if (timeout < 0) {
5174       try {
5175         processor.process(now, region, mutations, walEdit);
5176       } catch (IOException e) {
5177         LOG.warn("RowProcessor:" + processor.getClass().getName() +
5178             " throws Exception on row(s):" +
5179             Bytes.toStringBinary(
5180               processor.getRowsToLock().iterator().next()) + "...", e);
5181         throw e;
5182       }
5183       return;
5184     }
5185 
5186     // Case with time bound
5187     FutureTask<Void> task =
5188       new FutureTask<Void>(new Callable<Void>() {
5189         @Override
5190         public Void call() throws IOException {
5191           try {
5192             processor.process(now, region, mutations, walEdit);
5193             return null;
5194           } catch (IOException e) {
5195             LOG.warn("RowProcessor:" + processor.getClass().getName() +
5196                 " throws Exception on row(s):" +
5197                 Bytes.toStringBinary(
5198                     processor.getRowsToLock().iterator().next()) + "...", e);
5199             throw e;
5200           }
5201         }
5202       });
5203     rowProcessorExecutor.execute(task);
5204     try {
5205       task.get(timeout, TimeUnit.MILLISECONDS);
5206     } catch (TimeoutException te) {
5207       LOG.error("RowProcessor timeout:" + timeout + " ms on row(s):" +
5208           Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) +
5209           "...");
5210       throw new IOException(te);
5211     } catch (Exception e) {
5212       throw new IOException(e);
5213     }
5214   }
5215 
5216   public Result append(Append append) throws IOException {
5217     return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
5218   }
5219 
5220   // TODO: There's a lot of boiler plate code identical
5221   // to increment... See how to better unify that.
5222   /**
5223    * Perform one or more append operations on a row.
5224    *
5225    * @return new keyvalues after increment
5226    * @throws IOException
5227    */
5228   public Result append(Append append, long nonceGroup, long nonce)
5229       throws IOException {
5230     byte[] row = append.getRow();
5231     checkRow(row, "append");
5232     boolean flush = false;
5233     Durability durability = getEffectiveDurability(append.getDurability());
5234     boolean writeToWAL = durability != Durability.SKIP_WAL;
5235     WALEdit walEdits = null;
5236     List<Cell> allKVs = new ArrayList<Cell>(append.size());
5237     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
5238 
5239     long size = 0;
5240     long txid = 0;
5241 
5242     checkReadOnly();
5243     checkResources();
5244     // Lock row
5245     startRegionOperation(Operation.APPEND);
5246     this.writeRequestsCount.increment();
5247     long mvccNum = 0;
5248     WriteEntry w = null;
5249     HLogKey walKey = null;
5250     RowLock rowLock = null;
5251     List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
5252     boolean doRollBackMemstore = false;
5253     try {
5254       rowLock = getRowLock(row);
5255       try {
5256         lock(this.updatesLock.readLock());
5257         try {
5258           // wait for all prior MVCC transactions to finish - while we hold the row lock
5259           // (so that we are guaranteed to see the latest state)
5260           mvcc.waitForPreviousTransactionsComplete();
5261           if (this.coprocessorHost != null) {
5262             Result r = this.coprocessorHost.preAppendAfterRowLock(append);
5263             if(r!= null) {
5264               return r;
5265             }
5266           }
5267           // now start my own transaction
5268           mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
5269           w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
5270           long now = EnvironmentEdgeManager.currentTimeMillis();
5271           // Process each family
5272           for (Map.Entry<byte[], List<Cell>> family : append.getFamilyCellMap().entrySet()) {
5273 
5274             Store store = stores.get(family.getKey());
5275             List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
5276 
5277             // Sort the cells so that they match the order that they
5278             // appear in the Get results. Otherwise, we won't be able to
5279             // find the existing values if the cells are not specified
5280             // in order by the client since cells are in an array list.
5281             Collections.sort(family.getValue(), store.getComparator());
5282             // Get previous values for all columns in this family
5283             Get get = new Get(row);
5284             for (Cell cell : family.getValue()) {
5285               get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));
5286             }
5287             List<Cell> results = get(get, false);
5288 
5289             // Iterate the input columns and update existing values if they were
5290             // found, otherwise add new column initialized to the append value
5291 
5292             // Avoid as much copying as possible. Every byte is copied at most
5293             // once.
5294             // Would be nice if KeyValue had scatter/gather logic
5295             int idx = 0;
5296             for (Cell cell : family.getValue()) {
5297               KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5298               KeyValue newKV;
5299               KeyValue oldKv = null;
5300               if (idx < results.size()
5301                   && CellUtil.matchingQualifier(results.get(idx),kv)) {
5302                 oldKv = KeyValueUtil.ensureKeyValue(results.get(idx));
5303                 // allocate an empty kv once
5304                 newKV = new KeyValue(row.length, kv.getFamilyLength(),
5305                     kv.getQualifierLength(), now, KeyValue.Type.Put,
5306                     oldKv.getValueLength() + kv.getValueLength(),
5307                     oldKv.getTagsLength() + kv.getTagsLength());
5308                 // copy in the value
5309                 System.arraycopy(oldKv.getValueArray(), oldKv.getValueOffset(),
5310                     newKV.getValueArray(), newKV.getValueOffset(),
5311                     oldKv.getValueLength());
5312                 System.arraycopy(kv.getValueArray(), kv.getValueOffset(),
5313                     newKV.getValueArray(),
5314                     newKV.getValueOffset() + oldKv.getValueLength(),
5315                     kv.getValueLength());
5316                 // copy in the tags
5317                 System.arraycopy(oldKv.getTagsArray(), oldKv.getTagsOffset(), newKV.getTagsArray(),
5318                     newKV.getTagsOffset(), oldKv.getTagsLength());
5319                 System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(),
5320                     newKV.getTagsOffset() + oldKv.getTagsLength(), kv.getTagsLength());
5321                 // copy in row, family, and qualifier
5322                 System.arraycopy(kv.getRowArray(), kv.getRowOffset(),
5323                     newKV.getRowArray(), newKV.getRowOffset(), kv.getRowLength());
5324                 System.arraycopy(kv.getFamilyArray(), kv.getFamilyOffset(),
5325                     newKV.getFamilyArray(), newKV.getFamilyOffset(),
5326                     kv.getFamilyLength());
5327                 System.arraycopy(kv.getQualifierArray(), kv.getQualifierOffset(),
5328                     newKV.getQualifierArray(), newKV.getQualifierOffset(),
5329                     kv.getQualifierLength());
5330                 idx++;
5331               } else {
5332                 newKV = kv;
5333                 // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP,
5334                 // so only need to update the timestamp to 'now'
5335                 newKV.updateLatestStamp(Bytes.toBytes(now));
5336              }
5337               newKV.setSequenceId(mvccNum);
5338               // Give coprocessors a chance to update the new cell
5339               if (coprocessorHost != null) {
5340                 newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
5341                     RegionObserver.MutationType.APPEND, append, oldKv, newKV));
5342               }
5343               kvs.add(newKV);
5344 
5345               // Append update to WAL
5346               if (writeToWAL) {
5347                 if (walEdits == null) {
5348                   walEdits = new WALEdit();
5349                 }
5350                 walEdits.add(newKV);
5351               }
5352             }
5353 
5354             //store the kvs to the temporary memstore before writing HLog
5355             tempMemstore.put(store, kvs);
5356           }
5357 
5358           //Actually write to Memstore now
5359           for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
5360             Store store = entry.getKey();
5361             if (store.getFamily().getMaxVersions() == 1) {
5362               // upsert if VERSIONS for this CF == 1
5363               size += store.upsert(entry.getValue(), getSmallestReadPoint());
5364               memstoreCells.addAll(KeyValueUtil.ensureKeyValues(entry.getValue()));
5365             } else {
5366               // otherwise keep older versions around
5367               for (Cell cell: entry.getValue()) {
5368                 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5369                 Pair<Long, Cell> ret = store.add(kv);
5370                 size += ret.getFirst();
5371                 memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
5372                 doRollBackMemstore = true;
5373               }
5374             }
5375             allKVs.addAll(entry.getValue());
5376           }
5377 
5378           // Actually write to WAL now
5379           if (writeToWAL) {
5380             // Using default cluster id, as this can only happen in the originating
5381             // cluster. A slave cluster receives the final value (not the delta)
5382             // as a Put.
5383             walKey = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
5384               this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, nonceGroup, nonce);
5385             txid = this.log.appendNoSync(this.htableDescriptor, getRegionInfo(), walKey, walEdits,
5386               this.sequenceId, true, memstoreCells);
5387           } else {
5388             recordMutationWithoutWal(append.getFamilyCellMap());
5389           }
5390           if(walKey == null){
5391             // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
5392             walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
5393           }
5394 
5395           size = this.addAndGetGlobalMemstoreSize(size);
5396           flush = isFlushSize(size);
5397         } finally {
5398           this.updatesLock.readLock().unlock();
5399         }
5400       } finally {
5401         rowLock.release();
5402         rowLock = null;
5403       }
5404       // sync the transaction log outside the rowlock
5405       if(txid != 0){
5406         syncOrDefer(txid, durability);
5407       }
5408       doRollBackMemstore = false;
5409     } finally {
5410       if (rowLock != null) {
5411         rowLock.release();
5412       }
5413       // if the wal sync was unsuccessful, remove keys from memstore
5414       if (doRollBackMemstore) {
5415         rollbackMemstore(memstoreCells);
5416       }
5417       if (w != null) {
5418         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
5419       }
5420       closeRegionOperation(Operation.APPEND);
5421     }
5422 
5423     if (this.metricsRegion != null) {
5424       this.metricsRegion.updateAppend();
5425     }
5426 
5427     if (flush) {
5428       // Request a cache flush. Do it outside update lock.
5429       requestFlush();
5430     }
5431 
5432 
5433     return append.isReturnResults() ? Result.create(allKVs) : null;
5434   }
5435 
5436   public Result increment(Increment increment) throws IOException {
5437     return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
5438   }
5439 
5440   /**
5441    * Perform one or more increment operations on a row.
5442    * @return new keyvalues after increment
5443    * @throws IOException
5444    */
5445   public Result increment(Increment increment, long nonceGroup, long nonce)
5446   throws IOException {
5447     byte [] row = increment.getRow();
5448     checkRow(row, "increment");
5449     TimeRange tr = increment.getTimeRange();
5450     boolean flush = false;
5451     Durability durability = getEffectiveDurability(increment.getDurability());
5452     boolean writeToWAL = durability != Durability.SKIP_WAL;
5453     WALEdit walEdits = null;
5454     List<Cell> allKVs = new ArrayList<Cell>(increment.size());
5455     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
5456 
5457     long size = 0;
5458     long txid = 0;
5459 
5460     checkReadOnly();
5461     checkResources();
5462     // Lock row
5463     startRegionOperation(Operation.INCREMENT);
5464     this.writeRequestsCount.increment();
5465     RowLock rowLock = null;
5466     WriteEntry w = null;
5467     HLogKey walKey = null;
5468     long mvccNum = 0;
5469     List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
5470     boolean doRollBackMemstore = false;
5471     try {
5472       rowLock = getRowLock(row);
5473       try {
5474         lock(this.updatesLock.readLock());
5475         try {
5476           // wait for all prior MVCC transactions to finish - while we hold the row lock
5477           // (so that we are guaranteed to see the latest state)
5478           mvcc.waitForPreviousTransactionsComplete();
5479           if (this.coprocessorHost != null) {
5480             Result r = this.coprocessorHost.preIncrementAfterRowLock(increment);
5481             if (r != null) {
5482               return r;
5483             }
5484           }
5485           // now start my own transaction
5486           mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
5487           w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
5488           long now = EnvironmentEdgeManager.currentTimeMillis();
5489           // Process each family
5490           for (Map.Entry<byte [], List<Cell>> family:
5491               increment.getFamilyCellMap().entrySet()) {
5492 
5493             Store store = stores.get(family.getKey());
5494             List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
5495 
5496             // Sort the cells so that they match the order that they
5497             // appear in the Get results. Otherwise, we won't be able to
5498             // find the existing values if the cells are not specified
5499             // in order by the client since cells are in an array list.
5500             Collections.sort(family.getValue(), store.getComparator());
5501             // Get previous values for all columns in this family
5502             Get get = new Get(row);
5503             for (Cell cell: family.getValue()) {
5504               get.addColumn(family.getKey(),  CellUtil.cloneQualifier(cell));
5505             }
5506             get.setTimeRange(tr.getMin(), tr.getMax());
5507             List<Cell> results = get(get, false);
5508 
5509             // Iterate the input columns and update existing values if they were
5510             // found, otherwise add new column initialized to the increment amount
5511             int idx = 0;
5512             for (Cell kv: family.getValue()) {
5513               long amount = Bytes.toLong(CellUtil.cloneValue(kv));
5514               boolean noWriteBack = (amount == 0);
5515 
5516               Cell c = null;
5517               if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), kv)) {
5518                 c = results.get(idx);
5519                 if(c.getValueLength() == Bytes.SIZEOF_LONG) {
5520                   amount += Bytes.toLong(c.getValueArray(), c.getValueOffset(), Bytes.SIZEOF_LONG);
5521                 } else {
5522                   // throw DoNotRetryIOException instead of IllegalArgumentException
5523                   throw new org.apache.hadoop.hbase.DoNotRetryIOException(
5524                       "Attempted to increment field that isn't 64 bits wide");
5525                 }
5526                 idx++;
5527               }
5528 
5529               // Append new incremented KeyValue to list
5530               byte[] q = CellUtil.cloneQualifier(kv);
5531               byte[] val = Bytes.toBytes(amount);
5532               int oldCellTagsLen = (c == null) ? 0 : c.getTagsLength();
5533               int incCellTagsLen = kv.getTagsLength();
5534               KeyValue newKV = new KeyValue(row.length, family.getKey().length, q.length, now,
5535                   KeyValue.Type.Put, val.length, oldCellTagsLen + incCellTagsLen);
5536               System.arraycopy(row, 0, newKV.getRowArray(), newKV.getRowOffset(), row.length);
5537               System.arraycopy(family.getKey(), 0, newKV.getFamilyArray(), newKV.getFamilyOffset(),
5538                   family.getKey().length);
5539               System.arraycopy(q, 0, newKV.getQualifierArray(), newKV.getQualifierOffset(), q.length);
5540               // copy in the value
5541               System.arraycopy(val, 0, newKV.getValueArray(), newKV.getValueOffset(), val.length);
5542               // copy tags
5543               if (oldCellTagsLen > 0) {
5544                 System.arraycopy(c.getTagsArray(), c.getTagsOffset(), newKV.getTagsArray(),
5545                     newKV.getTagsOffset(), oldCellTagsLen);
5546               }
5547               if (incCellTagsLen > 0) {
5548                 System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(),
5549                     newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
5550               }
5551               newKV.setSequenceId(mvccNum);
5552               // Give coprocessors a chance to update the new cell
5553               if (coprocessorHost != null) {
5554                 newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
5555                     RegionObserver.MutationType.INCREMENT, increment, c, newKV));
5556               }
5557               allKVs.add(newKV);
5558 
5559               if (!noWriteBack) {
5560                 kvs.add(newKV);
5561 
5562                 // Prepare WAL updates
5563                 if (writeToWAL) {
5564                   if (walEdits == null) {
5565                     walEdits = new WALEdit();
5566                   }
5567                   walEdits.add(newKV);
5568                 }
5569               }
5570             }
5571 
5572             //store the kvs to the temporary memstore before writing HLog
5573             if (!kvs.isEmpty()) {
5574               tempMemstore.put(store, kvs);
5575             }
5576           }
5577 
5578           //Actually write to Memstore now
5579           if (!tempMemstore.isEmpty()) {
5580             for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
5581               Store store = entry.getKey();
5582               if (store.getFamily().getMaxVersions() == 1) {
5583                 // upsert if VERSIONS for this CF == 1
5584                 size += store.upsert(entry.getValue(), getSmallestReadPoint());
5585                 memstoreCells.addAll(KeyValueUtil.ensureKeyValues(entry.getValue()));
5586               } else {
5587                 // otherwise keep older versions around
5588                 for (Cell cell : entry.getValue()) {
5589                   KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5590                   Pair<Long, Cell> ret = store.add(kv);
5591                   size += ret.getFirst();
5592                   memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
5593                   doRollBackMemstore = true;
5594                 }
5595               }
5596             }
5597             size = this.addAndGetGlobalMemstoreSize(size);
5598             flush = isFlushSize(size);
5599           }
5600 
5601           // Actually write to WAL now
5602           if (walEdits != null && !walEdits.isEmpty()) {
5603             if (writeToWAL) {
5604               // Using default cluster id, as this can only happen in the originating
5605               // cluster. A slave cluster receives the final value (not the delta)
5606               // as a Put.
5607               walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
5608                 this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, nonceGroup, nonce);
5609               txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(),
5610                 walKey, walEdits, getSequenceId(), true, memstoreCells);
5611             } else {
5612               recordMutationWithoutWal(increment.getFamilyCellMap());
5613             }
5614           }
5615           if(walKey == null){
5616             // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
5617             walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
5618           }
5619         } finally {
5620           this.updatesLock.readLock().unlock();
5621         }
5622       } finally {
5623         rowLock.release();
5624         rowLock = null;
5625       }
5626       // sync the transaction log outside the rowlock
5627       if(txid != 0){
5628         syncOrDefer(txid, durability);
5629       }
5630       doRollBackMemstore = false;
5631     } finally {
5632       if (rowLock != null) {
5633         rowLock.release();
5634       }
5635       // if the wal sync was unsuccessful, remove keys from memstore
5636       if (doRollBackMemstore) {
5637         rollbackMemstore(memstoreCells);
5638       }
5639       if (w != null) {
5640         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
5641       }
5642       closeRegionOperation(Operation.INCREMENT);
5643       if (this.metricsRegion != null) {
5644         this.metricsRegion.updateIncrement();
5645       }
5646     }
5647 
5648     if (flush) {
5649       // Request a cache flush.  Do it outside update lock.
5650       requestFlush();
5651     }
5652 
5653     return Result.create(allKVs);
5654   }
5655 
5656   //
5657   // New HBASE-880 Helpers
5658   //
5659 
5660   private void checkFamily(final byte [] family)
5661   throws NoSuchColumnFamilyException {
5662     if (!this.htableDescriptor.hasFamily(family)) {
5663       throw new NoSuchColumnFamilyException("Column family " +
5664           Bytes.toString(family) + " does not exist in region " + this
5665           + " in table " + this.htableDescriptor);
5666     }
5667   }
5668 
5669   public static final long FIXED_OVERHEAD = ClassSize.align(
5670       ClassSize.OBJECT +
5671       ClassSize.ARRAY +
5672       40 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
5673       (12 * Bytes.SIZEOF_LONG) +
5674       4 * Bytes.SIZEOF_BOOLEAN);
5675 
5676   // woefully out of date - currently missing:
5677   // 1 x HashMap - coprocessorServiceHandlers
5678   // 6 x Counter - numMutationsWithoutWAL, dataInMemoryWithoutWAL,
5679   //   checkAndMutateChecksPassed, checkAndMutateChecksFailed, readRequestsCount,
5680   //   writeRequestsCount
5681   // 1 x HRegion$WriteState - writestate
5682   // 1 x RegionCoprocessorHost - coprocessorHost
5683   // 1 x RegionSplitPolicy - splitPolicy
5684   // 1 x MetricsRegion - metricsRegion
5685   // 1 x MetricsRegionWrapperImpl - metricsRegionWrapper
5686   public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
5687       ClassSize.OBJECT + // closeLock
5688       (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
5689       (3 * ClassSize.ATOMIC_LONG) + // memStoreSize, numPutsWithoutWAL, dataInMemoryWithoutWAL
5690       (2 * ClassSize.CONCURRENT_HASHMAP) +  // lockedRows, scannerReadPoints
5691       WriteState.HEAP_SIZE + // writestate
5692       ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
5693       (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
5694       MultiVersionConsistencyControl.FIXED_SIZE // mvcc
5695       + ClassSize.TREEMAP // maxSeqIdInStores
5696       + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
5697       ;
5698 
5699   @Override
5700   public long heapSize() {
5701     long heapSize = DEEP_OVERHEAD;
5702     for (Store store : this.stores.values()) {
5703       heapSize += store.heapSize();
5704     }
5705     // this does not take into account row locks, recent flushes, mvcc entries, and more
5706     return heapSize;
5707   }
5708 
5709   /*
5710    * This method calls System.exit.
5711    * @param message Message to print out.  May be null.
5712    */
5713   private static void printUsageAndExit(final String message) {
5714     if (message != null && message.length() > 0) System.out.println(message);
5715     System.out.println("Usage: HRegion CATALOG_TABLE_DIR [major_compact]");
5716     System.out.println("Options:");
5717     System.out.println(" major_compact  Pass this option to major compact " +
5718       "passed region.");
5719     System.out.println("Default outputs scan of passed region.");
5720     System.exit(1);
5721   }
5722 
5723   /**
5724    * Registers a new protocol buffer {@link Service} subclass as a coprocessor endpoint to
5725    * be available for handling
5726    * {@link HRegion#execService(com.google.protobuf.RpcController,
5727    *    org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall)}} calls.
5728    *
5729    * <p>
5730    * Only a single instance may be registered per region for a given {@link Service} subclass (the
5731    * instances are keyed on {@link com.google.protobuf.Descriptors.ServiceDescriptor#getFullName()}.
5732    * After the first registration, subsequent calls with the same service name will fail with
5733    * a return value of {@code false}.
5734    * </p>
5735    * @param instance the {@code Service} subclass instance to expose as a coprocessor endpoint
5736    * @return {@code true} if the registration was successful, {@code false}
5737    * otherwise
5738    */
5739   public boolean registerService(Service instance) {
5740     /*
5741      * No stacking of instances is allowed for a single service name
5742      */
5743     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
5744     if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
5745       LOG.error("Coprocessor service "+serviceDesc.getFullName()+
5746           " already registered, rejecting request from "+instance
5747       );
5748       return false;
5749     }
5750 
5751     coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
5752     if (LOG.isDebugEnabled()) {
5753       LOG.debug("Registered coprocessor service: region="+
5754           Bytes.toStringBinary(getRegionName())+" service="+serviceDesc.getFullName());
5755     }
5756     return true;
5757   }
5758 
5759   /**
5760    * Executes a single protocol buffer coprocessor endpoint {@link Service} method using
5761    * the registered protocol handlers.  {@link Service} implementations must be registered via the
5762    * {@link HRegion#registerService(com.google.protobuf.Service)}
5763    * method before they are available.
5764    *
5765    * @param controller an {@code RpcController} implementation to pass to the invoked service
5766    * @param call a {@code CoprocessorServiceCall} instance identifying the service, method,
5767    *     and parameters for the method invocation
5768    * @return a protocol buffer {@code Message} instance containing the method's result
5769    * @throws IOException if no registered service handler is found or an error
5770    *     occurs during the invocation
5771    * @see org.apache.hadoop.hbase.regionserver.HRegion#registerService(com.google.protobuf.Service)
5772    */
5773   public Message execService(RpcController controller, CoprocessorServiceCall call)
5774       throws IOException {
5775     String serviceName = call.getServiceName();
5776     String methodName = call.getMethodName();
5777     if (!coprocessorServiceHandlers.containsKey(serviceName)) {
5778       throw new UnknownProtocolException(null,
5779           "No registered coprocessor service found for name "+serviceName+
5780           " in region "+Bytes.toStringBinary(getRegionName()));
5781     }
5782 
5783     Service service = coprocessorServiceHandlers.get(serviceName);
5784     Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
5785     Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
5786     if (methodDesc == null) {
5787       throw new UnknownProtocolException(service.getClass(),
5788           "Unknown method "+methodName+" called on service "+serviceName+
5789               " in region "+Bytes.toStringBinary(getRegionName()));
5790     }
5791 
5792     Message request = service.getRequestPrototype(methodDesc).newBuilderForType()
5793         .mergeFrom(call.getRequest()).build();
5794 
5795     if (coprocessorHost != null) {
5796       request = coprocessorHost.preEndpointInvocation(service, methodName, request);
5797     }
5798 
5799     final Message.Builder responseBuilder =
5800         service.getResponsePrototype(methodDesc).newBuilderForType();
5801     service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
5802       @Override
5803       public void run(Message message) {
5804         if (message != null) {
5805           responseBuilder.mergeFrom(message);
5806         }
5807       }
5808     });
5809 
5810     if (coprocessorHost != null) {
5811       coprocessorHost.postEndpointInvocation(service, methodName, request, responseBuilder);
5812     }
5813 
5814     return responseBuilder.build();
5815   }
5816 
5817   /*
5818    * Process table.
5819    * Do major compaction or list content.
5820    * @throws IOException
5821    */
5822   private static void processTable(final FileSystem fs, final Path p,
5823       final HLog log, final Configuration c,
5824       final boolean majorCompact)
5825   throws IOException {
5826     HRegion region;
5827     // Currently expects tables have one region only.
5828     if (FSUtils.getTableName(p).equals(TableName.META_TABLE_NAME)) {
5829       region = HRegion.newHRegion(p, log, fs, c,
5830         HRegionInfo.FIRST_META_REGIONINFO, HTableDescriptor.META_TABLEDESC, null);
5831     } else {
5832       throw new IOException("Not a known catalog table: " + p.toString());
5833     }
5834     try {
5835       region.initialize(null);
5836       if (majorCompact) {
5837         region.compactStores(true);
5838       } else {
5839         // Default behavior
5840         Scan scan = new Scan();
5841         // scan.addFamily(HConstants.CATALOG_FAMILY);
5842         RegionScanner scanner = region.getScanner(scan);
5843         try {
5844           List<Cell> kvs = new ArrayList<Cell>();
5845           boolean done;
5846           do {
5847             kvs.clear();
5848             done = scanner.next(kvs);
5849             if (kvs.size() > 0) LOG.info(kvs);
5850           } while (done);
5851         } finally {
5852           scanner.close();
5853         }
5854       }
5855     } finally {
5856       region.close();
5857     }
5858   }
5859 
5860   boolean shouldForceSplit() {
5861     return this.splitRequest;
5862   }
5863 
5864   byte[] getExplicitSplitPoint() {
5865     return this.explicitSplitPoint;
5866   }
5867 
5868   void forceSplit(byte[] sp) {
5869     // This HRegion will go away after the forced split is successful
5870     // But if a forced split fails, we need to clear forced split.
5871     this.splitRequest = true;
5872     if (sp != null) {
5873       this.explicitSplitPoint = sp;
5874     }
5875   }
5876 
5877   void clearSplit() {
5878     this.splitRequest = false;
5879     this.explicitSplitPoint = null;
5880   }
5881 
5882   /**
5883    * Give the region a chance to prepare before it is split.
5884    */
5885   protected void prepareToSplit() {
5886     // nothing
5887   }
5888 
5889   /**
5890    * Return the splitpoint. null indicates the region isn't splittable
5891    * If the splitpoint isn't explicitly specified, it will go over the stores
5892    * to find the best splitpoint. Currently the criteria of best splitpoint
5893    * is based on the size of the store.
5894    */
5895   public byte[] checkSplit() {
5896     // Can't split META
5897     if (this.getRegionInfo().isMetaTable() ||
5898         TableName.NAMESPACE_TABLE_NAME.equals(this.getRegionInfo().getTable())) {
5899       if (shouldForceSplit()) {
5900         LOG.warn("Cannot split meta region in HBase 0.20 and above");
5901       }
5902       return null;
5903     }
5904 
5905     // Can't split region which is in recovering state
5906     if (this.isRecovering()) {
5907       LOG.info("Cannot split region " + this.getRegionInfo().getEncodedName() + " in recovery.");
5908       return null;
5909     }
5910 
5911     if (!splitPolicy.shouldSplit()) {
5912       return null;
5913     }
5914 
5915     byte[] ret = splitPolicy.getSplitPoint();
5916 
5917     if (ret != null) {
5918       try {
5919         checkRow(ret, "calculated split");
5920       } catch (IOException e) {
5921         LOG.error("Ignoring invalid split", e);
5922         return null;
5923       }
5924     }
5925     return ret;
5926   }
5927 
5928   /**
5929    * @return The priority that this region should have in the compaction queue
5930    */
5931   public int getCompactPriority() {
5932     int count = Integer.MAX_VALUE;
5933     for (Store store : stores.values()) {
5934       count = Math.min(count, store.getCompactPriority());
5935     }
5936     return count;
5937   }
5938 
5939 
5940   /** @return the coprocessor host */
5941   public RegionCoprocessorHost getCoprocessorHost() {
5942     return coprocessorHost;
5943   }
5944 
5945   /** @param coprocessorHost the new coprocessor host */
5946   public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {
5947     this.coprocessorHost = coprocessorHost;
5948   }
5949 
5950   /**
5951    * This method needs to be called before any public call that reads or
5952    * modifies data. It has to be called just before a try.
5953    * #closeRegionOperation needs to be called in the try's finally block
5954    * Acquires a read lock and checks if the region is closing or closed.
5955    * @throws IOException
5956    */
5957   public void startRegionOperation() throws IOException {
5958     startRegionOperation(Operation.ANY);
5959   }
5960 
5961   /**
5962    * @param op The operation is about to be taken on the region
5963    * @throws IOException
5964    */
5965   protected void startRegionOperation(Operation op) throws IOException {
5966     switch (op) {
5967     case GET:  // read operations
5968     case SCAN:
5969       checkReadsEnabled();
5970     case INCREMENT: // write operations
5971     case APPEND:
5972     case SPLIT_REGION:
5973     case MERGE_REGION:
5974     case PUT:
5975     case DELETE:
5976     case BATCH_MUTATE:
5977     case COMPACT_REGION:
5978       // when a region is in recovering state, no read, split or merge is allowed
5979       if (this.isRecovering() && (this.disallowWritesInRecovering ||
5980               (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) {
5981         throw new RegionInRecoveryException(this.getRegionNameAsString() + " is recovering");
5982       }
5983       break;
5984     default:
5985       break;
5986     }
5987     if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION
5988         || op == Operation.COMPACT_REGION) {
5989       // split, merge or compact region doesn't need to check the closing/closed state or lock the
5990       // region
5991       return;
5992     }
5993     if (this.closing.get()) {
5994       throw new NotServingRegionException(getRegionNameAsString() + " is closing");
5995     }
5996     lock(lock.readLock());
5997     if (this.closed.get()) {
5998       lock.readLock().unlock();
5999       throw new NotServingRegionException(getRegionNameAsString() + " is closed");
6000     }
6001     try {
6002       if (coprocessorHost != null) {
6003         coprocessorHost.postStartRegionOperation(op);
6004       }
6005     } catch (Exception e) {
6006       lock.readLock().unlock();
6007       throw new IOException(e);
6008     }
6009   }
6010 
6011   /**
6012    * Closes the lock. This needs to be called in the finally block corresponding
6013    * to the try block of #startRegionOperation
6014    * @throws IOException
6015    */
6016   public void closeRegionOperation() throws IOException {
6017     closeRegionOperation(Operation.ANY);
6018   }
6019 
6020   /**
6021    * Closes the lock. This needs to be called in the finally block corresponding
6022    * to the try block of {@link #startRegionOperation(Operation)}
6023    * @throws IOException
6024    */
6025   public void closeRegionOperation(Operation operation) throws IOException {
6026     lock.readLock().unlock();
6027     if (coprocessorHost != null) {
6028       coprocessorHost.postCloseRegionOperation(operation);
6029     }
6030   }
6031 
6032   /**
6033    * This method needs to be called before any public call that reads or
6034    * modifies stores in bulk. It has to be called just before a try.
6035    * #closeBulkRegionOperation needs to be called in the try's finally block
6036    * Acquires a writelock and checks if the region is closing or closed.
6037    * @throws NotServingRegionException when the region is closing or closed
6038    * @throws RegionTooBusyException if failed to get the lock in time
6039    * @throws InterruptedIOException if interrupted while waiting for a lock
6040    */
6041   private void startBulkRegionOperation(boolean writeLockNeeded)
6042       throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
6043     if (this.closing.get()) {
6044       throw new NotServingRegionException(getRegionNameAsString() + " is closing");
6045     }
6046     if (writeLockNeeded) lock(lock.writeLock());
6047     else lock(lock.readLock());
6048     if (this.closed.get()) {
6049       if (writeLockNeeded) lock.writeLock().unlock();
6050       else lock.readLock().unlock();
6051       throw new NotServingRegionException(getRegionNameAsString() + " is closed");
6052     }
6053   }
6054 
6055   /**
6056    * Closes the lock. This needs to be called in the finally block corresponding
6057    * to the try block of #startRegionOperation
6058    */
6059   private void closeBulkRegionOperation(){
6060     if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
6061     else lock.readLock().unlock();
6062   }
6063 
6064   /**
6065    * Update counters for numer of puts without wal and the size of possible data loss.
6066    * These information are exposed by the region server metrics.
6067    */
6068   private void recordMutationWithoutWal(final Map<byte [], List<Cell>> familyMap) {
6069     numMutationsWithoutWAL.increment();
6070     if (numMutationsWithoutWAL.get() <= 1) {
6071       LOG.info("writing data to region " + this +
6072                " with WAL disabled. Data may be lost in the event of a crash.");
6073     }
6074 
6075     long mutationSize = 0;
6076     for (List<Cell> cells: familyMap.values()) {
6077       for (Cell cell : cells) {
6078         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
6079         mutationSize += kv.getKeyLength() + kv.getValueLength();
6080       }
6081     }
6082 
6083     dataInMemoryWithoutWAL.add(mutationSize);
6084   }
6085 
6086   private void lock(final Lock lock)
6087       throws RegionTooBusyException, InterruptedIOException {
6088     lock(lock, 1);
6089   }
6090 
6091   /**
6092    * Try to acquire a lock.  Throw RegionTooBusyException
6093    * if failed to get the lock in time. Throw InterruptedIOException
6094    * if interrupted while waiting for the lock.
6095    */
6096   private void lock(final Lock lock, final int multiplier)
6097       throws RegionTooBusyException, InterruptedIOException {
6098     try {
6099       final long waitTime = Math.min(maxBusyWaitDuration,
6100           busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
6101       if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
6102         throw new RegionTooBusyException(
6103             "failed to get a lock in " + waitTime + " ms. " +
6104                 "regionName=" + (this.getRegionInfo() == null ? "unknown" :
6105                 this.getRegionInfo().getRegionNameAsString()) +
6106                 ", server=" + (this.getRegionServerServices() == null ? "unknown" :
6107                 this.getRegionServerServices().getServerName()));
6108       }
6109     } catch (InterruptedException ie) {
6110       LOG.info("Interrupted while waiting for a lock");
6111       InterruptedIOException iie = new InterruptedIOException();
6112       iie.initCause(ie);
6113       throw iie;
6114     }
6115   }
6116 
6117   /**
6118    * Calls sync with the given transaction ID if the region's table is not
6119    * deferring it.
6120    * @param txid should sync up to which transaction
6121    * @throws IOException If anything goes wrong with DFS
6122    */
6123   private void syncOrDefer(long txid, Durability durability) throws IOException {
6124     if (this.getRegionInfo().isMetaRegion()) {
6125       this.log.sync(txid);
6126     } else {
6127       switch(durability) {
6128       case USE_DEFAULT:
6129         // do what table defaults to
6130         if (shouldSyncLog()) {
6131           this.log.sync(txid);
6132         }
6133         break;
6134       case SKIP_WAL:
6135         // nothing do to
6136         break;
6137       case ASYNC_WAL:
6138         // nothing do to
6139         break;
6140       case SYNC_WAL:
6141       case FSYNC_WAL:
6142         // sync the WAL edit (SYNC and FSYNC treated the same for now)
6143         this.log.sync(txid);
6144         break;
6145       }
6146     }
6147   }
6148 
6149   /**
6150    * Check whether we should sync the log from the table's durability settings
6151    */
6152   private boolean shouldSyncLog() {
6153     return durability.ordinal() >  Durability.ASYNC_WAL.ordinal();
6154   }
6155 
6156   /**
6157    * A mocked list implementaion - discards all updates.
6158    */
6159   private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {
6160 
6161     @Override
6162     public void add(int index, Cell element) {
6163       // do nothing
6164     }
6165 
6166     @Override
6167     public boolean addAll(int index, Collection<? extends Cell> c) {
6168       return false; // this list is never changed as a result of an update
6169     }
6170 
6171     @Override
6172     public KeyValue get(int index) {
6173       throw new UnsupportedOperationException();
6174     }
6175 
6176     @Override
6177     public int size() {
6178       return 0;
6179     }
6180   };
6181 
6182   /**
6183    * Facility for dumping and compacting catalog tables.
6184    * Only does catalog tables since these are only tables we for sure know
6185    * schema on.  For usage run:
6186    * <pre>
6187    *   ./bin/hbase org.apache.hadoop.hbase.regionserver.HRegion
6188    * </pre>
6189    * @throws IOException
6190    */
6191   public static void main(String[] args) throws IOException {
6192     if (args.length < 1) {
6193       printUsageAndExit(null);
6194     }
6195     boolean majorCompact = false;
6196     if (args.length > 1) {
6197       if (!args[1].toLowerCase().startsWith("major")) {
6198         printUsageAndExit("ERROR: Unrecognized option <" + args[1] + ">");
6199       }
6200       majorCompact = true;
6201     }
6202     final Path tableDir = new Path(args[0]);
6203     final Configuration c = HBaseConfiguration.create();
6204     final FileSystem fs = FileSystem.get(c);
6205     final Path logdir = new Path(c.get("hbase.tmp.dir"));
6206     final String logname = "hlog" + FSUtils.getTableName(tableDir) + System.currentTimeMillis();
6207 
6208     final HLog log = HLogFactory.createHLog(fs, logdir, logname, c);
6209     try {
6210       processTable(fs, tableDir, log, c, majorCompact);
6211     } finally {
6212        log.close();
6213        // TODO: is this still right?
6214        BlockCache bc = new CacheConfig(c).getBlockCache();
6215        if (bc != null) bc.shutdown();
6216     }
6217   }
6218 
6219   /**
6220    * Gets the latest sequence number that was read from storage when this region was opened.
6221    */
6222   public long getOpenSeqNum() {
6223     return this.openSeqNum;
6224   }
6225 
6226   /**
6227    * Gets max sequence ids of stores that was read from storage when this region was opened. WAL
6228    * Edits with smaller or equal sequence number will be skipped from replay.
6229    */
6230   public Map<byte[], Long> getMaxStoreSeqIdForLogReplay() {
6231     return this.maxSeqIdInStores;
6232   }
6233 
6234   /**
6235    * @return if a given region is in compaction now.
6236    */
6237   public CompactionState getCompactionState() {
6238     boolean hasMajor = majorInProgress.get() > 0, hasMinor = minorInProgress.get() > 0;
6239     return (hasMajor ? (hasMinor ? CompactionState.MAJOR_AND_MINOR : CompactionState.MAJOR)
6240         : (hasMinor ? CompactionState.MINOR : CompactionState.NONE));
6241   }
6242 
6243   public void reportCompactionRequestStart(boolean isMajor){
6244     (isMajor ? majorInProgress : minorInProgress).incrementAndGet();
6245   }
6246 
6247   public void reportCompactionRequestEnd(boolean isMajor, int numFiles, long filesSizeCompacted) {
6248     int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();
6249 
6250     // metrics
6251     compactionsFinished.incrementAndGet();
6252     compactionNumFilesCompacted.addAndGet(numFiles);
6253     compactionNumBytesCompacted.addAndGet(filesSizeCompacted);
6254 
6255     assert newValue >= 0;
6256   }
6257 
6258   /**
6259    * Do not change this sequence id. See {@link #sequenceId} comment.
6260    * @return sequenceId
6261    */
6262   @VisibleForTesting
6263   public AtomicLong getSequenceId() {
6264     return this.sequenceId;
6265   }
6266 
6267   /**
6268    * sets this region's sequenceId.
6269    * @param value new value
6270    */
6271   private void setSequenceId(long value) {
6272     this.sequenceId.set(value);
6273   }
6274 
6275   /**
6276    * Listener class to enable callers of
6277    * bulkLoadHFile() to perform any necessary
6278    * pre/post processing of a given bulkload call
6279    */
6280   public interface BulkLoadListener {
6281 
6282     /**
6283      * Called before an HFile is actually loaded
6284      * @param family family being loaded to
6285      * @param srcPath path of HFile
6286      * @return final path to be used for actual loading
6287      * @throws IOException
6288      */
6289     String prepareBulkLoad(byte[] family, String srcPath) throws IOException;
6290 
6291     /**
6292      * Called after a successful HFile load
6293      * @param family family being loaded to
6294      * @param srcPath path of HFile
6295      * @throws IOException
6296      */
6297     void doneBulkLoad(byte[] family, String srcPath) throws IOException;
6298 
6299     /**
6300      * Called after a failed HFile load
6301      * @param family family being loaded to
6302      * @param srcPath path of HFile
6303      * @throws IOException
6304      */
6305     void failedBulkLoad(byte[] family, String srcPath) throws IOException;
6306   }
6307 
6308   @VisibleForTesting class RowLockContext {
6309     private final HashedBytes row;
6310     private final CountDownLatch latch = new CountDownLatch(1);
6311     private final Thread thread;
6312     private int lockCount = 0;
6313 
6314     RowLockContext(HashedBytes row) {
6315       this.row = row;
6316       this.thread = Thread.currentThread();
6317     }
6318 
6319     boolean ownedByCurrentThread() {
6320       return thread == Thread.currentThread();
6321     }
6322 
6323     RowLock newLock() {
6324       lockCount++;
6325       return new RowLock(this);
6326     }
6327 
6328     void releaseLock() {
6329       if (!ownedByCurrentThread()) {
6330         throw new IllegalArgumentException("Lock held by thread: " + thread
6331           + " cannot be released by different thread: " + Thread.currentThread());
6332       }
6333       lockCount--;
6334       if (lockCount == 0) {
6335         // no remaining locks by the thread, unlock and allow other threads to access
6336         RowLockContext existingContext = lockedRows.remove(row);
6337         if (existingContext != this) {
6338           throw new RuntimeException(
6339               "Internal row lock state inconsistent, should not happen, row: " + row);
6340         }
6341         latch.countDown();
6342       }
6343     }
6344   }
6345 
6346   /**
6347    * Row lock held by a given thread.
6348    * One thread may acquire multiple locks on the same row simultaneously.
6349    * The locks must be released by calling release() from the same thread.
6350    */
6351   public static class RowLock {
6352     @VisibleForTesting final RowLockContext context;
6353     private boolean released = false;
6354 
6355     @VisibleForTesting RowLock(RowLockContext context) {
6356       this.context = context;
6357     }
6358 
6359     /**
6360      * Release the given lock.  If there are no remaining locks held by the current thread
6361      * then unlock the row and allow other threads to acquire the lock.
6362      * @throws IllegalArgumentException if called by a different thread than the lock owning thread
6363      */
6364     public void release() {
6365       if (!released) {
6366         context.releaseLock();
6367         released = true;
6368       }
6369     }
6370   }
6371 
6372   /**
6373    * Append a faked WALEdit in order to get a long sequence number and log syncer will just ignore
6374    * the WALEdit append later.
6375    * @param wal
6376    * @param cells list of KeyValues inserted into memstore. Those KeyValues are passed in order to
6377    *        be updated with right mvcc values(their log sequence nu
6378    * @return Return the key used appending with no sync and no append.
6379    * @throws IOException
6380    */
6381   private HLogKey appendNoSyncNoAppend(final HLog wal, List<KeyValue> cells) throws IOException {
6382     HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(),
6383       HLog.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
6384     // Call append but with an empty WALEdit.  The returned seqeunce id will not be associated
6385     // with any edit and we can be sure it went in after all outstanding appends.
6386     wal.appendNoSync(getTableDesc(), getRegionInfo(), key,
6387       WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells);
6388     return key;
6389   }
6390 
6391   /**
6392    * Explictly sync wal
6393    * @throws IOException
6394    */
6395   public void syncWal() throws IOException {
6396     if(this.log != null) {
6397       this.log.sync();
6398     }
6399   }
6400 }