/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Constructor;
import java.text.ParseException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterWrapper;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.MutationReplay;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.Counter;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HashedBytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.util.StringUtils;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;

/**
 * HRegion stores data for a certain region of a table.  It stores all columns
 * for each row. A given table consists of one or more HRegions.
 *
 * <p>We maintain multiple HStores for a single HRegion.
 *
 * <p>A Store is a set of rows with some column data; together,
 * they make up all the data for the rows.
 *
 * <p>Each HRegion has a 'startKey' and 'endKey'.
 * <p>The first is inclusive, the second is exclusive (except for
 * the final region).  The endKey of region 0 is the same as
 * startKey for region 1 (if it exists).  The startKey for the
 * first region is null. The endKey for the final region is null.
 *
 * <p>Locking at the HRegion level serves only one purpose: preventing the
 * region from being closed (and consequently split) while other operations
 * are ongoing. Each row level operation obtains both a row lock and a region
 * read lock for the duration of the operation. While a scanner is being
 * constructed, getScanner holds a read lock. If the scanner is successfully
 * constructed, it holds a read lock until it is closed. A close takes out a
 * write lock and consequently will block for ongoing operations and will block
 * new operations from starting while the close is in progress.
 *
 * <p>An HRegion is defined by its table and its key extent.
 *
 * <p>It consists of at least one Store.  The number of Stores should be
 * configurable, so that data which is accessed together is stored in the same
 * Store.  Right now, we approximate that by building a single Store for
 * each column family.  (This config info will be communicated via the
 * tabledesc.)
 *
 * <p>The HTableDescriptor contains metainfo about the HRegion's table.
 * regionName is a unique identifier for this HRegion. [startKey, endKey)
 * defines the keyspace for this HRegion.
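 *
 * <p>An illustrative example (not from any particular deployment): a table
 * split into three regions over its row keys might have key extents
 * [null, "bbb"), ["bbb", "mmm") and ["mmm", null); a row with key "cat" would
 * then belong to the second region.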
 */
@InterfaceAudience.Private
public class HRegion implements HeapSize { // , Writable{
  public static final Log LOG = LogFactory.getLog(HRegion.class);

  public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
      "hbase.hregion.scan.loadColumnFamiliesOnDemand";

  /**
   * This is the global default value for durability. All tables/mutations that
   * do not specify a durability, or that specify USE_DEFAULT, will default to
   * this value.
   */
  private static final Durability DEFAULT_DURABLITY = Durability.SYNC_WAL;
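
  // Illustrative note (not in the original source): a table can opt out of
  // this default through its descriptor, e.g. htd.setDurability(Durability.ASYNC_WAL);
  // only Durability.USE_DEFAULT falls back to DEFAULT_DURABLITY above.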

  final AtomicBoolean closed = new AtomicBoolean(false);
  /* Closing can take some time; use the closing flag if there is stuff we don't
   * want to do while in closing state; e.g. like offer this region up to the
   * master as a region to close if the carrying regionserver is overloaded.
   * Once set, it is never cleared.
   */
  final AtomicBoolean closing = new AtomicBoolean(false);

  /**
   * The sequence id of the last flush on this region.  Used when doing some
   * rough calculations on whether it is time to flush or not.
   */
  protected volatile long lastFlushSeqId = -1L;

  /**
   * Region scoped edit sequence Id. Edits to this region are GUARANTEED to appear in the WAL/HLog
   * file in this sequence id's order; i.e. edit #2 will be in the WAL after edit #1.
   * Its default value is -1L. This default is used as a marker to indicate
   * that the region hasn't opened yet. Once it is opened, it is set to the derived
   * {@link #openSeqNum}, the largest sequence id of all hfiles opened under this Region.
   *
   * <p>Control of this sequence is handed off to the WAL/HLog implementation.  It is responsible
   * for tagging edits with the correct sequence id since it is responsible for getting the
   * edits into the WAL files. It controls updating the sequence id value.  DO NOT UPDATE IT
   * OUTSIDE OF THE WAL.  The value you get will not be what you think it is.
   */
  private final AtomicLong sequenceId = new AtomicLong(-1L);

  /**
   * Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context
   * for startRegionOperation to possibly invoke different checks before any region operations.
   * Not all operations have to be defined here. It's only needed when a special check is needed
   * in startRegionOperation.
   */
  public enum Operation {
    ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE,
    REPLAY_BATCH_MUTATE, COMPACT_REGION
  }

  //////////////////////////////////////////////////////////////////////////////
  // Members
  //////////////////////////////////////////////////////////////////////////////

  // map from a locked row to the context for that lock including:
  // - CountDownLatch for threads waiting on that row
  // - the thread that owns the lock (allow reentrancy)
  // - reference count of (reentrant) locks held by the thread
  // - the row itself
  private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
      new ConcurrentHashMap<HashedBytes, RowLockContext>();

  protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(
      Bytes.BYTES_RAWCOMPARATOR);

  // TODO: account for each registered handler in HeapSize computation
  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

  public final AtomicLong memstoreSize = new AtomicLong(0);

  // Debug possible data loss due to WAL off
  final Counter numMutationsWithoutWAL = new Counter();
  final Counter dataInMemoryWithoutWAL = new Counter();

  // Debug why CAS operations are taking a while.
  final Counter checkAndMutateChecksPassed = new Counter();
  final Counter checkAndMutateChecksFailed = new Counter();

  // Number of requests
  final Counter readRequestsCount = new Counter();
  final Counter writeRequestsCount = new Counter();

  // Compaction counters
  final AtomicLong compactionsFinished = new AtomicLong(0L);
  final AtomicLong compactionNumFilesCompacted = new AtomicLong(0L);
  final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L);

  private final HLog log;
  private final HRegionFileSystem fs;
  protected final Configuration conf;
  private final Configuration baseConf;
  private final KeyValue.KVComparator comparator;
  private final int rowLockWaitDuration;
  static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;

  // The internal wait duration to acquire a lock before read/update
  // from the region. It is not per row. The purpose of this wait time
  // is to avoid waiting a long time while the region is busy, so that
  // we can release the IPC handler soon enough to improve the
  // availability of the region server. It can be adjusted by
  // tuning the configuration "hbase.busy.wait.duration".
  final long busyWaitDuration;
  static final long DEFAULT_BUSY_WAIT_DURATION = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;

  // If updating multiple rows in one call, wait longer,
  // i.e. waiting for busyWaitDuration * # of rows. However,
  // we can limit the max multiplier.
  final int maxBusyWaitMultiplier;

  // Max busy wait duration. There is no point in waiting longer than the RPC
  // purge timeout, after which the RPC call will be terminated by the RPC engine.
  final long maxBusyWaitDuration;
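
  // An illustrative hbase-site.xml override for the busy-wait settings read in
  // the constructor below (the values here are examples, not shipped defaults):
  //
  //   <property>
  //     <name>hbase.busy.wait.duration</name>
  //     <value>30000</value>
  //   </property>
  //   <property>
  //     <name>hbase.busy.wait.multiplier.max</name>
  //     <value>2</value>
  //   </property>
  //
  // With these values a single-row operation waits up to 30s for the region
  // lock and a multi-row batch up to 60s before giving up as too busy.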

  // negative number indicates infinite timeout
  static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
  final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();

  private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;

  /**
   * The sequence ID that was encountered when this region was opened.
   */
  private long openSeqNum = HConstants.NO_SEQNUM;

  /**
   * The default setting for whether to enable on-demand CF loading for
   * scan requests to this region. Requests can override it.
   */
  private boolean isLoadingCfsOnDemandDefault = false;

  private final AtomicInteger majorInProgress = new AtomicInteger(0);
  private final AtomicInteger minorInProgress = new AtomicInteger(0);

  //
  // Context: During replay we want to ensure that we do not lose any data. So, we
  // have to be conservative in how we replay logs. For each store, we calculate
  // the maxSeqId up to which the store was flushed. And, skip the edits which
  // are equal to or lower than maxSeqId for each store.
  // The following map is populated when opening the region
  Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);

  /**
   * Config setting for whether to allow writes when a region is in recovering state.
   */
  private boolean disallowWritesInRecovering = false;

  // when a region is in recovering state, it can only accept writes not reads
  private volatile boolean isRecovering = false;

  /**
   * @return The smallest mvcc readPoint across all the scanners in this
   * region. Writes older than this readPoint are included in every
   * read operation.
   */
  public long getSmallestReadPoint() {
    long minimumReadPoint;
    // We need to ensure that while we are calculating the smallestReadPoint
    // no new RegionScanners can grab a readPoint that we are unaware of.
    // We achieve this by synchronizing on the scannerReadPoints object.
    synchronized(scannerReadPoints) {
      minimumReadPoint = mvcc.memstoreReadPoint();

      for (Long readPoint: this.scannerReadPoints.values()) {
        if (readPoint < minimumReadPoint) {
          minimumReadPoint = readPoint;
        }
      }
    }
    return minimumReadPoint;
  }
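
  /*
   * An illustrative sketch of how the smallest read point is typically used
   * (not code from this class): compaction code captures it once and passes it
   * down, since cells below it are visible to every open scanner:
   *
   *   long smallestReadPoint = region.getSmallestReadPoint();
   *   // any cell with mvcc/memstoreTS <= smallestReadPoint is visible to all
   *   // current scanners, so that bookkeeping may safely be dropped
   */
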
  /*
   * Data structure of write state flags used for coordinating flushes,
   * compactions and closes.
   */
  static class WriteState {
    // Set while a memstore flush is happening.
    volatile boolean flushing = false;
    // Set when a flush has been requested.
    volatile boolean flushRequested = false;
    // Number of compactions running.
    volatile int compacting = 0;
    // Gets set in close. If set, cannot compact or flush again.
    volatile boolean writesEnabled = true;
    // Set if region is read-only
    volatile boolean readOnly = false;
    // whether the reads are enabled. This is different than readOnly, because readOnly is
    // static in the lifetime of the region, while readsEnabled is dynamic
    volatile boolean readsEnabled = true;

    /**
     * Set flags that make this region read-only.
     *
     * @param onOff flip value for region r/o setting
     */
    synchronized void setReadOnly(final boolean onOff) {
      this.writesEnabled = !onOff;
      this.readOnly = onOff;
    }

    boolean isReadOnly() {
      return this.readOnly;
    }

    boolean isFlushRequested() {
      return this.flushRequested;
    }

    void setReadsEnabled(boolean readsEnabled) {
      this.readsEnabled = readsEnabled;
    }

    static final long HEAP_SIZE = ClassSize.align(
        ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
  }

  /**
   * Objects from this class are created when flushing to describe all the different states that
   * the flush method ends up in. The Result enum describes those states. The sequence id should
   * only be specified if the flush was successful, and the failure message should only be
   * specified if it didn't flush.
   */
  public static class FlushResult {
    enum Result {
      FLUSHED_NO_COMPACTION_NEEDED,
      FLUSHED_COMPACTION_NEEDED,
      // Special case where a flush didn't run because there's nothing in the memstores. Used when
      // bulk loading to know when we can still load even if a flush didn't happen.
      CANNOT_FLUSH_MEMSTORE_EMPTY,
      CANNOT_FLUSH
      // Be careful adding more to this enum; look at the methods below to make sure.
    }

    final Result result;
    final String failureReason;
    final long flushSequenceId;

    /**
     * Convenience constructor to use when the flush is successful; the failure message is set to
     * null.
     * @param result Expecting FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
     * @param flushSequenceId Generated sequence id that comes right after the edits in the
     *                        memstores.
     */
    FlushResult(Result result, long flushSequenceId) {
      this(result, flushSequenceId, null);
      assert result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
          .FLUSHED_COMPACTION_NEEDED;
    }

    /**
     * Convenience constructor to use when we cannot flush.
     * @param result Expecting CANNOT_FLUSH_MEMSTORE_EMPTY or CANNOT_FLUSH.
     * @param failureReason Reason why we couldn't flush.
     */
    FlushResult(Result result, String failureReason) {
      this(result, -1, failureReason);
      assert result == Result.CANNOT_FLUSH_MEMSTORE_EMPTY || result == Result.CANNOT_FLUSH;
    }

    /**
     * Constructor with all the parameters.
     * @param result Any of the Result values.
     * @param flushSequenceId Generated sequence id if the memstores were flushed, else -1.
     * @param failureReason Reason why we couldn't flush, or null.
     */
    FlushResult(Result result, long flushSequenceId, String failureReason) {
      this.result = result;
      this.flushSequenceId = flushSequenceId;
      this.failureReason = failureReason;
    }

    /**
     * Convenience method, the equivalent of checking if result is
     * FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
     * @return true if the memstores were flushed, else false.
     */
    public boolean isFlushSucceeded() {
      return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
          .FLUSHED_COMPACTION_NEEDED;
    }

    /**
     * Convenience method, the equivalent of checking if result is FLUSHED_COMPACTION_NEEDED.
     * @return True if the flush requested a compaction, else false (false does not say whether
     * the flush succeeded).
     */
    public boolean isCompactionNeeded() {
      return result == Result.FLUSHED_COMPACTION_NEEDED;
    }
  }
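
  /*
   * An illustrative use of FlushResult (a sketch, assuming the flushcache()
   * method defined later in this class):
   *
   *   FlushResult fr = region.flushcache();
   *   if (fr.isFlushSucceeded() && fr.isCompactionNeeded()) {
   *     // hand the region to the compaction checker
   *   }
   */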

  final WriteState writestate = new WriteState();

  long memstoreFlushSize;
  final long timestampSlop;
  final long rowProcessorTimeout;
  private volatile long lastFlushTime;
  final RegionServerServices rsServices;
  private RegionServerAccounting rsAccounting;
  private long flushCheckInterval;
  // flushPerChanges is to prevent too many changes in memstore
  private long flushPerChanges;
  private long blockingMemStoreSize;
  final long threadWakeFrequency;
  // Used to guard closes
  final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  // Stop updates lock
  private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock();
  private boolean splitRequest;
  private byte[] explicitSplitPoint = null;

  private final MultiVersionConsistencyControl mvcc =
      new MultiVersionConsistencyControl();

  // Coprocessor host
  private RegionCoprocessorHost coprocessorHost;

  private HTableDescriptor htableDescriptor = null;
  private RegionSplitPolicy splitPolicy;

  private final MetricsRegion metricsRegion;
  private final MetricsRegionWrapperImpl metricsRegionWrapper;
  private final Durability durability;

  /**
   * HRegion constructor. This constructor should only be used for testing and
   * extensions.  Instances of HRegion should be instantiated with the
   * {@link HRegion#createHRegion} or {@link HRegion#openHRegion} method.
   *
   * @param tableDir qualified path of directory where region should be located,
   * usually the table directory.
   * @param log The HLog is the outbound log for any updates to the HRegion
   * (There's a single HLog for all the HRegions on a single HRegionServer.)
   * The log file is a logfile from the previous execution that's
   * custom-computed for this HRegion. The HRegionServer computes and sorts the
   * appropriate log info for this HRegion. If there is a previous log file
   * (implying that the HRegion has been written-to before), then read it from
   * the supplied path.
   * @param fs is the filesystem.
   * @param confParam is global configuration settings.
   * @param regionInfo - HRegionInfo that describes the region
   * @param htd the table descriptor
   * @param rsServices reference to {@link RegionServerServices} or null
   */
  @Deprecated
  public HRegion(final Path tableDir, final HLog log, final FileSystem fs,
      final Configuration confParam, final HRegionInfo regionInfo,
      final HTableDescriptor htd, final RegionServerServices rsServices) {
    this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
      log, confParam, htd, rsServices);
  }

  /**
   * HRegion constructor. This constructor should only be used for testing and
   * extensions.  Instances of HRegion should be instantiated with the
   * {@link HRegion#createHRegion} or {@link HRegion#openHRegion} method.
   *
   * @param fs is the filesystem.
   * @param log The HLog is the outbound log for any updates to the HRegion
   * (There's a single HLog for all the HRegions on a single HRegionServer.)
   * The log file is a logfile from the previous execution that's
   * custom-computed for this HRegion. The HRegionServer computes and sorts the
   * appropriate log info for this HRegion. If there is a previous log file
   * (implying that the HRegion has been written-to before), then read it from
   * the supplied path.
   * @param confParam is global configuration settings.
   * @param htd the table descriptor
   * @param rsServices reference to {@link RegionServerServices} or null
   */
  public HRegion(final HRegionFileSystem fs, final HLog log, final Configuration confParam,
      final HTableDescriptor htd, final RegionServerServices rsServices) {
    if (htd == null) {
      throw new IllegalArgumentException("Need table descriptor");
    }

    if (confParam instanceof CompoundConfiguration) {
      throw new IllegalArgumentException("Need original base configuration");
    }

    this.comparator = fs.getRegionInfo().getComparator();
    this.log = log;
    this.fs = fs;

    // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
    this.baseConf = confParam;
    this.conf = new CompoundConfiguration()
      .add(confParam)
      .addStringMap(htd.getConfiguration())
      .addBytesMap(htd.getValues());
    this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
        DEFAULT_CACHE_FLUSH_INTERVAL);
    this.flushPerChanges = conf.getLong(MEMSTORE_FLUSH_PER_CHANGES, DEFAULT_FLUSH_PER_CHANGES);
    if (this.flushPerChanges > MAX_FLUSH_PER_CHANGES) {
      throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed "
          + MAX_FLUSH_PER_CHANGES);
    }

    this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
        DEFAULT_ROWLOCK_WAIT_DURATION);

    this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
    this.htableDescriptor = htd;
    this.rsServices = rsServices;
    this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
    setHTableSpecificConf();
    this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();

    this.busyWaitDuration = conf.getLong(
      "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
    this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
    if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
      throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
        + busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
        + maxBusyWaitMultiplier + "). Their product should be positive");
    }
    this.maxBusyWaitDuration = conf.getLong("hbase.ipc.client.call.purge.timeout",
      2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);

    /*
     * timestamp.slop provides a server-side constraint on the timestamp. This
     * assumes that you base your TS around currentTimeMillis(). In this case,
     * throw an error to the user if the user-specified TS is newer than now +
     * slop. LATEST_TIMESTAMP == don't use this functionality
     */
    this.timestampSlop = conf.getLong(
        "hbase.hregion.keyvalue.timestamp.slop.millisecs",
        HConstants.LATEST_TIMESTAMP);
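
    // Illustrative note (not from the original source): with a slop of, say,
    // 2000, a mutation carrying a timestamp more than two seconds ahead of the
    // server clock is rejected with a FailedSanityCheckException; the default
    // of LATEST_TIMESTAMP disables the check entirely.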

    /**
     * Timeout for the process time in processRowsWithLocks().
     * Use -1 to switch off time bound.
     */
    this.rowProcessorTimeout = conf.getLong(
        "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
    this.durability = htd.getDurability() == Durability.USE_DEFAULT
        ? DEFAULT_DURABLITY
        : htd.getDurability();
    if (rsServices != null) {
      this.rsAccounting = this.rsServices.getRegionServerAccounting();
      // don't initialize coprocessors if not running within a regionserver
      // TODO: revisit if coprocessors should load in other cases
      this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
      this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
      this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper);

      Map<String, HRegion> recoveringRegions = rsServices.getRecoveringRegions();
      String encodedName = getRegionInfo().getEncodedName();
      if (recoveringRegions != null && recoveringRegions.containsKey(encodedName)) {
        this.isRecovering = true;
        recoveringRegions.put(encodedName, this);
      }
    } else {
      this.metricsRegionWrapper = null;
      this.metricsRegion = null;
    }
    if (LOG.isDebugEnabled()) {
      // Write out region name as string and its encoded name.
      LOG.debug("Instantiated " + this);
    }

    // by default, we allow writes against a region when it's in recovering
    this.disallowWritesInRecovering =
        conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING,
          HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG);
  }

  void setHTableSpecificConf() {
    if (this.htableDescriptor == null) return;
    long flushSize = this.htableDescriptor.getMemStoreFlushSize();

    if (flushSize <= 0) {
      flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
        HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
    }
    this.memstoreFlushSize = flushSize;
    this.blockingMemStoreSize = this.memstoreFlushSize *
        conf.getLong("hbase.hregion.memstore.block.multiplier", 2);
  }

  /**
   * Initialize this region.
   * Used only by tests and SplitTransaction to reopen the region.
   * You should use createHRegion() or openHRegion().
   * @return What the next sequence (edit) id should be.
   * @throws IOException e
   * @deprecated use HRegion.createHRegion() or HRegion.openHRegion()
   */
  @Deprecated
  public long initialize() throws IOException {
    return initialize(null);
  }

  /**
   * Initialize this region.
   *
   * @param reporter Tickle every so often if initialize is taking a while.
   * @return What the next sequence (edit) id should be.
   * @throws IOException e
   */
  private long initialize(final CancelableProgressable reporter) throws IOException {
    MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
    long nextSeqId = -1;
    try {
      nextSeqId = initializeRegionInternals(reporter, status);
      return nextSeqId;
    } finally {
      // nextSeqId will be -1 if the initialization fails.
      // Otherwise it will be at least 0.
      if (nextSeqId == -1) {
        status
            .abort("Exception during region " + this.getRegionNameAsString() + " initialization.");
      }
    }
  }

  private long initializeRegionInternals(final CancelableProgressable reporter,
      final MonitoredTask status) throws IOException, UnsupportedEncodingException {
    if (coprocessorHost != null) {
      status.setStatus("Running coprocessor pre-open hook");
      coprocessorHost.preOpen();
    }

    // Write HRI to a file in case we need to recover hbase:meta
    status.setStatus("Writing region info on filesystem");
    fs.checkRegionInfoOnFilesystem();

    // Initialize all the HStores
    status.setStatus("Initializing all the Stores");
    long maxSeqId = initializeRegionStores(reporter, status);

    this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this));
    this.writestate.flushRequested = false;
    this.writestate.compacting = 0;

    if (this.writestate.writesEnabled) {
      // Remove temporary data left over from old regions
      status.setStatus("Cleaning up temporary data from old regions");
      fs.cleanupTempDir();
    }

    if (this.writestate.writesEnabled) {
      status.setStatus("Cleaning up detritus from prior splits");
      // Get rid of any splits or merges that were lost in-progress.  Clean out
      // these directories here on open.  We may be opening a region that was
      // being split but we crashed in the middle of it all.
      fs.cleanupAnySplitDetritus();
      fs.cleanupMergesDir();
    }

    // Initialize split policy
    this.splitPolicy = RegionSplitPolicy.create(this, conf);

    this.lastFlushTime = EnvironmentEdgeManager.currentTime();
    // Use maximum of log sequenceid or that which was found in stores
    // (particularly if no recovered edits, seqid will be -1).
    long nextSeqid = maxSeqId + 1;
    if (this.isRecovering) {
      // In distributedLogReplay mode, we don't know the last change sequence number because the
      // region is opened before recovery completes. So we add a safety bumper to avoid the new
      // sequence numbers overlapping sequence numbers already in use.
      nextSeqid += this.flushPerChanges + 10000000; // add another extra 10 million
    }

    LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() +
      "; next sequenceid=" + nextSeqid);

    // A region can be reopened if it failed a split; reset flags
    this.closing.set(false);
    this.closed.set(false);

    if (coprocessorHost != null) {
      status.setStatus("Running coprocessor post-open hooks");
      coprocessorHost.postOpen();
    }

    status.markComplete("Region opened successfully");
    return nextSeqid;
  }

  private long initializeRegionStores(final CancelableProgressable reporter, MonitoredTask status)
      throws IOException, UnsupportedEncodingException {
    // Load in all the HStores.

    long maxSeqId = -1;
    // initialized to -1 so that we pick up MemstoreTS from column families
    long maxMemstoreTS = -1;

    if (!htableDescriptor.getFamilies().isEmpty()) {
      // initialize the thread pool for opening stores in parallel.
      ThreadPoolExecutor storeOpenerThreadPool =
        getStoreOpenAndCloseThreadPool("StoreOpener-" + this.getRegionInfo().getShortNameToLog());
      CompletionService<HStore> completionService =
        new ExecutorCompletionService<HStore>(storeOpenerThreadPool);

      // initialize each store in parallel
      for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
        status.setStatus("Instantiating store for column family " + family);
        completionService.submit(new Callable<HStore>() {
          @Override
          public HStore call() throws IOException {
            return instantiateHStore(family);
          }
        });
      }
      boolean allStoresOpened = false;
      try {
        for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
          Future<HStore> future = completionService.take();
          HStore store = future.get();
          this.stores.put(store.getColumnFamilyName().getBytes(), store);

          long storeMaxSequenceId = store.getMaxSequenceId();
          maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
              storeMaxSequenceId);
          if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) {
            maxSeqId = storeMaxSequenceId;
          }
          long maxStoreMemstoreTS = store.getMaxMemstoreTS();
          if (maxStoreMemstoreTS > maxMemstoreTS) {
            maxMemstoreTS = maxStoreMemstoreTS;
          }
        }
        allStoresOpened = true;
      } catch (InterruptedException e) {
        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
      } catch (ExecutionException e) {
        throw new IOException(e.getCause());
      } finally {
        storeOpenerThreadPool.shutdownNow();
        if (!allStoresOpened) {
          // something went wrong, close all opened stores
          LOG.error("Could not initialize all stores for the region=" + this);
          for (Store store : this.stores.values()) {
            try {
              store.close();
            } catch (IOException e) {
              LOG.warn(e.getMessage());
            }
          }
        }
      }
    }
    if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
      // Recover any edits if available.
      maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
          this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
    }
    maxSeqId = Math.max(maxSeqId, maxMemstoreTS + 1);
    mvcc.initialize(maxSeqId);
    return maxSeqId;
  }

  private void writeRegionOpenMarker(HLog log, long openSeqId) throws IOException {
    Map<byte[], List<Path>> storeFiles =
        new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], Store> entry : getStores().entrySet()) {
      Store store = entry.getValue();
      ArrayList<Path> storeFileNames = new ArrayList<Path>();
      for (StoreFile storeFile: store.getStorefiles()) {
        storeFileNames.add(storeFile.getPath());
      }
      storeFiles.put(entry.getKey(), storeFileNames);
    }

    RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
      RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
      getRegionServerServices().getServerName(), storeFiles);
    HLogUtil.writeRegionEventMarker(log, getTableDesc(), getRegionInfo(), regionOpenDesc,
      getSequenceId());
  }

  private void writeRegionCloseMarker(HLog log) throws IOException {
    Map<byte[], List<Path>> storeFiles =
        new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], Store> entry : getStores().entrySet()) {
      Store store = entry.getValue();
      ArrayList<Path> storeFileNames = new ArrayList<Path>();
      for (StoreFile storeFile: store.getStorefiles()) {
        storeFileNames.add(storeFile.getPath());
      }
      storeFiles.put(entry.getKey(), storeFileNames);
    }

    RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor(
      RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), getSequenceId().get(),
      getRegionServerServices().getServerName(), storeFiles);
    HLogUtil.writeRegionEventMarker(log, getTableDesc(), getRegionInfo(), regionEventDesc,
      getSequenceId());
  }

  /**
   * @return True if this region has references.
   */
  public boolean hasReferences() {
    for (Store store : this.stores.values()) {
      if (store.hasReferences()) return true;
    }
    return false;
  }

  /**
   * This function will return the HDFS blocks distribution based on the data
   * captured when the HFiles are created.
   * @return The HDFS blocks distribution for the region.
   */
  public HDFSBlocksDistribution getHDFSBlocksDistribution() {
    HDFSBlocksDistribution hdfsBlocksDistribution =
      new HDFSBlocksDistribution();
    synchronized (this.stores) {
      for (Store store : this.stores.values()) {
        for (StoreFile sf : store.getStorefiles()) {
          HDFSBlocksDistribution storeFileBlocksDistribution =
            sf.getHDFSBlockDistribution();
          hdfsBlocksDistribution.add(storeFileBlocksDistribution);
        }
      }
    }
    return hdfsBlocksDistribution;
  }

  /**
   * This is a helper function to compute the HDFS block distribution on demand.
   * @param conf configuration
   * @param tableDescriptor HTableDescriptor of the table
   * @param regionInfo HRegionInfo of the region
   * @return The HDFS blocks distribution for the given region.
   * @throws IOException
   */
  public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
      final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException {
    Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
    return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath);
  }

  /**
   * This is a helper function to compute the HDFS block distribution on demand.
   * @param conf configuration
   * @param tableDescriptor HTableDescriptor of the table
   * @param regionInfo HRegionInfo of the region
   * @param tablePath the table directory
   * @return The HDFS blocks distribution for the given region.
   * @throws IOException
   */
  public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
      final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo, Path tablePath)
      throws IOException {
    HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
    FileSystem fs = tablePath.getFileSystem(conf);

    HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo);
    for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
      Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family.getNameAsString());
      if (storeFiles == null) continue;

      for (StoreFileInfo storeFileInfo : storeFiles) {
        hdfsBlocksDistribution.add(storeFileInfo.computeHDFSBlocksDistribution(fs));
      }
    }
    return hdfsBlocksDistribution;
  }
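
  /*
   * An illustrative caller (a sketch, not code from this file): tools that
   * compute data locality can consume the distribution like so, assuming
   * HDFSBlocksDistribution#getBlockLocalityIndex:
   *
   *   HDFSBlocksDistribution dist =
   *       HRegion.computeHDFSBlocksDistribution(conf, tableDesc, regionInfo);
   *   float locality = dist.getBlockLocalityIndex(serverHostname);
   */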

  public AtomicLong getMemstoreSize() {
    return memstoreSize;
  }

  /**
   * Increase the size of the memstore in this region and the size of the
   * global memstore.
   * @return the size of the memstore in this region
   */
  public long addAndGetGlobalMemstoreSize(long memStoreSize) {
    if (this.rsAccounting != null) {
      rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
    }
    return this.memstoreSize.addAndGet(memStoreSize);
  }

  /** @return an HRegionInfo object for this region */
  public HRegionInfo getRegionInfo() {
    return this.fs.getRegionInfo();
  }

  /**
   * @return Instance of {@link RegionServerServices} used by this HRegion.
   * Can be null.
   */
  RegionServerServices getRegionServerServices() {
    return this.rsServices;
  }

  /** @return readRequestsCount for this region */
  long getReadRequestsCount() {
    return this.readRequestsCount.get();
  }

  /** @return writeRequestsCount for this region */
  long getWriteRequestsCount() {
    return this.writeRequestsCount.get();
  }

  public MetricsRegion getMetrics() {
    return metricsRegion;
  }

  /** @return true if region is closed */
  public boolean isClosed() {
    return this.closed.get();
  }

  /**
   * @return True if closing process has started.
   */
  public boolean isClosing() {
    return this.closing.get();
  }

  /**
   * Reset the recovering state of the current region.
   */
  public void setRecovering(boolean newState) {
    boolean wasRecovering = this.isRecovering;
    this.isRecovering = newState;
    if (wasRecovering && !isRecovering) {
      // Call only when log replay is over.
      coprocessorHost.postLogReplay();
    }
  }

  /**
   * @return True if the current region is in recovering state.
   */
  public boolean isRecovering() {
    return this.isRecovering;
  }

  /** @return true if region is available (not closed and not closing) */
  public boolean isAvailable() {
    return !isClosed() && !isClosing();
  }

  /** @return true if region is splittable */
  public boolean isSplittable() {
    return isAvailable() && !hasReferences();
  }

  /**
   * @return true if region is mergeable
   */
  public boolean isMergeable() {
    if (!isAvailable()) {
      LOG.debug("Region " + this.getRegionNameAsString()
          + " is not mergeable because it is closing or closed");
      return false;
    }
    if (hasReferences()) {
      LOG.debug("Region " + this.getRegionNameAsString()
          + " is not mergeable because it has references");
      return false;
    }

    return true;
  }

  public boolean areWritesEnabled() {
    synchronized(this.writestate) {
      return this.writestate.writesEnabled;
    }
  }

  public MultiVersionConsistencyControl getMVCC() {
    return mvcc;
  }

  /*
   * Returns the readpoint considering the given IsolationLevel
   */
  public long getReadpoint(IsolationLevel isolationLevel) {
    if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
      // This scan can read even uncommitted transactions
      return Long.MAX_VALUE;
    }
    return mvcc.memstoreReadPoint();
  }
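
  /*
   * Illustrative note (not from the original source): a client Scan with
   * setIsolationLevel(IsolationLevel.READ_UNCOMMITTED) reads everything in the
   * memstore, while the default READ_COMMITTED level is bounded by
   * mvcc.memstoreReadPoint().
   */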

  public boolean isLoadingCfsOnDemandDefault() {
    return this.isLoadingCfsOnDemandDefault;
  }

  /**
   * Close down this HRegion.  Flush the cache, shut down each HStore, don't
   * service any more calls.
   *
   * <p>This method could take some time to execute, so don't call it from a
   * time-sensitive thread.
   *
   * @return Map of all the storage files that the HRegion's component
   * HStores make use of, keyed by column family.  Can be null if we are
   * not to close at this time or we are already closed.
   *
   * @throws IOException e
   */
  public Map<byte[], List<StoreFile>> close() throws IOException {
    return close(false);
  }

  private final Object closeLock = new Object();

  /** Conf key for the periodic flush interval */
  public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL =
      "hbase.regionserver.optionalcacheflushinterval";
  /** Default interval for the memstore flush */
  public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;

  /** Conf key to force a flush if there are already enough changes for one region in memstore */
  public static final String MEMSTORE_FLUSH_PER_CHANGES =
      "hbase.regionserver.flush.per.changes";
  public static final long DEFAULT_FLUSH_PER_CHANGES = 30000000; // 30 million
  /**
   * The following MAX_FLUSH_PER_CHANGES is large enough because each KeyValue has 20+ bytes
   * overhead. Therefore, even 1G empty KVs occupy at least 20GB memstore size for a single region
   */
  public static final long MAX_FLUSH_PER_CHANGES = 1000000000; // 1G
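
  // An illustrative hbase-site.xml override for the flush-per-changes knob
  // read in the constructor (the value is an example, not the default):
  //
  //   <property>
  //     <name>hbase.regionserver.flush.per.changes</name>
  //     <value>50000000</value>
  //   </property>
  //
  // Values above MAX_FLUSH_PER_CHANGES are rejected with an
  // IllegalArgumentException when the region is constructed.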

  /**
   * Close down this HRegion.  Flush the cache unless the abort parameter is
   * true, shut down each HStore, and don't service any more calls.
   *
   * This method could take some time to execute, so don't call it from a
   * time-sensitive thread.
   *
   * @param abort true if server is aborting (only during testing)
   * @return Map of all the storage files that the HRegion's component
   * HStores make use of, keyed by column family.  Can be null if
   * we are not to close at this time or we are already closed.
   *
   * @throws IOException e
   */
  public Map<byte[], List<StoreFile>> close(final boolean abort) throws IOException {
    // Only allow one thread to close at a time. Serialize them so dual
    // threads attempting to close will run up against each other.
    MonitoredTask status = TaskMonitor.get().createStatus(
        "Closing region " + this +
        (abort ? " due to abort" : ""));

    status.setStatus("Waiting for close lock");
    try {
      synchronized (closeLock) {
        return doClose(abort, status);
      }
    } finally {
      status.cleanup();
    }
  }

  private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
      throws IOException {
    if (isClosed()) {
      LOG.warn("Region " + this + " already closed");
      return null;
    }

    if (coprocessorHost != null) {
      status.setStatus("Running coprocessor pre-close hooks");
      this.coprocessorHost.preClose(abort);
    }

    status.setStatus("Disabling compacts and flushes for region");
    synchronized (writestate) {
      // Disable compacting and flushing by background threads for this
      // region.
      writestate.writesEnabled = false;
      LOG.debug("Closing " + this + ": disabling compactions & flushes");
      waitForFlushesAndCompactions();
    }
    // If we were not just flushing, is it worth doing a preflush... one
    // that will clear out the bulk of the memstore before we put up
    // the close flag?
    if (!abort && worthPreFlushing()) {
      status.setStatus("Pre-flushing region before close");
      LOG.info("Running close preflush of " + this.getRegionNameAsString());
      try {
        internalFlushcache(status);
      } catch (IOException ioe) {
        // Failed to flush the region. Keep going.
        status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
      }
    }

    this.closing.set(true);
    status.setStatus("Disabling writes for close");
    // block waiting for the lock for closing
    lock.writeLock().lock();
    try {
      if (this.isClosed()) {
        status.abort("Already got closed by another process");
        // SplitTransaction handles the null
        return null;
      }
      LOG.debug("Updates disabled for region " + this);
      // Don't flush the cache if we are aborting
      if (!abort) {
1204         int flushCount = 0;
1205         while (this.getMemstoreSize().get() > 0) {
1206           try {
1207             if (flushCount++ > 0) {
1208               int actualFlushes = flushCount - 1;
1209               if (actualFlushes > 5) {
1210                 // If we tried 5 times and are unable to clear memory, abort
1211                 // so we do not lose data
1212                 throw new DroppedSnapshotException("Failed clearing memory after " +
1213                   actualFlushes + " attempts on region: " + Bytes.toStringBinary(getRegionName()));
1214               }
1215               LOG.info("Running extra flush, " + actualFlushes +
1216                 " (carrying snapshot?) " + this);
1217             }
1218             internalFlushcache(status);
1219           } catch (IOException ioe) {
1220             status.setStatus("Failed flush " + this + ", putting online again");
1221             synchronized (writestate) {
1222               writestate.writesEnabled = true;
1223             }
1224             // Have to throw to upper layers.  I can't abort server from here.
1225             throw ioe;
1226           }
1227         }
1228       }
1229 
1230       Map<byte[], List<StoreFile>> result =
1231         new TreeMap<byte[], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
1232       if (!stores.isEmpty()) {
1233         // initialize the thread pool for closing stores in parallel.
1234         ThreadPoolExecutor storeCloserThreadPool =
1235           getStoreOpenAndCloseThreadPool("StoreCloserThread-" + this.getRegionNameAsString());
1236         CompletionService<Pair<byte[], Collection<StoreFile>>> completionService =
1237           new ExecutorCompletionService<Pair<byte[], Collection<StoreFile>>>(storeCloserThreadPool);
1238 
1239         // close each store in parallel
1240         for (final Store store : stores.values()) {
1241           assert abort || store.getFlushableSize() == 0;
1242           completionService
1243               .submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
1244                 @Override
1245                 public Pair<byte[], Collection<StoreFile>> call() throws IOException {
1246                   return new Pair<byte[], Collection<StoreFile>>(
1247                     store.getFamily().getName(), store.close());
1248                 }
1249               });
1250         }
1251         try {
1252           for (int i = 0; i < stores.size(); i++) {
1253             Future<Pair<byte[], Collection<StoreFile>>> future = completionService.take();
1254             Pair<byte[], Collection<StoreFile>> storeFiles = future.get();
1255             List<StoreFile> familyFiles = result.get(storeFiles.getFirst());
1256             if (familyFiles == null) {
1257               familyFiles = new ArrayList<StoreFile>();
1258               result.put(storeFiles.getFirst(), familyFiles);
1259             }
1260             familyFiles.addAll(storeFiles.getSecond());
1261           }
1262         } catch (InterruptedException e) {
1263           throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1264         } catch (ExecutionException e) {
1265           throw new IOException(e.getCause());
1266         } finally {
1267           storeCloserThreadPool.shutdownNow();
1268         }
1269       }
1270 
1271       status.setStatus("Writing region close event to WAL");
1272       if (!abort  && log != null && getRegionServerServices() != null) {
1273         writeRegionCloseMarker(log);
1274       }
1275 
1276       this.closed.set(true);
1277       if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get());
1278       if (coprocessorHost != null) {
1279         status.setStatus("Running coprocessor post-close hooks");
1280         this.coprocessorHost.postClose(abort);
1281       }
1282     if (this.metricsRegion != null) {
1283         this.metricsRegion.close();
1284       }
1285     if (this.metricsRegionWrapper != null) {
1286         Closeables.closeQuietly(this.metricsRegionWrapper);
1287       }
1288       status.markComplete("Closed");
1289       LOG.info("Closed " + this);
1290       return result;
1291     } finally {
1292       lock.writeLock().unlock();
1293     }
1294   }
1295 
1296   /**
1297    * Wait for all current flushes and compactions of the region to complete.
1298    * <p>
1299    * Exposed for TESTING.
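        * <p>A typical test-side use (sketch only):
        * <pre>
        *   // after triggering flushes/compactions, block until the region is quiescent
        *   region.waitForFlushesAndCompactions();
        * </pre>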
1300    */
1301   public void waitForFlushesAndCompactions() {
1302     synchronized (writestate) {
1303       boolean interrupted = false;
1304       try {
1305         while (writestate.compacting > 0 || writestate.flushing) {
1306           LOG.debug("waiting for " + writestate.compacting + " compactions"
1307             + (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
1308           try {
1309             writestate.wait();
1310           } catch (InterruptedException iex) {
1311             // essentially ignore and propagate the interrupt back up
1312             LOG.warn("Interrupted while waiting");
1313             interrupted = true;
1314           }
1315         }
1316       } finally {
1317         if (interrupted) {
1318           Thread.currentThread().interrupt();
1319         }
1320       }
1321     }
1322   }
1323 
1324   protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
1325       final String threadNamePrefix) {
1326     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1327     int maxThreads = Math.min(numStores,
1328         conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1329             HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
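         // e.g. a table with 10 column families and a configured max of 8 threads
         // yields min(10, 8) = 8 store open/close threads (illustrative numbers)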
1330     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1331   }
1332 
1333   protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
1334       final String threadNamePrefix) {
1335     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1336     int maxThreads = Math.max(1,
1337         conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1338             HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
1339             / numStores);
1340     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1341   }
1342 
1343   static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
1344       final String threadNamePrefix) {
1345     return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
1346       new ThreadFactory() {
1347         private int count = 1;
1348 
1349         @Override
1350         public Thread newThread(Runnable r) {
1351           return new Thread(r, threadNamePrefix + "-" + count++);
1352         }
1353       });
1354   }
1355 
1356   /**
1357    * @return True if it's worth doing a flush before we put up the close flag.
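        * <p>The threshold is read from the {@code hbase.hregion.preclose.flush.size}
        * property (default 5 MB); e.g. to raise it to 10 MB in hbase-site.xml
        * (illustrative value):
        * <pre>
        *   &lt;property&gt;
        *     &lt;name&gt;hbase.hregion.preclose.flush.size&lt;/name&gt;
        *     &lt;value&gt;10485760&lt;/value&gt;
        *   &lt;/property&gt;
        * </pre>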
1358     */
1359   private boolean worthPreFlushing() {
1360     return this.memstoreSize.get() >
1361       this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
1362   }
1363 
1364   //////////////////////////////////////////////////////////////////////////////
1365   // HRegion accessors
1366   //////////////////////////////////////////////////////////////////////////////
1367 
1368   /** @return start key for region */
1369   public byte [] getStartKey() {
1370     return this.getRegionInfo().getStartKey();
1371   }
1372 
1373   /** @return end key for region */
1374   public byte [] getEndKey() {
1375     return this.getRegionInfo().getEndKey();
1376   }
1377 
1378   /** @return region id */
1379   public long getRegionId() {
1380     return this.getRegionInfo().getRegionId();
1381   }
1382 
1383   /** @return region name */
1384   public byte [] getRegionName() {
1385     return this.getRegionInfo().getRegionName();
1386   }
1387 
1388   /** @return region name as string for logging */
1389   public String getRegionNameAsString() {
1390     return this.getRegionInfo().getRegionNameAsString();
1391   }
1392 
1393   /** @return HTableDescriptor for this region */
1394   public HTableDescriptor getTableDesc() {
1395     return this.htableDescriptor;
1396   }
1397 
1398   /** @return HLog in use for this region */
1399   public HLog getLog() {
1400     return this.log;
1401   }
1402 
1403   /**
1404    * A split takes the config from the parent region and passes it to the daughter
1405    * region's constructor. If 'conf' were passed instead, the daughter would end up
1406    * using the HTD of the parent region in addition to its own new HTD. Pass 'baseConf'
1407    * to the daughter regions to avoid this tricky dedupe problem.
1408    * @return Configuration object
1409    */
1410   Configuration getBaseConf() {
1411     return this.baseConf;
1412   }
1413 
1414   /** @return {@link FileSystem} being used by this region */
1415   public FileSystem getFilesystem() {
1416     return fs.getFileSystem();
1417   }
1418 
1419   /** @return the {@link HRegionFileSystem} used by this region */
1420   public HRegionFileSystem getRegionFileSystem() {
1421     return this.fs;
1422   }
1423 
1424   /** @return the last time the region was flushed */
1425   public long getLastFlushTime() {
1426     return this.lastFlushTime;
1427   }
1428 
1429   //////////////////////////////////////////////////////////////////////////////
1430   // HRegion maintenance.
1431   //
1432   // These methods are meant to be called periodically by the HRegionServer for
1433   // upkeep.
1434   //////////////////////////////////////////////////////////////////////////////
1435 
1436   /** @return size of the largest HStore. */
1437   public long getLargestHStoreSize() {
1438     long size = 0;
1439     for (Store h : stores.values()) {
1440       long storeSize = h.getSize();
1441       if (storeSize > size) {
1442         size = storeSize;
1443       }
1444     }
1445     return size;
1446   }
1447 
1448   /**
1449    * @return KeyValue Comparator
1450    */
1451   public KeyValue.KVComparator getComparator() {
1452     return this.comparator;
1453   }
1454 
1455   /*
1456    * Do preparation for pending compaction.
1457    * @throws IOException
1458    */
1459   protected void doRegionCompactionPrep() throws IOException {
1460   }
1461 
1462   void triggerMajorCompaction() {
1463     for (Store h : stores.values()) {
1464       h.triggerMajorCompaction();
1465     }
1466   }
1467 
1468   /**
1469    * This is a helper function that compacts all the stores synchronously.
1470    * It is used by utilities and testing.
1471    *
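        * <p>Example from a utility/test context (sketch only):
        * <pre>
        *   // force a major compaction of every store and wait for it to complete
        *   region.compactStores(true);
        * </pre>
        *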
1472    * @param majorCompaction True to force a major compaction regardless of thresholds
1473    * @throws IOException e
1474    */
1475   public void compactStores(final boolean majorCompaction)
1476   throws IOException {
1477     if (majorCompaction) {
1478       this.triggerMajorCompaction();
1479     }
1480     compactStores();
1481   }
1482 
1483   /**
1484    * This is a helper function that compacts all the stores synchronously.
1485    * It is used by utilities and testing.
1486    *
1487    * @throws IOException e
1488    */
1489   public void compactStores() throws IOException {
1490     for (Store s : getStores().values()) {
1491       CompactionContext compaction = s.requestCompaction();
1492       if (compaction != null) {
1493         compact(compaction, s);
1494       }
1495     }
1496   }
1497 
1498   /*
1499    * Called by compaction thread and after region is opened to compact the
1500    * HStores if necessary.
1501    *
1502    * <p>This operation could block for a long time, so don't call it from a
1503    * time-sensitive thread.
1504    *
1505    * Note that no locking is necessary at this level because compaction only
1506    * conflicts with a region split, and that cannot happen because the region
1507    * server does them sequentially and not in parallel.
1508    *
1509    * @param compaction Compaction details, obtained by requestCompaction()
1510    * @return whether the compaction completed
1511    * @throws IOException e
1512    */
1513   public boolean compact(CompactionContext compaction, Store store) throws IOException {
1514     assert compaction != null && compaction.hasSelection();
1515     assert !compaction.getRequest().getFiles().isEmpty();
1516     if (this.closing.get() || this.closed.get()) {
1517       LOG.debug("Skipping compaction on " + this + " because closing/closed");
1518       store.cancelRequestedCompaction(compaction);
1519       return false;
1520     }
1521     MonitoredTask status = null;
1522     boolean didPerformCompaction = false;
1523     // block waiting for the lock for compaction
1524     lock.readLock().lock();
1525     try {
1526       byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
1527       if (stores.get(cf) != store) {
1528         LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this
1529             + " has been re-instantiated; cancelling this compaction request."
1530             + " It may be caused by the rollback of a split transaction");
1531         return false;
1532       }
1533 
1534       status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
1535       if (this.closed.get()) {
1536         String msg = "Skipping compaction on " + this + " because closed";
1537         LOG.debug(msg);
1538         status.abort(msg);
1539         return false;
1540       }
1541       boolean wasStateSet = false;
1542       try {
1543         synchronized (writestate) {
1544           if (writestate.writesEnabled) {
1545             wasStateSet = true;
1546             ++writestate.compacting;
1547           } else {
1548             String msg = "NOT compacting region " + this + ". Writes disabled.";
1549             LOG.info(msg);
1550             status.abort(msg);
1551             return false;
1552           }
1553         }
1554         LOG.info("Starting compaction on " + store + " in region " + this
1555             + (compaction.getRequest().isOffPeak() ? " as an off-peak compaction" : ""));
1556         doRegionCompactionPrep();
1557         try {
1558           status.setStatus("Compacting store " + store);
1559           didPerformCompaction = true;
1560           store.compact(compaction);
1561         } catch (InterruptedIOException iioe) {
1562           String msg = "compaction interrupted";
1563           LOG.info(msg, iioe);
1564           status.abort(msg);
1565           return false;
1566         }
1567       } finally {
1568         if (wasStateSet) {
1569           synchronized (writestate) {
1570             --writestate.compacting;
1571             if (writestate.compacting <= 0) {
1572               writestate.notifyAll();
1573             }
1574           }
1575         }
1576       }
1577       status.markComplete("Compaction complete");
1578       return true;
1579     } finally {
1580       try {
1581         if (!didPerformCompaction) store.cancelRequestedCompaction(compaction);
1582         if (status != null) status.cleanup();
1583       } finally {
1584         lock.readLock().unlock();
1585       }
1586     }
1587   }
1588 
1589   /**
1590    * Flush the cache.
1591    *
1592    * When this method is called the cache will be flushed unless:
1593    * <ol>
1594    *   <li>the cache is empty</li>
1595    *   <li>the region is closed</li>
1596    *   <li>a flush is already in progress</li>
1597    *   <li>writes are disabled</li>
1598    * </ol>
1599    *
1600    * <p>This method may block for some time, so it should not be called from a
1601    * time-sensitive thread.
1602    *
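        * <p>A minimal caller sketch (hypothetical handling; consult
        * {@code FlushResult} for the exact accessors):
        * <pre>
        *   FlushResult res = region.flushcache();
        *   // inspect res to decide whether a follow-up compaction should be scheduled
        * </pre>
        *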
1603    * @return a {@link FlushResult} indicating whether the flush succeeded and whether the region needs compacting
1604    *
1605    * @throws IOException general io exceptions
1606    * @throws DroppedSnapshotException Thrown when replay of hlog is required
1607    * because a Snapshot was not properly persisted.
1608    */
1609   public FlushResult flushcache() throws IOException {
1610     // fail-fast instead of waiting on the lock
1611     if (this.closing.get()) {
1612       String msg = "Skipping flush on " + this + " because closing";
1613       LOG.debug(msg);
1614       return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1615     }
1616     MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
1617     status.setStatus("Acquiring readlock on region");
1618     // block waiting for the lock for flushing cache
1619     lock.readLock().lock();
1620     try {
1621       if (this.closed.get()) {
1622         String msg = "Skipping flush on " + this + " because closed";
1623         LOG.debug(msg);
1624         status.abort(msg);
1625         return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1626       }
1627       if (coprocessorHost != null) {
1628         status.setStatus("Running coprocessor pre-flush hooks");
1629         coprocessorHost.preFlush();
1630       }
1631       if (numMutationsWithoutWAL.get() > 0) {
1632         numMutationsWithoutWAL.set(0);
1633         dataInMemoryWithoutWAL.set(0);
1634       }
1635       synchronized (writestate) {
1636         if (!writestate.flushing && writestate.writesEnabled) {
1637           this.writestate.flushing = true;
1638         } else {
1639           if (LOG.isDebugEnabled()) {
1640             LOG.debug("NOT flushing memstore for region " + this
1641                 + ", flushing=" + writestate.flushing + ", writesEnabled="
1642                 + writestate.writesEnabled);
1643           }
1644           String msg = "Not flushing since "
1645               + (writestate.flushing ? "already flushing"
1646               : "writes not enabled");
1647           status.abort(msg);
1648           return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1649         }
1650       }
1651       try {
1652         FlushResult fs = internalFlushcache(status);
1653 
1654         if (coprocessorHost != null) {
1655           status.setStatus("Running post-flush coprocessor hooks");
1656           coprocessorHost.postFlush();
1657         }
1658 
1659         status.markComplete("Flush successful");
1660         return fs;
1661       } finally {
1662         synchronized (writestate) {
1663           writestate.flushing = false;
1664           this.writestate.flushRequested = false;
1665           writestate.notifyAll();
1666         }
1667       }
1668     } finally {
1669       lock.readLock().unlock();
1670       status.cleanup();
1671     }
1672   }
1673 
1674   /**
1675    * Should the memstore be flushed now?
1676    */
1677   boolean shouldFlush() {
1678     // This is a rough measure.
1679     if (this.lastFlushSeqId > 0
1680           && (this.lastFlushSeqId + this.flushPerChanges < this.sequenceId.get())) {
1681       return true;
1682     }
1683     if (flushCheckInterval <= 0) { //disabled
1684       return false;
1685     }
1686     long now = EnvironmentEdgeManager.currentTime();
1687     //if we flushed in the recent past, we don't need to do again now
1688     if ((now - getLastFlushTime() < flushCheckInterval)) {
1689       return false;
1690     }
1691     //since we didn't flush in the recent past, flush now if certain conditions
1692     //are met. Return true on first such memstore hit.
1693     for (Store s : this.getStores().values()) {
1694       if (s.timeOfOldestEdit() < now - flushCheckInterval) {
1695         // we have an old enough edit in the memstore, flush
1696         return true;
1697       }
1698     }
1699     return false;
1700   }
1701 
1702   /**
1703    * Flush the memstore. Flushing the memstore is a little tricky. We have a lot of updates in the
1704    * memstore, all of which have also been written to the log. We need to write those updates in the
1705    * memstore out to disk, while being able to process reads/writes as much as possible during the
1706    * flush operation.
1707    * <p>This method may block for some time.  Every time you call it, we up the region's
1708    * sequence id even if we don't flush; i.e. the returned sequence id will be at least one
1709    * larger than that of the last edit applied to this region. The returned id does not refer
1710    * to an actual edit. It can be used for, say, installing a bulk-loaded file just ahead of
1711    * the last hfile that was the result of this flush, etc.
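        * <p>Caller sketch (illustrative; the flush sequence id travels in the result):
        * <pre>
        *   FlushResult fr = internalFlushcache(status);
        *   // fr carries a sequence id sitting just beyond the last appended edit,
        *   // e.g. a safe sequence point for a subsequently bulk-loaded file
        * </pre>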
1712    * @return object describing the flush's state
1713    *
1714    * @throws IOException general io exceptions
1715    * @throws DroppedSnapshotException Thrown when replay of hlog is required
1716    * because a Snapshot was not properly persisted.
1717    */
1718   protected FlushResult internalFlushcache(MonitoredTask status)
1719       throws IOException {
1720     return internalFlushcache(this.log, -1, status);
1721   }
1722 
1723   /**
1724    * @param wal Null if we're NOT to go via hlog/wal.
1725    * @param myseqid The seqid to use if <code>wal</code> is null writing out flush file.
1726    * @return object describing the flush's state
1727    * @throws IOException
1728    * @see #internalFlushcache(MonitoredTask)
1729    */
1730   protected FlushResult internalFlushcache(
1731       final HLog wal, final long myseqid, MonitoredTask status)
1732   throws IOException {
1733     if (this.rsServices != null && this.rsServices.isAborted()) {
1734       // Don't flush when server aborting, it's unsafe
1735       throw new IOException("Aborting flush because server is aborted...");
1736     }
1737     final long startTime = EnvironmentEdgeManager.currentTime();
1738     // If nothing to flush, return, but we need to safely update the region sequence id
1739     if (this.memstoreSize.get() <= 0) {
1740       // Take an update lock because we are about to change the sequence id and we want
1741       // the sequence id to be at the border of the empty memstore.
1742       MultiVersionConsistencyControl.WriteEntry w = null;
1743       this.updatesLock.writeLock().lock();
1744       try {
1745         if (this.memstoreSize.get() <= 0) {
1746           // Presume that if there are still no edits in the memstore, then there are no edits for
1747           // this region out in the WAL/HLog subsystem so no need to do any trickery clearing out
1748           // edits in the WAL system. Up the sequence number so the resulting flush id is for
1749           // sure just beyond the last appended region edit (useful as a marker when bulk loading,
1750           // etc.)
1751           // wal can be null when replaying edits.
1752           if (wal != null) {
1753             w = mvcc.beginMemstoreInsert();
1754             long flushSeqId = getNextSequenceId(wal);
1755             FlushResult flushResult = new FlushResult(
1756               FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushSeqId, "Nothing to flush");
1757             w.setWriteNumber(flushSeqId);
1758             mvcc.waitForPreviousTransactionsComplete(w);
1759             w = null;
1760             return flushResult;
1761           } else {
1762             return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
1763               "Nothing to flush");
1764           }
1765         }
1766       } finally {
1767         this.updatesLock.writeLock().unlock();
1768         if (w != null) {
1769           mvcc.advanceMemstore(w);
1770         }
1771       }
1772     }
1773 
1774     LOG.info("Started memstore flush for " + this +
1775       ", current region memstore size " +
1776       StringUtils.byteDesc(this.memstoreSize.get()) +
1777       ((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid));
1778 
1779     // Stop updates while we snapshot the memstore of all of these regions' stores. We only have
1780     // to do this for a moment.  It is quick. We also set the memstore size to zero here before we
1781     // allow updates again so its value will represent the size of the updates received
1782     // during flush
1783     MultiVersionConsistencyControl.WriteEntry w = null;
1784 
1785     // We have to take an update lock during snapshot, or else a write could end up in both
1786     // the snapshot and the memstore (which would make atomic row guarantees difficult)
1787     status.setStatus("Obtaining lock to block concurrent updates");
1788     // block waiting for the lock for internal flush
1789     this.updatesLock.writeLock().lock();
1790     long totalFlushableSize = 0;
1791     status.setStatus("Preparing to flush by snapshotting stores in " +
1792       getRegionInfo().getEncodedName());
1793     List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
1794     TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
1795         Bytes.BYTES_COMPARATOR);
1796     long flushSeqId = -1L;
1797 
1798     long trxId = 0;
1799     try {
1800       try {
1801         w = mvcc.beginMemstoreInsert();
1802         if (wal != null) {
1803           if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) {
1804             // This should never happen.
1805             String msg = "Flush will not be started for ["
1806                 + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
1807             status.setStatus(msg);
1808             return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1809           }
1810           // Get a sequence id that we can use to denote the flush. It will be one beyond the last
1811           // edit that made it into the hfile (the below does not add an edit, it just asks the
1812           // WAL system to return next sequence edit).
1813           flushSeqId = getNextSequenceId(wal);
1814         } else {
1815           // use the provided sequence Id as WAL is not being used for this flush.
1816           flushSeqId = myseqid;
1817         }
1818 
1819         for (Store s : stores.values()) {
1820           totalFlushableSize += s.getFlushableSize();
1821           storeFlushCtxs.add(s.createFlushContext(flushSeqId));
1822           committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL
1823         }
1824 
1825         // write the snapshot start to WAL
1826         if (wal != null) {
1827           FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
1828             getRegionInfo(), flushSeqId, committedFiles);
1829           trxId = HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
1830             desc, sequenceId, false); // no sync. Sync is below where we do not hold the updates lock
1831         }
1832 
1833         // Prepare flush (take a snapshot)
1834         for (StoreFlushContext flush : storeFlushCtxs) {
1835           flush.prepare();
1836         }
1837       } catch (IOException ex) {
1838         if (wal != null) {
1839           if (trxId > 0) { // check whether we have already written START_FLUSH to WAL
1840             try {
1841               FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
1842                 getRegionInfo(), flushSeqId, committedFiles);
1843               HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
1844                 desc, sequenceId, false);
1845             } catch (Throwable t) {
1846               LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
1847                   StringUtils.stringifyException(t));
1848               // ignore this since we will be aborting the RS with DSE.
1849             }
1850           }
1851           // we have called wal.startCacheFlush(), now we have to abort it
1852           wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
1853           throw ex; // let upper layers deal with it.
1854         }
1855       } finally {
1856         this.updatesLock.writeLock().unlock();
1857       }
1858       String s = "Finished memstore snapshotting " + this +
1859         ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize;
1860       status.setStatus(s);
1861       if (LOG.isTraceEnabled()) LOG.trace(s);
1862       // sync unflushed WAL changes
1863       // see HBASE-8208 for details
1864       if (wal != null) {
1865         try {
1866           wal.sync(); // ensure that flush marker is sync'ed
1867         } catch (IOException ioe) {
1868           LOG.warn("Unexpected exception while log.sync(), ignoring. Exception: "
1869               + StringUtils.stringifyException(ioe));
1870         }
1871       }
1872 
1873       // wait for all in-progress transactions to commit to HLog before
1874       // we can start the flush. This prevents
1875       // uncommitted transactions from being written into HFiles.
1876       // We have to block before we start the flush, otherwise keys that
1877       // were removed via a rollbackMemstore could be written to Hfiles.
1878       w.setWriteNumber(flushSeqId);
1879       mvcc.waitForPreviousTransactionsComplete(w);
1880       // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block
1881       w = null;
1882       s = "Flushing stores of " + this;
1883       status.setStatus(s);
1884       if (LOG.isTraceEnabled()) LOG.trace(s);
1885     } finally {
1886       if (w != null) {
1887         // in case of failure just mark current w as complete
1888         mvcc.advanceMemstore(w);
1889       }
1890     }
1891 
1892     // Any failure from here on out will be catastrophic, requiring a server restart so the
1893     // hlog content can be replayed and put back into the memstore. Otherwise, the snapshot
1894     // content, while backed up in the hlog, will not be part of the running server's state.
1896     boolean compactionRequested = false;
1897     try {
1898       // A.  Flush memstore to all the HStores.
1899       // Keep running vector of all store files that includes both old and the
1900       // just-made new flush store file. The new flushed file is still in the
1901       // tmp directory.
1902 
1903       for (StoreFlushContext flush : storeFlushCtxs) {
1904         flush.flushCache(status);
1905       }
1906 
1907       // Switch snapshot (in memstore) -> new hfile (thus causing
1908       // all the store scanners to reset/reseek).
1909       Iterator<Store> it = stores.values().iterator(); // stores.values() and storeFlushCtxs have
1910       // same order
1911       for (StoreFlushContext flush : storeFlushCtxs) {
1912         boolean needsCompaction = flush.commit(status);
1913         if (needsCompaction) {
1914           compactionRequested = true;
1915         }
1916         committedFiles.put(it.next().getFamily().getName(), flush.getCommittedFiles());
1917       }
1918       storeFlushCtxs.clear();
1919 
1920       // Set down the memstore size by amount of flush.
1921       this.addAndGetGlobalMemstoreSize(-totalFlushableSize);
1922 
1923       if (wal != null) {
1924         // write flush marker to WAL. If fail, we should throw DroppedSnapshotException
1925         FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
1926           getRegionInfo(), flushSeqId, committedFiles);
1927         HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
1928           desc, sequenceId, true);
1929       }
1930     } catch (Throwable t) {
1931       // An exception here means that the snapshot was not persisted.
1932       // The hlog needs to be replayed so its content is restored to memstore.
1933       // Currently, only a server restart will do this.
1934       // We used to only catch IOEs, but it's possible that we'd get other
1935       // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
1936       // all and sundry.
1937       if (wal != null) {
1938         try {
1939           FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
1940             getRegionInfo(), flushSeqId, committedFiles);
1941           HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
1942             desc, sequenceId, false);
1943         } catch (Throwable ex) {
1944           LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
1945               StringUtils.stringifyException(ex));
1946           // ignore this since we will be aborting the RS with DSE.
1947         }
1948         wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
1949       }
1950       DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
1951           Bytes.toStringBinary(getRegionName()));
1952       dse.initCause(t);
1953       status.abort("Flush failed: " + StringUtils.stringifyException(t));
1954       throw dse;
1955     }
1956 
1957     // If we get to here, the HStores have been written.
1958     if (wal != null) {
1959       wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
1960     }
1961 
1962     // Record latest flush time
1963     this.lastFlushTime = EnvironmentEdgeManager.currentTime();
1964 
1965     // Update the last flushed sequence id for region. TODO: This is dup'd inside the WAL/FSHlog.
1966     this.lastFlushSeqId = flushSeqId;
1967 
1968     // C. Finally notify anyone waiting on memstore to clear:
1969     // e.g. checkResources().
1970     synchronized (this) {
1971       notifyAll(); // FindBugs NN_NAKED_NOTIFY
1972     }
1973 
1974     long time = EnvironmentEdgeManager.currentTime() - startTime;
1975     long memstoresize = this.memstoreSize.get();
1976     String msg = "Finished memstore flush of ~" +
1977       StringUtils.byteDesc(totalFlushableSize) + "/" + totalFlushableSize +
1978       ", currentsize=" +
1979       StringUtils.byteDesc(memstoresize) + "/" + memstoresize +
1980       " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId +
1981       ", compaction requested=" + compactionRequested +
1982       ((wal == null)? "; wal=null": "");
1983     LOG.info(msg);
1984     status.setStatus(msg);
1985 
1986     return new FlushResult(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
1987         FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushSeqId);
1988   }
1989 
1990   /**
1991    * Method to safely get the next sequence number.
1992    * @return Next sequence number unassociated with any actual edit.
1993    * @throws IOException
1994    */
1995   private long getNextSequenceId(final HLog wal) throws IOException {
1996     HLogKey key = this.appendNoSyncNoAppend(wal, null);
1997     return key.getSequenceId();
1998   }
1999 
2000   //////////////////////////////////////////////////////////////////////////////
2001   // get() methods for client use.
2002   //////////////////////////////////////////////////////////////////////////////
2003   /**
2004    * Return all the data for the row that matches <i>row</i> exactly,
2005    * or the one that immediately precedes it.
2007    *
2008    * @param row row key
2009    * @return map of values
2010    * @throws IOException
2011    */
2012   Result getClosestRowBefore(final byte [] row)
2013   throws IOException {
2014     return getClosestRowBefore(row, HConstants.CATALOG_FAMILY);
2015   }
2016 
2017   /**
2018    * Return all the data for the row that matches <i>row</i> exactly,
2019    * or the one that immediately precedes it.
2021    *
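        * <p>Typical lookup (sketch; hypothetical row and family names):
        * <pre>
        *   Result r = region.getClosestRowBefore(Bytes.toBytes("row-42"),
        *     Bytes.toBytes("info"));
        *   // r may be null if no row sorts at or before "row-42" in this region
        * </pre>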
2022    * @param row row key
2023    * @param family column family to find on
2024    * @return map of values
2025    * @throws IOException read exceptions
2026    */
2027   public Result getClosestRowBefore(final byte [] row, final byte [] family)
2028   throws IOException {
2029     if (coprocessorHost != null) {
2030       Result result = new Result();
2031       if (coprocessorHost.preGetClosestRowBefore(row, family, result)) {
2032         return result;
2033       }
2034     }
2035     // look across all the HStores for this region and determine what the
2036     // closest key is across all column families, since the data may be sparse
2037     checkRow(row, "getClosestRowBefore");
2038     startRegionOperation(Operation.GET);
2039     this.readRequestsCount.increment();
2040     try {
2041       Store store = getStore(family);
2042       // get the closest key. (HStore.getRowKeyAtOrBefore can return null)
2043       KeyValue key = store.getRowKeyAtOrBefore(row);
2044       Result result = null;
2045       if (key != null) {
2046         Get get = new Get(CellUtil.cloneRow(key));
2047         get.addFamily(family);
2048         result = get(get);
2049       }
2050       if (coprocessorHost != null) {
2051         coprocessorHost.postGetClosestRowBefore(row, family, result);
2052       }
2053       return result;
2054     } finally {
2055       closeRegionOperation(Operation.GET);
2056     }
2057   }
2058 
2059   /**
2060    * Return an iterator that scans over the HRegion, returning the indicated
2061    * columns and rows specified by the {@link Scan}.
2062    * <p>
2063    * This Iterator must be closed by the caller.
2064    *
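        * <p>Closing is typically done in a finally block (sketch; hypothetical
        * processing of the returned cells):
        * <pre>
        *   RegionScanner scanner = region.getScanner(new Scan());
        *   try {
        *     List&lt;Cell&gt; cells = new ArrayList&lt;Cell&gt;();
        *     boolean moreRows;
        *     do {
        *       moreRows = scanner.next(cells);
        *       // process cells here
        *       cells.clear();
        *     } while (moreRows);
        *   } finally {
        *     scanner.close();
        *   }
        * </pre>
        *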
2065    * @param scan configured {@link Scan}
2066    * @return RegionScanner
2067    * @throws IOException read exceptions
2068    */
2069   public RegionScanner getScanner(Scan scan) throws IOException {
2070    return getScanner(scan, null);
2071   }
2072 
2073   void prepareScanner(Scan scan) throws IOException {
2074     if (!scan.hasFamilies()) {
2075       // Adding all families to scanner
2076       for (byte[] family : this.htableDescriptor.getFamiliesKeys()) {
2077         scan.addFamily(family);
2078       }
2079     }
2080   }
2081 
2082   protected RegionScanner getScanner(Scan scan,
2083       List<KeyValueScanner> additionalScanners) throws IOException {
2084     startRegionOperation(Operation.SCAN);
2085     try {
2086       // Verify families are all valid
2087       prepareScanner(scan);
2088       if (scan.hasFamilies()) {
2089         for (byte[] family : scan.getFamilyMap().keySet()) {
2090           checkFamily(family);
2091         }
2092       }
2093       return instantiateRegionScanner(scan, additionalScanners);
2094     } finally {
2095       closeRegionOperation(Operation.SCAN);
2096     }
2097   }
2098 
2099   protected RegionScanner instantiateRegionScanner(Scan scan,
2100       List<KeyValueScanner> additionalScanners) throws IOException {
2101     if (scan.isReversed()) {
2102       if (scan.getFilter() != null) {
2103         scan.getFilter().setReversed(true);
2104       }
2105       return new ReversedRegionScannerImpl(scan, additionalScanners, this);
2106     }
2107     return new RegionScannerImpl(scan, additionalScanners, this);
2108   }
2109 
2110   /*
2111    * @param delete The passed delete is modified by this method. WARNING!
2112    */
2113   void prepareDelete(Delete delete) throws IOException {
2114     // Check to see if this is a delete-entire-row request (no families specified)
2115     if (delete.getFamilyCellMap().isEmpty()) {
2116       for (byte[] family : this.htableDescriptor.getFamiliesKeys()) {
2117         // Don't eat the timestamp
2118         delete.deleteFamily(family, delete.getTimeStamp());
2119       }
2120     } else {
2121       for (byte[] family : delete.getFamilyCellMap().keySet()) {
2122         if (family == null) {
2123           throw new NoSuchColumnFamilyException("Empty family is invalid");
2124         }
2125         checkFamily(family);
2126       }
2127     }
2128   }
2129 
2130   //////////////////////////////////////////////////////////////////////////////
2131   // set() methods for client use.
2132   //////////////////////////////////////////////////////////////////////////////
2133   /**
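        * Delete the cells specified by the passed {@link Delete} from this region.
        * <p>A minimal sketch (hypothetical row and family names):
        * <pre>
        *   Delete d = new Delete(Bytes.toBytes("row-1"));
        *   d.deleteFamily(Bytes.toBytes("cf"));   // omit families to delete the whole row
        *   region.delete(d);
        * </pre>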
2134    * @param delete delete object
2135    * @throws IOException
2136    */
2137   public void delete(Delete delete)
2138   throws IOException {
2139     checkReadOnly();
2140     checkResources();
2141     startRegionOperation(Operation.DELETE);
2142     try {
2143       delete.getRow();
2144       // All edits for the given row (across all column families) must happen atomically.
2145       doBatchMutate(delete);
2146     } finally {
2147       closeRegionOperation(Operation.DELETE);
2148     }
2149   }
2150 
2151   /**
2152    * Row needed by below method.
2153    */
2154   private static final byte [] FOR_UNIT_TESTS_ONLY = Bytes.toBytes("ForUnitTestsOnly");
2155   /**
2156    * This is used only by unit tests. Not required to be a public API.
2157    * @param familyMap map of family to edits for the given family.
2158    * @throws IOException
2159    */
2160   void delete(NavigableMap<byte[], List<Cell>> familyMap,
2161       Durability durability) throws IOException {
2162     Delete delete = new Delete(FOR_UNIT_TESTS_ONLY);
2163     delete.setFamilyCellMap(familyMap);
2164     delete.setDurability(durability);
2165     doBatchMutate(delete);
2166   }
2167 
2168   /**
2169    * Set up correct timestamps in the KVs in the Delete object.
2170    * Caller should have the row and region locks.
2171    * @param mutation
2172    * @param familyMap
2173    * @param byteNow
2174    * @throws IOException
2175    */
2176   void prepareDeleteTimestamps(Mutation mutation, Map<byte[], List<Cell>> familyMap,
2177       byte[] byteNow) throws IOException {
2178     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
2179 
2180       byte[] family = e.getKey();
2181       List<Cell> cells = e.getValue();
2182       Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
2183 
2184       for (Cell cell: cells) {
2185         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
2186         //  Check if time is LATEST, change to time of most recent addition if so
2187         //  This is expensive.
2188         if (kv.isLatestTimestamp() && CellUtil.isDeleteType(kv)) {
2189           byte[] qual = CellUtil.cloneQualifier(kv);
2190           if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
2191 
2192           Integer count = kvCount.get(qual);
2193           if (count == null) {
2194             kvCount.put(qual, 1);
2195           } else {
2196             kvCount.put(qual, count + 1);
2197           }
2198           count = kvCount.get(qual);
2199 
2200           Get get = new Get(CellUtil.cloneRow(kv));
2201           get.setMaxVersions(count);
2202           get.addColumn(family, qual);
2203           if (coprocessorHost != null) {
2204             if (!coprocessorHost.prePrepareTimeStampForDeleteVersion(mutation, cell,
2205                 byteNow, get)) {
2206               updateDeleteLatestVersionTimeStamp(kv, get, count, byteNow);
2207             }
2208           } else {
2209             updateDeleteLatestVersionTimeStamp(kv, get, count, byteNow);
2210           }
2211         } else {
2212           kv.updateLatestStamp(byteNow);
2213         }
2214       }
2215     }
2216   }
2217 
2218   void updateDeleteLatestVersionTimeStamp(KeyValue kv, Get get, int count, byte[] byteNow)
2219       throws IOException {
2220     List<Cell> result = get(get, false);
2221 
2222     if (result.size() < count) {
2223       // Nothing to delete
2224       kv.updateLatestStamp(byteNow);
2225       return;
2226     }
2227     if (result.size() > count) {
2228       throw new RuntimeException("Unexpected size: " + result.size());
2229     }
2230     KeyValue getkv = KeyValueUtil.ensureKeyValue(result.get(count - 1));
2231     Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(), getkv.getBuffer(),
2232         getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
2233   }
2234 
2235   /**
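        * Apply the given {@link Put} to this region.
        * <p>A minimal sketch (hypothetical row, family, and qualifier names):
        * <pre>
        *   Put p = new Put(Bytes.toBytes("row-1"));
        *   p.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value"));
        *   region.put(p);
        * </pre>
        * @param put the Put to apply; all of its edits for the row are applied atomically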
2236    * @throws IOException
2237    */
2238   public void put(Put put)
2239   throws IOException {
2240     checkReadOnly();
2241 
2242     // Do a rough check that we have resources to accept a write.  The check is
2243     // 'rough' in that between the resource check and the call to obtain a
2244     // read lock, resources may run out.  For now, the thought is that this
2245     // will be extremely rare; we'll deal with it when it happens.
2246     checkResources();
2247     startRegionOperation(Operation.PUT);
2248     try {
2249       // All edits for the given row (across all column families) must happen atomically.
2250       doBatchMutate(put);
2251     } finally {
2252       closeRegionOperation(Operation.PUT);
2253     }
2254   }
2255 
2256   /**
2257    * Struct-like class that tracks the progress of a batch operation,
2258    * accumulating status codes and tracking the index at which processing
2259    * is proceeding.
2260    */
2261   private abstract static class BatchOperationInProgress<T> {
2262     T[] operations;
2263     int nextIndexToProcess = 0;
2264     OperationStatus[] retCodeDetails;
2265     WALEdit[] walEditsFromCoprocessors;
2266 
2267     public BatchOperationInProgress(T[] operations) {
2268       this.operations = operations;
2269       this.retCodeDetails = new OperationStatus[operations.length];
2270       this.walEditsFromCoprocessors = new WALEdit[operations.length];
2271       Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
2272     }
2273 
2274     public abstract Mutation getMutation(int index);
2275     public abstract long getNonceGroup(int index);
2276     public abstract long getNonce(int index);
2277     /** This method is potentially expensive and should only be used for non-replay CP path. */
2278     public abstract Mutation[] getMutationsForCoprocs();
2279     public abstract boolean isInReplay();
2280     public abstract long getReplaySequenceId();
2281 
2282     public boolean isDone() {
2283       return nextIndexToProcess == operations.length;
2284     }
2285   }
2286 
2287   private static class MutationBatch extends BatchOperationInProgress<Mutation> {
2288     private long nonceGroup;
2289     private long nonce;
2290     public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) {
2291       super(operations);
2292       this.nonceGroup = nonceGroup;
2293       this.nonce = nonce;
2294     }
2295 
2296     @Override
2297     public Mutation getMutation(int index) {
2298       return this.operations[index];
2299     }
2300 
2301     @Override
2302     public long getNonceGroup(int index) {
2303       return nonceGroup;
2304     }
2305 
2306     @Override
2307     public long getNonce(int index) {
2308       return nonce;
2309     }
2310 
2311     @Override
2312     public Mutation[] getMutationsForCoprocs() {
2313       return this.operations;
2314     }
2315 
2316     @Override
2317     public boolean isInReplay() {
2318       return false;
2319     }
2320 
2321     @Override
2322     public long getReplaySequenceId() {
2323       return 0;
2324     }
2325   }
2326 
2327   private static class ReplayBatch extends BatchOperationInProgress<HLogSplitter.MutationReplay> {
2328     private long replaySeqId = 0;
2329     public ReplayBatch(MutationReplay[] operations, long seqId) {
2330       super(operations);
2331       this.replaySeqId = seqId;
2332     }
2333 
2334     @Override
2335     public Mutation getMutation(int index) {
2336       return this.operations[index].mutation;
2337     }
2338 
2339     @Override
2340     public long getNonceGroup(int index) {
2341       return this.operations[index].nonceGroup;
2342     }
2343 
2344     @Override
2345     public long getNonce(int index) {
2346       return this.operations[index].nonce;
2347     }
2348 
2349     @Override
2350     public Mutation[] getMutationsForCoprocs() {
2351       assert false;
2352       throw new RuntimeException("Should not be called for replay batch");
2353     }
2354 
2355     @Override
2356     public boolean isInReplay() {
2357       return true;
2358     }
2359 
2360     @Override
2361     public long getReplaySequenceId() {
2362       return this.replaySeqId;
2363     }
2364   }
2365 
2366   /**
2367    * Perform a batch of mutations.
2368    * It supports only Put and Delete mutations and will ignore other types passed.
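        * <p>Example invocation (sketch; {@code put1} and {@code delete1} are
        * hypothetical mutations):
        * <pre>
        *   OperationStatus[] codes = region.batchMutate(
        *     new Mutation[] { put1, delete1 }, HConstants.NO_NONCE, HConstants.NO_NONCE);
        *   // codes[i] holds the OperationStatusCode for mutations[i]
        * </pre>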
2369    * @param mutations the list of mutations
2370    * @return an array of OperationStatus which internally contains the
2371    *         OperationStatusCode and the exceptionMessage if any.
2372    * @throws IOException
2373    */
2374   public OperationStatus[] batchMutate(
2375       Mutation[] mutations, long nonceGroup, long nonce) throws IOException {
2376     // As it stands, this is used for two things:
2377     //  * batchMutate with single mutation - put/delete, separate or from checkAndMutate.
2378     //  * coprocessor calls (see ex. BulkDeleteEndpoint).
2379     // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd...
2380     return batchMutate(new MutationBatch(mutations, nonceGroup, nonce));
2381   }
2382 
2383   public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
2384     return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
2385   }
2386 
2387   /**
2388    * Replay a batch of mutations.
2389    * @param mutations mutations to replay.
2390    * @param replaySeqId SeqId for current mutations
2391    * @return an array of OperationStatus which internally contains the
2392    *         OperationStatusCode and the exceptionMessage if any.
2393    * @throws IOException
2394    */
2395   public OperationStatus[] batchReplay(HLogSplitter.MutationReplay[] mutations, long replaySeqId)
2396       throws IOException {
2397     return batchMutate(new ReplayBatch(mutations, replaySeqId));
2398   }
2399 
2400   /**
2401    * Perform a batch of mutations.
2402    * It supports only Put and Delete mutations and will ignore other types passed.
2403    * @param batchOp the batch of mutations to apply, carrying progress state
2404    * @return an array of OperationStatus which internally contains the
2405    *         OperationStatusCode and the exceptionMessage if any.
2406    * @throws IOException
2407    */
2408   OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
2409     boolean initialized = false;
2410     while (!batchOp.isDone()) {
2411       if (!batchOp.isInReplay()) {
2412         checkReadOnly();
2413       }
2414       checkResources();
2415 
2416       long newSize;
2417       Operation op = Operation.BATCH_MUTATE;
2418       if (batchOp.isInReplay()) op = Operation.REPLAY_BATCH_MUTATE;
2419       startRegionOperation(op);
2420 
2421       try {
2422         if (!initialized) {
2423           this.writeRequestsCount.add(batchOp.operations.length);
2424           if (!batchOp.isInReplay()) {
2425             doPreMutationHook(batchOp);
2426           }
2427           initialized = true;
2428         }
2429         long addedSize = doMiniBatchMutation(batchOp);
2430         newSize = this.addAndGetGlobalMemstoreSize(addedSize);
2431       } finally {
2432         closeRegionOperation(op);
2433       }
2434       if (isFlushSize(newSize)) {
2435         requestFlush();
2436       }
2437     }
2438     return batchOp.retCodeDetails;
2439   }
2440 
2441 
2442   private void doPreMutationHook(BatchOperationInProgress<?> batchOp)
2443       throws IOException {
2444     /* Run coprocessor pre hook outside of locks to avoid deadlock */
2445     WALEdit walEdit = new WALEdit();
2446     if (coprocessorHost != null) {
2447       for (int i = 0 ; i < batchOp.operations.length; i++) {
2448         Mutation m = batchOp.getMutation(i);
2449         if (m instanceof Put) {
2450           if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
2451             // pre hook says skip this Put
2452             // mark as success and skip in doMiniBatchMutation
2453             batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2454           }
2455         } else if (m instanceof Delete) {
2456           Delete curDel = (Delete) m;
2457           if (curDel.getFamilyCellMap().isEmpty()) {
2458             // handle deleting a row case
2459             prepareDelete(curDel);
2460           }
2461           if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {
2462             // pre hook says skip this Delete
2463             // mark as success and skip in doMiniBatchMutation
2464             batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2465           }
2466         } else {
2467           // If other mutation types (e.g. Append) are passed along with the Puts and
2468           // Deletes in batchMutate, mark the operation return code as failure so that
2469           // it will not be considered in doMiniBatchMutation
2470           batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
2471               "Put/Delete mutations only supported in batchMutate() now");
2472         }
2473         if (!walEdit.isEmpty()) {
2474           batchOp.walEditsFromCoprocessors[i] = walEdit;
2475           walEdit = new WALEdit();
2476         }
2477       }
2478     }
2479   }
2480 
2481   @SuppressWarnings("unchecked")
2482   private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp) throws IOException {
2483     boolean isInReplay = batchOp.isInReplay();
2484     // variable to note if all Put items are for the same CF -- metrics related
2485     boolean putsCfSetConsistent = true;
2486     //The set of columnFamilies first seen for Put.
2487     Set<byte[]> putsCfSet = null;
2488     // variable to note if all Delete items are for the same CF -- metrics related
2489     boolean deletesCfSetConsistent = true;
2490     //The set of columnFamilies first seen for Delete.
2491     Set<byte[]> deletesCfSet = null;
2492 
2493     long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
2494     WALEdit walEdit = new WALEdit(isInReplay);
2495     MultiVersionConsistencyControl.WriteEntry w = null;
2496     long txid = 0;
2497     boolean doRollBackMemstore = false;
2498     boolean locked = false;
2499 
2500     /** Keep track of the locks we hold so we can release them in finally clause */
2501     List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
2502     // reference family maps directly so coprocessors can mutate them if desired
2503     Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
2504     List<Cell> memstoreCells = new ArrayList<Cell>();
2505     // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
2506     int firstIndex = batchOp.nextIndexToProcess;
2507     int lastIndexExclusive = firstIndex;
2508     boolean success = false;
2509     int noOfPuts = 0, noOfDeletes = 0;
2510     HLogKey walKey = null;
2511     long mvccNum = 0;
2512     try {
2513       // ------------------------------------
2514       // STEP 1. Try to acquire as many locks as we can, and ensure
2515       // we acquire at least one.
2516       // ----------------------------------
2517       int numReadyToWrite = 0;
2518       long now = EnvironmentEdgeManager.currentTime();
2519       while (lastIndexExclusive < batchOp.operations.length) {
2520         Mutation mutation = batchOp.getMutation(lastIndexExclusive);
2521         boolean isPutMutation = mutation instanceof Put;
2522 
2523         Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
2524         // store the family map reference to allow for mutations
2525         familyMaps[lastIndexExclusive] = familyMap;
2526 
2527         // skip anything that "ran" already
2528         if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
2529             != OperationStatusCode.NOT_RUN) {
2530           lastIndexExclusive++;
2531           continue;
2532         }
2533 
2534         try {
2535           if (isPutMutation) {
2536             // Check the families in the put. If bad, skip this one.
2537             if (isInReplay) {
2538               removeNonExistentColumnFamilyForReplay(familyMap);
2539             } else {
2540               checkFamilies(familyMap.keySet());
2541             }
2542             checkTimestamps(mutation.getFamilyCellMap(), now);
2543           } else {
2544             prepareDelete((Delete) mutation);
2545           }
2546         } catch (NoSuchColumnFamilyException nscf) {
2547           LOG.warn("No such column family in batch mutation", nscf);
2548           batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2549               OperationStatusCode.BAD_FAMILY, nscf.getMessage());
2550           lastIndexExclusive++;
2551           continue;
2552         } catch (FailedSanityCheckException fsce) {
2553           LOG.warn("Batch Mutation did not pass sanity check", fsce);
2554           batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2555               OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
2556           lastIndexExclusive++;
2557           continue;
2558         }
2559 
2560         // If we haven't got any rows in our batch, we should block to
2561         // get the next one.
2562         boolean shouldBlock = numReadyToWrite == 0;
2563         RowLock rowLock = null;
2564         try {
2565           rowLock = getRowLock(mutation.getRow(), shouldBlock);
2566         } catch (IOException ioe) {
2567           LOG.warn("Failed getting lock in batch put, row="
2568             + Bytes.toStringBinary(mutation.getRow()), ioe);
2569         }
2570         if (rowLock == null) {
2571           // We failed to grab another lock
2572           assert !shouldBlock : "Should never fail to get lock when blocking";
2573           break; // stop acquiring more rows for this batch
2574         } else {
2575           acquiredRowLocks.add(rowLock);
2576         }
2577 
2578         lastIndexExclusive++;
2579         numReadyToWrite++;
2580 
2581         if (isPutMutation) {
2582           // If column families stay consistent throughout all of the
2583           // individual puts, then metrics can be reported as a multiput across
2584           // column families in the first put.
2585           if (putsCfSet == null) {
2586             putsCfSet = mutation.getFamilyCellMap().keySet();
2587           } else {
2588             putsCfSetConsistent = putsCfSetConsistent
2589                 && mutation.getFamilyCellMap().keySet().equals(putsCfSet);
2590           }
2591         } else {
2592           if (deletesCfSet == null) {
2593             deletesCfSet = mutation.getFamilyCellMap().keySet();
2594           } else {
2595             deletesCfSetConsistent = deletesCfSetConsistent
2596                 && mutation.getFamilyCellMap().keySet().equals(deletesCfSet);
2597           }
2598         }
2599       }
2600 
2601       // we should record the timestamp only after we have acquired the rowLock,
2602       // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
2603       now = EnvironmentEdgeManager.currentTime();
2604       byte[] byteNow = Bytes.toBytes(now);
2605 
2606       // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
2607       if (numReadyToWrite <= 0) return 0L;
2608 
2609       // We've now grabbed as many mutations off the list as we can
2610 
2611       // ------------------------------------
2612       // STEP 2. Update any LATEST_TIMESTAMP timestamps
2613       // ----------------------------------
2614       for (int i = firstIndex; !isInReplay && i < lastIndexExclusive; i++) {
2615         // skip invalid
2616         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2617             != OperationStatusCode.NOT_RUN) continue;
2618 
2619         Mutation mutation = batchOp.getMutation(i);
2620         if (mutation instanceof Put) {
2621           updateKVTimestamps(familyMaps[i].values(), byteNow);
2622           noOfPuts++;
2623         } else {
2624           prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
2625           noOfDeletes++;
2626         }
2627       }
2628 
2629       lock(this.updatesLock.readLock(), numReadyToWrite);
2630       locked = true;
2631       if (isInReplay) {
2632         mvccNum = batchOp.getReplaySequenceId();
2633       } else {
2634         mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
2635       }
2637       // ------------------------------------
2638       // Acquire the latest mvcc number
2639       // ----------------------------------
2640       w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
2641 
2642       // calling the pre CP hook for batch mutation
2643       if (!isInReplay && coprocessorHost != null) {
2644         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2645           new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2646           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2647         if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
2648       }
2649 
2650       // ------------------------------------
2651       // STEP 3. Write back to memstore
2652       // Write to memstore. It is ok to write to memstore
2653       // first without updating the HLog because we do not roll
2654       // forward the memstore MVCC. The MVCC will be moved up when
2655       // the complete operation is done. These changes are not yet
2656       // visible to scanners till we update the MVCC. The MVCC is
2657       // moved only when the sync is complete.
2658       // ----------------------------------
2659       long addedSize = 0;
2660       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2661         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2662             != OperationStatusCode.NOT_RUN) {
2663           continue;
2664         }
2665         doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
2666         addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells);
2667       }
2668 
2669       // ------------------------------------
2670       // STEP 4. Build WAL edit
2671       // ----------------------------------
2672       Durability durability = Durability.USE_DEFAULT;
2673       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2674         // Skip puts that were determined to be invalid during preprocessing
2675         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2676             != OperationStatusCode.NOT_RUN) {
2677           continue;
2678         }
2679         batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2680 
2681         Mutation m = batchOp.getMutation(i);
2682         Durability tmpDur = getEffectiveDurability(m.getDurability());
2683         if (tmpDur.ordinal() > durability.ordinal()) {
2684           durability = tmpDur;
2685         }
2686         if (tmpDur == Durability.SKIP_WAL) {
2687           recordMutationWithoutWal(m.getFamilyCellMap());
2688           continue;
2689         }
2690 
2691         long nonceGroup = batchOp.getNonceGroup(i), nonce = batchOp.getNonce(i);
2692         // In replay, the batch may contain multiple nonces. If so, write WALEdit for each.
2693         // Given how nonces are originally written, these should be contiguous.
2694         // They don't have to be; it will still work, we'll just write more WALEdits than needed.
2695         if (nonceGroup != currentNonceGroup || nonce != currentNonce) {
2696           if (walEdit.size() > 0) {
2697             assert isInReplay;
2698             if (!isInReplay) {
2699               throw new IOException("Multiple nonces per batch and not in replay");
2700             }
2701             // txid should always increase, so having the one from the last call is ok.
2702             walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
2703               this.htableDescriptor.getTableName(), now, m.getClusterIds(),
2704               currentNonceGroup, currentNonce);
2705           txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey,
2706               walEdit, getSequenceId(), true, null);
2707             walEdit = new WALEdit(isInReplay);
2708             walKey = null;
2709           }
2710           currentNonceGroup = nonceGroup;
2711           currentNonce = nonce;
2712         }
2713 
2714         // Add WAL edits by CP
2715         WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
2716         if (fromCP != null) {
2717           for (Cell cell : fromCP.getCells()) {
2718             walEdit.add(cell);
2719           }
2720         }
2721         addFamilyMapToWALEdit(familyMaps[i], walEdit);
2722       }
2723 
2724       // -------------------------
2725       // STEP 5. Append the final edit to WAL. Do not sync wal.
2726       // -------------------------
2727       Mutation mutation = batchOp.getMutation(firstIndex);
2728       if (walEdit.size() > 0) {
2729         walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
2730             this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now,
2731             mutation.getClusterIds(), currentNonceGroup, currentNonce);
2732         if(isInReplay) {
2733           walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId());
2734         }
2735         txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
2736           getSequenceId(), true, memstoreCells);
2737       }
2738       if(walKey == null){
2739         // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
2740         walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
2741       }
2742 
2743       // -------------------------------
2744       // STEP 6. Release row locks, etc.
2745       // -------------------------------
2746       if (locked) {
2747         this.updatesLock.readLock().unlock();
2748         locked = false;
2749       }
2750       releaseRowLocks(acquiredRowLocks);
2751 
2752       // -------------------------
2753       // STEP 7. Sync wal.
2754       // -------------------------
2755       if (txid != 0) {
2756         syncOrDefer(txid, durability);
2757       }
2758 
2759       doRollBackMemstore = false;
2760       // calling the post CP hook for batch mutation
2761       if (!isInReplay && coprocessorHost != null) {
2762         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2763           new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2764           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2765         coprocessorHost.postBatchMutate(miniBatchOp);
2766       }
2767 
2768       // ------------------------------------------------------------------
2769       // STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
2770       // ------------------------------------------------------------------
2771       if (w != null) {
2772         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
2773         w = null;
2774       }
2775 
2776       // ------------------------------------
2777       // STEP 9. Run coprocessor post hooks. This should be done after the wal is
2778       // synced so that the coprocessor contract is adhered to.
2779       // ------------------------------------
2780       if (!isInReplay && coprocessorHost != null) {
2781         for (int i = firstIndex; i < lastIndexExclusive; i++) {
2782           // only for successful puts
2783           if (batchOp.retCodeDetails[i].getOperationStatusCode()
2784               != OperationStatusCode.SUCCESS) {
2785             continue;
2786           }
2787           Mutation m = batchOp.getMutation(i);
2788           if (m instanceof Put) {
2789             coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
2790           } else {
2791             coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
2792           }
2793         }
2794       }
2795 
2796       success = true;
2797       return addedSize;
2798     } finally {
2799 
2800       // if the wal sync was unsuccessful, remove keys from memstore
2801       if (doRollBackMemstore) {
2802         rollbackMemstore(memstoreCells);
2803       }
2804       if (w != null) {
2805         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
2806       }
2807 
2808       if (locked) {
2809         this.updatesLock.readLock().unlock();
2810       }
2811       releaseRowLocks(acquiredRowLocks);
2812 
2813       // See if the column families were consistent through the whole thing.
2814       // If they were then keep them. If they were not then pass a null.
2815       // A null will be treated as unknown.
2816       // The total time taken might involve both Puts and Deletes.
2817       // Split the time for puts and deletes based on the total number of Puts and Deletes.
2818 
2819       if (noOfPuts > 0) {
2820         // There were some Puts in the batch.
2821         if (this.metricsRegion != null) {
2822           this.metricsRegion.updatePut();
2823         }
2824       }
2825       if (noOfDeletes > 0) {
2826         // There were some Deletes in the batch.
2827         if (this.metricsRegion != null) {
2828           this.metricsRegion.updateDelete();
2829         }
2830       }
2831       if (!success) {
2832         for (int i = firstIndex; i < lastIndexExclusive; i++) {
2833           if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
2834             batchOp.retCodeDetails[i] = OperationStatus.FAILURE;
2835           }
2836         }
2837       }
2838       if (coprocessorHost != null && !batchOp.isInReplay()) {
2839         // call the coprocessor hook to do any finalization steps
2840         // after the put is done
2841         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2842             new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2843                 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex,
2844                 lastIndexExclusive);
2845         coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success);
2846       }
2847 
2848       batchOp.nextIndexToProcess = lastIndexExclusive;
2849     }
2850   }
2851 
2852   /**
2853    * Returns effective durability from the passed durability and
2854    * the table descriptor.
2855    */
2856   protected Durability getEffectiveDurability(Durability d) {
2857     return d == Durability.USE_DEFAULT ? this.durability : d;
2858   }
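
  // Illustrative note (an addition, not part of the original source): a mutation
  // left at Durability.USE_DEFAULT resolves to the table's configured durability,
  // while an explicit per-mutation setting wins. E.g. (hypothetical values):
  //
  //   Put p = new Put(row);
  //   p.setDurability(Durability.SKIP_WAL);  // overrides the table default
  //   // getEffectiveDurability(p.getDurability()) returns Durability.SKIP_WAL,
  //   // whereas a mutation left at USE_DEFAULT would resolve to this.durability.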
2859 
2860   //TODO: gets/puts and deletes should be refactored a bit so that acquiring
2861   //the lock happens earlier and the lock is simply passed into the methods.
2862   //Then in the case of checkAndMutate you could just do lockRow,
2863   //get, put, unlockRow or something similar.
2864   /**
2865    *
2866    * @throws IOException
2867    * @return true if the new mutation was executed, false otherwise
2868    */
2869   public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
2870       CompareOp compareOp, ByteArrayComparable comparator, Mutation w,
2871       boolean writeToWAL)
2872   throws IOException{
2873     checkReadOnly();
2874     //TODO, add check for value length or maybe even better move this to the
2875     //client if this becomes a global setting
2876     checkResources();
2877     boolean isPut = w instanceof Put;
2878     if (!isPut && !(w instanceof Delete))
2879       throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must " +
2880           "be Put or Delete");
2881     if (!Bytes.equals(row, w.getRow())) {
2882       throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " +
2883           "getRow must match the passed row");
2884     }
2885 
2886     startRegionOperation();
2887     try {
2888       Get get = new Get(row);
2889       checkFamily(family);
2890       get.addColumn(family, qualifier);
2891 
2892       // Lock row - note that doBatchMutate will relock this row if called
2893       RowLock rowLock = getRowLock(get.getRow());
2894       // wait for all previous transactions to complete (with lock held)
2895       mvcc.waitForPreviousTransactionsComplete();
2896       try {
2897         if (this.getCoprocessorHost() != null) {
2898           Boolean processed = null;
2899           if (w instanceof Put) {
2900             processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family,
2901                 qualifier, compareOp, comparator, (Put) w);
2902           } else if (w instanceof Delete) {
2903             processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family,
2904                 qualifier, compareOp, comparator, (Delete) w);
2905           }
2906           if (processed != null) {
2907             return processed;
2908           }
2909         }
2910         List<Cell> result = get(get, false);
2911 
2912         boolean valueIsNull = comparator.getValue() == null ||
2913           comparator.getValue().length == 0;
2914         boolean matches = false;
2915         if (result.size() == 0 && valueIsNull) {
2916           matches = true;
2917         } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
2918             valueIsNull) {
2919           matches = true;
2920         } else if (result.size() == 1 && !valueIsNull) {
2921           Cell kv = result.get(0);
2922           int compareResult = comparator.compareTo(kv.getValueArray(),
2923               kv.getValueOffset(), kv.getValueLength());
2924           switch (compareOp) {
2925           case LESS:
2926             matches = compareResult < 0;
2927             break;
2928           case LESS_OR_EQUAL:
2929             matches = compareResult <= 0;
2930             break;
2931           case EQUAL:
2932             matches = compareResult == 0;
2933             break;
2934           case NOT_EQUAL:
2935             matches = compareResult != 0;
2936             break;
2937           case GREATER_OR_EQUAL:
2938             matches = compareResult >= 0;
2939             break;
2940           case GREATER:
2941             matches = compareResult > 0;
2942             break;
2943           default:
2944             throw new RuntimeException("Unknown Compare op " + compareOp.name());
2945           }
2946         }
2947         //If matches put the new put or delete the new delete
2948         if (matches) {
2949           // All edits for the given row (across all column families) must
2950           // happen atomically.
2951           doBatchMutate(w);
2952           this.checkAndMutateChecksPassed.increment();
2953           return true;
2954         }
2955         this.checkAndMutateChecksFailed.increment();
2956         return false;
2957       } finally {
2958         rowLock.release();
2959       }
2960     } finally {
2961       closeRegionOperation();
2962     }
2963   }
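
  // Illustrative sketch (an addition, not part of the original source): how a
  // caller holding an HRegion reference might use checkAndMutate. The row,
  // family, qualifier and values below are hypothetical.
  //
  //   byte[] row = Bytes.toBytes("row1");
  //   byte[] fam = Bytes.toBytes("cf");
  //   byte[] qual = Bytes.toBytes("q");
  //   Put put = new Put(row);
  //   put.add(fam, qual, Bytes.toBytes("newValue"));
  //   // Apply the Put only if the current value equals "oldValue".
  //   boolean applied = region.checkAndMutate(row, fam, qual, CompareOp.EQUAL,
  //       new BinaryComparator(Bytes.toBytes("oldValue")), put, true);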
2964 
2965   private void doBatchMutate(Mutation mutation) throws IOException, DoNotRetryIOException {
2966     // Currently this is only called for puts and deletes, so no nonces.
2967     OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation },
2968         HConstants.NO_NONCE, HConstants.NO_NONCE);
2969     if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
2970       throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
2971     } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
2972       throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
2973     }
2974   }
2975 
2976   /**
2977    * Complete taking the snapshot on the region. Writes the region info and adds references to the
2978    * working snapshot directory.
2979    *
2980    * TODO for api consistency, consider adding another version with no {@link ForeignExceptionSnare}
2981    * arg.  (In the future other cancellable HRegion methods could eventually add a
2982    * {@link ForeignExceptionSnare}, or we could do something fancier).
2983    *
2984    * @param desc snapshot description object
2985    * @param exnSnare ForeignExceptionSnare that captures external exceptions in case we need to
2986    *   bail out.  This is allowed to be null and will just be ignored in that case.
2987    * @throws IOException if there is an external or internal error causing the snapshot to fail
2988    */
2989   public void addRegionToSnapshot(SnapshotDescription desc,
2990       ForeignExceptionSnare exnSnare) throws IOException {
2991     Path rootDir = FSUtils.getRootDir(conf);
2992     Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);
2993 
2994     SnapshotManifest manifest = SnapshotManifest.create(conf, getFilesystem(),
2995                                                         snapshotDir, desc, exnSnare);
2996     manifest.addRegion(this);
2997   }
2998 
2999   /**
3000    * Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP} with the
3001    * provided current timestamp.
3002    */
3003   void updateKVTimestamps(final Iterable<List<Cell>> keyLists, final byte[] now) {
3004     for (List<Cell> cells: keyLists) {
3005       if (cells == null) continue;
3006       for (Cell cell : cells) {
3007         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
3008         kv.updateLatestStamp(now);
3009       }
3010     }
3011   }
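
  // Illustrative note (an addition, not part of the original source): a Put
  // built without an explicit timestamp carries HConstants.LATEST_TIMESTAMP,
  // which this method replaces with the server-side time. E.g.:
  //
  //   Put p = new Put(Bytes.toBytes("row1"));  // no timestamp supplied
  //   p.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
  //   // In batchMutate STEP 2 above, updateKVTimestamps(...) is called with
  //   // byteNow = Bytes.toBytes(EnvironmentEdgeManager.currentTime()), so the
  //   // stored cell ends up stamped with the region server's clock.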
3012 
3013   /*
3014    * Check that there are resources to support an update.
3015    *
3016    * We throw RegionTooBusyException if we are above the memstore limit
3017    * and expect the client to retry using some kind of backoff.
3018    */
3019   private void checkResources()
3020     throws RegionTooBusyException {
3021     // If catalog region, do not impose resource constraints or block updates.
3022     if (this.getRegionInfo().isMetaRegion()) return;
3023 
3024     if (this.memstoreSize.get() > this.blockingMemStoreSize) {
3025       requestFlush();
3026       throw new RegionTooBusyException("Above memstore limit, " +
3027           "regionName=" + (this.getRegionInfo() == null ? "unknown" :
3028           this.getRegionInfo().getRegionNameAsString()) +
3029           ", server=" + (this.getRegionServerServices() == null ? "unknown" :
3030           this.getRegionServerServices().getServerName()) +
3031           ", memstoreSize=" + memstoreSize.get() +
3032           ", blockingMemStoreSize=" + blockingMemStoreSize);
3033     }
3034   }
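
  // Illustrative sketch (an addition, not part of the original source): since
  // checkResources() surfaces RegionTooBusyException when the memstore is above
  // the blocking limit, a client could retry with a simple exponential backoff.
  // maxAttempts and initialBackoffMs are hypothetical knobs; interrupt handling
  // is elided.
  //
  //   for (int attempt = 0; attempt < maxAttempts; attempt++) {
  //     try {
  //       table.put(put);
  //       break;
  //     } catch (RegionTooBusyException e) {
  //       Thread.sleep(initialBackoffMs << attempt);
  //     }
  //   }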
3035 
3036   /**
3037    * @throws IOException Throws exception if region is in read-only mode.
3038    */
3039   protected void checkReadOnly() throws IOException {
3040     if (this.writestate.isReadOnly()) {
3041       throw new IOException("region is read only");
3042     }
3043   }
3044 
3045   protected void checkReadsEnabled() throws IOException {
3046     if (!this.writestate.readsEnabled) {
3047       throw new IOException("The region's reads are disabled. Cannot serve the request");
3048     }
3049   }
3050 
3051   public void setReadsEnabled(boolean readsEnabled) {
3052     this.writestate.setReadsEnabled(readsEnabled);
3053   }
3054 
3055   /**
3056    * Add updates first to the hlog and then add values to memstore.
3057    * Warning: the caller is assumed to hold a lock on the passed-in row.
3058    * @param edits Cell updates by column
3059    * @throws IOException
3060    */
3061   private void put(final byte [] row, byte [] family, List<Cell> edits)
3062   throws IOException {
3063     NavigableMap<byte[], List<Cell>> familyMap;
3064     familyMap = new TreeMap<byte[], List<Cell>>(Bytes.BYTES_COMPARATOR);
3065 
3066     familyMap.put(family, edits);
3067     Put p = new Put(row);
3068     p.setFamilyCellMap(familyMap);
3069     doBatchMutate(p);
3070   }
3071 
3072   /**
3073    * Atomically apply the given map of family->edits to the memstore.
3074    * This handles the consistency control on its own, but the caller
3075    * should already have locked updatesLock.readLock(). This also does
3076    * <b>not</b> check the families for validity.
3077    *
3078    * @param familyMap Map of kvs per family
3079    * @param localizedWriteEntry The WriteEntry of the MVCC for this transaction.
3080    *        If null, then this method internally creates a mvcc transaction.
3081    * @param output newly added KVs into memstore
3082    * @return the additional memory usage of the memstore caused by the
3083    * new entries.
3084    * @throws IOException
3085    */
3086   private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
3087     long mvccNum, List<Cell> memstoreCells) throws IOException {
3088     long size = 0;
3089 
3090     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
3091       byte[] family = e.getKey();
3092       List<Cell> cells = e.getValue();
3093 
3094       Store store = getStore(family);
3095       for (Cell cell: cells) {
3096         CellUtil.setSequenceId(cell, mvccNum);
3097         Pair<Long, Cell> ret = store.add(cell);
3098         size += ret.getFirst();
3099         memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
3100       }
3101     }
3102 
3103     return size;
3104   }
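
  // Illustrative call pattern (an addition, not part of the original source):
  // per the javadoc above, the caller holds updatesLock.readLock() and manages
  // the MVCC transaction around the memstore write, as batchMutate does:
  //
  //   lock(this.updatesLock.readLock(), numReadyToWrite);  // take updates lock
  //   w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);     // open MVCC txn
  //   addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells);
  //   // ... append to and sync the WAL ...
  //   mvcc.completeMemstoreInsertWithSeqNum(w, walKey);    // make edits visible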
3105 
3106   /**
3107    * Remove all the keys listed in the map from the memstore. This method is
3108    * called when a Put/Delete has updated memstore but subsequently fails to update
3109    * the wal. This method is then invoked to rollback the memstore.
3110    */
3111   private void rollbackMemstore(List<Cell> memstoreCells) {
3112     int kvsRolledback = 0;
3113 
3114     for (Cell cell : memstoreCells) {
3115       byte[] family = cell.getFamily();
3116       Store store = getStore(family);
3117       store.rollback(cell);
3118       kvsRolledback++;
3119     }
3120     LOG.debug("rollbackMemstore rolled back " + kvsRolledback);
3121   }
3122 
3123   /**
3124    * Check the collection of families for validity.
3125    * @throws NoSuchColumnFamilyException if a family does not exist.
3126    */
3127   void checkFamilies(Collection<byte[]> families)
3128   throws NoSuchColumnFamilyException {
3129     for (byte[] family : families) {
3130       checkFamily(family);
3131     }
3132   }
3133 
3134   /**
3135    * During replay, there could exist column families that were removed between the region
3136    * server failure and the replay.
3137    */
3138   private void removeNonExistentColumnFamilyForReplay(
3139       final Map<byte[], List<Cell>> familyMap) {
3140     List<byte[]> nonExistentList = null;
3141     for (byte[] family : familyMap.keySet()) {
3142       if (!this.htableDescriptor.hasFamily(family)) {
3143         if (nonExistentList == null) {
3144           nonExistentList = new ArrayList<byte[]>();
3145         }
3146         nonExistentList.add(family);
3147       }
3148     }
3149     if (nonExistentList != null) {
3150       for (byte[] family : nonExistentList) {
3151         // Perhaps schema was changed between crash and replay
3152         LOG.info("No family for " + Bytes.toString(family) + ", omitting from replay.");
3153         familyMap.remove(family);
3154       }
3155     }
3156   }
3157 
3158   void checkTimestamps(final Map<byte[], List<Cell>> familyMap,
3159       long now) throws FailedSanityCheckException {
3160     if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
3161       return;
3162     }
3163     long maxTs = now + timestampSlop;
3164     for (List<Cell> kvs : familyMap.values()) {
3165       for (Cell cell : kvs) {
3166         // see if the user-side TS is out of range. latest = server-side
3167         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
3168         if (!kv.isLatestTimestamp() && kv.getTimestamp() > maxTs) {
3169           throw new FailedSanityCheckException("Timestamp for KV out of range "
3170               + cell + " (too.new=" + timestampSlop + ")");
3171         }
3172       }
3173     }
3174   }
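
  // Illustrative note (an addition, not part of the original source): with a
  // hypothetical slop of 10 seconds, a client-supplied timestamp more than 10
  // seconds ahead of the server clock fails the sanity check.
  //
  //   long now = EnvironmentEdgeManager.currentTime();
  //   long slop = 10000L;            // hypothetical timestampSlop value
  //   long cellTs = now + 60000L;    // one minute in the future
  //   // cellTs > now + slop, so checkTimestamps(...) throws
  //   // FailedSanityCheckException for a mutation carrying this timestamp.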
3175 
3176   /**
3177    * Append the given map of family->edits to a WALEdit data structure.
3178    * This does not write to the HLog itself.
3179    * @param familyMap map of family->edits
3180    * @param walEdit the destination entry to append into
3181    */
3182   private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
3183       WALEdit walEdit) {
3184     for (List<Cell> edits : familyMap.values()) {
3185       for (Cell cell : edits) {
3186         walEdit.add(cell);
3187       }
3188     }
3189   }
3190 
3191   private void requestFlush() {
3192     if (this.rsServices == null) {
3193       return;
3194     }
3195     synchronized (writestate) {
3196       if (this.writestate.isFlushRequested()) {
3197         return;
3198       }
3199       writestate.flushRequested = true;
3200     }
3201     // Make request outside of synchronize block; HBASE-818.
3202     this.rsServices.getFlushRequester().requestFlush(this);
3203     if (LOG.isDebugEnabled()) {
3204       LOG.debug("Flush requested on " + this);
3205     }
3206   }
3207 
3208   /*
3209    * @param size
3210    * @return True if size is over the flush threshold
3211    */
3212   private boolean isFlushSize(final long size) {
3213     return size > this.memstoreFlushSize;
3214   }
3215 
3216   /**
3217    * Read the edits log put under this region by wal log splitting process.  Put
3218    * the recovered edits back up into this region.
3219    *
3220    * <p>We can ignore any log message that has a sequence ID that's equal to or
3221    * lower than minSeqId.  (Because we know such log messages are already
3222    * reflected in the HFiles.)
3223    *
3224    * <p>While this is running we are putting pressure on memory yet we are
3225    * outside of our usual accounting because we are not yet an onlined region
3226    * (this stuff is being run as part of Region initialization).  This means
3227    * that if we're up against global memory limits, we'll not be flagged to flush
3228    * because we are not online. We can't be flushed by the usual mechanisms anyway;
3229    * we're not yet online so our relative sequenceids are not yet aligned with
3230    * HLog sequenceids -- not till we come up online, post processing of split
3231    * edits.
3232    *
3233    * <p>But to help relieve memory pressure, at least manage our own heap size by
3234    * flushing if we are in excess of per-region limits.  When flushing, though, we
3235    * have to be careful to avoid using the regionserver/hlog sequenceid.  It is
3236    * running on a different timeline from what is going on in this region context,
3237    * so if we crashed replaying these edits, but in the midst had a flush that used
3238    * the regionserver log with a sequenceid in excess of what is going on in here
3239    * in this region and with its split editlogs, then we could miss edits the
3240    * next time we go to recover. So, we have to flush inline, using seqids that
3241    * make sense in this single region context only -- until we come online.
3242    *
3243    * @param maxSeqIdInStores Any edit found in split editlogs needs to be in excess of
3244    * the maxSeqId for the store to be applied, else it is skipped.
3245    * @return the sequence id of the last edit added to this region out of the
3246    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
3247    * @throws UnsupportedEncodingException
3248    * @throws IOException
3249    */
3250   protected long replayRecoveredEditsIfAny(final Path regiondir,
3251       Map<byte[], Long> maxSeqIdInStores,
3252       final CancelableProgressable reporter, final MonitoredTask status)
3253       throws UnsupportedEncodingException, IOException {
3254     long minSeqIdForTheRegion = -1;
3255     for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
3256       if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
3257         minSeqIdForTheRegion = maxSeqIdInStore;
3258       }
3259     }
3260     long seqid = minSeqIdForTheRegion;
3261 
3262     FileSystem fs = this.fs.getFileSystem();
3263     NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regiondir);
3264     if (LOG.isDebugEnabled()) {
3265       LOG.debug("Found " + (files == null ? 0 : files.size())
3266         + " recovered edits file(s) under " + regiondir);
3267     }
3268 
3269     if (files == null || files.isEmpty()) return seqid;
3270 
3271     for (Path edits: files) {
3272       if (edits == null || !fs.exists(edits)) {
3273         LOG.warn("Null or non-existent edits file: " + edits);
3274         continue;
3275       }
3276       if (isZeroLengthThenDelete(fs, edits)) continue;
3277 
3278       long maxSeqId;
3279       String fileName = edits.getName();
3280       maxSeqId = Math.abs(Long.parseLong(fileName));
3281       if (maxSeqId <= minSeqIdForTheRegion) {
3282         if (LOG.isDebugEnabled()) {
3283           String msg = "Maximum sequenceid for this log is " + maxSeqId
3284             + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
3285             + ", skipped the whole file, path=" + edits;
3286           LOG.debug(msg);
3287         }
3288         continue;
3289       }
3290 
3291       try {
3292         // Replay the edits. Replay can return -1 if everything is skipped; only update if the returned seqid is greater.
3293         seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
3294       } catch (IOException e) {
3295         boolean skipErrors = conf.getBoolean(
3296             HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
3297             conf.getBoolean(
3298                 "hbase.skip.errors",
3299                 HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS));
3300         if (conf.get("hbase.skip.errors") != null) {
3301           LOG.warn(
3302               "The property 'hbase.skip.errors' has been deprecated. Please use " +
3303               HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
3304         }
3305         if (skipErrors) {
3306           Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
3307           LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
3308               + "=true so continuing. Renamed " + edits +
3309               " as " + p, e);
3310         } else {
3311           throw e;
3312         }
3313       }
3314     }
3315     // The edits size added into rsAccounting during this replaying will not
3316     // be required any more. So just clear it.
3317     if (this.rsAccounting != null) {
3318       this.rsAccounting.clearRegionReplayEditsSize(this.getRegionName());
3319     }
3320     if (seqid > minSeqIdForTheRegion) {
3321       // Then we added some edits to memory. Flush and cleanup split edit files.
3322       internalFlushcache(null, seqid, status);
3323     }
3324     // Now delete the content of recovered edits.  We're done w/ them.
3325     for (Path file: files) {
3326       if (!fs.delete(file, false)) {
3327         LOG.error("Failed delete of " + file);
3328       } else {
3329         LOG.debug("Deleted recovered.edits file=" + file);
3330       }
3331     }
3332     return seqid;
3333   }
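
  // Illustrative note (an addition, not part of the original source): the loop
  // above derives maxSeqId from the edits file name via
  // Math.abs(Long.parseLong(fileName)), i.e. each recovered edits file is
  // expected to be named with a sequence id, e.g. (hypothetical path):
  //
  //   <region dir>/recovered.edits/0000000000000001234   -> maxSeqId = 1234
  //
  // A file whose sequence id is <= minSeqIdForTheRegion is skipped wholesale,
  // since every edit it contains is already reflected in the HFiles.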
3334 
3335   /*
3336    * @param edits File of recovered edits.
3337    * @param maxSeqIdInStores Maximum sequenceid found in each store.  Edits in log
3338    * must be larger than this to be replayed for each store.
3339    * @param reporter
3340    * @return the sequence id of the last edit added to this region out of the
3341    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
3342    * @throws IOException
3343    */
3344   private long replayRecoveredEdits(final Path edits,
3345       Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
3346     throws IOException {
3347     String msg = "Replaying edits from " + edits;
3348     LOG.info(msg);
3349     MonitoredTask status = TaskMonitor.get().createStatus(msg);
3350     FileSystem fs = this.fs.getFileSystem();
3351 
3352     status.setStatus("Opening logs");
3353     HLog.Reader reader = null;
3354     try {
3355       reader = HLogFactory.createReader(fs, edits, conf);
3356       long currentEditSeqId = -1;
3357       long currentReplaySeqId = -1;
3358       long firstSeqIdInLog = -1;
3359       long skippedEdits = 0;
3360       long editsCount = 0;
3361       long intervalEdits = 0;
3362       HLog.Entry entry;
3363       Store store = null;
3364       boolean reported_once = false;
3365       ServerNonceManager ng = this.rsServices == null ? null : this.rsServices.getNonceManager();
3366 
3367       try {
3368         // How many edits seen before we check elapsed time
3369         int interval = this.conf.getInt("hbase.hstore.report.interval.edits",
3370             2000);
3371         // How often to send a progress report (default 1/2 master timeout)
3372         int period = this.conf.getInt("hbase.hstore.report.period", 300000);
3373         long lastReport = EnvironmentEdgeManager.currentTime();
3374 
3375         while ((entry = reader.next()) != null) {
3376           HLogKey key = entry.getKey();
3377           WALEdit val = entry.getEdit();
3378 
3379           if (ng != null) { // ng is null in some tests, or when nonces are disabled
3380             ng.reportOperationFromWal(key.getNonceGroup(), key.getNonce(), key.getWriteTime());
3381           }
3382 
3383           if (reporter != null) {
3384             intervalEdits += val.size();
3385             if (intervalEdits >= interval) {
3386               // Number of edits interval reached
3387               intervalEdits = 0;
3388               long cur = EnvironmentEdgeManager.currentTime();
3389               if (lastReport + period <= cur) {
3390                 status.setStatus("Replaying edits..." +
3391                     " skipped=" + skippedEdits +
3392                     " edits=" + editsCount);
3393                 // Timeout reached
3394                 if(!reporter.progress()) {
3395                   msg = "Progressable reporter failed, stopping replay";
3396                   LOG.warn(msg);
3397                   status.abort(msg);
3398                   throw new IOException(msg);
3399                 }
3400                 reported_once = true;
3401                 lastReport = cur;
3402               }
3403             }
3404           }
3405 
3406           // Start coprocessor replay here. The coprocessor is for each WALEdit
3407           // instead of a KeyValue.
3408           if (coprocessorHost != null) {
3409             status.setStatus("Running pre-WAL-restore hook in coprocessors");
3410             if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
3411               // if bypass this log entry, ignore it ...
3412               continue;
3413             }
3414           }
3415 
3416           if (firstSeqIdInLog == -1) {
3417             firstSeqIdInLog = key.getLogSeqNum();
3418           }
3419           currentEditSeqId = key.getLogSeqNum();
3420           currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ?
3421             key.getOrigLogSeqNum() : currentEditSeqId;
3422           boolean flush = false;
3423           for (Cell cell: val.getCells()) {
3424             // Check that this edit is for me. Also, guard against writing the special
3425             // METACOLUMN info such as HBASE::CACHEFLUSH entries
3426             if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY) ||
3427                 !Bytes.equals(key.getEncodedRegionName(),
3428                   this.getRegionInfo().getEncodedNameAsBytes())) {
3429               //this is a special edit, we should handle it
3430               CompactionDescriptor compaction = WALEdit.getCompaction(cell);
3431               if (compaction != null) {
3432                 //replay the compaction
3433                 completeCompactionMarker(compaction);
3434               }
3435 
3436               skippedEdits++;
3437               continue;
3438             }
3439             // Figure which store the edit is meant for.
3440             if (store == null || !CellUtil.matchingFamily(cell, store.getFamily().getName())) {
3441               store = getStore(cell);
3442             }
3443             if (store == null) {
3444               // This should never happen.  Perhaps schema was changed between
3445               // crash and redeploy?
3446               LOG.warn("No family for " + cell);
3447               skippedEdits++;
3448               continue;
3449             }
3450             // Now, figure if we should skip this edit.
3451             if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily()
3452                 .getName())) {
3453               skippedEdits++;
3454               continue;
3455             }
3456             CellUtil.setSequenceId(cell, currentReplaySeqId);
3457             // Once we are over the limit, restoreEdit will keep returning true to
3458             // flush -- but don't flush until we've played all the kvs that make up
3459             // the WALEdit.
3460             flush = restoreEdit(store, cell);
3461             editsCount++;
3462           }
3463           if (flush) internalFlushcache(null, currentEditSeqId, status);
3464 
3465           if (coprocessorHost != null) {
3466             coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
3467           }
3468         }
3469       } catch (EOFException eof) {
3470         Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
3471         msg = "Encountered EOF. Most likely due to Master failure during " +
3472             "log splitting, so we have this data in another edit.  " +
3473             "Continuing, but renaming " + edits + " as " + p;
3474         LOG.warn(msg, eof);
3475         status.abort(msg);
3476       } catch (IOException ioe) {
3477         // If the IOE resulted from bad file format,
3478         // then this problem is idempotent and retrying won't help
3479         if (ioe.getCause() instanceof ParseException) {
3480           Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
3481           msg = "File corruption encountered!  " +
3482               "Continuing, but renaming " + edits + " as " + p;
3483           LOG.warn(msg, ioe);
3484           status.setStatus(msg);
3485         } else {
3486           status.abort(StringUtils.stringifyException(ioe));
3487           // other IO errors may be transient (bad network connection,
3488           // checksum exception on one datanode, etc).  throw & retry
3489           throw ioe;
3490         }
3491       }
3492       if (reporter != null && !reported_once) {
3493         reporter.progress();
3494       }
3495       msg = "Applied " + editsCount + ", skipped " + skippedEdits +
3496         ", firstSequenceIdInLog=" + firstSeqIdInLog +
3497         ", maxSequenceIdInLog=" + currentEditSeqId + ", path=" + edits;
3498       status.markComplete(msg);
3499       LOG.debug(msg);
3500       return currentEditSeqId;
3501     } finally {
3502       status.cleanup();
3503       if (reader != null) {
3504          reader.close();
3505       }
3506     }
3507   }
3508 
3509   /**
3510    * Call to complete a compaction. It is for the case where we find in the WAL a compaction
3511    * that was not finished.  We could find one recovering a WAL after a regionserver crash.
3512    * See HBASE-2331.
3513    */
3514   void completeCompactionMarker(CompactionDescriptor compaction)
3515       throws IOException {
3516     Store store = this.getStore(compaction.getFamilyName().toByteArray());
3517     if (store == null) {
3518       LOG.warn("Found Compaction WAL edit for deleted family:" +
3519           Bytes.toString(compaction.getFamilyName().toByteArray()));
3520       return;
3521     }
3522     store.completeCompactionMarker(compaction);
3523   }
3524 
3525   /**
3526    * Used by tests
3527    * @param s Store to add the edit to.
3528    * @param cell Cell to add.
3529    * @return True if we should flush.
3530    */
3531   protected boolean restoreEdit(final Store s, final Cell cell) {
3532     long kvSize = s.add(cell).getFirst();
3533     if (this.rsAccounting != null) {
3534       rsAccounting.addAndGetRegionReplayEditsSize(this.getRegionName(), kvSize);
3535     }
3536     return isFlushSize(this.addAndGetGlobalMemstoreSize(kvSize));
3537   }
3538 
3539   /*
3540    * @param fs
3541    * @param p File to check.
3542    * @return True if file was zero-length (and if so, we'll delete it in here).
3543    * @throws IOException
3544    */
3545   private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
3546       throws IOException {
3547     FileStatus stat = fs.getFileStatus(p);
3548     if (stat.getLen() > 0) return false;
3549     LOG.warn("File " + p + " is zero-length, deleting.");
3550     fs.delete(p, false);
3551     return true;
3552   }
3553 
3554   protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
3555     return new HStore(this, family, this.conf);
3556   }
3557 
3558   /**
3559    * Return HStore instance.
3560    * Use with caution.  Exposed for use of fixup utilities.
3561    * @param column Name of column family hosted by this region.
3562    * @return Store that goes with the family on passed <code>column</code>.
3563    * TODO: Make this lookup faster.
3564    */
3565   public Store getStore(final byte[] column) {
3566     return this.stores.get(column);
3567   }
3568 
3569   /**
3570    * Return HStore instance. Does not do any copy: as the number of stores is limited, we
3571    * iterate over them.
3572    */
3573   private Store getStore(Cell cell) {
3574     for (Map.Entry<byte[], Store> famStore : stores.entrySet()) {
3575       if (Bytes.equals(
3576           cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
3577           famStore.getKey(), 0, famStore.getKey().length)) {
3578         return famStore.getValue();
3579       }
3580     }
3581 
3582     return null;
3583   }
3584 
3585   public Map<byte[], Store> getStores() {
3586     return this.stores;
3587   }
3588 
3589   /**
3590    * Return list of storeFiles for the set of CFs.
3591    * Uses closeLock to prevent the race condition where the region closes
3592    * in the middle of the for loop - if the stores were closed one by one,
3593    * some stores would return 0 files.
3594    * @return List of storeFiles.
3595    */
3596   public List<String> getStoreFileList(final byte [][] columns)
3597     throws IllegalArgumentException {
3598     List<String> storeFileNames = new ArrayList<String>();
3599     synchronized(closeLock) {
3600       for(byte[] column : columns) {
3601         Store store = this.stores.get(column);
3602         if (store == null) {
3603           throw new IllegalArgumentException("No column family: " +
3604               Bytes.toString(column) + " available");
3605         }
3606         for (StoreFile storeFile: store.getStorefiles()) {
3607           storeFileNames.add(storeFile.getPath().toString());
3608         }
3609       }
3610     }
3611     return storeFileNames;
3612   }
3613 
3614   //////////////////////////////////////////////////////////////////////////////
3615   // Support code
3616   //////////////////////////////////////////////////////////////////////////////
3617 
3618   /** Make sure this is a valid row for the HRegion */
3619   void checkRow(final byte [] row, String op) throws IOException {
3620     if (!rowIsInRange(getRegionInfo(), row)) {
3621       throw new WrongRegionException("Requested row out of range for " +
3622           op + " on HRegion " + this + ", startKey='" +
3623           Bytes.toStringBinary(getStartKey()) + "', getEndKey()='" +
3624           Bytes.toStringBinary(getEndKey()) + "', row='" +
3625           Bytes.toStringBinary(row) + "'");
3626     }
3627   }
3628 
3629   /**
3630    * Tries to acquire a lock on the given row.
3631    * @param waitForLock if true, will block until the lock is available.
3632    *        Otherwise, just tries to obtain the lock and returns
3633    *        false if unavailable.
3634    * @return the row lock if acquired,
3635    *   null if waitForLock was false and the lock was not acquired
3636    * @throws IOException if waitForLock was true and the lock could not be acquired after waiting
3637    */
3638   public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
3639     checkRow(row, "row lock");
3640     startRegionOperation();
3641     try {
3642       HashedBytes rowKey = new HashedBytes(row);
3643       RowLockContext rowLockContext = new RowLockContext(rowKey);
3644 
3645       // loop until we acquire the row lock (unless !waitForLock)
3646       while (true) {
3647         RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
3648         if (existingContext == null) {
3649           // Row is not already locked by any thread, use newly created context.
3650           break;
3651         } else if (existingContext.ownedByCurrentThread()) {
3652           // Row is already locked by current thread, reuse existing context instead.
3653           rowLockContext = existingContext;
3654           break;
3655         } else {
3656           if (!waitForLock) {
3657             return null;
3658           }
3659           try {
3660             // Row is already locked by some other thread, give up or wait for it
3661             if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
3662               throw new IOException("Timed out waiting for lock for row: " + rowKey);
3663             }
3664           } catch (InterruptedException ie) {
3665             LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
3666             InterruptedIOException iie = new InterruptedIOException();
3667             iie.initCause(ie);
3668             throw iie;
3669           }
3670         }
3671       }
3672 
3673       // allocate new lock for this thread
3674       return rowLockContext.newLock();
3675     } finally {
3676       closeRegionOperation();
3677     }
3678   }
3679 
3680   /**
3681    * Acquires a lock on the given row.
3682    * The same thread may acquire multiple locks on the same row.
3683    * @return the acquired row lock
3684    * @throws IOException if the lock could not be acquired after waiting
3685    */
3686   public RowLock getRowLock(byte[] row) throws IOException {
3687     return getRowLock(row, true);
3688   }
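
  // Illustrative sketch (an addition, not part of the original source): callers
  // are expected to release the lock in a finally block, mirroring how
  // checkAndMutate above pairs getRowLock with rowLock.release().
  //
  //   RowLock lock = region.getRowLock(row);   // blocks until acquired
  //   try {
  //     // read-modify-write on this row while no other thread can lock it
  //   } finally {
  //     lock.release();
  //   }
  //
  //   // Non-blocking variant: returns null instead of waiting.
  //   RowLock maybe = region.getRowLock(row, false);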
3689 
3690   /**
3691    * If the given list of row locks is not null, releases all locks.
3692    */
3693   public void releaseRowLocks(List<RowLock> rowLocks) {
3694     if (rowLocks != null) {
3695       for (RowLock rowLock : rowLocks) {
3696         rowLock.release();
3697       }
3698       rowLocks.clear();
3699     }
3700   }
3701 
3702   /**
3703    * Determines whether multiple column families are present.
3704    * Precondition: familyPaths is not null
3705    *
3706    * @param familyPaths List of Pair<byte[] column family, String hfilePath>
3707    */
3708   private static boolean hasMultipleColumnFamilies(
3709       List<Pair<byte[], String>> familyPaths) {
3710     boolean multipleFamilies = false;
3711     byte[] family = null;
3712     for (Pair<byte[], String> pair : familyPaths) {
3713       byte[] fam = pair.getFirst();
3714       if (family == null) {
3715         family = fam;
3716       } else if (!Bytes.equals(family, fam)) {
3717         multipleFamilies = true;
3718         break;
3719       }
3720     }
3721     return multipleFamilies;
3722   }
3723 
3724 
3725   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
3726                                 boolean assignSeqId) throws IOException {
3727     return bulkLoadHFiles(familyPaths, assignSeqId, null);
3728   }
3729 
3730   /**
3731    * Attempts to atomically load a group of hfiles.  This is critical for loading
3732    * rows with multiple column families atomically.
3733    *
3734    * @param familyPaths List of Pair<byte[] column family, String hfilePath>
3735    * @param bulkLoadListener Internal hooks enabling massaging/preparation of a
3736    * file about to be bulk loaded
3737    * @return true if successful, false if failed recoverably
3738    * @throws IOException if failed unrecoverably.
3739    */
3740   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, boolean assignSeqId,
3741       BulkLoadListener bulkLoadListener) throws IOException {
3742     Preconditions.checkNotNull(familyPaths);
3743     // we need writeLock for multi-family bulk load
3744     startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
3745     try {
3746       this.writeRequestsCount.increment();
3747 
3748       // There possibly was a split that happened between when the split keys
3749       // were gathered and before the HRegion's write lock was taken.  We need
3750       // to validate the HFile region before attempting to bulk load all of them
3751       List<IOException> ioes = new ArrayList<IOException>();
3752       List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();
3753       for (Pair<byte[], String> p : familyPaths) {
3754         byte[] familyName = p.getFirst();
3755         String path = p.getSecond();
3756 
3757         Store store = getStore(familyName);
3758         if (store == null) {
3759           IOException ioe = new org.apache.hadoop.hbase.DoNotRetryIOException(
3760               "No such column family " + Bytes.toStringBinary(familyName));
3761           ioes.add(ioe);
3762         } else {
3763           try {
3764             store.assertBulkLoadHFileOk(new Path(path));
3765           } catch (WrongRegionException wre) {
3766             // recoverable (file doesn't fit in region)
3767             failures.add(p);
3768           } catch (IOException ioe) {
3769             // unrecoverable (hdfs problem)
3770             ioes.add(ioe);
3771           }
3772         }
3773       }
3774 
3775       // validation failed because of some sort of IO problem.
3776       if (ioes.size() != 0) {
3777         IOException e = MultipleIOException.createIOException(ioes);
3778         LOG.error("There were one or more IO errors when checking if the bulk load is ok.", e);
3779         throw e;
3780       }
3781 
3782       // validation failed, bail out before doing anything permanent.
3783       if (failures.size() != 0) {
3784         StringBuilder list = new StringBuilder();
3785         for (Pair<byte[], String> p : failures) {
3786           list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")
3787             .append(p.getSecond());
3788         }
3789         // problem when validating
3790         LOG.warn("There was a recoverable bulk load failure likely due to a" +
3791             " split.  These (family, HFile) pairs were not loaded: " + list);
3792         return false;
3793       }
3794 
3795       long seqId = -1;
3796       // We need to assign a sequential ID that's in between two memstores in order to preserve
3797       // the guarantee that all the edits lower than the highest sequential ID from all the
3798       // HFiles are flushed on disk. See HBASE-10958.  The sequence id returned when we flush is
3799       // guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is
3800       // a sequence id that we can be sure is beyond the last hfile written).
3801       if (assignSeqId) {
3802         FlushResult fs = this.flushcache();
3803         if (fs.isFlushSucceeded()) {
3804           seqId = fs.flushSequenceId;
3805         } else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
3806           seqId = fs.flushSequenceId;
3807         } else {
3808           throw new IOException("Could not bulk load with an assigned sequential ID because the " +
3809               "flush didn't run. Reason for not flushing: " + fs.failureReason);
3810         }
3811       }
3812 
3813       for (Pair<byte[], String> p : familyPaths) {
3814         byte[] familyName = p.getFirst();
3815         String path = p.getSecond();
3816         Store store = getStore(familyName);
3817         try {
3818           String finalPath = path;
3819           if(bulkLoadListener != null) {
3820             finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
3821           }
3822           store.bulkLoadHFile(finalPath, seqId);
3823           if(bulkLoadListener != null) {
3824             bulkLoadListener.doneBulkLoad(familyName, path);
3825           }
3826         } catch (IOException ioe) {
3827           // A failure here can cause an atomicity violation that we currently
3828           // cannot recover from since it is likely a failed HDFS operation.
3829 
3830           // TODO Need a better story for reverting partial failures due to HDFS.
3831           LOG.error("There was a partial failure due to IO when attempting to" +
3832               " load " + Bytes.toString(p.getFirst()) + " : "+ p.getSecond(), ioe);
3833           if(bulkLoadListener != null) {
3834             try {
3835               bulkLoadListener.failedBulkLoad(familyName, path);
3836             } catch (Exception ex) {
3837               LOG.error("Error while calling failedBulkLoad for family "+
3838                   Bytes.toString(familyName)+" with path "+path, ex);
3839             }
3840           }
3841           throw ioe;
3842         }
3843       }
3844       return true;
3845     } finally {
3846       closeBulkRegionOperation();
3847     }
3848   }
3849 
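  // Illustrative sketch (an addition, not part of the original source): how a
  // caller might assemble familyPaths for an atomic multi-family bulk load.
  // The HFile paths below are hypothetical.
  //
  //   List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
  //   familyPaths.add(new Pair<byte[], String>(Bytes.toBytes("cf1"), "/staging/cf1/hfile1"));
  //   familyPaths.add(new Pair<byte[], String>(Bytes.toBytes("cf2"), "/staging/cf2/hfile2"));
  //   // assignSeqId=true flushes first so the loaded files get a safe sequence id.
  //   boolean loaded = region.bulkLoadHFiles(familyPaths, true);
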
3850   @Override
3851   public boolean equals(Object o) {
3852     return o instanceof HRegion && Bytes.equals(this.getRegionName(),
3853                                                 ((HRegion) o).getRegionName());
3854   }
3855 
3856   @Override
3857   public int hashCode() {
3858     return Bytes.hashCode(this.getRegionName());
3859   }
3860 
3861   @Override
3862   public String toString() {
3863     return this.getRegionNameAsString();
3864   }
3865 
3866   /**
3867    * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
3868    */
3869   class RegionScannerImpl implements RegionScanner {
3870     // Package local for testability
3871     KeyValueHeap storeHeap = null;
3872     /** Heap of key-values that are not essential for the provided filters and are thus read
3873      * on demand, if on-demand column family loading is enabled.*/
3874     KeyValueHeap joinedHeap = null;
3875     /**
3876      * If the joined heap data gathering is interrupted due to scan limits, this will
3877      * contain the row for which we are populating the values.*/
3878     protected Cell joinedContinuationRow = null;
3879     // KeyValue indicating that limit is reached when scanning
3880     private final KeyValue KV_LIMIT = new KeyValue();
3881     protected final byte[] stopRow;
3882     private final FilterWrapper filter;
3883     private int batch;
3884     protected int isScan;
3885     private boolean filterClosed = false;
3886     private long readPt;
3887     private long maxResultSize;
3888     protected HRegion region;
3889 
3890     @Override
3891     public HRegionInfo getRegionInfo() {
3892       return region.getRegionInfo();
3893     }
3894 
3895     RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
3896         throws IOException {
3897 
3898       this.region = region;
3899       this.maxResultSize = scan.getMaxResultSize();
3900       if (scan.hasFilter()) {
3901         this.filter = new FilterWrapper(scan.getFilter());
3902       } else {
3903         this.filter = null;
3904       }
3905 
3906       this.batch = scan.getBatch();
3907       if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {
3908         this.stopRow = null;
3909       } else {
3910         this.stopRow = scan.getStopRow();
3911       }
3912       // If we are doing a get, we want to be [startRow,endRow]; normally
3913       // it is [startRow,endRow), and if startRow=endRow we would get nothing.
3914       this.isScan = scan.isGetScan() ? -1 : 0;
3915 
3916       // synchronize on scannerReadPoints so that nobody calculates
3917       // getSmallestReadPoint, before scannerReadPoints is updated.
3918       IsolationLevel isolationLevel = scan.getIsolationLevel();
3919       synchronized(scannerReadPoints) {
3920         this.readPt = getReadpoint(isolationLevel);
3921         scannerReadPoints.put(this, this.readPt);
3922       }
3923 
3924       // Here we separate all scanners into two lists - scanner that provide data required
3925       // by the filter to operate (scanners list) and all others (joinedScanners list).
3926       List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
3927       List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
3928       if (additionalScanners != null) {
3929         scanners.addAll(additionalScanners);
3930       }
3931 
3932       for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
3933           scan.getFamilyMap().entrySet()) {
3934         Store store = stores.get(entry.getKey());
3935         KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
3936         if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
3937           || this.filter.isFamilyEssential(entry.getKey())) {
3938           scanners.add(scanner);
3939         } else {
3940           joinedScanners.add(scanner);
3941         }
3942       }
3943       initializeKVHeap(scanners, joinedScanners, region);
3944     }
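
    // Illustrative note (an addition, not part of the original source): with
    // on-demand column family loading, a scan whose filter only references one
    // family splits its scanners across the two heaps. E.g. (hypothetical CFs):
    //
    //   Scan scan = new Scan();
    //   scan.addFamily(Bytes.toBytes("essential"));  // referenced by the filter
    //   scan.addFamily(Bytes.toBytes("large"));      // only read for matching rows
    //   scan.setLoadColumnFamiliesOnDemand(true);
    //   scan.setFilter(new SingleColumnValueFilter(Bytes.toBytes("essential"),
    //       Bytes.toBytes("q"), CompareOp.EQUAL, Bytes.toBytes("v")));
    //   // "essential" feeds storeHeap; "large" lands in joinedHeap and is
    //   // consulted only for rows that pass the filter.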
3945 
3946     protected void initializeKVHeap(List<KeyValueScanner> scanners,
3947         List<KeyValueScanner> joinedScanners, HRegion region)
3948         throws IOException {
3949       this.storeHeap = new KeyValueHeap(scanners, region.comparator);
3950       if (!joinedScanners.isEmpty()) {
3951         this.joinedHeap = new KeyValueHeap(joinedScanners, region.comparator);
3952       }
3953     }
3954 
3955     @Override
3956     public long getMaxResultSize() {
3957       return maxResultSize;
3958     }
3959 
3960     @Override
3961     public long getMvccReadPoint() {
3962       return this.readPt;
3963     }
3964 
3965     /**
3966      * Reset both the filter and the old filter.
3967      *
3968      * @throws IOException in case a filter raises an I/O exception.
3969      */
3970     protected void resetFilters() throws IOException {
3971       if (filter != null) {
3972         filter.reset();
3973       }
3974     }
3975 
3976     @Override
3977     public boolean next(List<Cell> outResults)
3978         throws IOException {
3979       // apply the batching limit by default
3980       return next(outResults, batch);
3981     }
3982 
3983     @Override
3984     public synchronized boolean next(List<Cell> outResults, int limit) throws IOException {
3985       if (this.filterClosed) {
3986         throw new UnknownScannerException("Scanner was closed (timed out?) " +
3987             "after we renewed it. Could be caused by a very slow scanner " +
3988             "or a lengthy garbage collection");
3989       }
3990       startRegionOperation(Operation.SCAN);
3991       readRequestsCount.increment();
3992       try {
3993         return nextRaw(outResults, limit);
3994       } finally {
3995         closeRegionOperation(Operation.SCAN);
3996       }
3997     }
3998 
3999     @Override
4000     public boolean nextRaw(List<Cell> outResults)
4001         throws IOException {
4002       return nextRaw(outResults, batch);
4003     }
4004 
4005     @Override
4006     public boolean nextRaw(List<Cell> outResults, int limit) throws IOException {
4007       boolean returnResult;
4008       if (outResults.isEmpty()) {
4009         // Usually outResults is empty. This is true when next() is called
4010         // to handle a scan or get operation.
4011         returnResult = nextInternal(outResults, limit);
4012       } else {
4013         List<Cell> tmpList = new ArrayList<Cell>();
4014         returnResult = nextInternal(tmpList, limit);
4015         outResults.addAll(tmpList);
4016       }
4017       resetFilters();
4018       if (isFilterDoneInternal()) {
4019         returnResult = false;
4020       }
4021       return returnResult;
4022     }
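
         // Illustrative sketch (hypothetical caller, not from this file): looping over
         // nextRaw(); per the RegionScanner contract the caller should bracket calls
         // with startRegionOperation()/closeRegionOperation() and synchronize on the
         // scanner, since nextRaw() itself takes no locks.
         //   List<Cell> cells = new ArrayList<Cell>();
         //   boolean more;
         //   do {
         //     more = scanner.nextRaw(cells);
         //     handle(cells);   // handle(...) is a hypothetical callback
         //     cells.clear();
         //   } while (more);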
4023 
4024     private void populateFromJoinedHeap(List<Cell> results, int limit)
4025         throws IOException {
4026       assert joinedContinuationRow != null;
4027       Cell kv = populateResult(results, this.joinedHeap, limit,
4028           joinedContinuationRow.getRowArray(), joinedContinuationRow.getRowOffset(),
4029           joinedContinuationRow.getRowLength());
4030       if (kv != KV_LIMIT) {
4031         // We are done with this row, reset the continuation.
4032         joinedContinuationRow = null;
4033       }
4034       // As the data is obtained from two independent heaps, we need to
4035       // ensure that the result list is sorted, because Result relies on that.
4036       Collections.sort(results, comparator);
4037     }
4038 
4039     /**
4040      * Fetches records for the current row into the results list, until the next row or the limit (if not -1) is reached.
4041      * @param heap KeyValueHeap to fetch data from. It must be positioned on the correct row before the call.
4042      * @param limit Max number of KVs to place in the result list; -1 means no limit.
4043      * @param currentRow Byte array containing the row key we are fetching.
4044      * @param offset offset of the row key within currentRow
4045      * @param length length of the row key within currentRow
4046      * @return KV_LIMIT if the limit was reached, the next KeyValue otherwise.
4047      */
4048     private Cell populateResult(List<Cell> results, KeyValueHeap heap, int limit,
4049         byte[] currentRow, int offset, short length) throws IOException {
4050       Cell nextKv;
4051       do {
4052         heap.next(results, limit - results.size());
4053         if (limit > 0 && results.size() == limit) {
4054           return KV_LIMIT;
4055         }
4056         nextKv = heap.peek();
4057       } while (nextKv != null && CellUtil.matchingRow(nextKv, currentRow, offset, length));
4058 
4059       return nextKv;
4060     }
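
         // Worked example of the contract above (illustrative): with limit = 3 and a
         // row holding 5 cells, the first call returns KV_LIMIT with 3 cells gathered;
         // a subsequent call resumes on the same row, and once the row is exhausted the
         // return value is the first cell of the next row (or null at region end).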
4061 
4062     /*
4063      * @return True if a filter rules that the scanner is over, i.e. done.
4064      */
4065     @Override
4066     public synchronized boolean isFilterDone() throws IOException {
4067       return isFilterDoneInternal();
4068     }
4069 
4070     private boolean isFilterDoneInternal() throws IOException {
4071       return this.filter != null && this.filter.filterAllRemaining();
4072     }
4073 
4074     private boolean nextInternal(List<Cell> results, int limit)
4075     throws IOException {
4076       if (!results.isEmpty()) {
4077         throw new IllegalArgumentException("First parameter should be an empty list");
4078       }
4079       RpcCallContext rpcCall = RpcServer.getCurrentCall();
4080       // The loop here is used only when, at some point during next(), we determine
4081       // that due to effects of filters or otherwise, we have an empty row in the result.
4082       // Then we loop and try again. Otherwise, we must get out on the first iteration via return:
4083       // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row
4084       // and joinedHeap has no more data to read for the last row, if joinedContinuationRow is set).
4085       while (true) {
4086         if (rpcCall != null) {
4087           // If a user specifies a too-restrictive or too-slow scanner, the
4088           // client might time out and disconnect while the server side
4089           // is still processing the request. We should abort aggressively
4090           // in that case.
4091           long afterTime = rpcCall.disconnectSince();
4092           if (afterTime >= 0) {
4093             throw new CallerDisconnectedException(
4094                 "Aborting on region " + getRegionNameAsString() + ", call " +
4095                     this + " after " + afterTime + " ms, since " +
4096                     "caller disconnected");
4097           }
4098         }
4099 
4100         // Let's see what we have in the storeHeap.
4101         Cell current = this.storeHeap.peek();
4102 
4103         byte[] currentRow = null;
4104         int offset = 0;
4105         short length = 0;
4106         if (current != null) {
4107           currentRow = current.getRowArray();
4108           offset = current.getRowOffset();
4109           length = current.getRowLength();
4110         }
4111         boolean stopRow = isStopRow(currentRow, offset, length);
4112         // Check if we were getting data from the joinedHeap and hit the limit.
4113         // If not, then it's the main path - getting results from the storeHeap.
4114         if (joinedContinuationRow == null) {
4115           // First, check if we are at a stop row. If so, there are no more results.
4116           if (stopRow) {
4117             if (filter != null && filter.hasFilterRow()) {
4118               filter.filterRowCells(results);
4119             }
4120             return false;
4121           }
4122 
4123           // Check if rowkey filter wants to exclude this row. If so, loop to next.
4124           // Technically, if we hit limits earlier on this row, we don't need this call.
4125           if (filterRowKey(currentRow, offset, length)) {
4126             boolean moreRows = nextRow(currentRow, offset, length);
4127             if (!moreRows) return false;
4128             results.clear();
4129             continue;
4130           }
4131 
4132           Cell nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset,
4133               length);
4134           // Ok, we are good, let's try to get some results from the main heap.
4135           if (nextKv == KV_LIMIT) {
4136             if (this.filter != null && filter.hasFilterRow()) {
4137               throw new IncompatibleFilterException(
4138                 "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
4139             }
4140             return true; // We hit the limit.
4141           }
4142 
4143           stopRow = nextKv == null ||
4144               isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength());
4145           // Remember whether the row was empty before filters were applied to it.
4146           final boolean isEmptyRow = results.isEmpty();
4147 
4148           // We have the part of the row necessary for filtering (all of it, usually).
4149           // First filter with the filterRow(List).
4150           FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
4151           if (filter != null && filter.hasFilterRow()) {
4152             ret = filter.filterRowCellsWithRet(results);
4153           }
4154 
4155           if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) {
4156             results.clear();
4157             boolean moreRows = nextRow(currentRow, offset, length);
4158             if (!moreRows) return false;
4159 
4160             // This row was totally filtered out; if this is NOT the last row,
4161             // we should continue on. Otherwise, there is nothing else to do.
4162             if (!stopRow) continue;
4163             return false;
4164           }
4165 
4166           // Ok, we are done with storeHeap for this row.
4167           // Now we may need to fetch additional, non-essential data into row.
4168           // These values are not needed for filter to work, so we postpone their
4169           // fetch to (possibly) reduce amount of data loads from disk.
4170           if (this.joinedHeap != null) {
4171             Cell nextJoinedKv = joinedHeap.peek();
4172             // If joinedHeap is pointing to some other row, try to seek to a correct one.
4173             boolean mayHaveData = (nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv,
4174                 currentRow, offset, length))
4175                 || (this.joinedHeap.requestSeek(
4176                     KeyValueUtil.createFirstOnRow(currentRow, offset, length), true, true)
4177                     && joinedHeap.peek() != null && CellUtil.matchingRow(joinedHeap.peek(),
4178                     currentRow, offset, length));
4179             if (mayHaveData) {
4180               joinedContinuationRow = current;
4181               populateFromJoinedHeap(results, limit);
4182             }
4183           }
4184         } else {
4185           // Populating from the joined heap was stopped by limits, populate some more.
4186           populateFromJoinedHeap(results, limit);
4187         }
4188 
4189         // We may have just called populateFromJoinedHeap and hit the limits. If that is
4190         // the case, we need to call it again on the next next() invocation.
4191         if (joinedContinuationRow != null) {
4192           return true;
4193         }
4194 
4195         // Finally, we are done with both joinedHeap and storeHeap.
4196         // Double check to prevent empty rows from appearing in result. It could be
4197         // the case when SingleColumnValueExcludeFilter is used.
4198         if (results.isEmpty()) {
4199           boolean moreRows = nextRow(currentRow, offset, length);
4200           if (!moreRows) return false;
4201           if (!stopRow) continue;
4202         }
4203 
4204         // We are done. Return the result.
4205         return !stopRow;
4206       }
4207     }
4208 
4209     /**
4210      * This function maintains backward compatibility for 0.94 filters. HBASE-6429 combines
4211      * the filterRow() & filterRow(List<KeyValue> kvs) functions. Code from 0.94 or older may
4212      * not implement hasFilterRow() as HBASE-6429 expects, because in 0.94 hasFilterRow() only
4213      * returns true when filterRow(List<KeyValue> kvs) is overridden, not filterRow(). In that
4214      * case filterRow() would otherwise be skipped, so it is invoked here explicitly.
4215      */
4216     private boolean filterRow() throws IOException {
4217       // when hasFilterRow returns true, filter.filterRow() will be called automatically inside
4218       // filterRowCells(List<Cell> kvs) so we skip that scenario here.
4219       return filter != null && (!filter.hasFilterRow())
4220           && filter.filterRow();
4221     }
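
         // Illustrative sketch (assumption): a 0.94-style filter that overrides only the
         // no-argument filterRow(); its hasFilterRow() reports false, so the explicit
         // call above is what keeps it working.
         //   public class ExcludeEverySecondRowFilter extends FilterBase { // hypothetical
         //     private boolean exclude;
         //     @Override public boolean filterRow() { exclude = !exclude; return exclude; }
         //   }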
4222 
4223     private boolean filterRowKey(byte[] row, int offset, short length) throws IOException {
4224       return filter != null
4225           && filter.filterRowKey(row, offset, length);
4226     }
4227 
4228     protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException {
4229       assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read.";
4230       Cell next;
4231       while ((next = this.storeHeap.peek()) != null &&
4232              CellUtil.matchingRow(next, currentRow, offset, length)) {
4233         this.storeHeap.next(MOCKED_LIST);
4234       }
4235       resetFilters();
4236       // Calling the hook in CP which allows it to do a fast forward
4237       return this.region.getCoprocessorHost() == null
4238           || this.region.getCoprocessorHost()
4239               .postScannerFilterRow(this, currentRow, offset, length);
4240     }
4241 
4242     protected boolean isStopRow(byte[] currentRow, int offset, short length) {
4243       return currentRow == null ||
4244           (stopRow != null &&
4245           comparator.compareRows(stopRow, 0, stopRow.length,
4246             currentRow, offset, length) <= isScan);
4247     }
4248 
4249     @Override
4250     public synchronized void close() {
4251       if (storeHeap != null) {
4252         storeHeap.close();
4253         storeHeap = null;
4254       }
4255       if (joinedHeap != null) {
4256         joinedHeap.close();
4257         joinedHeap = null;
4258       }
4259       // no need to synchronize here.
4260       scannerReadPoints.remove(this);
4261       this.filterClosed = true;
4262     }
4263 
4264     KeyValueHeap getStoreHeapForTesting() {
4265       return storeHeap;
4266     }
4267 
4268     @Override
4269     public synchronized boolean reseek(byte[] row) throws IOException {
4270       if (row == null) {
4271         throw new IllegalArgumentException("Row cannot be null.");
4272       }
4273       boolean result = false;
4274       startRegionOperation();
4275       try {
4276         KeyValue kv = KeyValueUtil.createFirstOnRow(row);
4277         // use request seek to make use of the lazy seek option. See HBASE-5520
4278         result = this.storeHeap.requestSeek(kv, true, true);
4279         if (this.joinedHeap != null) {
4280           result = this.joinedHeap.requestSeek(kv, true, true) || result;
4281         }
4282       } finally {
4283         closeRegionOperation();
4284       }
4285       return result;
4286     }
4287   }
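
       // Illustrative sketch (hypothetical usage, not from this file): driving the
       // scanner implemented above from region-hosting code.
       //   RegionScanner scanner = region.getScanner(new Scan());
       //   try {
       //     List<Cell> cells = new ArrayList<Cell>();
       //     boolean more;
       //     do {
       //       more = scanner.next(cells);
       //       // ... consume cells ...
       //       cells.clear();
       //     } while (more);
       //   } finally {
       //     scanner.close();
       //   }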
4288 
4289   // Utility methods
4290   /**
4291    * A utility method to create new instances of HRegion based on the
4292    * {@link HConstants#REGION_IMPL} configuration property.
4293    * @param tableDir qualified path of directory where region should be located,
4294    * usually the table directory.
4295    * @param log The HLog is the outbound log for any updates to the HRegion
4296    * (There's a single HLog for all the HRegions on a single HRegionServer.)
4297    * The log file is a logfile from the previous execution that's
4298    * custom-computed for this HRegion. The HRegionServer computes and sorts the
4299    * appropriate log info for this HRegion. If there is a previous log file
4300    * (implying that the HRegion has been written-to before), then read it from
4301    * the supplied path.
4302    * @param fs is the filesystem.
4303    * @param conf is global configuration settings.
4304    * @param regionInfo - HRegionInfo that describes the region
4306    * @param htd the table descriptor
4307    * @return the new instance
4308    */
4309   static HRegion newHRegion(Path tableDir, HLog log, FileSystem fs,
4310       Configuration conf, HRegionInfo regionInfo, final HTableDescriptor htd,
4311       RegionServerServices rsServices) {
4312     try {
4313       @SuppressWarnings("unchecked")
4314       Class<? extends HRegion> regionClass =
4315           (Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
4316 
4317       Constructor<? extends HRegion> c =
4318           regionClass.getConstructor(Path.class, HLog.class, FileSystem.class,
4319               Configuration.class, HRegionInfo.class, HTableDescriptor.class,
4320               RegionServerServices.class);
4321 
4322       return c.newInstance(tableDir, log, fs, conf, regionInfo, htd, rsServices);
4323     } catch (Throwable e) {
4324       // todo: what should I throw here?
4325       throw new IllegalStateException("Could not instantiate a region instance.", e);
4326     }
4327   }
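
       // Illustrative sketch (assumption): plugging a custom region implementation into
       // the reflection above via HConstants.REGION_IMPL; MyRegion is hypothetical and
       // must expose the seven-argument constructor looked up here.
       //   Configuration conf = HBaseConfiguration.create();
       //   conf.setClass(HConstants.REGION_IMPL, MyRegion.class, HRegion.class);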
4328 
4329   /**
4330    * Convenience method creating new HRegions. Used by createTable and by the
4331    * bootstrap code in the HMaster constructor.
4332    * Note, this method creates an {@link HLog} for the created region. It
4333    * needs to be closed explicitly.  Use {@link HRegion#getLog()} to get
4334    * access.  <b>When done with a region created using this method, you will
4335    * need to explicitly close the {@link HLog} it created too; it will not be
4336    * done for you.  Not closing the log will leave at least a daemon thread
4337    * running.</b>  Call {@link #closeHRegion(HRegion)} and it will do
4338    * necessary cleanup for you.
4339    * @param info Info for region to create.
4340    * @param rootDir Root directory for HBase instance
4341    * @return new HRegion
4342    *
4343    * @throws IOException
4344    */
4345   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4346       final Configuration conf, final HTableDescriptor hTableDescriptor)
4347   throws IOException {
4348     return createHRegion(info, rootDir, conf, hTableDescriptor, null);
4349   }
4350 
4351   /**
4352    * This will do the necessary cleanup a call to
4353    * {@link #createHRegion(HRegionInfo, Path, Configuration, HTableDescriptor)}
4354    * requires.  This method will close the region and then close its
4355    * associated {@link HLog} file.  Use it also if you called the other createHRegion,
4356    * the one that takes an {@link HLog} instance, but don't be surprised by the
4357    * call to {@link HLog#closeAndDelete()} on the {@link HLog} the
4358    * HRegion was carrying.
4359    * @throws IOException
4360    */
4361   public static void closeHRegion(final HRegion r) throws IOException {
4362     if (r == null) return;
4363     r.close();
4364     if (r.getLog() == null) return;
4365     r.getLog().closeAndDelete();
4366   }
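
       // Illustrative sketch (hypothetical usage): the create/close pairing the javadoc
       // above describes; info, rootDir, conf and htd are assumed to exist.
       //   HRegion region = HRegion.createHRegion(info, rootDir, conf, htd);
       //   try {
       //     // ... use the region ...
       //   } finally {
       //     HRegion.closeHRegion(region); // also closes the HLog created for the region
       //   }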
4367 
4368   /**
4369    * Convenience method creating new HRegions. Used by createTable.
4370    * The {@link HLog} for the created region needs to be closed explicitly.
4371    * Use {@link HRegion#getLog()} to get access.
4372    *
4373    * @param info Info for region to create.
4374    * @param rootDir Root directory for HBase instance
4375    * @param hlog shared HLog
4376    * @param initialize - true to initialize the region
4377    * @return new HRegion
4378    *
4379    * @throws IOException
4380    */
4381   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4382                                       final Configuration conf,
4383                                       final HTableDescriptor hTableDescriptor,
4384                                       final HLog hlog,
4385                                       final boolean initialize)
4386       throws IOException {
4387     return createHRegion(info, rootDir, conf, hTableDescriptor,
4388         hlog, initialize, false);
4389   }
4390 
4391   /**
4392    * Convenience method creating new HRegions. Used by createTable.
4393    * The {@link HLog} for the created region needs to be closed
4394    * explicitly, if it is not null.
4395    * Use {@link HRegion#getLog()} to get access.
4396    *
4397    * @param info Info for region to create.
4398    * @param rootDir Root directory for HBase instance
4399    * @param hlog shared HLog
4400    * @param initialize - true to initialize the region
4401    * @param ignoreHLog - true to skip generating a new hlog if it is null, mostly for createTable
4402    * @return new HRegion
4403    * @throws IOException
4404    */
4405   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4406                                       final Configuration conf,
4407                                       final HTableDescriptor hTableDescriptor,
4408                                       final HLog hlog,
4409                                       final boolean initialize, final boolean ignoreHLog)
4410       throws IOException {
4411       Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
4412       return createHRegion(info, rootDir, tableDir, conf, hTableDescriptor, hlog, initialize, ignoreHLog);
4413   }
4414 
4415   /**
4416    * Convenience method creating new HRegions. Used by createTable.
4417    * The {@link HLog} for the created region needs to be closed
4418    * explicitly, if it is not null.
4419    * Use {@link HRegion#getLog()} to get access.
4420    *
4421    * @param info Info for region to create.
4422    * @param rootDir Root directory for HBase instance
4423    * @param tableDir table directory
4424    * @param hlog shared HLog
4425    * @param initialize - true to initialize the region
4426    * @param ignoreHLog - true to skip generating a new hlog if it is null, mostly for createTable
4427    * @return new HRegion
4428    * @throws IOException
4429    */
4430   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Path tableDir,
4431                                       final Configuration conf,
4432                                       final HTableDescriptor hTableDescriptor,
4433                                       final HLog hlog,
4434                                       final boolean initialize, final boolean ignoreHLog)
4435       throws IOException {
4436     LOG.info("creating HRegion " + info.getTable().getNameAsString()
4437         + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
4438         " Table name == " + info.getTable().getNameAsString());
4439     FileSystem fs = FileSystem.get(conf);
4440     HRegionFileSystem rfs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
4441     HLog effectiveHLog = hlog;
4442     if (hlog == null && !ignoreHLog) {
4443       effectiveHLog = HLogFactory.createHLog(fs, rfs.getRegionDir(),
4444                                              HConstants.HREGION_LOGDIR_NAME, conf);
4445     }
4446     HRegion region = HRegion.newHRegion(tableDir,
4447         effectiveHLog, fs, conf, info, hTableDescriptor, null);
4448     if (initialize) {
4449       // If initializing, set the sequenceId. It is also required by HLogPerformanceEvaluation when
4450       // verifying the WALEdits.
4451       region.setSequenceId(region.initialize(null));
4452     }
4453     return region;
4454   }
4455 
4456   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4457                                       final Configuration conf,
4458                                       final HTableDescriptor hTableDescriptor,
4459                                       final HLog hlog)
4460     throws IOException {
4461     return createHRegion(info, rootDir, conf, hTableDescriptor, hlog, true);
4462   }
4463 
4464 
4465   /**
4466    * Open a Region.
4467    * @param info Info for region to be opened.
4468    * @param wal HLog for region to use. This method will call
4469    * HLog#setSequenceNumber(long) passing the result of the call to
4470    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4471    * up.  HRegionStore does this every time it opens a new region.
4472    * @return new HRegion
4473    *
4474    * @throws IOException
4475    */
4476   public static HRegion openHRegion(final HRegionInfo info,
4477       final HTableDescriptor htd, final HLog wal,
4478       final Configuration conf)
4479   throws IOException {
4480     return openHRegion(info, htd, wal, conf, null, null);
4481   }
4482 
4483   /**
4484    * Open a Region.
4485    * @param info Info for region to be opened
4486    * @param htd the table descriptor
4487    * @param wal HLog for region to use. This method will call
4488    * HLog#setSequenceNumber(long) passing the result of the call to
4489    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4490    * up.  HRegionStore does this every time it opens a new region.
4491    * @param conf The Configuration object to use.
4492    * @param rsServices An interface we can request flushes against.
4493    * @param reporter An interface we can report progress against.
4494    * @return new HRegion
4495    *
4496    * @throws IOException
4497    */
4498   public static HRegion openHRegion(final HRegionInfo info,
4499     final HTableDescriptor htd, final HLog wal, final Configuration conf,
4500     final RegionServerServices rsServices,
4501     final CancelableProgressable reporter)
4502   throws IOException {
4503     return openHRegion(FSUtils.getRootDir(conf), info, htd, wal, conf, rsServices, reporter);
4504   }
4505 
4506   /**
4507    * Open a Region.
4508    * @param rootDir Root directory for HBase instance
4509    * @param info Info for region to be opened.
4510    * @param htd the table descriptor
4511    * @param wal HLog for region to use. This method will call
4512    * HLog#setSequenceNumber(long) passing the result of the call to
4513    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4514    * up.  HRegionStore does this every time it opens a new region.
4515    * @param conf The Configuration object to use.
4516    * @return new HRegion
4517    * @throws IOException
4518    */
4519   public static HRegion openHRegion(Path rootDir, final HRegionInfo info,
4520       final HTableDescriptor htd, final HLog wal, final Configuration conf)
4521   throws IOException {
4522     return openHRegion(rootDir, info, htd, wal, conf, null, null);
4523   }
4524 
4525   /**
4526    * Open a Region.
4527    * @param rootDir Root directory for HBase instance
4528    * @param info Info for region to be opened.
4529    * @param htd the table descriptor
4530    * @param wal HLog for region to use. This method will call
4531    * HLog#setSequenceNumber(long) passing the result of the call to
4532    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4533    * up.  HRegionStore does this every time it opens a new region.
4534    * @param conf The Configuration object to use.
4535    * @param rsServices An interface we can request flushes against.
4536    * @param reporter An interface we can report progress against.
4537    * @return new HRegion
4538    * @throws IOException
4539    */
4540   public static HRegion openHRegion(final Path rootDir, final HRegionInfo info,
4541       final HTableDescriptor htd, final HLog wal, final Configuration conf,
4542       final RegionServerServices rsServices,
4543       final CancelableProgressable reporter)
4544   throws IOException {
4545     FileSystem fs = null;
4546     if (rsServices != null) {
4547       fs = rsServices.getFileSystem();
4548     }
4549     if (fs == null) {
4550       fs = FileSystem.get(conf);
4551     }
4552     return openHRegion(conf, fs, rootDir, info, htd, wal, rsServices, reporter);
4553   }
4554 
4555   /**
4556    * Open a Region.
4557    * @param conf The Configuration object to use.
4558    * @param fs Filesystem to use
4559    * @param rootDir Root directory for HBase instance
4560    * @param info Info for region to be opened.
4561    * @param htd the table descriptor
4562    * @param wal HLog for region to use. This method will call
4563    * HLog#setSequenceNumber(long) passing the result of the call to
4564    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4565    * up.  HRegionStore does this every time it opens a new region.
4566    * @return new HRegion
4567    * @throws IOException
4568    */
4569   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4570       final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal)
4571       throws IOException {
4572     return openHRegion(conf, fs, rootDir, info, htd, wal, null, null);
4573   }
4574 
4575   /**
4576    * Open a Region.
4577    * @param conf The Configuration object to use.
4578    * @param fs Filesystem to use
4579    * @param rootDir Root directory for HBase instance
4580    * @param info Info for region to be opened.
4581    * @param htd the table descriptor
4582    * @param wal HLog for region to use. This method will call
4583    * HLog#setSequenceNumber(long) passing the result of the call to
4584    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4585    * up.  HRegionStore does this every time it opens a new region.
4586    * @param rsServices An interface we can request flushes against.
4587    * @param reporter An interface we can report progress against.
4588    * @return new HRegion
4589    * @throws IOException
4590    */
4591   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4592       final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
4593       final RegionServerServices rsServices, final CancelableProgressable reporter)
4594       throws IOException {
4595     Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
4596     return openHRegion(conf, fs, rootDir, tableDir, info, htd, wal, rsServices, reporter);
4597   }
4598 
4599   /**
4600    * Open a Region.
4601    * @param conf The Configuration object to use.
4602    * @param fs Filesystem to use
4603    * @param rootDir Root directory for HBase instance
4604    * @param info Info for region to be opened.
4605    * @param htd the table descriptor
4606    * @param wal HLog for region to use. This method will call
4607    * HLog#setSequenceNumber(long) passing the result of the call to
4608    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4609    * up.  HRegionStore does this every time it opens a new region.
4610    * @param rsServices An interface we can request flushes against.
4611    * @param reporter An interface we can report progress against.
4612    * @return new HRegion
4613    * @throws IOException
4614    */
4615   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4616       final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
4617       final RegionServerServices rsServices, final CancelableProgressable reporter)
4618       throws IOException {
4619     if (info == null) throw new NullPointerException("Passed region info is null");
4620     if (LOG.isDebugEnabled()) {
4621       LOG.debug("Opening region: " + info);
4622     }
4623     HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
4624     return r.openHRegion(reporter);
4625   }
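
       // Illustrative sketch (hypothetical usage): opening an existing region from a
       // utility, with no RegionServerServices and no progress reporter.
       //   HRegion r = HRegion.openHRegion(conf, fs, rootDir, info, htd, wal, null, null);
       //   try {
       //     // ... read from the region ...
       //   } finally {
       //     r.close();
       //   }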
4626 
4627 
4628   /**
4629    * Useful when reopening a closed region (normally for unit tests)
4630    * @param other original object
4631    * @param reporter An interface we can report progress against.
4632    * @return new HRegion
4633    * @throws IOException
4634    */
4635   public static HRegion openHRegion(final HRegion other, final CancelableProgressable reporter)
4636       throws IOException {
4637     HRegionFileSystem regionFs = other.getRegionFileSystem();
4638     HRegion r = newHRegion(regionFs.getTableDir(), other.getLog(), regionFs.getFileSystem(),
4639         other.baseConf, other.getRegionInfo(), other.getTableDesc(), null);
4640     return r.openHRegion(reporter);
4641   }
4642 
4643   /**
4644    * Open HRegion.
4645    * Calls initialize and sets sequenceId.
4646    * @return Returns <code>this</code>
4647    * @throws IOException
4648    */
4649   protected HRegion openHRegion(final CancelableProgressable reporter)
4650   throws IOException {
4651     checkCompressionCodecs();
4652 
4653     this.openSeqNum = initialize(reporter);
4654     this.setSequenceId(openSeqNum);
4655     if (log != null && getRegionServerServices() != null) {
4656       writeRegionOpenMarker(log, openSeqNum);
4657     }
4658     return this;
4659   }
4660 
4661   private void checkCompressionCodecs() throws IOException {
4662     for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
4663       CompressionTest.testCompression(fam.getCompression());
4664       CompressionTest.testCompression(fam.getCompactionCompression());
4665     }
4666   }
4667 
4668   /**
4669    * Create a daughter region from given a temp directory with the region data.
4670    * @param hri Spec. for daughter region to open.
4671    * @throws IOException
4672    */
4673   HRegion createDaughterRegionFromSplits(final HRegionInfo hri) throws IOException {
4674     // Move the files from the temporary .splits to the final /table/region directory
4675     fs.commitDaughterRegion(hri);
4676 
4677     // Create the daughter HRegion instance
4678     HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(), fs.getFileSystem(),
4679         this.getBaseConf(), hri, this.getTableDesc(), rsServices);
4680     r.readRequestsCount.set(this.getReadRequestsCount() / 2);
4681     r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
4682     return r;
4683   }
4684 
4685   /**
4686    * Create a merged region given a temp directory with the region data.
4687    * @param region_b another merging region
4688    * @return merged HRegion
4689    * @throws IOException
4690    */
4691   HRegion createMergedRegionFromMerges(final HRegionInfo mergedRegionInfo,
4692       final HRegion region_b) throws IOException {
4693     HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(),
4694         fs.getFileSystem(), this.getBaseConf(), mergedRegionInfo,
4695         this.getTableDesc(), this.rsServices);
4696     r.readRequestsCount.set(this.getReadRequestsCount()
4697         + region_b.getReadRequestsCount());
4698     r.writeRequestsCount.set(this.getWriteRequestsCount()
4699         + region_b.getWriteRequestsCount());
4700     this.fs.commitMergedRegion(mergedRegionInfo);
4701     return r;
4702   }
4703 
4704   /**
4705    * Inserts a new region's meta information into the passed
4706    * <code>meta</code> region. Used by the HMaster bootstrap code adding
4707    * a new table to the hbase:meta table.
4708    *
4709    * @param meta hbase:meta HRegion to be updated
4710    * @param r HRegion to add to <code>meta</code>
4711    *
4712    * @throws IOException
4713    */
4714   // TODO remove since only test and merge use this
4715   public static void addRegionToMETA(final HRegion meta, final HRegion r) throws IOException {
4716     meta.checkResources();
4717     // The row key is the region name
4718     byte[] row = r.getRegionName();
4719     final long now = EnvironmentEdgeManager.currentTime();
4720     final List<Cell> cells = new ArrayList<Cell>(2);
4721     cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
4722       HConstants.REGIONINFO_QUALIFIER, now,
4723       r.getRegionInfo().toByteArray()));
4724     // Set into the root table the version of the meta table.
4725     cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
4726       HConstants.META_VERSION_QUALIFIER, now,
4727       Bytes.toBytes(HConstants.META_VERSION)));
4728     meta.put(row, HConstants.CATALOG_FAMILY, cells);
4729   }
4730 
4731   /**
4732    * Computes the Path of the HRegion
4733    *
4734    * @param tabledir qualified path for table
4735    * @param name ENCODED region name
4736    * @return Path of HRegion directory
4737    */
4738   @Deprecated
4739   public static Path getRegionDir(final Path tabledir, final String name) {
4740     return new Path(tabledir, name);
4741   }
4742 
4743   /**
4744    * Computes the Path of the HRegion
4745    *
4746    * @param rootdir qualified path of HBase root directory
4747    * @param info HRegionInfo for the region
4748    * @return qualified path of region directory
4749    */
4750   @Deprecated
4751   public static Path getRegionDir(final Path rootdir, final HRegionInfo info) {
4752     return new Path(
4753       FSUtils.getTableDir(rootdir, info.getTable()), info.getEncodedName());
4754   }
4755 
4756   /**
4757    * Determines if the specified row is within the row range specified by the
4758    * specified HRegionInfo
4759    *
4760    * @param info HRegionInfo that specifies the row range
4761    * @param row row to be checked
4762    * @return true if the row is within the range specified by the HRegionInfo
4763    */
4764   public static boolean rowIsInRange(HRegionInfo info, final byte [] row) {
4765     return ((info.getStartKey().length == 0) ||
4766         (Bytes.compareTo(info.getStartKey(), row) <= 0)) &&
4767         ((info.getEndKey().length == 0) ||
4768             (Bytes.compareTo(info.getEndKey(), row) > 0));
4769   }
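
       // Worked example (illustrative): for a region with startKey "b" and endKey "d",
       // rowIsInRange returns true for "b" and "c" but false for "d", since the start
       // key is inclusive and the end key exclusive; an empty key matches everything
       // on its side.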
4770 
4771   /**
4772    * Merge two HRegions.  The regions must be adjacent and must not overlap.
4773    *
4774    * @return new merged HRegion
4775    * @throws IOException
4776    */
4777   public static HRegion mergeAdjacent(final HRegion srcA, final HRegion srcB)
4778   throws IOException {
4779     HRegion a = srcA;
4780     HRegion b = srcB;
4781 
4782     // Make sure that srcA comes first; important for key-ordering during
4783     // write of the merged file.
4784     if (srcA.getStartKey() == null) {
4785       if (srcB.getStartKey() == null) {
4786         throw new IOException("Cannot merge two regions with null start key");
4787       }
4788       // A's start key is null but B's isn't. Assume A comes before B
4789     } else if ((srcB.getStartKey() == null) ||
4790       (Bytes.compareTo(srcA.getStartKey(), srcB.getStartKey()) > 0)) {
4791       a = srcB;
4792       b = srcA;
4793     }
4794 
4795     if (!(Bytes.compareTo(a.getEndKey(), b.getStartKey()) == 0)) {
4796       throw new IOException("Cannot merge non-adjacent regions");
4797     }
4798     return merge(a, b);
4799   }
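
       // Illustrative sketch (hypothetical usage): merging two adjacent regions of the
       // same table; the helper reorders the arguments so key ordering is preserved.
       //   HRegion merged = HRegion.mergeAdjacent(regionA, regionB);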
4800 
4801   /**
4802    * Merge two regions whether they are adjacent or not.
4803    *
4804    * @param a region a
4805    * @param b region b
4806    * @return new merged region
4807    * @throws IOException
4808    */
4809   public static HRegion merge(final HRegion a, final HRegion b) throws IOException {
4810     if (!a.getRegionInfo().getTable().equals(b.getRegionInfo().getTable())) {
4811       throw new IOException("Regions do not belong to the same table");
4812     }
4813 
4814     FileSystem fs = a.getRegionFileSystem().getFileSystem();
4815     // Make sure each region's cache is empty
4816     a.flushcache();
4817     b.flushcache();
4818 
4819     // Compact each region so we only have one store file per family
4820     a.compactStores(true);
4821     if (LOG.isDebugEnabled()) {
4822       LOG.debug("Files for region: " + a);
4823       a.getRegionFileSystem().logFileSystemState(LOG);
4824     }
4825     b.compactStores(true);
4826     if (LOG.isDebugEnabled()) {
4827       LOG.debug("Files for region: " + b);
4828       b.getRegionFileSystem().logFileSystemState(LOG);
4829     }
4830 
4831     RegionMergeTransaction rmt = new RegionMergeTransaction(a, b, true);
4832     if (!rmt.prepare(null)) {
4833       throw new IOException("Unable to merge regions " + a + " and " + b);
4834     }
4835     HRegionInfo mergedRegionInfo = rmt.getMergedRegionInfo();
4836     LOG.info("starting merge of regions: " + a + " and " + b
4837         + " into new region " + mergedRegionInfo.getRegionNameAsString()
4838         + " with start key <"
4839         + Bytes.toStringBinary(mergedRegionInfo.getStartKey())
4840         + "> and end key <"
4841         + Bytes.toStringBinary(mergedRegionInfo.getEndKey()) + ">");
4842     HRegion dstRegion;
4843     try {
4844       dstRegion = rmt.execute(null, null);
4845     } catch (IOException ioe) {
4846       rmt.rollback(null, null);
4847       throw new IOException("Failed merging region " + a + " and " + b
4848           + ", and successfully rolled back");
4849     }
4850     dstRegion.compactStores(true);
4851 
4852     if (LOG.isDebugEnabled()) {
4853       LOG.debug("Files for new region");
4854       dstRegion.getRegionFileSystem().logFileSystemState(LOG);
4855     }
4856 
4857     if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
4858       throw new IOException("Merged region " + dstRegion
4859           + " still has references after the compaction, is compaction canceled?");
4860     }
4861 
4862     // Archiving the 'A' region
4863     HFileArchiver.archiveRegion(a.getBaseConf(), fs, a.getRegionInfo());
4864     // Archiving the 'B' region
4865     HFileArchiver.archiveRegion(b.getBaseConf(), fs, b.getRegionInfo());
4866 
4867     LOG.info("merge completed. New region is " + dstRegion);
4868     return dstRegion;
4869   }
4870 
4871   //
4872   // HBASE-880
4873   //
4874   /**
4875    * @param get get object
4876    * @return result
4877    * @throws IOException read exceptions
4878    */
4879   public Result get(final Get get) throws IOException {
4880     checkRow(get.getRow(), "Get");
4881     // Verify families are all valid
4882     if (get.hasFamilies()) {
4883       for (byte [] family: get.familySet()) {
4884         checkFamily(family);
4885       }
4886     } else { // Adding all families to scanner
4887       for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
4888         get.addFamily(family);
4889       }
4890     }
4891     List<Cell> results = get(get, true);
4892     boolean stale = this.getRegionInfo().getReplicaId() != 0;
4893     return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
4894   }
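
       // Illustrative sketch (hypothetical usage): a server-side point read through the
       // region, mirroring the client-side Get API.
       //   Get g = new Get(Bytes.toBytes("row1"));
       //   g.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"));
       //   Result result = region.get(g);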
4895 
4896   /*
4897    * Do a get based on the get parameter.
4898    * @param withCoprocessor invoke coprocessor or not. We don't want to
4899    * always invoke cp for this private method.
4900    */
4901   public List<Cell> get(Get get, boolean withCoprocessor)
4902   throws IOException {
4903 
4904     List<Cell> results = new ArrayList<Cell>();
4905 
4906     // pre-get CP hook
4907     if (withCoprocessor && (coprocessorHost != null)) {
4908        if (coprocessorHost.preGet(get, results)) {
4909          return results;
4910        }
4911     }
4912 
4913     Scan scan = new Scan(get);
4914 
4915     RegionScanner scanner = null;
4916     try {
4917       scanner = getScanner(scan);
4918       scanner.next(results);
4919     } finally {
4920       if (scanner != null)
4921         scanner.close();
4922     }
4923 
4924     // post-get CP hook
4925     if (withCoprocessor && (coprocessorHost != null)) {
4926       coprocessorHost.postGet(get, results);
4927     }
4928 
4929     // do after lock
4930     if (this.metricsRegion != null) {
4931       long totalSize = 0L;
4932       for (Cell kv : results) {
4933         totalSize += KeyValueUtil.ensureKeyValue(kv).getLength();
4934       }
4935       this.metricsRegion.updateGet(totalSize);
4936     }
4937 
4938     return results;
4939   }
4940 
4941   public void mutateRow(RowMutations rm) throws IOException {
4942     // Don't need nonces here - RowMutations only supports puts and deletes
4943     mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
4944   }
4945 
4946   /**
4947    * Perform atomic mutations within the region w/o nonces.
4948    * See {@link #mutateRowsWithLocks(Collection, Collection, long, long)}
4949    */
4950   public void mutateRowsWithLocks(Collection<Mutation> mutations,
4951       Collection<byte[]> rowsToLock) throws IOException {
4952     mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
4953   }
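
       // Illustrative sketch (hypothetical usage): atomically mutating two rows in this
       // region; rows are supplied in sorted order to avoid lock-ordering deadlocks.
       //   byte[] rowA = Bytes.toBytes("a");
       //   byte[] rowB = Bytes.toBytes("b");
       //   Put put = new Put(rowA);
       //   put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
       //   Delete del = new Delete(rowB);
       //   region.mutateRowsWithLocks(Arrays.asList((Mutation) put, del),
       //       Arrays.asList(rowA, rowB));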
4954 
4955   /**
4956    * Perform atomic mutations within the region.
4957    * @param mutations The list of mutations to perform.
4958    * <code>mutations</code> can contain operations for multiple rows.
4959    * Caller has to ensure that all rows are contained in this region.
4960    * @param rowsToLock Rows to lock. If multiple rows are locked, care should be
4961    * taken that <code>rowsToLock</code> is sorted in order to avoid
4962    * deadlocks.
4963    * @param nonceGroup Optional nonce group of the operation (client Id)
4964    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
4965    * @throws IOException
4966    */
4967   public void mutateRowsWithLocks(Collection<Mutation> mutations,
4968       Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
4969     MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
4970     processRowsWithLocks(proc, -1, nonceGroup, nonce);
4971   }
4972 
4973   /**
4974    * Performs atomic multiple reads and writes on a given row.
4975    *
4976    * @param processor The object defines the reads and writes to a row.
4977    * @param nonceGroup Optional nonce group of the operation (client Id)
4978    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
4979    */
4980   public void processRowsWithLocks(RowProcessor<?,?> processor, long nonceGroup, long nonce)
4981       throws IOException {
4982     processRowsWithLocks(processor, rowProcessorTimeout, nonceGroup, nonce);
4983   }
4984 
4985   /**
4986    * Performs atomic multiple reads and writes on a given row.
4987    *
4988    * @param processor The object defines the reads and writes to a row.
4989    * @param timeout The timeout of the processor.process() execution
4990    *                Use a negative number to switch off the time bound
4991    * @param nonceGroup Optional nonce group of the operation (client Id)
4992    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
4993    */
4994   public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout,
4995       long nonceGroup, long nonce) throws IOException {
4996 
4997     for (byte[] row : processor.getRowsToLock()) {
4998       checkRow(row, "processRowsWithLocks");
4999     }
5000     if (!processor.readOnly()) {
5001       checkReadOnly();
5002     }
5003     checkResources();
5004 
5005     startRegionOperation();
5006     WALEdit walEdit = new WALEdit();
5007 
5008     // 1. Run pre-process hook
5009     try {
5010       processor.preProcess(this, walEdit);
5011     } catch (IOException e) {
5012       closeRegionOperation();
5013       throw e;
5014     }
5015     // Short circuit the read only case
5016     if (processor.readOnly()) {
5017       try {
5018         long now = EnvironmentEdgeManager.currentTime();
5019         doProcessRowWithTimeout(
5020             processor, now, this, null, null, timeout);
5021         processor.postProcess(this, walEdit, true);
5022       } catch (IOException e) {
5023         throw e;
5024       } finally {
5025         closeRegionOperation();
5026       }
5027       return;
5028     }
5029 
5030     MultiVersionConsistencyControl.WriteEntry writeEntry = null;
5031     boolean locked;
5032     boolean walSyncSuccessful = false;
5033     List<RowLock> acquiredRowLocks;
5034     long addedSize = 0;
5035     List<Mutation> mutations = new ArrayList<Mutation>();
5036     List<Cell> memstoreCells = new ArrayList<Cell>();
5037     Collection<byte[]> rowsToLock = processor.getRowsToLock();
5038     long mvccNum = 0;
5039     HLogKey walKey = null;
5040     try {
5041       // 2. Acquire the row lock(s)
5042       acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
5043       for (byte[] row : rowsToLock) {
5044         // Attempt to lock all involved rows, throw if any lock times out
5045         acquiredRowLocks.add(getRowLock(row));
5046       }
5047       // 3. Region lock
5048       lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
5049       locked = true;
5050       // Get a mvcc write number
5051       mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
5052 
5053       long now = EnvironmentEdgeManager.currentTime();
5054       try {
5055         // 4. Let the processor scan the rows, generate mutations and add
5056         //    waledits
5057         doProcessRowWithTimeout(
5058             processor, now, this, mutations, walEdit, timeout);
5059 
5060         if (!mutations.isEmpty()) {
5061           // 5. Start mvcc transaction
5062           writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
5063           // 6. Call the preBatchMutate hook
5064           processor.preBatchMutate(this, walEdit);
5065           // 7. Apply to memstore
5066           for (Mutation m : mutations) {
5067             for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
5068               Cell cell = cellScanner.current();
5069               CellUtil.setSequenceId(cell, mvccNum);
5070               Store store = getStore(cell);
5071               if (store == null) {
5072                 checkFamily(CellUtil.cloneFamily(cell));
5073                 // unreachable
5074               }
5075               Pair<Long, Cell> ret = store.add(cell);
5076               addedSize += ret.getFirst();
5077               memstoreCells.add(ret.getSecond());
5078             }
5079           }
5080 
5081           long txid = 0;
5082           // 8. Append no sync
5083           if (!walEdit.isEmpty()) {
5084             walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
5085               this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now,
5086               processor.getClusterIds(), nonceGroup, nonce);
5087             txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(),
5088               walKey, walEdit, getSequenceId(), true, memstoreCells);
5089           }
5090           if (walKey == null) {
5091             // since we use log sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
5092             // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
5093             walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
5094           }
5095 
5096           // 9. Release region lock
5097           if (locked) {
5098             this.updatesLock.readLock().unlock();
5099             locked = false;
5100           }
5101 
5102           // 10. Release row lock(s)
5103           releaseRowLocks(acquiredRowLocks);
5104 
5105           // 11. Sync edit log
5106           if (txid != 0) {
5107             syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
5108           }
5109           walSyncSuccessful = true;
5110           // 12. call postBatchMutate hook
5111           processor.postBatchMutate(this);
5112         }
5113       } finally {
5114         if (!mutations.isEmpty() && !walSyncSuccessful) {
5115           LOG.warn("Wal sync failed. Roll back " + mutations.size() +
5116               " memstore keyvalues for row(s):" + StringUtils.byteToHexString(
5117               processor.getRowsToLock().iterator().next()) + "...");
5118           for (Mutation m : mutations) {
5119             for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
5120               KeyValue kv = KeyValueUtil.ensureKeyValue(cellScanner.current());
5121               getStore(kv).rollback(kv);
5122             }
5123           }
5124         }
5125         // 13. Roll mvcc forward
5126         if (writeEntry != null) {
5127           mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
5128         }
5129         if (locked) {
5130           this.updatesLock.readLock().unlock();
5131         }
5132         // release locks if some were acquired but another timed out
5133         releaseRowLocks(acquiredRowLocks);
5134       }
5135 
5136       // 14. Run post-process hook
5137       processor.postProcess(this, walEdit, walSyncSuccessful);
5138 
5139     } catch (IOException e) {
5140       throw e;
5141     } finally {
5142       closeRegionOperation();
5143       if (!mutations.isEmpty() &&
5144           isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
5145         requestFlush();
5146       }
5147     }
5148   }
5149 
5150   private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
5151                                        final long now,
5152                                        final HRegion region,
5153                                        final List<Mutation> mutations,
5154                                        final WALEdit walEdit,
5155                                        final long timeout) throws IOException {
5156     // Short circuit the no time bound case.
5157     if (timeout < 0) {
5158       try {
5159         processor.process(now, region, mutations, walEdit);
5160       } catch (IOException e) {
5161         LOG.warn("RowProcessor:" + processor.getClass().getName() +
5162             " throws Exception on row(s):" +
5163             Bytes.toStringBinary(
5164               processor.getRowsToLock().iterator().next()) + "...", e);
5165         throw e;
5166       }
5167       return;
5168     }
5169 
5170     // Case with time bound
5171     FutureTask<Void> task =
5172       new FutureTask<Void>(new Callable<Void>() {
5173         @Override
5174         public Void call() throws IOException {
5175           try {
5176             processor.process(now, region, mutations, walEdit);
5177             return null;
5178           } catch (IOException e) {
5179             LOG.warn("RowProcessor:" + processor.getClass().getName() +
5180                 " throws Exception on row(s):" +
5181                 Bytes.toStringBinary(
5182                     processor.getRowsToLock().iterator().next()) + "...", e);
5183             throw e;
5184           }
5185         }
5186       });
5187     rowProcessorExecutor.execute(task);
5188     try {
5189       task.get(timeout, TimeUnit.MILLISECONDS);
5190     } catch (TimeoutException te) {
5191       LOG.error("RowProcessor timeout:" + timeout + " ms on row(s):" +
5192           Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) +
5193           "...");
5194       throw new IOException(te);
5195     } catch (Exception e) {
5196       throw new IOException(e);
5197     }
5198   }
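
       // Note on the time-bound path above (descriptive): processor.process() runs on
       // rowProcessorExecutor while the calling thread blocks in task.get(timeout, ...);
       // a TimeoutException is surfaced to the caller wrapped in an IOException, as is
       // any other failure from the task.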
5199 
5200   public Result append(Append append) throws IOException {
5201     return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
5202   }
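
       // Illustrative sketch (hypothetical usage): a server-side append through the
       // region, using the no-nonce overload above.
       //   Append a = new Append(Bytes.toBytes("row1"));
       //   a.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("-suffix"));
       //   Result r = region.append(a);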
5203 
5204   // TODO: There's a lot of boiler plate code identical
5205   // to increment... See how to better unify that.
5206   /**
5207    * Perform one or more append operations on a row.
5208    *
5209    * @return new keyvalues after increment
5210    * @throws IOException
5211    */
5212   public Result append(Append append, long nonceGroup, long nonce)
5213       throws IOException {
5214     byte[] row = append.getRow();
5215     checkRow(row, "append");
5216     boolean flush = false;
5217     Durability durability = getEffectiveDurability(append.getDurability());
5218     boolean writeToWAL = durability != Durability.SKIP_WAL;
5219     WALEdit walEdits = null;
5220     List<Cell> allKVs = new ArrayList<Cell>(append.size());
5221     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
5222 
5223     long size = 0;
5224     long txid = 0;
5225 
5226     checkReadOnly();
5227     checkResources();
5228     // Lock row
5229     startRegionOperation(Operation.APPEND);
5230     this.writeRequestsCount.increment();
5231     long mvccNum = 0;
5232     WriteEntry w = null;
5233     HLogKey walKey = null;
5234     RowLock rowLock = null;
5235     List<Cell> memstoreCells = new ArrayList<Cell>();
5236     boolean doRollBackMemstore = false;
5237     try {
5238       rowLock = getRowLock(row);
5239       try {
5240         lock(this.updatesLock.readLock());
5241         try {
5242           // wait for all prior MVCC transactions to finish - while we hold the row lock
5243           // (so that we are guaranteed to see the latest state)
5244           mvcc.waitForPreviousTransactionsComplete();
5245           if (this.coprocessorHost != null) {
5246             Result r = this.coprocessorHost.preAppendAfterRowLock(append);
5247             if (r != null) {
5248               return r;
5249             }
5250           }
5251           // now start my own transaction
5252           mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
5253           w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
5254           long now = EnvironmentEdgeManager.currentTime();
5255           // Process each family
5256           for (Map.Entry<byte[], List<Cell>> family : append.getFamilyCellMap().entrySet()) {
5257 
5258             Store store = stores.get(family.getKey());
5259             List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
5260 
5261             // Sort the cells so that they match the order that they
5262             // appear in the Get results. Otherwise, we won't be able to
5263             // find the existing values if the cells are not specified
5264             // in order by the client since cells are in an array list.
5265             Collections.sort(family.getValue(), store.getComparator());
5266             // Get previous values for all columns in this family
5267             Get get = new Get(row);
5268             for (Cell cell : family.getValue()) {
5269               get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));
5270             }
5271             List<Cell> results = get(get, false);
5272             // Iterate the input columns and update existing values if they were
5273             // found, otherwise add new column initialized to the append value
5274 
5275             // Avoid as much copying as possible. Every byte is copied at most
5276             // once.
5277             // Would be nice if KeyValue had scatter/gather logic
5278             int idx = 0;
5279             for (Cell cell : family.getValue()) {
5280               KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5281               KeyValue newKV;
5282               KeyValue oldKv = null;
5283               if (idx < results.size()
5284                   && CellUtil.matchingQualifier(results.get(idx),kv)) {
5285                 oldKv = KeyValueUtil.ensureKeyValue(results.get(idx));
5286                 // allocate an empty kv once
5287                 newKV = new KeyValue(row.length, kv.getFamilyLength(),
5288                     kv.getQualifierLength(), now, KeyValue.Type.Put,
5289                     oldKv.getValueLength() + kv.getValueLength(),
5290                     oldKv.getTagsLength() + kv.getTagsLength());
5291                 // copy in the value
5292                 System.arraycopy(oldKv.getValueArray(), oldKv.getValueOffset(),
5293                     newKV.getValueArray(), newKV.getValueOffset(),
5294                     oldKv.getValueLength());
5295                 System.arraycopy(kv.getValueArray(), kv.getValueOffset(),
5296                     newKV.getValueArray(),
5297                     newKV.getValueOffset() + oldKv.getValueLength(),
5298                     kv.getValueLength());
5299                 // copy in the tags
5300                 System.arraycopy(oldKv.getTagsArray(), oldKv.getTagsOffset(), newKV.getTagsArray(),
5301                     newKV.getTagsOffset(), oldKv.getTagsLength());
5302                 System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(),
5303                     newKV.getTagsOffset() + oldKv.getTagsLength(), kv.getTagsLength());
5304                 // copy in row, family, and qualifier
5305                 System.arraycopy(kv.getRowArray(), kv.getRowOffset(),
5306                     newKV.getRowArray(), newKV.getRowOffset(), kv.getRowLength());
5307                 System.arraycopy(kv.getFamilyArray(), kv.getFamilyOffset(),
5308                     newKV.getFamilyArray(), newKV.getFamilyOffset(),
5309                     kv.getFamilyLength());
5310                 System.arraycopy(kv.getQualifierArray(), kv.getQualifierOffset(),
5311                     newKV.getQualifierArray(), newKV.getQualifierOffset(),
5312                     kv.getQualifierLength());
5313                 idx++;
5314               } else {
5315                 newKV = kv;
5316                 // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP,
5317                 // so only need to update the timestamp to 'now'
5318                 newKV.updateLatestStamp(Bytes.toBytes(now));
5319               }
5320               newKV.setSequenceId(mvccNum);
5321               // Give coprocessors a chance to update the new cell
5322               if (coprocessorHost != null) {
5323                 newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
5324                     RegionObserver.MutationType.APPEND, append, oldKv, newKV));
5325               }
5326               kvs.add(newKV);
5327 
5328               // Append update to WAL
5329               if (writeToWAL) {
5330                 if (walEdits == null) {
5331                   walEdits = new WALEdit();
5332                 }
5333                 walEdits.add(newKV);
5334               }
5335             }
5336 
5337             // store the kvs to the temporary memstore before writing HLog
5338             tempMemstore.put(store, kvs);
5339           }
5340 
5341           // Actually write to Memstore now
5342           for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
5343             Store store = entry.getKey();
5344             if (store.getFamily().getMaxVersions() == 1) {
5345               // upsert if VERSIONS for this CF == 1
5346               size += store.upsert(entry.getValue(), getSmallestReadPoint());
5347               memstoreCells.addAll(entry.getValue());
5348             } else {
5349               // otherwise keep older versions around
5350               for (Cell cell: entry.getValue()) {
5351                 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5352                 Pair<Long, Cell> ret = store.add(kv);
5353                 size += ret.getFirst();
5354                 memstoreCells.add(ret.getSecond());
5355                 doRollBackMemstore = true;
5356               }
5357             }
5358             allKVs.addAll(entry.getValue());
5359           }
5360 
5361           // Actually write to WAL now
5362           if (writeToWAL) {
5363             // Using default cluster id, as this can only happen in the originating
5364             // cluster. A slave cluster receives the final value (not the delta)
5365             // as a Put.
5366             walKey = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
5367               this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, nonceGroup, nonce);
5368             txid = this.log.appendNoSync(this.htableDescriptor, getRegionInfo(), walKey, walEdits,
5369               this.sequenceId, true, memstoreCells);
5370           } else {
5371             recordMutationWithoutWal(append.getFamilyCellMap());
5372           }
5373           if (walKey == null) {
5374             // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
5375             walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
5376           }
5377 
5378           size = this.addAndGetGlobalMemstoreSize(size);
5379           flush = isFlushSize(size);
5380         } finally {
5381           this.updatesLock.readLock().unlock();
5382         }
5383       } finally {
5384         rowLock.release();
5385         rowLock = null;
5386       }
5387       // sync the transaction log outside the rowlock
5388       if (txid != 0) {
5389         syncOrDefer(txid, durability);
5390       }
5391       doRollBackMemstore = false;
5392     } finally {
5393       if (rowLock != null) {
5394         rowLock.release();
5395       }
5396       // if the wal sync was unsuccessful, remove keys from memstore
5397       if (doRollBackMemstore) {
5398         rollbackMemstore(memstoreCells);
5399       }
5400       if (w != null) {
5401         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
5402       }
5403       closeRegionOperation(Operation.APPEND);
5404     }
5405 
5406     if (this.metricsRegion != null) {
5407       this.metricsRegion.updateAppend();
5408     }
5409 
5410     if (flush) {
5411       // Request a cache flush. Do it outside update lock.
5412       requestFlush();
5413     }
5414 
5415 
5416     return append.isReturnResults() ? Result.create(allKVs) : null;
5417   }
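
       // Illustrative client-side sketch (not part of this class): how a caller
       // might exercise the append path above. "table" stands for a hypothetical
       // HTableInterface handle; row, family, and qualifier names are made up.
       //
       //   Append append = new Append(Bytes.toBytes("row1"));
       //   append.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("-suffix"));
       //   Result result = table.append(append);  // the server ends up in HRegion#append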
5418 
5419   public Result increment(Increment increment) throws IOException {
5420     return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
5421   }
5422 
5423   /**
5424    * Perform one or more increment operations on a row.
5425    * @return new keyvalues after increment
5426    * @throws IOException
5427    */
5428   public Result increment(Increment increment, long nonceGroup, long nonce)
5429   throws IOException {
5430     byte [] row = increment.getRow();
5431     checkRow(row, "increment");
5432     TimeRange tr = increment.getTimeRange();
5433     boolean flush = false;
5434     Durability durability = getEffectiveDurability(increment.getDurability());
5435     boolean writeToWAL = durability != Durability.SKIP_WAL;
5436     WALEdit walEdits = null;
5437     List<Cell> allKVs = new ArrayList<Cell>(increment.size());
5438     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
5439 
5440     long size = 0;
5441     long txid = 0;
5442 
5443     checkReadOnly();
5444     checkResources();
5445     // Lock row
5446     startRegionOperation(Operation.INCREMENT);
5447     this.writeRequestsCount.increment();
5448     RowLock rowLock = null;
5449     WriteEntry w = null;
5450     HLogKey walKey = null;
5451     long mvccNum = 0;
5452     List<Cell> memstoreCells = new ArrayList<Cell>();
5453     boolean doRollBackMemstore = false;
5454     try {
5455       rowLock = getRowLock(row);
5456       try {
5457         lock(this.updatesLock.readLock());
5458         try {
5459           // wait for all prior MVCC transactions to finish - while we hold the row lock
5460           // (so that we are guaranteed to see the latest state)
5461           mvcc.waitForPreviousTransactionsComplete();
5462           if (this.coprocessorHost != null) {
5463             Result r = this.coprocessorHost.preIncrementAfterRowLock(increment);
5464             if (r != null) {
5465               return r;
5466             }
5467           }
5468           // now start my own transaction
5469           mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
5470           w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
5471           long now = EnvironmentEdgeManager.currentTime();
5472           // Process each family
5473           for (Map.Entry<byte [], List<Cell>> family:
5474               increment.getFamilyCellMap().entrySet()) {
5475 
5476             Store store = stores.get(family.getKey());
5477             List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
5478 
5479             // Sort the cells so that they match the order that they
5480             // appear in the Get results. Otherwise, we won't be able to
5481             // find the existing values if the cells are not specified
5482             // in order by the client since cells are in an array list.
5483             Collections.sort(family.getValue(), store.getComparator());
5484             // Get previous values for all columns in this family
5485             Get get = new Get(row);
5486             for (Cell cell: family.getValue()) {
5487               get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));
5488             }
5489             get.setTimeRange(tr.getMin(), tr.getMax());
5490             List<Cell> results = get(get, false);
5491 
5492             // Iterate the input columns and update existing values if they were
5493             // found, otherwise add new column initialized to the increment amount
5494             int idx = 0;
5495             for (Cell kv: family.getValue()) {
5496               long amount = Bytes.toLong(CellUtil.cloneValue(kv));
5497               boolean noWriteBack = (amount == 0);
5498 
5499               Cell c = null;
5500               if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), kv)) {
5501                 c = results.get(idx);
5502                 if(c.getValueLength() == Bytes.SIZEOF_LONG) {
5503                   amount += Bytes.toLong(c.getValueArray(), c.getValueOffset(), Bytes.SIZEOF_LONG);
5504                 } else {
5505                   // throw DoNotRetryIOException instead of IllegalArgumentException
5506                   throw new org.apache.hadoop.hbase.DoNotRetryIOException(
5507                       "Attempted to increment field that isn't 64 bits wide");
5508                 }
5509                 idx++;
5510               }
5511 
5512               // Append new incremented KeyValue to list
5513               byte[] q = CellUtil.cloneQualifier(kv);
5514               byte[] val = Bytes.toBytes(amount);
5515               int oldCellTagsLen = (c == null) ? 0 : c.getTagsLength();
5516               int incCellTagsLen = kv.getTagsLength();
5517               KeyValue newKV = new KeyValue(row.length, family.getKey().length, q.length, now,
5518                   KeyValue.Type.Put, val.length, oldCellTagsLen + incCellTagsLen);
5519               System.arraycopy(row, 0, newKV.getRowArray(), newKV.getRowOffset(), row.length);
5520               System.arraycopy(family.getKey(), 0, newKV.getFamilyArray(), newKV.getFamilyOffset(),
5521                   family.getKey().length);
5522               System.arraycopy(q, 0, newKV.getQualifierArray(), newKV.getQualifierOffset(), q.length);
5523               // copy in the value
5524               System.arraycopy(val, 0, newKV.getValueArray(), newKV.getValueOffset(), val.length);
5525               // copy tags
5526               if (oldCellTagsLen > 0) {
5527                 System.arraycopy(c.getTagsArray(), c.getTagsOffset(), newKV.getTagsArray(),
5528                     newKV.getTagsOffset(), oldCellTagsLen);
5529               }
5530               if (incCellTagsLen > 0) {
5531                 System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(),
5532                     newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
5533               }
5534               newKV.setSequenceId(mvccNum);
5535               // Give coprocessors a chance to update the new cell
5536               if (coprocessorHost != null) {
5537                 newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
5538                     RegionObserver.MutationType.INCREMENT, increment, c, newKV));
5539               }
5540               allKVs.add(newKV);
5541 
5542               if (!noWriteBack) {
5543                 kvs.add(newKV);
5544 
5545                 // Prepare WAL updates
5546                 if (writeToWAL) {
5547                   if (walEdits == null) {
5548                     walEdits = new WALEdit();
5549                   }
5550                   walEdits.add(newKV);
5551                 }
5552               }
5553             }
5554 
5555             // store the kvs to the temporary memstore before writing HLog
5556             if (!kvs.isEmpty()) {
5557               tempMemstore.put(store, kvs);
5558             }
5559           }
5560 
5561           // Actually write to Memstore now
5562           if (!tempMemstore.isEmpty()) {
5563             for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
5564               Store store = entry.getKey();
5565               if (store.getFamily().getMaxVersions() == 1) {
5566                 // upsert if VERSIONS for this CF == 1
5567                 size += store.upsert(entry.getValue(), getSmallestReadPoint());
5568                 memstoreCells.addAll(entry.getValue());
5569               } else {
5570                 // otherwise keep older versions around
5571                 for (Cell cell : entry.getValue()) {
5572                   KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5573                   Pair<Long, Cell> ret = store.add(kv);
5574                   size += ret.getFirst();
5575                   memstoreCells.add(ret.getSecond());
5576                   doRollBackMemstore = true;
5577                 }
5578               }
5579             }
5580             size = this.addAndGetGlobalMemstoreSize(size);
5581             flush = isFlushSize(size);
5582           }
5583 
5584           // Actually write to WAL now
5585           if (walEdits != null && !walEdits.isEmpty()) {
5586             if (writeToWAL) {
5587               // Using default cluster id, as this can only happen in the originating
5588               // cluster. A slave cluster receives the final value (not the delta)
5589               // as a Put.
5590               walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
5591                 this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, nonceGroup, nonce);
5592               txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(),
5593                 walKey, walEdits, getSequenceId(), true, memstoreCells);
5594             } else {
5595               recordMutationWithoutWal(increment.getFamilyCellMap());
5596             }
5597           }
5598           if (walKey == null) {
5599             // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
5600             walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
5601           }
5602         } finally {
5603           this.updatesLock.readLock().unlock();
5604         }
5605       } finally {
5606         rowLock.release();
5607         rowLock = null;
5608       }
5609       // sync the transaction log outside the rowlock
5610       if (txid != 0) {
5611         syncOrDefer(txid, durability);
5612       }
5613       doRollBackMemstore = false;
5614     } finally {
5615       if (rowLock != null) {
5616         rowLock.release();
5617       }
5618       // if the wal sync was unsuccessful, remove keys from memstore
5619       if (doRollBackMemstore) {
5620         rollbackMemstore(memstoreCells);
5621       }
5622       if (w != null) {
5623         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
5624       }
5625       closeRegionOperation(Operation.INCREMENT);
5626       if (this.metricsRegion != null) {
5627         this.metricsRegion.updateIncrement();
5628       }
5629     }
5630 
5631     if (flush) {
5632       // Request a cache flush.  Do it outside update lock.
5633       requestFlush();
5634     }
5635 
5636     return Result.create(allKVs);
5637   }
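
       // Illustrative client-side sketch (same assumptions as the append example
       // above: "table" is a hypothetical client handle):
       //
       //   Increment inc = new Increment(Bytes.toBytes("row1"));
       //   inc.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("counter"), 1L);
       //   Result r = table.increment(inc);  // the server ends up in HRegion#increment
       //   long newValue = Bytes.toLong(r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("counter")));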
5638 
5639   //
5640   // New HBASE-880 Helpers
5641   //
5642 
5643   private void checkFamily(final byte [] family)
5644   throws NoSuchColumnFamilyException {
5645     if (!this.htableDescriptor.hasFamily(family)) {
5646       throw new NoSuchColumnFamilyException("Column family " +
5647           Bytes.toString(family) + " does not exist in region " + this
5648           + " in table " + this.htableDescriptor);
5649     }
5650   }
5651 
5652   public static final long FIXED_OVERHEAD = ClassSize.align(
5653       ClassSize.OBJECT +
5654       ClassSize.ARRAY +
5655       40 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
5656       (12 * Bytes.SIZEOF_LONG) +
5657       4 * Bytes.SIZEOF_BOOLEAN);
5658 
5659   // woefully out of date - currently missing:
5660   // 1 x HashMap - coprocessorServiceHandlers
5661   // 6 x Counter - numMutationsWithoutWAL, dataInMemoryWithoutWAL,
5662   //   checkAndMutateChecksPassed, checkAndMutateChecksFailed, readRequestsCount,
5663   //   writeRequestsCount
5664   // 1 x HRegion$WriteState - writestate
5665   // 1 x RegionCoprocessorHost - coprocessorHost
5666   // 1 x RegionSplitPolicy - splitPolicy
5667   // 1 x MetricsRegion - metricsRegion
5668   // 1 x MetricsRegionWrapperImpl - metricsRegionWrapper
5669   public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
5670       ClassSize.OBJECT + // closeLock
5671       (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
5672       (3 * ClassSize.ATOMIC_LONG) + // memStoreSize, numPutsWithoutWAL, dataInMemoryWithoutWAL
5673       (2 * ClassSize.CONCURRENT_HASHMAP) +  // lockedRows, scannerReadPoints
5674       WriteState.HEAP_SIZE + // writestate
5675       ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
5676       (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
5677       MultiVersionConsistencyControl.FIXED_SIZE // mvcc
5678       + ClassSize.TREEMAP // maxSeqIdInStores
5679       + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
5680       ;
5681 
5682   @Override
5683   public long heapSize() {
5684     long heapSize = DEEP_OVERHEAD;
5685     for (Store store : this.stores.values()) {
5686       heapSize += store.heapSize();
5687     }
5688     // this does not take into account row locks, recent flushes, mvcc entries, and more
5689     return heapSize;
5690   }
5691 
5692   /*
5693    * This method calls System.exit.
5694    * @param message Message to print out.  May be null.
5695    */
5696   private static void printUsageAndExit(final String message) {
5697     if (message != null && message.length() > 0) System.out.println(message);
5698     System.out.println("Usage: HRegion CATALOG_TABLE_DIR [major_compact]");
5699     System.out.println("Options:");
5700     System.out.println(" major_compact  Pass this option to major compact " +
5701       "passed region.");
5702     System.out.println("Default outputs scan of passed region.");
5703     System.exit(1);
5704   }
5705 
5706   /**
5707    * Registers a new protocol buffer {@link Service} subclass as a coprocessor endpoint to
5708    * be available for handling
5709    * {@link HRegion#execService(com.google.protobuf.RpcController,
5710    *    org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall)} calls.
5711    *
5712    * <p>
5713    * Only a single instance may be registered per region for a given {@link Service} subclass (the
5714    * instances are keyed on {@link com.google.protobuf.Descriptors.ServiceDescriptor#getFullName()}).
5715    * After the first registration, subsequent calls with the same service name will fail with
5716    * a return value of {@code false}.
5717    * </p>
5718    * @param instance the {@code Service} subclass instance to expose as a coprocessor endpoint
5719    * @return {@code true} if the registration was successful, {@code false}
5720    * otherwise
5721    */
5722   public boolean registerService(Service instance) {
5723     /*
5724      * No stacking of instances is allowed for a single service name
5725      */
5726     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
5727     if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
5728       LOG.error("Coprocessor service "+serviceDesc.getFullName()+
5729           " already registered, rejecting request from "+instance
5730       );
5731       return false;
5732     }
5733 
5734     coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
5735     if (LOG.isDebugEnabled()) {
5736       LOG.debug("Registered coprocessor service: region="+
5737           Bytes.toStringBinary(getRegionName())+" service="+serviceDesc.getFullName());
5738     }
5739     return true;
5740   }
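
       // Illustrative sketch of registering an endpoint, e.g. from a coprocessor's
       // start hook. "MyService" is a hypothetical protobuf-generated Service subclass.
       //
       //   HRegion region = ((RegionCoprocessorEnvironment) env).getRegion();
       //   if (!region.registerService(new MyService())) {
       //     // a service with the same descriptor full name was already registered
       //   }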
5741 
5742   /**
5743    * Executes a single protocol buffer coprocessor endpoint {@link Service} method using
5744    * the registered protocol handlers.  {@link Service} implementations must be registered via the
5745    * {@link HRegion#registerService(com.google.protobuf.Service)}
5746    * method before they are available.
5747    *
5748    * @param controller an {@code RpcController} implementation to pass to the invoked service
5749    * @param call a {@code CoprocessorServiceCall} instance identifying the service, method,
5750    *     and parameters for the method invocation
5751    * @return a protocol buffer {@code Message} instance containing the method's result
5752    * @throws IOException if no registered service handler is found or an error
5753    *     occurs during the invocation
5754    * @see org.apache.hadoop.hbase.regionserver.HRegion#registerService(com.google.protobuf.Service)
5755    */
5756   public Message execService(RpcController controller, CoprocessorServiceCall call)
5757       throws IOException {
5758     String serviceName = call.getServiceName();
5759     String methodName = call.getMethodName();
5760     if (!coprocessorServiceHandlers.containsKey(serviceName)) {
5761       throw new UnknownProtocolException(null,
5762           "No registered coprocessor service found for name "+serviceName+
5763           " in region "+Bytes.toStringBinary(getRegionName()));
5764     }
5765 
5766     Service service = coprocessorServiceHandlers.get(serviceName);
5767     Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
5768     Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
5769     if (methodDesc == null) {
5770       throw new UnknownProtocolException(service.getClass(),
5771           "Unknown method "+methodName+" called on service "+serviceName+
5772               " in region "+Bytes.toStringBinary(getRegionName()));
5773     }
5774 
5775     Message request = service.getRequestPrototype(methodDesc).newBuilderForType()
5776         .mergeFrom(call.getRequest()).build();
5777 
5778     if (coprocessorHost != null) {
5779       request = coprocessorHost.preEndpointInvocation(service, methodName, request);
5780     }
5781 
5782     final Message.Builder responseBuilder =
5783         service.getResponsePrototype(methodDesc).newBuilderForType();
5784     service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
5785       @Override
5786       public void run(Message message) {
5787         if (message != null) {
5788           responseBuilder.mergeFrom(message);
5789         }
5790       }
5791     });
5792 
5793     if (coprocessorHost != null) {
5794       coprocessorHost.postEndpointInvocation(service, methodName, request, responseBuilder);
5795     }
5796 
5797     return responseBuilder.build();
5798   }
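
       // Illustrative client-side sketch: this method is normally reached through the
       // client coprocessor channel. "MyService", "MyRequest" and "MyResponse" are
       // hypothetical protobuf-generated classes.
       //
       //   CoprocessorRpcChannel channel = table.coprocessorService(row);
       //   MyService.BlockingInterface stub = MyService.newBlockingStub(channel);
       //   MyResponse resp = stub.myMethod(null, MyRequest.getDefaultInstance());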
5799 
5800   /*
5801    * Process a catalog table.
5802    * Either major compact it or list its content.
5803    * @throws IOException
5804    */
5805   private static void processTable(final FileSystem fs, final Path p,
5806       final HLog log, final Configuration c,
5807       final boolean majorCompact)
5808   throws IOException {
5809     HRegion region;
5810     // Currently expects the table to have only one region.
5811     if (FSUtils.getTableName(p).equals(TableName.META_TABLE_NAME)) {
5812       region = HRegion.newHRegion(p, log, fs, c,
5813         HRegionInfo.FIRST_META_REGIONINFO, HTableDescriptor.META_TABLEDESC, null);
5814     } else {
5815       throw new IOException("Not a known catalog table: " + p.toString());
5816     }
5817     try {
5818       region.initialize(null);
5819       if (majorCompact) {
5820         region.compactStores(true);
5821       } else {
5822         // Default behavior
5823         Scan scan = new Scan();
5824         // scan.addFamily(HConstants.CATALOG_FAMILY);
5825         RegionScanner scanner = region.getScanner(scan);
5826         try {
5827           List<Cell> kvs = new ArrayList<Cell>();
5828           boolean done;
5829           do {
5830             kvs.clear();
5831             done = scanner.next(kvs);
5832             if (kvs.size() > 0) LOG.info(kvs);
5833           } while (done);
5834         } finally {
5835           scanner.close();
5836         }
5837       }
5838     } finally {
5839       region.close();
5840     }
5841   }
5842 
5843   boolean shouldForceSplit() {
5844     return this.splitRequest;
5845   }
5846 
5847   byte[] getExplicitSplitPoint() {
5848     return this.explicitSplitPoint;
5849   }
5850 
5851   void forceSplit(byte[] sp) {
5852     // This HRegion will go away after the forced split is successful
5853     // But if a forced split fails, we need to clear forced split.
5854     this.splitRequest = true;
5855     if (sp != null) {
5856       this.explicitSplitPoint = sp;
5857     }
5858   }
5859 
5860   void clearSplit() {
5861     this.splitRequest = false;
5862     this.explicitSplitPoint = null;
5863   }
5864 
5865   /**
5866    * Give the region a chance to prepare before it is split.
5867    */
5868   protected void prepareToSplit() {
5869     // nothing
5870   }
5871 
5872   /**
5873    * Return the split point. A null return value indicates the region isn't splittable.
5874    * If the split point isn't explicitly specified, this method goes over the stores
5875    * to find the best split point. Currently the criterion for the best split point
5876    * is the size of the store.
5877    */
5878   public byte[] checkSplit() {
5879     // Can't split META
5880     if (this.getRegionInfo().isMetaTable() ||
5881         TableName.NAMESPACE_TABLE_NAME.equals(this.getRegionInfo().getTable())) {
5882       if (shouldForceSplit()) {
5883         LOG.warn("Cannot split meta region in HBase 0.20 and above");
5884       }
5885       return null;
5886     }
5887 
5888     // Can't split region which is in recovering state
5889     if (this.isRecovering()) {
5890       LOG.info("Cannot split region " + this.getRegionInfo().getEncodedName() + " in recovery.");
5891       return null;
5892     }
5893 
5894     if (!splitPolicy.shouldSplit()) {
5895       return null;
5896     }
5897 
5898     byte[] ret = splitPolicy.getSplitPoint();
5899 
5900     if (ret != null) {
5901       try {
5902         checkRow(ret, "calculated split");
5903       } catch (IOException e) {
5904         LOG.error("Ignoring invalid split", e);
5905         return null;
5906       }
5907     }
5908     return ret;
5909   }
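
       // Illustrative sketch of influencing the split point via a custom
       // RegionSplitPolicy (a hypothetical prefix-trimming policy, similar in spirit
       // to the shipped KeyPrefixRegionSplitPolicy):
       //
       //   public class PrefixSplitPolicy extends ConstantSizeRegionSplitPolicy {
       //     @Override
       //     protected byte[] getSplitPoint() {
       //       byte[] point = super.getSplitPoint();
       //       // keep rows sharing an 8-byte prefix in the same daughter region
       //       return point == null ? null : Arrays.copyOf(point, Math.min(point.length, 8));
       //     }
       //   }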
5910 
5911   /**
5912    * @return The priority that this region should have in the compaction queue
5913    */
5914   public int getCompactPriority() {
5915     int count = Integer.MAX_VALUE;
5916     for (Store store : stores.values()) {
5917       count = Math.min(count, store.getCompactPriority());
5918     }
5919     return count;
5920   }
5921 
5922 
5923   /** @return the coprocessor host */
5924   public RegionCoprocessorHost getCoprocessorHost() {
5925     return coprocessorHost;
5926   }
5927 
5928   /** @param coprocessorHost the new coprocessor host */
5929   public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {
5930     this.coprocessorHost = coprocessorHost;
5931   }
5932 
5933   /**
5934    * This method needs to be called before any public call that reads or
5935    * modifies data. It has to be called just before a try.
5936    * #closeRegionOperation needs to be called in the try's finally block.
5937    * Acquires a read lock and checks if the region is closing or closed.
5938    * @throws IOException
5939    */
5940   public void startRegionOperation() throws IOException {
5941     startRegionOperation(Operation.ANY);
5942   }
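
       // The call pattern described above, as a minimal sketch:
       //
       //   startRegionOperation(Operation.GET);
       //   try {
       //     // ... read or mutate region data ...
       //   } finally {
       //     closeRegionOperation(Operation.GET);
       //   }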
5943 
5944   /**
5945    * @param op The operation about to be taken on the region
5946    * @throws IOException
5947    */
5948   protected void startRegionOperation(Operation op) throws IOException {
5949     switch (op) {
5950     case GET:  // read operations
5951     case SCAN:
5952       checkReadsEnabled();
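           // fall through on purpose: when recovering, reads are rejected by the check below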
5953     case INCREMENT: // write operations
5954     case APPEND:
5955     case SPLIT_REGION:
5956     case MERGE_REGION:
5957     case PUT:
5958     case DELETE:
5959     case BATCH_MUTATE:
5960     case COMPACT_REGION:
5961       // when a region is in recovering state, no read, split or merge is allowed
5962       if (this.isRecovering() && (this.disallowWritesInRecovering ||
5963               (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) {
5964         throw new RegionInRecoveryException(this.getRegionNameAsString() + " is recovering");
5965       }
5966       break;
5967     default:
5968       break;
5969     }
5970     if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION
5971         || op == Operation.COMPACT_REGION) {
5972       // split, merge or compact region doesn't need to check the closing/closed state or lock the
5973       // region
5974       return;
5975     }
5976     if (this.closing.get()) {
5977       throw new NotServingRegionException(getRegionNameAsString() + " is closing");
5978     }
5979     lock(lock.readLock());
5980     if (this.closed.get()) {
5981       lock.readLock().unlock();
5982       throw new NotServingRegionException(getRegionNameAsString() + " is closed");
5983     }
5984     try {
5985       if (coprocessorHost != null) {
5986         coprocessorHost.postStartRegionOperation(op);
5987       }
5988     } catch (Exception e) {
5989       lock.readLock().unlock();
5990       throw new IOException(e);
5991     }
5992   }
5993 
5994   /**
5995    * Closes the lock. This needs to be called in the finally block corresponding
5996    * to the try block of #startRegionOperation
5997    * @throws IOException
5998    */
5999   public void closeRegionOperation() throws IOException {
6000     closeRegionOperation(Operation.ANY);
6001   }
6002 
6003   /**
6004    * Closes the lock. This needs to be called in the finally block corresponding
6005    * to the try block of {@link #startRegionOperation(Operation)}
6006    * @throws IOException
6007    */
6008   public void closeRegionOperation(Operation operation) throws IOException {
6009     lock.readLock().unlock();
6010     if (coprocessorHost != null) {
6011       coprocessorHost.postCloseRegionOperation(operation);
6012     }
6013   }
6014 
6015   /**
6016    * This method needs to be called before any public call that reads or
6017    * modifies stores in bulk. It has to be called just before a try.
6018    * #closeBulkRegionOperation needs to be called in the try's finally block.
6019    * Acquires a write lock and checks if the region is closing or closed.
6020    * @throws NotServingRegionException when the region is closing or closed
6021    * @throws RegionTooBusyException if failed to get the lock in time
6022    * @throws InterruptedIOException if interrupted while waiting for a lock
6023    */
6024   private void startBulkRegionOperation(boolean writeLockNeeded)
6025       throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
6026     if (this.closing.get()) {
6027       throw new NotServingRegionException(getRegionNameAsString() + " is closing");
6028     }
6029     if (writeLockNeeded) lock(lock.writeLock());
6030     else lock(lock.readLock());
6031     if (this.closed.get()) {
6032       if (writeLockNeeded) lock.writeLock().unlock();
6033       else lock.readLock().unlock();
6034       throw new NotServingRegionException(getRegionNameAsString() + " is closed");
6035     }
6036   }
6037 
6038   /**
6039    * Closes the lock. This needs to be called in the finally block corresponding
6040    * to the try block of #startRegionOperation
6041    */
6042   private void closeBulkRegionOperation(){
6043     if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
6044     else lock.readLock().unlock();
6045   }
6046 
6047   /**
6048    * Update counters for the number of puts without WAL and the size of possible data loss.
6049    * This information is exposed by the region server metrics.
6050    */
6051   private void recordMutationWithoutWal(final Map<byte [], List<Cell>> familyMap) {
6052     numMutationsWithoutWAL.increment();
6053     if (numMutationsWithoutWAL.get() <= 1) {
6054       LOG.info("writing data to region " + this +
6055                " with WAL disabled. Data may be lost in the event of a crash.");
6056     }
6057 
6058     long mutationSize = 0;
6059     for (List<Cell> cells: familyMap.values()) {
6060       for (Cell cell : cells) {
6061         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
6062         mutationSize += kv.getKeyLength() + kv.getValueLength();
6063       }
6064     }
6065 
6066     dataInMemoryWithoutWAL.add(mutationSize);
6067   }
6068 
6069   private void lock(final Lock lock)
6070       throws RegionTooBusyException, InterruptedIOException {
6071     lock(lock, 1);
6072   }
6073 
6074   /**
6075    * Try to acquire a lock. Throw RegionTooBusyException
6076    * if the lock cannot be acquired in time. Throw InterruptedIOException
6077    * if interrupted while waiting for the lock.
6078    */
6079   private void lock(final Lock lock, final int multiplier)
6080       throws RegionTooBusyException, InterruptedIOException {
6081     try {
6082       final long waitTime = Math.min(maxBusyWaitDuration,
6083           busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
6084       if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
6085         throw new RegionTooBusyException(
6086             "failed to get a lock in " + waitTime + " ms. " +
6087                 "regionName=" + (this.getRegionInfo() == null ? "unknown" :
6088                 this.getRegionInfo().getRegionNameAsString()) +
6089                 ", server=" + (this.getRegionServerServices() == null ? "unknown" :
6090                 this.getRegionServerServices().getServerName()));
6091       }
6092     } catch (InterruptedException ie) {
6093       LOG.info("Interrupted while waiting for a lock");
6094       InterruptedIOException iie = new InterruptedIOException();
6095       iie.initCause(ie);
6096       throw iie;
6097     }
6098   }
6099 
6100   /**
6101    * Calls sync with the given transaction ID if the region's table is not
6102    * deferring it.
6103    * @param txid the transaction ID to sync up to
6104    * @throws IOException If anything goes wrong with DFS
6105    */
6106   private void syncOrDefer(long txid, Durability durability) throws IOException {
6107     if (this.getRegionInfo().isMetaRegion()) {
6108       this.log.sync(txid);
6109     } else {
6110       switch(durability) {
6111       case USE_DEFAULT:
6112         // do what table defaults to
6113         if (shouldSyncLog()) {
6114           this.log.sync(txid);
6115         }
6116         break;
6117       case SKIP_WAL:
6118         // nothing to do
6119         break;
6120       case ASYNC_WAL:
6121         // nothing to do
6122         break;
6123       case SYNC_WAL:
6124       case FSYNC_WAL:
6125         // sync the WAL edit (SYNC and FSYNC treated the same for now)
6126         this.log.sync(txid);
6127         break;
6128       }
6129     }
6130   }
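
       // Illustrative client-side sketch of how the per-mutation durability handled
       // above is chosen ("table" is a hypothetical client handle):
       //
       //   Put put = new Put(Bytes.toBytes("row1"));
       //   put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
       //   put.setDurability(Durability.SYNC_WAL);  // sync the WAL before returning
       //   table.put(put);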
6131 
6132   /**
6133    * Check whether we should sync the log from the table's durability settings
6134    */
6135   private boolean shouldSyncLog() {
6136     return durability.ordinal() > Durability.ASYNC_WAL.ordinal();
6137   }
6138 
6139   /**
6140    * A mocked list implementation that discards all updates.
6141    */
6142   private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {
6143 
6144     @Override
6145     public void add(int index, Cell element) {
6146       // do nothing
6147     }
6148 
6149     @Override
6150     public boolean addAll(int index, Collection<? extends Cell> c) {
6151       return false; // this list is never changed as a result of an update
6152     }
6153 
6154     @Override
6155     public KeyValue get(int index) {
6156       throw new UnsupportedOperationException();
6157     }
6158 
6159     @Override
6160     public int size() {
6161       return 0;
6162     }
6163   };
6164 
6165   /**
6166    * Facility for dumping and compacting catalog tables.
6167    * Only handles catalog tables since those are the only tables whose schema
6168    * we know for sure.  For usage run:
6169    * <pre>
6170    *   ./bin/hbase org.apache.hadoop.hbase.regionserver.HRegion
6171    * </pre>
6172    * @throws IOException
6173    */
6174   public static void main(String[] args) throws IOException {
6175     if (args.length < 1) {
6176       printUsageAndExit(null);
6177     }
6178     boolean majorCompact = false;
6179     if (args.length > 1) {
6180       if (!args[1].toLowerCase().startsWith("major")) {
6181         printUsageAndExit("ERROR: Unrecognized option <" + args[1] + ">");
6182       }
6183       majorCompact = true;
6184     }
6185     final Path tableDir = new Path(args[0]);
6186     final Configuration c = HBaseConfiguration.create();
6187     final FileSystem fs = FileSystem.get(c);
6188     final Path logdir = new Path(c.get("hbase.tmp.dir"));
6189     final String logname = "hlog" + FSUtils.getTableName(tableDir) + System.currentTimeMillis();
6190 
6191     final HLog log = HLogFactory.createHLog(fs, logdir, logname, c);
6192     try {
6193       processTable(fs, tableDir, log, c, majorCompact);
6194     } finally {
6195        log.close();
6196        // TODO: is this still right?
6197        BlockCache bc = new CacheConfig(c).getBlockCache();
6198        if (bc != null) bc.shutdown();
6199     }
6200   }
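
       // Example invocation (illustrative; the catalog table directory is hypothetical):
       //
       //   ./bin/hbase org.apache.hadoop.hbase.regionserver.HRegion \
       //       hdfs://namenode/hbase/data/hbase/meta major_compact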
6201 
6202   /**
6203    * Gets the latest sequence number that was read from storage when this region was opened.
6204    */
6205   public long getOpenSeqNum() {
6206     return this.openSeqNum;
6207   }
6208 
6209   /**
6210    * Gets max sequence ids of stores that was read from storage when this region was opened. WAL
6211    * Edits with smaller or equal sequence number will be skipped from replay.
6212    */
6213   public Map<byte[], Long> getMaxStoreSeqIdForLogReplay() {
6214     return this.maxSeqIdInStores;
6215   }
6216 
6217   /**
6218    * @return the current compaction state of this region.
6219    */
6220   public CompactionState getCompactionState() {
6221     boolean hasMajor = majorInProgress.get() > 0, hasMinor = minorInProgress.get() > 0;
6222     return (hasMajor ? (hasMinor ? CompactionState.MAJOR_AND_MINOR : CompactionState.MAJOR)
6223         : (hasMinor ? CompactionState.MINOR : CompactionState.NONE));
6224   }
6225 
6226   public void reportCompactionRequestStart(boolean isMajor) {
6227     (isMajor ? majorInProgress : minorInProgress).incrementAndGet();
6228   }
6229 
6230   public void reportCompactionRequestEnd(boolean isMajor, int numFiles, long filesSizeCompacted) {
6231     int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();
6232 
6233     // metrics
6234     compactionsFinished.incrementAndGet();
6235     compactionNumFilesCompacted.addAndGet(numFiles);
6236     compactionNumBytesCompacted.addAndGet(filesSizeCompacted);
6237 
6238     assert newValue >= 0;
6239   }
6240 
6241   /**
6242    * Do not change this sequence id. See {@link #sequenceId} comment.
6243    * @return sequenceId
6244    */
6245   @VisibleForTesting
6246   public AtomicLong getSequenceId() {
6247     return this.sequenceId;
6248   }
6249 
6250   /**
6251    * Sets this region's sequenceId.
6252    * @param value new value
6253    */
6254   private void setSequenceId(long value) {
6255     this.sequenceId.set(value);
6256   }
6257 
6258   /**
6259    * Listener class to enable callers of
6260    * bulkLoadHFile() to perform any necessary
6261    * pre/post processing of a given bulkload call.
6262    */
6263   public interface BulkLoadListener {
6264 
6265     /**
6266      * Called before an HFile is actually loaded
6267      * @param family family being loaded to
6268      * @param srcPath path of HFile
6269      * @return final path to be used for actual loading
6270      * @throws IOException
6271      */
6272     String prepareBulkLoad(byte[] family, String srcPath) throws IOException;
6273 
6274     /**
6275      * Called after a successful HFile load
6276      * @param family family being loaded to
6277      * @param srcPath path of HFile
6278      * @throws IOException
6279      */
6280     void doneBulkLoad(byte[] family, String srcPath) throws IOException;
6281 
6282     /**
6283      * Called after a failed HFile load
6284      * @param family family being loaded to
6285      * @param srcPath path of HFile
6286      * @throws IOException
6287      */
6288     void failedBulkLoad(byte[] family, String srcPath) throws IOException;
6289   }
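
       // Minimal sketch of a BulkLoadListener (illustrative; a real implementation,
       // such as the one backing secure bulk load, would stage files and fix permissions):
       //
       //   BulkLoadListener listener = new BulkLoadListener() {
       //     @Override
       //     public String prepareBulkLoad(byte[] family, String srcPath) {
       //       return srcPath;  // load the file from where it already sits
       //     }
       //     @Override
       //     public void doneBulkLoad(byte[] family, String srcPath) {
       //       LOG.info("bulk loaded " + srcPath);
       //     }
       //     @Override
       //     public void failedBulkLoad(byte[] family, String srcPath) {
       //       LOG.warn("failed bulk load of " + srcPath);
       //     }
       //   };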
6290 
6291   @VisibleForTesting class RowLockContext {
6292     private final HashedBytes row;
6293     private final CountDownLatch latch = new CountDownLatch(1);
6294     private final Thread thread;
6295     private int lockCount = 0;
6296 
6297     RowLockContext(HashedBytes row) {
6298       this.row = row;
6299       this.thread = Thread.currentThread();
6300     }
6301 
6302     boolean ownedByCurrentThread() {
6303       return thread == Thread.currentThread();
6304     }
6305 
6306     RowLock newLock() {
6307       lockCount++;
6308       return new RowLock(this);
6309     }
6310 
6311     @Override
6312     public String toString() {
6313       Thread t = this.thread;
6314       return "Thread=" + (t == null? "null": t.getName()) + ", row=" + this.row +
6315         ", lockCount=" + this.lockCount;
6316     }
6317 
6318     void releaseLock() {
6319       if (!ownedByCurrentThread()) {
6320         throw new IllegalArgumentException("Lock held by thread: " + thread
6321           + " cannot be released by different thread: " + Thread.currentThread());
6322       }
6323       lockCount--;
6324       if (lockCount == 0) {
6325         // no remaining locks by the thread, unlock and allow other threads to access
6326         RowLockContext existingContext = lockedRows.remove(row);
6327         if (existingContext != this) {
6328           throw new RuntimeException(
6329               "Internal row lock state inconsistent, should not happen, row: " + row);
6330         }
6331         latch.countDown();
6332       }
6333     }
6334   }
6335 
6336   /**
6337    * Row lock held by a given thread.
6338    * One thread may acquire multiple locks on the same row simultaneously.
6339    * The locks must be released by calling release() from the same thread.
6340    */
6341   public static class RowLock {
6342     @VisibleForTesting final RowLockContext context;
6343     private boolean released = false;
6344 
6345     @VisibleForTesting RowLock(RowLockContext context) {
6346       this.context = context;
6347     }
6348 
6349     /**
6350      * Release the given lock.  If there are no remaining locks held by the current thread
6351      * then unlock the row and allow other threads to acquire the lock.
6352      * @throws IllegalArgumentException if called by a different thread than the lock owning thread
6353      */
6354     public void release() {
6355       if (!released) {
6356         context.releaseLock();
6357         released = true;
6358       }
6359     }
6360   }
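
       // Usage sketch (illustrative), mirroring what append/increment above do:
       // acquire, operate, then release from the same thread.
       //
       //   RowLock rowLock = region.getRowLock(row);
       //   try {
       //     // ... operate on the row while holding the lock ...
       //   } finally {
       //     rowLock.release();
       //   }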
6361 
6362   /**
6363    * Append a faked WALEdit in order to get a long sequence number; the log syncer will just
6364    * ignore the WALEdit append later.
6365    * @param wal the log to append to
6366    * @param cells list of Cells inserted into memstore. Those Cells are passed in order to
6367    *        be updated with the right mvcc values (their log sequence number)
6368    * @return Return the key used appending with no sync and no append.
6369    * @throws IOException
6370    */
6371   private HLogKey appendNoSyncNoAppend(final HLog wal, List<Cell> cells) throws IOException {
6372     HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(),
6373       HLog.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
6374     // Call append but with an empty WALEdit.  The returned sequence id will not be associated
6375     // with any edit and we can be sure it went in after all outstanding appends.
6376     wal.appendNoSync(getTableDesc(), getRegionInfo(), key,
6377       WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells);
6378     return key;
6379   }
6380 
6381   /**
6382    * Explicitly sync the WAL.
6383    * @throws IOException
6384    */
6385   public void syncWal() throws IOException {
6386     if (this.log != null) {
6387       this.log.sync();
6388     }
6389   }
6390 }