1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.io.UnsupportedEncodingException;
25  import java.lang.reflect.Constructor;
26  import java.text.ParseException;
27  import java.util.AbstractList;
28  import java.util.ArrayList;
29  import java.util.Arrays;
30  import java.util.Collection;
31  import java.util.Collections;
32  import java.util.HashMap;
33  import java.util.Iterator;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.NavigableMap;
37  import java.util.NavigableSet;
38  import java.util.Set;
39  import java.util.TreeMap;
40  import java.util.concurrent.Callable;
41  import java.util.concurrent.CompletionService;
42  import java.util.concurrent.ConcurrentHashMap;
43  import java.util.concurrent.ConcurrentSkipListMap;
44  import java.util.concurrent.CountDownLatch;
45  import java.util.concurrent.ExecutionException;
46  import java.util.concurrent.ExecutorCompletionService;
47  import java.util.concurrent.ExecutorService;
48  import java.util.concurrent.Executors;
49  import java.util.concurrent.Future;
50  import java.util.concurrent.FutureTask;
51  import java.util.concurrent.ThreadFactory;
52  import java.util.concurrent.ThreadPoolExecutor;
53  import java.util.concurrent.TimeUnit;
54  import java.util.concurrent.TimeoutException;
55  import java.util.concurrent.atomic.AtomicBoolean;
56  import java.util.concurrent.atomic.AtomicInteger;
57  import java.util.concurrent.atomic.AtomicLong;
58  import java.util.concurrent.locks.Lock;
59  import java.util.concurrent.locks.ReentrantReadWriteLock;
60  
61  import org.apache.commons.logging.Log;
62  import org.apache.commons.logging.LogFactory;
63  import org.apache.hadoop.classification.InterfaceAudience;
64  import org.apache.hadoop.conf.Configuration;
65  import org.apache.hadoop.fs.FileStatus;
66  import org.apache.hadoop.fs.FileSystem;
67  import org.apache.hadoop.fs.Path;
68  import org.apache.hadoop.hbase.Cell;
69  import org.apache.hadoop.hbase.CellUtil;
70  import org.apache.hadoop.hbase.CompoundConfiguration;
71  import org.apache.hadoop.hbase.DoNotRetryIOException;
72  import org.apache.hadoop.hbase.DroppedSnapshotException;
73  import org.apache.hadoop.hbase.HBaseConfiguration;
74  import org.apache.hadoop.hbase.HColumnDescriptor;
75  import org.apache.hadoop.hbase.HConstants;
76  import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
77  import org.apache.hadoop.hbase.CellScanner;
78  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
79  import org.apache.hadoop.hbase.HRegionInfo;
80  import org.apache.hadoop.hbase.HTableDescriptor;
81  import org.apache.hadoop.hbase.KeyValue;
82  import org.apache.hadoop.hbase.KeyValueUtil;
83  import org.apache.hadoop.hbase.NotServingRegionException;
84  import org.apache.hadoop.hbase.RegionTooBusyException;
85  import org.apache.hadoop.hbase.TableName;
86  import org.apache.hadoop.hbase.UnknownScannerException;
87  import org.apache.hadoop.hbase.backup.HFileArchiver;
88  import org.apache.hadoop.hbase.client.Append;
89  import org.apache.hadoop.hbase.client.Delete;
90  import org.apache.hadoop.hbase.client.Durability;
91  import org.apache.hadoop.hbase.client.Get;
92  import org.apache.hadoop.hbase.client.Increment;
93  import org.apache.hadoop.hbase.client.IsolationLevel;
94  import org.apache.hadoop.hbase.client.Mutation;
95  import org.apache.hadoop.hbase.client.Put;
96  import org.apache.hadoop.hbase.client.Result;
97  import org.apache.hadoop.hbase.client.RowMutations;
98  import org.apache.hadoop.hbase.client.Scan;
99  import org.apache.hadoop.hbase.coprocessor.RegionObserver;
100 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
101 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
102 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
103 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
104 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
105 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
106 import org.apache.hadoop.hbase.filter.FilterWrapper;
107 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
108 import org.apache.hadoop.hbase.io.HeapSize;
109 import org.apache.hadoop.hbase.io.TimeRange;
110 import org.apache.hadoop.hbase.io.hfile.BlockCache;
111 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
112 import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
113 import org.apache.hadoop.hbase.ipc.RpcCallContext;
114 import org.apache.hadoop.hbase.ipc.RpcServer;
115 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
116 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
117 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
118 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
119 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
120 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
121 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
122 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
123 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
124 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
125 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
126 import org.apache.hadoop.hbase.regionserver.wal.HLog;
127 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
128 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
129 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
130 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.MutationReplay;
131 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
132 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
133 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
134 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
135 import org.apache.hadoop.hbase.util.Bytes;
136 import org.apache.hadoop.hbase.util.CancelableProgressable;
137 import org.apache.hadoop.hbase.util.ClassSize;
138 import org.apache.hadoop.hbase.util.CompressionTest;
139 import org.apache.hadoop.hbase.util.Counter;
140 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
141 import org.apache.hadoop.hbase.util.FSUtils;
142 import org.apache.hadoop.hbase.util.HashedBytes;
143 import org.apache.hadoop.hbase.util.Pair;
144 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
145 import org.apache.hadoop.hbase.util.Threads;
146 import org.apache.hadoop.io.MultipleIOException;
147 import org.apache.hadoop.util.StringUtils;
148 
149 import com.google.common.annotations.VisibleForTesting;
150 import com.google.common.base.Preconditions;
151 import com.google.common.collect.Lists;
152 import com.google.common.collect.Maps;
153 import com.google.common.io.Closeables;
154 import com.google.protobuf.Descriptors;
155 import com.google.protobuf.Message;
156 import com.google.protobuf.RpcCallback;
157 import com.google.protobuf.RpcController;
158 import com.google.protobuf.Service;
159 
160 /**
161  * HRegion stores data for a certain region of a table.  It stores all columns
162  * for each row. A given table consists of one or more HRegions.
163  *
164  * <p>We maintain multiple HStores for a single HRegion.
165  *
166  * <p>A Store is a set of rows with some column data; together,
167  * the Stores make up all the data for the rows.
168  *
169  * <p>Each HRegion has a 'startKey' and 'endKey'.
170  * <p>The first is inclusive, the second is exclusive (except for
171  * the final region).  The endKey of region 0 is the same as the
172  * startKey for region 1 (if it exists).  The startKey for the
173  * first region is null. The endKey for the final region is null.
174  *
175  * <p>Locking at the HRegion level serves only one purpose: preventing the
176  * region from being closed (and consequently split) while other operations
177  * are ongoing. Each row level operation obtains both a row lock and a region
178  * read lock for the duration of the operation. While a scanner is being
179  * constructed, getScanner holds a read lock. If the scanner is successfully
180  * constructed, it holds a read lock until it is closed. A close takes out a
181  * write lock and consequently will block for ongoing operations and will block
182  * new operations from starting while the close is in progress.
183  *
184  * <p>An HRegion is defined by its table and its key extent.
185  *
186  * <p>It consists of at least one Store.  The number of Stores should be
187  * configurable, so that data which is accessed together is stored in the same
188  * Store.  Right now, we approximate that by building a single Store for
189  * each column family.  (This config info will be communicated via the
190  * tabledesc.)
191  *
192  * <p>The HTableDescriptor contains metainfo about the HRegion's table.
193  * regionName is a unique identifier for this HRegion. [startKey, endKey)
194  * defines the keyspace for this HRegion.
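     *
     * <p>As an illustrative sketch (the split point below is hypothetical, not taken
     * from any particular table), a table split once at "m" has two regions whose
     * key ranges are:
     * <pre>
     *   region 0: startKey = null (empty), endKey = "m"           -- serves rows [ , "m")
     *   region 1: startKey = "m",          endKey = null (empty)  -- serves rows ["m", )
     * </pre>
     * A row with key "m" therefore belongs to region 1, not region 0.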
195  */
196 @InterfaceAudience.Private
197 public class HRegion implements HeapSize { // , Writable{
198   public static final Log LOG = LogFactory.getLog(HRegion.class);
199 
200   public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
201       "hbase.hregion.scan.loadColumnFamiliesOnDemand";
202 
203   /**
204    * This is the global default value for durability. All tables/mutations not
205    * defining a durability or using USE_DEFAULT will default to this value.
206    */
207   private static final Durability DEFAULT_DURABLITY = Durability.SYNC_WAL;
208 
209   final AtomicBoolean closed = new AtomicBoolean(false);
210   /* Closing can take some time; use the closing flag if there is stuff we don't
211    * want to do while in closing state; e.g. offering this region up to the
212    * master as a region to close if the carrying regionserver is overloaded.
213    * Once set, it is never cleared.
214    */
215   final AtomicBoolean closing = new AtomicBoolean(false);
216 
217   /**
218    * The sequence id of the last flush on this region.  Used when doing some rough
219    * calculations on whether it is time to flush or not.
220    */
221   protected volatile long lastFlushSeqId = -1L;
222 
223   /**
224    * Region scoped edit sequence Id. Edits to this region are GUARANTEED to appear in the WAL/HLog
225    * file in this sequence id's order; i.e. edit #2 will be in the WAL after edit #1.
226    * Its default value is -1L. This default is used as a marker to indicate
227    * that the region hasn't opened yet. Once it is opened, it is set to the derived
228    * {@link #openSeqNum}, the largest sequence id of all hfiles opened under this Region.
229    *
230    * <p>Control of this sequence is handed off to the WAL/HLog implementation.  It is responsible
231    * for tagging edits with the correct sequence id since it is responsible for getting the
232    * edits into the WAL files. It controls updating the sequence id value.  DO NOT UPDATE IT
233    * OUTSIDE OF THE WAL.  The value you get will not be what you think it is.
234    */
235   private final AtomicLong sequenceId = new AtomicLong(-1L);
236 
237   /**
238    * Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context,
239    * so that startRegionOperation can invoke different checks before any region operation. Not all
240    * operations have to be defined here; an entry is only needed when a special check is needed in
241    * startRegionOperation (see the usage sketch following the enum).
242    */
243   public enum Operation {
244     ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE,
245     REPLAY_BATCH_MUTATE, COMPACT_REGION
246   }
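      // A minimal sketch of how the Operation enum is meant to be used by the
      // region's own operation methods (illustrative only; the "..." body is a
      // placeholder, not real code):
      //
      //   startRegionOperation(Operation.GET);
      //   try {
      //     ...   // perform the read
      //   } finally {
      //     closeRegionOperation();
      //   }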
247 
248   //////////////////////////////////////////////////////////////////////////////
249   // Members
250   //////////////////////////////////////////////////////////////////////////////
251 
252   // map from a locked row to the context for that lock including:
253   // - CountDownLatch for threads waiting on that row
254   // - the thread that owns the lock (allow reentrancy)
255   // - reference count of (reentrant) locks held by the thread
256   // - the row itself
257   private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
258       new ConcurrentHashMap<HashedBytes, RowLockContext>();
259 
260   protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(
261       Bytes.BYTES_RAWCOMPARATOR);
262 
263   // TODO: account for each registered handler in HeapSize computation
264   private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
265 
266   public final AtomicLong memstoreSize = new AtomicLong(0);
267 
268   // Debug possible data loss due to WAL off
269   final Counter numMutationsWithoutWAL = new Counter();
270   final Counter dataInMemoryWithoutWAL = new Counter();
271 
272   // Debug why CAS operations are taking a while.
273   final Counter checkAndMutateChecksPassed = new Counter();
274   final Counter checkAndMutateChecksFailed = new Counter();
275 
276   // Number of requests
277   final Counter readRequestsCount = new Counter();
278   final Counter writeRequestsCount = new Counter();
279 
280   // Compaction counters
281   final AtomicLong compactionsFinished = new AtomicLong(0L);
282   final AtomicLong compactionNumFilesCompacted = new AtomicLong(0L);
283   final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L);
284 
285 
286   private final HLog log;
287   private final HRegionFileSystem fs;
288   protected final Configuration conf;
289   private final Configuration baseConf;
290   private final KeyValue.KVComparator comparator;
291   private final int rowLockWaitDuration;
292   static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
293 
294   // The internal wait duration to acquire a lock before read/update
295   // from the region. It is not per row. The purpose of this wait time
296   // is to avoid waiting a long time while the region is busy, so that
297   // we can release the IPC handler soon enough to improve the
298   // availability of the region server. It can be adjusted by
299   // tuning configuration "hbase.busy.wait.duration".
300   final long busyWaitDuration;
301   static final long DEFAULT_BUSY_WAIT_DURATION = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
302 
303   // If updating multiple rows in one call, wait longer,
304   // i.e. waiting for busyWaitDuration * # of rows. However,
305   // we can limit the max multiplier.
306   final int maxBusyWaitMultiplier;
307 
308   // Max busy wait duration. There is no point to wait longer than the RPC
309   // purge timeout, when a RPC call will be terminated by the RPC engine.
310   final long maxBusyWaitDuration;
311 
312   // negative number indicates infinite timeout
313   static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
314   final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();
315 
316   private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
317 
318   /**
319    * The sequence ID that was encountered when this region was opened.
320    */
321   private long openSeqNum = HConstants.NO_SEQNUM;
322 
323   /**
324    * The default setting for whether to enable on-demand CF loading for
325    * scan requests to this region. Requests can override it.
326    */
327   private boolean isLoadingCfsOnDemandDefault = false;
328 
329   private final AtomicInteger majorInProgress = new AtomicInteger(0);
330   private final AtomicInteger minorInProgress = new AtomicInteger(0);
331 
332   //
333   // Context: During replay we want to ensure that we do not lose any data. So, we
334   // have to be conservative in how we replay logs. For each store, we calculate
335   // the maxSeqId up to which the store was flushed. And, skip the edits which
336   // are equal to or lower than maxSeqId for each store.
337   // The following map is populated when opening the region
338   Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
339 
340   /**
341    * Config setting for whether to allow writes when a region is in recovering or not.
342    */
343   private boolean disallowWritesInRecovering = false;
344 
345   // when a region is in recovering state, it can only accept writes, not reads
346   private volatile boolean isRecovering = false;
347 
348   /**
349    * @return The smallest mvcc readPoint across all the scanners in this
350    * region. Writes older than this readPoint are included in every
351    * read operation.
352    */
353   public long getSmallestReadPoint() {
354     long minimumReadPoint;
355     // We need to ensure that while we are calculating the smallestReadPoint
356     // no new RegionScanners can grab a readPoint that we are unaware of.
357     // We achieve this by synchronizing on the scannerReadPoints object.
358     synchronized(scannerReadPoints) {
359       minimumReadPoint = mvcc.memstoreReadPoint();
360 
361       for (Long readPoint: this.scannerReadPoints.values()) {
362         if (readPoint < minimumReadPoint) {
363           minimumReadPoint = readPoint;
364         }
365       }
366     }
367     return minimumReadPoint;
368   }
369   /*
370    * Data structure of write state flags used for coordinating flushes,
371    * compactions and closes.
372    */
373   static class WriteState {
374     // Set while a memstore flush is happening.
375     volatile boolean flushing = false;
376     // Set when a flush has been requested.
377     volatile boolean flushRequested = false;
378     // Number of compactions running.
379     volatile int compacting = 0;
380     // Cleared in close. Once cleared, the region can no longer compact or flush.
381     volatile boolean writesEnabled = true;
382     // Set if region is read-only
383     volatile boolean readOnly = false;
384     // whether the reads are enabled. This is different than readOnly, because readOnly is
385     // static in the lifetime of the region, while readsEnabled is dynamic
386     volatile boolean readsEnabled = true;
387 
388     /**
389      * Set flags that make this region read-only.
390      *
391      * @param onOff flip value for region r/o setting
392      */
393     synchronized void setReadOnly(final boolean onOff) {
394       this.writesEnabled = !onOff;
395       this.readOnly = onOff;
396     }
397 
398     boolean isReadOnly() {
399       return this.readOnly;
400     }
401 
402     boolean isFlushRequested() {
403       return this.flushRequested;
404     }
405 
406     void setReadsEnabled(boolean readsEnabled) {
407       this.readsEnabled = readsEnabled;
408     }
409 
410     static final long HEAP_SIZE = ClassSize.align(
411         ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
412   }
413 
414   /**
415    * Objects from this class are created when flushing to describe all the different states
416    * that method ends up in. The Result enum describes those states. The sequence id should only
417    * be specified if the flush was successful, and the failure message should only be specified
418    * if it didn't flush.
419    */
420   public static class FlushResult {
421     enum Result {
422       FLUSHED_NO_COMPACTION_NEEDED,
423       FLUSHED_COMPACTION_NEEDED,
424       // Special case where a flush didn't run because there's nothing in the memstores. Used when
425       // bulk loading to know when we can still load even if a flush didn't happen.
426       CANNOT_FLUSH_MEMSTORE_EMPTY,
427       CANNOT_FLUSH
428       // Be careful adding more to this enum, look at the below methods to make sure
429     }
430 
431     final Result result;
432     final String failureReason;
433     final long flushSequenceId;
434 
435     /**
436      * Convenience constructor to use when the flush is successful; the failure message is set
437      * to null.
438      * @param result Expecting FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
439      * @param flushSequenceId Generated sequence id that comes right after the edits in the
440      *                        memstores.
441      */
442     FlushResult(Result result, long flushSequenceId) {
443       this(result, flushSequenceId, null);
444       assert result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
445           .FLUSHED_COMPACTION_NEEDED;
446     }
447 
448     /**
449      * Convenience constructor to use when we cannot flush.
450      * @param result Expecting CANNOT_FLUSH_MEMSTORE_EMPTY or CANNOT_FLUSH.
451      * @param failureReason Reason why we couldn't flush.
452      */
453     FlushResult(Result result, String failureReason) {
454       this(result, -1, failureReason);
455       assert result == Result.CANNOT_FLUSH_MEMSTORE_EMPTY || result == Result.CANNOT_FLUSH;
456     }
457 
458     /**
459      * Constructor with all the parameters.
460      * @param result Any of the Result.
461      * @param flushSequenceId Generated sequence id if the memstores were flushed else -1.
462      * @param failureReason Reason why we couldn't flush, or null.
463      */
464     FlushResult(Result result, long flushSequenceId, String failureReason) {
465       this.result = result;
466       this.flushSequenceId = flushSequenceId;
467       this.failureReason = failureReason;
468     }
469 
470     /**
471      * Convenience method, the equivalent of checking if result is
472      * FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
473      * @return true if the memstores were flushed, else false.
474      */
475     public boolean isFlushSucceeded() {
476       return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
477           .FLUSHED_COMPACTION_NEEDED;
478     }
479 
480     /**
481      * Convenience method, the equivalent of checking if result is FLUSHED_COMPACTION_NEEDED.
482      * @return True if the flush requested a compaction, else false (doesn't even mean it flushed).
483      */
484     public boolean isCompactionNeeded() {
485       return result == Result.FLUSHED_COMPACTION_NEEDED;
486     }
487   }
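      // A minimal usage sketch (assuming the public flushcache() entry point of
      // this class; the caller shown here is hypothetical):
      //
      //   FlushResult fr = region.flushcache();
      //   if (fr.isFlushSucceeded() && fr.isCompactionNeeded()) {
      //     ...   // ask the compaction machinery to look at this region
      //   }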
488 
489   final WriteState writestate = new WriteState();
490 
491   long memstoreFlushSize;
492   final long timestampSlop;
493   final long rowProcessorTimeout;
494   private volatile long lastFlushTime;
495   final RegionServerServices rsServices;
496   private RegionServerAccounting rsAccounting;
497   private long flushCheckInterval;
498   // flushPerChanges is to prevent too many changes in memstore
499   private long flushPerChanges;
500   private long blockingMemStoreSize;
501   final long threadWakeFrequency;
502   // Used to guard closes
503   final ReentrantReadWriteLock lock =
504     new ReentrantReadWriteLock();
505 
506   // Stop updates lock
507   private final ReentrantReadWriteLock updatesLock =
508     new ReentrantReadWriteLock();
509   private boolean splitRequest;
510   private byte[] explicitSplitPoint = null;
511 
512   private final MultiVersionConsistencyControl mvcc =
513       new MultiVersionConsistencyControl();
514 
515   // Coprocessor host
516   private RegionCoprocessorHost coprocessorHost;
517 
518   private HTableDescriptor htableDescriptor = null;
519   private RegionSplitPolicy splitPolicy;
520 
521   private final MetricsRegion metricsRegion;
522   private final MetricsRegionWrapperImpl metricsRegionWrapper;
523   private final Durability durability;
524 
525   /**
526    * HRegion constructor. This constructor should only be used for testing and
527    * extensions.  Instances of HRegion should be instantiated with the
528    * {@link HRegion#createHRegion} or {@link HRegion#openHRegion} method.
529    *
530    * @param tableDir qualified path of directory where region should be located,
531    * usually the table directory.
532    * @param log The HLog is the outbound log for any updates to the HRegion
533    * (There's a single HLog for all the HRegions on a single HRegionServer.)
534    * The log file is a logfile from the previous execution that's
535    * custom-computed for this HRegion. The HRegionServer computes and sorts the
536    * appropriate log info for this HRegion. If there is a previous log file
537    * (implying that the HRegion has been written-to before), then read it from
538    * the supplied path.
539    * @param fs is the filesystem.
540    * @param confParam is global configuration settings.
541    * @param regionInfo - HRegionInfo that describes the region
542    *
543    * @param htd the table descriptor
544    * @param rsServices reference to {@link RegionServerServices} or null
545    */
546   @Deprecated
547   public HRegion(final Path tableDir, final HLog log, final FileSystem fs,
548       final Configuration confParam, final HRegionInfo regionInfo,
549       final HTableDescriptor htd, final RegionServerServices rsServices) {
550     this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
551       log, confParam, htd, rsServices);
552   }
553 
554   /**
555    * HRegion constructor. This constructor should only be used for testing and
556    * extensions.  Instances of HRegion should be instantiated with the
557    * {@link HRegion#createHRegion} or {@link HRegion#openHRegion} method.
558    *
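       * <p>A minimal sketch of the preferred way to obtain a region, using one of the
       * {@link HRegion#openHRegion} overloads (variable names here are illustrative):
       * <pre>
       *   HRegion region = HRegion.openHRegion(conf, fs, rootDir, regionInfo, htd, wal);
       *   try {
       *     // use the region
       *   } finally {
       *     region.close();
       *   }
       * </pre>
       *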
559    * @param fs is the filesystem.
560    * @param log The HLog is the outbound log for any updates to the HRegion
561    * (There's a single HLog for all the HRegions on a single HRegionServer.)
562    * The log file is a logfile from the previous execution that's
563    * custom-computed for this HRegion. The HRegionServer computes and sorts the
564    * appropriate log info for this HRegion. If there is a previous log file
565    * (implying that the HRegion has been written-to before), then read it from
566    * the supplied path.
567    * @param confParam is global configuration settings.
568    * @param htd the table descriptor
569    * @param rsServices reference to {@link RegionServerServices} or null
570    */
571   public HRegion(final HRegionFileSystem fs, final HLog log, final Configuration confParam,
572       final HTableDescriptor htd, final RegionServerServices rsServices) {
573     if (htd == null) {
574       throw new IllegalArgumentException("Need table descriptor");
575     }
576 
577     if (confParam instanceof CompoundConfiguration) {
578       throw new IllegalArgumentException("Need original base configuration");
579     }
580 
581     this.comparator = fs.getRegionInfo().getComparator();
582     this.log = log;
583     this.fs = fs;
584 
585     // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
586     this.baseConf = confParam;
587     this.conf = new CompoundConfiguration()
588       .add(confParam)
589       .addStringMap(htd.getConfiguration())
590       .addWritableMap(htd.getValues());
591     this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
592         DEFAULT_CACHE_FLUSH_INTERVAL);
593     this.flushPerChanges = conf.getLong(MEMSTORE_FLUSH_PER_CHANGES, DEFAULT_FLUSH_PER_CHANGES);
594     if (this.flushPerChanges > MAX_FLUSH_PER_CHANGES) {
595       throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed "
596           + MAX_FLUSH_PER_CHANGES);
597     }
598 
599     this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
600                     DEFAULT_ROWLOCK_WAIT_DURATION);
601 
602     this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
603     this.htableDescriptor = htd;
604     this.rsServices = rsServices;
605     this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
606     setHTableSpecificConf();
607     this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
608 
609     this.busyWaitDuration = conf.getLong(
610       "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
611     this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
612     if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
613       throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
614         + busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
615         + maxBusyWaitMultiplier + "). Their product should be positive");
616     }
617     this.maxBusyWaitDuration = conf.getLong("hbase.ipc.client.call.purge.timeout",
618       2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
619 
620     /*
621      * timestamp.slop provides a server-side constraint on the timestamp. This
622      * assumes that you base your TS around currentTimeMillis(). In this case,
623      * throw an error to the user if the user-specified TS is newer than now +
624      * slop. LATEST_TIMESTAMP == don't use this functionality
625      */
626     this.timestampSlop = conf.getLong(
627         "hbase.hregion.keyvalue.timestamp.slop.millisecs",
628         HConstants.LATEST_TIMESTAMP);
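        // Illustrative example (numbers are hypothetical): with a slop of 2000 ms,
        // a mutation whose timestamp is more than 2 seconds ahead of the server's
        // current time is rejected; with the default of LATEST_TIMESTAMP this check
        // is effectively disabled.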
629 
630     /**
631      * Timeout for the process time in processRowsWithLocks().
632      * Use -1 to switch off time bound.
633      */
634     this.rowProcessorTimeout = conf.getLong(
635         "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
636     this.durability = htd.getDurability() == Durability.USE_DEFAULT
637         ? DEFAULT_DURABLITY
638         : htd.getDurability();
639     if (rsServices != null) {
640       this.rsAccounting = this.rsServices.getRegionServerAccounting();
641       // don't initialize coprocessors if not running within a regionserver
642       // TODO: revisit if coprocessors should load in other cases
643       this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
644       this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
645       this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper);
646 
647       Map<String, HRegion> recoveringRegions = rsServices.getRecoveringRegions();
648       String encodedName = getRegionInfo().getEncodedName();
649       if (recoveringRegions != null && recoveringRegions.containsKey(encodedName)) {
650         this.isRecovering = true;
651         recoveringRegions.put(encodedName, this);
652       }
653     } else {
654       this.metricsRegionWrapper = null;
655       this.metricsRegion = null;
656     }
657     if (LOG.isDebugEnabled()) {
658       // Write out region name as string and its encoded name.
659       LOG.debug("Instantiated " + this);
660     }
661 
662     // by default, we allow writes against a region when it's in recovering
663     this.disallowWritesInRecovering =
664         conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING,
665           HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG);
666   }
667 
668   void setHTableSpecificConf() {
669     if (this.htableDescriptor == null) return;
670     long flushSize = this.htableDescriptor.getMemStoreFlushSize();
671 
672     if (flushSize <= 0) {
673       flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
674         HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
675     }
676     this.memstoreFlushSize = flushSize;
677     this.blockingMemStoreSize = this.memstoreFlushSize *
678         conf.getLong("hbase.hregion.memstore.block.multiplier", 2);
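        // For example (illustrative numbers): with a 128 MB flush size and the
        // default block multiplier of 2, updates to this region are blocked once
        // its memstore reaches 256 MB, until a flush brings the size back down.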
679   }
680 
681   /**
682    * Initialize this region.
683    * Used only by tests and SplitTransaction to reopen the region.
684    * You should use createHRegion() or openHRegion() instead.
685    * @return What the next sequence (edit) id should be.
686    * @throws IOException e
687    * @deprecated use HRegion.createHRegion() or HRegion.openHRegion()
688    */
689   @Deprecated
690   public long initialize() throws IOException {
691     return initialize(null);
692   }
693 
694   /**
695    * Initialize this region.
696    *
697    * @param reporter Tickle every so often if initialize is taking a while.
698    * @return What the next sequence (edit) id should be.
699    * @throws IOException e
700    */
701   private long initialize(final CancelableProgressable reporter) throws IOException {
702     MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
703     long nextSeqId = -1;
704     try {
705       nextSeqId = initializeRegionInternals(reporter, status);
706       return nextSeqId;
707     } finally {
708       // nextSeqid will be -1 if the initialization fails.
709       // Otherwise it will be at least 0.
710       if (nextSeqId == -1) {
711         status
712             .abort("Exception during region " + this.getRegionNameAsString() + " initialization.");
713       }
714     }
715   }
716 
717   private long initializeRegionInternals(final CancelableProgressable reporter,
718       final MonitoredTask status) throws IOException, UnsupportedEncodingException {
719     if (coprocessorHost != null) {
720       status.setStatus("Running coprocessor pre-open hook");
721       coprocessorHost.preOpen();
722     }
723 
724     // Write HRI to a file in case we need to recover hbase:meta
725     status.setStatus("Writing region info on filesystem");
726     fs.checkRegionInfoOnFilesystem();
727 
728     // Initialize all the HStores
729     status.setStatus("Initializing all the Stores");
730     long maxSeqId = initializeRegionStores(reporter, status);
731 
732     this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this));
733     this.writestate.flushRequested = false;
734     this.writestate.compacting = 0;
735 
736     if (this.writestate.writesEnabled) {
737       // Remove temporary data left over from old regions
738       status.setStatus("Cleaning up temporary data from old regions");
739       fs.cleanupTempDir();
740     }
741 
742     if (this.writestate.writesEnabled) {
743       status.setStatus("Cleaning up detritus from prior splits");
744       // Get rid of any splits or merges that were lost in-progress.  Clean out
745       // these directories here on open.  We may be opening a region that was
746       // being split but we crashed in the middle of it all.
747       fs.cleanupAnySplitDetritus();
748       fs.cleanupMergesDir();
749     }
750 
751     // Initialize split policy
752     this.splitPolicy = RegionSplitPolicy.create(this, conf);
753 
754     this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
755     // Use maximum of log sequenceid or that which was found in stores
756     // (particularly if no recovered edits, seqid will be -1).
757     long nextSeqid = maxSeqId + 1;
758     if (this.isRecovering) {
759       // In distributedLogReplay mode, we don't know the last change sequence number because region
760       // is opened before recovery completes. So we add a safety bumper to avoid new sequence
761       // numbers overlapping sequence numbers that are already in use.
762       nextSeqid += this.flushPerChanges + 10000000; // add another extra 10million
763     }
764     LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() +
765       "; next sequenceid=" + nextSeqid);
766 
767     // A region can be reopened if failed a split; reset flags
768     this.closing.set(false);
769     this.closed.set(false);
770 
771     if (coprocessorHost != null) {
772       status.setStatus("Running coprocessor post-open hooks");
773       coprocessorHost.postOpen();
774     }
775 
776     status.markComplete("Region opened successfully");
777     return nextSeqid;
778   }
779 
780   private long initializeRegionStores(final CancelableProgressable reporter, MonitoredTask status)
781       throws IOException, UnsupportedEncodingException {
782     // Load in all the HStores.
783 
784     long maxSeqId = -1;
785     // initialized to -1 so that we pick up MemstoreTS from column families
786     long maxMemstoreTS = -1;
787 
788     if (!htableDescriptor.getFamilies().isEmpty()) {
789       // initialize the thread pool for opening stores in parallel.
790       ThreadPoolExecutor storeOpenerThreadPool =
791         getStoreOpenAndCloseThreadPool("StoreOpener-" + this.getRegionInfo().getShortNameToLog());
792       CompletionService<HStore> completionService =
793         new ExecutorCompletionService<HStore>(storeOpenerThreadPool);
794 
795       // initialize each store in parallel
796       for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
797         status.setStatus("Instantiating store for column family " + family);
798         completionService.submit(new Callable<HStore>() {
799           @Override
800           public HStore call() throws IOException {
801             return instantiateHStore(family);
802           }
803         });
804       }
805       boolean allStoresOpened = false;
806       try {
807         for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
808           Future<HStore> future = completionService.take();
809           HStore store = future.get();
810           this.stores.put(store.getColumnFamilyName().getBytes(), store);
811 
812           long storeMaxSequenceId = store.getMaxSequenceId();
813           maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
814               storeMaxSequenceId);
815           if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) {
816             maxSeqId = storeMaxSequenceId;
817           }
818           long maxStoreMemstoreTS = store.getMaxMemstoreTS();
819           if (maxStoreMemstoreTS > maxMemstoreTS) {
820             maxMemstoreTS = maxStoreMemstoreTS;
821           }
822         }
823         allStoresOpened = true;
824       } catch (InterruptedException e) {
825         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
826       } catch (ExecutionException e) {
827         throw new IOException(e.getCause());
828       } finally {
829         storeOpenerThreadPool.shutdownNow();
830         if (!allStoresOpened) {
831           // something went wrong, close all opened stores
832           LOG.error("Could not initialize all stores for the region=" + this);
833           for (Store store : this.stores.values()) {
834             try {
835               store.close();
836             } catch (IOException e) {
837               LOG.warn(e.getMessage());
838             }
839           }
840         }
841       }
842     }
843     if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
844       // Recover any edits if available.
845       maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
846           this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
847     }
848     maxSeqId = Math.max(maxSeqId, maxMemstoreTS + 1);
849     mvcc.initialize(maxSeqId);
850     return maxSeqId;
851   }
852 
853   /**
854    * @return True if this region has references.
855    */
856   public boolean hasReferences() {
857     for (Store store : this.stores.values()) {
858       if (store.hasReferences()) return true;
859     }
860     return false;
861   }
862 
863   /**
864    * This function will return the HDFS blocks distribution based on the data
865    * captured when the HFiles were created.
866    * @return The HDFS blocks distribution for the region.
867    */
868   public HDFSBlocksDistribution getHDFSBlocksDistribution() {
869     HDFSBlocksDistribution hdfsBlocksDistribution =
870       new HDFSBlocksDistribution();
871     synchronized (this.stores) {
872       for (Store store : this.stores.values()) {
873         for (StoreFile sf : store.getStorefiles()) {
874           HDFSBlocksDistribution storeFileBlocksDistribution =
875             sf.getHDFSBlockDistribution();
876           hdfsBlocksDistribution.add(storeFileBlocksDistribution);
877         }
878       }
879     }
880     return hdfsBlocksDistribution;
881   }
882 
883   /**
884    * This is a helper function to compute HDFS block distribution on demand
885    * @param conf configuration
886    * @param tableDescriptor HTableDescriptor of the table
887    * @param regionInfo the HRegionInfo of the region
888    * @return The HDFS blocks distribution for the given region.
889    * @throws IOException
890    */
891   public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
892       final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException {
893     Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
894     return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath);
895   }
896 
897   /**
898    * This is a helper function to compute HDFS block distribution on demand
899    * @param conf configuration
900    * @param tableDescriptor HTableDescriptor of the table
901    * @param regionInfo the HRegionInfo of the region
902    * @param tablePath the table directory
903    * @return The HDFS blocks distribution for the given region.
904    * @throws IOException
905    */
906   public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
907       final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo,  Path tablePath)
908       throws IOException {
909     HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
910     FileSystem fs = tablePath.getFileSystem(conf);
911 
912     HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo);
913     for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
914       Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family.getNameAsString());
915       if (storeFiles == null) continue;
916 
917       for (StoreFileInfo storeFileInfo : storeFiles) {
918         hdfsBlocksDistribution.add(storeFileInfo.computeHDFSBlocksDistribution(fs));
919       }
920     }
921     return hdfsBlocksDistribution;
922   }
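      // A minimal usage sketch (the host name is hypothetical): estimate how local a
      // region's data is to a given server without opening the region.
      //
      //   HDFSBlocksDistribution dist =
      //       HRegion.computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo);
      //   float locality = dist.getBlockLocalityIndex("host1.example.com");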
923 
924   public AtomicLong getMemstoreSize() {
925     return memstoreSize;
926   }
927 
928   /**
929    * Increase the size of the memstore in this region and the size of the global
930    * memstore.
931    * @return the size of the memstore in this region after the addition
932    */
933   public long addAndGetGlobalMemstoreSize(long memStoreSize) {
934     if (this.rsAccounting != null) {
935       rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
936     }
937     return this.memstoreSize.addAndGet(memStoreSize);
938   }
939 
940   /** @return a HRegionInfo object for this region */
941   public HRegionInfo getRegionInfo() {
942     return this.fs.getRegionInfo();
943   }
944 
945   /**
946    * @return Instance of {@link RegionServerServices} used by this HRegion.
947    * Can be null.
948    */
949   RegionServerServices getRegionServerServices() {
950     return this.rsServices;
951   }
952 
953   /** @return readRequestsCount for this region */
954   long getReadRequestsCount() {
955     return this.readRequestsCount.get();
956   }
957 
958   /** @return writeRequestsCount for this region */
959   long getWriteRequestsCount() {
960     return this.writeRequestsCount.get();
961   }
962 
963   public MetricsRegion getMetrics() {
964     return metricsRegion;
965   }
966 
967   /** @return true if region is closed */
968   public boolean isClosed() {
969     return this.closed.get();
970   }
971 
972   /**
973    * @return True if closing process has started.
974    */
975   public boolean isClosing() {
976     return this.closing.get();
977   }
978 
979   /**
980    * Reset recovering state of current region
981    */
982   public void setRecovering(boolean newState) {
983     boolean wasRecovering = this.isRecovering;
984     this.isRecovering = newState;
985     if (wasRecovering && !isRecovering) {
986       // Call only when log replay is over.
987       coprocessorHost.postLogReplay();
988     }
989   }
990 
991   /**
992    * @return True if current region is in recovering state
993    */
994   public boolean isRecovering() {
995     return this.isRecovering;
996   }
997 
998   /** @return true if region is available (not closed and not closing) */
999   public boolean isAvailable() {
1000     return !isClosed() && !isClosing();
1001   }
1002 
1003   /** @return true if region is splittable */
1004   public boolean isSplittable() {
1005     return isAvailable() && !hasReferences();
1006   }
1007 
1008   /**
1009    * @return true if region is mergeable
1010    */
1011   public boolean isMergeable() {
1012     if (!isAvailable()) {
1013       LOG.debug("Region " + this.getRegionNameAsString()
1014           + " is not mergeable because it is closing or closed");
1015       return false;
1016     }
1017     if (hasReferences()) {
1018       LOG.debug("Region " + this.getRegionNameAsString()
1019           + " is not mergeable because it has references");
1020       return false;
1021     }
1022 
1023     return true;
1024   }
1025 
1026   public boolean areWritesEnabled() {
1027     synchronized(this.writestate) {
1028       return this.writestate.writesEnabled;
1029     }
1030   }
1031 
1032   public MultiVersionConsistencyControl getMVCC() {
1033     return mvcc;
1034   }
1035 
1036   /*
1037    * Returns readpoint considering given IsolationLevel
1038    */
1039   public long getReadpoint(IsolationLevel isolationLevel) {
1040     if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
1041       // This scan can read even uncommitted transactions
1042       return Long.MAX_VALUE;
1043     }
1044     return mvcc.memstoreReadPoint();
1045   }
1046 
1047   public boolean isLoadingCfsOnDemandDefault() {
1048     return this.isLoadingCfsOnDemandDefault;
1049   }
1050 
1051   /**
1052    * Close down this HRegion.  Flush the cache, shut down each HStore, don't
1053    * service any more calls.
1054    *
1055    * <p>This method could take some time to execute, so don't call it from a
1056    * time-sensitive thread.
1057    *
1058    * @return Map of all the storage files that the HRegion's component
1059    * HStores make use of, keyed by column family.  Returns an empty map
1060    * if already closed and null if judged that it should not close.
1061    *
1062    * @throws IOException e
1063    */
1064   public Map<byte[], List<StoreFile>> close() throws IOException {
1065     return close(false);
1066   }
1067 
1068   private final Object closeLock = new Object();
1069 
1070   /** Conf key for the periodic flush interval */
1071   public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL =
1072       "hbase.regionserver.optionalcacheflushinterval";
1073   /** Default interval for the memstore flush */
1074   public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;
1075 
1076   /** Conf key to force a flush if there are already enough changes for one region in memstore */
1077   public static final String MEMSTORE_FLUSH_PER_CHANGES =
1078       "hbase.regionserver.flush.per.changes";
1079   public static final long DEFAULT_FLUSH_PER_CHANGES = 30000000; // 30 million
1080   /**
1081    * The following MAX_FLUSH_PER_CHANGES is large enough because each KeyValue has 20+ bytes
1082    * overhead. Therefore, even 1 billion empty KVs occupy at least 20GB of memstore for a single region
1083    */
1084   public static final long MAX_FLUSH_PER_CHANGES = 1000000000; // 1G
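       // Illustrative example (values are hypothetical): the two flush knobs above are
       // read from the server Configuration, e.g. in hbase-site.xml or programmatically:
       //
       //   conf.setInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL, 10 * 60 * 1000); // flush at least every 10 min
       //   conf.setLong(MEMSTORE_FLUSH_PER_CHANGES, 10000000L);           // or after 10 million changes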
1085 
1086   /**
1087    * Close down this HRegion.  Flush the cache unless abort parameter is true,
1088    * shut down each HStore, don't service any more calls.
1089    *
1090    * This method could take some time to execute, so don't call it from a
1091    * time-sensitive thread.
1092    *
1093    * @param abort true if server is aborting (only during testing)
1094    * @return Map of all the storage files that the HRegion's component
1095    * HStores make use of, keyed by column family.  Can be null if
1096    * we are not to close at this time or we are already closed.
1097    *
1098    * @throws IOException e
1099    */
1100   public Map<byte[], List<StoreFile>> close(final boolean abort) throws IOException {
1101     // Only allow one thread to close at a time. Serialize them so dual
1102     // threads attempting to close will run up against each other.
1103     MonitoredTask status = TaskMonitor.get().createStatus(
1104         "Closing region " + this +
1105         (abort ? " due to abort" : ""));
1106 
1107     status.setStatus("Waiting for close lock");
1108     try {
1109       synchronized (closeLock) {
1110         return doClose(abort, status);
1111       }
1112     } finally {
1113       status.cleanup();
1114     }
1115   }
1116 
1117   private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
1118       throws IOException {
1119     if (isClosed()) {
1120       LOG.warn("Region " + this + " already closed");
1121       return null;
1122     }
1123 
1124     if (coprocessorHost != null) {
1125       status.setStatus("Running coprocessor pre-close hooks");
1126       this.coprocessorHost.preClose(abort);
1127     }
1128 
1129     status.setStatus("Disabling compacts and flushes for region");
1130     synchronized (writestate) {
1131       // Disable compacting and flushing by background threads for this
1132       // region.
1133       writestate.writesEnabled = false;
1134       LOG.debug("Closing " + this + ": disabling compactions & flushes");
1135       waitForFlushesAndCompactions();
1136     }
1137     // If we were not just flushing, is it worth doing a preflush...one
1138     // that will clear out the bulk of the memstore before we put up
1139     // the close flag?
1140     if (!abort && worthPreFlushing()) {
1141       status.setStatus("Pre-flushing region before close");
1142       LOG.info("Running close preflush of " + this.getRegionNameAsString());
1143       try {
1144         internalFlushcache(status);
1145       } catch (IOException ioe) {
1146         // Failed to flush the region. Keep going.
1147         status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
1148       }
1149     }
1150 
1151     this.closing.set(true);
1152     status.setStatus("Disabling writes for close");
1153     // block waiting for the lock for closing
1154     lock.writeLock().lock();
1155     try {
1156       if (this.isClosed()) {
1157         status.abort("Already got closed by another process");
1158         // SplitTransaction handles the null
1159         return null;
1160       }
1161       LOG.debug("Updates disabled for region " + this);
1162       // Don't flush the cache if we are aborting
1163       if (!abort) {
1164         int flushCount = 0;
1165         while (this.getMemstoreSize().get() > 0) {
1166           try {
1167             if (flushCount++ > 0) {
1168               int actualFlushes = flushCount - 1;
1169               if (actualFlushes > 5) {
1170                 // If we tried 5 times and are unable to clear memory, abort
1171                 // so we do not lose data
1172                 throw new DroppedSnapshotException("Failed clearing memory after " +
1173                   actualFlushes + " attempts on region: " + Bytes.toStringBinary(getRegionName()));
1174               }
1175               LOG.info("Running extra flush, " + actualFlushes +
1176                 " (carrying snapshot?) " + this);
1177             }
1178             internalFlushcache(status);
1179           } catch (IOException ioe) {
1180             status.setStatus("Failed flush " + this + ", putting online again");
1181             synchronized (writestate) {
1182               writestate.writesEnabled = true;
1183             }
1184             // Have to throw to upper layers.  I can't abort server from here.
1185             throw ioe;
1186           }
1187         }
1188       }
1189 
1190       Map<byte[], List<StoreFile>> result =
1191         new TreeMap<byte[], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
1192       if (!stores.isEmpty()) {
1193         // initialize the thread pool for closing stores in parallel.
1194         ThreadPoolExecutor storeCloserThreadPool =
1195           getStoreOpenAndCloseThreadPool("StoreCloserThread-" + this.getRegionNameAsString());
1196         CompletionService<Pair<byte[], Collection<StoreFile>>> completionService =
1197           new ExecutorCompletionService<Pair<byte[], Collection<StoreFile>>>(storeCloserThreadPool);
1198 
1199         // close each store in parallel
1200         for (final Store store : stores.values()) {
1201           assert abort || store.getFlushableSize() == 0;
1202           completionService
1203               .submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
1204                 @Override
1205                 public Pair<byte[], Collection<StoreFile>> call() throws IOException {
1206                   return new Pair<byte[], Collection<StoreFile>>(
1207                     store.getFamily().getName(), store.close());
1208                 }
1209               });
1210         }
1211         try {
1212           for (int i = 0; i < stores.size(); i++) {
1213             Future<Pair<byte[], Collection<StoreFile>>> future = completionService.take();
1214             Pair<byte[], Collection<StoreFile>> storeFiles = future.get();
1215             List<StoreFile> familyFiles = result.get(storeFiles.getFirst());
1216             if (familyFiles == null) {
1217               familyFiles = new ArrayList<StoreFile>();
1218               result.put(storeFiles.getFirst(), familyFiles);
1219             }
1220             familyFiles.addAll(storeFiles.getSecond());
1221           }
1222         } catch (InterruptedException e) {
1223           throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1224         } catch (ExecutionException e) {
1225           throw new IOException(e.getCause());
1226         } finally {
1227           storeCloserThreadPool.shutdownNow();
1228         }
1229       }
1230       this.closed.set(true);
1231       if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get());
1232       if (coprocessorHost != null) {
1233         status.setStatus("Running coprocessor post-close hooks");
1234         this.coprocessorHost.postClose(abort);
1235       }
1236       if ( this.metricsRegion != null) {
1237         this.metricsRegion.close();
1238       }
1239       if ( this.metricsRegionWrapper != null) {
1240         Closeables.closeQuietly(this.metricsRegionWrapper);
1241       }
1242       status.markComplete("Closed");
1243       LOG.info("Closed " + this);
1244       return result;
1245     } finally {
1246       lock.writeLock().unlock();
1247     }
1248   }
1249 
1250   /**
1251    * Wait for all current flushes and compactions of the region to complete.
1252    * <p>
1253    * Exposed for TESTING.
1254    */
1255   public void waitForFlushesAndCompactions() {
1256     synchronized (writestate) {
1257       boolean interrupted = false;
1258       try {
1259         while (writestate.compacting > 0 || writestate.flushing) {
1260           LOG.debug("waiting for " + writestate.compacting + " compactions"
1261             + (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
1262           try {
1263             writestate.wait();
1264           } catch (InterruptedException iex) {
1265             // essentially ignore and propagate the interrupt back up
1266             LOG.warn("Interrupted while waiting");
1267             interrupted = true;
1268           }
1269         }
1270       } finally {
1271         if (interrupted) {
1272           Thread.currentThread().interrupt();
1273         }
1274       }
1275     }
1276   }
1277 
1278   protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
1279       final String threadNamePrefix) {
1280     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1281     int maxThreads = Math.min(numStores,
1282         conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1283             HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
1284     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1285   }
1286 
1287   protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
1288       final String threadNamePrefix) {
1289     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1290     int maxThreads = Math.max(1,
1291         conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1292             HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
1293             / numStores);
1294     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1295   }
1296 
1297   static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
1298       final String threadNamePrefix) {
1299     return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
1300       new ThreadFactory() {
1301         private int count = 1;
1302 
1303         @Override
1304         public Thread newThread(Runnable r) {
1305           return new Thread(r, threadNamePrefix + "-" + count++);
1306         }
1307       });
1308   }
1309 
1310    /**
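         * A pre-close flush is only worthwhile when the memstore exceeds
         * {@code hbase.hregion.preclose.flush.size} (5 MB by default). As a sketch, a
         * deployment could raise the threshold to 16 MB in hbase-site.xml
         * (illustrative value only):
         * <pre>{@code
         *   <property>
         *     <name>hbase.hregion.preclose.flush.size</name>
         *     <value>16777216</value>
         *   </property>
         * }</pre>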
1311     * @return True if it's worth doing a flush before we put up the close flag.
1312     */
1313   private boolean worthPreFlushing() {
1314     return this.memstoreSize.get() >
1315       this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
1316   }
1317 
1318   //////////////////////////////////////////////////////////////////////////////
1319   // HRegion accessors
1320   //////////////////////////////////////////////////////////////////////////////
1321 
1322   /** @return start key for region */
1323   public byte [] getStartKey() {
1324     return this.getRegionInfo().getStartKey();
1325   }
1326 
1327   /** @return end key for region */
1328   public byte [] getEndKey() {
1329     return this.getRegionInfo().getEndKey();
1330   }
1331 
1332   /** @return region id */
1333   public long getRegionId() {
1334     return this.getRegionInfo().getRegionId();
1335   }
1336 
1337   /** @return region name */
1338   public byte [] getRegionName() {
1339     return this.getRegionInfo().getRegionName();
1340   }
1341 
1342   /** @return region name as string for logging */
1343   public String getRegionNameAsString() {
1344     return this.getRegionInfo().getRegionNameAsString();
1345   }
1346 
1347   /** @return HTableDescriptor for this region */
1348   public HTableDescriptor getTableDesc() {
1349     return this.htableDescriptor;
1350   }
1351 
1352   /** @return HLog in use for this region */
1353   public HLog getLog() {
1354     return this.log;
1355   }
1356 
1357   /**
1358    * A split takes the config from the parent region & passes it to the daughter
1359    * region's constructor. If 'conf' was passed, you would end up using the HTD
1360    * of the parent region in addition to the new daughter HTD. Pass 'baseConf'
1361    * to the daughter regions to avoid this tricky dedupe problem.
1362    * @return Configuration object
1363    */
1364   Configuration getBaseConf() {
1365     return this.baseConf;
1366   }
1367 
1368   /** @return {@link FileSystem} being used by this region */
1369   public FileSystem getFilesystem() {
1370     return fs.getFileSystem();
1371   }
1372 
1373   /** @return the {@link HRegionFileSystem} used by this region */
1374   public HRegionFileSystem getRegionFileSystem() {
1375     return this.fs;
1376   }
1377 
1378   /** @return the last time the region was flushed */
1379   public long getLastFlushTime() {
1380     return this.lastFlushTime;
1381   }
1382 
1383   //////////////////////////////////////////////////////////////////////////////
1384   // HRegion maintenance.
1385   //
1386   // These methods are meant to be called periodically by the HRegionServer for
1387   // upkeep.
1388   //////////////////////////////////////////////////////////////////////////////
1389 
1390   /** @return size of the largest HStore. */
1391   public long getLargestHStoreSize() {
1392     long size = 0;
1393     for (Store h : stores.values()) {
1394       long storeSize = h.getSize();
1395       if (storeSize > size) {
1396         size = storeSize;
1397       }
1398     }
1399     return size;
1400   }
1401 
1402   /**
1403    * @return KeyValue Comparator
1404    */
1405   public KeyValue.KVComparator getComparator() {
1406     return this.comparator;
1407   }
1408 
1409   /*
1410    * Do preparation for pending compaction.
1411    * @throws IOException
1412    */
1413   protected void doRegionCompactionPrep() throws IOException {
1414   }
1415 
1416   void triggerMajorCompaction() {
1417     for (Store h : stores.values()) {
1418       h.triggerMajorCompaction();
1419     }
1420   }
1421 
1422   /**
1423    * This is a helper function that compacts all the stores synchronously.
1424    * It is used by utilities and testing.
1425    *
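        * A minimal usage sketch for tests and utilities (assuming {@code region} is an
        * open HRegion owned by the caller):
        * <pre>{@code
        *   region.compactStores(true); // force a major compaction of every store
        * }</pre>
        *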
1426    * @param majorCompaction True to force a major compaction regardless of thresholds
1427    * @throws IOException e
1428    */
1429   public void compactStores(final boolean majorCompaction)
1430   throws IOException {
1431     if (majorCompaction) {
1432       this.triggerMajorCompaction();
1433     }
1434     compactStores();
1435   }
1436 
1437   /**
1438    * This is a helper function that compacts all the stores synchronously.
1439    * It is used by utilities and testing.
1440    *
1441    * @throws IOException e
1442    */
1443   public void compactStores() throws IOException {
1444     for (Store s : getStores().values()) {
1445       CompactionContext compaction = s.requestCompaction();
1446       if (compaction != null) {
1447         compact(compaction, s);
1448       }
1449     }
1450   }
1451 
1452   /*
1453    * Called by compaction thread and after region is opened to compact the
1454    * HStores if necessary.
1455    *
1456    * <p>This operation could block for a long time, so don't call it from a
1457    * time-sensitive thread.
1458    *
1459    * Note that no locking is necessary at this level because compaction only
1460    * conflicts with a region split, and that cannot happen because the region
1461    * server does them sequentially and not in parallel.
1462    *
1463    * @param compaction Compaction details, obtained by requestCompaction()
1464    * @return whether the compaction completed
1465    * @throws IOException e
1466    */
1467   public boolean compact(CompactionContext compaction, Store store) throws IOException {
1468     assert compaction != null && compaction.hasSelection();
1469     assert !compaction.getRequest().getFiles().isEmpty();
1470     if (this.closing.get() || this.closed.get()) {
1471       LOG.debug("Skipping compaction on " + this + " because closing/closed");
1472       store.cancelRequestedCompaction(compaction);
1473       return false;
1474     }
1475     MonitoredTask status = null;
1476     boolean didPerformCompaction = false;
1477     // block waiting for the lock for compaction
1478     lock.readLock().lock();
1479     try {
1480       byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
1481       if (stores.get(cf) != store) {
1482         LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this
1483             + " has been re-instantiated, cancelling this compaction request. "
1484             + " It may be caused by the rollback of a split transaction");
1485         return false;
1486       }
1487 
1488       status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
1489       if (this.closed.get()) {
1490         String msg = "Skipping compaction on " + this + " because closed";
1491         LOG.debug(msg);
1492         status.abort(msg);
1493         return false;
1494       }
1495       boolean wasStateSet = false;
1496       try {
1497         synchronized (writestate) {
1498           if (writestate.writesEnabled) {
1499             wasStateSet = true;
1500             ++writestate.compacting;
1501           } else {
1502             String msg = "NOT compacting region " + this + ". Writes disabled.";
1503             LOG.info(msg);
1504             status.abort(msg);
1505             return false;
1506           }
1507         }
1508         LOG.info("Starting compaction on " + store + " in region " + this
1509             + (compaction.getRequest().isOffPeak()?" as an off-peak compaction":""));
1510         doRegionCompactionPrep();
1511         try {
1512           status.setStatus("Compacting store " + store);
1513           didPerformCompaction = true;
1514           store.compact(compaction);
1515         } catch (InterruptedIOException iioe) {
1516           String msg = "compaction interrupted";
1517           LOG.info(msg, iioe);
1518           status.abort(msg);
1519           return false;
1520         }
1521       } finally {
1522         if (wasStateSet) {
1523           synchronized (writestate) {
1524             --writestate.compacting;
1525             if (writestate.compacting <= 0) {
1526               writestate.notifyAll();
1527             }
1528           }
1529         }
1530       }
1531       status.markComplete("Compaction complete");
1532       return true;
1533     } finally {
1534       try {
1535         if (!didPerformCompaction) store.cancelRequestedCompaction(compaction);
1536         if (status != null) status.cleanup();
1537       } finally {
1538         lock.readLock().unlock();
1539       }
1540     }
1541   }
1542 
1543   /**
1544    * Flush the cache.
1545    *
1546    * When this method is called the cache will be flushed unless:
1547    * <ol>
1548    *   <li>the cache is empty</li>
1549    *   <li>the region is closed</li>
1550    *   <li>a flush is already in progress</li>
1551    *   <li>writes are disabled</li>
1552    * </ol>
1553    *
1554    * <p>This method may block for some time, so it should not be called from a
1555    * time-sensitive thread.
1556    *
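        * <p>A minimal usage sketch (assuming {@code region} is an open HRegion owned by
        * the caller):
        * <pre>{@code
        *   FlushResult result = region.flushcache();
        *   // inspect the result to decide whether a follow-up compaction should be requested
        * }</pre>
        *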
1557    * @return true if the region needs compacting
1558    *
1559    * @throws IOException general io exceptions
1560    * @throws DroppedSnapshotException Thrown when replay of hlog is required
1561    * because a Snapshot was not properly persisted.
1562    */
1563   public FlushResult flushcache() throws IOException {
1564     // fail-fast instead of waiting on the lock
1565     if (this.closing.get()) {
1566       String msg = "Skipping flush on " + this + " because closing";
1567       LOG.debug(msg);
1568       return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1569     }
1570     MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
1571     status.setStatus("Acquiring readlock on region");
1572     // block waiting for the lock for flushing cache
1573     lock.readLock().lock();
1574     try {
1575       if (this.closed.get()) {
1576         String msg = "Skipping flush on " + this + " because closed";
1577         LOG.debug(msg);
1578         status.abort(msg);
1579         return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1580       }
1581       if (coprocessorHost != null) {
1582         status.setStatus("Running coprocessor pre-flush hooks");
1583         coprocessorHost.preFlush();
1584       }
1585       if (numMutationsWithoutWAL.get() > 0) {
1586         numMutationsWithoutWAL.set(0);
1587         dataInMemoryWithoutWAL.set(0);
1588       }
1589       synchronized (writestate) {
1590         if (!writestate.flushing && writestate.writesEnabled) {
1591           this.writestate.flushing = true;
1592         } else {
1593           if (LOG.isDebugEnabled()) {
1594             LOG.debug("NOT flushing memstore for region " + this
1595                 + ", flushing=" + writestate.flushing + ", writesEnabled="
1596                 + writestate.writesEnabled);
1597           }
1598           String msg = "Not flushing since "
1599               + (writestate.flushing ? "already flushing"
1600               : "writes not enabled");
1601           status.abort(msg);
1602           return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1603         }
1604       }
1605       try {
1606         FlushResult fs = internalFlushcache(status);
1607 
1608         if (coprocessorHost != null) {
1609           status.setStatus("Running post-flush coprocessor hooks");
1610           coprocessorHost.postFlush();
1611         }
1612 
1613         status.markComplete("Flush successful");
1614         return fs;
1615       } finally {
1616         synchronized (writestate) {
1617           writestate.flushing = false;
1618           this.writestate.flushRequested = false;
1619           writestate.notifyAll();
1620         }
1621       }
1622     } finally {
1623       lock.readLock().unlock();
1624       status.cleanup();
1625     }
1626   }
1627 
1628   /**
1629    * @return True if the memstore should be flushed now.
1630    */
1631   boolean shouldFlush() {
1632     // This is a rough measure.
1633     if (this.lastFlushSeqId > 0
1634           && (this.lastFlushSeqId + this.flushPerChanges < this.sequenceId.get())) {
1635       return true;
1636     }
1637     if (flushCheckInterval <= 0) { //disabled
1638       return false;
1639     }
1640     long now = EnvironmentEdgeManager.currentTimeMillis();
1641     //if we flushed in the recent past, we don't need to do again now
1642     if ((now - getLastFlushTime() < flushCheckInterval)) {
1643       return false;
1644     }
1645     //since we didn't flush in the recent past, flush now if certain conditions
1646     //are met. Return true on first such memstore hit.
1647     for (Store s : this.getStores().values()) {
1648       if (s.timeOfOldestEdit() < now - flushCheckInterval) {
1649         // we have an old enough edit in the memstore, flush
1650         return true;
1651       }
1652     }
1653     return false;
1654   }
1655 
1656   /**
1657    * Flush the memstore. Flushing the memstore is a little tricky. We have a lot of updates in the
1658    * memstore, all of which have also been written to the log. We need to write those updates in the
1659    * memstore out to disk, while being able to process reads/writes as much as possible during the
1660    * flush operation.
1661    * <p>This method may block for some time.  Every time you call it, we up the region's
1662    * sequence id even if we don't flush; i.e. the returned sequence id will be at least one
1663    * larger than the last edit applied to this region. The returned id does not refer to an
1664    * actual edit. The returned id can be used, for example, to install a bulk-loaded file just
1665    * ahead of the last hfile that resulted from this flush.
1666    * @return object describing the flush's state
1667    *
1668    * @throws IOException general io exceptions
1669    * @throws DroppedSnapshotException Thrown when replay of hlog is required
1670    * because a Snapshot was not properly persisted.
1671    */
1672   protected FlushResult internalFlushcache(MonitoredTask status)
1673       throws IOException {
1674     return internalFlushcache(this.log, -1, status);
1675   }
1676 
1677   /**
1678    * @param wal Null if we're NOT to go via hlog/wal.
1679    * @param myseqid The seqid to use when writing out the flush file if <code>wal</code> is null.
1680    * @return object describing the flush's state
1681    * @throws IOException
1682    * @see #internalFlushcache(MonitoredTask)
1683    */
1684   protected FlushResult internalFlushcache(
1685       final HLog wal, final long myseqid, MonitoredTask status)
1686   throws IOException {
1687     if (this.rsServices != null && this.rsServices.isAborted()) {
1688       // Don't flush when server aborting, it's unsafe
1689       throw new IOException("Aborting flush because server is aborted...");
1690     }
1691     final long startTime = EnvironmentEdgeManager.currentTimeMillis();
1692     // If nothing to flush, return, but we need to safely update the region sequence id
1693     if (this.memstoreSize.get() <= 0) {
1694      // Take an update lock because we are about to change the sequence id and we want the sequence id
1695       // to be at the border of the empty memstore.
1696       this.updatesLock.writeLock().lock();
1697       try {
1698         if (this.memstoreSize.get() <= 0) {
1699           // Presume that if there are still no edits in the memstore, then there are no edits for
1700           // this region out in the WAL/HLog subsystem so no need to do any trickery clearing out
1701           // edits in the WAL system. Up the sequence number so the resulting flush id is for
1702           // sure just beyond the last appended region edit (useful as a marker when bulk loading,
1703           // etc.)
1704           // wal can be null when replaying edits.
1705           return wal != null?
1706             new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
1707               getNextSequenceId(wal), "Nothing to flush"):
1708             new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush");
1709         }
1710       } finally {
1711         this.updatesLock.writeLock().unlock();
1712       }
1713     }
1714 
1715     LOG.info("Started memstore flush for " + this +
1716       ", current region memstore size " +
1717       StringUtils.byteDesc(this.memstoreSize.get()) +
1718       ((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid));
1719 
1720     // Stop updates while we snapshot the memstore of all of this region's stores. We only have
1721     // to do this for a moment.  It is quick. We also set the memstore size to zero here before we
1722     // allow updates again so its value will represent the size of the updates received
1723     // during flush.
1724     MultiVersionConsistencyControl.WriteEntry w = null;
1725 
1726     // We have to take an update lock during snapshot, or else a write could end up in both snapshot
1727     // and memstore (makes it difficult to do atomic rows then)
1728     status.setStatus("Obtaining lock to block concurrent updates");
1729     // block waiting for the lock for internal flush
1730     this.updatesLock.writeLock().lock();
1731     long totalFlushableSize = 0;
1732     status.setStatus("Preparing to flush by snapshotting stores in " +
1733       getRegionInfo().getEncodedName());
1734     List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
1735     TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
1736         Bytes.BYTES_COMPARATOR);
1737     long flushSeqId = -1L;
1738 
1739     long trxId = 0;
1740     try {
1741       try {
1742         w = mvcc.beginMemstoreInsert();
1743         if (wal != null) {
1744           if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) {
1745             // This should never happen.
1746             String msg = "Flush will not be started for ["
1747                 + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
1748             status.setStatus(msg);
1749             return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1750           }
1751           // Get a sequence id that we can use to denote the flush. It will be one beyond the last
1752           // edit that made it into the hfile (the below does not add an edit, it just asks the
1753           // WAL system to return next sequence edit).
1754           flushSeqId = getNextSequenceId(wal);
1755         } else {
1756           // use the provided sequence Id as WAL is not being used for this flush.
1757           flushSeqId = myseqid;
1758         }
1759 
1760         for (Store s : stores.values()) {
1761           totalFlushableSize += s.getFlushableSize();
1762           storeFlushCtxs.add(s.createFlushContext(flushSeqId));
1763           committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL
1764         }
1765 
1766         // write the snapshot start to WAL
1767         if (wal != null) {
1768           FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
1769             getRegionInfo(), flushSeqId, committedFiles);
1770           trxId = HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
1771             desc, sequenceId, false); // no sync. Sync is below where we do not hold the updates lock
1772         }
1773 
1774         // Prepare flush (take a snapshot)
1775         for (StoreFlushContext flush : storeFlushCtxs) {
1776           flush.prepare();
1777         }
1778       } catch (IOException ex) {
1779         if (wal != null) {
1780           if (trxId > 0) { // check whether we have already written START_FLUSH to WAL
1781             try {
1782               FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
1783                 getRegionInfo(), flushSeqId, committedFiles);
1784               HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
1785                 desc, sequenceId, false);
1786             } catch (Throwable t) {
1787               LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
1788                   StringUtils.stringifyException(t));
1789               // ignore this since we will be aborting the RS with DSE.
1790             }
1791           }
1792           // we have called wal.startCacheFlush(), now we have to abort it
1793           wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
1794           throw ex; // let upper layers deal with it.
1795         }
1796       } finally {
1797         this.updatesLock.writeLock().unlock();
1798       }
1799       String s = "Finished memstore snapshotting " + this +
1800         ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize;
1801       status.setStatus(s);
1802       if (LOG.isTraceEnabled()) LOG.trace(s);
1803       // sync unflushed WAL changes
1804       // see HBASE-8208 for details
1805       if (wal != null) {
1806         try {
1807           wal.sync(); // ensure that flush marker is sync'ed
1808         } catch (IOException ioe) {
1809           LOG.warn("Unexpected exception while log.sync(), ignoring. Exception: "
1810               + StringUtils.stringifyException(ioe));
1811         }
1812       }
1813 
1814       // wait for all in-progress transactions to commit to HLog before
1815       // we can start the flush. This prevents
1816       // uncommitted transactions from being written into HFiles.
1817       // We have to block before we start the flush, otherwise keys that
1818       // were removed via a rollbackMemstore could be written to Hfiles.
1819       mvcc.waitForPreviousTransactionsComplete(w);
1820       // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block
1821       w = null;
1822       s = "Flushing stores of " + this;
1823       status.setStatus(s);
1824       if (LOG.isTraceEnabled()) LOG.trace(s);
1825     } finally {
1826       if (w != null) {
1827         // in case of failure just mark current w as complete
1828         mvcc.advanceMemstore(w);
1829       }
1830     }
1831 
1832     // Any failure from here on out will be catastrophic, requiring a server
1833     // restart so hlog content can be replayed and put back into the memstore.
1834     // Otherwise, the snapshot content, while backed up in the hlog, will not
1835     // be part of the running server's state.
1836     boolean compactionRequested = false;
1837     try {
1838       // A.  Flush memstore to all the HStores.
1839       // Keep running vector of all store files that includes both old and the
1840       // just-made new flush store file. The new flushed file is still in the
1841       // tmp directory.
1842 
1843       for (StoreFlushContext flush : storeFlushCtxs) {
1844         flush.flushCache(status);
1845       }
1846 
1847       // Switch snapshot (in memstore) -> new hfile (thus causing
1848       // all the store scanners to reset/reseek).
1849       Iterator<Store> it = stores.values().iterator(); // stores.values() and storeFlushCtxs have
1850       // same order
1851       for (StoreFlushContext flush : storeFlushCtxs) {
1852         boolean needsCompaction = flush.commit(status);
1853         if (needsCompaction) {
1854           compactionRequested = true;
1855         }
1856         committedFiles.put(it.next().getFamily().getName(), flush.getCommittedFiles());
1857       }
1858       storeFlushCtxs.clear();
1859 
1860       // Set down the memstore size by amount of flush.
1861       this.addAndGetGlobalMemstoreSize(-totalFlushableSize);
1862 
1863       if (wal != null) {
1864         // write flush marker to WAL. If fail, we should throw DroppedSnapshotException
1865         FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
1866           getRegionInfo(), flushSeqId, committedFiles);
1867         HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
1868           desc, sequenceId, true);
1869       }
1870     } catch (Throwable t) {
1871       // An exception here means that the snapshot was not persisted.
1872       // The hlog needs to be replayed so its content is restored to memstore.
1873       // Currently, only a server restart will do this.
1874       // We used to only catch IOEs but it's possible that we'd get other
1875       // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
1876       // all and sundry.
1877       if (wal != null) {
1878         try {
1879           FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
1880             getRegionInfo(), flushSeqId, committedFiles);
1881           HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
1882             desc, sequenceId, false);
1883         } catch (Throwable ex) {
1884           LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
1885               StringUtils.stringifyException(ex));
1886           // ignore this since we will be aborting the RS with DSE.
1887         }
1888         wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
1889       }
1890       DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
1891           Bytes.toStringBinary(getRegionName()));
1892       dse.initCause(t);
1893       status.abort("Flush failed: " + StringUtils.stringifyException(t));
1894       throw dse;
1895     }
1896 
1897     // If we get to here, the HStores have been written.
1898     if (wal != null) {
1899       wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
1900     }
1901 
1902     // Record latest flush time
1903     this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
1904 
1905     // Update the last flushed sequence id for region. TODO: This is dup'd inside the WAL/FSHlog.
1906     this.lastFlushSeqId = flushSeqId;
1907 
1908     // C. Finally notify anyone waiting on memstore to clear:
1909     // e.g. checkResources().
1910     synchronized (this) {
1911       notifyAll(); // FindBugs NN_NAKED_NOTIFY
1912     }
1913 
1914     long time = EnvironmentEdgeManager.currentTimeMillis() - startTime;
1915     long memstoresize = this.memstoreSize.get();
1916     String msg = "Finished memstore flush of ~" +
1917       StringUtils.byteDesc(totalFlushableSize) + "/" + totalFlushableSize +
1918       ", currentsize=" +
1919       StringUtils.byteDesc(memstoresize) + "/" + memstoresize +
1920       " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId +
1921       ", compaction requested=" + compactionRequested +
1922       ((wal == null)? "; wal=null": "");
1923     LOG.info(msg);
1924     status.setStatus(msg);
1925 
1926     return new FlushResult(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
1927         FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushSeqId);
1928   }
1929 
1930   /**
1931    * Method to safely get the next sequence number.
1932    * @return Next sequence number unassociated with any actual edit.
1933    * @throws IOException
1934    */
1935   private long getNextSequenceId(final HLog wal) throws IOException {
1936     HLogKey key = this.appendNoSyncNoAppend(wal, null);
1937     return key.getSequenceNumber();
1938   }
1939 
1940   //////////////////////////////////////////////////////////////////////////////
1941   // get() methods for client use.
1942   //////////////////////////////////////////////////////////////////////////////
1943   /**
1944    * Return all the data for the row that matches <i>row</i> exactly,
1945    * or the one that immediately precedes it, at or immediately before
1946    * <i>ts</i>.
1947    *
1948    * @param row row key
1949    * @return map of values
1950    * @throws IOException
1951    */
1952   Result getClosestRowBefore(final byte [] row)
1953   throws IOException{
1954     return getClosestRowBefore(row, HConstants.CATALOG_FAMILY);
1955   }
1956 
1957   /**
1958    * Return all the data for the row that matches <i>row</i> exactly,
1959    * or the one that immediately precedes it, at or immediately before
1960    * <i>ts</i>.
1961    *
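        * A minimal usage sketch (assuming {@code region} is an open HRegion and
        * family {@code f} exists):
        * <pre>{@code
        *   Result r = region.getClosestRowBefore(Bytes.toBytes("row-100"), Bytes.toBytes("f"));
        *   if (r != null && !r.isEmpty()) {
        *     // r holds the requested row, or the row immediately preceding it
        *   }
        * }</pre>
        *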
1962    * @param row row key
1963    * @param family column family to find on
1964    * @return map of values
1965    * @throws IOException read exceptions
1966    */
1967   public Result getClosestRowBefore(final byte [] row, final byte [] family)
1968   throws IOException {
1969     if (coprocessorHost != null) {
1970       Result result = new Result();
1971       if (coprocessorHost.preGetClosestRowBefore(row, family, result)) {
1972         return result;
1973       }
1974     }
1975     // look across all the HStores for this region and determine what the
1976     // closest key is across all column families, since the data may be sparse
1977     checkRow(row, "getClosestRowBefore");
1978     startRegionOperation(Operation.GET);
1979     this.readRequestsCount.increment();
1980     try {
1981       Store store = getStore(family);
1982       // get the closest key. (HStore.getRowKeyAtOrBefore can return null)
1983       KeyValue key = store.getRowKeyAtOrBefore(row);
1984       Result result = null;
1985       if (key != null) {
1986         Get get = new Get(CellUtil.cloneRow(key));
1987         get.addFamily(family);
1988         result = get(get);
1989       }
1990       if (coprocessorHost != null) {
1991         coprocessorHost.postGetClosestRowBefore(row, family, result);
1992       }
1993       return result;
1994     } finally {
1995       closeRegionOperation(Operation.GET);
1996     }
1997   }
1998 
1999   /**
2000    * Return an iterator that scans over the HRegion, returning the indicated
2001    * columns and rows specified by the {@link Scan}.
2002    * <p>
2003    * This Iterator must be closed by the caller.
2004    *
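        * A minimal usage sketch (assuming {@code region} is an open HRegion owned by the
        * caller):
        * <pre>{@code
        *   RegionScanner scanner = region.getScanner(new Scan());
        *   try {
        *     List<Cell> cells = new ArrayList<Cell>();
        *     boolean more;
        *     do {
        *       more = scanner.next(cells);
        *       // process the cells for this row, then clear the buffer
        *       cells.clear();
        *     } while (more);
        *   } finally {
        *     scanner.close();
        *   }
        * }</pre>
        *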
2005    * @param scan configured {@link Scan}
2006    * @return RegionScanner
2007    * @throws IOException read exceptions
2008    */
2009   public RegionScanner getScanner(Scan scan) throws IOException {
2010    return getScanner(scan, null);
2011   }
2012 
2013   void prepareScanner(Scan scan) throws IOException {
2014     if(!scan.hasFamilies()) {
2015       // Adding all families to scanner
2016       for(byte[] family: this.htableDescriptor.getFamiliesKeys()){
2017         scan.addFamily(family);
2018       }
2019     }
2020   }
2021 
2022   protected RegionScanner getScanner(Scan scan,
2023       List<KeyValueScanner> additionalScanners) throws IOException {
2024     startRegionOperation(Operation.SCAN);
2025     try {
2026       // Verify families are all valid
2027       prepareScanner(scan);
2028       if(scan.hasFamilies()) {
2029         for(byte [] family : scan.getFamilyMap().keySet()) {
2030           checkFamily(family);
2031         }
2032       }
2033       return instantiateRegionScanner(scan, additionalScanners);
2034     } finally {
2035       closeRegionOperation(Operation.SCAN);
2036     }
2037   }
2038 
2039   protected RegionScanner instantiateRegionScanner(Scan scan,
2040       List<KeyValueScanner> additionalScanners) throws IOException {
2041     if (scan.isReversed()) {
2042       if (scan.getFilter() != null) {
2043         scan.getFilter().setReversed(true);
2044       }
2045       return new ReversedRegionScannerImpl(scan, additionalScanners, this);
2046     }
2047     return new RegionScannerImpl(scan, additionalScanners, this);
2048   }
2049 
2050   /*
2051    * @param delete The passed delete is modified by this method. WARNING!
2052    */
2053   void prepareDelete(Delete delete) throws IOException {
2054     // Check to see if this is a deleteRow insert
2055     if(delete.getFamilyCellMap().isEmpty()){
2056       for(byte [] family : this.htableDescriptor.getFamiliesKeys()){
2057         // Don't eat the timestamp
2058         delete.deleteFamily(family, delete.getTimeStamp());
2059       }
2060     } else {
2061       for(byte [] family : delete.getFamilyCellMap().keySet()) {
2062         if(family == null) {
2063           throw new NoSuchColumnFamilyException("Empty family is invalid");
2064         }
2065         checkFamily(family);
2066       }
2067     }
2068   }
2069 
2070   //////////////////////////////////////////////////////////////////////////////
2071   // set() methods for client use.
2072   //////////////////////////////////////////////////////////////////////////////
2073   /**
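        * Delete the cells described by the passed Delete; all edits for the row are
        * applied atomically. A minimal usage sketch (assuming {@code region} is an open
        * HRegion and family {@code f} exists):
        * <pre>{@code
        *   Delete d = new Delete(Bytes.toBytes("row-1"));
        *   d.deleteFamily(Bytes.toBytes("f"));
        *   region.delete(d);
        * }</pre>
        *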
2074    * @param delete delete object
2075    * @throws IOException read exceptions
2076    */
2077   public void delete(Delete delete)
2078   throws IOException {
2079     checkReadOnly();
2080     checkResources();
2081     startRegionOperation(Operation.DELETE);
2082     try {
2083       delete.getRow();
2084       // All edits for the given row (across all column families) must happen atomically.
2085       doBatchMutate(delete);
2086     } finally {
2087       closeRegionOperation(Operation.DELETE);
2088     }
2089   }
2090 
2091   /**
2092    * Row key used by the method below.
2093    */
2094   private static final byte [] FOR_UNIT_TESTS_ONLY = Bytes.toBytes("ForUnitTestsOnly");
2095   /**
2096    * This is used only by unit tests. Not required to be a public API.
2097    * @param familyMap map of family to edits for the given family.
2098    * @throws IOException
2099    */
2100   void delete(NavigableMap<byte[], List<Cell>> familyMap,
2101       Durability durability) throws IOException {
2102     Delete delete = new Delete(FOR_UNIT_TESTS_ONLY);
2103     delete.setFamilyCellMap(familyMap);
2104     delete.setDurability(durability);
2105     doBatchMutate(delete);
2106   }
2107 
2108   /**
2109    * Setup correct timestamps in the KVs in Delete object.
2110    * Caller should have the row and region locks.
2111    * @param mutation
2112    * @param familyMap
2113    * @param byteNow
2114    * @throws IOException
2115    */
2116   void prepareDeleteTimestamps(Mutation mutation, Map<byte[], List<Cell>> familyMap,
2117       byte[] byteNow) throws IOException {
2118     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
2119 
2120       byte[] family = e.getKey();
2121       List<Cell> cells = e.getValue();
2122       Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
2123 
2124       for (Cell cell: cells) {
2125         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
2126         //  Check if time is LATEST, change to time of most recent addition if so
2127         //  This is expensive.
2128         if (kv.isLatestTimestamp() && CellUtil.isDeleteType(kv)) {
2129           byte[] qual = CellUtil.cloneQualifier(kv);
2130           if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
2131 
2132           Integer count = kvCount.get(qual);
2133           if (count == null) {
2134             kvCount.put(qual, 1);
2135           } else {
2136             kvCount.put(qual, count + 1);
2137           }
2138           count = kvCount.get(qual);
2139 
2140           Get get = new Get(CellUtil.cloneRow(kv));
2141           get.setMaxVersions(count);
2142           get.addColumn(family, qual);
2143           if (coprocessorHost != null) {
2144             if (!coprocessorHost.prePrepareTimeStampForDeleteVersion(mutation, cell,
2145                 byteNow, get)) {
2146               updateDeleteLatestVersionTimeStamp(kv, get, count, byteNow);
2147             }
2148           } else {
2149             updateDeleteLatestVersionTimeStamp(kv, get, count, byteNow);
2150           }
2151         } else {
2152           kv.updateLatestStamp(byteNow);
2153         }
2154       }
2155     }
2156   }
2157 
2158   void updateDeleteLatestVersionTimeStamp(KeyValue kv, Get get, int count, byte[] byteNow)
2159       throws IOException {
2160     List<Cell> result = get(get, false);
2161 
2162     if (result.size() < count) {
2163       // Nothing to delete
2164       kv.updateLatestStamp(byteNow);
2165       return;
2166     }
2167     if (result.size() > count) {
2168       throw new RuntimeException("Unexpected size: " + result.size());
2169     }
2170     KeyValue getkv = KeyValueUtil.ensureKeyValue(result.get(count - 1));
2171     Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(), getkv.getBuffer(),
2172         getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
2173   }
2174 
2175   /**
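        * Apply the passed Put; all edits for the row are applied atomically.
        * A minimal usage sketch (assuming {@code region} is an open HRegion and
        * family {@code f} exists):
        * <pre>{@code
        *   Put p = new Put(Bytes.toBytes("row-1"));
        *   p.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("value"));
        *   region.put(p);
        * }</pre>
        *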
2176    * @throws IOException
2177    */
2178   public void put(Put put)
2179   throws IOException {
2180     checkReadOnly();
2181 
2182     // Do a rough check that we have resources to accept a write.  The check is
2183     // 'rough' in that between the resource check and the call to obtain a
2184     // read lock, resources may run out.  For now, the thought is that this
2185     // will be extremely rare; we'll deal with it when it happens.
2186     checkResources();
2187     startRegionOperation(Operation.PUT);
2188     try {
2189       // All edits for the given row (across all column families) must happen atomically.
2190       doBatchMutate(put);
2191     } finally {
2192       closeRegionOperation(Operation.PUT);
2193     }
2194   }
2195 
2196   /**
2197    * Struct-like class that tracks the progress of a batch operation,
2198    * accumulating status codes and tracking the index at which processing
2199    * is proceeding.
2200    */
2201   private abstract static class BatchOperationInProgress<T> {
2202     T[] operations;
2203     int nextIndexToProcess = 0;
2204     OperationStatus[] retCodeDetails;
2205     WALEdit[] walEditsFromCoprocessors;
2206 
2207     public BatchOperationInProgress(T[] operations) {
2208       this.operations = operations;
2209       this.retCodeDetails = new OperationStatus[operations.length];
2210       this.walEditsFromCoprocessors = new WALEdit[operations.length];
2211       Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
2212     }
2213 
2214     public abstract Mutation getMutation(int index);
2215     public abstract long getNonceGroup(int index);
2216     public abstract long getNonce(int index);
2217     /** This method is potentially expensive and should only be used for non-replay CP path. */
2218     public abstract Mutation[] getMutationsForCoprocs();
2219     public abstract boolean isInReplay();
2220     public abstract long getReplaySequenceId();
2221 
2222     public boolean isDone() {
2223       return nextIndexToProcess == operations.length;
2224     }
2225   }
2226 
2227   private static class MutationBatch extends BatchOperationInProgress<Mutation> {
2228     private long nonceGroup;
2229     private long nonce;
2230     public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) {
2231       super(operations);
2232       this.nonceGroup = nonceGroup;
2233       this.nonce = nonce;
2234     }
2235 
2236     @Override
2237     public Mutation getMutation(int index) {
2238       return this.operations[index];
2239     }
2240 
2241     @Override
2242     public long getNonceGroup(int index) {
2243       return nonceGroup;
2244     }
2245 
2246     @Override
2247     public long getNonce(int index) {
2248       return nonce;
2249     }
2250 
2251     @Override
2252     public Mutation[] getMutationsForCoprocs() {
2253       return this.operations;
2254     }
2255 
2256     @Override
2257     public boolean isInReplay() {
2258       return false;
2259     }
2260 
2261     @Override
2262     public long getReplaySequenceId() {
2263       return 0;
2264     }
2265   }
2266 
2267   private static class ReplayBatch extends BatchOperationInProgress<HLogSplitter.MutationReplay> {
2268     private long replaySeqId = 0;
2269     public ReplayBatch(MutationReplay[] operations, long seqId) {
2270       super(operations);
2271       this.replaySeqId = seqId;
2272     }
2273 
2274     @Override
2275     public Mutation getMutation(int index) {
2276       return this.operations[index].mutation;
2277     }
2278 
2279     @Override
2280     public long getNonceGroup(int index) {
2281       return this.operations[index].nonceGroup;
2282     }
2283 
2284     @Override
2285     public long getNonce(int index) {
2286       return this.operations[index].nonce;
2287     }
2288 
2289     @Override
2290     public Mutation[] getMutationsForCoprocs() {
2291       assert false;
2292       throw new RuntimeException("Should not be called for replay batch");
2293     }
2294 
2295     @Override
2296     public boolean isInReplay() {
2297       return true;
2298     }
2299 
2300     @Override
2301     public long getReplaySequenceId() {
2302       return this.replaySeqId;
2303     }
2304   }
2305 
2306   /**
2307    * Perform a batch of mutations.
2308    * It supports only Put and Delete mutations and will ignore other types passed.
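        * <p>A minimal usage sketch (assuming {@code region} is an open HRegion and
        * family {@code f} exists):
        * <pre>{@code
        *   Mutation[] batch = new Mutation[] {
        *     new Put(Bytes.toBytes("row-1")).add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v")),
        *     new Delete(Bytes.toBytes("row-2"))
        *   };
        *   OperationStatus[] statuses = region.batchMutate(batch, HConstants.NO_NONCE, HConstants.NO_NONCE);
        *   // each OperationStatus reports the outcome of the corresponding mutation
        * }</pre>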
2309    * @param mutations the list of mutations
2310    * @return an array of OperationStatus which internally contains the
2311    *         OperationStatusCode and the exceptionMessage if any.
2312    * @throws IOException
2313    */
2314   public OperationStatus[] batchMutate(
2315       Mutation[] mutations, long nonceGroup, long nonce) throws IOException {
2316     // As it stands, this is used for 3 things
2317     //  * batchMutate with single mutation - put/delete, separate or from checkAndMutate.
2318     //  * coprocessor calls (see ex. BulkDeleteEndpoint).
2319     // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd...
2320     return batchMutate(new MutationBatch(mutations, nonceGroup, nonce));
2321   }
2322 
2323   public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
2324     return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
2325   }
2326 
2327   /**
2328    * Replay a batch of mutations.
2329    * @param mutations mutations to replay.
2330    * @param replaySeqId SeqId for current mutations
2331    * @return an array of OperationStatus which internally contains the
2332    *         OperationStatusCode and the exceptionMessage if any.
2333    * @throws IOException
2334    */
2335   public OperationStatus[] batchReplay(HLogSplitter.MutationReplay[] mutations, long replaySeqId)
2336       throws IOException {
2337     return batchMutate(new ReplayBatch(mutations, replaySeqId));
2338   }
2339 
2340   /**
2341    * Perform a batch of mutations.
2342    * It supports only Put and Delete mutations and will ignore other types passed.
2343    * @param mutations the list of mutations
2344    * @return an array of OperationStatus which internally contains the
2345    *         OperationStatusCode and the exceptionMessage if any.
2346    * @throws IOException
2347    */
2348   OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
2349     boolean initialized = false;
2350     while (!batchOp.isDone()) {
2351       if (!batchOp.isInReplay()) {
2352         checkReadOnly();
2353       }
2354       checkResources();
2355 
2356       long newSize;
2357       Operation op = Operation.BATCH_MUTATE;
2358       if (batchOp.isInReplay()) op = Operation.REPLAY_BATCH_MUTATE;
2359       startRegionOperation(op);
2360 
2361       try {
2362         if (!initialized) {
2363           this.writeRequestsCount.add(batchOp.operations.length);
2364           if (!batchOp.isInReplay()) {
2365             doPreMutationHook(batchOp);
2366           }
2367           initialized = true;
2368         }
2369         long addedSize = doMiniBatchMutation(batchOp);
2370         newSize = this.addAndGetGlobalMemstoreSize(addedSize);
2371       } finally {
2372         closeRegionOperation(op);
2373       }
2374       if (isFlushSize(newSize)) {
2375         requestFlush();
2376       }
2377     }
2378     return batchOp.retCodeDetails;
2379   }
2380 
2381 
2382   private void doPreMutationHook(BatchOperationInProgress<?> batchOp)
2383       throws IOException {
2384     /* Run coprocessor pre hook outside of locks to avoid deadlock */
2385     WALEdit walEdit = new WALEdit();
2386     if (coprocessorHost != null) {
2387       for (int i = 0 ; i < batchOp.operations.length; i++) {
2388         Mutation m = batchOp.getMutation(i);
2389         if (m instanceof Put) {
2390           if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
2391             // pre hook says skip this Put
2392             // mark as success and skip in doMiniBatchMutation
2393             batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2394           }
2395         } else if (m instanceof Delete) {
2396           Delete curDel = (Delete) m;
2397           if (curDel.getFamilyCellMap().isEmpty()) {
2398             // handle deleting a row case
2399             prepareDelete(curDel);
2400           }
2401           if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {
2402             // pre hook says skip this Delete
2403             // mark as success and skip in doMiniBatchMutation
2404             batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2405           }
2406         } else {
2407           // In case of passing Append mutations along with the Puts and Deletes in batchMutate
2408           // If a mutation other than a Put or Delete (e.g. an Append) is passed to batchMutate,
2409           // mark its return code as a failure so that it will not be considered in
2410           // doMiniBatchMutation.
2411               "Put/Delete mutations only supported in batchMutate() now");
2412         }
2413         if (!walEdit.isEmpty()) {
2414           batchOp.walEditsFromCoprocessors[i] = walEdit;
2415           walEdit = new WALEdit();
2416         }
2417       }
2418     }
2419   }
2420 
2421   @SuppressWarnings("unchecked")
2422   private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp) throws IOException {
2423     boolean isInReplay = batchOp.isInReplay();
2424     // variable to note if all Put items are for the same CF -- metrics related
2425     boolean putsCfSetConsistent = true;
2426     //The set of columnFamilies first seen for Put.
2427     Set<byte[]> putsCfSet = null;
2428     // variable to note if all Delete items are for the same CF -- metrics related
2429     boolean deletesCfSetConsistent = true;
2430     //The set of columnFamilies first seen for Delete.
2431     Set<byte[]> deletesCfSet = null;
2432 
2433     long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
2434     WALEdit walEdit = new WALEdit(isInReplay);
2435     MultiVersionConsistencyControl.WriteEntry w = null;
2436     long txid = 0;
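         // Set to true once edits have been applied to the memstore; if a later step fails
         // before the MVCC transaction completes, those edits must be rolled back.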
2437     boolean doRollBackMemstore = false;
2438     boolean locked = false;
2439 
2440     /** Keep track of the locks we hold so we can release them in finally clause */
2441     List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
2442     // reference family maps directly so coprocessors can mutate them if desired
2443     Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
2444     List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
2445     // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
2446     int firstIndex = batchOp.nextIndexToProcess;
2447     int lastIndexExclusive = firstIndex;
2448     boolean success = false;
2449     int noOfPuts = 0, noOfDeletes = 0;
2450     HLogKey walKey = null;
2451     long mvccNum = 0;
2452     try {
2453       // ------------------------------------
2454       // STEP 1. Try to acquire as many locks as we can, and ensure
2455       // we acquire at least one.
2456       // ----------------------------------
2457       int numReadyToWrite = 0;
2458       long now = EnvironmentEdgeManager.currentTimeMillis();
2459       while (lastIndexExclusive < batchOp.operations.length) {
2460         Mutation mutation = batchOp.getMutation(lastIndexExclusive);
2461         boolean isPutMutation = mutation instanceof Put;
2462 
2463         Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
2464         // store the family map reference to allow for mutations
2465         familyMaps[lastIndexExclusive] = familyMap;
2466 
2467         // skip anything that "ran" already
2468         if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
2469             != OperationStatusCode.NOT_RUN) {
2470           lastIndexExclusive++;
2471           continue;
2472         }
2473 
2474         try {
2475           if (isPutMutation) {
2476             // Check the families in the put. If bad, skip this one.
2477             if (isInReplay) {
2478               removeNonExistentColumnFamilyForReplay(familyMap);
2479             } else {
2480               checkFamilies(familyMap.keySet());
2481             }
2482             checkTimestamps(mutation.getFamilyCellMap(), now);
2483           } else {
2484             prepareDelete((Delete) mutation);
2485           }
2486         } catch (NoSuchColumnFamilyException nscf) {
2487           LOG.warn("No such column family in batch mutation", nscf);
2488           batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2489               OperationStatusCode.BAD_FAMILY, nscf.getMessage());
2490           lastIndexExclusive++;
2491           continue;
2492         } catch (FailedSanityCheckException fsce) {
2493           LOG.warn("Batch Mutation did not pass sanity check", fsce);
2494           batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2495               OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
2496           lastIndexExclusive++;
2497           continue;
2498         }
2499 
2500         // If we haven't got any rows in our batch, we should block to
2501         // get the next one.
2502         boolean shouldBlock = numReadyToWrite == 0;
2503         RowLock rowLock = null;
2504         try {
2505           rowLock = getRowLock(mutation.getRow(), shouldBlock);
2506         } catch (IOException ioe) {
2507           LOG.warn("Failed getting lock in batch put, row="
2508             + Bytes.toStringBinary(mutation.getRow()), ioe);
2509         }
2510         if (rowLock == null) {
2511           // We failed to grab another lock
2512           assert !shouldBlock : "Should never fail to get lock when blocking";
2513           break; // stop acquiring more rows for this batch
2514         } else {
2515           acquiredRowLocks.add(rowLock);
2516         }
2517 
2518         lastIndexExclusive++;
2519         numReadyToWrite++;
2520 
2521         if (isPutMutation) {
2522           // If column families stay consistent throughout all of the
2523           // individual puts then metrics can be reported as a multiput across
2524           // column families in the first put.
2525           if (putsCfSet == null) {
2526             putsCfSet = mutation.getFamilyCellMap().keySet();
2527           } else {
2528             putsCfSetConsistent = putsCfSetConsistent
2529                 && mutation.getFamilyCellMap().keySet().equals(putsCfSet);
2530           }
2531         } else {
2532           if (deletesCfSet == null) {
2533             deletesCfSet = mutation.getFamilyCellMap().keySet();
2534           } else {
2535             deletesCfSetConsistent = deletesCfSetConsistent
2536                 && mutation.getFamilyCellMap().keySet().equals(deletesCfSet);
2537           }
2538         }
2539       }
2540 
2541       // we should record the timestamp only after we have acquired the rowLock,
2542       // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
2543       now = EnvironmentEdgeManager.currentTimeMillis();
2544       byte[] byteNow = Bytes.toBytes(now);
2545 
2546       // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
2547       if (numReadyToWrite <= 0) return 0L;
2548 
2549       // We've now grabbed as many mutations off the list as we can
2550 
2551       // ------------------------------------
2552       // STEP 2. Update any LATEST_TIMESTAMP timestamps
2553       // ----------------------------------
2554       for (int i = firstIndex; !isInReplay && i < lastIndexExclusive; i++) {
2555         // skip invalid
2556         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2557             != OperationStatusCode.NOT_RUN) continue;
2558 
2559         Mutation mutation = batchOp.getMutation(i);
2560         if (mutation instanceof Put) {
2561           updateKVTimestamps(familyMaps[i].values(), byteNow);
2562           noOfPuts++;
2563         } else {
2564           prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
2565           noOfDeletes++;
2566         }
2567       }
2568 
2569       lock(this.updatesLock.readLock(), numReadyToWrite);
2570       locked = true;
2571       if(isInReplay) {
2572         mvccNum = batchOp.getReplaySequenceId();
2573       } else {
2574         mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
2575       }
2576       //
2577       // ------------------------------------
2578       // Acquire the latest mvcc number
2579       // ----------------------------------
2580       w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
2581 
2582       // calling the pre CP hook for batch mutation
2583       if (!isInReplay && coprocessorHost != null) {
2584         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2585           new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2586           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2587         if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
2588       }
2589 
2590       // ------------------------------------
2591       // STEP 3. Write back to memstore
2592       // Write to memstore. It is ok to write to memstore
2593       // first without updating the HLog because we do not roll
2594       // forward the memstore MVCC. The MVCC will be moved up when
2595       // the complete operation is done. These changes are not yet
2596       // visible to scanners till we update the MVCC. The MVCC is
2597       // moved only when the sync is complete.
2598       // ----------------------------------
2599       long addedSize = 0;
2600       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2601         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2602             != OperationStatusCode.NOT_RUN) {
2603           continue;
2604         }
2605         doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
2606         addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells);
2607       }
2608 
2609       // ------------------------------------
2610       // STEP 4. Build WAL edit
2611       // ----------------------------------
2612       Durability durability = Durability.USE_DEFAULT;
2613       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2614         // Skip puts that were determined to be invalid during preprocessing
2615         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2616             != OperationStatusCode.NOT_RUN) {
2617           continue;
2618         }
2619         batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2620 
2621         Mutation m = batchOp.getMutation(i);
2622         Durability tmpDur = getEffectiveDurability(m.getDurability());
2623         if (tmpDur.ordinal() > durability.ordinal()) {
2624           durability = tmpDur;
2625         }
2626         if (tmpDur == Durability.SKIP_WAL) {
2627           recordMutationWithoutWal(m.getFamilyCellMap());
2628           continue;
2629         }
2630 
2631         long nonceGroup = batchOp.getNonceGroup(i), nonce = batchOp.getNonce(i);
2632         // In replay, the batch may contain multiple nonces. If so, write WALEdit for each.
2633         // Given how nonces are originally written, these should be contiguous.
2634         // They don't have to be; it will still work, just writing more WALEdits than needed.
2635         if (nonceGroup != currentNonceGroup || nonce != currentNonce) {
2636           if (walEdit.size() > 0) {
2637             assert isInReplay;
2638             if (!isInReplay) {
2639               throw new IOException("Multiple nonces per batch and not in replay");
2640             }
2641             // txid should always increase, so having the one from the last call is ok.
2642             walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
2643               this.htableDescriptor.getTableName(), now, m.getClusterIds(),
2644               currentNonceGroup, currentNonce);
2645             txid = this.log.appendNoSync(this.htableDescriptor,  this.getRegionInfo(),  walKey,
2646               walEdit, getSequenceId(), true, null);
2647             walEdit = new WALEdit(isInReplay);
2648             walKey = null;
2649           }
2650           currentNonceGroup = nonceGroup;
2651           currentNonce = nonce;
2652         }
2653 
2654         // Add WAL edits by CP
2655         WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
2656         if (fromCP != null) {
2657           for (KeyValue kv : fromCP.getKeyValues()) {
2658             walEdit.add(kv);
2659           }
2660         }
2661         addFamilyMapToWALEdit(familyMaps[i], walEdit);
2662       }
2663 
2664       // -------------------------
2665       // STEP 5. Append the final edit to WAL. Do not sync wal.
2666       // -------------------------
2667       Mutation mutation = batchOp.getMutation(firstIndex);
2668       if (walEdit.size() > 0) {
2669         walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
2670             this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now,
2671             mutation.getClusterIds(), currentNonceGroup, currentNonce);
2672         if(isInReplay) {
2673           walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId());
2674         }
2675         txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
2676           getSequenceId(), true, memstoreCells);
2677       }
2678       if(walKey == null){
2679         // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
2680         walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
2681       }
2682 
2683       // -------------------------------
2684       // STEP 6. Release row locks, etc.
2685       // -------------------------------
2686       if (locked) {
2687         this.updatesLock.readLock().unlock();
2688         locked = false;
2689       }
2690       releaseRowLocks(acquiredRowLocks);
2691 
2692       // -------------------------
2693       // STEP 7. Sync wal.
2694       // -------------------------
2695       if (txid != 0) {
2696         syncOrDefer(txid, durability);
2697       }
2698 
2699       doRollBackMemstore = false;
2700       // calling the post CP hook for batch mutation
2701       if (!isInReplay && coprocessorHost != null) {
2702         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2703           new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2704           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2705         coprocessorHost.postBatchMutate(miniBatchOp);
2706       }
2707 
2708       // ------------------------------------------------------------------
2709       // STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
2710       // ------------------------------------------------------------------
2711       if (w != null) {
2712         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
2713         w = null;
2714       }
2715 
2716       // ------------------------------------
2717       // STEP 9. Run coprocessor post hooks. This should be done after the wal is
2718       // synced so that the coprocessor contract is adhered to.
2719       // ------------------------------------
2720       if (!isInReplay && coprocessorHost != null) {
2721         for (int i = firstIndex; i < lastIndexExclusive; i++) {
2722           // only for successful puts
2723           if (batchOp.retCodeDetails[i].getOperationStatusCode()
2724               != OperationStatusCode.SUCCESS) {
2725             continue;
2726           }
2727           Mutation m = batchOp.getMutation(i);
2728           if (m instanceof Put) {
2729             coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
2730           } else {
2731             coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
2732           }
2733         }
2734       }
2735 
2736       success = true;
2737       return addedSize;
2738     } finally {
2739 
2740       // if the wal sync was unsuccessful, remove keys from memstore
2741       if (doRollBackMemstore) {
2742         rollbackMemstore(memstoreCells);
2743       }
2744       if (w != null) {
2745         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
2746       }
2747 
2748       if (locked) {
2749         this.updatesLock.readLock().unlock();
2750       }
2751       releaseRowLocks(acquiredRowLocks);
2752 
2753       // See if the column families were consistent through the whole thing.
2754       // If they were, then keep them. If they were not, then pass a null;
2755       // null will be treated as unknown.
2756       // The total time taken may cover both Puts and Deletes, so split the
2757       // time between puts and deletes based on the total number of Puts and Deletes.
2758 
2759       if (noOfPuts > 0) {
2760         // There were some Puts in the batch.
2761         if (this.metricsRegion != null) {
2762           this.metricsRegion.updatePut();
2763         }
2764       }
2765       if (noOfDeletes > 0) {
2766         // There were some Deletes in the batch.
2767         if (this.metricsRegion != null) {
2768           this.metricsRegion.updateDelete();
2769         }
2770       }
2771       if (!success) {
2772         for (int i = firstIndex; i < lastIndexExclusive; i++) {
2773           if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
2774             batchOp.retCodeDetails[i] = OperationStatus.FAILURE;
2775           }
2776         }
2777       }
2778       if (coprocessorHost != null && !batchOp.isInReplay()) {
2779         // call the coprocessor hook to do any finalization steps
2780         // after the put is done
2781         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2782             new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2783                 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex,
2784                 lastIndexExclusive);
2785         coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success);
2786       }
2787 
2788       batchOp.nextIndexToProcess = lastIndexExclusive;
2789     }
2790   }
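  // The method above follows the MVCC write pattern used throughout this class. A minimal
  // sketch of that pattern (applyToMemstore is a hypothetical placeholder standing in for
  // applyFamilyMapToMemstore; the remaining calls are the ones used above):
  //
  //   WriteEntry w = mvcc.beginMemstoreInsertWithSeqNum(seqNum);   // open the transaction
  //   try {
  //     applyToMemstore(edits, seqNum);                            // write, still invisible to readers
  //     long txid = log.appendNoSync(htd, regionInfo, walKey,
  //         walEdit, getSequenceId(), true, memstoreCells);        // stage the WAL entry
  //     syncOrDefer(txid, durability);                             // make it durable
  //   } finally {
  //     mvcc.completeMemstoreInsertWithSeqNum(w, walKey);          // advance MVCC: edits become visible
  //   }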
2791 
2792   /**
2793    * Returns effective durability from the passed durability and
2794    * the table descriptor.
2795    */
2796   protected Durability getEffectiveDurability(Durability d) {
2797     return d == Durability.USE_DEFAULT ? this.durability : d;
2798   }
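  // A small illustration of how the resolution above behaves, assuming the table default
  // durability (this.durability) is SYNC_WAL; the Put calls are the stock client API:
  //
  //   byte[] row = Bytes.toBytes("r1");
  //   Put p1 = new Put(row);                          // durability left as USE_DEFAULT
  //   Put p2 = new Put(row);
  //   p2.setDurability(Durability.SKIP_WAL);          // explicit per-mutation setting
  //
  //   getEffectiveDurability(p1.getDurability());     // -> SYNC_WAL (falls back to table default)
  //   getEffectiveDurability(p2.getDurability());     // -> SKIP_WAL (explicit setting wins)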
2799 
2800   // TODO: gets/puts and deletes should be refactored a bit so that
2801   // acquiring the row lock happens up front and the lock is simply passed into
2802   // the methods. In the case of checkAndMutate you could then just do lockRow,
2803   // get, put, unlockRow or something similar.
2804   /**
2805    * Atomically checks the current value of the given cell and, if it matches, applies the mutation.
2806    * @throws IOException
2807    * @return true if the mutation was applied, false otherwise
2808    */
2809   public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
2810       CompareOp compareOp, ByteArrayComparable comparator, Mutation w,
2811       boolean writeToWAL)
2812   throws IOException{
2813     checkReadOnly();
2814     //TODO, add check for value length or maybe even better move this to the
2815     //client if this becomes a global setting
2816     checkResources();
2817     boolean isPut = w instanceof Put;
2818     if (!isPut && !(w instanceof Delete))
2819       throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must " +
2820           "be Put or Delete");
2821     if (!Bytes.equals(row, w.getRow())) {
2822       throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " +
2823           "getRow must match the passed row");
2824     }
2825 
2826     startRegionOperation();
2827     try {
2828       Get get = new Get(row);
2829       checkFamily(family);
2830       get.addColumn(family, qualifier);
2831 
2832       // Lock row - note that doBatchMutate will relock this row if called
2833       RowLock rowLock = getRowLock(get.getRow());
2834       // wait for all previous transactions to complete (with lock held)
2835       mvcc.waitForPreviousTransactionsComplete();
2836       try {
2837         if (this.getCoprocessorHost() != null) {
2838           Boolean processed = null;
2839           if (w instanceof Put) {
2840             processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family,
2841                 qualifier, compareOp, comparator, (Put) w);
2842           } else if (w instanceof Delete) {
2843             processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family,
2844                 qualifier, compareOp, comparator, (Delete) w);
2845           }
2846           if (processed != null) {
2847             return processed;
2848           }
2849         }
2850         List<Cell> result = get(get, false);
2851 
2852         boolean valueIsNull = comparator.getValue() == null ||
2853           comparator.getValue().length == 0;
2854         boolean matches = false;
2855         if (result.size() == 0 && valueIsNull) {
2856           matches = true;
2857         } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
2858             valueIsNull) {
2859           matches = true;
2860         } else if (result.size() == 1 && !valueIsNull) {
2861           Cell kv = result.get(0);
2862           int compareResult = comparator.compareTo(kv.getValueArray(),
2863               kv.getValueOffset(), kv.getValueLength());
2864           switch (compareOp) {
2865           case LESS:
2866             matches = compareResult < 0;
2867             break;
2868           case LESS_OR_EQUAL:
2869             matches = compareResult <= 0;
2870             break;
2871           case EQUAL:
2872             matches = compareResult == 0;
2873             break;
2874           case NOT_EQUAL:
2875             matches = compareResult != 0;
2876             break;
2877           case GREATER_OR_EQUAL:
2878             matches = compareResult >= 0;
2879             break;
2880           case GREATER:
2881             matches = compareResult > 0;
2882             break;
2883           default:
2884             throw new RuntimeException("Unknown Compare op " + compareOp.name());
2885           }
2886         }
2887         // If it matches, apply the new Put or execute the new Delete
2888         if (matches) {
2889           // All edits for the given row (across all column families) must
2890           // happen atomically.
2891           doBatchMutate(w);
2892           this.checkAndMutateChecksPassed.increment();
2893           return true;
2894         }
2895         this.checkAndMutateChecksFailed.increment();
2896         return false;
2897       } finally {
2898         rowLock.release();
2899       }
2900     } finally {
2901       closeRegionOperation();
2902     }
2903   }
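  // A hedged usage sketch for the method above, assuming a handle named 'region' to this
  // HRegion and the standard comparator/filter classes; it applies the Put only when the
  // current value of f:q equals "expected":
  //
  //   byte[] row = Bytes.toBytes("r1");
  //   byte[] fam = Bytes.toBytes("f");
  //   byte[] qual = Bytes.toBytes("q");
  //   Put put = new Put(row);
  //   put.add(fam, qual, Bytes.toBytes("new-value"));
  //   boolean applied = region.checkAndMutate(row, fam, qual, CompareOp.EQUAL,
  //       new BinaryComparator(Bytes.toBytes("expected")), put, true);
  //
  // Per the matching logic above, a missing cell or empty value matches only when the
  // comparator's value is itself null or empty.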
2904 
2905   private void doBatchMutate(Mutation mutation) throws IOException, DoNotRetryIOException {
2906     // Currently this is only called for puts and deletes, so no nonces.
2907     OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation },
2908         HConstants.NO_NONCE, HConstants.NO_NONCE);
2909     if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
2910       throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
2911     } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
2912       throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
2913     }
2914   }
2915 
2916   /**
2917    * Complete taking the snapshot on the region. Writes the region info and adds references to the
2918    * working snapshot directory.
2919    *
2920    * TODO for api consistency, consider adding another version with no {@link ForeignExceptionSnare}
2921    * arg.  (In the future other cancellable HRegion methods could eventually add a
2922    * {@link ForeignExceptionSnare}, or we could do something fancier).
2923    *
2924    * @param desc snapshot description object
2925    * @param exnSnare ForeignExceptionSnare that captures external exceptions in case we need to
2926    *   bail out.  This is allowed to be null and will just be ignored in that case.
2927    * @throws IOException if there is an external or internal error causing the snapshot to fail
2928    */
2929   public void addRegionToSnapshot(SnapshotDescription desc,
2930       ForeignExceptionSnare exnSnare) throws IOException {
2931     Path rootDir = FSUtils.getRootDir(conf);
2932     Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);
2933 
2934     SnapshotManifest manifest = SnapshotManifest.create(conf, getFilesystem(),
2935                                                         snapshotDir, desc, exnSnare);
2936     manifest.addRegion(this);
2937   }
2938 
2939   /**
2940    * Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP} with the
2941    * provided current timestamp.
2942    */
2943   void updateKVTimestamps(final Iterable<List<Cell>> keyLists, final byte[] now) {
2944     for (List<Cell> cells: keyLists) {
2945       if (cells == null) continue;
2946       for (Cell cell : cells) {
2947         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
2948         kv.updateLatestStamp(now);
2949       }
2950     }
2951   }
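  // Illustration of the behaviour described above (a sketch, not tied to a specific table):
  //
  //   Put p = new Put(row);
  //   p.add(fam, qual, value);      // no timestamp supplied -> HConstants.LATEST_TIMESTAMP
  //   // During the write path, updateKVTimestamps(...) rewrites LATEST_TIMESTAMP to the
  //   // server-side "now", so the cell is stored with the region server's clock value.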
2952 
2953   /*
2954    * Check whether we have the resources to support an update.
2955    *
2956    * We throw RegionTooBusyException if we are above the memstore limit
2957    * and expect the client to retry using some kind of backoff.
2958    */
2959   private void checkResources()
2960     throws RegionTooBusyException {
2961     // If catalog region, do not impose resource constraints or block updates.
2962     if (this.getRegionInfo().isMetaRegion()) return;
2963 
2964     if (this.memstoreSize.get() > this.blockingMemStoreSize) {
2965       requestFlush();
2966       throw new RegionTooBusyException("Above memstore limit, " +
2967           "regionName=" + (this.getRegionInfo() == null ? "unknown" :
2968           this.getRegionInfo().getRegionNameAsString()) +
2969           ", server=" + (this.getRegionServerServices() == null ? "unknown" :
2970           this.getRegionServerServices().getServerName()) +
2971           ", memstoreSize=" + memstoreSize.get() +
2972           ", blockingMemStoreSize=" + blockingMemStoreSize);
2973     }
2974   }
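  // Minimal sketch of the backoff the comment above expects from callers; the stock HBase
  // client already performs this kind of retry internally, so this is purely illustrative
  // (the 'region' handle, the pending Put and the retry limits are assumptions, and
  // InterruptedException handling is elided):
  //
  //   Put put = ...;                                  // some pending mutation
  //   long pauseMs = 100;
  //   for (int attempt = 0; attempt < 10; attempt++) {
  //     try {
  //       region.put(put);
  //       break;                                      // success
  //     } catch (RegionTooBusyException busy) {
  //       Thread.sleep(pauseMs);                      // back off, then retry
  //       pauseMs = Math.min(pauseMs * 2, 10000);
  //     }
  //   }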
2975 
2976   /**
2977    * @throws IOException Throws exception if region is in read-only mode.
2978    */
2979   protected void checkReadOnly() throws IOException {
2980     if (this.writestate.isReadOnly()) {
2981       throw new IOException("region is read only");
2982     }
2983   }
2984 
2985   protected void checkReadsEnabled() throws IOException {
2986     if (!this.writestate.readsEnabled) {
2987       throw new IOException ("The region's reads are disabled. Cannot serve the request");
2988     }
2989   }
2990 
2991   public void setReadsEnabled(boolean readsEnabled) {
2992     this.writestate.setReadsEnabled(readsEnabled);
2993   }
2994 
2995   /**
2996    * Add updates first to the hlog and then add values to memstore.
2997    * Warning: the assumption is that the caller holds a lock on the passed-in row.
2998    * @param edits Cell updates by column
2999    * @throws IOException
3000    */
3001   private void put(final byte [] row, byte [] family, List<Cell> edits)
3002   throws IOException {
3003     NavigableMap<byte[], List<Cell>> familyMap;
3004     familyMap = new TreeMap<byte[], List<Cell>>(Bytes.BYTES_COMPARATOR);
3005 
3006     familyMap.put(family, edits);
3007     Put p = new Put(row);
3008     p.setFamilyCellMap(familyMap);
3009     doBatchMutate(p);
3010   }
3011 
3012   /**
3013    * Atomically apply the given map of family->edits to the memstore.
3014    * This handles the consistency control on its own, but the caller
3015    * should already have locked updatesLock.readLock(). This also does
3016    * <b>not</b> check the families for validity.
3017    *
3018    * @param familyMap Map of kvs per family
3019    * @param mvccNum the MVCC write number (sequence id) to stamp on the
3020    *        newly added KVs
3021    * @param memstoreCells list that collects the KVs actually added to the memstore
3022    * @return the additional memory usage of the memstore caused by the
3023    * new entries.
3024    */
3025   private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
3026     long mvccNum, List<KeyValue> memstoreCells) {
3027     long size = 0;
3028 
3029     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
3030       byte[] family = e.getKey();
3031       List<Cell> cells = e.getValue();
3032 
3033       Store store = getStore(family);
3034       for (Cell cell: cells) {
3035         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
3036         kv.setSequenceId(mvccNum);
3037         Pair<Long, Cell> ret = store.add(kv);
3038         size += ret.getFirst();
3039         memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
3040       }
3041     }
3042 
3043     return size;
3044   }
3045 
3046   /**
3047    * Remove all the keys listed in the map from the memstore. This method is
3048    * called when a Put/Delete has updated memstore but subsequently fails to update
3049    * the wal. This method is then invoked to rollback the memstore.
3050    */
3051   private void rollbackMemstore(List<KeyValue> memstoreCells) {
3052     int kvsRolledback = 0;
3053 
3054     for (KeyValue kv : memstoreCells) {
3055       byte[] family = kv.getFamily();
3056       Store store = getStore(family);
3057       store.rollback(kv);
3058       kvsRolledback++;
3059     }
3060     LOG.debug("rollbackMemstore rolled back " + kvsRolledback);
3061   }
3062 
3063   /**
3064    * Check the collection of families for validity.
3065    * @throws NoSuchColumnFamilyException if a family does not exist.
3066    */
3067   void checkFamilies(Collection<byte[]> families)
3068   throws NoSuchColumnFamilyException {
3069     for (byte[] family : families) {
3070       checkFamily(family);
3071     }
3072   }
3073 
3074   /**
3075    * During replay, there may exist column families which were removed between the region
3076    * server failure and the replay.
3077    */
3078   private void removeNonExistentColumnFamilyForReplay(
3079       final Map<byte[], List<Cell>> familyMap) {
3080     List<byte[]> nonExistentList = null;
3081     for (byte[] family : familyMap.keySet()) {
3082       if (!this.htableDescriptor.hasFamily(family)) {
3083         if (nonExistentList == null) {
3084           nonExistentList = new ArrayList<byte[]>();
3085         }
3086         nonExistentList.add(family);
3087       }
3088     }
3089     if (nonExistentList != null) {
3090       for (byte[] family : nonExistentList) {
3091         // Perhaps schema was changed between crash and replay
3092         LOG.info("No family for " + Bytes.toString(family) + ", omitting it from replay.");
3093         familyMap.remove(family);
3094       }
3095     }
3096   }
3097 
3098   void checkTimestamps(final Map<byte[], List<Cell>> familyMap,
3099       long now) throws FailedSanityCheckException {
3100     if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
3101       return;
3102     }
3103     long maxTs = now + timestampSlop;
3104     for (List<Cell> kvs : familyMap.values()) {
3105       for (Cell cell : kvs) {
3106         // see if the user-side TS is out of range. latest = server-side
3107         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
3108         if (!kv.isLatestTimestamp() && kv.getTimestamp() > maxTs) {
3109           throw new FailedSanityCheckException("Timestamp for KV out of range "
3110               + cell + " (too.new=" + timestampSlop + ")");
3111         }
3112       }
3113     }
3114   }
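  // Worked example of the slop check above, assuming timestampSlop = 2000 ms and now = 1000000:
  //
  //   maxTs = 1000000 + 2000 = 1002000
  //   cell timestamp 1001500          -> accepted (not greater than maxTs)
  //   cell timestamp 1003000          -> FailedSanityCheckException (too new)
  //   cell timestamp LATEST_TIMESTAMP -> always accepted; it is rewritten to server time elsewhere.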
3115 
3116   /**
3117    * Append the given map of family->edits to a WALEdit data structure.
3118    * This does not write to the HLog itself.
3119    * @param familyMap map of family->edits
3120    * @param walEdit the destination entry to append into
3121    */
3122   private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
3123       WALEdit walEdit) {
3124     for (List<Cell> edits : familyMap.values()) {
3125       for (Cell cell : edits) {
3126         walEdit.add(KeyValueUtil.ensureKeyValue(cell));
3127       }
3128     }
3129   }
3130 
3131   private void requestFlush() {
3132     if (this.rsServices == null) {
3133       return;
3134     }
3135     synchronized (writestate) {
3136       if (this.writestate.isFlushRequested()) {
3137         return;
3138       }
3139       writestate.flushRequested = true;
3140     }
3141     // Make request outside of synchronize block; HBASE-818.
3142     this.rsServices.getFlushRequester().requestFlush(this);
3143     if (LOG.isDebugEnabled()) {
3144       LOG.debug("Flush requested on " + this);
3145     }
3146   }
3147 
3148   /*
3149    * @param size
3150    * @return True if size is over the flush threshold
3151    */
3152   private boolean isFlushSize(final long size) {
3153     return size > this.memstoreFlushSize;
3154   }
3155 
3156   /**
3157    * Read the edit logs placed under this region by the WAL log splitting process.  Put
3158    * the recovered edits back up into this region.
3159    *
3160    * <p>We can ignore any log message that has a sequence ID that's equal to or
3161    * lower than minSeqId.  (Because we know such log messages are already
3162    * reflected in the HFiles.)
3163    *
3164    * <p>While this is running we are putting pressure on memory yet we are
3165    * outside of our usual accounting because we are not yet an onlined region
3166    * (this stuff is being run as part of Region initialization).  This means
3167    * that if we're up against global memory limits, we'll not be flagged to flush
3168    * because we are not online. We can't be flushed by usual mechanisms anyways;
3169    * we're not yet online so our relative sequenceids are not yet aligned with
3170    * HLog sequenceids -- not till we come up online, post processing of split
3171    * edits.
3172    *
3173    * <p>But to help relieve memory pressure, we at least manage our own heap size by
3174    * flushing if we are in excess of the per-region limit.  When flushing, though, we have
3175    * to be careful to avoid using the regionserver/hlog sequenceid.  That sequenceid advances
3176    * on a different track from what is going on here in this region context, so if we
3177    * crashed while replaying these edits but in the midst had a flush that used the
3178    * regionserver log with a sequenceid in excess of what is going on here
3179    * in this region and its split editlogs, then we could miss edits the
3180    * next time we go to recover. So, we have to flush inline, using seqids that
3181    * make sense in this single region context only -- until we come online.
3182    *
3183    * @param maxSeqIdInStores Any edit found in split editlogs needs to be in excess of
3184    * the maxSeqId for the store to be applied, else it is skipped.
3185    * @return the sequence id of the last edit added to this region out of the
3186    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
3187    * @throws UnsupportedEncodingException
3188    * @throws IOException
3189    */
3190   protected long replayRecoveredEditsIfAny(final Path regiondir,
3191       Map<byte[], Long> maxSeqIdInStores,
3192       final CancelableProgressable reporter, final MonitoredTask status)
3193       throws UnsupportedEncodingException, IOException {
3194     long minSeqIdForTheRegion = -1;
3195     for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
3196       if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
3197         minSeqIdForTheRegion = maxSeqIdInStore;
3198       }
3199     }
3200     long seqid = minSeqIdForTheRegion;
3201 
3202     FileSystem fs = this.fs.getFileSystem();
3203     NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regiondir);
3204     if (LOG.isDebugEnabled()) {
3205       LOG.debug("Found " + (files == null ? 0 : files.size())
3206         + " recovered edits file(s) under " + regiondir);
3207     }
3208 
3209     if (files == null || files.isEmpty()) return seqid;
3210 
3211     for (Path edits: files) {
3212       if (edits == null || !fs.exists(edits)) {
3213         LOG.warn("Null or non-existent edits file: " + edits);
3214         continue;
3215       }
3216       if (isZeroLengthThenDelete(fs, edits)) continue;
3217 
3218       long maxSeqId;
3219       String fileName = edits.getName();
3220       maxSeqId = Math.abs(Long.parseLong(fileName));
3221       if (maxSeqId <= minSeqIdForTheRegion) {
3222         if (LOG.isDebugEnabled()) {
3223           String msg = "Maximum sequenceid for this log is " + maxSeqId
3224             + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
3225             + ", skipped the whole file, path=" + edits;
3226           LOG.debug(msg);
3227         }
3228         continue;
3229       }
3230 
3231       try {
3232         // Replay the edits. Replay can return -1 if everything is skipped; only update seqid if it is greater.
3233         seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
3234       } catch (IOException e) {
3235         boolean skipErrors = conf.getBoolean(
3236             HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
3237             conf.getBoolean(
3238                 "hbase.skip.errors",
3239                 HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS));
3240         if (conf.get("hbase.skip.errors") != null) {
3241           LOG.warn(
3242               "The property 'hbase.skip.errors' has been deprecated. Please use " +
3243               HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
3244         }
3245         if (skipErrors) {
3246           Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
3247           LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
3248               + "=true so continuing. Renamed " + edits +
3249               " as " + p, e);
3250         } else {
3251           throw e;
3252         }
3253       }
3254     }
3255     // The edits size added into rsAccounting during this replaying will not
3256     // be required any more. So just clear it.
3257     if (this.rsAccounting != null) {
3258       this.rsAccounting.clearRegionReplayEditsSize(this.getRegionName());
3259     }
3260     if (seqid > minSeqIdForTheRegion) {
3261       // Then we added some edits to memory. Flush and cleanup split edit files.
3262       internalFlushcache(null, seqid, status);
3263     }
3264     // Now delete the content of recovered edits.  We're done w/ them.
3265     for (Path file: files) {
3266       if (!fs.delete(file, false)) {
3267         LOG.error("Failed delete of " + file);
3268       } else {
3269         LOG.debug("Deleted recovered.edits file=" + file);
3270       }
3271     }
3272     return seqid;
3273   }
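  // The per-file skip above relies on the naming of recovered-edits files: the code treats a
  // file's name as the maximum sequence id that the file contains. A sketch with a
  // hypothetical file name:
  //
  //   // file: <regiondir>/recovered.edits/0000000000000001234
  //   long maxSeqId = Math.abs(Long.parseLong("0000000000000001234"));   // -> 1234
  //   // the whole file can be skipped when maxSeqId <= minSeqIdForTheRegion,
  //   // i.e. every edit in it is already reflected in the stores' HFiles.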
3274 
3275   /*
3276    * @param edits File of recovered edits.
3277    * @param maxSeqIdInStores Maximum sequenceid found in each store.  Edits in log
3278    * must be larger than this to be replayed for each store.
3279    * @param reporter
3280    * @return the sequence id of the last edit added to this region out of the
3281    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
3282    * @throws IOException
3283    */
3284   private long replayRecoveredEdits(final Path edits,
3285       Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
3286     throws IOException {
3287     String msg = "Replaying edits from " + edits;
3288     LOG.info(msg);
3289     MonitoredTask status = TaskMonitor.get().createStatus(msg);
3290     FileSystem fs = this.fs.getFileSystem();
3291 
3292     status.setStatus("Opening logs");
3293     HLog.Reader reader = null;
3294     try {
3295       reader = HLogFactory.createReader(fs, edits, conf);
3296       long currentEditSeqId = -1;
3297       long currentReplaySeqId = -1;
3298       long firstSeqIdInLog = -1;
3299       long skippedEdits = 0;
3300       long editsCount = 0;
3301       long intervalEdits = 0;
3302       HLog.Entry entry;
3303       Store store = null;
3304       boolean reported_once = false;
3305       ServerNonceManager ng = this.rsServices == null ? null : this.rsServices.getNonceManager();
3306 
3307       try {
3308         // How many edits seen before we check elapsed time
3309         int interval = this.conf.getInt("hbase.hstore.report.interval.edits",
3310             2000);
3311         // How often to send a progress report (default 1/2 master timeout)
3312         int period = this.conf.getInt("hbase.hstore.report.period", 300000);
3313         long lastReport = EnvironmentEdgeManager.currentTimeMillis();
3314 
3315         while ((entry = reader.next()) != null) {
3316           HLogKey key = entry.getKey();
3317           WALEdit val = entry.getEdit();
3318 
3319           if (ng != null) { // ng is null in some tests, or when nonces are disabled
3320             ng.reportOperationFromWal(key.getNonceGroup(), key.getNonce(), key.getWriteTime());
3321           }
3322 
3323           if (reporter != null) {
3324             intervalEdits += val.size();
3325             if (intervalEdits >= interval) {
3326               // Number of edits interval reached
3327               intervalEdits = 0;
3328               long cur = EnvironmentEdgeManager.currentTimeMillis();
3329               if (lastReport + period <= cur) {
3330                 status.setStatus("Replaying edits..." +
3331                     " skipped=" + skippedEdits +
3332                     " edits=" + editsCount);
3333                 // Timeout reached
3334                 if(!reporter.progress()) {
3335                   msg = "Progressable reporter failed, stopping replay";
3336                   LOG.warn(msg);
3337                   status.abort(msg);
3338                   throw new IOException(msg);
3339                 }
3340                 reported_once = true;
3341                 lastReport = cur;
3342               }
3343             }
3344           }
3345 
3346           // Start coprocessor replay here. The coprocessor is for each WALEdit
3347           // instead of a KeyValue.
3348           if (coprocessorHost != null) {
3349             status.setStatus("Running pre-WAL-restore hook in coprocessors");
3350             if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
3351               // if bypass this log entry, ignore it ...
3352               continue;
3353             }
3354           }
3355 
3356           if (firstSeqIdInLog == -1) {
3357             firstSeqIdInLog = key.getLogSeqNum();
3358           }
3359           currentEditSeqId = key.getLogSeqNum();
3360           currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ?
3361             key.getOrigLogSeqNum() : currentEditSeqId;
3362           boolean flush = false;
3363           for (KeyValue kv: val.getKeyValues()) {
3364             // Check this edit is for me. Also, guard against writing the special
3365             // METACOLUMN info such as HBASE::CACHEFLUSH entries
3366             if (CellUtil.matchingFamily(kv, WALEdit.METAFAMILY) ||
3367                 !Bytes.equals(key.getEncodedRegionName(),
3368                   this.getRegionInfo().getEncodedNameAsBytes())) {
3369               //this is a special edit, we should handle it
3370               CompactionDescriptor compaction = WALEdit.getCompaction(kv);
3371               if (compaction != null) {
3372                 //replay the compaction
3373                 completeCompactionMarker(compaction);
3374               }
3375 
3376               skippedEdits++;
3377               continue;
3378             }
3379             // Figure which store the edit is meant for.
3380             if (store == null || !CellUtil.matchingFamily(kv, store.getFamily().getName())) {
3381               store = getStore(kv);
3382             }
3383             if (store == null) {
3384               // This should never happen.  Perhaps schema was changed between
3385               // crash and redeploy?
3386               LOG.warn("No family for " + kv);
3387               skippedEdits++;
3388               continue;
3389             }
3390             // Now, figure if we should skip this edit.
3391             if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily()
3392                 .getName())) {
3393               skippedEdits++;
3394               continue;
3395             }
3396             kv.setSequenceId(currentReplaySeqId);
3397             // Once we are over the limit, restoreEdit will keep returning true to
3398             // flush -- but don't flush until we've played all the kvs that make up
3399             // the WALEdit.
3400             flush = restoreEdit(store, kv);
3401             editsCount++;
3402           }
3403           if (flush) internalFlushcache(null, currentEditSeqId, status);
3404 
3405           if (coprocessorHost != null) {
3406             coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
3407           }
3408         }
3409       } catch (EOFException eof) {
3410         Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
3411         msg = "Encountered EOF. Most likely due to Master failure during " +
3412             "log splitting, so we have this data in another edit.  " +
3413             "Continuing, but renaming " + edits + " as " + p;
3414         LOG.warn(msg, eof);
3415         status.abort(msg);
3416       } catch (IOException ioe) {
3417         // If the IOE resulted from bad file format,
3418         // then this problem is idempotent and retrying won't help
3419         if (ioe.getCause() instanceof ParseException) {
3420           Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
3421           msg = "File corruption encountered!  " +
3422               "Continuing, but renaming " + edits + " as " + p;
3423           LOG.warn(msg, ioe);
3424           status.setStatus(msg);
3425         } else {
3426           status.abort(StringUtils.stringifyException(ioe));
3427           // other IO errors may be transient (bad network connection,
3428           // checksum exception on one datanode, etc).  throw & retry
3429           throw ioe;
3430         }
3431       }
3432       if (reporter != null && !reported_once) {
3433         reporter.progress();
3434       }
3435       msg = "Applied " + editsCount + ", skipped " + skippedEdits +
3436         ", firstSequenceIdInLog=" + firstSeqIdInLog +
3437         ", maxSequenceIdInLog=" + currentEditSeqId + ", path=" + edits;
3438       status.markComplete(msg);
3439       LOG.debug(msg);
3440       return currentEditSeqId;
3441     } finally {
3442       status.cleanup();
3443       if (reader != null) {
3444          reader.close();
3445       }
3446     }
3447   }
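  // The progress-reporting knobs read above can be tuned through the normal Configuration
  // mechanism; a sketch (the values are arbitrary examples):
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt("hbase.hstore.report.interval.edits", 1000); // check elapsed time every 1000 edits
  //   conf.setInt("hbase.hstore.report.period", 120000);       // minimum ms between progress reports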
3448 
3449   /**
3450    * Call to complete a compaction. It is for the case where we find in the WAL a compaction
3451    * that was not finished.  We could find one while recovering a WAL after a regionserver crash.
3452    * See HBASE-2331.
3453    */
3454   void completeCompactionMarker(CompactionDescriptor compaction)
3455       throws IOException {
3456     Store store = this.getStore(compaction.getFamilyName().toByteArray());
3457     if (store == null) {
3458       LOG.warn("Found Compaction WAL edit for deleted family:" +
3459           Bytes.toString(compaction.getFamilyName().toByteArray()));
3460       return;
3461     }
3462     store.completeCompactionMarker(compaction);
3463   }
3464 
3465   /**
3466    * Used by tests
3467    * @param s Store to add edit to.
3468    * @param kv KeyValue to add.
3469    * @return True if we should flush.
3470    */
3471   protected boolean restoreEdit(final Store s, final KeyValue kv) {
3472     long kvSize = s.add(kv).getFirst();
3473     if (this.rsAccounting != null) {
3474       rsAccounting.addAndGetRegionReplayEditsSize(this.getRegionName(), kvSize);
3475     }
3476     return isFlushSize(this.addAndGetGlobalMemstoreSize(kvSize));
3477   }
3478 
3479   /*
3480    * @param fs
3481    * @param p File to check.
3482    * @return True if file was zero-length (and if so, we'll delete it in here).
3483    * @throws IOException
3484    */
3485   private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
3486       throws IOException {
3487     FileStatus stat = fs.getFileStatus(p);
3488     if (stat.getLen() > 0) return false;
3489     LOG.warn("File " + p + " is zero-length, deleting.");
3490     fs.delete(p, false);
3491     return true;
3492   }
3493 
3494   protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
3495     return new HStore(this, family, this.conf);
3496   }
3497 
3498   /**
3499    * Return HStore instance.
3500    * Use with caution.  Exposed for use of fixup utilities.
3501    * @param column Name of column family hosted by this region.
3502    * @return Store that goes with the family on passed <code>column</code>.
3503    * TODO: Make this lookup faster.
3504    */
3505   public Store getStore(final byte[] column) {
3506     return this.stores.get(column);
3507   }
3508 
3509   /**
3510    * Return HStore instance. Does not do any copy: as the number of stores is limited, we
3511    * iterate over them.
3512    */
3513   private Store getStore(Cell cell) {
3514     for (Map.Entry<byte[], Store> famStore : stores.entrySet()) {
3515       if (Bytes.equals(
3516           cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
3517           famStore.getKey(), 0, famStore.getKey().length)) {
3518         return famStore.getValue();
3519       }
3520     }
3521 
3522     return null;
3523   }
3524 
3525   public Map<byte[], Store> getStores() {
3526     return this.stores;
3527   }
3528 
3529   /**
3530    * Return list of storeFiles for the set of CFs.
3531    * Uses closeLock to prevent the race condition where a region closes
3532    * in the middle of the loop; if the stores were being closed one by one,
3533    * some of them would return 0 files.
3534    * @return List of storeFiles.
3535    */
3536   public List<String> getStoreFileList(final byte [][] columns)
3537     throws IllegalArgumentException {
3538     List<String> storeFileNames = new ArrayList<String>();
3539     synchronized(closeLock) {
3540       for(byte[] column : columns) {
3541         Store store = this.stores.get(column);
3542         if (store == null) {
3543           throw new IllegalArgumentException("No column family : " +
3544               new String(column) + " available");
3545         }
3546         for (StoreFile storeFile: store.getStorefiles()) {
3547           storeFileNames.add(storeFile.getPath().toString());
3548         }
3549       }
3550     }
3551     return storeFileNames;
3552   }
3553   //////////////////////////////////////////////////////////////////////////////
3554   // Support code
3555   //////////////////////////////////////////////////////////////////////////////
3556 
3557   /** Make sure this is a valid row for the HRegion */
3558   void checkRow(final byte [] row, String op) throws IOException {
3559     if (!rowIsInRange(getRegionInfo(), row)) {
3560       throw new WrongRegionException("Requested row out of range for " +
3561           op + " on HRegion " + this + ", startKey='" +
3562           Bytes.toStringBinary(getStartKey()) + "', getEndKey()='" +
3563           Bytes.toStringBinary(getEndKey()) + "', row='" +
3564           Bytes.toStringBinary(row) + "'");
3565     }
3566   }
3567 
3568   /**
3569    * Tries to acquire a lock on the given row.
3570    * @param waitForLock if true, will block until the lock is available.
3571    *        Otherwise, just tries to obtain the lock and returns
3572    *        false if unavailable.
3573    * @return the row lock if acquired,
3574    *   null if waitForLock was false and the lock was not acquired
3575    * @throws IOException if waitForLock was true and the lock could not be acquired after waiting
3576    */
3577   public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
3578     checkRow(row, "row lock");
3579     startRegionOperation();
3580     try {
3581       HashedBytes rowKey = new HashedBytes(row);
3582       RowLockContext rowLockContext = new RowLockContext(rowKey);
3583 
3584       // loop until we acquire the row lock (unless !waitForLock)
3585       while (true) {
3586         RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
3587         if (existingContext == null) {
3588           // Row is not already locked by any thread, use newly created context.
3589           break;
3590         } else if (existingContext.ownedByCurrentThread()) {
3591           // Row is already locked by current thread, reuse existing context instead.
3592           rowLockContext = existingContext;
3593           break;
3594         } else {
3595           // Row is already locked by some other thread, give up or wait for it
3596           if (!waitForLock) {
3597             return null;
3598           }
3599           try {
3600             if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
3601               throw new IOException("Timed out waiting for lock for row: " + rowKey);
3602             }
3603           } catch (InterruptedException ie) {
3604             LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
3605             InterruptedIOException iie = new InterruptedIOException();
3606             iie.initCause(ie);
3607             throw iie;
3608           }
3609         }
3610       }
3611 
3612       // allocate new lock for this thread
3613       return rowLockContext.newLock();
3614     } finally {
3615       closeRegionOperation();
3616     }
3617   }
3618 
3619   /**
3620    * Acquires a lock on the given row.
3621    * The same thread may acquire multiple locks on the same row.
3622    * @return the acquired row lock
3623    * @throws IOException if the lock could not be acquired after waiting
3624    */
3625   public RowLock getRowLock(byte[] row) throws IOException {
3626     return getRowLock(row, true);
3627   }
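  // Typical usage of the row-lock API above (a sketch; the 'region' handle and the work done
  // under the lock are assumptions):
  //
  //   RowLock lock = region.getRowLock(row);          // blocks, or throws IOException on timeout
  //   try {
  //     // ... operate on the row while it cannot change underneath us ...
  //   } finally {
  //     lock.release();
  //   }
  //
  //   RowLock maybe = region.getRowLock(row, false);  // non-blocking; null if already held elsewhere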
3628 
3629   /**
3630    * If the given list of row locks is not null, releases all locks.
3631    */
3632   public void releaseRowLocks(List<RowLock> rowLocks) {
3633     if (rowLocks != null) {
3634       for (RowLock rowLock : rowLocks) {
3635         rowLock.release();
3636       }
3637       rowLocks.clear();
3638     }
3639   }
3640 
3641   /**
3642    * Determines whether multiple column families are present.
3643    * Precondition: familyPaths is not null.
3644    *
3645    * @param familyPaths List of Pair<byte[] column family, String hfilePath>
3646    */
3647   private static boolean hasMultipleColumnFamilies(
3648       List<Pair<byte[], String>> familyPaths) {
3649     boolean multipleFamilies = false;
3650     byte[] family = null;
3651     for (Pair<byte[], String> pair : familyPaths) {
3652       byte[] fam = pair.getFirst();
3653       if (family == null) {
3654         family = fam;
3655       } else if (!Bytes.equals(family, fam)) {
3656         multipleFamilies = true;
3657         break;
3658       }
3659     }
3660     return multipleFamilies;
3661   }
3662 
3663 
3664   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
3665                                 boolean assignSeqId) throws IOException {
3666     return bulkLoadHFiles(familyPaths, assignSeqId, null);
3667   }
3668 
3669   /**
3670    * Attempts to atomically load a group of hfiles.  This is critical for loading
3671    * rows with multiple column families atomically.
3672    *
3673    * @param familyPaths List of Pair<byte[] column family, String hfilePath>
3674    * @param bulkLoadListener Internal hooks enabling massaging/preparation of a
3675    * file about to be bulk loaded
3676    * @return true if successful, false if failed recoverably
3677    * @throws IOException if failed unrecoverably.
3678    */
3679   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, boolean assignSeqId,
3680       BulkLoadListener bulkLoadListener) throws IOException {
3681     Preconditions.checkNotNull(familyPaths);
3682     // we need writeLock for multi-family bulk load
3683     startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
3684     try {
3685       this.writeRequestsCount.increment();
3686 
3687       // There may have been a split between when the split keys
3688       // were gathered and when the HRegion's write lock was taken.  We need
3689       // to validate each HFile against the region before attempting to bulk load them all.
3690       List<IOException> ioes = new ArrayList<IOException>();
3691       List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();
3692       for (Pair<byte[], String> p : familyPaths) {
3693         byte[] familyName = p.getFirst();
3694         String path = p.getSecond();
3695 
3696         Store store = getStore(familyName);
3697         if (store == null) {
3698           IOException ioe = new org.apache.hadoop.hbase.DoNotRetryIOException(
3699               "No such column family " + Bytes.toStringBinary(familyName));
3700           ioes.add(ioe);
3701         } else {
3702           try {
3703             store.assertBulkLoadHFileOk(new Path(path));
3704           } catch (WrongRegionException wre) {
3705             // recoverable (file doesn't fit in region)
3706             failures.add(p);
3707           } catch (IOException ioe) {
3708             // unrecoverable (hdfs problem)
3709             ioes.add(ioe);
3710           }
3711         }
3712       }
3713 
3714       // validation failed because of some sort of IO problem.
3715       if (ioes.size() != 0) {
3716         IOException e = MultipleIOException.createIOException(ioes);
3717         LOG.error("There were one or more IO errors when checking if the bulk load is ok.", e);
3718         throw e;
3719       }
3720 
3721       // validation failed, bail out before doing anything permanent.
3722       if (failures.size() != 0) {
3723         StringBuilder list = new StringBuilder();
3724         for (Pair<byte[], String> p : failures) {
3725           list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")
3726             .append(p.getSecond());
3727         }
3728         // problem when validating
3729         LOG.warn("There was a recoverable bulk load failure likely due to a" +
3730             " split.  These (family, HFile) pairs were not loaded: " + list);
3731         return false;
3732       }
3733 
3734       long seqId = -1;
3735       // We need to assign a sequential ID that's in between two memstores in order to preserve
3736       // the guarantee that all the edits lower than the highest sequential ID from all the
3737       // HFiles are flushed on disk. See HBASE-10958.  The sequence id returned when we flush is
3738       // guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is
3739       // a sequence id that we can be sure is beyond the last hfile written).
3740       if (assignSeqId) {
3741         FlushResult fs = this.flushcache();
3742         if (fs.isFlushSucceeded()) {
3743           seqId = fs.flushSequenceId;
3744         } else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
3745           seqId = fs.flushSequenceId;
3746         } else {
3747           throw new IOException("Could not bulk load with an assigned sequential ID because the " +
3748               "flush didn't run. Reason for not flushing: " + fs.failureReason);
3749         }
3750       }
3751 
3752       for (Pair<byte[], String> p : familyPaths) {
3753         byte[] familyName = p.getFirst();
3754         String path = p.getSecond();
3755         Store store = getStore(familyName);
3756         try {
3757           String finalPath = path;
3758           if(bulkLoadListener != null) {
3759             finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
3760           }
3761           store.bulkLoadHFile(finalPath, seqId);
3762           if(bulkLoadListener != null) {
3763             bulkLoadListener.doneBulkLoad(familyName, path);
3764           }
3765         } catch (IOException ioe) {
3766           // A failure here can cause an atomicity violation that we currently
3767           // cannot recover from since it is likely a failed HDFS operation.
3768 
3769           // TODO Need a better story for reverting partial failures due to HDFS.
3770           LOG.error("There was a partial failure due to IO when attempting to" +
3771               " load " + Bytes.toString(p.getFirst()) + " : "+ p.getSecond(), ioe);
3772           if(bulkLoadListener != null) {
3773             try {
3774               bulkLoadListener.failedBulkLoad(familyName, path);
3775             } catch (Exception ex) {
3776               LOG.error("Error while calling failedBulkLoad for family "+
3777                   Bytes.toString(familyName)+" with path "+path, ex);
3778             }
3779           }
3780           throw ioe;
3781         }
3782       }
3783       return true;
3784     } finally {
3785       closeBulkRegionOperation();
3786     }
3787   }
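  // A hedged sketch of driving the bulk load above; the HFile paths are hypothetical and would
  // normally come from an HFileOutputFormat job:
  //
  //   List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
  //   familyPaths.add(new Pair<byte[], String>(Bytes.toBytes("cf1"), "/staging/cf1/hfile-00001"));
  //   familyPaths.add(new Pair<byte[], String>(Bytes.toBytes("cf2"), "/staging/cf2/hfile-00002"));
  //   // assignSeqId=true flushes first so the loaded files receive a safe sequence id (HBASE-10958)
  //   boolean loaded = region.bulkLoadHFiles(familyPaths, true);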
3788 
3789   @Override
3790   public boolean equals(Object o) {
3791     return o instanceof HRegion && Bytes.equals(this.getRegionName(),
3792                                                 ((HRegion) o).getRegionName());
3793   }
3794 
3795   @Override
3796   public int hashCode() {
3797     return Bytes.hashCode(this.getRegionName());
3798   }
3799 
3800   @Override
3801   public String toString() {
3802     return this.getRegionNameAsString();
3803   }
3804 
3805   /**
3806    * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
3807    */
3808   class RegionScannerImpl implements RegionScanner {
3809     // Package local for testability
3810     KeyValueHeap storeHeap = null;
3811     /** Heap of key-values that are not essential for the provided filters and are thus read
3812      * on demand, if on-demand column family loading is enabled.*/
3813     KeyValueHeap joinedHeap = null;
3814     /**
3815      * If the joined heap data gathering is interrupted due to scan limits, this will
3816      * contain the row for which we are populating the values.*/
3817     protected Cell joinedContinuationRow = null;
3818     // KeyValue indicating that limit is reached when scanning
3819     private final KeyValue KV_LIMIT = new KeyValue();
3820     protected final byte[] stopRow;
3821     private final FilterWrapper filter;
3822     private int batch;
3823     protected int isScan;
3824     private boolean filterClosed = false;
3825     private long readPt;
3826     private long maxResultSize;
3827     protected HRegion region;
3828 
3829     @Override
3830     public HRegionInfo getRegionInfo() {
3831       return region.getRegionInfo();
3832     }
3833 
3834     RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
3835         throws IOException {
3836 
3837       this.region = region;
3838       this.maxResultSize = scan.getMaxResultSize();
3839       if (scan.hasFilter()) {
3840         this.filter = new FilterWrapper(scan.getFilter());
3841       } else {
3842         this.filter = null;
3843       }
3844 
3845       this.batch = scan.getBatch();
3846       if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {
3847         this.stopRow = null;
3848       } else {
3849         this.stopRow = scan.getStopRow();
3850       }
3851       // If we are doing a get, we want the range to be [startRow,endRow]; normally
3852       // it is [startRow,endRow), and if startRow=endRow we would get nothing.
3853       this.isScan = scan.isGetScan() ? -1 : 0;
3854 
3855       // synchronize on scannerReadPoints so that nobody calculates
3856       // getSmallestReadPoint, before scannerReadPoints is updated.
3857       IsolationLevel isolationLevel = scan.getIsolationLevel();
3858       synchronized(scannerReadPoints) {
3859         this.readPt = getReadpoint(isolationLevel);
3860         scannerReadPoints.put(this, this.readPt);
3861       }
3862 
3863       // Here we separate all scanners into two lists - scanner that provide data required
3864       // by the filter to operate (scanners list) and all others (joinedScanners list).
3865       List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
3866       List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
3867       if (additionalScanners != null) {
3868         scanners.addAll(additionalScanners);
3869       }
3870 
3871       for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
3872           scan.getFamilyMap().entrySet()) {
3873         Store store = stores.get(entry.getKey());
3874         KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
3875         if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
3876           || this.filter.isFamilyEssential(entry.getKey())) {
3877           scanners.add(scanner);
3878         } else {
3879           joinedScanners.add(scanner);
3880         }
3881       }
3882       initializeKVHeap(scanners, joinedScanners, region);
3883     }
3884 
3885     protected void initializeKVHeap(List<KeyValueScanner> scanners,
3886         List<KeyValueScanner> joinedScanners, HRegion region)
3887         throws IOException {
3888       this.storeHeap = new KeyValueHeap(scanners, region.comparator);
3889       if (!joinedScanners.isEmpty()) {
3890         this.joinedHeap = new KeyValueHeap(joinedScanners, region.comparator);
3891       }
3892     }
3893 
3894     @Override
3895     public long getMaxResultSize() {
3896       return maxResultSize;
3897     }
3898 
3899     @Override
3900     public long getMvccReadPoint() {
3901       return this.readPt;
3902     }
3903 
3904     /**
3905      * Reset the filter, if one is set.
3906      *
3907      * @throws IOException in case a filter raises an I/O exception.
3908      */
3909     protected void resetFilters() throws IOException {
3910       if (filter != null) {
3911         filter.reset();
3912       }
3913     }
3914 
3915     @Override
3916     public boolean next(List<Cell> outResults)
3917         throws IOException {
3918       // apply the batching limit by default
3919       return next(outResults, batch);
3920     }
3921 
3922     @Override
3923     public synchronized boolean next(List<Cell> outResults, int limit) throws IOException {
3924       if (this.filterClosed) {
3925         throw new UnknownScannerException("Scanner was closed (timed out?) " +
3926             "after we renewed it. Could be caused by a very slow scanner " +
3927             "or a lengthy garbage collection");
3928       }
3929       startRegionOperation(Operation.SCAN);
3930       readRequestsCount.increment();
3931       try {
3932         return nextRaw(outResults, limit);
3933       } finally {
3934         closeRegionOperation(Operation.SCAN);
3935       }
3936     }
3937 
3938     @Override
3939     public boolean nextRaw(List<Cell> outResults)
3940         throws IOException {
3941       return nextRaw(outResults, batch);
3942     }
3943 
3944     @Override
3945     public boolean nextRaw(List<Cell> outResults, int limit) throws IOException {
3946       boolean returnResult;
3947       if (outResults.isEmpty()) {
3948         // Usually outResults is empty. This is true when next is called
3949         // to handle a scan or get operation.
3950         returnResult = nextInternal(outResults, limit);
3951       } else {
3952         List<Cell> tmpList = new ArrayList<Cell>();
3953         returnResult = nextInternal(tmpList, limit);
3954         outResults.addAll(tmpList);
3955       }
3956       resetFilters();
3957       if (isFilterDoneInternal()) {
3958         returnResult = false;
3959       }
3960       if (region != null && region.metricsRegion != null) {
3961         long totalSize = 0;
3962         for(Cell c:outResults) {
3963           // TODO clean up.  Find way to remove this ensureKeyValue
3964           KeyValue kv = KeyValueUtil.ensureKeyValue(c);
3965           totalSize += kv.getLength();
3966         }
3967         region.metricsRegion.updateScanNext(totalSize);
3968       }
3969       return returnResult;
3970     }
3971 
3972 
3973     private void populateFromJoinedHeap(List<Cell> results, int limit)
3974         throws IOException {
3975       assert joinedContinuationRow != null;
3976       Cell kv = populateResult(results, this.joinedHeap, limit,
3977           joinedContinuationRow.getRowArray(), joinedContinuationRow.getRowOffset(),
3978           joinedContinuationRow.getRowLength());
3979       if (kv != KV_LIMIT) {
3980         // We are done with this row, reset the continuation.
3981         joinedContinuationRow = null;
3982       }
3983       // As the data is obtained from two independent heaps, we need to
3984       // ensure that result list is sorted, because Result relies on that.
3985       Collections.sort(results, comparator);
3986     }
3987 
3988     /**
3989      * Fetches cells for currentRow into the results list, until the next row or the limit (if not -1) is reached.
3990      * @param heap KeyValueHeap to fetch data from. It must be positioned on the correct row before this call.
3991      * @param limit Max number of KVs to place in the result list; -1 means no limit.
3992      * @param currentRow Byte array with key we are fetching.
3993      * @param offset offset for currentRow
3994      * @param length length for currentRow
3995      * @return KV_LIMIT if limit reached, next KeyValue otherwise.
3996      */
3997     private Cell populateResult(List<Cell> results, KeyValueHeap heap, int limit,
3998         byte[] currentRow, int offset, short length) throws IOException {
3999       Cell nextKv;
4000       do {
4001         heap.next(results, limit - results.size());
4002         if (limit > 0 && results.size() == limit) {
4003           return KV_LIMIT;
4004         }
4005         nextKv = heap.peek();
4006       } while (nextKv != null && CellUtil.matchingRow(nextKv, currentRow, offset, length));
4007 
4008       return nextKv;
4009     }
4010 
4011     /*
4012      * @return True if a filter rules the scanner is over, done.
4013      */
4014     @Override
4015     public synchronized boolean isFilterDone() throws IOException {
4016       return isFilterDoneInternal();
4017     }
4018 
4019     private boolean isFilterDoneInternal() throws IOException {
4020       return this.filter != null && this.filter.filterAllRemaining();
4021     }
4022 
4023     private boolean nextInternal(List<Cell> results, int limit)
4024     throws IOException {
4025       if (!results.isEmpty()) {
4026         throw new IllegalArgumentException("First parameter should be an empty list");
4027       }
4028       RpcCallContext rpcCall = RpcServer.getCurrentCall();
4029       // The loop here is used only when, at some point during this next() call, we determine
4030       // that (due to the effects of filters or otherwise) we have an empty row in the result.
4031       // Then we loop and try again. Otherwise, we must get out on the first iteration via return:
4032       // "true" if there's more data to read, "false" if there isn't (the storeHeap is at a stop row
4033       // and the joinedHeap has no more data to read for the last row, joinedContinuationRow, if set).
4034       while (true) {
4035         if (rpcCall != null) {
4036           // If a user specifies a too-restrictive or too-slow scanner, the
4037           // client might time out and disconnect while the server side
4038           // is still processing the request. We should abort aggressively
4039           // in that case.
4040           long afterTime = rpcCall.disconnectSince();
4041           if (afterTime >= 0) {
4042             throw new CallerDisconnectedException(
4043                 "Aborting on region " + getRegionNameAsString() + ", call " +
4044                     this + " after " + afterTime + " ms, since " +
4045                     "caller disconnected");
4046           }
4047         }
4048 
4049         // Let's see what we have in the storeHeap.
4050         Cell current = this.storeHeap.peek();
4051 
4052         byte[] currentRow = null;
4053         int offset = 0;
4054         short length = 0;
4055         if (current != null) {
4056           currentRow = current.getRowArray();
4057           offset = current.getRowOffset();
4058           length = current.getRowLength();
4059         }
4060         boolean stopRow = isStopRow(currentRow, offset, length);
4061         // Check if we were getting data from the joinedHeap and hit the limit.
4062         // If not, then it's main path - getting results from storeHeap.
4063         if (joinedContinuationRow == null) {
4064           // First, check if we are at a stop row. If so, there are no more results.
4065           if (stopRow) {
4066             if (filter != null && filter.hasFilterRow()) {
4067               filter.filterRowCells(results);
4068             }
4069             return false;
4070           }
4071 
4072           // Check if rowkey filter wants to exclude this row. If so, loop to next.
4073           // Technically, if we hit limits before on this row, we don't need this call.
4074           if (filterRowKey(currentRow, offset, length)) {
4075             boolean moreRows = nextRow(currentRow, offset, length);
4076             if (!moreRows) return false;
4077             results.clear();
4078             continue;
4079           }
4080 
4081           Cell nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset,
4082               length);
4083           // Ok, we are good, let's try to get some results from the main heap.
4084           if (nextKv == KV_LIMIT) {
4085             if (this.filter != null && filter.hasFilterRow()) {
4086               throw new IncompatibleFilterException(
4087                 "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
4088             }
4089             return true; // We hit the limit.
4090           }
4091 
4092           stopRow = nextKv == null ||
4093               isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength());
4094           // Remember whether the row was empty before filters were applied to it.
4095           final boolean isEmptyRow = results.isEmpty();
4096 
4097           // We have the part of the row necessary for filtering (all of it, usually).
4098           // First filter with the filterRow(List).
4099           FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
4100           if (filter != null && filter.hasFilterRow()) {
4101             ret = filter.filterRowCellsWithRet(results);
4102           }
4103 
4104           if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) {
4105             results.clear();
4106             boolean moreRows = nextRow(currentRow, offset, length);
4107             if (!moreRows) return false;
4108 
4109             // This row was totally filtered out, if this is NOT the last row,
4110             // we should continue on. Otherwise, nothing else to do.
4111             if (!stopRow) continue;
4112             return false;
4113           }
4114 
4115           // Ok, we are done with storeHeap for this row.
4116           // Now we may need to fetch additional, non-essential data into row.
4117           // These values are not needed for filter to work, so we postpone their
4118           // fetch to (possibly) reduce amount of data loads from disk.
4119           if (this.joinedHeap != null) {
4120             Cell nextJoinedKv = joinedHeap.peek();
4121             // If joinedHeap is pointing to some other row, try to seek to a correct one.
4122             boolean mayHaveData = (nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv,
4123                 currentRow, offset, length))
4124                 || (this.joinedHeap.requestSeek(
4125                     KeyValueUtil.createFirstOnRow(currentRow, offset, length), true, true)
4126                     && joinedHeap.peek() != null && CellUtil.matchingRow(joinedHeap.peek(),
4127                     currentRow, offset, length));
4128             if (mayHaveData) {
4129               joinedContinuationRow = current;
4130               populateFromJoinedHeap(results, limit);
4131             }
4132           }
4133         } else {
4134           // Populating from the joined heap was stopped by limits, populate some more.
4135           populateFromJoinedHeap(results, limit);
4136         }
4137 
4138         // We may have just called populateFromJoinedHeap and hit the limits. If that is
4139         // the case, we need to call it again on the next next() invocation.
4140         if (joinedContinuationRow != null) {
4141           return true;
4142         }
4143 
4144         // Finally, we are done with both joinedHeap and storeHeap.
4145         // Double check to prevent empty rows from appearing in result. It could be
4146         // the case when SingleColumnValueExcludeFilter is used.
4147         if (results.isEmpty()) {
4148           boolean moreRows = nextRow(currentRow, offset, length);
4149           if (!moreRows) return false;
4150           if (!stopRow) continue;
4151         }
4152 
4153         // We are done. Return the result.
4154         return !stopRow;
4155       }
4156     }
4157 
4158     /**
4159      * This function maintains backward compatibility for 0.94 filters. HBASE-6429 combines
4160      * the filterRow() and filterRow(List<KeyValue> kvs) functions. Code written for 0.94 or
4161      * older may not implement hasFilterRow() as HBASE-6429 expects, because in 0.94
4162      * hasFilterRow() only returns true when filterRow(List<KeyValue> kvs) is overridden, not
4163      * filterRow(). For such filters, filterRow() would otherwise be skipped.
4164      */
4165     private boolean filterRow() throws IOException {
4166       // when hasFilterRow returns true, filter.filterRow() will be called automatically inside
4167       // filterRowCells(List<Cell> kvs) so we skip that scenario here.
4168       return filter != null && (!filter.hasFilterRow())
4169           && filter.filterRow();
4170     }
4171 
4172     private boolean filterRowKey(byte[] row, int offset, short length) throws IOException {
4173       return filter != null
4174           && filter.filterRowKey(row, offset, length);
4175     }
4176 
4177     protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException {
4178       assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read.";
4179       Cell next;
4180       while ((next = this.storeHeap.peek()) != null &&
4181              CellUtil.matchingRow(next, currentRow, offset, length)) {
4182         this.storeHeap.next(MOCKED_LIST);
4183       }
4184       resetFilters();
4185       // Calling the hook in CP which allows it to do a fast forward
4186       return this.region.getCoprocessorHost() == null
4187           || this.region.getCoprocessorHost()
4188               .postScannerFilterRow(this, currentRow, offset, length);
4189     }
4190 
4191     protected boolean isStopRow(byte[] currentRow, int offset, short length) {
4192       return currentRow == null ||
4193           (stopRow != null &&
4194           comparator.compareRows(stopRow, 0, stopRow.length,
4195             currentRow, offset, length) <= isScan);
4196     }
4197 
4198     @Override
4199     public synchronized void close() {
4200       if (storeHeap != null) {
4201         storeHeap.close();
4202         storeHeap = null;
4203       }
4204       if (joinedHeap != null) {
4205         joinedHeap.close();
4206         joinedHeap = null;
4207       }
4208       // no need to synchronize here.
4209       scannerReadPoints.remove(this);
4210       this.filterClosed = true;
4211     }
4212 
4213     KeyValueHeap getStoreHeapForTesting() {
4214       return storeHeap;
4215     }
4216 
4217     @Override
4218     public synchronized boolean reseek(byte[] row) throws IOException {
4219       if (row == null) {
4220         throw new IllegalArgumentException("Row cannot be null.");
4221       }
4222       boolean result = false;
4223       startRegionOperation();
4224       try {
4225         KeyValue kv = KeyValueUtil.createFirstOnRow(row);
4226         // use request seek to make use of the lazy seek option. See HBASE-5520
4227         result = this.storeHeap.requestSeek(kv, true, true);
4228         if (this.joinedHeap != null) {
4229           result = this.joinedHeap.requestSeek(kv, true, true) || result;
4230         }
4231       } finally {
4232         closeRegionOperation();
4233       }
4234       return result;
4235     }
4236   }
4237 
4238   // Utility methods
4239   /**
4240    * A utility method to create new instances of HRegion based on the
4241    * {@link HConstants#REGION_IMPL} configuration property.
4242    * @param tableDir qualified path of directory where region should be located,
4243    * usually the table directory.
4244    * @param log The HLog is the outbound log for any updates to the HRegion
4245    * (There's a single HLog for all the HRegions on a single HRegionServer.)
4246    * The log file is a logfile from the previous execution that's
4247    * custom-computed for this HRegion. The HRegionServer computes and sorts the
4248    * appropriate log info for this HRegion. If there is a previous log file
4249    * (implying that the HRegion has been written-to before), then read it from
4250    * the supplied path.
4251    * @param fs is the filesystem.
4252    * @param conf is global configuration settings.
4253    * @param regionInfo - HRegionInfo that describes the region
4255    * @param htd the table descriptor
4256    * @return the new instance
4257    */
4258   static HRegion newHRegion(Path tableDir, HLog log, FileSystem fs,
4259       Configuration conf, HRegionInfo regionInfo, final HTableDescriptor htd,
4260       RegionServerServices rsServices) {
4261     try {
4262       @SuppressWarnings("unchecked")
4263       Class<? extends HRegion> regionClass =
4264           (Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
4265 
4266       Constructor<? extends HRegion> c =
4267           regionClass.getConstructor(Path.class, HLog.class, FileSystem.class,
4268               Configuration.class, HRegionInfo.class, HTableDescriptor.class,
4269               RegionServerServices.class);
4270 
4271       return c.newInstance(tableDir, log, fs, conf, regionInfo, htd, rsServices);
4272     } catch (Throwable e) {
4273       // todo: what should I throw here?
4274       throw new IllegalStateException("Could not instantiate a region instance.", e);
4275     }
4276   }
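  // Sketch (illustrative, not from the original source): how a custom HRegion subclass could
  // be plugged in through the REGION_IMPL property that newHRegion() reads. "MyRegion" is a
  // hypothetical subclass providing the constructor signature looked up above.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setClass(HConstants.REGION_IMPL, MyRegion.class, HRegion.class);
  //   // newHRegion(...) will now reflectively instantiate MyRegion instead of HRegion.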
4277 
4278   /**
4279    * Convenience method creating new HRegions. Used by createTable and by the
4280    * bootstrap code in the HMaster constructor.
4281    * Note, this method creates an {@link HLog} for the created region; use
4282    * {@link HRegion#getLog()} to get access to it.  <b>When done with a region
4283    * created using this method, you will need to explicitly close that
4284    * {@link HLog}; it will not be done for you.  Not closing the log will
4285    * leave at least a daemon thread running.</b>  Call
4286    * {@link #closeHRegion(HRegion)} and it will do the necessary cleanup
4287    * for you.
4288    * @param info Info for region to create.
4289    * @param rootDir Root directory for HBase instance
4290    * @return new HRegion
4291    *
4292    * @throws IOException
4293    */
4294   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4295       final Configuration conf, final HTableDescriptor hTableDescriptor)
4296   throws IOException {
4297     return createHRegion(info, rootDir, conf, hTableDescriptor, null);
4298   }
4299 
4300   /**
4301    * This will do the necessary cleanup a call to
4302    * {@link #createHRegion(HRegionInfo, Path, Configuration, HTableDescriptor)}
4303    * requires.  This method will close the region and then close its
4304    * associated {@link HLog} file.  You can also use it after calling the other
4305    * createHRegion, the one that takes an {@link HLog} instance, but do not be
4306    * surprised by the call to {@link HLog#closeAndDelete()} on the {@link HLog}
4307    * the HRegion was carrying.
4308    * @throws IOException
4309    */
4310   public static void closeHRegion(final HRegion r) throws IOException {
4311     if (r == null) return;
4312     r.close();
4313     if (r.getLog() == null) return;
4314     r.getLog().closeAndDelete();
4315   }
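  // Illustrative pairing (not part of the original source) of createHRegion and closeHRegion.
  // The hri and htd variables are assumed to exist; the point is that the region and the HLog
  // it created are both released by closeHRegion.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   Path rootDir = FSUtils.getRootDir(conf);
  //   HRegion region = HRegion.createHRegion(hri, rootDir, conf, htd);
  //   try {
  //     // ... use the region ...
  //   } finally {
  //     HRegion.closeHRegion(region);   // closes the region and its HLog
  //   }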
4316 
4317   /**
4318    * Convenience method creating new HRegions. Used by createTable.
4319    * The {@link HLog} for the created region needs to be closed explicitly.
4320    * Use {@link HRegion#getLog()} to get access.
4321    *
4322    * @param info Info for region to create.
4323    * @param rootDir Root directory for HBase instance
4324    * @param hlog shared HLog
4325    * @param initialize - true to initialize the region
4326    * @return new HRegion
4327    *
4328    * @throws IOException
4329    */
4330   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4331                                       final Configuration conf,
4332                                       final HTableDescriptor hTableDescriptor,
4333                                       final HLog hlog,
4334                                       final boolean initialize)
4335       throws IOException {
4336     return createHRegion(info, rootDir, conf, hTableDescriptor,
4337         hlog, initialize, false);
4338   }
4339 
4340   /**
4341    * Convenience method creating new HRegions. Used by createTable.
4342    * The {@link HLog} for the created region needs to be closed
4343    * explicitly, if it is not null.
4344    * Use {@link HRegion#getLog()} to get access.
4345    *
4346    * @param info Info for region to create.
4347    * @param rootDir Root directory for HBase instance
4348    * @param hlog shared HLog
4349    * @param initialize - true to initialize the region
4350    * @param ignoreHLog - true to skip generating a new hlog if it is null, mostly used by createTable
4351    * @return new HRegion
4352    * @throws IOException
4353    */
4354   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4355                                       final Configuration conf,
4356                                       final HTableDescriptor hTableDescriptor,
4357                                       final HLog hlog,
4358                                       final boolean initialize, final boolean ignoreHLog)
4359       throws IOException {
4360       Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
4361       return createHRegion(info, rootDir, tableDir, conf, hTableDescriptor, hlog, initialize, ignoreHLog);
4362   }
4363 
4364   /**
4365    * Convenience method creating new HRegions. Used by createTable.
4366    * The {@link HLog} for the created region needs to be closed
4367    * explicitly, if it is not null.
4368    * Use {@link HRegion#getLog()} to get access.
4369    *
4370    * @param info Info for region to create.
4371    * @param rootDir Root directory for HBase instance
4372    * @param tableDir table directory
4373    * @param hlog shared HLog
4374    * @param initialize - true to initialize the region
4375    * @param ignoreHLog - true to skip generating a new hlog if it is null, mostly used by createTable
4376    * @return new HRegion
4377    * @throws IOException
4378    */
4379   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Path tableDir,
4380                                       final Configuration conf,
4381                                       final HTableDescriptor hTableDescriptor,
4382                                       final HLog hlog,
4383                                       final boolean initialize, final boolean ignoreHLog)
4384       throws IOException {
4385     LOG.info("creating HRegion " + info.getTable().getNameAsString()
4386         + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
4387         " Table name == " + info.getTable().getNameAsString());
4388     FileSystem fs = FileSystem.get(conf);
4389     HRegionFileSystem rfs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
4390     HLog effectiveHLog = hlog;
4391     if (hlog == null && !ignoreHLog) {
4392       effectiveHLog = HLogFactory.createHLog(fs, rfs.getRegionDir(),
4393                                              HConstants.HREGION_LOGDIR_NAME, conf);
4394     }
4395     HRegion region = HRegion.newHRegion(tableDir,
4396         effectiveHLog, fs, conf, info, hTableDescriptor, null);
4397     if (initialize) {
4398       // If initializing, set the sequenceId. It is also required by HLogPerformanceEvaluation when
4399       // verifying the WALEdits.
4400       region.setSequenceId(region.initialize(null));
4401     }
4402     return region;
4403   }
4404 
4405   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4406                                       final Configuration conf,
4407                                       final HTableDescriptor hTableDescriptor,
4408                                       final HLog hlog)
4409     throws IOException {
4410     return createHRegion(info, rootDir, conf, hTableDescriptor, hlog, true);
4411   }
4412 
4413 
4414   /**
4415    * Open a Region.
4416    * @param info Info for region to be opened.
4417    * @param wal HLog for the region to use. This method will call
4418    * HLog#setSequenceNumber(long), passing the result of the call to
4419    * HRegion#getMinSequenceId(), to ensure the log id is properly kept
4420    * up to date.  The HRegionServer does this every time it opens a new region.
4421    * @return new HRegion
4422    *
4423    * @throws IOException
4424    */
4425   public static HRegion openHRegion(final HRegionInfo info,
4426       final HTableDescriptor htd, final HLog wal,
4427       final Configuration conf)
4428   throws IOException {
4429     return openHRegion(info, htd, wal, conf, null, null);
4430   }
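  // Minimal usage sketch (illustrative): opening an existing region outside of a region server,
  // e.g. from a tool or test. The hri, htd and wal variables are assumed to exist already.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   HRegion region = HRegion.openHRegion(hri, htd, wal, conf);
  //   try {
  //     // ... read from the region ...
  //   } finally {
  //     region.close();
  //   }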
4431 
4432   /**
4433    * Open a Region.
4434    * @param info Info for region to be opened
4435    * @param htd the table descriptor
4436    * @param wal HLog for the region to use. This method will call
4437    * HLog#setSequenceNumber(long), passing the result of the call to
4438    * HRegion#getMinSequenceId(), to ensure the log id is properly kept
4439    * up to date.  The HRegionServer does this every time it opens a new region.
4440    * @param conf The Configuration object to use.
4441    * @param rsServices An interface we can request flushes against.
4442    * @param reporter An interface we can report progress against.
4443    * @return new HRegion
4444    *
4445    * @throws IOException
4446    */
4447   public static HRegion openHRegion(final HRegionInfo info,
4448     final HTableDescriptor htd, final HLog wal, final Configuration conf,
4449     final RegionServerServices rsServices,
4450     final CancelableProgressable reporter)
4451   throws IOException {
4452     return openHRegion(FSUtils.getRootDir(conf), info, htd, wal, conf, rsServices, reporter);
4453   }
4454 
4455   /**
4456    * Open a Region.
4457    * @param rootDir Root directory for HBase instance
4458    * @param info Info for region to be opened.
4459    * @param htd the table descriptor
4460    * @param wal HLog for the region to use. This method will call
4461    * HLog#setSequenceNumber(long), passing the result of the call to
4462    * HRegion#getMinSequenceId(), to ensure the log id is properly kept
4463    * up to date.  The HRegionServer does this every time it opens a new region.
4464    * @param conf The Configuration object to use.
4465    * @return new HRegion
4466    * @throws IOException
4467    */
4468   public static HRegion openHRegion(Path rootDir, final HRegionInfo info,
4469       final HTableDescriptor htd, final HLog wal, final Configuration conf)
4470   throws IOException {
4471     return openHRegion(rootDir, info, htd, wal, conf, null, null);
4472   }
4473 
4474   /**
4475    * Open a Region.
4476    * @param rootDir Root directory for HBase instance
4477    * @param info Info for region to be opened.
4478    * @param htd the table descriptor
4479    * @param wal HLog for the region to use. This method will call
4480    * HLog#setSequenceNumber(long), passing the result of the call to
4481    * HRegion#getMinSequenceId(), to ensure the log id is properly kept
4482    * up to date.  The HRegionServer does this every time it opens a new region.
4483    * @param conf The Configuration object to use.
4484    * @param rsServices An interface we can request flushes against.
4485    * @param reporter An interface we can report progress against.
4486    * @return new HRegion
4487    * @throws IOException
4488    */
4489   public static HRegion openHRegion(final Path rootDir, final HRegionInfo info,
4490       final HTableDescriptor htd, final HLog wal, final Configuration conf,
4491       final RegionServerServices rsServices,
4492       final CancelableProgressable reporter)
4493   throws IOException {
4494     FileSystem fs = null;
4495     if (rsServices != null) {
4496       fs = rsServices.getFileSystem();
4497     }
4498     if (fs == null) {
4499       fs = FileSystem.get(conf);
4500     }
4501     return openHRegion(conf, fs, rootDir, info, htd, wal, rsServices, reporter);
4502   }
4503 
4504   /**
4505    * Open a Region.
4506    * @param conf The Configuration object to use.
4507    * @param fs Filesystem to use
4508    * @param rootDir Root directory for HBase instance
4509    * @param info Info for region to be opened.
4510    * @param htd the table descriptor
4511    * @param wal HLog for the region to use. This method will call
4512    * HLog#setSequenceNumber(long), passing the result of the call to
4513    * HRegion#getMinSequenceId(), to ensure the log id is properly kept
4514    * up to date.  The HRegionServer does this every time it opens a new region.
4515    * @return new HRegion
4516    * @throws IOException
4517    */
4518   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4519       final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal)
4520       throws IOException {
4521     return openHRegion(conf, fs, rootDir, info, htd, wal, null, null);
4522   }
4523 
4524   /**
4525    * Open a Region.
4526    * @param conf The Configuration object to use.
4527    * @param fs Filesystem to use
4528    * @param rootDir Root directory for HBase instance
4529    * @param info Info for region to be opened.
4530    * @param htd the table descriptor
4531    * @param wal HLog for the region to use. This method will call
4532    * HLog#setSequenceNumber(long), passing the result of the call to
4533    * HRegion#getMinSequenceId(), to ensure the log id is properly kept
4534    * up to date.  The HRegionServer does this every time it opens a new region.
4535    * @param rsServices An interface we can request flushes against.
4536    * @param reporter An interface we can report progress against.
4537    * @return new HRegion
4538    * @throws IOException
4539    */
4540   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4541       final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
4542       final RegionServerServices rsServices, final CancelableProgressable reporter)
4543       throws IOException {
4544     Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
4545     return openHRegion(conf, fs, rootDir, tableDir, info, htd, wal, rsServices, reporter);
4546   }
4547 
4548   /**
4549    * Open a Region.
4550    * @param conf The Configuration object to use.
4551    * @param fs Filesystem to use
4552    * @param rootDir Root directory for HBase instance
4553    * @param info Info for region to be opened.
4554    * @param htd the table descriptor
4555    * @param wal HLog for the region to use. This method will call
4556    * HLog#setSequenceNumber(long), passing the result of the call to
4557    * HRegion#getMinSequenceId(), to ensure the log id is properly kept
4558    * up to date.  The HRegionServer does this every time it opens a new region.
4559    * @param rsServices An interface we can request flushes against.
4560    * @param reporter An interface we can report progress against.
4561    * @return new HRegion
4562    * @throws IOException
4563    */
4564   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4565       final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
4566       final RegionServerServices rsServices, final CancelableProgressable reporter)
4567       throws IOException {
4568     if (info == null) throw new NullPointerException("Passed region info is null");
4569     if (LOG.isDebugEnabled()) {
4570       LOG.debug("Opening region: " + info);
4571     }
4572     HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
4573     return r.openHRegion(reporter);
4574   }
4575 
4576 
4577   /**
4578    * Useful when reopening a closed region (normally for unit tests)
4579    * @param other original object
4580    * @param reporter An interface we can report progress against.
4581    * @return new HRegion
4582    * @throws IOException
4583    */
4584   public static HRegion openHRegion(final HRegion other, final CancelableProgressable reporter)
4585       throws IOException {
4586     HRegionFileSystem regionFs = other.getRegionFileSystem();
4587     HRegion r = newHRegion(regionFs.getTableDir(), other.getLog(), regionFs.getFileSystem(),
4588         other.baseConf, other.getRegionInfo(), other.getTableDesc(), null);
4589     return r.openHRegion(reporter);
4590   }
4591 
4592   /**
4593    * Open HRegion.
4594    * Calls initialize and sets sequenceId.
4595    * @return Returns <code>this</code>
4596    * @throws IOException
4597    */
4598   protected HRegion openHRegion(final CancelableProgressable reporter)
4599   throws IOException {
4600     checkCompressionCodecs();
4601 
4602     this.openSeqNum = initialize(reporter);
4603     this.setSequenceId(openSeqNum);
4604     return this;
4605   }
4606 
4607   private void checkCompressionCodecs() throws IOException {
4608     for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
4609       CompressionTest.testCompression(fam.getCompression());
4610       CompressionTest.testCompression(fam.getCompactionCompression());
4611     }
4612   }
4613 
4614   /**
4615    * Create a daughter region given a temp directory with the region data.
4616    * @param hri Spec. for daughter region to open.
4617    * @throws IOException
4618    */
4619   HRegion createDaughterRegionFromSplits(final HRegionInfo hri) throws IOException {
4620     // Move the files from the temporary .splits to the final /table/region directory
4621     fs.commitDaughterRegion(hri);
4622 
4623     // Create the daughter HRegion instance
4624     HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(), fs.getFileSystem(),
4625         this.getBaseConf(), hri, this.getTableDesc(), rsServices);
4626     r.readRequestsCount.set(this.getReadRequestsCount() / 2);
4627     r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
4628     return r;
4629   }
4630 
4631   /**
4632    * Create a merged region given a temp directory with the region data.
4633    * @param region_b another merging region
4634    * @return merged HRegion
4635    * @throws IOException
4636    */
4637   HRegion createMergedRegionFromMerges(final HRegionInfo mergedRegionInfo,
4638       final HRegion region_b) throws IOException {
4639     HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(),
4640         fs.getFileSystem(), this.getBaseConf(), mergedRegionInfo,
4641         this.getTableDesc(), this.rsServices);
4642     r.readRequestsCount.set(this.getReadRequestsCount()
4643         + region_b.getReadRequestsCount());
4644     r.writeRequestsCount.set(this.getWriteRequestsCount()
4645         + region_b.getWriteRequestsCount());
4646     this.fs.commitMergedRegion(mergedRegionInfo);
4647     return r;
4648   }
4649 
4650   /**
4651    * Inserts a new region's meta information into the passed
4652    * <code>meta</code> region. Used by the HMaster bootstrap code when adding
4653    * a new table to the hbase:meta table.
4654    *
4655    * @param meta hbase:meta HRegion to be updated
4656    * @param r HRegion to add to <code>meta</code>
4657    *
4658    * @throws IOException
4659    */
4660   // TODO remove since only test and merge use this
4661   public static void addRegionToMETA(final HRegion meta, final HRegion r) throws IOException {
4662     meta.checkResources();
4663     // The row key is the region name
4664     byte[] row = r.getRegionName();
4665     final long now = EnvironmentEdgeManager.currentTimeMillis();
4666     final List<Cell> cells = new ArrayList<Cell>(2);
4667     cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
4668       HConstants.REGIONINFO_QUALIFIER, now,
4669       r.getRegionInfo().toByteArray()));
4670     // Set into the root table the version of the meta table.
4671     cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
4672       HConstants.META_VERSION_QUALIFIER, now,
4673       Bytes.toBytes(HConstants.META_VERSION)));
4674     meta.put(row, HConstants.CATALOG_FAMILY, cells);
4675   }
4676 
4677   /**
4678    * Computes the Path of the HRegion
4679    *
4680    * @param tabledir qualified path for table
4681    * @param name ENCODED region name
4682    * @return Path of HRegion directory
4683    */
4684   @Deprecated
4685   public static Path getRegionDir(final Path tabledir, final String name) {
4686     return new Path(tabledir, name);
4687   }
4688 
4689   /**
4690    * Computes the Path of the HRegion
4691    *
4692    * @param rootdir qualified path of HBase root directory
4693    * @param info HRegionInfo for the region
4694    * @return qualified path of region directory
4695    */
4696   @Deprecated
4697   public static Path getRegionDir(final Path rootdir, final HRegionInfo info) {
4698     return new Path(
4699       FSUtils.getTableDir(rootdir, info.getTable()), info.getEncodedName());
4700   }
4701 
4702   /**
4703    * Determines whether the given row is within the row range covered by the
4704    * given HRegionInfo.
4705    *
4706    * @param info HRegionInfo that specifies the row range
4707    * @param row row to be checked
4708    * @return true if the row is within the range specified by the HRegionInfo
4709    */
4710   public static boolean rowIsInRange(HRegionInfo info, final byte [] row) {
4711     return ((info.getStartKey().length == 0) ||
4712         (Bytes.compareTo(info.getStartKey(), row) <= 0)) &&
4713         ((info.getEndKey().length == 0) ||
4714             (Bytes.compareTo(info.getEndKey(), row) > 0));
4715   }
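  // Quick illustration (values assumed): the start key is inclusive and the end key exclusive,
  // with an empty start/end key meaning "unbounded" on that side.
  //
  //   // for a region covering [b, d):
  //   //   rowIsInRange(info, Bytes.toBytes("b"))  -> true
  //   //   rowIsInRange(info, Bytes.toBytes("c"))  -> true
  //   //   rowIsInRange(info, Bytes.toBytes("d"))  -> false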
4716 
4717   /**
4718    * Merge two HRegions.  The regions must be adjacent and must not overlap.
4719    *
4720    * @return new merged HRegion
4721    * @throws IOException
4722    */
4723   public static HRegion mergeAdjacent(final HRegion srcA, final HRegion srcB)
4724   throws IOException {
4725     HRegion a = srcA;
4726     HRegion b = srcB;
4727 
4728     // Make sure that srcA comes first; important for key-ordering during
4729     // write of the merged file.
4730     if (srcA.getStartKey() == null) {
4731       if (srcB.getStartKey() == null) {
4732         throw new IOException("Cannot merge two regions with null start key");
4733       }
4734       // A's start key is null but B's isn't. Assume A comes before B
4735     } else if ((srcB.getStartKey() == null) ||
4736       (Bytes.compareTo(srcA.getStartKey(), srcB.getStartKey()) > 0)) {
4737       a = srcB;
4738       b = srcA;
4739     }
4740 
4741     if (!(Bytes.compareTo(a.getEndKey(), b.getStartKey()) == 0)) {
4742       throw new IOException("Cannot merge non-adjacent regions");
4743     }
4744     return merge(a, b);
4745   }
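  // Illustrative call (regionA and regionB are assumed to be adjacent regions of the same
  // table): mergeAdjacent() works out the key ordering itself before delegating to merge().
  //
  //   HRegion merged = HRegion.mergeAdjacent(regionA, regionB);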
4746 
4747   /**
4748    * Merge two regions whether they are adjacent or not.
4749    *
4750    * @param a region a
4751    * @param b region b
4752    * @return new merged region
4753    * @throws IOException
4754    */
4755   public static HRegion merge(final HRegion a, final HRegion b) throws IOException {
4756     if (!a.getRegionInfo().getTable().equals(b.getRegionInfo().getTable())) {
4757       throw new IOException("Regions do not belong to the same table");
4758     }
4759 
4760     FileSystem fs = a.getRegionFileSystem().getFileSystem();
4761     // Make sure each region's cache is empty
4762     a.flushcache();
4763     b.flushcache();
4764 
4765     // Compact each region so we only have one store file per family
4766     a.compactStores(true);
4767     if (LOG.isDebugEnabled()) {
4768       LOG.debug("Files for region: " + a);
4769       a.getRegionFileSystem().logFileSystemState(LOG);
4770     }
4771     b.compactStores(true);
4772     if (LOG.isDebugEnabled()) {
4773       LOG.debug("Files for region: " + b);
4774       b.getRegionFileSystem().logFileSystemState(LOG);
4775     }
4776 
4777     RegionMergeTransaction rmt = new RegionMergeTransaction(a, b, true);
4778     if (!rmt.prepare(null)) {
4779       throw new IOException("Unable to merge regions " + a + " and " + b);
4780     }
4781     HRegionInfo mergedRegionInfo = rmt.getMergedRegionInfo();
4782     LOG.info("starting merge of regions: " + a + " and " + b
4783         + " into new region " + mergedRegionInfo.getRegionNameAsString()
4784         + " with start key <"
4785         + Bytes.toStringBinary(mergedRegionInfo.getStartKey())
4786         + "> and end key <"
4787         + Bytes.toStringBinary(mergedRegionInfo.getEndKey()) + ">");
4788     HRegion dstRegion;
4789     try {
4790       dstRegion = rmt.execute(null, null);
4791     } catch (IOException ioe) {
4792       rmt.rollback(null, null);
4793       throw new IOException("Failed merging region " + a + " and " + b
4794           + ", and successfully rolled back");
4795     }
4796     dstRegion.compactStores(true);
4797 
4798     if (LOG.isDebugEnabled()) {
4799       LOG.debug("Files for new region");
4800       dstRegion.getRegionFileSystem().logFileSystemState(LOG);
4801     }
4802 
4803     if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
4804       throw new IOException("Merged region " + dstRegion
4805           + " still has references after the compaction, is compaction canceled?");
4806     }
4807 
4808     // Archiving the 'A' region
4809     HFileArchiver.archiveRegion(a.getBaseConf(), fs, a.getRegionInfo());
4810     // Archiving the 'B' region
4811     HFileArchiver.archiveRegion(b.getBaseConf(), fs, b.getRegionInfo());
4812 
4813     LOG.info("merge completed. New region is " + dstRegion);
4814     return dstRegion;
4815   }
4816 
4817   //
4818   // HBASE-880
4819   //
4820   /**
4821    * @param get get object
4822    * @return result
4823    * @throws IOException read exceptions
4824    */
4825   public Result get(final Get get) throws IOException {
4826     checkRow(get.getRow(), "Get");
4827     // Verify families are all valid
4828     if (get.hasFamilies()) {
4829       for (byte [] family: get.familySet()) {
4830         checkFamily(family);
4831       }
4832     } else { // Adding all families to scanner
4833       for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
4834         get.addFamily(family);
4835       }
4836     }
4837     List<Cell> results = get(get, true);
4838     boolean stale = this.getRegionInfo().getReplicaId() != 0;
4839     return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
4840   }
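  // Usage sketch (illustrative; the row, family and qualifier names are assumptions): a Get
  // served directly by the region, as a region server or coprocessor would issue it.
  //
  //   Get g = new Get(Bytes.toBytes("row1"));
  //   g.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"));
  //   Result result = region.get(g);
  //   byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("q"));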
4841 
4842   /*
4843    * Do a get based on the get parameter.
4844    * @param withCoprocessor invoke coprocessor or not. We don't want to
4845    * always invoke the coprocessor for this internal method.
4846    */
4847   public List<Cell> get(Get get, boolean withCoprocessor)
4848   throws IOException {
4849 
4850     List<Cell> results = new ArrayList<Cell>();
4851 
4852     // pre-get CP hook
4853     if (withCoprocessor && (coprocessorHost != null)) {
4854        if (coprocessorHost.preGet(get, results)) {
4855          return results;
4856        }
4857     }
4858 
4859     Scan scan = new Scan(get);
4860 
4861     RegionScanner scanner = null;
4862     try {
4863       scanner = getScanner(scan);
4864       scanner.next(results);
4865     } finally {
4866       if (scanner != null)
4867         scanner.close();
4868     }
4869 
4870     // post-get CP hook
4871     if (withCoprocessor && (coprocessorHost != null)) {
4872       coprocessorHost.postGet(get, results);
4873     }
4874 
4875     // do after lock
4876     if (this.metricsRegion != null) {
4877       long totalSize = 0l;
4878       for (Cell kv : results) {
4879         totalSize += KeyValueUtil.ensureKeyValue(kv).getLength();
4880       }
4881       this.metricsRegion.updateGet(totalSize);
4882     }
4883 
4884     return results;
4885   }
4886 
4887   public void mutateRow(RowMutations rm) throws IOException {
4888     // Don't need nonces here - RowMutations only supports puts and deletes
4889     mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
4890   }
4891 
4892   /**
4893    * Perform atomic mutations within the region w/o nonces.
4894    * See {@link #mutateRowsWithLocks(Collection, Collection, long, long)}
4895    */
4896   public void mutateRowsWithLocks(Collection<Mutation> mutations,
4897       Collection<byte[]> rowsToLock) throws IOException {
4898     mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
4899   }
4900 
4901   /**
4902    * Perform atomic mutations within the region.
4903    * @param mutations The list of mutations to perform.
4904    * <code>mutations</code> can contain operations for multiple rows.
4905    * Caller has to ensure that all rows are contained in this region.
4906    * @param rowsToLock Rows to lock. If multiple rows are locked, care should be
4907    * taken that <code>rowsToLock</code> is sorted in order to avoid deadlocks.
4908    * @param nonceGroup Optional nonce group of the operation (client Id)
4909    * @param nonce Optional nonce of the operation (unique random id to ensure
4910    * "more idempotence")
4911    * @throws IOException
4912    */
4913   public void mutateRowsWithLocks(Collection<Mutation> mutations,
4914       Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
4915     MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
4916     processRowsWithLocks(proc, -1, nonceGroup, nonce);
4917   }
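  // Sketch (illustrative; row, family and qualifier names are assumptions): atomically applying
  // a Put and a Delete to two rows of the same region. Passing the rows in sorted order helps
  // avoid deadlocks when several rows are locked.
  //
  //   byte[] rowA = Bytes.toBytes("rowA");
  //   byte[] rowB = Bytes.toBytes("rowB");
  //   Put put = new Put(rowA);
  //   put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
  //   Delete delete = new Delete(rowB);
  //   region.mutateRowsWithLocks(Arrays.<Mutation>asList(put, delete),
  //       Arrays.asList(rowA, rowB));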
4918 
4919   /**
4920    * Performs atomic multiple reads and writes on a given row.
4921    *
4922    * @param processor The object that defines the reads and writes to perform on a row.
4923    * @param nonceGroup Optional nonce group of the operation (client Id)
4924    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
4925    */
4926   public void processRowsWithLocks(RowProcessor<?,?> processor, long nonceGroup, long nonce)
4927       throws IOException {
4928     processRowsWithLocks(processor, rowProcessorTimeout, nonceGroup, nonce);
4929   }
4930 
4931   /**
4932    * Performs atomic multiple reads and writes on a given row.
4933    *
4934    * @param processor The object that defines the reads and writes to perform on a row.
4935    * @param timeout The timeout of the processor.process() execution.
4936    *                Use a negative number to switch off the time bound.
4937    * @param nonceGroup Optional nonce group of the operation (client Id)
4938    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
4939    */
4940   public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout,
4941       long nonceGroup, long nonce) throws IOException {
4942 
4943     for (byte[] row : processor.getRowsToLock()) {
4944       checkRow(row, "processRowsWithLocks");
4945     }
4946     if (!processor.readOnly()) {
4947       checkReadOnly();
4948     }
4949     checkResources();
4950 
4951     startRegionOperation();
4952     WALEdit walEdit = new WALEdit();
4953 
4954     // 1. Run pre-process hook
4955     try {
4956       processor.preProcess(this, walEdit);
4957     } catch (IOException e) {
4958       closeRegionOperation();
4959       throw e;
4960     }
4961     // Short circuit the read only case
4962     if (processor.readOnly()) {
4963       try {
4964         long now = EnvironmentEdgeManager.currentTimeMillis();
4965         doProcessRowWithTimeout(
4966             processor, now, this, null, null, timeout);
4967         processor.postProcess(this, walEdit, true);
4968       } catch (IOException e) {
4969         throw e;
4970       } finally {
4971         closeRegionOperation();
4972       }
4973       return;
4974     }
4975 
4976     MultiVersionConsistencyControl.WriteEntry writeEntry = null;
4977     boolean locked;
4978     boolean walSyncSuccessful = false;
4979     List<RowLock> acquiredRowLocks;
4980     long addedSize = 0;
4981     List<Mutation> mutations = new ArrayList<Mutation>();
4982     List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
4983     Collection<byte[]> rowsToLock = processor.getRowsToLock();
4984     long mvccNum = 0;
4985     HLogKey walKey = null;
4986     try {
4987       // 2. Acquire the row lock(s)
4988       acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
4989       for (byte[] row : rowsToLock) {
4990         // Attempt to lock all involved rows, throw if any lock times out
4991         acquiredRowLocks.add(getRowLock(row));
4992       }
4993       // 3. Region lock
4994       lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
4995       locked = true;
4996       // Get a mvcc write number
4997       mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
4998 
4999       long now = EnvironmentEdgeManager.currentTimeMillis();
5000       try {
5001         // 4. Let the processor scan the rows, generate mutations and add
5002         //    waledits
5003         doProcessRowWithTimeout(
5004             processor, now, this, mutations, walEdit, timeout);
5005 
5006         if (!mutations.isEmpty()) {
5007           // 5. Start mvcc transaction
5008           writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
5009           // 6. Call the preBatchMutate hook
5010           processor.preBatchMutate(this, walEdit);
5011           // 7. Apply to memstore
5012           for (Mutation m : mutations) {
5013             for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
5014               KeyValue kv = KeyValueUtil.ensureKeyValue(cellScanner.current());
5015               kv.setSequenceId(mvccNum);
5016               Store store = getStore(kv);
5017               if (store == null) {
5018                 checkFamily(CellUtil.cloneFamily(kv));
5019                 // unreachable
5020               }
5021               Pair<Long, Cell> ret = store.add(kv);
5022               addedSize += ret.getFirst();
5023               memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
5024             }
5025           }
5026 
5027           long txid = 0;
5028           // 8. Append no sync
5029           if (!walEdit.isEmpty()) {
5030             walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
5031               this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now,
5032               processor.getClusterIds(), nonceGroup, nonce);
5033             txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(),
5034               walKey, walEdit, getSequenceId(), true, memstoreCells);
5035           }
5036           if(walKey == null){
5037             // since we use log sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
5038             // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
5039             walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
5040           }
5041 
5042           // 9. Release region lock
5043           if (locked) {
5044             this.updatesLock.readLock().unlock();
5045             locked = false;
5046           }
5047 
5048           // 10. Release row lock(s)
5049           releaseRowLocks(acquiredRowLocks);
5050 
5051           // 11. Sync edit log
5052           if (txid != 0) {
5053             syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
5054           }
5055           walSyncSuccessful = true;
5056           // 12. call postBatchMutate hook
5057           processor.postBatchMutate(this);
5058         }
5059       } finally {
5060         if (!mutations.isEmpty() && !walSyncSuccessful) {
5061           LOG.warn("Wal sync failed. Roll back " + mutations.size() +
5062               " memstore keyvalues for row(s):" + StringUtils.byteToHexString(
5063               processor.getRowsToLock().iterator().next()) + "...");
5064           for (Mutation m : mutations) {
5065             for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
5066               KeyValue kv = KeyValueUtil.ensureKeyValue(cellScanner.current());
5067               getStore(kv).rollback(kv);
5068             }
5069           }
5070         }
5071         // 13. Roll mvcc forward
5072         if (writeEntry != null) {
5073           mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
5074         }
5075         if (locked) {
5076           this.updatesLock.readLock().unlock();
5077         }
5078         // release locks if some were acquired but another timed out
5079         releaseRowLocks(acquiredRowLocks);
5080       }
5081 
5082       // 14. Run post-process hook
5083       processor.postProcess(this, walEdit, walSyncSuccessful);
5084 
5085     } catch (IOException e) {
5086       throw e;
5087     } finally {
5088       closeRegionOperation();
5089       if (!mutations.isEmpty() &&
5090           isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
5091         requestFlush();
5092       }
5093     }
5094   }
5095 
5096   private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
5097                                        final long now,
5098                                        final HRegion region,
5099                                        final List<Mutation> mutations,
5100                                        final WALEdit walEdit,
5101                                        final long timeout) throws IOException {
5102     // Short circuit the no time bound case.
5103     if (timeout < 0) {
5104       try {
5105         processor.process(now, region, mutations, walEdit);
5106       } catch (IOException e) {
5107         LOG.warn("RowProcessor:" + processor.getClass().getName() +
5108             " throws Exception on row(s):" +
5109             Bytes.toStringBinary(
5110               processor.getRowsToLock().iterator().next()) + "...", e);
5111         throw e;
5112       }
5113       return;
5114     }
5115 
5116     // Case with time bound
5117     FutureTask<Void> task =
5118       new FutureTask<Void>(new Callable<Void>() {
5119         @Override
5120         public Void call() throws IOException {
5121           try {
5122             processor.process(now, region, mutations, walEdit);
5123             return null;
5124           } catch (IOException e) {
5125             LOG.warn("RowProcessor:" + processor.getClass().getName() +
5126                 " throws Exception on row(s):" +
5127                 Bytes.toStringBinary(
5128                     processor.getRowsToLock().iterator().next()) + "...", e);
5129             throw e;
5130           }
5131         }
5132       });
5133     rowProcessorExecutor.execute(task);
5134     try {
5135       task.get(timeout, TimeUnit.MILLISECONDS);
5136     } catch (TimeoutException te) {
5137       LOG.error("RowProcessor timeout:" + timeout + " ms on row(s):" +
5138           Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) +
5139           "...");
5140       throw new IOException(te);
5141     } catch (Exception e) {
5142       throw new IOException(e);
5143     }
5144   }
5145 
5146   public Result append(Append append) throws IOException {
5147     return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
5148   }
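  // Usage sketch (illustrative; row, family and qualifier names are assumptions): appending
  // bytes to the current value of a cell.
  //
  //   Append a = new Append(Bytes.toBytes("row1"));
  //   a.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("-suffix"));
  //   Result r = region.append(a);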
5149 
5150   // TODO: There's a lot of boiler plate code identical
5151   // to increment... See how to better unify that.
5152   /**
5153    * Perform one or more append operations on a row.
5154    *
5155    * @return new keyvalues after the append operation
5156    * @throws IOException
5157    */
5158   public Result append(Append append, long nonceGroup, long nonce)
5159       throws IOException {
5160     byte[] row = append.getRow();
5161     checkRow(row, "append");
5162     boolean flush = false;
5163     Durability durability = getEffectiveDurability(append.getDurability());
5164     boolean writeToWAL = durability != Durability.SKIP_WAL;
5165     WALEdit walEdits = null;
5166     List<Cell> allKVs = new ArrayList<Cell>(append.size());
5167     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
5168 
5169     long size = 0;
5170     long txid = 0;
5171 
5172     checkReadOnly();
5173     checkResources();
5174     // Lock row
5175     startRegionOperation(Operation.APPEND);
5176     this.writeRequestsCount.increment();
5177     long mvccNum = 0;
5178     WriteEntry w = null;
5179     HLogKey walKey = null;
5180     RowLock rowLock = null;
5181     List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
5182     boolean doRollBackMemstore = false;
5183     try {
5184       rowLock = getRowLock(row);
5185       try {
5186         lock(this.updatesLock.readLock());
5187         try {
5188           // wait for all prior MVCC transactions to finish - while we hold the row lock
5189           // (so that we are guaranteed to see the latest state)
5190           mvcc.waitForPreviousTransactionsComplete();
5191           if (this.coprocessorHost != null) {
5192             Result r = this.coprocessorHost.preAppendAfterRowLock(append);
5193             if(r!= null) {
5194               return r;
5195             }
5196           }
5197           // now start my own transaction
5198           mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
5199           w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
5200           long now = EnvironmentEdgeManager.currentTimeMillis();
5201           // Process each family
5202           for (Map.Entry<byte[], List<Cell>> family : append.getFamilyCellMap().entrySet()) {
5203 
5204             Store store = stores.get(family.getKey());
5205             List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
5206 
5207             // Sort the cells so that they match the order that they
5208             // appear in the Get results. Otherwise, we won't be able to
5209             // find the existing values if the cells are not specified
5210             // in order by the client since cells are in an array list.
5211             Collections.sort(family.getValue(), store.getComparator());
5212             // Get previous values for all columns in this family
5213             Get get = new Get(row);
5214             for (Cell cell : family.getValue()) {
5215               get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));
5216             }
5217             List<Cell> results = get(get, false);
5218 
5219             // Iterate the input columns and update existing values if they were
5220             // found, otherwise add new column initialized to the append value
5221 
5222             // Avoid as much copying as possible. Every byte is copied at most
5223             // once.
5224             // Would be nice if KeyValue had scatter/gather logic
5225             int idx = 0;
5226             for (Cell cell : family.getValue()) {
5227               KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5228               KeyValue newKV;
5229               KeyValue oldKv = null;
5230               if (idx < results.size()
5231                   && CellUtil.matchingQualifier(results.get(idx),kv)) {
5232                 oldKv = KeyValueUtil.ensureKeyValue(results.get(idx));
5233                 // allocate an empty kv once
5234                 newKV = new KeyValue(row.length, kv.getFamilyLength(),
5235                     kv.getQualifierLength(), now, KeyValue.Type.Put,
5236                     oldKv.getValueLength() + kv.getValueLength(),
5237                     oldKv.getTagsLength() + kv.getTagsLength());
5238                 // copy in the value
5239                 System.arraycopy(oldKv.getValueArray(), oldKv.getValueOffset(),
5240                     newKV.getValueArray(), newKV.getValueOffset(),
5241                     oldKv.getValueLength());
5242                 System.arraycopy(kv.getValueArray(), kv.getValueOffset(),
5243                     newKV.getValueArray(),
5244                     newKV.getValueOffset() + oldKv.getValueLength(),
5245                     kv.getValueLength());
5246                 // copy in the tags
5247                 System.arraycopy(oldKv.getTagsArray(), oldKv.getTagsOffset(), newKV.getTagsArray(),
5248                     newKV.getTagsOffset(), oldKv.getTagsLength());
5249                 System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(),
5250                     newKV.getTagsOffset() + oldKv.getTagsLength(), kv.getTagsLength());
5251                 // copy in row, family, and qualifier
5252                 System.arraycopy(kv.getRowArray(), kv.getRowOffset(),
5253                     newKV.getRowArray(), newKV.getRowOffset(), kv.getRowLength());
5254                 System.arraycopy(kv.getFamilyArray(), kv.getFamilyOffset(),
5255                     newKV.getFamilyArray(), newKV.getFamilyOffset(),
5256                     kv.getFamilyLength());
5257                 System.arraycopy(kv.getQualifierArray(), kv.getQualifierOffset(),
5258                     newKV.getQualifierArray(), newKV.getQualifierOffset(),
5259                     kv.getQualifierLength());
5260                 idx++;
5261               } else {
5262                 newKV = kv;
5263                 // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP,
5264                 // so only need to update the timestamp to 'now'
5265                 newKV.updateLatestStamp(Bytes.toBytes(now));
5266               }
5267               newKV.setSequenceId(mvccNum);
5268               // Give coprocessors a chance to update the new cell
5269               if (coprocessorHost != null) {
5270                 newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
5271                     RegionObserver.MutationType.APPEND, append, oldKv, newKV));
5272               }
5273               kvs.add(newKV);
5274 
5275               // Append update to WAL
5276               if (writeToWAL) {
5277                 if (walEdits == null) {
5278                   walEdits = new WALEdit();
5279                 }
5280                 walEdits.add(newKV);
5281               }
5282             }
5283 
5284             //store the kvs to the temporary memstore before writing HLog
5285             tempMemstore.put(store, kvs);
5286           }
5287 
5288           //Actually write to Memstore now
5289           for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
5290             Store store = entry.getKey();
5291             if (store.getFamily().getMaxVersions() == 1) {
5292               // upsert if VERSIONS for this CF == 1
5293               size += store.upsert(entry.getValue(), getSmallestReadPoint());
5294               memstoreCells.addAll(KeyValueUtil.ensureKeyValues(entry.getValue()));
5295             } else {
5296               // otherwise keep older versions around
5297               for (Cell cell: entry.getValue()) {
5298                 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5299                 Pair<Long, Cell> ret = store.add(kv);
5300                 size += ret.getFirst();
5301                 memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
5302                 doRollBackMemstore = true;
5303               }
5304             }
5305             allKVs.addAll(entry.getValue());
5306           }
5307 
5308           // Actually write to WAL now
5309           if (writeToWAL) {
5310             // Using default cluster id, as this can only happen in the originating
5311             // cluster. A slave cluster receives the final value (not the delta)
5312             // as a Put.
5313             walKey = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
5314               this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, nonceGroup, nonce);
5315             txid = this.log.appendNoSync(this.htableDescriptor, getRegionInfo(), walKey, walEdits,
5316               this.sequenceId, true, memstoreCells);
5317           } else {
5318             recordMutationWithoutWal(append.getFamilyCellMap());
5319           }
5320           if (walKey == null) {
5321             // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
5322             walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
5323           }
5324 
5325           size = this.addAndGetGlobalMemstoreSize(size);
5326           flush = isFlushSize(size);
5327         } finally {
5328           this.updatesLock.readLock().unlock();
5329         }
5330       } finally {
5331         rowLock.release();
5332         rowLock = null;
5333       }
5334       // sync the transaction log outside the rowlock
5335       if (txid != 0) {
5336         syncOrDefer(txid, durability);
5337       }
5338       doRollBackMemstore = false;
5339     } finally {
5340       if (rowLock != null) {
5341         rowLock.release();
5342       }
5343       // if the wal sync was unsuccessful, remove keys from memstore
5344       if (doRollBackMemstore) {
5345         rollbackMemstore(memstoreCells);
5346       }
5347       if (w != null) {
5348         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
5349       }
5350       closeRegionOperation(Operation.APPEND);
5351     }
5352 
5353     if (this.metricsRegion != null) {
5354       this.metricsRegion.updateAppend();
5355     }
5356 
5357     if (flush) {
5358       // Request a cache flush. Do it outside update lock.
5359       requestFlush();
5360     }
5361 
5362 
5363     return append.isReturnResults() ? Result.create(allKVs) : null;
5364   }
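
       // A minimal client-side sketch of the Append operation implemented above. It assumes an
       // open HTableInterface named "table" and a column family "cf"; all names and values are
       // illustrative only.
       //
       //   Append append = new Append(Bytes.toBytes("row1"));
       //   append.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("-suffix"));
       //   append.setReturnResults(true);          // ask for the resulting cells back
       //   Result result = table.append(append);   // cf:q now ends with "-suffix"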
5365 
5366   public Result increment(Increment increment) throws IOException {
5367     return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
5368   }
5369 
5370   /**
5371    * Perform one or more increment operations on a row.
5372    * @return new keyvalues after increment
5373    * @throws IOException
5374    */
5375   public Result increment(Increment increment, long nonceGroup, long nonce)
5376   throws IOException {
5377     byte [] row = increment.getRow();
5378     checkRow(row, "increment");
5379     TimeRange tr = increment.getTimeRange();
5380     boolean flush = false;
5381     Durability durability = getEffectiveDurability(increment.getDurability());
5382     boolean writeToWAL = durability != Durability.SKIP_WAL;
5383     WALEdit walEdits = null;
5384     List<Cell> allKVs = new ArrayList<Cell>(increment.size());
5385     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
5386 
5387     long size = 0;
5388     long txid = 0;
5389 
5390     checkReadOnly();
5391     checkResources();
5392     // Lock row
5393     startRegionOperation(Operation.INCREMENT);
5394     this.writeRequestsCount.increment();
5395     RowLock rowLock = null;
5396     WriteEntry w = null;
5397     HLogKey walKey = null;
5398     long mvccNum = 0;
5399     List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
5400     boolean doRollBackMemstore = false;
5401     try {
5402       rowLock = getRowLock(row);
5403       try {
5404         lock(this.updatesLock.readLock());
5405         try {
5406           // wait for all prior MVCC transactions to finish - while we hold the row lock
5407           // (so that we are guaranteed to see the latest state)
5408           mvcc.waitForPreviousTransactionsComplete();
5409           if (this.coprocessorHost != null) {
5410             Result r = this.coprocessorHost.preIncrementAfterRowLock(increment);
5411             if (r != null) {
5412               return r;
5413             }
5414           }
5415           // now start my own transaction
5416           mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
5417           w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
5418           long now = EnvironmentEdgeManager.currentTimeMillis();
5419           // Process each family
5420           for (Map.Entry<byte [], List<Cell>> family:
5421               increment.getFamilyCellMap().entrySet()) {
5422 
5423             Store store = stores.get(family.getKey());
5424             List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
5425 
5426             // Sort the cells so that they match the order that they
5427             // appear in the Get results. Otherwise, we won't be able to
5428             // find the existing values if the cells are not specified
5429             // in order by the client since cells are in an array list.
5430             Collections.sort(family.getValue(), store.getComparator());
5431             // Get previous values for all columns in this family
5432             Get get = new Get(row);
5433             for (Cell cell: family.getValue()) {
5434               get.addColumn(family.getKey(),  CellUtil.cloneQualifier(cell));
5435             }
5436             get.setTimeRange(tr.getMin(), tr.getMax());
5437             List<Cell> results = get(get, false);
5438 
5439             // Iterate the input columns and update existing values if they were
5440             // found, otherwise add new column initialized to the increment amount
5441             int idx = 0;
5442             for (Cell kv: family.getValue()) {
5443               long amount = Bytes.toLong(CellUtil.cloneValue(kv));
5444               boolean noWriteBack = (amount == 0);
5445 
5446               Cell c = null;
5447               if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), kv)) {
5448                 c = results.get(idx);
5449                 if (c.getValueLength() == Bytes.SIZEOF_LONG) {
5450                   amount += Bytes.toLong(c.getValueArray(), c.getValueOffset(), Bytes.SIZEOF_LONG);
5451                 } else {
5452                   // throw DoNotRetryIOException instead of IllegalArgumentException
5453                   throw new org.apache.hadoop.hbase.DoNotRetryIOException(
5454                       "Attempted to increment field that isn't 64 bits wide");
5455                 }
5456                 idx++;
5457               }
5458 
5459               // Append new incremented KeyValue to list
5460               byte[] q = CellUtil.cloneQualifier(kv);
5461               byte[] val = Bytes.toBytes(amount);
5462               int oldCellTagsLen = (c == null) ? 0 : c.getTagsLength();
5463               int incCellTagsLen = kv.getTagsLength();
5464               KeyValue newKV = new KeyValue(row.length, family.getKey().length, q.length, now,
5465                   KeyValue.Type.Put, val.length, oldCellTagsLen + incCellTagsLen);
5466               System.arraycopy(row, 0, newKV.getRowArray(), newKV.getRowOffset(), row.length);
5467               System.arraycopy(family.getKey(), 0, newKV.getFamilyArray(), newKV.getFamilyOffset(),
5468                   family.getKey().length);
5469               System.arraycopy(q, 0, newKV.getQualifierArray(), newKV.getQualifierOffset(), q.length);
5470               // copy in the value
5471               System.arraycopy(val, 0, newKV.getValueArray(), newKV.getValueOffset(), val.length);
5472               // copy tags
5473               if (oldCellTagsLen > 0) {
5474                 System.arraycopy(c.getTagsArray(), c.getTagsOffset(), newKV.getTagsArray(),
5475                     newKV.getTagsOffset(), oldCellTagsLen);
5476               }
5477               if (incCellTagsLen > 0) {
5478                 System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(),
5479                     newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
5480               }
5481               newKV.setSequenceId(mvccNum);
5482               // Give coprocessors a chance to update the new cell
5483               if (coprocessorHost != null) {
5484                 newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
5485                     RegionObserver.MutationType.INCREMENT, increment, c, newKV));
5486               }
5487               allKVs.add(newKV);
5488 
5489               if (!noWriteBack) {
5490                 kvs.add(newKV);
5491 
5492                 // Prepare WAL updates
5493                 if (writeToWAL) {
5494                   if (walEdits == null) {
5495                     walEdits = new WALEdit();
5496                   }
5497                   walEdits.add(newKV);
5498                 }
5499               }
5500             }
5501 
5502             //store the kvs to the temporary memstore before writing HLog
5503             if (!kvs.isEmpty()) {
5504               tempMemstore.put(store, kvs);
5505             }
5506           }
5507 
5508           //Actually write to Memstore now
5509           if (!tempMemstore.isEmpty()) {
5510             for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
5511               Store store = entry.getKey();
5512               if (store.getFamily().getMaxVersions() == 1) {
5513                 // upsert if VERSIONS for this CF == 1
5514                 size += store.upsert(entry.getValue(), getSmallestReadPoint());
5515                 memstoreCells.addAll(KeyValueUtil.ensureKeyValues(entry.getValue()));
5516               } else {
5517                 // otherwise keep older versions around
5518                 for (Cell cell : entry.getValue()) {
5519                   KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5520                   Pair<Long, Cell> ret = store.add(kv);
5521                   size += ret.getFirst();
5522                   memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
5523                   doRollBackMemstore = true;
5524                 }
5525               }
5526             }
5527             size = this.addAndGetGlobalMemstoreSize(size);
5528             flush = isFlushSize(size);
5529           }
5530 
5531           // Actually write to WAL now
5532           if (walEdits != null && !walEdits.isEmpty()) {
5533             if (writeToWAL) {
5534               // Using default cluster id, as this can only happen in the originating
5535               // cluster. A slave cluster receives the final value (not the delta)
5536               // as a Put.
5537               walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
5538                 this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, nonceGroup, nonce);
5539               txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(),
5540                 walKey, walEdits, getSequenceId(), true, memstoreCells);
5541             } else {
5542               recordMutationWithoutWal(increment.getFamilyCellMap());
5543             }
5544           }
5545           if (walKey == null) {
5546             // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
5547             walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
5548           }
5549         } finally {
5550           this.updatesLock.readLock().unlock();
5551         }
5552       } finally {
5553         rowLock.release();
5554         rowLock = null;
5555       }
5556       // sync the transaction log outside the rowlock
5557       if (txid != 0) {
5558         syncOrDefer(txid, durability);
5559       }
5560       doRollBackMemstore = false;
5561     } finally {
5562       if (rowLock != null) {
5563         rowLock.release();
5564       }
5565       // if the wal sync was unsuccessful, remove keys from memstore
5566       if (doRollBackMemstore) {
5567         rollbackMemstore(memstoreCells);
5568       }
5569       if (w != null) {
5570         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
5571       }
5572       closeRegionOperation(Operation.INCREMENT);
5573       if (this.metricsRegion != null) {
5574         this.metricsRegion.updateIncrement();
5575       }
5576     }
5577 
5578     if (flush) {
5579       // Request a cache flush.  Do it outside update lock.
5580       requestFlush();
5581     }
5582 
5583     return Result.create(allKVs);
5584   }
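
       // A minimal client-side sketch of the Increment operation implemented above (assumes an
       // open HTableInterface named "table" and a column family "cf"; names are illustrative
       // only). The stored counter must be exactly 8 bytes wide, otherwise the code above throws
       // DoNotRetryIOException.
       //
       //   Increment increment = new Increment(Bytes.toBytes("row1"));
       //   increment.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("counter"), 5L);
       //   Result result = table.increment(increment);
       //   long updated = Bytes.toLong(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("counter")));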
5585 
5586   //
5587   // New HBASE-880 Helpers
5588   //
5589 
5590   private void checkFamily(final byte [] family)
5591   throws NoSuchColumnFamilyException {
5592     if (!this.htableDescriptor.hasFamily(family)) {
5593       throw new NoSuchColumnFamilyException("Column family " +
5594           Bytes.toString(family) + " does not exist in region " + this
5595           + " in table " + this.htableDescriptor);
5596     }
5597   }
5598 
5599   public static final long FIXED_OVERHEAD = ClassSize.align(
5600       ClassSize.OBJECT +
5601       ClassSize.ARRAY +
5602       40 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
5603       (12 * Bytes.SIZEOF_LONG) +
5604       4 * Bytes.SIZEOF_BOOLEAN);
5605 
5606   // woefully out of date - currently missing:
5607   // 1 x HashMap - coprocessorServiceHandlers
5608   // 6 x Counter - numMutationsWithoutWAL, dataInMemoryWithoutWAL,
5609   //   checkAndMutateChecksPassed, checkAndMutateChecksFailed, readRequestsCount,
5610   //   writeRequestsCount
5611   // 1 x HRegion$WriteState - writestate
5612   // 1 x RegionCoprocessorHost - coprocessorHost
5613   // 1 x RegionSplitPolicy - splitPolicy
5614   // 1 x MetricsRegion - metricsRegion
5615   // 1 x MetricsRegionWrapperImpl - metricsRegionWrapper
5616   public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
5617       ClassSize.OBJECT + // closeLock
5618       (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
5619       (3 * ClassSize.ATOMIC_LONG) + // memStoreSize, numPutsWithoutWAL, dataInMemoryWithoutWAL
5620       (2 * ClassSize.CONCURRENT_HASHMAP) +  // lockedRows, scannerReadPoints
5621       WriteState.HEAP_SIZE + // writestate
5622       ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
5623       (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
5624       MultiVersionConsistencyControl.FIXED_SIZE // mvcc
5625       + ClassSize.TREEMAP // maxSeqIdInStores
5626       + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
5627       ;
5628 
5629   @Override
5630   public long heapSize() {
5631     long heapSize = DEEP_OVERHEAD;
5632     for (Store store : this.stores.values()) {
5633       heapSize += store.heapSize();
5634     }
5635     // this does not take into account row locks, recent flushes, mvcc entries, and more
5636     return heapSize;
5637   }
5638 
5639   /*
5640    * This method calls System.exit.
5641    * @param message Message to print out.  May be null.
5642    */
5643   private static void printUsageAndExit(final String message) {
5644     if (message != null && message.length() > 0) System.out.println(message);
5645     System.out.println("Usage: HRegion CATALOG_TABLE_DIR [major_compact]");
5646     System.out.println("Options:");
5647     System.out.println(" major_compact  Pass this option to major compact " +
5648       "the passed region.");
5649     System.out.println("Default outputs a scan of the passed region.");
5650     System.exit(1);
5651   }
5652 
5653   /**
5654    * Registers a new protocol buffer {@link Service} subclass as a coprocessor endpoint to
5655    * be available for handling
5656    * {@link HRegion#execService(com.google.protobuf.RpcController,
5657    *    org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall)}} calls.
5658    *
5659    * <p>
5660    * Only a single instance may be registered per region for a given {@link Service} subclass (the
5661    * instances are keyed on {@link com.google.protobuf.Descriptors.ServiceDescriptor#getFullName()}).
5662    * After the first registration, subsequent calls with the same service name will fail with
5663    * a return value of {@code false}.
5664    * </p>
5665    * @param instance the {@code Service} subclass instance to expose as a coprocessor endpoint
5666    * @return {@code true} if the registration was successful, {@code false}
5667    * otherwise
5668    */
5669   public boolean registerService(Service instance) {
5670     /*
5671      * No stacking of instances is allowed for a single service name
5672      */
5673     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
5674     if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
5675       LOG.error("Coprocessor service "+serviceDesc.getFullName()+
5676           " already registered, rejecting request from "+instance
5677       );
5678       return false;
5679     }
5680 
5681     coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
5682     if (LOG.isDebugEnabled()) {
5683       LOG.debug("Registered coprocessor service: region="+
5684           Bytes.toStringBinary(getRegionName())+" service="+serviceDesc.getFullName());
5685     }
5686     return true;
5687   }
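
       // A registration sketch: a coprocessor endpoint typically hands its generated protobuf
       // Service to the region so that execService() below can route calls to it. MyProtos,
       // RowCountService and RowCountEndpoint are hypothetical generated/handler classes.
       //
       //   Service service = MyProtos.RowCountService.newReflectiveService(new RowCountEndpoint());
       //   boolean registered = region.registerService(service);  // false if the name is already taken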
5688 
5689   /**
5690    * Executes a single protocol buffer coprocessor endpoint {@link Service} method using
5691    * the registered protocol handlers.  {@link Service} implementations must be registered via the
5692    * {@link HRegion#registerService(com.google.protobuf.Service)}
5693    * method before they are available.
5694    *
5695    * @param controller an {@code RpcController} implementation to pass to the invoked service
5696    * @param call a {@code CoprocessorServiceCall} instance identifying the service, method,
5697    *     and parameters for the method invocation
5698    * @return a protocol buffer {@code Message} instance containing the method's result
5699    * @throws IOException if no registered service handler is found or an error
5700    *     occurs during the invocation
5701    * @see org.apache.hadoop.hbase.regionserver.HRegion#registerService(com.google.protobuf.Service)
5702    */
5703   public Message execService(RpcController controller, CoprocessorServiceCall call)
5704       throws IOException {
5705     String serviceName = call.getServiceName();
5706     String methodName = call.getMethodName();
5707     if (!coprocessorServiceHandlers.containsKey(serviceName)) {
5708       throw new UnknownProtocolException(null,
5709           "No registered coprocessor service found for name "+serviceName+
5710           " in region "+Bytes.toStringBinary(getRegionName()));
5711     }
5712 
5713     Service service = coprocessorServiceHandlers.get(serviceName);
5714     Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
5715     Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
5716     if (methodDesc == null) {
5717       throw new UnknownProtocolException(service.getClass(),
5718           "Unknown method "+methodName+" called on service "+serviceName+
5719               " in region "+Bytes.toStringBinary(getRegionName()));
5720     }
5721 
5722     Message request = service.getRequestPrototype(methodDesc).newBuilderForType()
5723         .mergeFrom(call.getRequest()).build();
5724 
5725     if (coprocessorHost != null) {
5726       request = coprocessorHost.preEndpointInvocation(service, methodName, request);
5727     }
5728 
5729     final Message.Builder responseBuilder =
5730         service.getResponsePrototype(methodDesc).newBuilderForType();
5731     service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
5732       @Override
5733       public void run(Message message) {
5734         if (message != null) {
5735           responseBuilder.mergeFrom(message);
5736         }
5737       }
5738     });
5739 
5740     if (coprocessorHost != null) {
5741       coprocessorHost.postEndpointInvocation(service, methodName, request, responseBuilder);
5742     }
5743 
5744     return responseBuilder.build();
5745   }
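
       // A sketch of driving execService with a protobuf request (the service/method names, row
       // and request message are illustrative only; the service name must be the Service
       // descriptor's full name used at registration time).
       //
       //   CoprocessorServiceCall call = CoprocessorServiceCall.newBuilder()
       //       .setRow(ByteString.copyFrom(row))
       //       .setServiceName("RowCountService")
       //       .setMethodName("getRowCount")
       //       .setRequest(request.toByteString())
       //       .build();
       //   Message response = region.execService(controller, call);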
5746 
5747   /*
5748    * Process table.
5749    * Do major compaction or list content.
5750    * @throws IOException
5751    */
5752   private static void processTable(final FileSystem fs, final Path p,
5753       final HLog log, final Configuration c,
5754       final boolean majorCompact)
5755   throws IOException {
5756     HRegion region;
5757     // Currently expects tables to have one region only.
5758     if (FSUtils.getTableName(p).equals(TableName.META_TABLE_NAME)) {
5759       region = HRegion.newHRegion(p, log, fs, c,
5760         HRegionInfo.FIRST_META_REGIONINFO, HTableDescriptor.META_TABLEDESC, null);
5761     } else {
5762       throw new IOException("Not a known catalog table: " + p.toString());
5763     }
5764     try {
5765       region.initialize(null);
5766       if (majorCompact) {
5767         region.compactStores(true);
5768       } else {
5769         // Default behavior
5770         Scan scan = new Scan();
5771         // scan.addFamily(HConstants.CATALOG_FAMILY);
5772         RegionScanner scanner = region.getScanner(scan);
5773         try {
5774           List<Cell> kvs = new ArrayList<Cell>();
5775           boolean done;
5776           do {
5777             kvs.clear();
5778             done = scanner.next(kvs);
5779             if (kvs.size() > 0) LOG.info(kvs);
5780           } while (done);
5781         } finally {
5782           scanner.close();
5783         }
5784       }
5785     } finally {
5786       region.close();
5787     }
5788   }
5789 
5790   boolean shouldForceSplit() {
5791     return this.splitRequest;
5792   }
5793 
5794   byte[] getExplicitSplitPoint() {
5795     return this.explicitSplitPoint;
5796   }
5797 
5798   void forceSplit(byte[] sp) {
5799     // This HRegion will go away after the forced split is successful
5800     // But if a forced split fails, we need to clear forced split.
5801     this.splitRequest = true;
5802     if (sp != null) {
5803       this.explicitSplitPoint = sp;
5804     }
5805   }
5806 
5807   void clearSplit() {
5808     this.splitRequest = false;
5809     this.explicitSplitPoint = null;
5810   }
5811 
5812   /**
5813    * Give the region a chance to prepare before it is split.
5814    */
5815   protected void prepareToSplit() {
5816     // nothing
5817   }
5818 
5819   /**
5820    * Return the split point. A null return value indicates the region isn't splittable.
5821    * If the split point isn't explicitly specified, it will go over the stores
5822    * to find the best split point. Currently the criterion for the best split point
5823    * is the size of the store.
5824    */
5825   public byte[] checkSplit() {
5826     // Can't split META
5827     if (this.getRegionInfo().isMetaTable() ||
5828         TableName.NAMESPACE_TABLE_NAME.equals(this.getRegionInfo().getTable())) {
5829       if (shouldForceSplit()) {
5830         LOG.warn("Cannot split meta region in HBase 0.20 and above");
5831       }
5832       return null;
5833     }
5834 
5835     // Can't split region which is in recovering state
5836     if (this.isRecovering()) {
5837       LOG.info("Cannot split region " + this.getRegionInfo().getEncodedName() + " in recovery.");
5838       return null;
5839     }
5840 
5841     if (!splitPolicy.shouldSplit()) {
5842       return null;
5843     }
5844 
5845     byte[] ret = splitPolicy.getSplitPoint();
5846 
5847     if (ret != null) {
5848       try {
5849         checkRow(ret, "calculated split");
5850       } catch (IOException e) {
5851         LOG.error("Ignoring invalid split", e);
5852         return null;
5853       }
5854     }
5855     return ret;
5856   }
5857 
5858   /**
5859    * @return The priority that this region should have in the compaction queue
5860    */
5861   public int getCompactPriority() {
5862     int count = Integer.MAX_VALUE;
5863     for (Store store : stores.values()) {
5864       count = Math.min(count, store.getCompactPriority());
5865     }
5866     return count;
5867   }
5868 
5869 
5870   /** @return the coprocessor host */
5871   public RegionCoprocessorHost getCoprocessorHost() {
5872     return coprocessorHost;
5873   }
5874 
5875   /** @param coprocessorHost the new coprocessor host */
5876   public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {
5877     this.coprocessorHost = coprocessorHost;
5878   }
5879 
5880   /**
5881    * This method needs to be called before any public call that reads or
5882    * modifies data. It has to be called just before a try.
5883    * #closeRegionOperation needs to be called in the try's finally block.
5884    * Acquires a read lock and checks if the region is closing or closed.
5885    * @throws IOException
5886    */
5887   public void startRegionOperation() throws IOException {
5888     startRegionOperation(Operation.ANY);
5889   }
5890 
5891   /**
5892    * @param op The operation that is about to be taken on the region
5893    * @throws IOException
5894    */
5895   protected void startRegionOperation(Operation op) throws IOException {
5896     switch (op) {
5897     case GET:  // read operations
5898     case SCAN:
5899       checkReadsEnabled();
5900     case INCREMENT: // write operations
5901     case APPEND:
5902     case SPLIT_REGION:
5903     case MERGE_REGION:
5904     case PUT:
5905     case DELETE:
5906     case BATCH_MUTATE:
5907     case COMPACT_REGION:
5908       // when a region is in recovering state, no read, split or merge is allowed
5909       if (this.isRecovering() && (this.disallowWritesInRecovering ||
5910               (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) {
5911         throw new RegionInRecoveryException(this.getRegionNameAsString() + " is recovering");
5912       }
5913       break;
5914     default:
5915       break;
5916     }
5917     if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION
5918         || op == Operation.COMPACT_REGION) {
5919       // split, merge or compact region doesn't need to check the closing/closed state or lock the
5920       // region
5921       return;
5922     }
5923     if (this.closing.get()) {
5924       throw new NotServingRegionException(getRegionNameAsString() + " is closing");
5925     }
5926     lock(lock.readLock());
5927     if (this.closed.get()) {
5928       lock.readLock().unlock();
5929       throw new NotServingRegionException(getRegionNameAsString() + " is closed");
5930     }
5931     try {
5932       if (coprocessorHost != null) {
5933         coprocessorHost.postStartRegionOperation(op);
5934       }
5935     } catch (Exception e) {
5936       lock.readLock().unlock();
5937       throw new IOException(e);
5938     }
5939   }
5940 
5941   /**
5942    * Closes the lock. This needs to be called in the finally block corresponding
5943    * to the try block of #startRegionOperation
5944    * @throws IOException
5945    */
5946   public void closeRegionOperation() throws IOException {
5947     closeRegionOperation(Operation.ANY);
5948   }
5949 
5950   /**
5951    * Closes the lock. This needs to be called in the finally block corresponding
5952    * to the try block of {@link #startRegionOperation(Operation)}
5953    * @throws IOException
5954    */
5955   public void closeRegionOperation(Operation operation) throws IOException {
5956     lock.readLock().unlock();
5957     if (coprocessorHost != null) {
5958       coprocessorHost.postCloseRegionOperation(operation);
5959     }
5960   }
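
       // The locking contract described above, as a usage sketch (the work inside the try block
       // is illustrative only):
       //
       //   region.startRegionOperation();       // acquires the region read lock, checks closing/closed
       //   try {
       //     // ... read or mutate the region ...
       //   } finally {
       //     region.closeRegionOperation();     // always releases the lock
       //   }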
5961 
5962   /**
5963    * This method needs to be called before any public call that reads or
5964    * modifies stores in bulk. It has to be called just before a try.
5965    * #closeBulkRegionOperation needs to be called in the try's finally block.
5966    * Acquires a write lock and checks if the region is closing or closed.
5967    * @throws NotServingRegionException when the region is closing or closed
5968    * @throws RegionTooBusyException if failed to get the lock in time
5969    * @throws InterruptedIOException if interrupted while waiting for a lock
5970    */
5971   private void startBulkRegionOperation(boolean writeLockNeeded)
5972       throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
5973     if (this.closing.get()) {
5974       throw new NotServingRegionException(getRegionNameAsString() + " is closing");
5975     }
5976     if (writeLockNeeded) lock(lock.writeLock());
5977     else lock(lock.readLock());
5978     if (this.closed.get()) {
5979       if (writeLockNeeded) lock.writeLock().unlock();
5980       else lock.readLock().unlock();
5981       throw new NotServingRegionException(getRegionNameAsString() + " is closed");
5982     }
5983   }
5984 
5985   /**
5986    * Closes the lock. This needs to be called in the finally block corresponding
5987    * to the try block of #startBulkRegionOperation
5988    */
5989   private void closeBulkRegionOperation(){
5990     if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
5991     else lock.readLock().unlock();
5992   }
5993 
5994   /**
5995    * Update counters for the number of mutations without WAL and the size of possible data loss.
5996    * This information is exposed by the region server metrics.
5997    */
5998   private void recordMutationWithoutWal(final Map<byte [], List<Cell>> familyMap) {
5999     numMutationsWithoutWAL.increment();
6000     if (numMutationsWithoutWAL.get() <= 1) {
6001       LOG.info("writing data to region " + this +
6002                " with WAL disabled. Data may be lost in the event of a crash.");
6003     }
6004 
6005     long mutationSize = 0;
6006     for (List<Cell> cells: familyMap.values()) {
6007       for (Cell cell : cells) {
6008         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
6009         mutationSize += kv.getKeyLength() + kv.getValueLength();
6010       }
6011     }
6012 
6013     dataInMemoryWithoutWAL.add(mutationSize);
6014   }
6015 
6016   private void lock(final Lock lock)
6017       throws RegionTooBusyException, InterruptedIOException {
6018     lock(lock, 1);
6019   }
6020 
6021   /**
6022    * Try to acquire a lock.  Throw RegionTooBusyException
6023    * if failed to get the lock in time. Throw InterruptedIOException
6024    * if interrupted while waiting for the lock.
6025    */
6026   private void lock(final Lock lock, final int multiplier)
6027       throws RegionTooBusyException, InterruptedIOException {
6028     try {
6029       final long waitTime = Math.min(maxBusyWaitDuration,
6030           busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
6031       if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
6032         throw new RegionTooBusyException(
6033             "failed to get a lock in " + waitTime + " ms. " +
6034                 "regionName=" + (this.getRegionInfo() == null ? "unknown" :
6035                 this.getRegionInfo().getRegionNameAsString()) +
6036                 ", server=" + (this.getRegionServerServices() == null ? "unknown" :
6037                 this.getRegionServerServices().getServerName()));
6038       }
6039     } catch (InterruptedException ie) {
6040       LOG.info("Interrupted while waiting for a lock");
6041       InterruptedIOException iie = new InterruptedIOException();
6042       iie.initCause(ie);
6043       throw iie;
6044     }
6045   }
6046 
6047   /**
6048    * Calls sync with the given transaction ID if the region's table is not
6049    * deferring it.
6050    * @param txid the transaction to sync up to
6051    * @throws IOException If anything goes wrong with DFS
6052    */
6053   private void syncOrDefer(long txid, Durability durability) throws IOException {
6054     if (this.getRegionInfo().isMetaRegion()) {
6055       this.log.sync(txid);
6056     } else {
6057       switch(durability) {
6058       case USE_DEFAULT:
6059         // do what table defaults to
6060         if (shouldSyncLog()) {
6061           this.log.sync(txid);
6062         }
6063         break;
6064       case SKIP_WAL:
6065         // nothing to do
6066         break;
6067       case ASYNC_WAL:
6068         // nothing to do
6069         break;
6070       case SYNC_WAL:
6071       case FSYNC_WAL:
6072         // sync the WAL edit (SYNC and FSYNC treated the same for now)
6073         this.log.sync(txid);
6074         break;
6075       }
6076     }
6077   }
6078 
6079   /**
6080    * Check whether we should sync the log from the table's durability settings
6081    */
6082   private boolean shouldSyncLog() {
6083     return durability.ordinal() >  Durability.ASYNC_WAL.ordinal();
6084   }
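
       // How a client-chosen durability feeds into syncOrDefer above (a sketch; "table" and the
       // row/family/qualifier/value bytes are illustrative only):
       //
       //   Put put = new Put(Bytes.toBytes("row1"));
       //   put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
       //   put.setDurability(Durability.ASYNC_WAL);  // appended to the WAL but not synced per edit
       //   table.put(put);                           // SKIP_WAL would bypass the WAL entirely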
6085 
6086   /**
6087    * A mocked list implementation - discards all updates.
6088    */
6089   private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {
6090 
6091     @Override
6092     public void add(int index, Cell element) {
6093       // do nothing
6094     }
6095 
6096     @Override
6097     public boolean addAll(int index, Collection<? extends Cell> c) {
6098       return false; // this list is never changed as a result of an update
6099     }
6100 
6101     @Override
6102     public KeyValue get(int index) {
6103       throw new UnsupportedOperationException();
6104     }
6105 
6106     @Override
6107     public int size() {
6108       return 0;
6109     }
6110   };
6111 
6112   /**
6113    * Facility for dumping and compacting catalog tables.
6114    * Only does catalog tables since these are the only tables whose schema we know
6115    * for sure.  For usage run:
6116    * <pre>
6117    *   ./bin/hbase org.apache.hadoop.hbase.regionserver.HRegion
6118    * </pre>
6119    * @throws IOException
6120    */
6121   public static void main(String[] args) throws IOException {
6122     if (args.length < 1) {
6123       printUsageAndExit(null);
6124     }
6125     boolean majorCompact = false;
6126     if (args.length > 1) {
6127       if (!args[1].toLowerCase().startsWith("major")) {
6128         printUsageAndExit("ERROR: Unrecognized option <" + args[1] + ">");
6129       }
6130       majorCompact = true;
6131     }
6132     final Path tableDir = new Path(args[0]);
6133     final Configuration c = HBaseConfiguration.create();
6134     final FileSystem fs = FileSystem.get(c);
6135     final Path logdir = new Path(c.get("hbase.tmp.dir"));
6136     final String logname = "hlog" + FSUtils.getTableName(tableDir) + System.currentTimeMillis();
6137 
6138     final HLog log = HLogFactory.createHLog(fs, logdir, logname, c);
6139     try {
6140       processTable(fs, tableDir, log, c, majorCompact);
6141     } finally {
6142        log.close();
6143        // TODO: is this still right?
6144        BlockCache bc = new CacheConfig(c).getBlockCache();
6145        if (bc != null) bc.shutdown();
6146     }
6147   }
6148 
6149   /**
6150    * Gets the latest sequence number that was read from storage when this region was opened.
6151    */
6152   public long getOpenSeqNum() {
6153     return this.openSeqNum;
6154   }
6155 
6156   /**
6157    * Gets the max sequence ids of the stores that were read from storage when this region was
6158    * opened. WAL edits with a smaller or equal sequence number will be skipped during replay.
6159    */
6160   public Map<byte[], Long> getMaxStoreSeqIdForLogReplay() {
6161     return this.maxSeqIdInStores;
6162   }
6163 
6164   /**
6165    * @return the current compaction state of this region.
6166    */
6167   public CompactionState getCompactionState() {
6168     boolean hasMajor = majorInProgress.get() > 0, hasMinor = minorInProgress.get() > 0;
6169     return (hasMajor ? (hasMinor ? CompactionState.MAJOR_AND_MINOR : CompactionState.MAJOR)
6170         : (hasMinor ? CompactionState.MINOR : CompactionState.NONE));
6171   }
6172 
6173   public void reportCompactionRequestStart(boolean isMajor){
6174     (isMajor ? majorInProgress : minorInProgress).incrementAndGet();
6175   }
6176 
6177   public void reportCompactionRequestEnd(boolean isMajor, int numFiles, long filesSizeCompacted) {
6178     int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();
6179 
6180     // metrics
6181     compactionsFinished.incrementAndGet();
6182     compactionNumFilesCompacted.addAndGet(numFiles);
6183     compactionNumBytesCompacted.addAndGet(filesSizeCompacted);
6184 
6185     assert newValue >= 0;
6186   }
6187 
6188   /**
6189    * Do not change this sequence id. See {@link #sequenceId} comment.
6190    * @return sequenceId
6191    */
6192   @VisibleForTesting
6193   public AtomicLong getSequenceId() {
6194     return this.sequenceId;
6195   }
6196 
6197   /**
6198    * sets this region's sequenceId.
6199    * @param value new value
6200    */
6201   private void setSequenceId(long value) {
6202     this.sequenceId.set(value);
6203   }
6204 
6205   /**
6206    * Listener class to enable callers of
6207    * bulkLoadHFile() to perform any necessary
6208    * pre/post processing of a given bulkload call
6209    */
6210   public interface BulkLoadListener {
6211 
6212     /**
6213      * Called before an HFile is actually loaded
6214      * @param family family being loaded to
6215      * @param srcPath path of HFile
6216      * @return final path to be used for actual loading
6217      * @throws IOException
6218      */
6219     String prepareBulkLoad(byte[] family, String srcPath) throws IOException;
6220 
6221     /**
6222      * Called after a successful HFile load
6223      * @param family family being loaded to
6224      * @param srcPath path of HFile
6225      * @throws IOException
6226      */
6227     void doneBulkLoad(byte[] family, String srcPath) throws IOException;
6228 
6229     /**
6230      * Called after a failed HFile load
6231      * @param family family being loaded to
6232      * @param srcPath path of HFile
6233      * @throws IOException
6234      */
6235     void failedBulkLoad(byte[] family, String srcPath) throws IOException;
6236   }
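
       // A minimal BulkLoadListener sketch (the logging-only behavior is illustrative; a real
       // listener, e.g. the secure bulk load endpoint's, typically stages the HFile before the
       // load and cleans up afterwards):
       //
       //   BulkLoadListener listener = new BulkLoadListener() {
       //     @Override
       //     public String prepareBulkLoad(byte[] family, String srcPath) throws IOException {
       //       return srcPath;                  // no staging; load from the original location
       //     }
       //     @Override
       //     public void doneBulkLoad(byte[] family, String srcPath) throws IOException {
       //       LOG.info("Bulk loaded " + srcPath);
       //     }
       //     @Override
       //     public void failedBulkLoad(byte[] family, String srcPath) throws IOException {
       //       LOG.warn("Failed bulk load of " + srcPath);
       //     }
       //   };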
6237 
6238   @VisibleForTesting class RowLockContext {
6239     private final HashedBytes row;
6240     private final CountDownLatch latch = new CountDownLatch(1);
6241     private final Thread thread;
6242     private int lockCount = 0;
6243 
6244     RowLockContext(HashedBytes row) {
6245       this.row = row;
6246       this.thread = Thread.currentThread();
6247     }
6248 
6249     boolean ownedByCurrentThread() {
6250       return thread == Thread.currentThread();
6251     }
6252 
6253     RowLock newLock() {
6254       lockCount++;
6255       return new RowLock(this);
6256     }
6257 
6258     void releaseLock() {
6259       if (!ownedByCurrentThread()) {
6260         throw new IllegalArgumentException("Lock held by thread: " + thread
6261           + " cannot be released by different thread: " + Thread.currentThread());
6262       }
6263       lockCount--;
6264       if (lockCount == 0) {
6265         // no remaining locks by the thread, unlock and allow other threads to access
6266         RowLockContext existingContext = lockedRows.remove(row);
6267         if (existingContext != this) {
6268           throw new RuntimeException(
6269               "Internal row lock state inconsistent, should not happen, row: " + row);
6270         }
6271         latch.countDown();
6272       }
6273     }
6274   }
6275 
6276   /**
6277    * Row lock held by a given thread.
6278    * One thread may acquire multiple locks on the same row simultaneously.
6279    * The locks must be released by calling release() from the same thread.
6280    */
6281   public static class RowLock {
6282     @VisibleForTesting final RowLockContext context;
6283     private boolean released = false;
6284 
6285     @VisibleForTesting RowLock(RowLockContext context) {
6286       this.context = context;
6287     }
6288 
6289     /**
6290      * Release the given lock.  If there are no remaining locks held by the current thread
6291      * then unlock the row and allow other threads to acquire the lock.
6292      * @throws IllegalArgumentException if called by a different thread than the lock owning thread
6293      */
6294     public void release() {
6295       if (!released) {
6296         context.releaseLock();
6297         released = true;
6298       }
6299     }
6300   }
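
       // A usage sketch for RowLock (the row bytes are illustrative only). The lock must be
       // released by the acquiring thread, as enforced by RowLockContext.releaseLock():
       //
       //   RowLock rowLock = region.getRowLock(Bytes.toBytes("row1"));
       //   try {
       //     // ... read-modify-write the row while holding the lock ...
       //   } finally {
       //     rowLock.release();
       //   }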
6301 
6302   /**
6303    * Append a faked WALEdit in order to get a long sequence number; the log syncer will just ignore
6304    * the WALEdit append later.
6305    * @param wal
6306    * @param cells list of KeyValues inserted into memstore. Those KeyValues are passed in order to
6307    *        be updated with the right mvcc values (their log sequence number).
6308    * @return the key used for appending with no sync and no append.
6309    * @throws IOException
6310    */
6311   private HLogKey appendNoSyncNoAppend(final HLog wal, List<KeyValue> cells) throws IOException {
6312     HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(),
6313       HLog.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
6314     // Call append but with an empty WALEdit.  The returned sequence id will not be associated
6315     // with any edit and we can be sure it went in after all outstanding appends.
6316     wal.appendNoSync(getTableDesc(), getRegionInfo(), key,
6317       WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells);
6318     return key;
6319   }
6320 
6321   /**
6322    * Explicitly sync the WAL
6323    * @throws IOException
6324    */
6325   public void syncWal() throws IOException {
6326     if(this.log != null) {
6327       this.log.sync();
6328     }
6329   }
6330 }