1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.io.UnsupportedEncodingException;
25  import java.lang.reflect.Constructor;
26  import java.text.ParseException;
27  import java.util.AbstractList;
28  import java.util.ArrayList;
29  import java.util.Arrays;
30  import java.util.Collection;
31  import java.util.Collections;
32  import java.util.HashMap;
33  import java.util.HashSet;
34  import java.util.Iterator;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.NavigableMap;
38  import java.util.NavigableSet;
39  import java.util.RandomAccess;
40  import java.util.Set;
41  import java.util.TreeMap;
42  import java.util.concurrent.Callable;
43  import java.util.concurrent.CompletionService;
44  import java.util.concurrent.ConcurrentHashMap;
45  import java.util.concurrent.ConcurrentMap;
46  import java.util.concurrent.ConcurrentSkipListMap;
47  import java.util.concurrent.CountDownLatch;
48  import java.util.concurrent.ExecutionException;
49  import java.util.concurrent.ExecutorCompletionService;
50  import java.util.concurrent.ExecutorService;
51  import java.util.concurrent.Executors;
52  import java.util.concurrent.Future;
53  import java.util.concurrent.FutureTask;
54  import java.util.concurrent.ThreadFactory;
55  import java.util.concurrent.ThreadPoolExecutor;
56  import java.util.concurrent.TimeUnit;
57  import java.util.concurrent.TimeoutException;
58  import java.util.concurrent.atomic.AtomicBoolean;
59  import java.util.concurrent.atomic.AtomicInteger;
60  import java.util.concurrent.atomic.AtomicLong;
61  import java.util.concurrent.locks.Lock;
62  import java.util.concurrent.locks.ReentrantReadWriteLock;
63  
64  import org.apache.commons.logging.Log;
65  import org.apache.commons.logging.LogFactory;
66  import org.apache.hadoop.conf.Configuration;
67  import org.apache.hadoop.fs.FileStatus;
68  import org.apache.hadoop.fs.FileSystem;
69  import org.apache.hadoop.fs.Path;
70  import org.apache.hadoop.hbase.Cell;
71  import org.apache.hadoop.hbase.CellScanner;
72  import org.apache.hadoop.hbase.CellUtil;
73  import org.apache.hadoop.hbase.CompoundConfiguration;
74  import org.apache.hadoop.hbase.DoNotRetryIOException;
75  import org.apache.hadoop.hbase.DroppedSnapshotException;
76  import org.apache.hadoop.hbase.HBaseConfiguration;
77  import org.apache.hadoop.hbase.HColumnDescriptor;
78  import org.apache.hadoop.hbase.HConstants;
79  import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
80  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
81  import org.apache.hadoop.hbase.HRegionInfo;
82  import org.apache.hadoop.hbase.HTableDescriptor;
83  import org.apache.hadoop.hbase.KeyValue;
84  import org.apache.hadoop.hbase.KeyValueUtil;
85  import org.apache.hadoop.hbase.NamespaceDescriptor;
86  import org.apache.hadoop.hbase.NotServingRegionException;
87  import org.apache.hadoop.hbase.RegionTooBusyException;
88  import org.apache.hadoop.hbase.TableName;
89  import org.apache.hadoop.hbase.Tag;
90  import org.apache.hadoop.hbase.TagType;
91  import org.apache.hadoop.hbase.UnknownScannerException;
92  import org.apache.hadoop.hbase.backup.HFileArchiver;
93  import org.apache.hadoop.hbase.classification.InterfaceAudience;
94  import org.apache.hadoop.hbase.client.Append;
95  import org.apache.hadoop.hbase.client.Delete;
96  import org.apache.hadoop.hbase.client.Durability;
97  import org.apache.hadoop.hbase.client.Get;
98  import org.apache.hadoop.hbase.client.Increment;
99  import org.apache.hadoop.hbase.client.IsolationLevel;
100 import org.apache.hadoop.hbase.client.Mutation;
101 import org.apache.hadoop.hbase.client.Put;
102 import org.apache.hadoop.hbase.client.Result;
103 import org.apache.hadoop.hbase.client.RowMutations;
104 import org.apache.hadoop.hbase.client.Scan;
105 import org.apache.hadoop.hbase.conf.ConfigurationManager;
106 import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
107 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
108 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
109 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
110 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
111 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
112 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
113 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
114 import org.apache.hadoop.hbase.filter.FilterWrapper;
115 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
116 import org.apache.hadoop.hbase.io.HeapSize;
117 import org.apache.hadoop.hbase.io.TimeRange;
118 import org.apache.hadoop.hbase.io.hfile.BlockCache;
119 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
120 import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
121 import org.apache.hadoop.hbase.ipc.RpcCallContext;
122 import org.apache.hadoop.hbase.ipc.RpcServer;
123 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
124 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
125 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
126 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
127 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
128 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
129 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
130 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
131 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
132 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
133 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
134 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
135 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
136 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
137 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
138 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
139 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
140 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
141 import org.apache.hadoop.hbase.util.Bytes;
142 import org.apache.hadoop.hbase.util.CancelableProgressable;
143 import org.apache.hadoop.hbase.util.ClassSize;
144 import org.apache.hadoop.hbase.util.CompressionTest;
145 import org.apache.hadoop.hbase.util.Counter;
146 import org.apache.hadoop.hbase.util.EncryptionTest;
147 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
148 import org.apache.hadoop.hbase.util.FSTableDescriptors;
149 import org.apache.hadoop.hbase.util.FSUtils;
150 import org.apache.hadoop.hbase.util.HashedBytes;
151 import org.apache.hadoop.hbase.util.Pair;
152 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
153 import org.apache.hadoop.hbase.util.Threads;
154 import org.apache.hadoop.hbase.wal.WAL;
155 import org.apache.hadoop.hbase.wal.WALFactory;
156 import org.apache.hadoop.hbase.wal.WALKey;
157 import org.apache.hadoop.hbase.wal.WALSplitter;
158 import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
159 import org.apache.hadoop.io.MultipleIOException;
160 import org.apache.hadoop.util.StringUtils;
161 
162 import com.google.common.annotations.VisibleForTesting;
163 import com.google.common.base.Optional;
164 import com.google.common.base.Preconditions;
165 import com.google.common.collect.Lists;
166 import com.google.common.collect.Maps;
167 import com.google.common.io.Closeables;
168 import com.google.protobuf.Descriptors;
169 import com.google.protobuf.Message;
170 import com.google.protobuf.RpcCallback;
171 import com.google.protobuf.RpcController;
172 import com.google.protobuf.Service;
173 
174 /**
175  * HRegion stores data for a certain region of a table.  It stores all columns
176  * for each row. A given table consists of one or more HRegions.
177  *
178  * <p>We maintain multiple HStores for a single HRegion.
179  *
180  * <p>A Store is a set of rows with some column data; together,
181  * they make up all the data for the rows.
182  *
183  * <p>Each HRegion has a 'startKey' and 'endKey'.
184  * <p>The first is inclusive, the second is exclusive (except for
185  * the final region). The endKey of region 0 is the same as
186  * startKey for region 1 (if it exists).  The startKey for the
187  * first region is null. The endKey for the final region is null.
188  *
189  * <p>Locking at the HRegion level serves only one purpose: preventing the
190  * region from being closed (and consequently split) while other operations
191  * are ongoing. Each row level operation obtains both a row lock and a region
192  * read lock for the duration of the operation. While a scanner is being
193  * constructed, getScanner holds a read lock. If the scanner is successfully
194  * constructed, it holds a read lock until it is closed. A close takes out a
195  * write lock and consequently will block for ongoing operations and will block
196  * new operations from starting while the close is in progress.
197  *
198  * <p>An HRegion is defined by its table and its key extent.
199  *
200  * <p>It consists of at least one Store.  The number of Stores should be
201  * configurable, so that data which is accessed together is stored in the same
202  * Store.  Right now, we approximate that by building a single Store for
203  * each column family.  (This config info will be communicated via the
204  * tabledesc.)
205  *
206  * <p>The HTableDescriptor contains metainfo about the HRegion's table.
207  * regionName is a unique identifier for this HRegion. [startKey, endKey)
208  * defines the keyspace for this HRegion.
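     *
     * <p>A minimal usage sketch (a hedged illustration, not a canonical
     * recipe; it assumes an already-constructed {@code conf}, {@code htd},
     * {@code regionInfo} and {@code wal}):
     * <pre>
     * HRegion region = HRegion.openHRegion(regionInfo, htd, wal, conf);
     * try {
     *   Put put = new Put(Bytes.toBytes("row1"));
     *   put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
     *   region.put(put);
     *   Result result = region.get(new Get(Bytes.toBytes("row1")));
     * } finally {
     *   region.close();
     * }
     * </pre>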
209  */
210 @InterfaceAudience.Private
211 public class HRegion implements HeapSize, PropagatingConfigurationObserver {
212   public static final Log LOG = LogFactory.getLog(HRegion.class);
213 
214   public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
215       "hbase.hregion.scan.loadColumnFamiliesOnDemand";
216 
217   /**
218    * This is the global default value for durability. All tables/mutations not
219    * defining a durability or using USE_DEFAULT will default to this value.
220    */
221   private static final Durability DEFAULT_DURABILITY = Durability.SYNC_WAL;
222 
223   final AtomicBoolean closed = new AtomicBoolean(false);
224   /* Closing can take some time; use the closing flag if there is stuff we don't
225    * want to do while in closing state; e.g. offering this region up to the
226    * master as a region to close if the carrying regionserver is overloaded.
227    * Once set, it is never cleared.
228    */
229   final AtomicBoolean closing = new AtomicBoolean(false);
230 
231   /**
232    * The max sequence id of flushed data on this region. Used when doing some rough calculations on
233    * whether it is time to flush or not.
234    */
235   protected volatile long maxFlushedSeqId = -1L;
236 
237   /**
238    * Region scoped edit sequence Id. Edits to this region are GUARANTEED to appear in the WAL
239    * file in this sequence id's order; i.e. edit #2 will be in the WAL after edit #1.
240    * Its default value is -1L. This default is used as a marker to indicate
241    * that the region hasn't opened yet. Once it is opened, it is set to the derived
242    * {@link #openSeqNum}, the largest sequence id of all hfiles opened under this Region.
243    *
244    * <p>Control of this sequence is handed off to the WAL implementation.  It is responsible
245    * for tagging edits with the correct sequence id since it is responsible for getting the
246    * edits into the WAL files. It controls updating the sequence id value.  DO NOT UPDATE IT
247    * OUTSIDE OF THE WAL.  The value you get will not be what you think it is.
248    */
249   private final AtomicLong sequenceId = new AtomicLong(-1L);
250 
251   /**
252    * Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context for
253    * startRegionOperation to possibly invoke different checks before any region operations. Not all
254    * operations have to be defined here. An entry is only needed when a special check is
255    * required in startRegionOperation.
256    */
257   public enum Operation {
258     ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE,
259     REPLAY_BATCH_MUTATE, COMPACT_REGION
260   }
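
      // A hedged usage sketch (illustrative only): callers bracket region work
      // with startRegionOperation/closeRegionOperation so that the region
      // cannot be closed underneath them, e.g.:
      //
      //   startRegionOperation(Operation.GET);
      //   try {
      //     // ... perform the read against the region ...
      //   } finally {
      //     closeRegionOperation();
      //   }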
261 
262   //////////////////////////////////////////////////////////////////////////////
263   // Members
264   //////////////////////////////////////////////////////////////////////////////
265 
266   // map from a locked row to the context for that lock including:
267   // - CountDownLatch for threads waiting on that row
268   // - the thread that owns the lock (allow reentrancy)
269   // - reference count of (reentrant) locks held by the thread
270   // - the row itself
271   private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
272       new ConcurrentHashMap<HashedBytes, RowLockContext>();
273 
274   protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(
275       Bytes.BYTES_RAWCOMPARATOR);
276 
277   // TODO: account for each registered handler in HeapSize computation
278   private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
279 
280   public final AtomicLong memstoreSize = new AtomicLong(0);
281 
282   // Debug possible data loss due to WAL off
283   final Counter numMutationsWithoutWAL = new Counter();
284   final Counter dataInMemoryWithoutWAL = new Counter();
285 
286   // Debug why CAS operations are taking a while.
287   final Counter checkAndMutateChecksPassed = new Counter();
288   final Counter checkAndMutateChecksFailed = new Counter();
289 
290   // Number of requests
291   final Counter readRequestsCount = new Counter();
292   final Counter writeRequestsCount = new Counter();
293 
294   // Number of requests blocked by memstore size.
295   private final Counter blockedRequestsCount = new Counter();
296 
297   /**
298    * @return the number of blocked requests.
299    */
300   public long getBlockedRequestsCount() {
301     return this.blockedRequestsCount.get();
302   }
303 
304   // Compaction counters
305   final AtomicLong compactionsFinished = new AtomicLong(0L);
306   final AtomicLong compactionNumFilesCompacted = new AtomicLong(0L);
307   final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L);
308 
309 
310   private final WAL wal;
311   private final HRegionFileSystem fs;
312   protected final Configuration conf;
313   private final Configuration baseConf;
314   private final KeyValue.KVComparator comparator;
315   private final int rowLockWaitDuration;
316   static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
317 
318   // The internal wait duration to acquire a lock before read/update
319   // from the region. It is not per row. The purpose of this wait time
320   // is to avoid waiting a long time while the region is busy, so that
321   // we can release the IPC handler soon enough to improve the
322   // availability of the region server. It can be adjusted by
323   // tuning configuration "hbase.busy.wait.duration".
324   final long busyWaitDuration;
325   static final long DEFAULT_BUSY_WAIT_DURATION = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
326 
327   // If updating multiple rows in one call, wait longer,
328   // i.e. waiting for busyWaitDuration * # of rows. However,
329   // we can limit the max multiplier.
330   final int maxBusyWaitMultiplier;
331 
332   // Max busy wait duration. There is no point in waiting longer than the RPC
333   // purge timeout, after which an RPC call will be terminated by the RPC engine.
334   final long maxBusyWaitDuration;
335 
336   // negative number indicates infinite timeout
337   static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
338   final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();
339 
340   private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
341 
342   /**
343    * The sequence ID that was encountered when this region was opened.
344    */
345   private long openSeqNum = HConstants.NO_SEQNUM;
346 
347   /**
348    * The default setting for whether to enable on-demand CF loading for
349    * scan requests to this region. Requests can override it.
350    */
351   private boolean isLoadingCfsOnDemandDefault = false;
352 
353   private final AtomicInteger majorInProgress = new AtomicInteger(0);
354   private final AtomicInteger minorInProgress = new AtomicInteger(0);
355 
356   //
357   // Context: During replay we want to ensure that we do not lose any data. So, we
358   // have to be conservative in how we replay wals. For each store, we calculate
359   // the maxSeqId up to which the store was flushed. And, skip the edits which
360   // are equal to or lower than maxSeqId for each store.
361   // The following map is populated when opening the region
362   Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
363 
364   /**
365    * Config setting for whether to allow writes when a region is in recovering state or not.
366    */
367   private boolean disallowWritesInRecovering = false;
368 
369   // when a region is in recovering state, it can only accept writes, not reads
370   private volatile boolean isRecovering = false;
371 
372   private volatile Optional<ConfigurationManager> configurationManager;
373 
374   /**
375    * @return The smallest mvcc readPoint across all the scanners in this
376    * region. Writes older than this readPoint are included in every
377    * read operation.
378    */
379   public long getSmallestReadPoint() {
380     long minimumReadPoint;
381     // We need to ensure that while we are calculating the smallestReadPoint
382     // no new RegionScanners can grab a readPoint that we are unaware of.
383     // We achieve this by synchronizing on the scannerReadPoints object.
384     synchronized(scannerReadPoints) {
385       minimumReadPoint = mvcc.memstoreReadPoint();
386 
387       for (Long readPoint: this.scannerReadPoints.values()) {
388         if (readPoint < minimumReadPoint) {
389           minimumReadPoint = readPoint;
390         }
391       }
392     }
393     return minimumReadPoint;
394   }
395   /*
396    * Data structure of write state flags used for coordinating flushes,
397    * compactions and closes.
398    */
399   static class WriteState {
400     // Set while a memstore flush is happening.
401     volatile boolean flushing = false;
402     // Set when a flush has been requested.
403     volatile boolean flushRequested = false;
404     // Number of compactions running.
405     volatile int compacting = 0;
406     // Cleared (set to false) in close. Once cleared, we cannot compact or flush again.
407     volatile boolean writesEnabled = true;
408     // Set if region is read-only
409     volatile boolean readOnly = false;
410     // whether the reads are enabled. This is different from readOnly, because readOnly is
411     // static in the lifetime of the region, while readsEnabled is dynamic
412     volatile boolean readsEnabled = true;
413 
414     /**
415      * Set flags that make this region read-only.
416      *
417      * @param onOff flip value for region r/o setting
418      */
419     synchronized void setReadOnly(final boolean onOff) {
420       this.writesEnabled = !onOff;
421       this.readOnly = onOff;
422     }
423 
424     boolean isReadOnly() {
425       return this.readOnly;
426     }
427 
428     boolean isFlushRequested() {
429       return this.flushRequested;
430     }
431 
432     void setReadsEnabled(boolean readsEnabled) {
433       this.readsEnabled = readsEnabled;
434     }
435 
436     static final long HEAP_SIZE = ClassSize.align(
437         ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
438   }
439 
440   /**
441    * Objects from this class are created when flushing to describe all the different states
442    * a flush can end up in. The Result enum describes those states. The sequence id should only
443    * be specified if the flush was successful, and the failure message should only be specified
444    * if it didn't flush.
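       *
       * <p>A hedged example of consuming a FlushResult (assumes a flush entry
       * point such as {@code flushcache()} that returns one):
       * <pre>
       * FlushResult fr = region.flushcache();
       * if (fr.isFlushSucceeded() && fr.isCompactionNeeded()) {
       *   // hand the region over to the compaction machinery
       * }
       * </pre>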
445    */
446   public static class FlushResult {
447     enum Result {
448       FLUSHED_NO_COMPACTION_NEEDED,
449       FLUSHED_COMPACTION_NEEDED,
450       // Special case where a flush didn't run because there's nothing in the memstores. Used when
451       // bulk loading to know when we can still load even if a flush didn't happen.
452       CANNOT_FLUSH_MEMSTORE_EMPTY,
453       CANNOT_FLUSH
454       // Be careful adding more to this enum; look at the methods below to make sure
455     }
456 
457     final Result result;
458     final String failureReason;
459     final long flushSequenceId;
460 
461     /**
462      * Convenience constructor to use when the flush is successful; the failure message is set to
463      * null.
464      * @param result Expecting FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
465      * @param flushSequenceId Generated sequence id that comes right after the edits in the
466      *                        memstores.
467      */
468     FlushResult(Result result, long flushSequenceId) {
469       this(result, flushSequenceId, null);
470       assert result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
471           .FLUSHED_COMPACTION_NEEDED;
472     }
473 
474     /**
475      * Convenience constructor to use when we cannot flush.
476      * @param result Expecting CANNOT_FLUSH_MEMSTORE_EMPTY or CANNOT_FLUSH.
477      * @param failureReason Reason why we couldn't flush.
478      */
479     FlushResult(Result result, String failureReason) {
480       this(result, -1, failureReason);
481       assert result == Result.CANNOT_FLUSH_MEMSTORE_EMPTY || result == Result.CANNOT_FLUSH;
482     }
483 
484     /**
485      * Constructor with all the parameters.
486      * @param result Any of the Result values.
487      * @param flushSequenceId Generated sequence id if the memstores were flushed else -1.
488      * @param failureReason Reason why we couldn't flush, or null.
489      */
490     FlushResult(Result result, long flushSequenceId, String failureReason) {
491       this.result = result;
492       this.flushSequenceId = flushSequenceId;
493       this.failureReason = failureReason;
494     }
495 
496     /**
497      * Convenience method, the equivalent of checking if result is
498      * FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
499      * @return true if the memstores were flushed, else false.
500      */
501     public boolean isFlushSucceeded() {
502       return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
503           .FLUSHED_COMPACTION_NEEDED;
504     }
505 
506     /**
507      * Convenience method, the equivalent of checking if result is FLUSHED_COMPACTION_NEEDED.
508      * @return True if the flush requested a compaction, else false (false does not even imply a flush happened).
509      */
510     public boolean isCompactionNeeded() {
511       return result == Result.FLUSHED_COMPACTION_NEEDED;
512     }
513   }
514 
515   final WriteState writestate = new WriteState();
516 
517   long memstoreFlushSize;
518   final long timestampSlop;
519   final long rowProcessorTimeout;
520 
521   // Last flush time for each Store. Useful when we are flushing per column family.
522   private final ConcurrentMap<Store, Long> lastStoreFlushTimeMap =
523       new ConcurrentHashMap<Store, Long>();
524 
525   final RegionServerServices rsServices;
526   private RegionServerAccounting rsAccounting;
527   private long flushCheckInterval;
528   // flushPerChanges is to prevent too many changes in memstore
529   private long flushPerChanges;
530   private long blockingMemStoreSize;
531   final long threadWakeFrequency;
532   // Used to guard closes
533   final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
534 
535   // Stop updates lock
536   private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock();
537   private boolean splitRequest;
538   private byte[] explicitSplitPoint = null;
539 
540   private final MultiVersionConsistencyControl mvcc =
541       new MultiVersionConsistencyControl();
542 
543   // Coprocessor host
544   private RegionCoprocessorHost coprocessorHost;
545 
546   private HTableDescriptor htableDescriptor = null;
547   private RegionSplitPolicy splitPolicy;
548   private FlushPolicy flushPolicy;
549 
550   private final MetricsRegion metricsRegion;
551   private final MetricsRegionWrapperImpl metricsRegionWrapper;
552   private final Durability durability;
553   private final boolean regionStatsEnabled;
554 
555   /**
556    * HRegion constructor. This constructor should only be used for testing and
557    * extensions.  Instances of HRegion should be instantiated with the
558    * {@link HRegion#createHRegion} or {@link HRegion#openHRegion} method.
559    *
560    * @param tableDir qualified path of directory where region should be located,
561    * usually the table directory.
562    * @param wal The WAL is the outbound log for any updates to the HRegion.
563    * The wal file is a logfile from the previous execution that's
564    * custom-computed for this HRegion. The HRegionServer computes and sorts the
565    * appropriate wal info for this HRegion. If there is a previous wal file
566    * (implying that the HRegion has been written-to before), then read it from
567    * the supplied path.
568    * @param fs is the filesystem.
569    * @param confParam is global configuration settings.
570    * @param regionInfo - HRegionInfo that describes the region
572    * @param htd the table descriptor
573    * @param rsServices reference to {@link RegionServerServices} or null
574    */
575   @Deprecated
576   public HRegion(final Path tableDir, final WAL wal, final FileSystem fs,
577       final Configuration confParam, final HRegionInfo regionInfo,
578       final HTableDescriptor htd, final RegionServerServices rsServices) {
579     this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
580       wal, confParam, htd, rsServices);
581   }
582 
583   /**
584    * HRegion constructor. This constructor should only be used for testing and
585    * extensions.  Instances of HRegion should be instantiated with the
586    * {@link HRegion#createHRegion} or {@link HRegion#openHRegion} method.
587    *
588    * @param fs is the filesystem.
589    * @param wal The WAL is the outbound log for any updates to the HRegion.
590    * The wal file is a logfile from the previous execution that's
591    * custom-computed for this HRegion. The HRegionServer computes and sorts the
592    * appropriate wal info for this HRegion. If there is a previous wal file
593    * (implying that the HRegion has been written-to before), then read it from
594    * the supplied path.
595    * @param confParam is global configuration settings.
596    * @param htd the table descriptor
597    * @param rsServices reference to {@link RegionServerServices} or null
598    */
599   public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration confParam,
600       final HTableDescriptor htd, final RegionServerServices rsServices) {
601     if (htd == null) {
602       throw new IllegalArgumentException("Need table descriptor");
603     }
604 
605     if (confParam instanceof CompoundConfiguration) {
606       throw new IllegalArgumentException("Need original base configuration");
607     }
608 
609     this.comparator = fs.getRegionInfo().getComparator();
610     this.wal = wal;
611     this.fs = fs;
612 
613     // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
614     this.baseConf = confParam;
615     this.conf = new CompoundConfiguration()
616       .add(confParam)
617       .addStringMap(htd.getConfiguration())
618       .addBytesMap(htd.getValues());
619     this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
620         DEFAULT_CACHE_FLUSH_INTERVAL);
621     this.flushPerChanges = conf.getLong(MEMSTORE_FLUSH_PER_CHANGES, DEFAULT_FLUSH_PER_CHANGES);
622     if (this.flushPerChanges > MAX_FLUSH_PER_CHANGES) {
623       throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed "
624           + MAX_FLUSH_PER_CHANGES);
625     }
626     this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
627                     DEFAULT_ROWLOCK_WAIT_DURATION);
628 
629     this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
630     this.htableDescriptor = htd;
631     this.rsServices = rsServices;
632     this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
633     setHTableSpecificConf();
634     this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
635 
636     this.busyWaitDuration = conf.getLong(
637       "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
638     this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
639     if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
640       throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
641         + busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
642         + maxBusyWaitMultiplier + "). Their product should be positive");
643     }
644     this.maxBusyWaitDuration = conf.getLong("hbase.ipc.client.call.purge.timeout",
645       2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
646 
647     /*
648      * timestamp.slop provides a server-side constraint on the timestamp. This
649      * assumes that you base your TS around currentTimeMillis(). In this case,
650      * throw an error to the user if the user-specified TS is newer than now +
651      * slop. A value of LATEST_TIMESTAMP disables this functionality.
652      */
653     this.timestampSlop = conf.getLong(
654         "hbase.hregion.keyvalue.timestamp.slop.millisecs",
655         HConstants.LATEST_TIMESTAMP);
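
        // A hedged, illustrative tuning example (the value is made up, not a
        // recommendation): reject client timestamps more than 5 seconds ahead
        // of the server clock by setting, in hbase-site.xml:
        //   hbase.hregion.keyvalue.timestamp.slop.millisecs = 5000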
656 
657     /**
658      * Timeout for the process time in processRowsWithLocks().
659      * Use -1 to switch off time bound.
660      */
661     this.rowProcessorTimeout = conf.getLong(
662         "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
663     this.durability = htd.getDurability() == Durability.USE_DEFAULT
664         ? DEFAULT_DURABILITY
665         : htd.getDurability();
666     if (rsServices != null) {
667       this.rsAccounting = this.rsServices.getRegionServerAccounting();
668       // don't initialize coprocessors if not running within a regionserver
669       // TODO: revisit if coprocessors should load in other cases
670       this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
671       this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
672       this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper);
673 
674       Map<String, HRegion> recoveringRegions = rsServices.getRecoveringRegions();
675       String encodedName = getRegionInfo().getEncodedName();
676       if (recoveringRegions != null && recoveringRegions.containsKey(encodedName)) {
677         this.isRecovering = true;
678         recoveringRegions.put(encodedName, this);
679       }
680     } else {
681       this.metricsRegionWrapper = null;
682       this.metricsRegion = null;
683     }
684     if (LOG.isDebugEnabled()) {
685       // Write out region name as string and its encoded name.
686       LOG.debug("Instantiated " + this);
687     }
688 
689     // by default, we allow writes against a region when it's in recovering state
690     this.disallowWritesInRecovering =
691         conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING,
692           HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG);
693     configurationManager = Optional.absent();
694 
695     // disable stats tracking for system tables, but check the config for everything else
696     this.regionStatsEnabled = htd.getTableName().getNamespaceAsString().equals(
697         NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR) ?
698           false :
699           conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
700               HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
701   }
702 
703   void setHTableSpecificConf() {
704     if (this.htableDescriptor == null) return;
705     long flushSize = this.htableDescriptor.getMemStoreFlushSize();
706 
707     if (flushSize <= 0) {
708       flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
709         HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
710     }
711     this.memstoreFlushSize = flushSize;
712     this.blockingMemStoreSize = this.memstoreFlushSize *
713         conf.getLong("hbase.hregion.memstore.block.multiplier", 2);
714   }
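
      // A hedged arithmetic sketch of the blocking threshold computed above:
      // with the default 128 MB memstore flush size and the default block
      // multiplier of 2, updates to this region block once its memstore
      // reaches 256 MB.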
715 
716   /**
717    * Initialize this region.
718    * Used only by tests and SplitTransaction to reopen the region.
719    * You should use createHRegion() or openHRegion() instead.
720    * @return What the next sequence (edit) id should be.
721    * @throws IOException e
722    * @deprecated use HRegion.createHRegion() or HRegion.openHRegion()
723    */
724   @Deprecated
725   public long initialize() throws IOException {
726     return initialize(null);
727   }
728 
729   /**
730    * Initialize this region.
731    *
732    * @param reporter Tickle every so often if initialize is taking a while.
733    * @return What the next sequence (edit) id should be.
734    * @throws IOException e
735    */
736   private long initialize(final CancelableProgressable reporter) throws IOException {
737     MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
738     long nextSeqId = -1;
739     try {
740       nextSeqId = initializeRegionInternals(reporter, status);
741       return nextSeqId;
742     } finally {
743       // nextSeqId will be -1 if the initialization fails.
744       // Otherwise it will be at least 0.
745       if (nextSeqId == -1) {
746         status
747             .abort("Exception during region " + this.getRegionNameAsString() + " initialization.");
748       }
749     }
750   }
751 
752   private long initializeRegionInternals(final CancelableProgressable reporter,
753       final MonitoredTask status) throws IOException, UnsupportedEncodingException {
754     if (coprocessorHost != null) {
755       status.setStatus("Running coprocessor pre-open hook");
756       coprocessorHost.preOpen();
757     }
758 
759     // Write HRI to a file in case we need to recover hbase:meta
760     status.setStatus("Writing region info on filesystem");
761     fs.checkRegionInfoOnFilesystem();
762 
763 
764 
765     // Initialize all the HStores
766     status.setStatus("Initializing all the Stores");
767     long maxSeqId = initializeRegionStores(reporter, status);
768 
769     this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this));
770     this.writestate.flushRequested = false;
771     this.writestate.compacting = 0;
772 
773     if (this.writestate.writesEnabled) {
774       // Remove temporary data left over from old regions
775       status.setStatus("Cleaning up temporary data from old regions");
776       fs.cleanupTempDir();
777     }
778 
779     if (this.writestate.writesEnabled) {
780       status.setStatus("Cleaning up detritus from prior splits");
781       // Get rid of any splits or merges that were lost in-progress.  Clean out
782       // these directories here on open.  We may be opening a region that was
783       // being split but we crashed in the middle of it all.
784       fs.cleanupAnySplitDetritus();
785       fs.cleanupMergesDir();
786     }
787 
788     // Initialize split policy
789     this.splitPolicy = RegionSplitPolicy.create(this, conf);
790 
791     // Initialize flush policy
792     this.flushPolicy = FlushPolicyFactory.create(this, conf);
793 
794     long lastFlushTime = EnvironmentEdgeManager.currentTime();
795     for (Store store: stores.values()) {
796       this.lastStoreFlushTimeMap.put(store, lastFlushTime);
797     }
798 
799     // Use maximum of log sequenceid or that which was found in stores
800     // (particularly if no recovered edits, seqid will be -1).
801     long nextSeqid = maxSeqId;
802 
803     // In distributedLogReplay mode, we don't know the last change sequence number because region
804     // is opened before recovery completes. So we add a safety bumper to avoid new sequence numbers
805     // overlapping already-used sequence numbers.
806     if (this.writestate.writesEnabled) {
807       nextSeqid = WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs
808           .getRegionDir(), nextSeqid, (this.isRecovering ? (this.flushPerChanges + 10000000) : 1));
809     } else {
810       nextSeqid++;
811     }
812 
813     LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() +
814       "; next sequenceid=" + nextSeqid);
815 
816     // A region can be reopened if it failed a split; reset flags
817     this.closing.set(false);
818     this.closed.set(false);
819 
820     if (coprocessorHost != null) {
821       status.setStatus("Running coprocessor post-open hooks");
822       coprocessorHost.postOpen();
823     }
824 
825     status.markComplete("Region opened successfully");
826     return nextSeqid;
827   }
828 
829   private long initializeRegionStores(final CancelableProgressable reporter, MonitoredTask status)
830       throws IOException, UnsupportedEncodingException {
831     // Load in all the HStores.
832 
833     long maxSeqId = -1;
834     // initialized to -1 so that we pick up MemstoreTS from column families
835     long maxMemstoreTS = -1;
836 
837     if (!htableDescriptor.getFamilies().isEmpty()) {
838       // initialize the thread pool for opening stores in parallel.
839       ThreadPoolExecutor storeOpenerThreadPool =
840         getStoreOpenAndCloseThreadPool("StoreOpener-" + this.getRegionInfo().getShortNameToLog());
841       CompletionService<HStore> completionService =
842         new ExecutorCompletionService<HStore>(storeOpenerThreadPool);
843 
844       // initialize each store in parallel
845       for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
846         status.setStatus("Instantiating store for column family " + family);
847         completionService.submit(new Callable<HStore>() {
848           @Override
849           public HStore call() throws IOException {
850             return instantiateHStore(family);
851           }
852         });
853       }
854       boolean allStoresOpened = false;
855       try {
856         for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
857           Future<HStore> future = completionService.take();
858           HStore store = future.get();
859           this.stores.put(store.getColumnFamilyName().getBytes(), store);
860 
861           long storeMaxSequenceId = store.getMaxSequenceId();
862           maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
863               storeMaxSequenceId);
864           if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) {
865             maxSeqId = storeMaxSequenceId;
866           }
867           long maxStoreMemstoreTS = store.getMaxMemstoreTS();
868           if (maxStoreMemstoreTS > maxMemstoreTS) {
869             maxMemstoreTS = maxStoreMemstoreTS;
870           }
871         }
872         allStoresOpened = true;
873       } catch (InterruptedException e) {
874         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
875       } catch (ExecutionException e) {
876         throw new IOException(e.getCause());
877       } finally {
878         storeOpenerThreadPool.shutdownNow();
879         if (!allStoresOpened) {
880           // something went wrong, close all opened stores
881           LOG.error("Could not initialize all stores for the region=" + this);
882           for (Store store : this.stores.values()) {
883             try {
884               store.close();
885             } catch (IOException e) {
886               LOG.warn(e.getMessage());
887             }
888           }
889         }
890       }
891     }
892     if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
893       // Recover any edits if available.
894       maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
895           this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
896     }
897     maxSeqId = Math.max(maxSeqId, maxMemstoreTS + 1);
898     mvcc.initialize(maxSeqId);
899     return maxSeqId;
900   }
901 
902   private void writeRegionOpenMarker(WAL wal, long openSeqId) throws IOException {
903     Map<byte[], List<Path>> storeFiles =
904         new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
905     for (Map.Entry<byte[], Store> entry : getStores().entrySet()) {
906       Store store = entry.getValue();
907       ArrayList<Path> storeFileNames = new ArrayList<Path>();
908       for (StoreFile storeFile: store.getStorefiles()) {
909         storeFileNames.add(storeFile.getPath());
910       }
911       storeFiles.put(entry.getKey(), storeFileNames);
912     }
913 
914     RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
915       RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
916       getRegionServerServices().getServerName(), storeFiles);
917     WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionOpenDesc,
918       getSequenceId());
919   }
920 
921   private void writeRegionCloseMarker(WAL wal) throws IOException {
922     Map<byte[], List<Path>> storeFiles =
923         new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
924     for (Map.Entry<byte[], Store> entry : getStores().entrySet()) {
925       Store store = entry.getValue();
926       ArrayList<Path> storeFileNames = new ArrayList<Path>();
927       for (StoreFile storeFile: store.getStorefiles()) {
928         storeFileNames.add(storeFile.getPath());
929       }
930       storeFiles.put(entry.getKey(), storeFileNames);
931     }
932 
933     RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor(
934       RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), getSequenceId().get(),
935       getRegionServerServices().getServerName(), storeFiles);
936     WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc,
937       getSequenceId());
938 
939     // Store SeqId in HDFS when a region closes
940     // We check that the region folder exists because many tests delete the table folder while a
941     // table is still online.
942     if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) {
943       WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(),
944         getSequenceId().get(), 0);
945     }
946   }
947 
948   /**
949    * @return True if this region has references.
950    */
951   public boolean hasReferences() {
952     for (Store store : this.stores.values()) {
953       if (store.hasReferences()) return true;
954     }
955     return false;
956   }
957 
958   /**
959    * This function will return the HDFS blocks distribution based on the data
960    * captured when each HFile was created.
961    * @return The HDFS blocks distribution for the region.
962    */
963   public HDFSBlocksDistribution getHDFSBlocksDistribution() {
964     HDFSBlocksDistribution hdfsBlocksDistribution =
965       new HDFSBlocksDistribution();
966     synchronized (this.stores) {
967       for (Store store : this.stores.values()) {
968         for (StoreFile sf : store.getStorefiles()) {
969           HDFSBlocksDistribution storeFileBlocksDistribution =
970             sf.getHDFSBlockDistribution();
971           hdfsBlocksDistribution.add(storeFileBlocksDistribution);
972         }
973       }
974     }
975     return hdfsBlocksDistribution;
976   }
977 
978   /**
979    * This is a helper function to compute HDFS block distribution on demand
980    * @param conf configuration
981    * @param tableDescriptor HTableDescriptor of the table
982    * @param regionInfo HRegionInfo of the region
983    * @return The HDFS blocks distribution for the given region.
984    * @throws IOException
985    */
986   public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
987       final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException {
988     Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
989     return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath);
990   }
991 
992   /**
993    * This is a helper function to compute HDFS block distribution on demand
994    * @param conf configuration
995    * @param tableDescriptor HTableDescriptor of the table
996    * @param regionInfo HRegionInfo of the region
997    * @param tablePath the table directory
998    * @return The HDFS blocks distribution for the given region.
999    * @throws IOException
1000    */
1001   public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
1002       final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo,  Path tablePath)
1003       throws IOException {
1004     HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
1005     FileSystem fs = tablePath.getFileSystem(conf);
1006 
1007     HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo);
1008     for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
1009       Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family.getNameAsString());
1010       if (storeFiles == null) continue;
1011 
1012       for (StoreFileInfo storeFileInfo : storeFiles) {
1013         hdfsBlocksDistribution.add(storeFileInfo.computeHDFSBlocksDistribution(fs));
1014       }
1015     }
1016     return hdfsBlocksDistribution;
1017   }
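
       // A hedged usage sketch (the host name is illustrative only): measuring
       // how local a region's blocks are to a given datanode:
       //
       //   HDFSBlocksDistribution dist =
       //       HRegion.computeHDFSBlocksDistribution(conf, htd, regionInfo);
       //   float locality = dist.getBlockLocalityIndex("dn1.example.com");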
1018 
1019   public AtomicLong getMemstoreSize() {
1020     return memstoreSize;
1021   }
1022 
1023   /**
1024    * Increase the size of the memstore in this region and the size of the global
1025    * memstore.
1026    * @return the size of memstore in this region
1027    */
1028   public long addAndGetGlobalMemstoreSize(long memStoreSize) {
1029     if (this.rsAccounting != null) {
1030       rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
1031     }
1032     return this.memstoreSize.addAndGet(memStoreSize);
1033   }
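
       // A hedged sketch of how this accounting is used (names illustrative):
       // mutation paths add the heap-size delta they just wrote, and flushes
       // subtract what they drained:
       //
       //   long newSize = region.addAndGetGlobalMemstoreSize(deltaHeapSize);
       //   long drained = region.addAndGetGlobalMemstoreSize(-flushedBytes);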
1034 
1035   /** @return an HRegionInfo object for this region */
1036   public HRegionInfo getRegionInfo() {
1037     return this.fs.getRegionInfo();
1038   }
1039 
1040   /**
1041    * @return Instance of {@link RegionServerServices} used by this HRegion.
1042    * Can be null.
1043    */
1044   RegionServerServices getRegionServerServices() {
1045     return this.rsServices;
1046   }
1047 
1048   /** @return readRequestsCount for this region */
1049   long getReadRequestsCount() {
1050     return this.readRequestsCount.get();
1051   }
1052 
1053   /** @return writeRequestsCount for this region */
1054   long getWriteRequestsCount() {
1055     return this.writeRequestsCount.get();
1056   }
1057 
1058   public MetricsRegion getMetrics() {
1059     return metricsRegion;
1060   }
1061 
1062   /** @return true if region is closed */
1063   public boolean isClosed() {
1064     return this.closed.get();
1065   }
1066 
1067   /**
1068    * @return True if closing process has started.
1069    */
1070   public boolean isClosing() {
1071     return this.closing.get();
1072   }
1073 
1074   /**
1075    * Set the recovering state of the current region.
1076    */
1077   public void setRecovering(boolean newState) {
1078     boolean wasRecovering = this.isRecovering;
1079     this.isRecovering = newState;
1080     if (wasRecovering && !isRecovering) {
1081       // Call only when wal replay is over.
1082       coprocessorHost.postLogReplay();
1083     }
1084   }
1085 
1086   /**
1087    * @return True if the current region is recovering
1088    */
1089   public boolean isRecovering() {
1090     return this.isRecovering;
1091   }
1092 
1093   /** @return true if region is available (not closed and not closing) */
1094   public boolean isAvailable() {
1095     return !isClosed() && !isClosing();
1096   }
1097 
1098   /** @return true if region is splittable */
1099   public boolean isSplittable() {
1100     return isAvailable() && !hasReferences();
1101   }
1102 
1103   /**
1104    * @return true if region is mergeable
1105    */
1106   public boolean isMergeable() {
1107     if (!isAvailable()) {
1108       LOG.debug("Region " + this.getRegionNameAsString()
1109           + " is not mergeable because it is closing or closed");
1110       return false;
1111     }
1112     if (hasReferences()) {
1113       LOG.debug("Region " + this.getRegionNameAsString()
1114           + " is not mergeable because it has references");
1115       return false;
1116     }
1117 
1118     return true;
1119   }
1120 
1121   public boolean areWritesEnabled() {
1122     synchronized(this.writestate) {
1123       return this.writestate.writesEnabled;
1124     }
1125   }
1126 
1127   public MultiVersionConsistencyControl getMVCC() {
1128     return mvcc;
1129   }
1130 
1131   /*
1132    * Returns the readpoint considering the given IsolationLevel.
1133    */
1134   public long getReadpoint(IsolationLevel isolationLevel) {
1135     if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
1136       // This scan can read even uncommitted transactions
1137       return Long.MAX_VALUE;
1138     }
1139     return mvcc.memstoreReadPoint();
1140   }
1141 
1142   public boolean isLoadingCfsOnDemandDefault() {
1143     return this.isLoadingCfsOnDemandDefault;
1144   }
1145 
1146   /**
1147    * Close down this HRegion.  Flush the cache, shut down each HStore, don't
1148    * service any more calls.
1149    *
1150    * <p>This method could take some time to execute, so don't call it from a
1151    * time-sensitive thread.
1152    *
1153    * @return Map of the storage files that the HRegion's component
1154    * HStores make use of, keyed by column family.  Returns an empty
1155    * map if already closed and null if judged that it should not close.
1156    *
1157    * @throws IOException e
1158    */
1159   public Map<byte[], List<StoreFile>> close() throws IOException {
1160     return close(false);
1161   }
1162 
1163   private final Object closeLock = new Object();
1164 
1165   /** Conf key for the periodic flush interval */
1166   public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL =
1167       "hbase.regionserver.optionalcacheflushinterval";
1168   /** Default interval for the memstore flush */
1169   public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;
1170 
1171   /** Conf key to force a flush if there are already enough changes for one region in memstore */
1172   public static final String MEMSTORE_FLUSH_PER_CHANGES =
1173       "hbase.regionserver.flush.per.changes";
1174   public static final long DEFAULT_FLUSH_PER_CHANGES = 30000000; // 30 million
1175   /**
1176    * The following MAX_FLUSH_PER_CHANGES is large enough because each KeyValue has 20+ bytes
1177    * overhead. Therefore, even 1G (one billion) empty KVs occupy at least 20GB of memstore for a single region.
1178    */
1179   public static final long MAX_FLUSH_PER_CHANGES = 1000000000; // 1G
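
       // A hedged configuration sketch (the values shown are just the defaults
       // above, not tuning advice):
       //
       //   conf.setInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL, 3600000);  // 1 hour
       //   conf.setLong(MEMSTORE_FLUSH_PER_CHANGES, 30000000L);     // 30 million changes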
1180 
1181   /**
1182    * Close down this HRegion.  Flush the cache unless the abort parameter is true,
1183    * shut down each HStore, and don't service any more calls.
1184    *
1185    * This method could take some time to execute, so don't call it from a
1186    * time-sensitive thread.
1187    *
1188    * @param abort true if server is aborting (only during testing)
1189    * @return Map of the storage files that the HRegion's component
1190    * HStores make use of, keyed by column family.  Can be null if
1191    * we are not to close at this time or we are already closed.
1192    *
1193    * @throws IOException e
1194    */
1195   public Map<byte[], List<StoreFile>> close(final boolean abort) throws IOException {
1196     // Only allow one thread to close at a time. Serialize closers so that
1197     // concurrent threads attempting to close queue up behind each other.
1198     MonitoredTask status = TaskMonitor.get().createStatus(
1199         "Closing region " + this +
1200         (abort ? " due to abort" : ""));
1201 
1202     status.setStatus("Waiting for close lock");
1203     try {
1204       synchronized (closeLock) {
1205         return doClose(abort, status);
1206       }
1207     } finally {
1208       status.cleanup();
1209     }
1210   }
1211 
1212   private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
1213       throws IOException {
1214     if (isClosed()) {
1215       LOG.warn("Region " + this + " already closed");
1216       return null;
1217     }
1218 
1219     if (coprocessorHost != null) {
1220       status.setStatus("Running coprocessor pre-close hooks");
1221       this.coprocessorHost.preClose(abort);
1222     }
1223 
1224     status.setStatus("Disabling compacts and flushes for region");
1225     synchronized (writestate) {
1226       // Disable compacting and flushing by background threads for this
1227       // region.
1228       writestate.writesEnabled = false;
1229       LOG.debug("Closing " + this + ": disabling compactions & flushes");
1230       waitForFlushesAndCompactions();
1231     }
1232     // If we were not just flushing, is it worth doing a preflush...one
1233     // that will clear out the bulk of the memstore before we put up
1234     // the close flag?
1235     if (!abort && worthPreFlushing()) {
1236       status.setStatus("Pre-flushing region before close");
1237       LOG.info("Running close preflush of " + this.getRegionNameAsString());
1238       try {
1239         internalFlushcache(status);
1240       } catch (IOException ioe) {
1241         // Failed to flush the region. Keep going.
1242         status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
1243       }
1244     }
1245 
1246     this.closing.set(true);
1247     status.setStatus("Disabling writes for close");
1248     // block waiting for the lock for closing
1249     lock.writeLock().lock();
1250     try {
1251       if (this.isClosed()) {
1252         status.abort("Already got closed by another process");
1253         // SplitTransaction handles the null
1254         return null;
1255       }
1256       LOG.debug("Updates disabled for region " + this);
1257       // Don't flush the cache if we are aborting
1258       if (!abort) {
1259         int flushCount = 0;
1260         while (this.getMemstoreSize().get() > 0) {
1261           try {
1262             if (flushCount++ > 0) {
1263               int actualFlushes = flushCount - 1;
1264               if (actualFlushes > 5) {
1265                 // If we tried 5 times and are unable to clear memory, abort
1266                 // so we do not lose data
1267                 throw new DroppedSnapshotException("Failed clearing memory after " +
1268                   actualFlushes + " attempts on region: " + Bytes.toStringBinary(getRegionName()));
1269               }
1270               LOG.info("Running extra flush, " + actualFlushes +
1271                 " (carrying snapshot?) " + this);
1272             }
1273             internalFlushcache(status);
1274           } catch (IOException ioe) {
1275             status.setStatus("Failed flush " + this + ", putting online again");
1276             synchronized (writestate) {
1277               writestate.writesEnabled = true;
1278             }
1279             // Have to throw to upper layers.  I can't abort server from here.
1280             throw ioe;
1281           }
1282         }
1283       }
1284 
1285       Map<byte[], List<StoreFile>> result =
1286         new TreeMap<byte[], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
1287       if (!stores.isEmpty()) {
1288         // initialize the thread pool for closing stores in parallel.
1289         ThreadPoolExecutor storeCloserThreadPool =
1290           getStoreOpenAndCloseThreadPool("StoreCloserThread-" + this.getRegionNameAsString());
1291         CompletionService<Pair<byte[], Collection<StoreFile>>> completionService =
1292           new ExecutorCompletionService<Pair<byte[], Collection<StoreFile>>>(storeCloserThreadPool);
1293 
1294         // close each store in parallel
1295         for (final Store store : stores.values()) {
1296           assert abort || store.getFlushableSize() == 0;
1297           completionService
1298               .submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
1299                 @Override
1300                 public Pair<byte[], Collection<StoreFile>> call() throws IOException {
1301                   return new Pair<byte[], Collection<StoreFile>>(
1302                     store.getFamily().getName(), store.close());
1303                 }
1304               });
1305         }
1306         try {
1307           for (int i = 0; i < stores.size(); i++) {
1308             Future<Pair<byte[], Collection<StoreFile>>> future = completionService.take();
1309             Pair<byte[], Collection<StoreFile>> storeFiles = future.get();
1310             List<StoreFile> familyFiles = result.get(storeFiles.getFirst());
1311             if (familyFiles == null) {
1312               familyFiles = new ArrayList<StoreFile>();
1313               result.put(storeFiles.getFirst(), familyFiles);
1314             }
1315             familyFiles.addAll(storeFiles.getSecond());
1316           }
1317         } catch (InterruptedException e) {
1318           throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1319         } catch (ExecutionException e) {
1320           throw new IOException(e.getCause());
1321         } finally {
1322           storeCloserThreadPool.shutdownNow();
1323         }
1324       }
1325 
1326       status.setStatus("Writing region close event to WAL");
1327       if (!abort && wal != null && getRegionServerServices() != null && !writestate.readOnly) {
1328         writeRegionCloseMarker(wal);
1329       }
1330 
1331       this.closed.set(true);
1332       if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get());
1333       if (coprocessorHost != null) {
1334         status.setStatus("Running coprocessor post-close hooks");
1335         this.coprocessorHost.postClose(abort);
1336       }
1337       if (this.metricsRegion != null) {
1338         this.metricsRegion.close();
1339       }
1340       if (this.metricsRegionWrapper != null) {
1341         Closeables.closeQuietly(this.metricsRegionWrapper);
1342       }
1343       status.markComplete("Closed");
1344       LOG.info("Closed " + this);
1345       return result;
1346     } finally {
1347       lock.writeLock().unlock();
1348     }
1349   }
1350 
1351   /**
1352    * Wait for all current flushes and compactions of the region to complete.
1353    * <p>
1354    * Exposed for TESTING.
1355    */
1356   public void waitForFlushesAndCompactions() {
1357     synchronized (writestate) {
1358       boolean interrupted = false;
1359       try {
1360         while (writestate.compacting > 0 || writestate.flushing) {
1361           LOG.debug("waiting for " + writestate.compacting + " compactions"
1362             + (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
1363           try {
1364             writestate.wait();
1365           } catch (InterruptedException iex) {
1366             // swallow here, but restore the interrupt status before returning
1367             LOG.warn("Interrupted while waiting");
1368             interrupted = true;
1369           }
1370         }
1371       } finally {
1372         if (interrupted) {
1373           Thread.currentThread().interrupt();
1374         }
1375       }
1376     }
1377   }
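
       // A test-style sketch, illustrative only (assertEquals and the expected
       // count are assumptions, not part of this class): before asserting on
       // store file counts, a test can make sure no background flush or
       // compaction is still in flight for the region:
       //   region.waitForFlushesAndCompactions();
       //   assertEquals(expected, region.getStore(family).getStorefilesCount());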
1378 
1379   protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
1380       final String threadNamePrefix) {
1381     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1382     int maxThreads = Math.min(numStores,
1383         conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1384             HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
1385     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1386   }
1387 
1388   protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
1389       final String threadNamePrefix) {
1390     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1391     int maxThreads = Math.max(1,
1392         conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1393             HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
1394             / numStores);
1395     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1396   }
1397 
1398   static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
1399       final String threadNamePrefix) {
1400     return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
1401       new ThreadFactory() {
1402         private int count = 1;
1403 
1404         @Override
1405         public Thread newThread(Runnable r) {
1406           return new Thread(r, threadNamePrefix + "-" + count++);
1407         }
1408       });
1409   }
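
       // A worked illustration of the sizing above (the numbers are assumptions):
       // with hbase.hstore.open.and.close.threads.max set to 8 and a table of 4
       // column families, the store pool gets min(4, 8) = 4 threads, while the
       // per-store file pool gets max(1, 8 / 4) = 2 threads for each store, so
       // the two levels together stay within the configured thread budget.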
1410 
1411   /**
1412    * @return True if it's worth doing a flush before we put up the close flag.
1413    */
1414   private boolean worthPreFlushing() {
1415     return this.memstoreSize.get() >
1416       this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
1417   }
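
       // A configuration sketch (the 16MB value is an arbitrary example): the
       // 5MB default above can be raised so a pre-close flush only runs when a
       // larger memstore would otherwise be flushed under the close flag:
       //   conf.setLong("hbase.hregion.preclose.flush.size", 16L * 1024 * 1024);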
1418 
1419   //////////////////////////////////////////////////////////////////////////////
1420   // HRegion accessors
1421   //////////////////////////////////////////////////////////////////////////////
1422 
1423   /** @return start key for region */
1424   public byte [] getStartKey() {
1425     return this.getRegionInfo().getStartKey();
1426   }
1427 
1428   /** @return end key for region */
1429   public byte [] getEndKey() {
1430     return this.getRegionInfo().getEndKey();
1431   }
1432 
1433   /** @return region id */
1434   public long getRegionId() {
1435     return this.getRegionInfo().getRegionId();
1436   }
1437 
1438   /** @return region name */
1439   public byte [] getRegionName() {
1440     return this.getRegionInfo().getRegionName();
1441   }
1442 
1443   /** @return region name as string for logging */
1444   public String getRegionNameAsString() {
1445     return this.getRegionInfo().getRegionNameAsString();
1446   }
1447 
1448   /** @return HTableDescriptor for this region */
1449   public HTableDescriptor getTableDesc() {
1450     return this.htableDescriptor;
1451   }
1452 
1453   /** @return WAL in use for this region */
1454   public WAL getWAL() {
1455     return this.wal;
1456   }
1457 
1458   /**
1459    * @return split policy for this region.
1460    */
1461   public RegionSplitPolicy getSplitPolicy() {
1462     return this.splitPolicy;
1463   }
1464 
1465   /**
1466    * A split takes the config from the parent region and passes it to the
1467    * daughter region's constructor. If 'conf' were passed instead, the daughter
1468    * would end up using the parent region's HTD in addition to its own new HTD.
1469    * Pass 'baseConf' to the daughter regions to avoid this tricky dedupe problem.
1470    * @return Configuration object
1471    */
1472   Configuration getBaseConf() {
1473     return this.baseConf;
1474   }
1475 
1476   /** @return {@link FileSystem} being used by this region */
1477   public FileSystem getFilesystem() {
1478     return fs.getFileSystem();
1479   }
1480 
1481   /** @return the {@link HRegionFileSystem} used by this region */
1482   public HRegionFileSystem getRegionFileSystem() {
1483     return this.fs;
1484   }
1485 
1486   /**
1487    * @return The earliest time a store in the region was flushed. All
1488    *         other stores in the region would have been flushed either at, or
1489    *         after this time.
1490    */
1491   @VisibleForTesting
1492   public long getEarliestFlushTimeForAllStores() {
1493     return Collections.min(lastStoreFlushTimeMap.values());
1494   }
1495 
1496   //////////////////////////////////////////////////////////////////////////////
1497   // HRegion maintenance.
1498   //
1499   // These methods are meant to be called periodically by the HRegionServer for
1500   // upkeep.
1501   //////////////////////////////////////////////////////////////////////////////
1502 
1503   /** @return the size of the largest HStore. */
1504   public long getLargestHStoreSize() {
1505     long size = 0;
1506     for (Store h : stores.values()) {
1507       long storeSize = h.getSize();
1508       if (storeSize > size) {
1509         size = storeSize;
1510       }
1511     }
1512     return size;
1513   }
1514 
1515   /**
1516    * @return KeyValue Comparator
1517    */
1518   public KeyValue.KVComparator getComparator() {
1519     return this.comparator;
1520   }
1521 
1522   /*
1523    * Do preparation for pending compaction.
1524    * @throws IOException
1525    */
1526   protected void doRegionCompactionPrep() throws IOException {
1527   }
1528 
1529   void triggerMajorCompaction() {
1530     for (Store h : stores.values()) {
1531       h.triggerMajorCompaction();
1532     }
1533   }
1534 
1535   /**
1536    * Helper that compacts all the stores synchronously.
1537    * It is used by utilities and tests.
1538    *
1539    * @param majorCompaction True to force a major compaction regardless of thresholds
1540    * @throws IOException e
1541    */
1542   public void compactStores(final boolean majorCompaction)
1543   throws IOException {
1544     if (majorCompaction) {
1545       this.triggerMajorCompaction();
1546     }
1547     compactStores();
1548   }
1549 
1550   /**
1551    * Helper that compacts all the stores synchronously.
1552    * It is used by utilities and tests.
1553    *
1554    * @throws IOException e
1555    */
1556   public void compactStores() throws IOException {
1557     for (Store s : getStores().values()) {
1558       CompactionContext compaction = s.requestCompaction();
1559       if (compaction != null) {
1560         compact(compaction, s);
1561       }
1562     }
1563   }
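
       /**
        * Usage sketch, illustrative only (not part of the original class): how a
        * utility or test might persist the memstore and then force a synchronous
        * major compaction using the helpers above.
        */
       private static void exampleFlushThenCompact(HRegion region) throws IOException {
         region.flushcache();        // write current memstore contents to hfiles
         region.compactStores(true); // then force a major compaction of every store
       }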
1564 
1565   /*
1566    * Called by compaction thread and after region is opened to compact the
1567    * HStores if necessary.
1568    *
1569    * <p>This operation could block for a long time, so don't call it from a
1570    * time-sensitive thread.
1571    *
1572    * Note that no locking is necessary at this level because compaction only
1573    * conflicts with a region split, and that cannot happen because the region
1574    * server does them sequentially and not in parallel.
1575    *
1576    * @param compaction Compaction details, obtained by requestCompaction()
1577    * @return whether the compaction completed
1578    * @throws IOException e
1579    */
1580   public boolean compact(CompactionContext compaction, Store store) throws IOException {
1581     assert compaction != null && compaction.hasSelection();
1582     assert !compaction.getRequest().getFiles().isEmpty();
1583     if (this.closing.get() || this.closed.get()) {
1584       LOG.debug("Skipping compaction on " + this + " because closing/closed");
1585       store.cancelRequestedCompaction(compaction);
1586       return false;
1587     }
1588     MonitoredTask status = null;
1589     boolean requestNeedsCancellation = true;
1590     // block waiting for the lock for compaction
1591     lock.readLock().lock();
1592     try {
1593       byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
1594       if (stores.get(cf) != store) {
1595         LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this
1596             + " has been re-instantiated, cancel this compaction request. "
1597             + " It may be caused by the roll back of split transaction");
1598         return false;
1599       }
1600 
1601       status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
1602       if (this.closed.get()) {
1603         String msg = "Skipping compaction on " + this + " because closed";
1604         LOG.debug(msg);
1605         status.abort(msg);
1606         return false;
1607       }
1608       boolean wasStateSet = false;
1609       try {
1610         synchronized (writestate) {
1611           if (writestate.writesEnabled) {
1612             wasStateSet = true;
1613             ++writestate.compacting;
1614           } else {
1615             String msg = "NOT compacting region " + this + ". Writes disabled.";
1616             LOG.info(msg);
1617             status.abort(msg);
1618             return false;
1619           }
1620         }
1621         LOG.info("Starting compaction on " + store + " in region " + this
1622             + (compaction.getRequest().isOffPeak()?" as an off-peak compaction":""));
1623         doRegionCompactionPrep();
1624         try {
1625           status.setStatus("Compacting store " + store);
1626           // We no longer need to cancel the request on the way out of this
1627           // method because Store#compact will clean up unconditionally
1628           requestNeedsCancellation = false;
1629           store.compact(compaction);
1630         } catch (InterruptedIOException iioe) {
1631           String msg = "compaction interrupted";
1632           LOG.info(msg, iioe);
1633           status.abort(msg);
1634           return false;
1635         }
1636       } finally {
1637         if (wasStateSet) {
1638           synchronized (writestate) {
1639             --writestate.compacting;
1640             if (writestate.compacting <= 0) {
1641               writestate.notifyAll();
1642             }
1643           }
1644         }
1645       }
1646       status.markComplete("Compaction complete");
1647       return true;
1648     } finally {
1649       try {
1650         if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction);
1651         if (status != null) status.cleanup();
1652       } finally {
1653         lock.readLock().unlock();
1654       }
1655     }
1656   }
1657 
1658   /**
1659    * Flush all stores.
1660    * <p>
1661    * See {@link #flushcache(boolean)}.
1662    *
1663    * @return whether the flush succeeded and whether the region needs compacting
1664    * @throws IOException
1665    */
1666   public FlushResult flushcache() throws IOException {
1667     return flushcache(true);
1668   }
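
       /**
        * Illustrative sketch only: a caller can inspect the returned FlushResult
        * (assuming its isCompactionNeeded() accessor, which mirrors the Result
        * values produced at the end of internalFlushcache) before compacting.
        */
       private static void exampleFlushAndMaybeCompact(HRegion region) throws IOException {
         FlushResult result = region.flushcache();
         if (result.isCompactionNeeded()) {
           // let each store decide whether it actually has compaction work
           region.compactStores();
         }
       }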
1669 
1670   /**
1671    * Flush the cache.
1672    *
1673    * When this method is called the cache will be flushed unless:
1674    * <ol>
1675    *   <li>the cache is empty</li>
1676    *   <li>the region is closed</li>
1677    *   <li>a flush is already in progress</li>
1678    *   <li>writes are disabled</li>
1679    * </ol>
1680    *
1681    * <p>This method may block for some time, so it should not be called from a
1682    * time-sensitive thread.
1683    * @param forceFlushAllStores whether we want to flush all stores
1684    * @return whether the flush succeeded and whether the region needs compacting
1685    *
1686    * @throws IOException general io exceptions
1687    * @throws DroppedSnapshotException Thrown when replay of wal is required
1688    * because a Snapshot was not properly persisted.
1689    */
1690   public FlushResult flushcache(boolean forceFlushAllStores) throws IOException {
1691     // fail-fast instead of waiting on the lock
1692     if (this.closing.get()) {
1693       String msg = "Skipping flush on " + this + " because closing";
1694       LOG.debug(msg);
1695       return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1696     }
1697     MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
1698     status.setStatus("Acquiring readlock on region");
1699     // block waiting for the lock for flushing cache
1700     lock.readLock().lock();
1701     try {
1702       if (this.closed.get()) {
1703         String msg = "Skipping flush on " + this + " because closed";
1704         LOG.debug(msg);
1705         status.abort(msg);
1706         return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1707       }
1708       if (coprocessorHost != null) {
1709         status.setStatus("Running coprocessor pre-flush hooks");
1710         coprocessorHost.preFlush();
1711       }
1712       if (numMutationsWithoutWAL.get() > 0) {
1713         numMutationsWithoutWAL.set(0);
1714         dataInMemoryWithoutWAL.set(0);
1715       }
1716       synchronized (writestate) {
1717         if (!writestate.flushing && writestate.writesEnabled) {
1718           this.writestate.flushing = true;
1719         } else {
1720           if (LOG.isDebugEnabled()) {
1721             LOG.debug("NOT flushing memstore for region " + this
1722                 + ", flushing=" + writestate.flushing + ", writesEnabled="
1723                 + writestate.writesEnabled);
1724           }
1725           String msg = "Not flushing since "
1726               + (writestate.flushing ? "already flushing"
1727               : "writes not enabled");
1728           status.abort(msg);
1729           return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1730         }
1731       }
1732 
1733       try {
1734         Collection<Store> specificStoresToFlush =
1735             forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush();
1736         FlushResult fs = internalFlushcache(specificStoresToFlush, status);
1737 
1738         if (coprocessorHost != null) {
1739           status.setStatus("Running post-flush coprocessor hooks");
1740           coprocessorHost.postFlush();
1741         }
1742 
1743         status.markComplete("Flush successful");
1744         return fs;
1745       } finally {
1746         synchronized (writestate) {
1747           writestate.flushing = false;
1748           this.writestate.flushRequested = false;
1749           writestate.notifyAll();
1750         }
1751       }
1752     } finally {
1753       lock.readLock().unlock();
1754       status.cleanup();
1755     }
1756   }
1757 
1758   /**
1759    * Should the store be flushed because it is old enough.
1760    * <p>
1761    * Every FlushPolicy should call this to determine whether a store is old enough to flush
1762    * (unless it always flushes all stores). Otherwise the {@link #shouldFlush()} method would
1763    * always return true, generating a lot of flush requests.
1764    */
1765   boolean shouldFlushStore(Store store) {
1766     long maxFlushedSeqId =
1767         this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), store
1768             .getFamily().getName()) - 1;
1769     if (maxFlushedSeqId > 0 && maxFlushedSeqId + flushPerChanges < sequenceId.get()) {
1770       if (LOG.isDebugEnabled()) {
1771         LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this
1772             + " will be flushed because its max flushed seqId(" + maxFlushedSeqId
1773             + ") is far away from current(" + sequenceId.get() + "), max allowed is "
1774             + flushPerChanges);
1775       }
1776       return true;
1777     }
1778     if (flushCheckInterval <= 0) {
1779       return false;
1780     }
1781     long now = EnvironmentEdgeManager.currentTime();
1782     if (store.timeOfOldestEdit() < now - flushCheckInterval) {
1783       if (LOG.isDebugEnabled()) {
1784         LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this
1785             + " will be flushed because time of its oldest edit (" + store.timeOfOldestEdit()
1786             + ") is far away from now(" + now + "), max allowed is " + flushCheckInterval);
1787       }
1788       return true;
1789     }
1790     return false;
1791   }
1792 
1793   /**
1794    * Should the memstore be flushed now
1795    */
1796   boolean shouldFlush() {
1797     // This is a rough measure.
1798     if (this.maxFlushedSeqId > 0
1799           && (this.maxFlushedSeqId + this.flushPerChanges < this.sequenceId.get())) {
1800       return true;
1801     }
1802     if (flushCheckInterval <= 0) { //disabled
1803       return false;
1804     }
1805     long now = EnvironmentEdgeManager.currentTime();
1806     // if we flushed in the recent past, we don't need to flush again now
1807     if ((now - getEarliestFlushTimeForAllStores() < flushCheckInterval)) {
1808       return false;
1809     }
1810     //since we didn't flush in the recent past, flush now if certain conditions
1811     //are met. Return true on first such memstore hit.
1812     for (Store s : this.getStores().values()) {
1813       if (s.timeOfOldestEdit() < now - flushCheckInterval) {
1814         // we have an old enough edit in the memstore, flush
1815         return true;
1816       }
1817     }
1818     return false;
1819   }
1820 
1821   /**
1822    * Flushing all stores.
1823    *
1824    * @see #internalFlushcache(Collection, MonitoredTask)
1825    */
1826   private FlushResult internalFlushcache(MonitoredTask status)
1827       throws IOException {
1828     return internalFlushcache(stores.values(), status);
1829   }
1830 
1831   /**
1832    * Flushing given stores.
1833    *
1834    * @see #internalFlushcache(WAL, long, Collection, MonitoredTask)
1835    */
1836   private FlushResult internalFlushcache(final Collection<Store> storesToFlush,
1837       MonitoredTask status) throws IOException {
1838     return internalFlushcache(this.wal, HConstants.NO_SEQNUM, storesToFlush,
1839         status);
1840   }
1841 
1842   /**
1843    * Flush the memstore. Flushing the memstore is a little tricky. We have a lot
1844    * of updates in the memstore, all of which have also been written to the wal.
1845    * We need to write those updates in the memstore out to disk, while being
1846    * able to process reads/writes as much as possible during the flush
1847    * operation.
1848    * <p>
1849    * This method may block for some time. Every time it is called, we bump the
1850    * region's sequence id even if we don't flush; i.e. the returned sequence id
1851    * will be at least one larger than the last edit applied to this region. The
1852    * returned id does not refer to an actual edit. The returned id can be used
1853    * for, say, installing a bulk loaded file just ahead of the last hfile that
1854    * was the result of this flush, etc.
1855    *
1856    * @param wal
1857    *          Null if we're NOT to go via wal.
1858    * @param myseqid
1859    *          The seqid to use when writing out the flush file if
1860    *          <code>wal</code> is null.
1861    * @param storesToFlush
1862    *          The list of stores to flush.
1863    * @return object describing the flush's state
1864    * @throws IOException
1865    *           general io exceptions
1866    * @throws DroppedSnapshotException
1867    *           Thrown when replay of wal is required because a Snapshot was not
1868    *           properly persisted.
1869    */
1870   protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
1871       final Collection<Store> storesToFlush, MonitoredTask status) throws IOException {
1872     if (this.rsServices != null && this.rsServices.isAborted()) {
1873       // Don't flush when server aborting, it's unsafe
1874       throw new IOException("Aborting flush because server is aborted...");
1875     }
1876     final long startTime = EnvironmentEdgeManager.currentTime();
1877     // If nothing to flush, return, but we need to safely update the region sequence id
1878     if (this.memstoreSize.get() <= 0) {
1879       // Take an update lock because we are about to change the sequence id and we want the sequence id
1880       // to be at the border of the empty memstore.
1881       MultiVersionConsistencyControl.WriteEntry w = null;
1882       this.updatesLock.writeLock().lock();
1883       try {
1884         if (this.memstoreSize.get() <= 0) {
1885           // Presume that if there are still no edits in the memstore, then there are no edits for
1886           // this region out in the WAL subsystem so no need to do any trickery clearing out
1887           // edits in the WAL system. Up the sequence number so the resulting flush id is for
1888           // sure just beyond the last appended region edit (useful as a marker when bulk loading,
1889           // etc.)
1890           // wal can be null when replaying edits.
1891           if (wal != null) {
1892             w = mvcc.beginMemstoreInsert();
1893             long flushSeqId = getNextSequenceId(wal);
1894             FlushResult flushResult = new FlushResult(
1895               FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushSeqId, "Nothing to flush");
1896             w.setWriteNumber(flushSeqId);
1897             mvcc.waitForPreviousTransactionsComplete(w);
1898             w = null;
1899             return flushResult;
1900           } else {
1901             return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
1902               "Nothing to flush");
1903           }
1904         }
1905       } finally {
1906         this.updatesLock.writeLock().unlock();
1907         if (w != null) {
1908           mvcc.advanceMemstore(w);
1909         }
1910       }
1911     }
1912 
1913     if (LOG.isInfoEnabled()) {
1914       LOG.info("Started memstore flush for " + this + ", current region memstore size "
1915           + StringUtils.byteDesc(this.memstoreSize.get()) + ", and " + storesToFlush.size() + "/"
1916           + stores.size() + " column families' memstores are being flushed."
1917           + ((wal != null) ? "" : "; wal is null, using passed sequenceid=" + myseqid));
1918       // only log when we are not flushing all stores.
1919       if (this.stores.size() > storesToFlush.size()) {
1920         for (Store store: storesToFlush) {
1921           LOG.info("Flushing Column Family: " + store.getColumnFamilyName()
1922               + " which was occupying "
1923               + StringUtils.byteDesc(store.getMemStoreSize()) + " of memstore.");
1924         }
1925       }
1926     }
1927     // Stop updates while we snapshot the memstore of all of this region's stores. We only have
1928     // to do this for a moment.  It is quick. We also set the memstore size to zero here before we
1929     // allow updates again so its value will represent the size of the updates received
1930     // during the flush.
1931     MultiVersionConsistencyControl.WriteEntry w = null;
1932     // We have to take an update lock during the snapshot, or else a write could end up in both the
1933     // snapshot and the memstore (which would make atomic rows difficult)
1934     status.setStatus("Obtaining lock to block concurrent updates");
1935     // block waiting for the lock for internal flush
1936     this.updatesLock.writeLock().lock();
1937     status.setStatus("Preparing to flush by snapshotting stores in " +
1938       getRegionInfo().getEncodedName());
1939     long totalFlushableSizeOfFlushableStores = 0;
1940 
1941     Set<byte[]> flushedFamilyNames = new HashSet<byte[]>();
1942     for (Store store: storesToFlush) {
1943       flushedFamilyNames.add(store.getFamily().getName());
1944     }
1945 
1946     List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
1947     TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
1948         Bytes.BYTES_COMPARATOR);
1949     // The sequence id of this flush operation which is used to log FlushMarker and pass to
1950     // createFlushContext to use as the store file's sequence id.
1951     long flushOpSeqId = HConstants.NO_SEQNUM;
1952     // The max flushed sequence id after this flush operation. Used as completeSequenceId which is
1953     // passed to HMaster.
1954     long flushedSeqId = HConstants.NO_SEQNUM;
1955     byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes();
1956 
1957     long trxId = 0;
1958     try {
1959       try {
1960         w = mvcc.beginMemstoreInsert();
1961         if (wal != null) {
1962           if (!wal.startCacheFlush(encodedRegionName, flushedFamilyNames)) {
1963             // This should never happen.
1964             String msg = "Flush will not be started for ["
1965                 + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
1966             status.setStatus(msg);
1967             return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1968           }
1969           flushOpSeqId = getNextSequenceId(wal);
1970           long oldestUnflushedSeqId = wal.getEarliestMemstoreSeqNum(encodedRegionName);
1971           // No oldestUnflushedSeqId means we flushed all stores,
1972           // or the unflushed stores are all empty.
1973           flushedSeqId =
1974               oldestUnflushedSeqId == HConstants.NO_SEQNUM ? flushOpSeqId : oldestUnflushedSeqId - 1;
1975         } else {
1976           // use the provided sequence Id as WAL is not being used for this flush.
1977           flushedSeqId = flushOpSeqId = myseqid;
1978         }
1979 
1980         for (Store s : storesToFlush) {
1981           totalFlushableSizeOfFlushableStores += s.getFlushableSize();
1982           storeFlushCtxs.add(s.createFlushContext(flushOpSeqId));
1983           committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL
1984         }
1985 
1986         // write the snapshot start to WAL
1987         if (wal != null) {
1988           FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
1989             getRegionInfo(), flushOpSeqId, committedFiles);
1990           // no sync. Sync is below where we do not hold the updates lock
1991           trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
1992             desc, sequenceId, false);
1993         }
1994 
1995         // Prepare flush (take a snapshot)
1996         for (StoreFlushContext flush : storeFlushCtxs) {
1997           flush.prepare();
1998         }
1999       } catch (IOException ex) {
2000         if (wal != null) {
2001           if (trxId > 0) { // check whether we have already written START_FLUSH to WAL
2002             try {
2003               FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
2004                 getRegionInfo(), flushOpSeqId, committedFiles);
2005               WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2006                 desc, sequenceId, false);
2007             } catch (Throwable t) {
2008               LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
2009                   StringUtils.stringifyException(t));
2010               // ignore this since we will be aborting the RS with DSE.
2011             }
2012           }
2013           // we have called wal.startCacheFlush(), now we have to abort it
2014           wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
2015           throw ex; // let upper layers deal with it.
2016         }
2017       } finally {
2018         this.updatesLock.writeLock().unlock();
2019       }
2020       String s = "Finished memstore snapshotting " + this +
2021         ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSizeOfFlushableStores;
2022       status.setStatus(s);
2023       if (LOG.isTraceEnabled()) LOG.trace(s);
2024       // sync unflushed WAL changes
2025       // see HBASE-8208 for details
2026       if (wal != null) {
2027         try {
2028           wal.sync(); // ensure that flush marker is sync'ed
2029         } catch (IOException ioe) {
2030           LOG.warn("Unexpected exception while wal.sync(), ignoring. Exception: "
2031               + StringUtils.stringifyException(ioe));
2032         }
2033       }
2034 
2035       // wait for all in-progress transactions to commit to WAL before
2036       // we can start the flush. This prevents
2037       // uncommitted transactions from being written into HFiles.
2038       // We have to block before we start the flush, otherwise keys that
2039       // were removed via a rollbackMemstore could be written to HFiles.
2040       w.setWriteNumber(flushOpSeqId);
2041       mvcc.waitForPreviousTransactionsComplete(w);
2042       // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block
2043       w = null;
2044       s = "Flushing stores of " + this;
2045       status.setStatus(s);
2046       if (LOG.isTraceEnabled()) LOG.trace(s);
2047     } finally {
2048       if (w != null) {
2049         // in case of failure just mark current w as complete
2050         mvcc.advanceMemstore(w);
2051       }
2052     }
2053 
2054     // Any failure from here on out will be catastrophic requiring server
2055     // restart so wal content can be replayed and put back into the memstore.
2056     // Otherwise, the snapshot content, while backed up in the wal, will not
2057     // be part of the running server's state.
2058     boolean compactionRequested = false;
2059     try {
2060       // A.  Flush memstore to all the HStores.
2061       // Keep running vector of all store files that includes both old and the
2062       // just-made new flush store file. The new flushed file is still in the
2063       // tmp directory.
2064 
2065       for (StoreFlushContext flush : storeFlushCtxs) {
2066         flush.flushCache(status);
2067       }
2068 
2069       // Switch snapshot (in memstore) -> new hfile (thus causing
2070       // all the store scanners to reset/reseek).
2071       Iterator<Store> it = storesToFlush.iterator();
2072       // storesToFlush and storeFlushCtxs have the same order
2073       for (StoreFlushContext flush : storeFlushCtxs) {
2074         boolean needsCompaction = flush.commit(status);
2075         if (needsCompaction) {
2076           compactionRequested = true;
2077         }
2078         committedFiles.put(it.next().getFamily().getName(), flush.getCommittedFiles());
2079       }
2080       storeFlushCtxs.clear();
2081 
2082       // Set down the memstore size by amount of flush.
2083       this.addAndGetGlobalMemstoreSize(-totalFlushableSizeOfFlushableStores);
2084 
2085       if (wal != null) {
2086         // write flush marker to WAL. If fail, we should throw DroppedSnapshotException
2087         FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
2088           getRegionInfo(), flushOpSeqId, committedFiles);
2089         WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2090           desc, sequenceId, true);
2091       }
2092     } catch (Throwable t) {
2093       // An exception here means that the snapshot was not persisted.
2094       // The wal needs to be replayed so its content is restored to memstore.
2095       // Currently, only a server restart will do this.
2096       // We used to only catch IOEs but it's possible that we'd get other
2097       // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
2098       // all and sundry.
2099       if (wal != null) {
2100         try {
2101           FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
2102             getRegionInfo(), flushOpSeqId, committedFiles);
2103           WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2104             desc, sequenceId, false);
2105         } catch (Throwable ex) {
2106           LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
2107               StringUtils.stringifyException(ex));
2108           // ignore this since we will be aborting the RS with DSE.
2109         }
2110         wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
2111       }
2112       DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
2113           Bytes.toStringBinary(getRegionName()));
2114       dse.initCause(t);
2115       status.abort("Flush failed: " + StringUtils.stringifyException(t));
2116       throw dse;
2117     }
2118 
2119     // If we get to here, the HStores have been written.
2120     if (wal != null) {
2121       wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
2122     }
2123 
2124     // Record latest flush time
2125     for (Store store: storesToFlush) {
2126       this.lastStoreFlushTimeMap.put(store, startTime);
2127     }
2128 
2129     // Update the oldest unflushed sequence id for region.
2130     this.maxFlushedSeqId = flushedSeqId;
2131 
2132     // C. Finally notify anyone waiting on memstore to clear:
2133     // e.g. checkResources().
2134     synchronized (this) {
2135       notifyAll(); // FindBugs NN_NAKED_NOTIFY
2136     }
2137 
2138     long time = EnvironmentEdgeManager.currentTime() - startTime;
2139     long memstoresize = this.memstoreSize.get();
2140     String msg = "Finished memstore flush of ~"
2141         + StringUtils.byteDesc(totalFlushableSizeOfFlushableStores) + "/"
2142         + totalFlushableSizeOfFlushableStores + ", currentsize="
2143         + StringUtils.byteDesc(memstoresize) + "/" + memstoresize
2144         + " for region " + this + " in " + time + "ms, sequenceid="
2145         + flushOpSeqId +  ", compaction requested=" + compactionRequested
2146         + ((wal == null) ? "; wal=null" : "");
2147     LOG.info(msg);
2148     status.setStatus(msg);
2149 
2150     return new FlushResult(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
2151         FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushOpSeqId);
2152   }
2153 
2154   /**
2155    * Method to safely get the next sequence number.
2156    * @return Next sequence number unassociated with any actual edit.
2157    * @throws IOException
2158    */
2159   private long getNextSequenceId(final WAL wal) throws IOException {
2160     WALKey key = this.appendEmptyEdit(wal, null);
2161     return key.getSequenceId();
2162   }
2163 
2164   //////////////////////////////////////////////////////////////////////////////
2165   // get() methods for client use.
2166   //////////////////////////////////////////////////////////////////////////////
2167   /**
2168    * Return all the data for the row that matches <i>row</i> exactly,
2169    * or the one that immediately precedes it, at or immediately before
2170    * <i>ts</i>.
2171    *
2172    * @param row row key
2173    * @return map of values
2174    * @throws IOException
2175    */
2176   Result getClosestRowBefore(final byte [] row)
2177   throws IOException{
2178     return getClosestRowBefore(row, HConstants.CATALOG_FAMILY);
2179   }
2180 
2181   /**
2182    * Return all the data for the row that matches <i>row</i> exactly,
2183    * or the one that immediately precedes it, at or immediately before
2184    * <i>ts</i>.
2185    *
2186    * @param row row key
2187    * @param family column family to find on
2188    * @return map of values
2189    * @throws IOException read exceptions
2190    */
2191   public Result getClosestRowBefore(final byte [] row, final byte [] family)
2192   throws IOException {
2193     if (coprocessorHost != null) {
2194       Result result = new Result();
2195       if (coprocessorHost.preGetClosestRowBefore(row, family, result)) {
2196         return result;
2197       }
2198     }
2199     // look across all the HStores for this region and determine what the
2200     // closest key is across all column families, since the data may be sparse
2201     checkRow(row, "getClosestRowBefore");
2202     startRegionOperation(Operation.GET);
2203     this.readRequestsCount.increment();
2204     try {
2205       Store store = getStore(family);
2206       // get the closest key. (HStore.getRowKeyAtOrBefore can return null)
2207       Cell key = store.getRowKeyAtOrBefore(row);
2208       Result result = null;
2209       if (key != null) {
2210         Get get = new Get(CellUtil.cloneRow(key));
2211         get.addFamily(family);
2212         result = get(get);
2213       }
2214       if (coprocessorHost != null) {
2215         coprocessorHost.postGetClosestRowBefore(row, family, result);
2216       }
2217       return result;
2218     } finally {
2219       closeRegionOperation(Operation.GET);
2220     }
2221   }
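
       /**
        * Usage sketch, illustrative only: locate the row at or immediately before
        * a key within one family, the way catalog lookups find the region that
        * contains a given row. A null or empty Result means no row sorts at or
        * before the requested key.
        */
       private static Result exampleClosestRowBefore(HRegion region, byte[] row,
           byte[] family) throws IOException {
         Result closest = region.getClosestRowBefore(row, family);
         return (closest == null || closest.isEmpty()) ? null : closest;
       }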
2222 
2223   /**
2224    * Return a scanner that iterates over the HRegion, returning the indicated
2225    * columns and rows specified by the {@link Scan}.
2226    * <p>
2227    * The returned scanner must be closed by the caller.
2228    *
2229    * @param scan configured {@link Scan}
2230    * @return RegionScanner
2231    * @throws IOException read exceptions
2232    */
2233   public RegionScanner getScanner(Scan scan) throws IOException {
2234     return getScanner(scan, null);
2235   }
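
       /**
        * Usage sketch, illustrative only: the close-in-finally idiom required by
        * getScanner(Scan), since the returned scanner must be closed by the caller.
        */
       private static void exampleScan(HRegion region, Scan scan) throws IOException {
         RegionScanner scanner = region.getScanner(scan);
         try {
           List<Cell> cells = new ArrayList<Cell>();
           boolean moreRows;
           do {
             moreRows = scanner.next(cells);
             // process one row's cells here ...
             cells.clear();
           } while (moreRows);
         } finally {
           scanner.close();
         }
       }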
2236 
2237   void prepareScanner(Scan scan) throws IOException {
2238     if(!scan.hasFamilies()) {
2239       // Adding all families to scanner
2240       for(byte[] family: this.htableDescriptor.getFamiliesKeys()){
2241         scan.addFamily(family);
2242       }
2243     }
2244   }
2245 
2246   protected RegionScanner getScanner(Scan scan,
2247       List<KeyValueScanner> additionalScanners) throws IOException {
2248     startRegionOperation(Operation.SCAN);
2249     try {
2250       // Verify families are all valid
2251       prepareScanner(scan);
2252       if(scan.hasFamilies()) {
2253         for(byte [] family : scan.getFamilyMap().keySet()) {
2254           checkFamily(family);
2255         }
2256       }
2257       return instantiateRegionScanner(scan, additionalScanners);
2258     } finally {
2259       closeRegionOperation(Operation.SCAN);
2260     }
2261   }
2262 
2263   protected RegionScanner instantiateRegionScanner(Scan scan,
2264       List<KeyValueScanner> additionalScanners) throws IOException {
2265     if (scan.isReversed()) {
2266       if (scan.getFilter() != null) {
2267         scan.getFilter().setReversed(true);
2268       }
2269       return new ReversedRegionScannerImpl(scan, additionalScanners, this);
2270     }
2271     return new RegionScannerImpl(scan, additionalScanners, this);
2272   }
2273 
2274   /*
2275    * @param delete WARNING: the passed Delete is modified by this method.
2276    */
2277   void prepareDelete(Delete delete) throws IOException {
2278     // Check to see if this is a delete-entire-row operation
2279     if(delete.getFamilyCellMap().isEmpty()){
2280       for(byte [] family : this.htableDescriptor.getFamiliesKeys()){
2281         // Don't eat the timestamp
2282         delete.addFamily(family, delete.getTimeStamp());
2283       }
2284     } else {
2285       for(byte [] family : delete.getFamilyCellMap().keySet()) {
2286         if(family == null) {
2287           throw new NoSuchColumnFamilyException("Empty family is invalid");
2288         }
2289         checkFamily(family);
2290       }
2291     }
2292   }
2293 
2294   //////////////////////////////////////////////////////////////////////////////
2295   // set() methods for client use.
2296   //////////////////////////////////////////////////////////////////////////////
2297   /**
2298    * @param delete delete object
2299    * @throws IOException read exceptions
2300    */
2301   public void delete(Delete delete)
2302   throws IOException {
2303     checkReadOnly();
2304     checkResources();
2305     startRegionOperation(Operation.DELETE);
2306     try {
2307       delete.getRow();
2308       // All edits for the given row (across all column families) must happen atomically.
2309       doBatchMutate(delete);
2310     } finally {
2311       closeRegionOperation(Operation.DELETE);
2312     }
2313   }
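
       /**
        * Usage sketch, illustrative only: deleting a whole row versus one family.
        * An empty Delete is expanded by prepareDelete(...) above to cover every
        * family in the table.
        */
       private static void exampleDeletes(HRegion region, byte[] row, byte[] family)
           throws IOException {
         Delete wholeRow = new Delete(row);
         region.delete(wholeRow);        // expands to all column families
         Delete oneFamily = new Delete(row);
         oneFamily.addFamily(family);    // only this family's cells
         region.delete(oneFamily);
       }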
2314 
2315   /**
2316    * Row key used by the method below.
2317    */
2318   private static final byte [] FOR_UNIT_TESTS_ONLY = Bytes.toBytes("ForUnitTestsOnly");
2319   /**
2320    * This is used only by unit tests. Not required to be a public API.
2321    * @param familyMap map of family to edits for the given family.
2322    * @throws IOException
2323    */
2324   void delete(NavigableMap<byte[], List<Cell>> familyMap,
2325       Durability durability) throws IOException {
2326     Delete delete = new Delete(FOR_UNIT_TESTS_ONLY);
2327     delete.setFamilyCellMap(familyMap);
2328     delete.setDurability(durability);
2329     doBatchMutate(delete);
2330   }
2331 
2332   /**
2333    * Set up correct timestamps in the KVs of the Delete object.
2334    * Caller should have the row and region locks.
2335    * @param mutation
2336    * @param familyMap
2337    * @param byteNow
2338    * @throws IOException
2339    */
2340   void prepareDeleteTimestamps(Mutation mutation, Map<byte[], List<Cell>> familyMap,
2341       byte[] byteNow) throws IOException {
2342     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
2343 
2344       byte[] family = e.getKey();
2345       List<Cell> cells = e.getValue();
2346       assert cells instanceof RandomAccess;
2347 
2348       Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
2349       int listSize = cells.size();
2350       for (int i=0; i < listSize; i++) {
2351         Cell cell = cells.get(i);
2352         //  Check if time is LATEST, change to time of most recent addition if so
2353         //  This is expensive.
2354         if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP && CellUtil.isDeleteType(cell)) {
2355           byte[] qual = CellUtil.cloneQualifier(cell);
2356           if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
2357 
2358           Integer count = kvCount.get(qual);
2359           if (count == null) {
2360             kvCount.put(qual, 1);
2361           } else {
2362             kvCount.put(qual, count + 1);
2363           }
2364           count = kvCount.get(qual);
2365 
2366           Get get = new Get(CellUtil.cloneRow(cell));
2367           get.setMaxVersions(count);
2368           get.addColumn(family, qual);
2369           if (coprocessorHost != null) {
2370             if (!coprocessorHost.prePrepareTimeStampForDeleteVersion(mutation, cell,
2371                 byteNow, get)) {
2372               updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow);
2373             }
2374           } else {
2375             updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow);
2376           }
2377         } else {
2378           CellUtil.updateLatestStamp(cell, byteNow, 0);
2379         }
2380       }
2381     }
2382   }
2383 
2384   void updateDeleteLatestVersionTimeStamp(Cell cell, Get get, int count, byte[] byteNow)
2385       throws IOException {
2386     List<Cell> result = get(get, false);
2387 
2388     if (result.size() < count) {
2389       // Nothing to delete
2390       CellUtil.updateLatestStamp(cell, byteNow, 0);
2391       return;
2392     }
2393     if (result.size() > count) {
2394       throw new RuntimeException("Unexpected size: " + result.size());
2395     }
2396     Cell getCell = result.get(count - 1);
2397     CellUtil.setTimestamp(cell, getCell.getTimestamp());
2398   }
2399 
2400   /** Apply the given Put; all edits for the row are applied atomically.
2401    * @throws IOException
2402    */
2403   public void put(Put put)
2404   throws IOException {
2405     checkReadOnly();
2406 
2407     // Do a rough check that we have resources to accept a write.  The check is
2408     // 'rough' in that between the resource check and the call to obtain a
2409     // read lock, resources may run out.  For now, the thought is that this
2410     // will be extremely rare; we'll deal with it when it happens.
2411     checkResources();
2412     startRegionOperation(Operation.PUT);
2413     try {
2414       // All edits for the given row (across all column families) must happen atomically.
2415       doBatchMutate(put);
2416     } finally {
2417       closeRegionOperation(Operation.PUT);
2418     }
2419   }
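
       /**
        * Usage sketch, illustrative only (the row, family and qualifier names are
        * made up): a single-row Put, applied atomically by the method above.
        */
       private static void examplePut(HRegion region) throws IOException {
         Put put = new Put(Bytes.toBytes("row-1"));
         put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("q"), Bytes.toBytes("v"));
         region.put(put);
       }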
2420 
2421   /**
2422    * Struct-like class that tracks the progress of a batch operation,
2423    * accumulating status codes and tracking the index at which processing
2424    * is proceeding.
2425    */
2426   private abstract static class BatchOperationInProgress<T> {
2427     T[] operations;
2428     int nextIndexToProcess = 0;
2429     OperationStatus[] retCodeDetails;
2430     WALEdit[] walEditsFromCoprocessors;
2431 
2432     public BatchOperationInProgress(T[] operations) {
2433       this.operations = operations;
2434       this.retCodeDetails = new OperationStatus[operations.length];
2435       this.walEditsFromCoprocessors = new WALEdit[operations.length];
2436       Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
2437     }
2438 
2439     public abstract Mutation getMutation(int index);
2440     public abstract long getNonceGroup(int index);
2441     public abstract long getNonce(int index);
2442     /** This method is potentially expensive and should only be used for non-replay CP path. */
2443     public abstract Mutation[] getMutationsForCoprocs();
2444     public abstract boolean isInReplay();
2445     public abstract long getReplaySequenceId();
2446 
2447     public boolean isDone() {
2448       return nextIndexToProcess == operations.length;
2449     }
2450   }
2451 
2452   private static class MutationBatch extends BatchOperationInProgress<Mutation> {
2453     private long nonceGroup;
2454     private long nonce;
2455     public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) {
2456       super(operations);
2457       this.nonceGroup = nonceGroup;
2458       this.nonce = nonce;
2459     }
2460 
2461     @Override
2462     public Mutation getMutation(int index) {
2463       return this.operations[index];
2464     }
2465 
2466     @Override
2467     public long getNonceGroup(int index) {
2468       return nonceGroup;
2469     }
2470 
2471     @Override
2472     public long getNonce(int index) {
2473       return nonce;
2474     }
2475 
2476     @Override
2477     public Mutation[] getMutationsForCoprocs() {
2478       return this.operations;
2479     }
2480 
2481     @Override
2482     public boolean isInReplay() {
2483       return false;
2484     }
2485 
2486     @Override
2487     public long getReplaySequenceId() {
2488       return 0;
2489     }
2490   }
2491 
2492   private static class ReplayBatch extends BatchOperationInProgress<MutationReplay> {
2493     private long replaySeqId = 0;
2494     public ReplayBatch(MutationReplay[] operations, long seqId) {
2495       super(operations);
2496       this.replaySeqId = seqId;
2497     }
2498 
2499     @Override
2500     public Mutation getMutation(int index) {
2501       return this.operations[index].mutation;
2502     }
2503 
2504     @Override
2505     public long getNonceGroup(int index) {
2506       return this.operations[index].nonceGroup;
2507     }
2508 
2509     @Override
2510     public long getNonce(int index) {
2511       return this.operations[index].nonce;
2512     }
2513 
2514     @Override
2515     public Mutation[] getMutationsForCoprocs() {
2516       assert false;
2517       throw new RuntimeException("Should not be called for replay batch");
2518     }
2519 
2520     @Override
2521     public boolean isInReplay() {
2522       return true;
2523     }
2524 
2525     @Override
2526     public long getReplaySequenceId() {
2527       return this.replaySeqId;
2528     }
2529   }
2530 
2531   /**
2532    * Perform a batch of mutations.
2533    * It supports only Put and Delete mutations and will ignore other types passed.
2534    * @param mutations the list of mutations
2535    * @return an array of OperationStatus which internally contains the
2536    *         OperationStatusCode and the exceptionMessage if any.
2537    * @throws IOException
2538    */
2539   public OperationStatus[] batchMutate(
2540       Mutation[] mutations, long nonceGroup, long nonce) throws IOException {
2541     // As it stands, this is used for the following:
2542     //  * batchMutate with single mutation - put/delete, separate or from checkAndMutate.
2543     //  * coprocessor calls (see ex. BulkDeleteEndpoint).
2544     // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd...
2545     return batchMutate(new MutationBatch(mutations, nonceGroup, nonce));
2546   }
2547 
2548   public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
2549     return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
2550   }
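
       /**
        * Usage sketch, illustrative only: submit a batch of Puts and Deletes and
        * inspect the per-operation status codes that batchMutate returns.
        */
       private static void exampleBatchMutate(HRegion region, Mutation[] mutations)
           throws IOException {
         OperationStatus[] statuses = region.batchMutate(mutations);
         for (int i = 0; i < statuses.length; i++) {
           if (statuses[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
             LOG.warn("Mutation " + i + " was not applied: "
                 + statuses[i].getExceptionMsg());
           }
         }
       }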
2551 
2552   /**
2553    * Replay a batch of mutations.
2554    * @param mutations mutations to replay.
2555    * @param replaySeqId SeqId for current mutations
2556    * @return an array of OperationStatus which internally contains the
2557    *         OperationStatusCode and the exceptionMessage if any.
2558    * @throws IOException
2559    */
2560   public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId)
2561       throws IOException {
2562     return batchMutate(new ReplayBatch(mutations, replaySeqId));
2563   }
2564 
2565   /**
2566    * Perform a batch of mutations.
2567    * It supports only Put and Delete mutations and will ignore other types passed.
2568    * @param batchOp contains the list of mutations
2569    * @return an array of OperationStatus which internally contains the
2570    *         OperationStatusCode and the exceptionMessage if any.
2571    * @throws IOException
2572    */
2573   OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
2574     boolean initialized = false;
2575     Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
2576     startRegionOperation(op);
2577     try {
2578       while (!batchOp.isDone()) {
2579         if (!batchOp.isInReplay()) {
2580           checkReadOnly();
2581         }
2582         checkResources();
2583 
2584         if (!initialized) {
2585           this.writeRequestsCount.add(batchOp.operations.length);
2586           if (!batchOp.isInReplay()) {
2587             doPreMutationHook(batchOp);
2588           }
2589           initialized = true;
2590         }
2591         long addedSize = doMiniBatchMutation(batchOp);
2592         long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
2593         if (isFlushSize(newSize)) {
2594           requestFlush();
2595         }
2596       }
2597     } finally {
2598       closeRegionOperation(op);
2599     }
2600     return batchOp.retCodeDetails;
2601   }
2602 
2603 
2604   private void doPreMutationHook(BatchOperationInProgress<?> batchOp)
2605       throws IOException {
2606     /* Run coprocessor pre hook outside of locks to avoid deadlock */
2607     WALEdit walEdit = new WALEdit();
2608     if (coprocessorHost != null) {
2609       for (int i = 0 ; i < batchOp.operations.length; i++) {
2610         Mutation m = batchOp.getMutation(i);
2611         if (m instanceof Put) {
2612           if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
2613             // pre hook says skip this Put
2614             // mark as success and skip in doMiniBatchMutation
2615             batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2616           }
2617         } else if (m instanceof Delete) {
2618           Delete curDel = (Delete) m;
2619           if (curDel.getFamilyCellMap().isEmpty()) {
2620             // handle deleting a row case
2621             prepareDelete(curDel);
2622           }
2623           if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {
2624             // pre hook says skip this Delete
2625             // mark as success and skip in doMiniBatchMutation
2626             batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2627           }
2628         } else {
2629           // If a mutation other than a Put or Delete (e.g. an Append) was passed to batchMutate,
2630           // mark its return code as a failure so that it will not be considered in
2631           // doMiniBatchMutation
2632           batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
2633               "Put/Delete mutations only supported in batchMutate() now");
2634         }
2635         if (!walEdit.isEmpty()) {
2636           batchOp.walEditsFromCoprocessors[i] = walEdit;
2637           walEdit = new WALEdit();
2638         }
2639       }
2640     }
2641   }
2642 
2643   @SuppressWarnings("unchecked")
2644   private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp) throws IOException {
2645     boolean isInReplay = batchOp.isInReplay();
2646     // variable to note if all Put items are for the same CF -- metrics related
2647     boolean putsCfSetConsistent = true;
2648     // The set of columnFamilies first seen for Put.
2649     Set<byte[]> putsCfSet = null;
2650     // variable to note if all Delete items are for the same CF -- metrics related
2651     boolean deletesCfSetConsistent = true;
2652     //The set of columnFamilies first seen for Delete.
2653     Set<byte[]> deletesCfSet = null;
2654 
2655     long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
2656     WALEdit walEdit = new WALEdit(isInReplay);
2657     MultiVersionConsistencyControl.WriteEntry w = null;
2658     long txid = 0;
2659     boolean doRollBackMemstore = false;
2660     boolean locked = false;
2661 
2662     // Keep track of the locks we hold so we can release them in the finally clause
2663     List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
2664     // reference family maps directly so coprocessors can mutate them if desired
2665     Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
2666     List<Cell> memstoreCells = new ArrayList<Cell>();
2667     // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
2668     int firstIndex = batchOp.nextIndexToProcess;
2669     int lastIndexExclusive = firstIndex;
2670     boolean success = false;
2671     int noOfPuts = 0, noOfDeletes = 0;
2672     WALKey walKey = null;
2673     long mvccNum = 0;
2674     try {
2675       // ------------------------------------
2676       // STEP 1. Try to acquire as many locks as we can, and ensure
2677       // we acquire at least one.
2678       // ----------------------------------
2679       int numReadyToWrite = 0;
2680       long now = EnvironmentEdgeManager.currentTime();
2681       while (lastIndexExclusive < batchOp.operations.length) {
2682         Mutation mutation = batchOp.getMutation(lastIndexExclusive);
2683         boolean isPutMutation = mutation instanceof Put;
2684 
2685         Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
2686         // store the family map reference to allow for mutations
2687         familyMaps[lastIndexExclusive] = familyMap;
2688 
2689         // skip anything that "ran" already
2690         if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
2691             != OperationStatusCode.NOT_RUN) {
2692           lastIndexExclusive++;
2693           continue;
2694         }
2695 
2696         try {
2697           if (isPutMutation) {
2698             // Check the families in the put. If bad, skip this one.
2699             if (isInReplay) {
2700               removeNonExistentColumnFamilyForReplay(familyMap);
2701             } else {
2702               checkFamilies(familyMap.keySet());
2703             }
2704             checkTimestamps(mutation.getFamilyCellMap(), now);
2705           } else {
2706             prepareDelete((Delete) mutation);
2707           }
2708         } catch (NoSuchColumnFamilyException nscf) {
2709           LOG.warn("No such column family in batch mutation", nscf);
2710           batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2711               OperationStatusCode.BAD_FAMILY, nscf.getMessage());
2712           lastIndexExclusive++;
2713           continue;
2714         } catch (FailedSanityCheckException fsce) {
2715           LOG.warn("Batch Mutation did not pass sanity check", fsce);
2716           batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2717               OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
2718           lastIndexExclusive++;
2719           continue;
2720         }
2721 
2722         // If we haven't got any rows in our batch, we should block to
2723         // get the next one.
2724         boolean shouldBlock = numReadyToWrite == 0;
2725         RowLock rowLock = null;
2726         try {
2727           rowLock = getRowLockInternal(mutation.getRow(), shouldBlock);
2728         } catch (IOException ioe) {
2729           LOG.warn("Failed getting lock in batch put, row="
2730             + Bytes.toStringBinary(mutation.getRow()), ioe);
2731         }
2732         if (rowLock == null) {
2733           // We failed to grab another lock
2734           assert !shouldBlock : "Should never fail to get lock when blocking";
2735           break; // stop acquiring more rows for this batch
2736         } else {
2737           acquiredRowLocks.add(rowLock);
2738         }
2739 
2740         lastIndexExclusive++;
2741         numReadyToWrite++;
2742 
2743         if (isPutMutation) {
2744           // If column families stay consistent throughout all of the
2745           // individual puts then metrics can be reported as a multiput across
2746           // column families in the first put.
2747           if (putsCfSet == null) {
2748             putsCfSet = mutation.getFamilyCellMap().keySet();
2749           } else {
2750             putsCfSetConsistent = putsCfSetConsistent
2751                 && mutation.getFamilyCellMap().keySet().equals(putsCfSet);
2752           }
2753         } else {
2754           if (deletesCfSet == null) {
2755             deletesCfSet = mutation.getFamilyCellMap().keySet();
2756           } else {
2757             deletesCfSetConsistent = deletesCfSetConsistent
2758                 && mutation.getFamilyCellMap().keySet().equals(deletesCfSet);
2759           }
2760         }
2761       }
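      // -------------------------------------------------------------------
      // Editor's sketch (not part of the original source): the lock-taking
      // policy of the loop above, reduced to its two calls. Only the first
      // row of a mini-batch may block; subsequent rows are attempted without
      // waiting, so the batch proceeds with whatever locks it could take
      // (names hypothetical):
      //
      //   RowLock first = getRowLockInternal(rows[0], true);   // may wait
      //   RowLock later = getRowLockInternal(rows[k], false);  // never waits
      //   if (later == null) break;  // stop growing this mini-batch
      // -------------------------------------------------------------------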
2762 
2763       // we should record the timestamp only after we have acquired the rowLock,
2764       // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
2765       now = EnvironmentEdgeManager.currentTime();
2766       byte[] byteNow = Bytes.toBytes(now);
2767 
2768       // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
2769       if (numReadyToWrite <= 0) return 0L;
2770 
2771       // We've now grabbed as many mutations off the list as we can
2772 
2773       // ------------------------------------
2774       // STEP 2. Update any LATEST_TIMESTAMP timestamps
2775       // ----------------------------------
2776       for (int i = firstIndex; !isInReplay && i < lastIndexExclusive; i++) {
2777         // skip invalid
2778         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2779             != OperationStatusCode.NOT_RUN) continue;
2780 
2781         Mutation mutation = batchOp.getMutation(i);
2782         if (mutation instanceof Put) {
2783           updateCellTimestamps(familyMaps[i].values(), byteNow);
2784           noOfPuts++;
2785         } else {
2786           prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
2787           noOfDeletes++;
2788         }
2789         rewriteCellTags(familyMaps[i], mutation);
2790       }
2791 
2792       lock(this.updatesLock.readLock(), numReadyToWrite);
2793       locked = true;
2794       if(isInReplay) {
2795         mvccNum = batchOp.getReplaySequenceId();
2796       } else {
2797         mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
2798       }
2799       //
2800       // ------------------------------------
2801       // Acquire the latest mvcc number
2802       // ----------------------------------
2803       w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
2804 
2805       // calling the pre CP hook for batch mutation
2806       if (!isInReplay && coprocessorHost != null) {
2807         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2808           new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2809           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2810         if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
2811       }
2812 
2813       // ------------------------------------
2814       // STEP 3. Write back to memstore
2815       // Write to memstore. It is ok to write to memstore
2816       // first without updating the WAL because we do not roll
2817       // forward the memstore MVCC. The MVCC will be moved up when
2818       // the complete operation is done. These changes are not yet
2819       // visible to scanners till we update the MVCC. The MVCC is
2820       // moved only when the sync is complete.
2821       // ----------------------------------
2822       long addedSize = 0;
2823       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2824         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2825             != OperationStatusCode.NOT_RUN) {
2826           continue;
2827         }
2828         doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
2829         addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells, isInReplay);
2830       }
2831 
2832       // ------------------------------------
2833       // STEP 4. Build WAL edit
2834       // ----------------------------------
2835       Durability durability = Durability.USE_DEFAULT;
2836       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2837         // Skip mutations that were determined to be invalid during preprocessing
2838         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2839             != OperationStatusCode.NOT_RUN) {
2840           continue;
2841         }
2842         batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2843 
2844         Mutation m = batchOp.getMutation(i);
2845         Durability tmpDur = getEffectiveDurability(m.getDurability());
2846         if (tmpDur.ordinal() > durability.ordinal()) {
2847           durability = tmpDur;
2848         }
2849         if (tmpDur == Durability.SKIP_WAL) {
2850           recordMutationWithoutWal(m.getFamilyCellMap());
2851           continue;
2852         }
2853 
2854         long nonceGroup = batchOp.getNonceGroup(i), nonce = batchOp.getNonce(i);
2855         // In replay, the batch may contain multiple nonces. If so, write WALEdit for each.
2856         // Given how nonces are originally written, these should be contiguous.
2857         // They don't have to be; if they are not, it still works, just writing more WALEdits than needed.
2858         if (nonceGroup != currentNonceGroup || nonce != currentNonce) {
2859           if (walEdit.size() > 0) {
2860             assert isInReplay;
2861             if (!isInReplay) {
2862               throw new IOException("Multiple nonces per batch and not in replay");
2863             }
2864             // txid should always increase, so having the one from the last call is ok.
2865             // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
2866             walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
2867               this.htableDescriptor.getTableName(), now, m.getClusterIds(),
2868               currentNonceGroup, currentNonce);
2869             txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey,
2870               walEdit, getSequenceId(), true, null);
2871             walEdit = new WALEdit(isInReplay);
2872             walKey = null;
2873           }
2874           currentNonceGroup = nonceGroup;
2875           currentNonce = nonce;
2876         }
2877 
2878         // Add WAL edits by CP
2879         WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
2880         if (fromCP != null) {
2881           for (Cell cell : fromCP.getCells()) {
2882             walEdit.add(cell);
2883           }
2884         }
2885         addFamilyMapToWALEdit(familyMaps[i], walEdit);
2886       }
2887 
2888       // -------------------------
2889       // STEP 5. Append the final edit to WAL. Do not sync wal.
2890       // -------------------------
2891       Mutation mutation = batchOp.getMutation(firstIndex);
2892       if (walEdit.size() > 0) {
2893         // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
2894         walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
2895             this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
2896             mutation.getClusterIds(), currentNonceGroup, currentNonce);
2897         if(isInReplay) {
2898           walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId());
2899         }
2900         txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
2901           getSequenceId(), true, memstoreCells);
2902       }
2903       if(walKey == null){
2904         // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
2905         walKey = this.appendEmptyEdit(this.wal, memstoreCells);
2906       }
2907 
2908       // -------------------------------
2909       // STEP 6. Release row locks, etc.
2910       // -------------------------------
2911       if (locked) {
2912         this.updatesLock.readLock().unlock();
2913         locked = false;
2914       }
2915       releaseRowLocks(acquiredRowLocks);
2916 
2917       // -------------------------
2918       // STEP 7. Sync wal.
2919       // -------------------------
2920       if (txid != 0) {
2921         syncOrDefer(txid, durability);
2922       }
2923 
2924       doRollBackMemstore = false;
2925       // calling the post CP hook for batch mutation
2926       if (!isInReplay && coprocessorHost != null) {
2927         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2928           new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2929           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2930         coprocessorHost.postBatchMutate(miniBatchOp);
2931       }
2932 
2933 
2934       // ------------------------------------------------------------------
2935       // STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
2936       // ------------------------------------------------------------------
2937       if (w != null) {
2938         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
2939         w = null;
2940       }
2941 
2942       // ------------------------------------
2943       // STEP 9. Run coprocessor post hooks. This should be done after the wal is
2944       // synced so that the coprocessor contract is adhered to.
2945       // ------------------------------------
2946       if (!isInReplay && coprocessorHost != null) {
2947         for (int i = firstIndex; i < lastIndexExclusive; i++) {
2948           // only for successful puts
2949           if (batchOp.retCodeDetails[i].getOperationStatusCode()
2950               != OperationStatusCode.SUCCESS) {
2951             continue;
2952           }
2953           Mutation m = batchOp.getMutation(i);
2954           if (m instanceof Put) {
2955             coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
2956           } else {
2957             coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
2958           }
2959         }
2960       }
2961 
2962       success = true;
2963       return addedSize;
2964     } finally {
2965       // if the wal sync was unsuccessful, remove keys from memstore
2966       if (doRollBackMemstore) {
2967         rollbackMemstore(memstoreCells);
2968       }
2969       if (w != null) {
2970         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
2971       }
2972 
2973       if (locked) {
2974         this.updatesLock.readLock().unlock();
2975       }
2976       releaseRowLocks(acquiredRowLocks);
2977 
2978       // See if the column families were consistent through the whole thing.
2979       // if they were then keep them. If they were not then pass a null.
2980       // null will be treated as unknown.
2981       // Total time taken might be involving Puts and Deletes.
2982       // Split the time for puts and deletes based on the total number of Puts and Deletes.
2983 
2984       if (noOfPuts > 0) {
2985         // There were some Puts in the batch.
2986         if (this.metricsRegion != null) {
2987           this.metricsRegion.updatePut();
2988         }
2989       }
2990       if (noOfDeletes > 0) {
2991         // There were some Deletes in the batch.
2992         if (this.metricsRegion != null) {
2993           this.metricsRegion.updateDelete();
2994         }
2995       }
2996       if (!success) {
2997         for (int i = firstIndex; i < lastIndexExclusive; i++) {
2998           if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
2999             batchOp.retCodeDetails[i] = OperationStatus.FAILURE;
3000           }
3001         }
3002       }
3003       if (coprocessorHost != null && !batchOp.isInReplay()) {
3004         // call the coprocessor hook to do any finalization steps
3005         // after the put is done
3006         MiniBatchOperationInProgress<Mutation> miniBatchOp =
3007             new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
3008                 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex,
3009                 lastIndexExclusive);
3010         coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success);
3011       }
3012 
3013       batchOp.nextIndexToProcess = lastIndexExclusive;
3014     }
3015   }
3016 
3017   /**
3018    * Returns effective durability from the passed durability and
3019    * the table descriptor.
3020    */
3021   protected Durability getEffectiveDurability(Durability d) {
3022     return d == Durability.USE_DEFAULT ? this.durability : d;
3023   }
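  // -------------------------------------------------------------------------
  // Editor's sketch (not part of the original source): the resolution rule
  // above, worked with an assumed table-level durability of ASYNC_WAL:
  //
  //   getEffectiveDurability(Durability.USE_DEFAULT) -> Durability.ASYNC_WAL
  //   getEffectiveDurability(Durability.SKIP_WAL)    -> Durability.SKIP_WAL
  //
  // An explicit per-mutation setting always wins; only USE_DEFAULT defers to
  // the table descriptor.
  // -------------------------------------------------------------------------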
3024 
3025   //TODO: gets/puts and deletes should be refactored a bit so that
3026   //acquiring the lock happens first and the lock is passed into
3027   //the methods. Then in the case of checkAndMutate you could just do lockRow,
3028   //get, put, unlockRow or something
3029   /**
3030    *
3031    * @return true if the mutation was applied, false otherwise
3032    * @throws IOException
3033    */
3034   public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
3035       CompareOp compareOp, ByteArrayComparable comparator, Mutation w,
3036       boolean writeToWAL)
3037   throws IOException{
3038     checkReadOnly();
3039     //TODO, add check for value length or maybe even better move this to the
3040     //client if this becomes a global setting
3041     checkResources();
3042     boolean isPut = w instanceof Put;
3043     if (!isPut && !(w instanceof Delete)) {
3044       throw new DoNotRetryIOException("Action must be Put or Delete");
3045     }
3046     if (!Bytes.equals(row, w.getRow())) {
3047       throw new DoNotRetryIOException(
3048           "Action's getRow must match the passed row");
3049     }
3050 
3051     startRegionOperation();
3052     try {
3053       Get get = new Get(row);
3054       checkFamily(family);
3055       get.addColumn(family, qualifier);
3056 
3057       // Lock row - note that doBatchMutate will relock this row if called
3058       RowLock rowLock = getRowLock(get.getRow());
3059       // wait for all previous transactions to complete (with lock held)
3060       mvcc.waitForPreviousTransactionsComplete();
3061       try {
3062         if (this.getCoprocessorHost() != null) {
3063           Boolean processed = null;
3064           if (w instanceof Put) {
3065             processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family,
3066                 qualifier, compareOp, comparator, (Put) w);
3067           } else if (w instanceof Delete) {
3068             processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family,
3069                 qualifier, compareOp, comparator, (Delete) w);
3070           }
3071           if (processed != null) {
3072             return processed;
3073           }
3074         }
3075         List<Cell> result = get(get, false);
3076 
3077         boolean valueIsNull = comparator.getValue() == null ||
3078           comparator.getValue().length == 0;
3079         boolean matches = false;
3080         if (result.size() == 0 && valueIsNull) {
3081           matches = true;
3082         } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
3083             valueIsNull) {
3084           matches = true;
3085         } else if (result.size() == 1 && !valueIsNull) {
3086           Cell kv = result.get(0);
3087           int compareResult = comparator.compareTo(kv.getValueArray(),
3088               kv.getValueOffset(), kv.getValueLength());
3089           switch (compareOp) {
3090           case LESS:
3091             matches = compareResult < 0;
3092             break;
3093           case LESS_OR_EQUAL:
3094             matches = compareResult <= 0;
3095             break;
3096           case EQUAL:
3097             matches = compareResult == 0;
3098             break;
3099           case NOT_EQUAL:
3100             matches = compareResult != 0;
3101             break;
3102           case GREATER_OR_EQUAL:
3103             matches = compareResult >= 0;
3104             break;
3105           case GREATER:
3106             matches = compareResult > 0;
3107             break;
3108           default:
3109             throw new RuntimeException("Unknown Compare op " + compareOp.name());
3110           }
3111         }
3112         // If the check matches, apply the mutation
3113         if (matches) {
3114           // All edits for the given row (across all column families) must
3115           // happen atomically.
3116           doBatchMutate(w);
3117           this.checkAndMutateChecksPassed.increment();
3118           return true;
3119         }
3120         this.checkAndMutateChecksFailed.increment();
3121         return false;
3122       } finally {
3123         rowLock.release();
3124       }
3125     } finally {
3126       closeRegionOperation();
3127     }
3128   }
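  // -------------------------------------------------------------------------
  // Editor's sketch (not part of the original source): a region-level usage of
  // checkAndMutate() above, in the style of a unit test. The row, family,
  // qualifier and values are hypothetical; BinaryComparator is a standard
  // ByteArrayComparable implementation.
  //
  //   byte[] row = Bytes.toBytes("r1");
  //   byte[] cf  = Bytes.toBytes("f");
  //   byte[] q   = Bytes.toBytes("q");
  //   Put put = new Put(row);
  //   put.add(cf, q, Bytes.toBytes("new-value"));
  //   boolean applied = region.checkAndMutate(row, cf, q, CompareOp.EQUAL,
  //       new BinaryComparator(Bytes.toBytes("old-value")), put, true);
  //   // applied is true only if the stored cell currently equals "old-value"
  // -------------------------------------------------------------------------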
3129 
3130   //TODO: gets/puts and deletes should be refactored a bit so that
3131   //acquiring the lock happens first and the lock is passed into
3132   //the methods. Then in the case of checkAndMutate you could just do lockRow,
3133   //get, put, unlockRow or something
3134   /**
3135    *
3136    * @return true if the row mutations were applied, false otherwise
3137    * @throws IOException
3138    */
3139   public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier,
3140       CompareOp compareOp, ByteArrayComparable comparator, RowMutations rm,
3141       boolean writeToWAL)
3142       throws IOException{
3143     checkReadOnly();
3144     //TODO, add check for value length or maybe even better move this to the
3145     //client if this becomes a global setting
3146     checkResources();
3147 
3148     startRegionOperation();
3149     try {
3150       Get get = new Get(row);
3151       checkFamily(family);
3152       get.addColumn(family, qualifier);
3153 
3154       // Lock row - note that doBatchMutate will relock this row if called
3155       RowLock rowLock = getRowLock(get.getRow());
3156       // wait for all previous transactions to complete (with lock held)
3157       mvcc.waitForPreviousTransactionsComplete();
3158       try {
3159         List<Cell> result = get(get, false);
3160 
3161         boolean valueIsNull = comparator.getValue() == null ||
3162             comparator.getValue().length == 0;
3163         boolean matches = false;
3164         if (result.size() == 0 && valueIsNull) {
3165           matches = true;
3166         } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
3167             valueIsNull) {
3168           matches = true;
3169         } else if (result.size() == 1 && !valueIsNull) {
3170           Cell kv = result.get(0);
3171           int compareResult = comparator.compareTo(kv.getValueArray(),
3172               kv.getValueOffset(), kv.getValueLength());
3173           switch (compareOp) {
3174           case LESS:
3175             matches = compareResult < 0;
3176             break;
3177           case LESS_OR_EQUAL:
3178             matches = compareResult <= 0;
3179             break;
3180           case EQUAL:
3181             matches = compareResult == 0;
3182             break;
3183           case NOT_EQUAL:
3184             matches = compareResult != 0;
3185             break;
3186           case GREATER_OR_EQUAL:
3187             matches = compareResult >= 0;
3188             break;
3189           case GREATER:
3190             matches = compareResult > 0;
3191             break;
3192           default:
3193             throw new RuntimeException("Unknown Compare op " + compareOp.name());
3194           }
3195         }
3196         // If the check matches, apply the row mutations
3197         if (matches) {
3198           // All edits for the given row (across all column families) must
3199           // happen atomically.
3200           mutateRow(rm);
3201           this.checkAndMutateChecksPassed.increment();
3202           return true;
3203         }
3204         this.checkAndMutateChecksFailed.increment();
3205         return false;
3206       } finally {
3207         rowLock.release();
3208       }
3209     } finally {
3210       closeRegionOperation();
3211     }
3212   }
3213   private void doBatchMutate(Mutation mutation) throws IOException, DoNotRetryIOException {
3214     // Currently this is only called for puts and deletes, so no nonces.
3215     OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation },
3216         HConstants.NO_NONCE, HConstants.NO_NONCE);
3217     if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
3218       throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
3219     } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
3220       throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
3221     }
3222   }
3223 
3224   /**
3225    * Complete taking the snapshot on the region. Writes the region info and adds references to the
3226    * working snapshot directory.
3227    *
3228    * TODO for api consistency, consider adding another version with no {@link ForeignExceptionSnare}
3229    * arg.  (In the future other cancellable HRegion methods could eventually add a
3230    * {@link ForeignExceptionSnare}, or we could do something fancier).
3231    *
3232    * @param desc snapshot description object
3233    * @param exnSnare ForeignExceptionSnare that captures external exceptions in case we need to
3234    *   bail out.  This is allowed to be null and will just be ignored in that case.
3235    * @throws IOException if there is an external or internal error causing the snapshot to fail
3236    */
3237   public void addRegionToSnapshot(SnapshotDescription desc,
3238       ForeignExceptionSnare exnSnare) throws IOException {
3239     Path rootDir = FSUtils.getRootDir(conf);
3240     Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);
3241 
3242     SnapshotManifest manifest = SnapshotManifest.create(conf, getFilesystem(),
3243                                                         snapshotDir, desc, exnSnare);
3244     manifest.addRegion(this);
3245   }
3246 
3247   /**
3248    * Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP} with the
3249    * provided current timestamp.
3250    * @throws IOException
3251    */
3252   void updateCellTimestamps(final Iterable<List<Cell>> cellItr, final byte[] now)
3253       throws IOException {
3254     for (List<Cell> cells: cellItr) {
3255       if (cells == null) continue;
3256       assert cells instanceof RandomAccess;
3257       int listSize = cells.size();
3258       for (int i = 0; i < listSize; i++) {
3259         CellUtil.updateLatestStamp(cells.get(i), now, 0);
3260       }
3261     }
3262   }
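  // -------------------------------------------------------------------------
  // Editor's sketch (not part of the original source): the substitution done
  // above. A Put built without an explicit timestamp carries
  // HConstants.LATEST_TIMESTAMP (Long.MAX_VALUE) on each cell, and this method
  // stamps in the server clock:
  //
  //   before: cell.getTimestamp() == HConstants.LATEST_TIMESTAMP
  //   after : cell.getTimestamp() == now  (the byte[] value passed in)
  //
  // Cells that already carry a client-supplied timestamp are left unchanged.
  // -------------------------------------------------------------------------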
3263 
3264   /**
3265    * Possibly rewrite incoming cell tags.
3266    */
3267   void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) {
3268     // Check if we have any work to do and early out otherwise
3269     // Update these checks as more logic is added here
3270 
3271     if (m.getTTL() == Long.MAX_VALUE) {
3272       return;
3273     }
3274 
3275     // From this point we know we have some work to do
3276 
3277     for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
3278       List<Cell> cells = e.getValue();
3279       assert cells instanceof RandomAccess;
3280       int listSize = cells.size();
3281       for (int i = 0; i < listSize; i++) {
3282         Cell cell = cells.get(i);
3283         List<Tag> newTags = new ArrayList<Tag>();
3284         Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
3285           cell.getTagsOffset(), cell.getTagsLength());
3286 
3287         // Carry forward existing tags
3288 
3289         while (tagIterator.hasNext()) {
3290 
3291           // Add any filters or tag specific rewrites here
3292 
3293           newTags.add(tagIterator.next());
3294         }
3295 
3296         // Cell TTL handling
3297 
3298         // Check again if we need to add a cell TTL because early out logic
3299         // above may change when there are more tag based features in core.
3300         if (m.getTTL() != Long.MAX_VALUE) {
3301           // Add a cell TTL tag
3302           newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(m.getTTL())));
3303         }
3304 
3305         // Rewrite the cell with the updated set of tags
3306 
3307         cells.set(i, new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
3308           cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
3309           cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
3310           cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
3311           cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
3312           newTags));
3313       }
3314     }
3315   }
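  // -------------------------------------------------------------------------
  // Editor's sketch (not part of the original source): what triggers the
  // rewrite above. A client-set mutation TTL becomes a per-cell TTL tag
  // (row/family/qualifier names hypothetical):
  //
  //   Put put = new Put(row);
  //   put.add(cf, q, value);
  //   put.setTTL(5 * 60 * 1000L);  // five minutes, in milliseconds
  //   // rewriteCellTags() appends Tag(TagType.TTL_TAG_TYPE, ttlBytes) to
  //   // every cell of the Put; a TTL of Long.MAX_VALUE means "no TTL" and
  //   // the method returns without touching the cells.
  // -------------------------------------------------------------------------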
3316 
3317   /*
3318    * Check whether we have the resources to support an update.
3319    *
3320    * We throw RegionTooBusyException if we are above the memstore limit
3321    * and expect the client to retry using some kind of backoff.
3322    */
3323   private void checkResources() throws RegionTooBusyException {
3324     // If catalog region, do not impose resource constraints or block updates.
3325     if (this.getRegionInfo().isMetaRegion()) return;
3326 
3327     if (this.memstoreSize.get() > this.blockingMemStoreSize) {
3328       blockedRequestsCount.increment();
3329       requestFlush();
3330       throw new RegionTooBusyException("Above memstore limit, " +
3331           "regionName=" + (this.getRegionInfo() == null ? "unknown" :
3332           this.getRegionInfo().getRegionNameAsString()) +
3333           ", server=" + (this.getRegionServerServices() == null ? "unknown" :
3334           this.getRegionServerServices().getServerName()) +
3335           ", memstoreSize=" + memstoreSize.get() +
3336           ", blockingMemStoreSize=" + blockingMemStoreSize);
3337     }
3338   }
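  // -------------------------------------------------------------------------
  // Editor's sketch (not part of the original source): the retry contract
  // implied by checkResources() above, for a caller driving region writes
  // directly. The attempt bound and backoff schedule are hypothetical.
  //
  //   for (int attempt = 0; attempt < maxAttempts; attempt++) {
  //     try {
  //       region.batchMutate(batchOp);
  //       break;
  //     } catch (RegionTooBusyException e) {
  //       Thread.sleep(100L << attempt);  // exponential backoff
  //     }
  //   }
  // -------------------------------------------------------------------------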
3339 
3340   /**
3341    * @throws IOException Throws exception if region is in read-only mode.
3342    */
3343   protected void checkReadOnly() throws IOException {
3344     if (this.writestate.isReadOnly()) {
3345       throw new IOException("region is read only");
3346     }
3347   }
3348 
3349   protected void checkReadsEnabled() throws IOException {
3350     if (!this.writestate.readsEnabled) {
3351       throw new IOException("The region's reads are disabled. Cannot serve the request");
3352     }
3353   }
3354 
3355   public void setReadsEnabled(boolean readsEnabled) {
3356     this.writestate.setReadsEnabled(readsEnabled);
3357   }
3358 
3359   /**
3360    * Add updates first to the wal and then add values to memstore.
3361    * Warning: Assumption is caller has lock on passed in row.
3362    * @param edits Cell updates by column
3363    * @throws IOException
3364    */
3365   private void put(final byte [] row, byte [] family, List<Cell> edits)
3366   throws IOException {
3367     NavigableMap<byte[], List<Cell>> familyMap;
3368     familyMap = new TreeMap<byte[], List<Cell>>(Bytes.BYTES_COMPARATOR);
3369 
3370     familyMap.put(family, edits);
3371     Put p = new Put(row);
3372     p.setFamilyCellMap(familyMap);
3373     doBatchMutate(p);
3374   }
3375 
3376   /**
3377    * Atomically apply the given map of family->edits to the memstore.
3378    * This handles the consistency control on its own, but the caller
3379    * should already have locked updatesLock.readLock(). This also does
3380    * <b>not</b> check the families for validity.
3381    *
3382    * @param familyMap Map of kvs per family
3383    * @param mvccNum The MVCC for this transaction.
3384    * @param isInReplay true when adding replayed KVs into memstore
3385    * @return the additional memory usage of the memstore caused by the
3386    * new entries.
3387    */
3388   private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
3389     long mvccNum, List<Cell> memstoreCells, boolean isInReplay) throws IOException {
3390     long size = 0;
3391 
3392     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
3393       byte[] family = e.getKey();
3394       List<Cell> cells = e.getValue();
3395       assert cells instanceof RandomAccess;
3396       Store store = getStore(family);
3397       int listSize = cells.size();
3398       for (int i=0; i < listSize; i++) {
3399         Cell cell = cells.get(i);
3400         CellUtil.setSequenceId(cell, mvccNum);
3401         Pair<Long, Cell> ret = store.add(cell);
3402         size += ret.getFirst();
3403         memstoreCells.add(ret.getSecond());
3404         if(isInReplay) {
3405           // set memstore newly added cells with replay mvcc number
3406           CellUtil.setSequenceId(ret.getSecond(), mvccNum);
3407         }
3408       }
3409     }
3410 
3411     return size;
3412   }
3413 
3414   /**
3415    * Remove all the keys listed in the map from the memstore. This method is
3416    * called when a Put/Delete has updated memstore but subsequently fails to update
3417    * the wal. This method is then invoked to rollback the memstore.
3418    */
3419   private void rollbackMemstore(List<Cell> memstoreCells) {
3420     int kvsRolledback = 0;
3421 
3422     for (Cell cell : memstoreCells) {
3423       byte[] family = CellUtil.cloneFamily(cell);
3424       Store store = getStore(family);
3425       store.rollback(cell);
3426       kvsRolledback++;
3427     }
3428     LOG.debug("rollbackMemstore rolled back " + kvsRolledback);
3429   }
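  // -------------------------------------------------------------------------
  // Editor's sketch (not part of the original source): where the rollback
  // above fits in the write path, condensed from doMiniBatchMutation():
  //
  //   addedSize += applyFamilyMapToMemstore(...);  // memstore first
  //   txid = wal.append(...);                      // then the WAL
  //   syncOrDefer(txid, durability);               // sync may fail...
  //   // ...in which case the finally clause calls rollbackMemstore(cells)
  //   // so no unsynced edit remains visible in the memstore.
  // -------------------------------------------------------------------------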
3430 
3431   /**
3432    * Check the collection of families for validity.
3433    * @throws NoSuchColumnFamilyException if a family does not exist.
3434    */
3435   void checkFamilies(Collection<byte[]> families)
3436   throws NoSuchColumnFamilyException {
3437     for (byte[] family : families) {
3438       checkFamily(family);
3439     }
3440   }
3441 
3442   /**
3443    * During replay, column families may have been removed between the region
3444    * server failure and the replay.
3445    */
3446   private void removeNonExistentColumnFamilyForReplay(
3447       final Map<byte[], List<Cell>> familyMap) {
3448     List<byte[]> nonExistentList = null;
3449     for (byte[] family : familyMap.keySet()) {
3450       if (!this.htableDescriptor.hasFamily(family)) {
3451         if (nonExistentList == null) {
3452           nonExistentList = new ArrayList<byte[]>();
3453         }
3454         nonExistentList.add(family);
3455       }
3456     }
3457     if (nonExistentList != null) {
3458       for (byte[] family : nonExistentList) {
3459         // Perhaps schema was changed between crash and replay
3460         LOG.info("No family for " + Bytes.toString(family) + ", omitting from replay.");
3461         familyMap.remove(family);
3462       }
3463     }
3464   }
3465 
3466   void checkTimestamps(final Map<byte[], List<Cell>> familyMap,
3467       long now) throws FailedSanityCheckException {
3468     if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
3469       return;
3470     }
3471     long maxTs = now + timestampSlop;
3472     for (List<Cell> kvs : familyMap.values()) {
3473       assert kvs instanceof RandomAccess;
3474       int listSize  = kvs.size();
3475       for (int i=0; i < listSize; i++) {
3476         Cell cell = kvs.get(i);
3477         // See if the user-side TS is out of range; LATEST_TIMESTAMP means the server will assign it
3478         long ts = cell.getTimestamp();
3479         if (ts != HConstants.LATEST_TIMESTAMP && ts > maxTs) {
3480           throw new FailedSanityCheckException("Timestamp for KV out of range "
3481               + cell + " (too.new=" + timestampSlop + ")");
3482         }
3483       }
3484     }
3485   }
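  // -------------------------------------------------------------------------
  // Editor's sketch (not part of the original source): the slop check above,
  // worked with hypothetical numbers. With timestampSlop = 1000 ms and
  // now = 10000:
  //
  //   maxTs = now + timestampSlop = 11000
  //   cell ts 10500                       -> accepted
  //   cell ts 12000                       -> FailedSanityCheckException
  //   cell ts HConstants.LATEST_TIMESTAMP -> accepted (server stamps it later)
  // -------------------------------------------------------------------------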
3486 
3487   /**
3488    * Append the given map of family->edits to a WALEdit data structure.
3489    * This does not write to the WAL itself.
3490    * @param familyMap map of family->edits
3491    * @param walEdit the destination entry to append into
3492    */
3493   private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
3494       WALEdit walEdit) {
3495     for (List<Cell> edits : familyMap.values()) {
3496       assert edits instanceof RandomAccess;
3497       int listSize = edits.size();
3498       for (int i=0; i < listSize; i++) {
3499         Cell cell = edits.get(i);
3500         walEdit.add(cell);
3501       }
3502     }
3503   }
3504 
3505   private void requestFlush() {
3506     if (this.rsServices == null) {
3507       return;
3508     }
3509     synchronized (writestate) {
3510       if (this.writestate.isFlushRequested()) {
3511         return;
3512       }
3513       writestate.flushRequested = true;
3514     }
3515     // Make request outside of synchronize block; HBASE-818.
3516     this.rsServices.getFlushRequester().requestFlush(this, false);
3517     if (LOG.isDebugEnabled()) {
3518       LOG.debug("Flush requested on " + this);
3519     }
3520   }
3521 
3522   /*
3523    * @param size
3524    * @return True if size is over the flush threshold
3525    */
3526   private boolean isFlushSize(final long size) {
3527     return size > this.memstoreFlushSize;
3528   }
3529 
3530   /**
3531    * Read the edits put under this region by wal splitting process.  Put
3532    * the recovered edits back up into this region.
3533    *
3534    * <p>We can ignore any wal message that has a sequence ID that's equal to or
3535    * lower than minSeqId.  (Because we know such messages are already
3536    * reflected in the HFiles.)
3537    *
3538    * <p>While this is running we are putting pressure on memory yet we are
3539    * outside of our usual accounting because we are not yet an onlined region
3540    * (this stuff is being run as part of Region initialization).  This means
3541    * that if we're up against global memory limits, we'll not be flagged to flush
3542    * because we are not online. We can't be flushed by usual mechanisms anyways;
3543    * we're not yet online so our relative sequenceids are not yet aligned with
3544    * WAL sequenceids -- not till we come up online, post processing of split
3545    * edits.
3546    *
3547    * <p>But to help relieve memory pressure, at least manage our own heap size by
3548    * flushing if we are in excess of per-region limits.  Flushing, though, we have
3549    * to be careful to avoid using the regionserver/wal sequenceid.  It runs on a
3550    * different track from what is going on in this region context, so if we
3551    * crashed replaying these edits but in the midst had a flush that used the
3552    * regionserver wal with a sequenceid in excess of what is going on here
3553    * in this region and its split editlogs, then we could miss edits the
3554    * next time we go to recover. So, we have to flush inline, using seqids that
3555    * make sense in this single-region context only -- until we are online.
3556    *
3557    * @param maxSeqIdInStores Any edit found in split editlogs needs to be in excess of
3558    * the maxSeqId for the store to be applied, else it is skipped.
3559    * @return the sequence id of the last edit added to this region out of the
3560    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
3561    * @throws UnsupportedEncodingException
3562    * @throws IOException
3563    */
3564   protected long replayRecoveredEditsIfAny(final Path regiondir,
3565       Map<byte[], Long> maxSeqIdInStores,
3566       final CancelableProgressable reporter, final MonitoredTask status)
3567       throws UnsupportedEncodingException, IOException {
3568     long minSeqIdForTheRegion = -1;
3569     for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
3570       if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
3571         minSeqIdForTheRegion = maxSeqIdInStore;
3572       }
3573     }
3574     long seqid = minSeqIdForTheRegion;
3575 
3576     FileSystem fs = this.fs.getFileSystem();
3577     NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
3578     if (LOG.isDebugEnabled()) {
3579       LOG.debug("Found " + (files == null ? 0 : files.size())
3580         + " recovered edits file(s) under " + regiondir);
3581     }
3582 
3583     if (files == null || files.isEmpty()) return seqid;
3584 
3585     for (Path edits: files) {
3586       if (edits == null || !fs.exists(edits)) {
3587         LOG.warn("Null or non-existent edits file: " + edits);
3588         continue;
3589       }
3590       if (isZeroLengthThenDelete(fs, edits)) continue;
3591 
3592       long maxSeqId;
3593       String fileName = edits.getName();
3594       maxSeqId = Math.abs(Long.parseLong(fileName));
3595       if (maxSeqId <= minSeqIdForTheRegion) {
3596         if (LOG.isDebugEnabled()) {
3597           String msg = "Maximum sequenceid for this wal is " + maxSeqId
3598             + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
3599             + ", skipped the whole file, path=" + edits;
3600           LOG.debug(msg);
3601         }
3602         continue;
3603       }
3604 
3605       try {
3606         // replay the edits. Replay can return -1 if everything is skipped, only update
3607         // if seqId is greater
3608         seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
3609       } catch (IOException e) {
3610         boolean skipErrors = conf.getBoolean(
3611             HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
3612             conf.getBoolean(
3613                 "hbase.skip.errors",
3614                 HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS));
3615         if (conf.get("hbase.skip.errors") != null) {
3616           LOG.warn(
3617               "The property 'hbase.skip.errors' has been deprecated. Please use " +
3618               HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
3619         }
3620         if (skipErrors) {
3621           Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
3622           LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
3623               + "=true so continuing. Renamed " + edits +
3624               " as " + p, e);
3625         } else {
3626           throw e;
3627         }
3628       }
3629     }
3630     // The edits size added into rsAccounting during this replaying will not
3631     // be required any more. So just clear it.
3632     if (this.rsAccounting != null) {
3633       this.rsAccounting.clearRegionReplayEditsSize(this.getRegionName());
3634     }
3635     if (seqid > minSeqIdForTheRegion) {
3636       // Then we added some edits to memory. Flush and cleanup split edit files.
3637       internalFlushcache(null, seqid, stores.values(), status);
3638     }
3639     // Now delete the content of recovered edits.  We're done w/ them.
3640     for (Path file: files) {
3641       if (!fs.delete(file, false)) {
3642         LOG.error("Failed delete of " + file);
3643       } else {
3644         LOG.debug("Deleted recovered.edits file=" + file);
3645       }
3646     }
3647     return seqid;
3648   }
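  // -------------------------------------------------------------------------
  // Editor's sketch (not part of the original source): the file-name
  // convention the skip logic above relies on. Each recovered-edits file is
  // named with the highest sequence id it contains, so whole files can be
  // skipped on a name comparison alone (paths hypothetical):
  //
  //   .../recovered.edits/0000000000000001032  -> maxSeqId = 1032
  //   minSeqIdForTheRegion = 2000              -> 1032 <= 2000, file skipped
  // -------------------------------------------------------------------------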
3649 
3650   /*
3651    * @param edits File of recovered edits.
3652    * @param maxSeqIdInStores Maximum sequenceid found in each store.  Edits in wal
3653    * must be larger than this to be replayed for each store.
3654    * @param reporter
3655    * @return the sequence id of the last edit added to this region out of the
3656    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
3657    * @throws IOException
3658    */
3659   private long replayRecoveredEdits(final Path edits,
3660       Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
3661     throws IOException {
3662     String msg = "Replaying edits from " + edits;
3663     LOG.info(msg);
3664     MonitoredTask status = TaskMonitor.get().createStatus(msg);
3665     FileSystem fs = this.fs.getFileSystem();
3666 
3667     status.setStatus("Opening recovered edits");
3668     WAL.Reader reader = null;
3669     try {
3670       reader = WALFactory.createReader(fs, edits, conf);
3671       long currentEditSeqId = -1;
3672       long currentReplaySeqId = -1;
3673       long firstSeqIdInLog = -1;
3674       long skippedEdits = 0;
3675       long editsCount = 0;
3676       long intervalEdits = 0;
3677       WAL.Entry entry;
3678       Store store = null;
3679       boolean reported_once = false;
3680       ServerNonceManager ng = this.rsServices == null ? null : this.rsServices.getNonceManager();
3681 
3682       try {
3683         // How many edits seen before we check elapsed time
3684         int interval = this.conf.getInt("hbase.hstore.report.interval.edits",
3685             2000);
3686         // How often to send a progress report (default 1/2 master timeout)
3687         int period = this.conf.getInt("hbase.hstore.report.period", 300000);
3688         long lastReport = EnvironmentEdgeManager.currentTime();
3689 
3690         while ((entry = reader.next()) != null) {
3691           WALKey key = entry.getKey();
3692           WALEdit val = entry.getEdit();
3693 
3694           if (ng != null) { // null in some tests, or when nonces are disabled
3695             ng.reportOperationFromWal(key.getNonceGroup(), key.getNonce(), key.getWriteTime());
3696           }
3697 
3698           if (reporter != null) {
3699             intervalEdits += val.size();
3700             if (intervalEdits >= interval) {
3701               // Number of edits interval reached
3702               intervalEdits = 0;
3703               long cur = EnvironmentEdgeManager.currentTime();
3704               if (lastReport + period <= cur) {
3705                 status.setStatus("Replaying edits..." +
3706                     " skipped=" + skippedEdits +
3707                     " edits=" + editsCount);
3708                 // Timeout reached
3709                 if(!reporter.progress()) {
3710                   msg = "Progressable reporter failed, stopping replay";
3711                   LOG.warn(msg);
3712                   status.abort(msg);
3713                   throw new IOException(msg);
3714                 }
3715                 reported_once = true;
3716                 lastReport = cur;
3717               }
3718             }
3719           }
3720 
3721           if (firstSeqIdInLog == -1) {
3722             firstSeqIdInLog = key.getLogSeqNum();
3723           }
3724           if (currentEditSeqId > key.getLogSeqNum()) {
3725             // when this condition is true, it means we have a serious defect because we need to
3726             // maintain increasing SeqId for WAL edits per region
3727             LOG.error("Found decreasing SeqId. PreId=" + currentEditSeqId + " key=" + key
3728                 + "; edit=" + val);
3729           } else {
3730             currentEditSeqId = key.getLogSeqNum();
3731           }
3732           currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ?
3733             key.getOrigLogSeqNum() : currentEditSeqId;
3734 
3735           // Start coprocessor replay here. The coprocessor is for each WALEdit
3736           // instead of a KeyValue.
3737           if (coprocessorHost != null) {
3738             status.setStatus("Running pre-WAL-restore hook in coprocessors");
3739             if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
3740               // if bypass this wal entry, ignore it ...
3741               continue;
3742             }
3743           }
3744 
3745           boolean flush = false;
3746           for (Cell cell: val.getCells()) {
3747             // Check this edit is for me. Also, guard against writing the special
3748             // METACOLUMN info such as HBASE::CACHEFLUSH entries
3749             if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY) ||
3750                 !Bytes.equals(key.getEncodedRegionName(),
3751                   this.getRegionInfo().getEncodedNameAsBytes())) {
3752               // This is a special edit; handle a compaction marker if present, then skip it
3753               CompactionDescriptor compaction = WALEdit.getCompaction(cell);
3754               if (compaction != null) {
3755                 //replay the compaction
3756                 completeCompactionMarker(compaction);
3757               }
3758 
3759               skippedEdits++;
3760               continue;
3761             }
3762             // Figure which store the edit is meant for.
3763             if (store == null || !CellUtil.matchingFamily(cell, store.getFamily().getName())) {
3764               store = getStore(cell);
3765             }
3766             if (store == null) {
3767               // This should never happen.  Perhaps schema was changed between
3768               // crash and redeploy?
3769               LOG.warn("No family for " + cell);
3770               skippedEdits++;
3771               continue;
3772             }
3773             // Now, figure if we should skip this edit.
3774             if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily()
3775                 .getName())) {
3776               skippedEdits++;
3777               continue;
3778             }
3779             CellUtil.setSequenceId(cell, currentReplaySeqId);
3780 
3781             // Once we are over the limit, restoreEdit will keep returning true to
3782             // flush -- but don't flush until we've played all the kvs that make up
3783             // the WALEdit.
3784             if (!flush) {
3785               flush = restoreEdit(store, cell);
3786             }
3787 
3788             editsCount++;
3789           }
3790           if (flush) {
3791             internalFlushcache(null, currentEditSeqId, stores.values(), status);
3792           }
3793 
3794           if (coprocessorHost != null) {
3795             coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
3796           }
3797         }
3798       } catch (EOFException eof) {
3799         Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
3800         msg = "Encountered EOF. Most likely due to Master failure during " +
3801             "wal splitting, so we have this data in another edit.  " +
3802             "Continuing, but renaming " + edits + " as " + p;
3803         LOG.warn(msg, eof);
3804         status.abort(msg);
3805       } catch (IOException ioe) {
3806         // If the IOE resulted from bad file format,
3807         // then this problem is idempotent and retrying won't help
3808         if (ioe.getCause() instanceof ParseException) {
3809           Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
3810           msg = "File corruption encountered!  " +
3811               "Continuing, but renaming " + edits + " as " + p;
3812           LOG.warn(msg, ioe);
3813           status.setStatus(msg);
3814         } else {
3815           status.abort(StringUtils.stringifyException(ioe));
3816           // other IO errors may be transient (bad network connection,
3817           // checksum exception on one datanode, etc).  throw & retry
3818           throw ioe;
3819         }
3820       }
3821       if (reporter != null && !reported_once) {
3822         reporter.progress();
3823       }
3824       msg = "Applied " + editsCount + ", skipped " + skippedEdits +
3825         ", firstSequenceIdInLog=" + firstSeqIdInLog +
3826         ", maxSequenceIdInLog=" + currentEditSeqId + ", path=" + edits;
3827       status.markComplete(msg);
3828       LOG.debug(msg);
3829       return currentEditSeqId;
3830     } finally {
3831       status.cleanup();
3832       if (reader != null) {
3833          reader.close();
3834       }
3835     }
3836   }
3837 
3838   /**
3839    * Call to complete a compaction. It's for the case where we find in the WAL a compaction
3840    * that was not finished.  We could find one recovering a WAL after a regionserver crash.
3841    * See HBASE-2331.
3842    */
3843   void completeCompactionMarker(CompactionDescriptor compaction)
3844       throws IOException {
3845     Store store = this.getStore(compaction.getFamilyName().toByteArray());
3846     if (store == null) {
3847       LOG.warn("Found Compaction WAL edit for deleted family:" +
3848           Bytes.toString(compaction.getFamilyName().toByteArray()));
3849       return;
3850     }
3851     store.completeCompactionMarker(compaction);
3852   }
3853 
3854   /**
3855    * Used by tests
3856    * @param s Store to add edit to.
3857    * @param cell Cell to add.
3858    * @return True if we should flush.
3859    */
3860   protected boolean restoreEdit(final Store s, final Cell cell) {
3861     long kvSize = s.add(cell).getFirst();
3862     if (this.rsAccounting != null) {
3863       rsAccounting.addAndGetRegionReplayEditsSize(this.getRegionName(), kvSize);
3864     }
3865     return isFlushSize(this.addAndGetGlobalMemstoreSize(kvSize));
3866   }
3867 
3868   /*
3869    * @param fs
3870    * @param p File to check.
3871    * @return True if file was zero-length (and if so, we'll delete it in here).
3872    * @throws IOException
3873    */
3874   private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
3875       throws IOException {
3876     FileStatus stat = fs.getFileStatus(p);
3877     if (stat.getLen() > 0) return false;
3878     LOG.warn("File " + p + " is zero-length, deleting.");
3879     fs.delete(p, false);
3880     return true;
3881   }
3882 
3883   protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
3884     return new HStore(this, family, this.conf);
3885   }
3886 
3887   /**
3888    * Return HStore instance.
3889    * Use with caution.  Exposed for use of fixup utilities.
3890    * @param column Name of column family hosted by this region.
3891    * @return Store that goes with the family on passed <code>column</code>.
3892    * TODO: Make this lookup faster.
3893    */
3894   public Store getStore(final byte[] column) {
3895     return this.stores.get(column);
3896   }
3897 
3898   /**
3899    * Return HStore instance. Does not do any copy: as the number of stores is limited, we
3900    * iterate over them.
3901    */
3902   private Store getStore(Cell cell) {
3903     for (Map.Entry<byte[], Store> famStore : stores.entrySet()) {
3904       if (Bytes.equals(
3905           cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
3906           famStore.getKey(), 0, famStore.getKey().length)) {
3907         return famStore.getValue();
3908       }
3909     }
3910 
3911     return null;
3912   }
3913 
3914   public Map<byte[], Store> getStores() {
3915     return this.stores;
3916   }
3917 
3918   /**
3919    * Return list of storeFiles for the set of CFs.
3920    * Uses closeLock to prevent the race condition where a region closes
3921    * in the middle of the loop; otherwise, with stores closing one by one,
3922    * some stores would return 0 files.
3923    * @return List of storeFiles.
3924    */
3925   public List<String> getStoreFileList(final byte [][] columns)
3926     throws IllegalArgumentException {
3927     List<String> storeFileNames = new ArrayList<String>();
3928     synchronized(closeLock) {
3929       for(byte[] column : columns) {
3930         Store store = this.stores.get(column);
3931         if (store == null) {
3932           throw new IllegalArgumentException("No column family : " +
3933               Bytes.toString(column) + " available");
3934         }
3935         for (StoreFile storeFile: store.getStorefiles()) {
3936           storeFileNames.add(storeFile.getPath().toString());
3937         }
3938       }
3939     }
3940     return storeFileNames;
3941   }
3942 
3943   //////////////////////////////////////////////////////////////////////////////
3944   // Support code
3945   //////////////////////////////////////////////////////////////////////////////
3946 
3947   /** Make sure this is a valid row for the HRegion */
3948   void checkRow(final byte [] row, String op) throws IOException {
3949     if (!rowIsInRange(getRegionInfo(), row)) {
3950       throw new WrongRegionException("Requested row out of range for " +
3951           op + " on HRegion " + this + ", startKey='" +
3952           Bytes.toStringBinary(getStartKey()) + "', endKey='" +
3953           Bytes.toStringBinary(getEndKey()) + "', row='" +
3954           Bytes.toStringBinary(row) + "'");
3955     }
3956   }
3957 
3958   /**
3959    * Tries to acquire a lock on the given row.
3960    * @param waitForLock if true, will block until the lock is available.
3961    *        Otherwise, just tries to obtain the lock and returns
3962    *        false if unavailable.
3963    * @return the row lock if acquired,
3964    *   null if waitForLock was false and the lock was not acquired
3965    * @throws IOException if waitForLock was true and the lock could not be acquired after waiting
3966    */
3967   public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
3968     startRegionOperation();
3969     try {
3970       return getRowLockInternal(row, waitForLock);
3971     } finally {
3972       closeRegionOperation();
3973     }
3974   }
3975 
3976   /**
3977    * A version of getRowLock(byte[], boolean) to use when a region operation has already been
3978    * started (the calling thread has already acquired the region-close-guard lock).
3979    */
3980   protected RowLock getRowLockInternal(byte[] row, boolean waitForLock) throws IOException {
3981     checkRow(row, "row lock");
3982     HashedBytes rowKey = new HashedBytes(row);
3983     RowLockContext rowLockContext = new RowLockContext(rowKey);
3984 
3985     // loop until we acquire the row lock (unless !waitForLock)
3986     while (true) {
3987       RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
3988       if (existingContext == null) {
3989         // Row is not already locked by any thread, use newly created context.
3990         break;
3991       } else if (existingContext.ownedByCurrentThread()) {
3992         // Row is already locked by current thread, reuse existing context instead.
3993         rowLockContext = existingContext;
3994         break;
3995       } else {
3996         if (!waitForLock) {
3997           return null;
3998         }
3999         try {
4000           // Row is already locked by some other thread, give up or wait for it
4001           if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
4002             throw new IOException("Timed out waiting for lock for row: " + rowKey);
4003           }
4004         } catch (InterruptedException ie) {
4005           LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
4006           InterruptedIOException iie = new InterruptedIOException();
4007           iie.initCause(ie);
4008           throw iie;
4009         }
4010       }
4011     }
4012 
4013     // allocate new lock for this thread
4014     return rowLockContext.newLock();
4015   }
4016 
4017   /**
4018    * Acquires a lock on the given row.
4019    * The same thread may acquire multiple locks on the same row.
4020    * @return the acquired row lock
4021    * @throws IOException if the lock could not be acquired after waiting
4022    */
4023   public RowLock getRowLock(byte[] row) throws IOException {
4024     return getRowLock(row, true);
4025   }
4026 
4027   /**
4028    * If the given list of row locks is not null, releases all locks.
4029    */
4030   public void releaseRowLocks(List<RowLock> rowLocks) {
4031     if (rowLocks != null) {
4032       for (RowLock rowLock : rowLocks) {
4033         rowLock.release();
4034       }
4035       rowLocks.clear();
4036     }
4037   }
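
  // A minimal sketch of the expected acquire/release discipline for the row
  // lock API above (hypothetical helper; assumes `row` lies in this region's
  // key range):
  private void rowLockUsageSketch(final byte[] row) throws IOException {
    RowLock lock = getRowLock(row); // blocks until acquired, or throws on timeout
    try {
      // ... read or mutate the row while holding the lock ...
    } finally {
      lock.release(); // always release, even on exception
    }
  }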
4038 
4039   /**
4040    * Determines whether multiple column families are present.
4041    * Precondition: familyPaths is not null.
4042    *
4043    * @param familyPaths List of Pair<byte[] column family, String hfilePath>
4044    */
4045   private static boolean hasMultipleColumnFamilies(
4046       List<Pair<byte[], String>> familyPaths) {
4047     boolean multipleFamilies = false;
4048     byte[] family = null;
4049     for (Pair<byte[], String> pair : familyPaths) {
4050       byte[] fam = pair.getFirst();
4051       if (family == null) {
4052         family = fam;
4053       } else if (!Bytes.equals(family, fam)) {
4054         multipleFamilies = true;
4055         break;
4056       }
4057     }
4058     return multipleFamilies;
4059   }
4060 
4061 
4062   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
4063                                 boolean assignSeqId) throws IOException {
4064     return bulkLoadHFiles(familyPaths, assignSeqId, null);
4065   }
4066 
4067   /**
4068    * Attempts to atomically load a group of hfiles.  This is critical for loading
4069    * rows with multiple column families atomically.
4070    * @param familyPaths List of Pair<byte[] column family, String hfilePath>
4071    * @param assignSeqId if true, flush first so loaded hfiles outsort older edits
4072    * @param bulkLoadListener Internal hooks enabling massaging/preparation of a
4073    * file about to be bulk loaded
4074    * @return true if successful, false if failed recoverably
4075    * @throws IOException if failed unrecoverably.
4076    */
4077   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, boolean assignSeqId,
4078       BulkLoadListener bulkLoadListener) throws IOException {
4079     Preconditions.checkNotNull(familyPaths);
4080     // we need writeLock for multi-family bulk load
4081     startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
4082     try {
4083       this.writeRequestsCount.increment();
4084 
4085       // There possibly was a split that happened between when the split keys
4086       // were gathered and before the HRegion's write lock was taken.  We need
4087       // to validate the HFile region before attempting to bulk load all of them
4088       List<IOException> ioes = new ArrayList<IOException>();
4089       List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();
4090       for (Pair<byte[], String> p : familyPaths) {
4091         byte[] familyName = p.getFirst();
4092         String path = p.getSecond();
4093 
4094         Store store = getStore(familyName);
4095         if (store == null) {
4096           IOException ioe = new org.apache.hadoop.hbase.DoNotRetryIOException(
4097               "No such column family " + Bytes.toStringBinary(familyName));
4098           ioes.add(ioe);
4099         } else {
4100           try {
4101             store.assertBulkLoadHFileOk(new Path(path));
4102           } catch (WrongRegionException wre) {
4103             // recoverable (file doesn't fit in region)
4104             failures.add(p);
4105           } catch (IOException ioe) {
4106             // unrecoverable (hdfs problem)
4107             ioes.add(ioe);
4108           }
4109         }
4110       }
4111 
4112       // validation failed because of some sort of IO problem.
4113       if (ioes.size() != 0) {
4114         IOException e = MultipleIOException.createIOException(ioes);
4115         LOG.error("There were one or more IO errors when checking if the bulk load is ok.", e);
4116         throw e;
4117       }
4118 
4119       // validation failed, bail out before doing anything permanent.
4120       if (failures.size() != 0) {
4121         StringBuilder list = new StringBuilder();
4122         for (Pair<byte[], String> p : failures) {
4123           list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")
4124             .append(p.getSecond());
4125         }
4126         // problem when validating
4127         LOG.warn("There was a recoverable bulk load failure likely due to a" +
4128             " split.  These (family, HFile) pairs were not loaded: " + list);
4129         return false;
4130       }
4131 
4132       long seqId = -1;
4133       // We need to assign a sequential ID that's in between two memstores in order to preserve
4134       // the guarantee that all the edits lower than the highest sequential ID from all the
4135       // HFiles are flushed on disk. See HBASE-10958.  The sequence id returned when we flush is
4136       // guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is
4137       // a sequence id that we can be sure is beyond the last hfile written).
4138       if (assignSeqId) {
4139         FlushResult fs = this.flushcache(true);
4140         if (fs.isFlushSucceeded()
4141             || fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
4142           // a flush ran, or there was nothing to flush: either way seqId is safe
4143           seqId = fs.flushSequenceId;
4144         } else {
4145           throw new IOException("Could not bulk load with an assigned sequential ID because the " +
4146               "flush didn't run. Reason for not flushing: " + fs.failureReason);
4147         }
4148       }
4149 
4150       for (Pair<byte[], String> p : familyPaths) {
4151         byte[] familyName = p.getFirst();
4152         String path = p.getSecond();
4153         Store store = getStore(familyName);
4154         try {
4155           String finalPath = path;
4156           if(bulkLoadListener != null) {
4157             finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
4158           }
4159           store.bulkLoadHFile(finalPath, seqId);
4160           if(bulkLoadListener != null) {
4161             bulkLoadListener.doneBulkLoad(familyName, path);
4162           }
4163         } catch (IOException ioe) {
4164           // A failure here can cause an atomicity violation that we currently
4165           // cannot recover from since it is likely a failed HDFS operation.
4166 
4167           // TODO Need a better story for reverting partial failures due to HDFS.
4168           LOG.error("There was a partial failure due to IO when attempting to" +
4169               " load " + Bytes.toString(p.getFirst()) + " : "+ p.getSecond(), ioe);
4170           if(bulkLoadListener != null) {
4171             try {
4172               bulkLoadListener.failedBulkLoad(familyName, path);
4173             } catch (Exception ex) {
4174               LOG.error("Error while calling failedBulkLoad for family "+
4175                   Bytes.toString(familyName)+" with path "+path, ex);
4176             }
4177           }
4178           throw ioe;
4179         }
4180       }
4181       return true;
4182     } finally {
4183       closeBulkRegionOperation();
4184     }
4185   }
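
  // A minimal sketch of driving the bulk load above (hypothetical helper;
  // the family name and staging path are illustrative, and the hfile must
  // already fall within this region's key range):
  private boolean bulkLoadSketch() throws IOException {
    List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
    familyPaths.add(new Pair<byte[], String>(Bytes.toBytes("cf"), "/staging/cf/hfile1"));
    // assignSeqId=true flushes first so the loaded file outsorts existing edits
    return bulkLoadHFiles(familyPaths, true);
  }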
4186 
4187   @Override
4188   public boolean equals(Object o) {
4189     return o instanceof HRegion && Bytes.equals(this.getRegionName(),
4190                                                 ((HRegion) o).getRegionName());
4191   }
4192 
4193   @Override
4194   public int hashCode() {
4195     return Bytes.hashCode(this.getRegionName());
4196   }
4197 
4198   @Override
4199   public String toString() {
4200     return this.getRegionNameAsString();
4201   }
4202 
4203   /**
4204    * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
4205    */
4206   class RegionScannerImpl implements RegionScanner {
4207     // Package local for testability
4208     KeyValueHeap storeHeap = null;
4209     /** Heap of key-values that are not essential for the provided filters and are thus read
4210      * on demand, if on-demand column family loading is enabled.*/
4211     KeyValueHeap joinedHeap = null;
4212     /**
4213      * If the joined heap data gathering is interrupted due to scan limits, this will
4214      * contain the row for which we are populating the values.*/
4215     protected Cell joinedContinuationRow = null;
4216     // KeyValue indicating that limit is reached when scanning
4217     private final KeyValue KV_LIMIT = new KeyValue();
4218     protected final byte[] stopRow;
4219     private final FilterWrapper filter;
4220     private int batch;
4221     protected int isScan;
4222     private boolean filterClosed = false;
4223     private long readPt;
4224     private long maxResultSize;
4225     protected HRegion region;
4226 
4227     @Override
4228     public HRegionInfo getRegionInfo() {
4229       return region.getRegionInfo();
4230     }
4231 
4232     RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
4233         throws IOException {
4234 
4235       this.region = region;
4236       this.maxResultSize = scan.getMaxResultSize();
4237       if (scan.hasFilter()) {
4238         this.filter = new FilterWrapper(scan.getFilter());
4239       } else {
4240         this.filter = null;
4241       }
4242 
4243       this.batch = scan.getBatch();
4244       if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {
4245         this.stopRow = null;
4246       } else {
4247         this.stopRow = scan.getStopRow();
4248       }
4249       // If we are doing a get, we want to be [startRow,endRow]. Normally
4250       // it is [startRow,endRow), and if startRow=endRow we get nothing.
4251       this.isScan = scan.isGetScan() ? -1 : 0;
4252 
4253       // synchronize on scannerReadPoints so that nobody calculates
4254       // getSmallestReadPoint, before scannerReadPoints is updated.
4255       IsolationLevel isolationLevel = scan.getIsolationLevel();
4256       synchronized(scannerReadPoints) {
4257         this.readPt = getReadpoint(isolationLevel);
4258         scannerReadPoints.put(this, this.readPt);
4259       }
4260 
4261       // Here we separate all scanners into two lists - scanner that provide data required
4262       // by the filter to operate (scanners list) and all others (joinedScanners list).
4263       List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
4264       List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
4265       if (additionalScanners != null) {
4266         scanners.addAll(additionalScanners);
4267       }
4268 
4269       for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
4270           scan.getFamilyMap().entrySet()) {
4271         Store store = stores.get(entry.getKey());
4272         KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
4273         if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
4274           || this.filter.isFamilyEssential(entry.getKey())) {
4275           scanners.add(scanner);
4276         } else {
4277           joinedScanners.add(scanner);
4278         }
4279       }
4280       initializeKVHeap(scanners, joinedScanners, region);
4281     }
4282 
4283     protected void initializeKVHeap(List<KeyValueScanner> scanners,
4284         List<KeyValueScanner> joinedScanners, HRegion region)
4285         throws IOException {
4286       this.storeHeap = new KeyValueHeap(scanners, region.comparator);
4287       if (!joinedScanners.isEmpty()) {
4288         this.joinedHeap = new KeyValueHeap(joinedScanners, region.comparator);
4289       }
4290     }
4291 
4292     @Override
4293     public long getMaxResultSize() {
4294       return maxResultSize;
4295     }
4296 
4297     @Override
4298     public long getMvccReadPoint() {
4299       return this.readPt;
4300     }
4301 
4302     /**
4303      * Reset both the filter and the old filter.
4304      *
4305      * @throws IOException in case a filter raises an I/O exception.
4306      */
4307     protected void resetFilters() throws IOException {
4308       if (filter != null) {
4309         filter.reset();
4310       }
4311     }
4312 
4313     @Override
4314     public boolean next(List<Cell> outResults)
4315         throws IOException {
4316       // apply the batching limit by default
4317       return next(outResults, batch);
4318     }
4319 
4320     @Override
4321     public synchronized boolean next(List<Cell> outResults, int limit) throws IOException {
4322       if (this.filterClosed) {
4323         throw new UnknownScannerException("Scanner was closed (timed out?) " +
4324             "after we renewed it. Could be caused by a very slow scanner " +
4325             "or a lengthy garbage collection");
4326       }
4327       startRegionOperation(Operation.SCAN);
4328       readRequestsCount.increment();
4329       try {
4330         return nextRaw(outResults, limit);
4331       } finally {
4332         closeRegionOperation(Operation.SCAN);
4333       }
4334     }
4335 
4336     @Override
4337     public boolean nextRaw(List<Cell> outResults)
4338         throws IOException {
4339       return nextRaw(outResults, batch);
4340     }
4341 
4342     @Override
4343     public boolean nextRaw(List<Cell> outResults, int limit) throws IOException {
4344       if (storeHeap == null) {
4345         // scanner is closed
4346         throw new UnknownScannerException("Scanner was closed");
4347       }
4348       boolean returnResult;
4349       if (outResults.isEmpty()) {
4350         // Usually outResults is empty. This is true when next is called
4351         // to handle scan or get operation.
4352         returnResult = nextInternal(outResults, limit);
4353       } else {
4354         List<Cell> tmpList = new ArrayList<Cell>();
4355         returnResult = nextInternal(tmpList, limit);
4356         outResults.addAll(tmpList);
4357       }
4358       resetFilters();
4359       if (isFilterDoneInternal()) {
4360         returnResult = false;
4361       }
4362       return returnResult;
4363     }
4364 
4365     private void populateFromJoinedHeap(List<Cell> results, int limit)
4366         throws IOException {
4367       assert joinedContinuationRow != null;
4368       Cell kv = populateResult(results, this.joinedHeap, limit,
4369           joinedContinuationRow.getRowArray(), joinedContinuationRow.getRowOffset(),
4370           joinedContinuationRow.getRowLength());
4371       if (kv != KV_LIMIT) {
4372         // We are done with this row, reset the continuation.
4373         joinedContinuationRow = null;
4374       }
4375       // As the data is obtained from two independent heaps, we need to
4376       // ensure that result list is sorted, because Result relies on that.
4377       Collections.sort(results, comparator);
4378     }
4379 
4380     /**
4381      * Fetches records with currentRow into results list, until next row or limit (if not -1).
4382      * @param heap KeyValueHeap to fetch data from. It must be positioned on the correct row before the call.
4383      * @param limit Max amount of KVs to place in result list, -1 means no limit.
4384      * @param currentRow Byte array with key we are fetching.
4385      * @param offset offset for currentRow
4386      * @param length length for currentRow
4387      * @return KV_LIMIT if limit reached, next KeyValue otherwise.
4388      */
4389     private Cell populateResult(List<Cell> results, KeyValueHeap heap, int limit,
4390         byte[] currentRow, int offset, short length) throws IOException {
4391       Cell nextKv;
4392       do {
4393         heap.next(results, limit - results.size());
4394         if (limit > 0 && results.size() == limit) {
4395           return KV_LIMIT;
4396         }
4397         nextKv = heap.peek();
4398       } while (nextKv != null && CellUtil.matchingRow(nextKv, currentRow, offset, length));
4399 
4400       return nextKv;
4401     }
4402 
4403     /*
4404      * @return True if a filter rules that the scanner is done.
4405      */
4406     @Override
4407     public synchronized boolean isFilterDone() throws IOException {
4408       return isFilterDoneInternal();
4409     }
4410 
4411     private boolean isFilterDoneInternal() throws IOException {
4412       return this.filter != null && this.filter.filterAllRemaining();
4413     }
4414 
4415     private boolean nextInternal(List<Cell> results, int limit)
4416     throws IOException {
4417       if (!results.isEmpty()) {
4418         throw new IllegalArgumentException("First parameter should be an empty list");
4419       }
4420       RpcCallContext rpcCall = RpcServer.getCurrentCall();
4421       // The loop here is used only when at some point during the next we determine
4422       // that due to effects of filters or otherwise, we have an empty row in the result.
4423       // Then we loop and try again. Otherwise, we must get out on the first iteration via return,
4424       // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
4425       // and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow).
4426       while (true) {
4427         if (rpcCall != null) {
4428           // If a user specifies a too-restrictive or too-slow scanner, the
4429           // client might time out and disconnect while the server side
4430           // is still processing the request. We should abort aggressively
4431           // in that case.
4432           long afterTime = rpcCall.disconnectSince();
4433           if (afterTime >= 0) {
4434             throw new CallerDisconnectedException(
4435                 "Aborting on region " + getRegionNameAsString() + ", call " +
4436                     this + " after " + afterTime + " ms, since " +
4437                     "caller disconnected");
4438           }
4439         }
4440 
4441         // Let's see what we have in the storeHeap.
4442         Cell current = this.storeHeap.peek();
4443 
4444         byte[] currentRow = null;
4445         int offset = 0;
4446         short length = 0;
4447         if (current != null) {
4448           currentRow = current.getRowArray();
4449           offset = current.getRowOffset();
4450           length = current.getRowLength();
4451         }
4452         boolean stopRow = isStopRow(currentRow, offset, length);
4453         // Check if we were getting data from the joinedHeap and hit the limit.
4454         // If not, then it's main path - getting results from storeHeap.
4455         if (joinedContinuationRow == null) {
4456           // First, check if we are at a stop row. If so, there are no more results.
4457           if (stopRow) {
4458             if (filter != null && filter.hasFilterRow()) {
4459               filter.filterRowCells(results);
4460             }
4461             return false;
4462           }
4463 
4464           // Check if rowkey filter wants to exclude this row. If so, loop to next.
4465           // Technically, if we hit limits before on this row, we don't need this call.
4466           if (filterRowKey(currentRow, offset, length)) {
4467             boolean moreRows = nextRow(currentRow, offset, length);
4468             if (!moreRows) return false;
4469             results.clear();
4470             continue;
4471           }
4472 
4473           Cell nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset,
4474               length);
4475           // Ok, we are good, let's try to get some results from the main heap.
4476           if (nextKv == KV_LIMIT) {
4477             if (this.filter != null && filter.hasFilterRow()) {
4478               throw new IncompatibleFilterException(
4479                 "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
4480             }
4481             return true; // We hit the limit.
4482           }
4483 
4484           stopRow = nextKv == null ||
4485               isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength());
4486           // save that the row was empty before filters applied to it.
4487           final boolean isEmptyRow = results.isEmpty();
4488 
4489           // We have the part of the row necessary for filtering (all of it, usually).
4490           // First filter with the filterRow(List).
4491           FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
4492           if (filter != null && filter.hasFilterRow()) {
4493             ret = filter.filterRowCellsWithRet(results);
4494           }
4495 
4496           if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) {
4497             results.clear();
4498             boolean moreRows = nextRow(currentRow, offset, length);
4499             if (!moreRows) return false;
4500 
4501             // This row was totally filtered out, if this is NOT the last row,
4502             // we should continue on. Otherwise, nothing else to do.
4503             if (!stopRow) continue;
4504             return false;
4505           }
4506 
4507           // Ok, we are done with storeHeap for this row.
4508           // Now we may need to fetch additional, non-essential data into row.
4509           // These values are not needed for filter to work, so we postpone their
4510           // fetch to (possibly) reduce amount of data loads from disk.
4511           if (this.joinedHeap != null) {
4512             Cell nextJoinedKv = joinedHeap.peek();
4513             // If joinedHeap is pointing to some other row, try to seek to a correct one.
4514             boolean mayHaveData = (nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv,
4515                 currentRow, offset, length))
4516                 || (this.joinedHeap.requestSeek(
4517                     KeyValueUtil.createFirstOnRow(currentRow, offset, length), true, true)
4518                     && joinedHeap.peek() != null && CellUtil.matchingRow(joinedHeap.peek(),
4519                     currentRow, offset, length));
4520             if (mayHaveData) {
4521               joinedContinuationRow = current;
4522               populateFromJoinedHeap(results, limit);
4523             }
4524           }
4525         } else {
4526           // Populating from the joined heap was stopped by limits, populate some more.
4527           populateFromJoinedHeap(results, limit);
4528         }
4529 
4530         // We may have just called populateFromJoinedMap and hit the limits. If that is
4531         // the case, we need to call it again on the next next() invocation.
4532         if (joinedContinuationRow != null) {
4533           return true;
4534         }
4535 
4536         // Finally, we are done with both joinedHeap and storeHeap.
4537         // Double check to prevent empty rows from appearing in result. It could be
4538         // the case when SingleColumnValueExcludeFilter is used.
4539         if (results.isEmpty()) {
4540           boolean moreRows = nextRow(currentRow, offset, length);
4541           if (!moreRows) return false;
4542           if (!stopRow) continue;
4543         }
4544 
4545         // We are done. Return the result.
4546         return !stopRow;
4547       }
4548     }
4549 
4550     /**
4551      * This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combines
4552      * both filterRow & filterRow(List<KeyValue> kvs) functions. Code written for 0.94 or older
4553      * may not implement hasFilterRow as HBASE-6429 expects, because in 0.94 hasFilterRow() only
4554      * returned true when filterRow(List<KeyValue> kvs) was overridden, not filterRow(). In that
4555      * case, filterRow() would otherwise be skipped here.
4556      */
4557     private boolean filterRow() throws IOException {
4558       // when hasFilterRow returns true, filter.filterRow() will be called automatically inside
4559       // filterRowCells(List<Cell> kvs) so we skip that scenario here.
4560       return filter != null && (!filter.hasFilterRow())
4561           && filter.filterRow();
4562     }
4563 
4564     private boolean filterRowKey(byte[] row, int offset, short length) throws IOException {
4565       return filter != null
4566           && filter.filterRowKey(row, offset, length);
4567     }
4568 
4569     protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException {
4570       assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read.";
4571       Cell next;
4572       while ((next = this.storeHeap.peek()) != null &&
4573              CellUtil.matchingRow(next, currentRow, offset, length)) {
4574         this.storeHeap.next(MOCKED_LIST);
4575       }
4576       resetFilters();
4577       // Calling the hook in CP which allows it to do a fast forward
4578       return this.region.getCoprocessorHost() == null
4579           || this.region.getCoprocessorHost()
4580               .postScannerFilterRow(this, currentRow, offset, length);
4581     }
4582 
4583     protected boolean isStopRow(byte[] currentRow, int offset, short length) {
4584       return currentRow == null ||
4585           (stopRow != null &&
4586           comparator.compareRows(stopRow, 0, stopRow.length,
4587             currentRow, offset, length) <= isScan);
4588     }
4589 
4590     @Override
4591     public synchronized void close() {
4592       if (storeHeap != null) {
4593         storeHeap.close();
4594         storeHeap = null;
4595       }
4596       if (joinedHeap != null) {
4597         joinedHeap.close();
4598         joinedHeap = null;
4599       }
4600       // no need to synchronize here.
4601       scannerReadPoints.remove(this);
4602       this.filterClosed = true;
4603     }
4604 
4605     KeyValueHeap getStoreHeapForTesting() {
4606       return storeHeap;
4607     }
4608 
4609     @Override
4610     public synchronized boolean reseek(byte[] row) throws IOException {
4611       if (row == null) {
4612         throw new IllegalArgumentException("Row cannot be null.");
4613       }
4614       boolean result = false;
4615       startRegionOperation();
4616       try {
4617         KeyValue kv = KeyValueUtil.createFirstOnRow(row);
4618         // use request seek to make use of the lazy seek option. See HBASE-5520
4619         result = this.storeHeap.requestSeek(kv, true, true);
4620         if (this.joinedHeap != null) {
4621           result = this.joinedHeap.requestSeek(kv, true, true) || result;
4622         }
4623       } finally {
4624         closeRegionOperation();
4625       }
4626       return result;
4627     }
4628   }
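
  // A minimal sketch of driving a RegionScanner such as the implementation
  // above (hypothetical helper; assumes a valid Scan for this region):
  private void scanUsageSketch(final Scan scan) throws IOException {
    RegionScanner scanner = getScanner(scan);
    try {
      List<Cell> cells = new ArrayList<Cell>();
      boolean moreRows;
      do {
        cells.clear();
        moreRows = scanner.next(cells); // false once the scan is exhausted
        // ... consume cells for the current row batch ...
      } while (moreRows);
    } finally {
      scanner.close(); // releases the heaps and the scanner read point
    }
  }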
4629 
4630   // Utility methods
4631   /**
4632    * A utility method to create new instances of HRegion based on the
4633    * {@link HConstants#REGION_IMPL} configuration property.
4634    * @param tableDir qualified path of directory where region should be located,
4635    * usually the table directory.
4636    * @param wal The WAL is the outbound log for any updates to the HRegion.
4637    * The wal file is a logfile from the previous execution that's
4638    * custom-computed for this HRegion. The HRegionServer computes and sorts the
4639    * appropriate wal info for this HRegion. If there is a previous file
4640    * (implying that the HRegion has been written-to before), then read it from
4641    * the supplied path.
4642    * @param fs is the filesystem.
4643    * @param conf is global configuration settings.
4644    * @param regionInfo - HRegionInfo that describes the region
4645    * @param rsServices reference to the RegionServerServices; may be null
4646    * @param htd the table descriptor
4647    * @return the new instance
4648    */
4649   static HRegion newHRegion(Path tableDir, WAL wal, FileSystem fs,
4650       Configuration conf, HRegionInfo regionInfo, final HTableDescriptor htd,
4651       RegionServerServices rsServices) {
4652     try {
4653       @SuppressWarnings("unchecked")
4654       Class<? extends HRegion> regionClass =
4655           (Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
4656 
4657       Constructor<? extends HRegion> c =
4658           regionClass.getConstructor(Path.class, WAL.class, FileSystem.class,
4659               Configuration.class, HRegionInfo.class, HTableDescriptor.class,
4660               RegionServerServices.class);
4661 
4662       return c.newInstance(tableDir, wal, fs, conf, regionInfo, htd, rsServices);
4663     } catch (Throwable e) {
4664       // todo: what should I throw here?
4665       throw new IllegalStateException("Could not instantiate a region instance.", e);
4666     }
4667   }
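
  // A minimal sketch of plugging a custom region implementation into
  // newHRegion via HConstants.REGION_IMPL (hypothetical helper; the named
  // class must be an HRegion subclass exposing the seven-argument
  // constructor reflected on above):
  private static Configuration withCustomRegionImpl(final String regionClassName) {
    Configuration conf = HBaseConfiguration.create();
    conf.set(HConstants.REGION_IMPL, regionClassName);
    return conf;
  }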
4668 
4669   /**
4670    * Convenience method creating new HRegions. Used by createTable.
4671    *
4672    * @param info Info for region to create.
4673    * @param rootDir Root directory for HBase instance
4674    * @param wal shared WAL
4675    * @param initialize - true to initialize the region
4676    * @return new HRegion
4677    * @throws IOException
4678    */
4679   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4680                                       final Configuration conf,
4681                                       final HTableDescriptor hTableDescriptor,
4682                                       final WAL wal,
4683                                       final boolean initialize)
4684       throws IOException {
4685     LOG.info("creating HRegion " + info.getTable().getNameAsString()
4686         + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
4687         " Table name == " + info.getTable().getNameAsString());
4688     FileSystem fs = FileSystem.get(conf);
4689     Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
4690     HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
4691     HRegion region = HRegion.newHRegion(tableDir,
4692         wal, fs, conf, info, hTableDescriptor, null);
4693     if (initialize) {
4694       // If initializing, set the sequenceId. It is also required by WALPerformanceEvaluation when
4695       // verifying the WALEdits.
4696       region.setSequenceId(region.initialize(null));
4697     }
4698     return region;
4699   }
4700 
4701   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4702                                       final Configuration conf,
4703                                       final HTableDescriptor hTableDescriptor,
4704                                       final WAL wal)
4705     throws IOException {
4706     return createHRegion(info, rootDir, conf, hTableDescriptor, wal, true);
4707   }
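
  // A minimal sketch of creating a region and reopening it with the static
  // helpers above (hypothetical; assumes rootDir matches the root directory
  // configured in conf, and a valid HRegionInfo/HTableDescriptor pair):
  private static HRegion createThenOpenSketch(final HRegionInfo info, final Path rootDir,
      final Configuration conf, final HTableDescriptor htd, final WAL wal) throws IOException {
    HRegion created = createHRegion(info, rootDir, conf, htd, wal); // creates and initializes
    created.close();
    return openHRegion(info, htd, wal, conf); // reopen for serving
  }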
4708 
4709 
4710   /**
4711    * Open a Region.
4712    * @param info Info for region to be opened.
4713    * @param wal WAL for region to use. This method will call
4714    * WAL#setSequenceNumber(long) passing the result of the call to
4715    * HRegion#getMinSequenceId() to ensure the wal id is properly kept
4716    * up.  The HRegionServer does this every time it opens a new region.
4717    * @return new HRegion
4718    *
4719    * @throws IOException
4720    */
4721   public static HRegion openHRegion(final HRegionInfo info,
4722       final HTableDescriptor htd, final WAL wal,
4723       final Configuration conf)
4724   throws IOException {
4725     return openHRegion(info, htd, wal, conf, null, null);
4726   }
4727 
4728   /**
4729    * Open a Region.
4730    * @param info Info for region to be opened
4731    * @param htd the table descriptor
4732    * @param wal WAL for region to use. This method will call
4733    * WAL#setSequenceNumber(long) passing the result of the call to
4734    * HRegion#getMinSequenceId() to ensure the wal id is properly kept
4735    * up.  The HRegionServer does this every time it opens a new region.
4736    * @param conf The Configuration object to use.
4737    * @param rsServices An interface we can request flushes against.
4738    * @param reporter An interface we can report progress against.
4739    * @return new HRegion
4740    *
4741    * @throws IOException
4742    */
4743   public static HRegion openHRegion(final HRegionInfo info,
4744     final HTableDescriptor htd, final WAL wal, final Configuration conf,
4745     final RegionServerServices rsServices,
4746     final CancelableProgressable reporter)
4747   throws IOException {
4748     return openHRegion(FSUtils.getRootDir(conf), info, htd, wal, conf, rsServices, reporter);
4749   }
4750 
4751   /**
4752    * Open a Region.
4753    * @param rootDir Root directory for HBase instance
4754    * @param info Info for region to be opened.
4755    * @param htd the table descriptor
4756    * @param wal WAL for region to use. This method will call
4757    * WAL#setSequenceNumber(long) passing the result of the call to
4758    * HRegion#getMinSequenceId() to ensure the wal id is properly kept
4759    * up.  The HRegionServer does this every time it opens a new region.
4760    * @param conf The Configuration object to use.
4761    * @return new HRegion
4762    * @throws IOException
4763    */
4764   public static HRegion openHRegion(Path rootDir, final HRegionInfo info,
4765       final HTableDescriptor htd, final WAL wal, final Configuration conf)
4766   throws IOException {
4767     return openHRegion(rootDir, info, htd, wal, conf, null, null);
4768   }
4769 
4770   /**
4771    * Open a Region.
4772    * @param rootDir Root directory for HBase instance
4773    * @param info Info for region to be opened.
4774    * @param htd the table descriptor
4775    * @param wal WAL for region to use. This method will call
4776    * WAL#setSequenceNumber(long) passing the result of the call to
4777    * HRegion#getMinSequenceId() to ensure the wal id is properly kept
4778    * up.  The HRegionServer does this every time it opens a new region.
4779    * @param conf The Configuration object to use.
4780    * @param rsServices An interface we can request flushes against.
4781    * @param reporter An interface we can report progress against.
4782    * @return new HRegion
4783    * @throws IOException
4784    */
4785   public static HRegion openHRegion(final Path rootDir, final HRegionInfo info,
4786       final HTableDescriptor htd, final WAL wal, final Configuration conf,
4787       final RegionServerServices rsServices,
4788       final CancelableProgressable reporter)
4789   throws IOException {
4790     FileSystem fs = null;
4791     if (rsServices != null) {
4792       fs = rsServices.getFileSystem();
4793     }
4794     if (fs == null) {
4795       fs = FileSystem.get(conf);
4796     }
4797     return openHRegion(conf, fs, rootDir, info, htd, wal, rsServices, reporter);
4798   }
4799 
4800   /**
4801    * Open a Region.
4802    * @param conf The Configuration object to use.
4803    * @param fs Filesystem to use
4804    * @param rootDir Root directory for HBase instance
4805    * @param info Info for region to be opened.
4806    * @param htd the table descriptor
4807    * @param wal WAL for region to use. This method will call
4808    * WAL#setSequenceNumber(long) passing the result of the call to
4809    * HRegion#getMinSequenceId() to ensure the wal id is properly kept
4810    * up.  The HRegionServer does this every time it opens a new region.
4811    * @return new HRegion
4812    * @throws IOException
4813    */
4814   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4815       final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final WAL wal)
4816       throws IOException {
4817     return openHRegion(conf, fs, rootDir, info, htd, wal, null, null);
4818   }
4819 
4820   /**
4821    * Open a Region.
4822    * @param conf The Configuration object to use.
4823    * @param fs Filesystem to use
4824    * @param rootDir Root directory for HBase instance
4825    * @param info Info for region to be opened.
4826    * @param htd the table descriptor
4827    * @param wal WAL for region to use. This method will call
4828    * WAL#setSequenceNumber(long) passing the result of the call to
4829    * HRegion#getMinSequenceId() to ensure the wal id is properly kept
4830    * up.  The HRegionServer does this every time it opens a new region.
4831    * @param rsServices An interface we can request flushes against.
4832    * @param reporter An interface we can report progress against.
4833    * @return new HRegion
4834    * @throws IOException
4835    */
4836   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4837       final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final WAL wal,
4838       final RegionServerServices rsServices, final CancelableProgressable reporter)
4839       throws IOException {
4840     Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
4841     return openHRegion(conf, fs, rootDir, tableDir, info, htd, wal, rsServices, reporter);
4842   }
4843 
4844   /**
4845    * Open a Region.
4846    * @param conf The Configuration object to use.
4847    * @param fs Filesystem to use
4848    * @param rootDir Root directory for HBase instance
4849    * @param info Info for region to be opened.
4850    * @param htd the table descriptor
4851    * @param wal WAL for region to use. This method will call
4852    * WAL#setSequenceNumber(long) passing the result of the call to
4853    * HRegion#getMinSequenceId() to ensure the wal id is properly kept
4854    * up.  The HRegionServer does this every time it opens a new region.
4855    * @param rsServices An interface we can request flushes against.
4856    * @param reporter An interface we can report progress against.
4857    * @return new HRegion
4858    * @throws IOException
4859    */
4860   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4861       final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd,
4862       final WAL wal, final RegionServerServices rsServices,
4863       final CancelableProgressable reporter)
4864       throws IOException {
4865     if (info == null) throw new NullPointerException("Passed region info is null");
4866     if (LOG.isDebugEnabled()) {
4867       LOG.debug("Opening region: " + info);
4868     }
4869     HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
4870     return r.openHRegion(reporter);
4871   }
4872 
4873 
4874   /**
4875    * Useful when reopening a closed region (normally for unit tests)
4876    * @param other original object
4877    * @param reporter An interface we can report progress against.
4878    * @return new HRegion
4879    * @throws IOException
4880    */
4881   public static HRegion openHRegion(final HRegion other, final CancelableProgressable reporter)
4882       throws IOException {
4883     HRegionFileSystem regionFs = other.getRegionFileSystem();
4884     HRegion r = newHRegion(regionFs.getTableDir(), other.getWAL(), regionFs.getFileSystem(),
4885         other.baseConf, other.getRegionInfo(), other.getTableDesc(), null);
4886     return r.openHRegion(reporter);
4887   }
4888 
4889   /**
4890    * Open HRegion.
4891    * Calls initialize and sets sequenceId.
4892    * @return Returns <code>this</code>
4893    * @throws IOException
4894    */
4895   protected HRegion openHRegion(final CancelableProgressable reporter)
4896   throws IOException {
4897     // Refuse to open the region if we are missing local compression support
4898     checkCompressionCodecs();
4899     // Refuse to open the region if encryption configuration is incorrect or
4900     // codec support is missing
4901     checkEncryption();
4902     // Refuse to open the region if a required class cannot be loaded
4903     checkClassLoading();
4904     this.openSeqNum = initialize(reporter);
4905     this.setSequenceId(openSeqNum);
4906     if (wal != null && getRegionServerServices() != null) {
4907       writeRegionOpenMarker(wal, openSeqNum);
4908     }
4909     return this;
4910   }
4911 
4912   private void checkCompressionCodecs() throws IOException {
4913     for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
4914       CompressionTest.testCompression(fam.getCompression());
4915       CompressionTest.testCompression(fam.getCompactionCompression());
4916     }
4917   }
4918 
4919   private void checkEncryption() throws IOException {
4920     for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
4921       EncryptionTest.testEncryption(conf, fam.getEncryptionType(), fam.getEncryptionKey());
4922     }
4923   }
4924 
4925   private void checkClassLoading() throws IOException {
4926     RegionSplitPolicy.getSplitPolicyClass(this.htableDescriptor, conf);
4927     RegionCoprocessorHost.testTableCoprocessorAttrs(conf, this.htableDescriptor);
4928   }
4929 
4930   /**
4931    * Create a daughter region given a temp directory with the region data.
4932    * @param hri Spec. for daughter region to open.
4933    * @throws IOException
4934    */
4935   HRegion createDaughterRegionFromSplits(final HRegionInfo hri) throws IOException {
4936     // Move the files from the temporary .splits to the final /table/region directory
4937     fs.commitDaughterRegion(hri);
4938 
4939     // Create the daughter HRegion instance
4940     HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(), fs.getFileSystem(),
4941         this.getBaseConf(), hri, this.getTableDesc(), rsServices);
4942     r.readRequestsCount.set(this.getReadRequestsCount() / 2);
4943     r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
4944     return r;
4945   }
4946 
4947   /**
4948    * Create a merged region given a temp directory with the region data.
4949    * @param region_b another merging region
4950    * @return merged HRegion
4951    * @throws IOException
4952    */
4953   HRegion createMergedRegionFromMerges(final HRegionInfo mergedRegionInfo,
4954       final HRegion region_b) throws IOException {
4955     HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(),
4956         fs.getFileSystem(), this.getBaseConf(), mergedRegionInfo,
4957         this.getTableDesc(), this.rsServices);
4958     r.readRequestsCount.set(this.getReadRequestsCount()
4959         + region_b.getReadRequestsCount());
4960     r.writeRequestsCount.set(this.getWriteRequestsCount()
4961         + region_b.getWriteRequestsCount());
4962 
4963     this.fs.commitMergedRegion(mergedRegionInfo);
4964     return r;
4965   }
4966 
4967   /**
4968    * Inserts a new region's meta information into the passed
4969    * <code>meta</code> region. Used by the HMaster bootstrap code adding
4970    * new table to hbase:meta table.
4971    *
4972    * @param meta hbase:meta HRegion to be updated
4973    * @param r HRegion to add to <code>meta</code>
4974    *
4975    * @throws IOException
4976    */
4977   // TODO remove since only test and merge use this
4978   public static void addRegionToMETA(final HRegion meta, final HRegion r) throws IOException {
4979     meta.checkResources();
4980     // The row key is the region name
4981     byte[] row = r.getRegionName();
4982     final long now = EnvironmentEdgeManager.currentTime();
4983     final List<Cell> cells = new ArrayList<Cell>(2);
4984     cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
4985       HConstants.REGIONINFO_QUALIFIER, now,
4986       r.getRegionInfo().toByteArray()));
4987     // Record the version of the meta table in the row.
4988     cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
4989       HConstants.META_VERSION_QUALIFIER, now,
4990       Bytes.toBytes(HConstants.META_VERSION)));
4991     meta.put(row, HConstants.CATALOG_FAMILY, cells);
4992   }
4993 
4994   /**
4995    * Computes the Path of the HRegion
4996    *
4997    * @param tabledir qualified path for table
4998    * @param name ENCODED region name
4999    * @return Path of HRegion directory
5000    */
5001   @Deprecated
5002   public static Path getRegionDir(final Path tabledir, final String name) {
5003     return new Path(tabledir, name);
5004   }
5005 
5006   /**
5007    * Computes the Path of the HRegion
5008    *
5009    * @param rootdir qualified path of HBase root directory
5010    * @param info HRegionInfo for the region
5011    * @return qualified path of region directory
5012    */
5013   @Deprecated
5014   public static Path getRegionDir(final Path rootdir, final HRegionInfo info) {
5015     return new Path(
5016       FSUtils.getTableDir(rootdir, info.getTable()), info.getEncodedName());
5017   }
5018 
5019   /**
5020    * Determines if the specified row is within the row range of the
5021    * given HRegionInfo.
5022    *
5023    * @param info HRegionInfo that specifies the row range
5024    * @param row row to be checked
5025    * @return true if the row is within the range specified by the HRegionInfo
5026    */
5027   public static boolean rowIsInRange(HRegionInfo info, final byte [] row) {
5028     return ((info.getStartKey().length == 0) ||
5029         (Bytes.compareTo(info.getStartKey(), row) <= 0)) &&
5030         ((info.getEndKey().length == 0) ||
5031             (Bytes.compareTo(info.getEndKey(), row) > 0));
5032   }
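
  // Worked example for the range check above: for a region spanning
  // ["b", "d"), rowIsInRange returns true for rows "b" and "c" (start key
  // inclusive) and false for "a" and "d" (end key exclusive).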
5033 
5034   /**
5035    * Merge two HRegions.  The regions must be adjacent and must not overlap.
5036    *
5037    * @return new merged HRegion
5038    * @throws IOException
5039    */
5040   public static HRegion mergeAdjacent(final HRegion srcA, final HRegion srcB)
5041   throws IOException {
5042     HRegion a = srcA;
5043     HRegion b = srcB;
5044 
5045     // Make sure that srcA comes first; important for key-ordering during
5046     // write of the merged file.
5047     if (srcA.getStartKey() == null) {
5048       if (srcB.getStartKey() == null) {
5049         throw new IOException("Cannot merge two regions with null start key");
5050       }
5051       // A's start key is null but B's isn't. Assume A comes before B
5052     } else if ((srcB.getStartKey() == null) ||
5053       (Bytes.compareTo(srcA.getStartKey(), srcB.getStartKey()) > 0)) {
5054       a = srcB;
5055       b = srcA;
5056     }
5057 
5058     if (Bytes.compareTo(a.getEndKey(), b.getStartKey()) != 0) {
5059       throw new IOException("Cannot merge non-adjacent regions");
5060     }
5061     return merge(a, b);
5062   }
5063 
5064   /**
5065    * Merge two regions whether they are adjacent or not.
5066    *
5067    * @param a region a
5068    * @param b region b
5069    * @return new merged region
5070    * @throws IOException
5071    */
5072   public static HRegion merge(final HRegion a, final HRegion b) throws IOException {
5073     if (!a.getRegionInfo().getTable().equals(b.getRegionInfo().getTable())) {
5074       throw new IOException("Regions do not belong to the same table");
5075     }
5076 
5077     FileSystem fs = a.getRegionFileSystem().getFileSystem();
5078     // Make sure each region's cache is empty
5079     a.flushcache(true);
5080     b.flushcache(true);
5081 
5082     // Compact each region so we only have one store file per family
5083     a.compactStores(true);
5084     if (LOG.isDebugEnabled()) {
5085       LOG.debug("Files for region: " + a);
5086       a.getRegionFileSystem().logFileSystemState(LOG);
5087     }
5088     b.compactStores(true);
5089     if (LOG.isDebugEnabled()) {
5090       LOG.debug("Files for region: " + b);
5091       b.getRegionFileSystem().logFileSystemState(LOG);
5092     }
5093 
5094     RegionMergeTransaction rmt = new RegionMergeTransaction(a, b, true);
5095     if (!rmt.prepare(null)) {
5096       throw new IOException("Unable to merge regions " + a + " and " + b);
5097     }
5098     HRegionInfo mergedRegionInfo = rmt.getMergedRegionInfo();
5099     LOG.info("starting merge of regions: " + a + " and " + b
5100         + " into new region " + mergedRegionInfo.getRegionNameAsString()
5101         + " with start key <"
5102         + Bytes.toStringBinary(mergedRegionInfo.getStartKey())
5103         + "> and end key <"
5104         + Bytes.toStringBinary(mergedRegionInfo.getEndKey()) + ">");
5105     HRegion dstRegion;
5106     try {
5107       dstRegion = rmt.execute(null, null);
5108     } catch (IOException ioe) {
5109       rmt.rollback(null, null);
5110       throw new IOException("Failed merging region " + a + " and " + b
5111           + ", and successfully rolled back");
5112     }
5113     dstRegion.compactStores(true);
5114 
5115     if (LOG.isDebugEnabled()) {
5116       LOG.debug("Files for new region");
5117       dstRegion.getRegionFileSystem().logFileSystemState(LOG);
5118     }
5119 
5120     if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
5121       throw new IOException("Merged region " + dstRegion
5122           + " still has references after the compaction, is compaction canceled?");
5123     }
5124 
5125     // Archiving the 'A' region
5126     HFileArchiver.archiveRegion(a.getBaseConf(), fs, a.getRegionInfo());
5127     // Archiving the 'B' region
5128     HFileArchiver.archiveRegion(b.getBaseConf(), fs, b.getRegionInfo());
5129 
5130     LOG.info("merge completed. New region is " + dstRegion);
5131     return dstRegion;
5132   }
5133 
5134   //
5135   // HBASE-880
5136   //
5137   /**
5138    * @param get get object
5139    * @return result
5140    * @throws IOException read exceptions
5141    */
5142   public Result get(final Get get) throws IOException {
5143     checkRow(get.getRow(), "Get");
5144     // Verify families are all valid
5145     if (get.hasFamilies()) {
5146       for (byte [] family: get.familySet()) {
5147         checkFamily(family);
5148       }
5149     } else { // Adding all families to scanner
5150       for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
5151         get.addFamily(family);
5152       }
5153     }
5154     List<Cell> results = get(get, true);
5155     boolean stale = this.getRegionInfo().getReplicaId() != 0;
5156     return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
5157   }
5158 
5159   /*
5160    * Do a get based on the get parameter.
5161    * @param withCoprocessor invoke coprocessor or not. We don't want to
5162    * always invoke cp when this is called internally.
5163    */
5164   public List<Cell> get(Get get, boolean withCoprocessor)
5165   throws IOException {
5166 
5167     List<Cell> results = new ArrayList<Cell>();
5168 
5169     // pre-get CP hook
5170     if (withCoprocessor && (coprocessorHost != null)) {
5171        if (coprocessorHost.preGet(get, results)) {
5172          return results;
5173        }
5174     }
5175 
5176     Scan scan = new Scan(get);
5177 
5178     RegionScanner scanner = null;
5179     try {
5180       scanner = getScanner(scan);
5181       scanner.next(results);
5182     } finally {
5183       if (scanner != null)
5184         scanner.close();
5185     }
5186 
5187     // post-get CP hook
5188     if (withCoprocessor && (coprocessorHost != null)) {
5189       coprocessorHost.postGet(get, results);
5190     }
5191 
5192     // do after lock
5193     if (this.metricsRegion != null) {
5194       long totalSize = 0L;
5195       for (Cell cell : results) {
5196         totalSize += CellUtil.estimatedSerializedSizeOf(cell);
5197       }
5198       this.metricsRegion.updateGet(totalSize);
5199     }
5200 
5201     return results;
5202   }
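
  // A minimal sketch of a server-side point read using the get methods above
  // (hypothetical helper; family is caller-supplied and must exist in the
  // table schema):
  private Result getUsageSketch(final byte[] row, final byte[] family) throws IOException {
    Get g = new Get(row);
    g.addFamily(family); // omit to fetch all families
    return get(g);       // checks row range and families, runs coprocessor hooks
  }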
5203 
5204   public ClientProtos.RegionLoadStats mutateRow(RowMutations rm) throws IOException {
5205     // Don't need nonces here - RowMutations only supports puts and deletes
5206     return mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
5207   }
5208 
5209   /**
5210    * Perform atomic mutations within the region w/o nonces.
5211    * See {@link #mutateRowsWithLocks(Collection, Collection, long, long)}
5212    */
5213   public ClientProtos.RegionLoadStats mutateRowsWithLocks(Collection<Mutation> mutations,
5214       Collection<byte[]> rowsToLock) throws IOException {
5215     return mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
5216   }
5217 
5218   /**
5219    * Perform atomic mutations within the region.
5220    * @param mutations The list of mutations to perform.
5221    * <code>mutations</code> can contain operations for multiple rows.
5222    * Caller has to ensure that all rows are contained in this region.
5223    * @param rowsToLock Rows to lock. If multiple rows are locked, care should
5224    * be taken that <code>rowsToLock</code> is sorted to avoid deadlocks.
5225    * @param nonceGroup Optional nonce group of the operation (client Id)
5226    * @param nonce Optional nonce of the operation (unique random id to ensure
5227    * "more idempotence")
5228    * @throws IOException
5229    */
5230   public ClientProtos.RegionLoadStats mutateRowsWithLocks(Collection<Mutation> mutations,
5231       Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
5232     MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
5233     processRowsWithLocks(proc, -1, nonceGroup, nonce);
5234     return getRegionStats();
5235   }
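
  // A minimal sketch of an atomic cross-row mutation via the method above
  // (hypothetical helper; rows/values are illustrative, both rows must live
  // in this region, and rowsToLock should be sorted to avoid deadlocks):
  private void multiRowMutationSketch(final byte[] family, final byte[] qualifier)
      throws IOException {
    byte[] rowA = Bytes.toBytes("row-a");
    byte[] rowB = Bytes.toBytes("row-b");
    List<Mutation> mutations = new ArrayList<Mutation>();
    mutations.add(new Put(rowA).add(family, qualifier, Bytes.toBytes("va")));
    mutations.add(new Put(rowB).add(family, qualifier, Bytes.toBytes("vb")));
    // Both rows stay locked for the duration of the atomic apply.
    mutateRowsWithLocks(mutations, Arrays.asList(rowA, rowB));
  }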
5236 
5237   /**
5238    * @return the current load statistics for the region
5239    */
5240   public ClientProtos.RegionLoadStats getRegionStats() {
5241     if (!regionStatsEnabled) {
5242       return null;
5243     }
5244     ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder();
5245     stats.setMemstoreLoad((int) (Math.min(100, (this.memstoreSize.get() * 100) / this
5246         .memstoreFlushSize)));
5247     stats.setHeapOccupancy((int) (rsServices.getHeapMemoryManager().getHeapOccupancyPercent() * 100));
5248     return stats.build();
5249   }
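
  // Worked example for the computation above: with memstoreSize = 96 MB and
  // memstoreFlushSize = 128 MB, memstoreLoad = min(100, 96 * 100 / 128) = 75;
  // a heap occupancy fraction of 0.43 reports as 43.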
5250 
5251   /**
5252    * Performs atomic multiple reads and writes on a given row.
5253    *
5254    * @param processor The object defines the reads and writes to a row.
5255    * @param nonceGroup Optional nonce group of the operation (client Id)
5256    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
5257    */
5258   public void processRowsWithLocks(RowProcessor<?,?> processor, long nonceGroup, long nonce)
5259       throws IOException {
5260     processRowsWithLocks(processor, rowProcessorTimeout, nonceGroup, nonce);
5261   }
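     // Usage sketch (hypothetical subclass): callers typically extend BaseRowProcessor,
     // declare the rows to lock, and emit mutations plus WAL edits from process():
     //
     //   class SwapRowsProcessor extends BaseRowProcessor<Message, Message> {
     //     @Override public Collection<byte[]> getRowsToLock() { return rows; }
     //     @Override public boolean readOnly() { return false; }
     //     @Override public void process(long now, HRegion region,
     //         List<Mutation> mutations, WALEdit walEdit) throws IOException { ... }
     //   }
     //   region.processRowsWithLocks(new SwapRowsProcessor(), nonceGroup, nonce);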
5262 
5263   /**
5264    * Performs atomic multiple reads and writes on a given row.
5265    *
5266    * @param processor The object that defines the reads and writes to perform on the row.
5267    * @param timeout The timeout of the processor.process() execution
5268    *                Use a negative number to switch off the time bound
5269    * @param nonceGroup Optional nonce group of the operation (client Id)
5270    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
5271    */
5272   public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout,
5273       long nonceGroup, long nonce) throws IOException {
5274 
5275     for (byte[] row : processor.getRowsToLock()) {
5276       checkRow(row, "processRowsWithLocks");
5277     }
5278     if (!processor.readOnly()) {
5279       checkReadOnly();
5280     }
5281     checkResources();
5282 
5283     startRegionOperation();
5284     WALEdit walEdit = new WALEdit();
5285 
5286     // 1. Run pre-process hook
5287     try {
5288       processor.preProcess(this, walEdit);
5289     } catch (IOException e) {
5290       closeRegionOperation();
5291       throw e;
5292     }
5293     // Short circuit the read only case
5294     if (processor.readOnly()) {
5295       try {
5296         long now = EnvironmentEdgeManager.currentTime();
5297         doProcessRowWithTimeout(
5298             processor, now, this, null, null, timeout);
5299         processor.postProcess(this, walEdit, true);
5300       } catch (IOException e) {
5301         throw e;
5302       } finally {
5303         closeRegionOperation();
5304       }
5305       return;
5306     }
5307 
5308     MultiVersionConsistencyControl.WriteEntry writeEntry = null;
5309     boolean locked;
5310     boolean walSyncSuccessful = false;
5311     List<RowLock> acquiredRowLocks;
5312     long addedSize = 0;
5313     List<Mutation> mutations = new ArrayList<Mutation>();
5314     List<Cell> memstoreCells = new ArrayList<Cell>();
5315     Collection<byte[]> rowsToLock = processor.getRowsToLock();
5316     long mvccNum = 0;
5317     WALKey walKey = null;
5318     try {
5319       // 2. Acquire the row lock(s)
5320       acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
5321       for (byte[] row : rowsToLock) {
5322         // Attempt to lock all involved rows, throw if any lock times out
5323         acquiredRowLocks.add(getRowLock(row));
5324       }
5325       // 3. Region lock
5326       lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
5327       locked = true;
5328       // Get a mvcc write number
5329       mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
5330 
5331       long now = EnvironmentEdgeManager.currentTime();
5332       try {
5333         // 4. Let the processor scan the rows, generate mutations and add
5334         //    waledits
5335         doProcessRowWithTimeout(
5336             processor, now, this, mutations, walEdit, timeout);
5337 
5338         if (!mutations.isEmpty()) {
5339           // 5. Start mvcc transaction
5340           writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
5341           // 6. Call the preBatchMutate hook
5342           processor.preBatchMutate(this, walEdit);
5343           // 7. Apply to memstore
5344           for (Mutation m : mutations) {
5345             // Handle any tag based cell features
5346             rewriteCellTags(m.getFamilyCellMap(), m);
5347 
5348             for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
5349               Cell cell = cellScanner.current();
5350               CellUtil.setSequenceId(cell, mvccNum);
5351               Store store = getStore(cell);
5352               if (store == null) {
5353                 checkFamily(CellUtil.cloneFamily(cell));
5354                 // unreachable - checkFamily() throws NoSuchColumnFamilyException for an unknown family
5355               }
5356               Pair<Long, Cell> ret = store.add(cell);
5357               addedSize += ret.getFirst();
5358               memstoreCells.add(ret.getSecond());
5359             }
5360           }
5361 
5362           long txid = 0;
5363           // 8. Append no sync
5364           if (!walEdit.isEmpty()) {
5365             // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
5366             walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
5367               this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
5368               processor.getClusterIds(), nonceGroup, nonce);
5369             txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
5370               walKey, walEdit, getSequenceId(), true, memstoreCells);
5371           }
5372           if(walKey == null){
5373             // since we use wal sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
5374             // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
5375             walKey = this.appendEmptyEdit(this.wal, memstoreCells);
5376           }
5377           // 9. Release region lock
5378           if (locked) {
5379             this.updatesLock.readLock().unlock();
5380             locked = false;
5381           }
5382 
5383           // 10. Release row lock(s)
5384           releaseRowLocks(acquiredRowLocks);
5385 
5386           // 11. Sync edit log
5387           if (txid != 0) {
5388             syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
5389           }
5390           walSyncSuccessful = true;
5391           // 12. call postBatchMutate hook
5392           processor.postBatchMutate(this);
5393         }
5394       } finally {
5395         if (!mutations.isEmpty() && !walSyncSuccessful) {
5396           LOG.warn("Wal sync failed. Roll back " + mutations.size() +
5397               " memstore keyvalues for row(s):" + StringUtils.byteToHexString(
5398               processor.getRowsToLock().iterator().next()) + "...");
5399           for (Mutation m : mutations) {
5400             for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
5401               Cell cell = cellScanner.current();
5402               getStore(cell).rollback(cell);
5403             }
5404           }
5405         }
5406         // 13. Roll mvcc forward
5407         if (writeEntry != null) {
5408           mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
5409         }
5410         if (locked) {
5411           this.updatesLock.readLock().unlock();
5412         }
5413         // release locks if some were acquired but another timed out
5414         releaseRowLocks(acquiredRowLocks);
5415       }
5416 
5417       // 14. Run post-process hook
5418       processor.postProcess(this, walEdit, walSyncSuccessful);
5419 
5420     } catch (IOException e) {
5421       throw e;
5422     } finally {
5423       closeRegionOperation();
5424       if (!mutations.isEmpty() &&
5425           isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
5426         requestFlush();
5427       }
5428     }
5429   }
5430 
5431   private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
5432                                        final long now,
5433                                        final HRegion region,
5434                                        final List<Mutation> mutations,
5435                                        final WALEdit walEdit,
5436                                        final long timeout) throws IOException {
5437     // Short circuit the no time bound case.
5438     if (timeout < 0) {
5439       try {
5440         processor.process(now, region, mutations, walEdit);
5441       } catch (IOException e) {
5442         LOG.warn("RowProcessor:" + processor.getClass().getName() +
5443             " threw an exception on row(s):" +
5444             Bytes.toStringBinary(
5445               processor.getRowsToLock().iterator().next()) + "...", e);
5446         throw e;
5447       }
5448       return;
5449     }
5450 
5451     // Case with time bound
5452     FutureTask<Void> task =
5453       new FutureTask<Void>(new Callable<Void>() {
5454         @Override
5455         public Void call() throws IOException {
5456           try {
5457             processor.process(now, region, mutations, walEdit);
5458             return null;
5459           } catch (IOException e) {
5460             LOG.warn("RowProcessor:" + processor.getClass().getName() +
5461                 " threw an exception on row(s):" +
5462                 Bytes.toStringBinary(
5463                     processor.getRowsToLock().iterator().next()) + "...", e);
5464             throw e;
5465           }
5466         }
5467       });
5468     rowProcessorExecutor.execute(task);
5469     try {
5470       task.get(timeout, TimeUnit.MILLISECONDS);
5471     } catch (TimeoutException te) {
5472       LOG.error("RowProcessor timeout:" + timeout + " ms on row(s):" +
5473           Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) +
5474           "...");
5475       throw new IOException(te);
5476     } catch (Exception e) {
5477       throw new IOException(e);
5478     }
5479   }
5480 
5481   public Result append(Append append) throws IOException {
5482     return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
5483   }
5484 
5485   // TODO: There's a lot of boiler plate code identical to increment.
5486   // We should refactor append and increment as local get-mutate-put
5487   // transactions, so all stores only go through one code path for puts.
5488   /**
5489    * Perform one or more append operations on a row.
5490    *
5491    * @return new keyvalues after the append operation
5492    * @throws IOException
5493    */
5494   public Result append(Append append, long nonceGroup, long nonce)
5495       throws IOException {
5496     byte[] row = append.getRow();
5497     checkRow(row, "append");
5498     boolean flush = false;
5499     Durability durability = getEffectiveDurability(append.getDurability());
5500     boolean writeToWAL = durability != Durability.SKIP_WAL;
5501     WALEdit walEdits = null;
5502     List<Cell> allKVs = new ArrayList<Cell>(append.size());
5503     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
5504     long size = 0;
5505     long txid = 0;
5506 
5507     checkReadOnly();
5508     checkResources();
5509     // Lock row
5510     startRegionOperation(Operation.APPEND);
5511     this.writeRequestsCount.increment();
5512     long mvccNum = 0;
5513     WriteEntry w = null;
5514     WALKey walKey = null;
5515     RowLock rowLock = null;
5516     List<Cell> memstoreCells = new ArrayList<Cell>();
5517     boolean doRollBackMemstore = false;
5518     try {
5519       rowLock = getRowLock(row);
5520       try {
5521         lock(this.updatesLock.readLock());
5522         try {
5523           // wait for all prior MVCC transactions to finish - while we hold the row lock
5524           // (so that we are guaranteed to see the latest state)
5525           mvcc.waitForPreviousTransactionsComplete();
5526           if (this.coprocessorHost != null) {
5527             Result r = this.coprocessorHost.preAppendAfterRowLock(append);
5528             if(r!= null) {
5529               return r;
5530             }
5531           }
5532           // now start my own transaction
5533           mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
5534           w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
5535           long now = EnvironmentEdgeManager.currentTime();
5536           // Process each family
5537           for (Map.Entry<byte[], List<Cell>> family : append.getFamilyCellMap().entrySet()) {
5538 
5539             Store store = stores.get(family.getKey());
5540             List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
5541 
5542             // Sort the cells so that they match the order that they
5543             // appear in the Get results. Otherwise, we won't be able to
5544             // find the existing values if the cells are not specified
5545             // in order by the client since cells are in an array list.
5546             Collections.sort(family.getValue(), store.getComparator());
5547             // Get previous values for all columns in this family
5548             Get get = new Get(row);
5549             for (Cell cell : family.getValue()) {
5550               get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));
5551             }
5552             List<Cell> results = get(get, false);
5553             // Iterate the input columns and update existing values if they were
5554             // found, otherwise add new column initialized to the append value
5555 
5556             // Avoid as much copying as possible. We may need to rewrite and
5557             // consolidate tags. Bytes are only copied once.
5558             // Would be nice if KeyValue had scatter/gather logic
5559             int idx = 0;
5560             for (Cell cell : family.getValue()) {
5561               Cell newCell;
5562               Cell oldCell = null;
5563               if (idx < results.size()
5564                   && CellUtil.matchingQualifier(results.get(idx), cell)) {
5565                 oldCell = results.get(idx);
5566                 long ts = Math.max(now, oldCell.getTimestamp());
5567 
5568                 // Process cell tags
5569                 List<Tag> newTags = new ArrayList<Tag>();
5570 
5571                 // Make a union of the set of tags in the old and new KVs
5572 
5573                 if (oldCell.getTagsLength() > 0) {
5574                   Iterator<Tag> i = CellUtil.tagsIterator(oldCell.getTagsArray(),
5575                     oldCell.getTagsOffset(), oldCell.getTagsLength());
5576                   while (i.hasNext()) {
5577                     newTags.add(i.next());
5578                   }
5579                 }
5580                 if (cell.getTagsLength() > 0) {
5581                   Iterator<Tag> i  = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
5582                     cell.getTagsLength());
5583                   while (i.hasNext()) {
5584                     newTags.add(i.next());
5585                   }
5586                 }
5587 
5588                 // Cell TTL handling
5589 
5590                 if (append.getTTL() != Long.MAX_VALUE) {
5591                   // Add the new TTL tag
5592                   newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
5593                 }
5594 
5595                 // Rebuild tags
5596                 byte[] tagBytes = Tag.fromList(newTags);
5597 
5598                 // allocate one new cell, sized for the merged value and tags; filled in below
5599                 newCell = new KeyValue(row.length, cell.getFamilyLength(),
5600                     cell.getQualifierLength(), ts, KeyValue.Type.Put,
5601                     oldCell.getValueLength() + cell.getValueLength(),
5602                     tagBytes.length);
5603                 // copy in row, family, and qualifier
5604                 System.arraycopy(cell.getRowArray(), cell.getRowOffset(),
5605                   newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());
5606                 System.arraycopy(cell.getFamilyArray(), cell.getFamilyOffset(),
5607                   newCell.getFamilyArray(), newCell.getFamilyOffset(),
5608                   cell.getFamilyLength());
5609                 System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(),
5610                   newCell.getQualifierArray(), newCell.getQualifierOffset(),
5611                   cell.getQualifierLength());
5612                 // copy in the value
5613                 System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(),
5614                   newCell.getValueArray(), newCell.getValueOffset(),
5615                   oldCell.getValueLength());
5616                 System.arraycopy(cell.getValueArray(), cell.getValueOffset(),
5617                   newCell.getValueArray(),
5618                   newCell.getValueOffset() + oldCell.getValueLength(),
5619                   cell.getValueLength());
5620                 // Copy in tag data
5621                 System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(),
5622                   tagBytes.length);
5623                 idx++;
5624               } else {
5625                 // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP
5626                 CellUtil.updateLatestStamp(cell, now);
5627 
5628                 // Cell TTL handling
5629 
5630                 if (append.getTTL() != Long.MAX_VALUE) {
5631                   List<Tag> newTags = new ArrayList<Tag>(1);
5632                   newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
5633                   // Add the new TTL tag
5634                   newCell = new KeyValue(cell.getRowArray(), cell.getRowOffset(),
5635                       cell.getRowLength(),
5636                     cell.getFamilyArray(), cell.getFamilyOffset(),
5637                       cell.getFamilyLength(),
5638                     cell.getQualifierArray(), cell.getQualifierOffset(),
5639                       cell.getQualifierLength(),
5640                     cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
5641                     cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
5642                     newTags);
5643                 } else {
5644                   newCell = cell;
5645                 }
5646               }
5647 
5648               CellUtil.setSequenceId(newCell, mvccNum);
5649               // Give coprocessors a chance to update the new cell
5650               if (coprocessorHost != null) {
5651                 newCell = coprocessorHost.postMutationBeforeWAL(RegionObserver.MutationType.APPEND,
5652                     append, oldCell, newCell);
5653               }
5654               kvs.add(newCell);
5655 
5656               // Append update to WAL
5657               if (writeToWAL) {
5658                 if (walEdits == null) {
5659                   walEdits = new WALEdit();
5660                 }
5661                 walEdits.add(newCell);
5662               }
5663             }
5664 
5665             //store the kvs to the temporary memstore before writing WAL
5666             tempMemstore.put(store, kvs);
5667           }
5668 
5669           //Actually write to Memstore now
5670           for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
5671             Store store = entry.getKey();
5672             if (store.getFamily().getMaxVersions() == 1) {
5673               // upsert if VERSIONS for this CF == 1
5674               size += store.upsert(entry.getValue(), getSmallestReadPoint());
5675               memstoreCells.addAll(entry.getValue());
5676             } else {
5677               // otherwise keep older versions around
5678               for (Cell cell: entry.getValue()) {
5679                 Pair<Long, Cell> ret = store.add(cell);
5680                 size += ret.getFirst();
5681                 memstoreCells.add(ret.getSecond());
5682                 doRollBackMemstore = true;
5683               }
5684             }
5685             allKVs.addAll(entry.getValue());
5686           }
5687 
5688           // Actually write to WAL now
5689           if (writeToWAL) {
5690             // Using default cluster id, as this can only happen in the originating
5691             // cluster. A slave cluster receives the final value (not the delta)
5692             // as a Put.
5693             // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
5694             walKey = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
5695               this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce);
5696             txid = this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits,
5697               this.sequenceId, true, memstoreCells);
5698           } else {
5699             recordMutationWithoutWal(append.getFamilyCellMap());
5700           }
5701           if (walKey == null) {
5702             // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
5703             walKey = this.appendEmptyEdit(this.wal, memstoreCells);
5704           }
5705           size = this.addAndGetGlobalMemstoreSize(size);
5706           flush = isFlushSize(size);
5707         } finally {
5708           this.updatesLock.readLock().unlock();
5709         }
5710       } finally {
5711         rowLock.release();
5712         rowLock = null;
5713       }
5714       // sync the transaction log outside the rowlock
5715       if(txid != 0){
5716         syncOrDefer(txid, durability);
5717       }
5718       doRollBackMemstore = false;
5719     } finally {
5720       if (rowLock != null) {
5721         rowLock.release();
5722       }
5723       // if the wal sync was unsuccessful, remove keys from memstore
5724       if (doRollBackMemstore) {
5725         rollbackMemstore(memstoreCells);
5726       }
5727       if (w != null) {
5728         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
5729       }
5730       closeRegionOperation(Operation.APPEND);
5731     }
5732 
5733     if (this.metricsRegion != null) {
5734       this.metricsRegion.updateAppend();
5735     }
5736 
5737     if (flush) {
5738       // Request a cache flush. Do it outside update lock.
5739       requestFlush();
5740     }
5741 
5742 
5743     return append.isReturnResults() ? Result.create(allKVs) : null;
5744   }
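     // Usage sketch (hypothetical row/family/qualifier names): each call atomically
     // concatenates the given bytes onto the current cell value and, by default,
     // returns the resulting cells.
     private static Result exampleAppend(HRegion region) throws IOException {
       Append append = new Append(Bytes.toBytes("row1"));
       append.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("-suffix"));
       return region.append(append);
     }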
5745 
5746   public Result increment(Increment increment) throws IOException {
5747     return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
5748   }
5749 
5750   // TODO: There's a lot of boiler plate code identical to append.
5751   // We should refactor append and increment as local get-mutate-put
5752   // transactions, so all stores only go through one code path for puts.
5753   /**
5754    * Perform one or more increment operations on a row.
5755    * @return new keyvalues after increment
5756    * @throws IOException
5757    */
5758   public Result increment(Increment increment, long nonceGroup, long nonce)
5759   throws IOException {
5760     byte [] row = increment.getRow();
5761     checkRow(row, "increment");
5762     TimeRange tr = increment.getTimeRange();
5763     boolean flush = false;
5764     Durability durability = getEffectiveDurability(increment.getDurability());
5765     boolean writeToWAL = durability != Durability.SKIP_WAL;
5766     WALEdit walEdits = null;
5767     List<Cell> allKVs = new ArrayList<Cell>(increment.size());
5768     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
5769 
5770     long size = 0;
5771     long txid = 0;
5772 
5773     checkReadOnly();
5774     checkResources();
5775     // Lock row
5776     startRegionOperation(Operation.INCREMENT);
5777     this.writeRequestsCount.increment();
5778     RowLock rowLock = null;
5779     WriteEntry w = null;
5780     WALKey walKey = null;
5781     long mvccNum = 0;
5782     List<Cell> memstoreCells = new ArrayList<Cell>();
5783     boolean doRollBackMemstore = false;
5784     try {
5785       rowLock = getRowLock(row);
5786       try {
5787         lock(this.updatesLock.readLock());
5788         try {
5789           // wait for all prior MVCC transactions to finish - while we hold the row lock
5790           // (so that we are guaranteed to see the latest state)
5791           mvcc.waitForPreviousTransactionsComplete();
5792           if (this.coprocessorHost != null) {
5793             Result r = this.coprocessorHost.preIncrementAfterRowLock(increment);
5794             if (r != null) {
5795               return r;
5796             }
5797           }
5798           // now start my own transaction
5799           mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
5800           w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
5801           long now = EnvironmentEdgeManager.currentTime();
5802           // Process each family
5803           for (Map.Entry<byte [], List<Cell>> family:
5804               increment.getFamilyCellMap().entrySet()) {
5805 
5806             Store store = stores.get(family.getKey());
5807             List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
5808 
5809             // Sort the cells so that they match the order that they
5810             // appear in the Get results. Otherwise, we won't be able to
5811             // find the existing values if the cells are not specified
5812             // in order by the client since cells are in an array list.
5813             Collections.sort(family.getValue(), store.getComparator());
5814             // Get previous values for all columns in this family
5815             Get get = new Get(row);
5816             for (Cell cell: family.getValue()) {
5817               get.addColumn(family.getKey(),  CellUtil.cloneQualifier(cell));
5818             }
5819             get.setTimeRange(tr.getMin(), tr.getMax());
5820             List<Cell> results = get(get, false);
5821 
5822             // Iterate the input columns and update existing values if they were
5823             // found, otherwise add new column initialized to the increment amount
5824             int idx = 0;
5825             for (Cell cell: family.getValue()) {
5826               long amount = Bytes.toLong(CellUtil.cloneValue(cell));
5827               boolean noWriteBack = (amount == 0);
5828               List<Tag> newTags = new ArrayList<Tag>();
5829 
5830               // Carry forward any tags that might have been added by a coprocessor
5831               if (cell.getTagsLength() > 0) {
5832                 Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(),
5833                   cell.getTagsOffset(), cell.getTagsLength());
5834                 while (i.hasNext()) {
5835                   newTags.add(i.next());
5836                 }
5837               }
5838 
5839               Cell c = null;
5840               long ts = now;
5841               if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) {
5842                 c = results.get(idx);
5843                 ts = Math.max(now, c.getTimestamp());
5844                 if(c.getValueLength() == Bytes.SIZEOF_LONG) {
5845                   amount += Bytes.toLong(c.getValueArray(), c.getValueOffset(), Bytes.SIZEOF_LONG);
5846                 } else {
5847                   // throw DoNotRetryIOException instead of IllegalArgumentException
5848                   throw new org.apache.hadoop.hbase.DoNotRetryIOException(
5849                       "Attempted to increment field that isn't 64 bits wide");
5850                 }
5851                 // Carry tags forward from previous version
5852                 if (c.getTagsLength() > 0) {
5853                   Iterator<Tag> i = CellUtil.tagsIterator(c.getTagsArray(),
5854                     c.getTagsOffset(), c.getTagsLength());
5855                   while (i.hasNext()) {
5856                     newTags.add(i.next());
5857                   }
5858                 }
5859                 idx++;
5860               }
5861 
5862               // Append new incremented KeyValue to list
5863               byte[] q = CellUtil.cloneQualifier(cell);
5864               byte[] val = Bytes.toBytes(amount);
5865 
5866               // Add the TTL tag if the mutation carried one
5867               if (increment.getTTL() != Long.MAX_VALUE) {
5868                 newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(increment.getTTL())));
5869               }
5870 
5871               Cell newKV = new KeyValue(row, 0, row.length,
5872                 family.getKey(), 0, family.getKey().length,
5873                 q, 0, q.length,
5874                 ts,
5875                 KeyValue.Type.Put,
5876                 val, 0, val.length,
5877                 newTags);
5878 
5879               CellUtil.setSequenceId(newKV, mvccNum);
5880 
5881               // Give coprocessors a chance to update the new cell
5882               if (coprocessorHost != null) {
5883                 newKV = coprocessorHost.postMutationBeforeWAL(
5884                     RegionObserver.MutationType.INCREMENT, increment, c, newKV);
5885               }
5886               allKVs.add(newKV);
5887 
5888               if (!noWriteBack) {
5889                 kvs.add(newKV);
5890 
5891                 // Prepare WAL updates
5892                 if (writeToWAL) {
5893                   if (walEdits == null) {
5894                     walEdits = new WALEdit();
5895                   }
5896                   walEdits.add(newKV);
5897                 }
5898               }
5899             }
5900 
5901             //store the kvs to the temporary memstore before writing WAL
5902             if (!kvs.isEmpty()) {
5903               tempMemstore.put(store, kvs);
5904             }
5905           }
5906 
5907           //Actually write to Memstore now
5908           if (!tempMemstore.isEmpty()) {
5909             for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
5910               Store store = entry.getKey();
5911               if (store.getFamily().getMaxVersions() == 1) {
5912                 // upsert if VERSIONS for this CF == 1
5913                 size += store.upsert(entry.getValue(), getSmallestReadPoint());
5914                 memstoreCells.addAll(entry.getValue());
5915               } else {
5916                 // otherwise keep older versions around
5917                 for (Cell cell : entry.getValue()) {
5918                   Pair<Long, Cell> ret = store.add(cell);
5919                   size += ret.getFirst();
5920                   memstoreCells.add(ret.getSecond());
5921                   doRollBackMemstore = true;
5922                 }
5923               }
5924             }
5925             size = this.addAndGetGlobalMemstoreSize(size);
5926             flush = isFlushSize(size);
5927           }
5928 
5929           // Actually write to WAL now
5930           if (walEdits != null && !walEdits.isEmpty()) {
5931             if (writeToWAL) {
5932               // Using default cluster id, as this can only happen in the originating
5933               // cluster. A slave cluster receives the final value (not the delta)
5934               // as a Put.
5935               // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
5936               walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
5937                 this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce);
5938               txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
5939                 walKey, walEdits, getSequenceId(), true, memstoreCells);
5940             } else {
5941               recordMutationWithoutWal(increment.getFamilyCellMap());
5942             }
5943           }
5944           if(walKey == null){
5945             // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
5946             walKey = this.appendEmptyEdit(this.wal, memstoreCells);
5947           }
5948         } finally {
5949           this.updatesLock.readLock().unlock();
5950         }
5951       } finally {
5952         rowLock.release();
5953         rowLock = null;
5954       }
5955       // sync the transaction log outside the rowlock
5956       if(txid != 0){
5957         syncOrDefer(txid, durability);
5958       }
5959       doRollBackMemstore = false;
5960     } finally {
5961       if (rowLock != null) {
5962         rowLock.release();
5963       }
5964       // if the wal sync was unsuccessful, remove keys from memstore
5965       if (doRollBackMemstore) {
5966         rollbackMemstore(memstoreCells);
5967       }
5968       if (w != null) {
5969         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
5970       }
5971       closeRegionOperation(Operation.INCREMENT);
5972       if (this.metricsRegion != null) {
5973         this.metricsRegion.updateIncrement();
5974       }
5975     }
5976 
5977     if (flush) {
5978       // Request a cache flush.  Do it outside update lock.
5979       requestFlush();
5980     }
5981 
5982     return Result.create(allKVs);
5983   }
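     // Usage sketch (hypothetical names): increments a counter cell by 1 and reads
     // the new value back. The target cell must hold an 8-byte long, otherwise the
     // code above throws DoNotRetryIOException.
     private static long exampleIncrement(HRegion region) throws IOException {
       Increment inc = new Increment(Bytes.toBytes("row1"));
       inc.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("hits"), 1L);
       Result r = region.increment(inc);
       Cell c = r.getColumnLatestCell(Bytes.toBytes("cf"), Bytes.toBytes("hits"));
       return Bytes.toLong(CellUtil.cloneValue(c));
     }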
5984 
5985   //
5986   // New HBASE-880 Helpers
5987   //
5988 
5989   private void checkFamily(final byte [] family)
5990   throws NoSuchColumnFamilyException {
5991     if (!this.htableDescriptor.hasFamily(family)) {
5992       throw new NoSuchColumnFamilyException("Column family " +
5993           Bytes.toString(family) + " does not exist in region " + this
5994           + " in table " + this.htableDescriptor);
5995     }
5996   }
5997 
5998   public static final long FIXED_OVERHEAD = ClassSize.align(
5999       ClassSize.OBJECT +
6000       ClassSize.ARRAY +
6001       44 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
6002       (11 * Bytes.SIZEOF_LONG) +
6003       4 * Bytes.SIZEOF_BOOLEAN);
6004 
6005   // woefully out of date - currently missing:
6006   // 1 x HashMap - coprocessorServiceHandlers
6007   // 6 x Counter - numMutationsWithoutWAL, dataInMemoryWithoutWAL,
6008   //   checkAndMutateChecksPassed, checkAndMutateChecksFailed, readRequestsCount,
6009   //   writeRequestsCount
6010   // 1 x HRegion$WriteState - writestate
6011   // 1 x RegionCoprocessorHost - coprocessorHost
6012   // 1 x RegionSplitPolicy - splitPolicy
6013   // 1 x MetricsRegion - metricsRegion
6014   // 1 x MetricsRegionWrapperImpl - metricsRegionWrapper
6015   public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
6016       ClassSize.OBJECT + // closeLock
6017       (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
6018       (3 * ClassSize.ATOMIC_LONG) + // memStoreSize, numPutsWithoutWAL, dataInMemoryWithoutWAL
6019       (2 * ClassSize.CONCURRENT_HASHMAP) +  // lockedRows, scannerReadPoints
6020       WriteState.HEAP_SIZE + // writestate
6021       ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
6022       (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
6023       MultiVersionConsistencyControl.FIXED_SIZE // mvcc
6024       + ClassSize.TREEMAP // maxSeqIdInStores
6025       + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
6026       ;
6027 
6028   @Override
6029   public long heapSize() {
6030     long heapSize = DEEP_OVERHEAD;
6031     for (Store store : this.stores.values()) {
6032       heapSize += store.heapSize();
6033     }
6034     // this does not take into account row locks, recent flushes, mvcc entries, and more
6035     return heapSize;
6036   }
6037 
6038   /*
6039    * This method calls System.exit.
6040    * @param message Message to print out.  May be null.
6041    */
6042   private static void printUsageAndExit(final String message) {
6043     if (message != null && message.length() > 0) System.out.println(message);
6044     System.out.println("Usage: HRegion CATALOG_TABLE_DIR [major_compact]");
6045     System.out.println("Options:");
6046     System.out.println(" major_compact  Pass this option to major compact " +
6047       "the passed region.");
6048     System.out.println("By default, a scan of the passed region is output.");
6049     System.exit(1);
6050   }
6051 
6052   /**
6053    * Registers a new protocol buffer {@link Service} subclass as a coprocessor endpoint to
6054    * be available for handling
6055    * {@link HRegion#execService(com.google.protobuf.RpcController,
6056    *    org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall)} calls.
6057    *
6058    * <p>
6059    * Only a single instance may be registered per region for a given {@link Service} subclass (the
6060    * instances are keyed on {@link com.google.protobuf.Descriptors.ServiceDescriptor#getFullName()}).
6061    * After the first registration, subsequent calls with the same service name will fail with
6062    * a return value of {@code false}.
6063    * </p>
6064    * @param instance the {@code Service} subclass instance to expose as a coprocessor endpoint
6065    * @return {@code true} if the registration was successful, {@code false}
6066    * otherwise
6067    */
6068   public boolean registerService(Service instance) {
6069     /*
6070      * No stacking of instances is allowed for a single service name
6071      */
6072     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
6073     if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
6074       LOG.error("Coprocessor service "+serviceDesc.getFullName()+
6075           " already registered, rejecting request from "+instance
6076       );
6077       return false;
6078     }
6079 
6080     coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
6081     if (LOG.isDebugEnabled()) {
6082       LOG.debug("Registered coprocessor service: region="+
6083           Bytes.toStringBinary(getRegionName())+" service="+serviceDesc.getFullName());
6084     }
6085     return true;
6086   }
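     // Usage sketch: a region coprocessor would typically register its endpoint in
     // its start() method, e.g. (PingService is a hypothetical generated protobuf
     // service; env is a RegionCoprocessorEnvironment):
     //
     //   env.getRegion().registerService(new PingProtos.PingService() { ... });
     //
     // A second registration under the same fully-qualified service name is rejected
     // and returns false, as implemented above.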
6087 
6088   /**
6089    * Executes a single protocol buffer coprocessor endpoint {@link Service} method using
6090    * the registered protocol handlers.  {@link Service} implementations must be registered via the
6091    * {@link HRegion#registerService(com.google.protobuf.Service)}
6092    * method before they are available.
6093    *
6094    * @param controller an {@code RpcController} implementation to pass to the invoked service
6095    * @param call a {@code CoprocessorServiceCall} instance identifying the service, method,
6096    *     and parameters for the method invocation
6097    * @return a protocol buffer {@code Message} instance containing the method's result
6098    * @throws IOException if no registered service handler is found or an error
6099    *     occurs during the invocation
6100    * @see org.apache.hadoop.hbase.regionserver.HRegion#registerService(com.google.protobuf.Service)
6101    */
6102   public Message execService(RpcController controller, CoprocessorServiceCall call)
6103       throws IOException {
6104     String serviceName = call.getServiceName();
6105     String methodName = call.getMethodName();
6106     if (!coprocessorServiceHandlers.containsKey(serviceName)) {
6107       throw new UnknownProtocolException(null,
6108           "No registered coprocessor service found for name "+serviceName+
6109           " in region "+Bytes.toStringBinary(getRegionName()));
6110     }
6111 
6112     Service service = coprocessorServiceHandlers.get(serviceName);
6113     Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
6114     Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
6115     if (methodDesc == null) {
6116       throw new UnknownProtocolException(service.getClass(),
6117           "Unknown method "+methodName+" called on service "+serviceName+
6118               " in region "+Bytes.toStringBinary(getRegionName()));
6119     }
6120 
6121     Message request = service.getRequestPrototype(methodDesc).newBuilderForType()
6122         .mergeFrom(call.getRequest()).build();
6123 
6124     if (coprocessorHost != null) {
6125       request = coprocessorHost.preEndpointInvocation(service, methodName, request);
6126     }
6127 
6128     final Message.Builder responseBuilder =
6129         service.getResponsePrototype(methodDesc).newBuilderForType();
6130     service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
6131       @Override
6132       public void run(Message message) {
6133         if (message != null) {
6134           responseBuilder.mergeFrom(message);
6135         }
6136       }
6137     });
6138 
6139     if (coprocessorHost != null) {
6140       coprocessorHost.postEndpointInvocation(service, methodName, request, responseBuilder);
6141     }
6142 
6143     return responseBuilder.build();
6144   }
6145 
6146   /*
6147    * Process table.
6148    * Do major compaction or list content.
6149    * @throws IOException
6150    */
6151   private static void processTable(final FileSystem fs, final Path p,
6152       final WALFactory walFactory, final Configuration c,
6153       final boolean majorCompact)
6154   throws IOException {
6155     HRegion region;
6156     FSTableDescriptors fst = new FSTableDescriptors(c);
6157     // Currently expects tables to have one region only.
6158     if (FSUtils.getTableName(p).equals(TableName.META_TABLE_NAME)) {
6159       final WAL wal = walFactory.getMetaWAL(
6160           HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
6161       region = HRegion.newHRegion(p, wal, fs, c,
6162         HRegionInfo.FIRST_META_REGIONINFO,
6163           fst.get(TableName.META_TABLE_NAME), null);
6164     } else {
6165       throw new IOException("Not a known catalog table: " + p.toString());
6166     }
6167     try {
6168       region.initialize(null);
6169       if (majorCompact) {
6170         region.compactStores(true);
6171       } else {
6172         // Default behavior
6173         Scan scan = new Scan();
6174         // scan.addFamily(HConstants.CATALOG_FAMILY);
6175         RegionScanner scanner = region.getScanner(scan);
6176         try {
6177           List<Cell> kvs = new ArrayList<Cell>();
6178           boolean done;
6179           do {
6180             kvs.clear();
6181             done = scanner.next(kvs);
6182             if (kvs.size() > 0) LOG.info(kvs);
6183           } while (done);
6184         } finally {
6185           scanner.close();
6186         }
6187       }
6188     } finally {
6189       region.close();
6190     }
6191   }
6192 
6193   boolean shouldForceSplit() {
6194     return this.splitRequest;
6195   }
6196 
6197   byte[] getExplicitSplitPoint() {
6198     return this.explicitSplitPoint;
6199   }
6200 
6201   void forceSplit(byte[] sp) {
6202     // This HRegion will go away after the forced split is successful
6203     // But if a forced split fails, we need to clear forced split.
6204     this.splitRequest = true;
6205     if (sp != null) {
6206       this.explicitSplitPoint = sp;
6207     }
6208   }
6209 
6210   void clearSplit() {
6211     this.splitRequest = false;
6212     this.explicitSplitPoint = null;
6213   }
6214 
6215   /**
6216    * Give the region a chance to prepare before it is split.
6217    */
6218   protected void prepareToSplit() {
6219     // nothing
6220   }
6221 
6222   /**
6223    * Return the split point. A null return value indicates the region isn't splittable.
6224    * If the split point isn't explicitly specified, this method goes over the stores
6225    * to find the best split point. Currently the criterion for the best split point
6226    * is the size of the store.
6227    */
6228   public byte[] checkSplit() {
6229     // Can't split META
6230     if (this.getRegionInfo().isMetaTable() ||
6231         TableName.NAMESPACE_TABLE_NAME.equals(this.getRegionInfo().getTable())) {
6232       if (shouldForceSplit()) {
6233         LOG.warn("Cannot split meta region in HBase 0.20 and above");
6234       }
6235       return null;
6236     }
6237 
6238     // Can't split region which is in recovering state
6239     if (this.isRecovering()) {
6240       LOG.info("Cannot split region " + this.getRegionInfo().getEncodedName() + " in recovery.");
6241       return null;
6242     }
6243 
6244     if (!splitPolicy.shouldSplit()) {
6245       return null;
6246     }
6247 
6248     byte[] ret = splitPolicy.getSplitPoint();
6249 
6250     if (ret != null) {
6251       try {
6252         checkRow(ret, "calculated split");
6253       } catch (IOException e) {
6254         LOG.error("Ignoring invalid split", e);
6255         return null;
6256       }
6257     }
6258     return ret;
6259   }
6260 
6261   /**
6262    * @return The priority that this region should have in the compaction queue
6263    */
6264   public int getCompactPriority() {
6265     int count = Integer.MAX_VALUE;
6266     for (Store store : stores.values()) {
6267       count = Math.min(count, store.getCompactPriority());
6268     }
6269     return count;
6270   }
6271 
6272 
6273   /** @return the coprocessor host */
6274   public RegionCoprocessorHost getCoprocessorHost() {
6275     return coprocessorHost;
6276   }
6277 
6278   /** @param coprocessorHost the new coprocessor host */
6279   public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {
6280     this.coprocessorHost = coprocessorHost;
6281   }
6282 
6283   /**
6284    * This method needs to be called before any public call that reads or
6285    * modifies data. It has to be called just before a try.
6286    * #closeRegionOperation needs to be called in the try's finally block.
6287    * Acquires a read lock and checks if the region is closing or closed.
6288    * @throws IOException
6289    */
6290   public void startRegionOperation() throws IOException {
6291     startRegionOperation(Operation.ANY);
6292   }
6293 
6294   /**
6295    * @param op The operation about to be taken on the region
6296    * @throws IOException
6297    */
6298   protected void startRegionOperation(Operation op) throws IOException {
6299     switch (op) {
6300     case GET:  // read operations
6301     case SCAN:
6302       checkReadsEnabled();
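           // fall through: read operations are also subject to the recovering-state check below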
6303     case INCREMENT: // write operations
6304     case APPEND:
6305     case SPLIT_REGION:
6306     case MERGE_REGION:
6307     case PUT:
6308     case DELETE:
6309     case BATCH_MUTATE:
6310     case COMPACT_REGION:
6311       // when a region is in recovering state, no read, split or merge is allowed
6312       if (isRecovering() && (this.disallowWritesInRecovering ||
6313               (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) {
6314         throw new RegionInRecoveryException(this.getRegionNameAsString() +
6315           " is recovering; cannot take reads");
6316       }
6317       break;
6318     default:
6319       break;
6320     }
6321     if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION
6322         || op == Operation.COMPACT_REGION) {
6323       // split, merge or compact region doesn't need to check the closing/closed state or lock the
6324       // region
6325       return;
6326     }
6327     if (this.closing.get()) {
6328       throw new NotServingRegionException(getRegionNameAsString() + " is closing");
6329     }
6330     lock(lock.readLock());
6331     if (this.closed.get()) {
6332       lock.readLock().unlock();
6333       throw new NotServingRegionException(getRegionNameAsString() + " is closed");
6334     }
6335     try {
6336       if (coprocessorHost != null) {
6337         coprocessorHost.postStartRegionOperation(op);
6338       }
6339     } catch (Exception e) {
6340       lock.readLock().unlock();
6341       throw new IOException(e);
6342     }
6343   }
6344 
6345   /**
6346    * Closes the lock. This needs to be called in the finally block corresponding
6347    * to the try block of #startRegionOperation
6348    * @throws IOException
6349    */
6350   public void closeRegionOperation() throws IOException {
6351     closeRegionOperation(Operation.ANY);
6352   }
6353 
6354   /**
6355    * Closes the lock. This needs to be called in the finally block corresponding
6356    * to the try block of {@link #startRegionOperation(Operation)}
6357    * @throws IOException
6358    */
6359   public void closeRegionOperation(Operation operation) throws IOException {
6360     lock.readLock().unlock();
6361     if (coprocessorHost != null) {
6362       coprocessorHost.postCloseRegionOperation(operation);
6363     }
6364   }
6365 
6366   /**
6367    * This method needs to be called before any public call that reads or
6368    * modifies stores in bulk. It has to be called just before a try.
6369    * #closeBulkRegionOperation needs to be called in the try's finally block
6370    * #closeBulkRegionOperation needs to be called in the try's finally block.
6371    * Acquires a write lock and checks if the region is closing or closed.
6372    * @throws RegionTooBusyException if failed to get the lock in time
6373    * @throws InterruptedIOException if interrupted while waiting for a lock
6374    */
6375   private void startBulkRegionOperation(boolean writeLockNeeded)
6376       throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
6377     if (this.closing.get()) {
6378       throw new NotServingRegionException(getRegionNameAsString() + " is closing");
6379     }
6380     if (writeLockNeeded) lock(lock.writeLock());
6381     else lock(lock.readLock());
6382     if (this.closed.get()) {
6383       if (writeLockNeeded) lock.writeLock().unlock();
6384       else lock.readLock().unlock();
6385       throw new NotServingRegionException(getRegionNameAsString() + " is closed");
6386     }
6387   }
6388 
6389   /**
6390    * Closes the lock. This needs to be called in the finally block corresponding
6391    * to the try block of #startBulkRegionOperation
6392    */
6393   private void closeBulkRegionOperation(){
6394     if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
6395     else lock.readLock().unlock();
6396   }
6397 
6398   /**
6399    * Update counters for the number of mutations without WAL and the size of possible data loss.
6400    * This information is exposed by the region server metrics.
6401    */
6402   private void recordMutationWithoutWal(final Map<byte [], List<Cell>> familyMap) {
6403     numMutationsWithoutWAL.increment();
6404     if (numMutationsWithoutWAL.get() <= 1) {
6405       LOG.info("writing data to region " + this +
6406                " with WAL disabled. Data may be lost in the event of a crash.");
6407     }
6408 
6409     long mutationSize = 0;
6410     for (List<Cell> cells: familyMap.values()) {
6411       assert cells instanceof RandomAccess;
6412       int listSize = cells.size();
6413       for (int i=0; i < listSize; i++) {
6414         Cell cell = cells.get(i);
6415         // TODO we need include tags length also here.
6416         mutationSize += KeyValueUtil.keyLength(cell) + cell.getValueLength();
6417       }
6418     }
6419 
6420     dataInMemoryWithoutWAL.add(mutationSize);
6421   }
6422 
6423   private void lock(final Lock lock)
6424       throws RegionTooBusyException, InterruptedIOException {
6425     lock(lock, 1);
6426   }
6427 
6428   /**
6429    * Try to acquire a lock.  Throw RegionTooBusyException
6430    * if failed to get the lock in time. Throw InterruptedIOException
6431    * if interrupted while waiting for the lock.
6432    */
6433   private void lock(final Lock lock, final int multiplier)
6434       throws RegionTooBusyException, InterruptedIOException {
6435     try {
6436       final long waitTime = Math.min(maxBusyWaitDuration,
6437           busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
6438       if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
6439         throw new RegionTooBusyException(
6440             "failed to get a lock in " + waitTime + " ms. " +
6441                 "regionName=" + (this.getRegionInfo() == null ? "unknown" :
6442                 this.getRegionInfo().getRegionNameAsString()) +
6443                 ", server=" + (this.getRegionServerServices() == null ? "unknown" :
6444                 this.getRegionServerServices().getServerName()));
6445       }
6446     } catch (InterruptedException ie) {
6447       LOG.info("Interrupted while waiting for a lock");
6448       InterruptedIOException iie = new InterruptedIOException();
6449       iie.initCause(ie);
6450       throw iie;
6451     }
6452   }
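     // Worked example (illustrative numbers): with busyWaitDuration = 2000 ms,
     // maxBusyWaitMultiplier = 2 and maxBusyWaitDuration = 60000 ms, a call with
     // multiplier = 5 waits min(60000, 2000 * min(5, 2)) = 4000 ms before giving up.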
6453 
6454   /**
6455    * Calls sync with the given transaction ID if the region's table is not
6456    * deferring it.
6457    * @param txid should sync up to which transaction
6458    * @throws IOException If anything goes wrong with DFS
6459    */
6460   private void syncOrDefer(long txid, Durability durability) throws IOException {
6461     if (this.getRegionInfo().isMetaRegion()) {
6462       this.wal.sync(txid);
6463     } else {
6464       switch(durability) {
6465       case USE_DEFAULT:
6466         // do what table defaults to
6467         if (shouldSyncWAL()) {
6468           this.wal.sync(txid);
6469         }
6470         break;
6471       case SKIP_WAL:
6472         // nothing to do
6473         break;
6474       case ASYNC_WAL:
6475         // nothing to do
6476         break;
6477       case SYNC_WAL:
6478       case FSYNC_WAL:
6479         // sync the WAL edit (SYNC and FSYNC treated the same for now)
6480         this.wal.sync(txid);
6481         break;
6482       }
6483     }
6484   }
6485 
6486   /**
6487    * Check whether we should sync the wal from the table's durability settings
6488    */
6489   private boolean shouldSyncWAL() {
6490     return durability.ordinal() >  Durability.ASYNC_WAL.ordinal();
6491   }
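     // Summary of the effective sync behavior above, for non-meta regions:
     //   SKIP_WAL, ASYNC_WAL  -> no sync here
     //   SYNC_WAL, FSYNC_WAL  -> wal.sync(txid)
     //   USE_DEFAULT          -> wal.sync(txid) only if the table's durability is
     //                           stricter than ASYNC_WAL (see shouldSyncWAL())
     // Meta regions always sync.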
6492 
6493   /**
6494    * A mock list implementation - discards all updates.
6495    */
6496   private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {
6497 
6498     @Override
6499     public void add(int index, Cell element) {
6500       // do nothing
6501     }
6502 
6503     @Override
6504     public boolean addAll(int index, Collection<? extends Cell> c) {
6505       return false; // this list is never changed as a result of an update
6506     }
6507 
6508     @Override
6509     public KeyValue get(int index) {
6510       throw new UnsupportedOperationException();
6511     }
6512 
6513     @Override
6514     public int size() {
6515       return 0;
6516     }
6517   };
6518 
6519   /**
6520    * Facility for dumping and compacting catalog tables.
6521    * Only handles catalog tables, since those are the only tables whose schema
6522    * we know for certain.  For usage run:
6523    * <pre>
6524    *   ./bin/hbase org.apache.hadoop.hbase.regionserver.HRegion
6525    * </pre>
6526    * @throws IOException
6527    */
6528   public static void main(String[] args) throws IOException {
6529     if (args.length < 1) {
6530       printUsageAndExit(null);
6531     }
6532     boolean majorCompact = false;
6533     if (args.length > 1) {
6534       if (!args[1].toLowerCase().startsWith("major")) {
6535         printUsageAndExit("ERROR: Unrecognized option <" + args[1] + ">");
6536       }
6537       majorCompact = true;
6538     }
6539     final Path tableDir = new Path(args[0]);
6540     final Configuration c = HBaseConfiguration.create();
6541     final FileSystem fs = FileSystem.get(c);
6542     final Path logdir = new Path(c.get("hbase.tmp.dir"));
6543     final String logname = "wal" + FSUtils.getTableName(tableDir) + System.currentTimeMillis();
6544 
6545     final Configuration walConf = new Configuration(c);
6546     FSUtils.setRootDir(walConf, logdir);
6547     final WALFactory wals = new WALFactory(walConf, null, logname);
6548     try {
6549       processTable(fs, tableDir, wals, c, majorCompact);
6550     } finally {
6551        wals.close();
6552        // TODO: is this still right?
6553        BlockCache bc = new CacheConfig(c).getBlockCache();
6554        if (bc != null) bc.shutdown();
6555     }
6556   }
6557 
6558   /**
6559    * Gets the latest sequence number that was read from storage when this region was opened.
6560    */
6561   public long getOpenSeqNum() {
6562     return this.openSeqNum;
6563   }
6564 
6565   /**
6566    * Gets the max sequence ids of the stores that were read from storage when this region was
6567    * opened. WAL edits with a smaller or equal sequence number will be skipped during replay.
6568    */
6569   public Map<byte[], Long> getMaxStoreSeqIdForLogReplay() {
6570     return this.maxSeqIdInStores;
6571   }
6572 
6573   @VisibleForTesting
6574   public long getOldestSeqIdOfStore(byte[] familyName) {
6575     return wal.getEarliestMemstoreSeqNum(getRegionInfo()
6576         .getEncodedNameAsBytes(), familyName);
6577   }
6578 
6579   /**
6580    * @return whether this region currently has a compaction in progress.
6581    */
6582   public CompactionState getCompactionState() {
6583     boolean hasMajor = majorInProgress.get() > 0, hasMinor = minorInProgress.get() > 0;
6584     return (hasMajor ? (hasMinor ? CompactionState.MAJOR_AND_MINOR : CompactionState.MAJOR)
6585         : (hasMinor ? CompactionState.MINOR : CompactionState.NONE));
6586   }
6587 
6588   public void reportCompactionRequestStart(boolean isMajor){
6589     (isMajor ? majorInProgress : minorInProgress).incrementAndGet();
6590   }
6591 
6592   public void reportCompactionRequestEnd(boolean isMajor, int numFiles, long filesSizeCompacted) {
6593     int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();
6594 
6595     // metrics
6596     compactionsFinished.incrementAndGet();
6597     compactionNumFilesCompacted.addAndGet(numFiles);
6598     compactionNumBytesCompacted.addAndGet(filesSizeCompacted);
6599 
6600     assert newValue >= 0;
6601   }
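  /*
   * Illustrative only, not part of HBase: how a caller would typically pair the
   * two reporting methods above so the in-progress counters (and hence
   * getCompactionState()) stay balanced. The method name and placeholder
   * arguments are assumptions of this sketch.
   */
  private void exampleCompactionAccounting(boolean isMajor, int numFiles, long bytesCompacted) {
    reportCompactionRequestStart(isMajor);
    try {
      // ... run the actual compaction here ...
    } finally {
      // always balance the start call, even if the compaction failed
      reportCompactionRequestEnd(isMajor, numFiles, bytesCompacted);
    }
  }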
6602 
6603   /**
6604    * Do not change this sequence id. See {@link #sequenceId} comment.
6605    * @return sequenceId
6606    */
6607   @VisibleForTesting
6608   public AtomicLong getSequenceId() {
6609     return this.sequenceId;
6610   }
6611 
6612   /**
6613    * Sets this region's sequenceId.
6614    * @param value new value
6615    */
6616   private void setSequenceId(long value) {
6617     this.sequenceId.set(value);
6618   }
6619 
6620   /**
6621    * Listener interface that enables callers of
6622    * bulkLoadHFile() to perform any necessary
6623    * pre/post processing of a given bulkload call
6624    */
6625   public interface BulkLoadListener {
6626 
6627     /**
6628      * Called before an HFile is actually loaded
6629      * @param family family being loaded to
6630      * @param srcPath path of HFile
6631      * @return final path to be used for actual loading
6632      * @throws IOException
6633      */
6634     String prepareBulkLoad(byte[] family, String srcPath) throws IOException;
6635 
6636     /**
6637      * Called after a successful HFile load
6638      * @param family family being loaded to
6639      * @param srcPath path of HFile
6640      * @throws IOException
6641      */
6642     void doneBulkLoad(byte[] family, String srcPath) throws IOException;
6643 
6644     /**
6645      * Called after a failed HFile load
6646      * @param family family being loaded to
6647      * @param srcPath path of HFile
6648      * @throws IOException
6649      */
6650     void failedBulkLoad(byte[] family, String srcPath) throws IOException;
6651   }
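  /*
   * Illustrative only, not part of HBase: a minimal BulkLoadListener that logs
   * each phase and loads the HFile from its original location by returning
   * srcPath unchanged from prepareBulkLoad(). The class name is an assumption
   * of this sketch.
   */
  private static final class LoggingBulkLoadListener implements BulkLoadListener {
    @Override
    public String prepareBulkLoad(byte[] family, String srcPath) throws IOException {
      LOG.debug("Preparing bulk load of " + srcPath);
      return srcPath; // keep the HFile where it is
    }

    @Override
    public void doneBulkLoad(byte[] family, String srcPath) throws IOException {
      LOG.debug("Finished bulk load of " + srcPath);
    }

    @Override
    public void failedBulkLoad(byte[] family, String srcPath) throws IOException {
      LOG.warn("Failed bulk load of " + srcPath);
    }
  }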
6652 
6653   @VisibleForTesting class RowLockContext {
6654     private final HashedBytes row;
6655     private final CountDownLatch latch = new CountDownLatch(1);
6656     private final Thread thread;
6657     private int lockCount = 0;
6658 
6659     RowLockContext(HashedBytes row) {
6660       this.row = row;
6661       this.thread = Thread.currentThread();
6662     }
6663 
6664     boolean ownedByCurrentThread() {
6665       return thread == Thread.currentThread();
6666     }
6667 
6668     RowLock newLock() {
6669       lockCount++;
6670       return new RowLock(this);
6671     }
6672 
6673     @Override
6674     public String toString() {
6675       Thread t = this.thread;
6676       return "Thread=" + (t == null? "null": t.getName()) + ", row=" + this.row +
6677         ", lockCount=" + this.lockCount;
6678     }
6679 
6680     void releaseLock() {
6681       if (!ownedByCurrentThread()) {
6682         throw new IllegalArgumentException("Lock held by thread: " + thread
6683           + " cannot be released by different thread: " + Thread.currentThread());
6684       }
6685       lockCount--;
6686       if (lockCount == 0) {
6687         // no remaining locks by the thread, unlock and allow other threads to access
6688         RowLockContext existingContext = lockedRows.remove(row);
6689         if (existingContext != this) {
6690           throw new RuntimeException(
6691               "Internal row lock state inconsistent, should not happen, row: " + row);
6692         }
6693         latch.countDown();
6694       }
6695     }
6696   }
6697 
6698   /**
6699    * Row lock held by a given thread.
6700    * One thread may acquire multiple locks on the same row simultaneously.
6701    * The locks must be released by calling release() from the same thread.
6702    */
6703   public static class RowLock {
6704     @VisibleForTesting final RowLockContext context;
6705     private boolean released = false;
6706 
6707     @VisibleForTesting RowLock(RowLockContext context) {
6708       this.context = context;
6709     }
6710 
6711     /**
6712      * Release this lock.  If there are no remaining locks held by the current thread
6713      * then unlock the row and allow other threads to acquire the lock.
6714      * @throws IllegalArgumentException if called by a thread other than the lock-owning thread
6715      */
6716     public void release() {
6717       if (!released) {
6718         context.releaseLock();
6719         released = true;
6720       }
6721     }
6722   }
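  /*
   * Illustrative only, not part of HBase: the intended acquire/release pattern
   * for RowLock. The getRowLock(byte[]) acquisition call is an assumption of
   * this sketch; release() must run on the acquiring thread, so the try/finally
   * keeps the lock from leaking if the mutation fails.
   */
  private void exampleRowLockUsage(byte[] row) throws IOException {
    RowLock rowLock = getRowLock(row);
    try {
      // ... mutate the row while the lock is held ...
    } finally {
      rowLock.release();
    }
  }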
6723 
6724   /**
6725    * Append a faked WALEdit in order to get a new (long) sequence number; the WAL syncer
6726    * will just ignore this empty WALEdit append later.
6727    * @param wal
6728    * @param cells list of Cells inserted into memstore. Those Cells are passed in order to
6729    *        be updated with the right mvcc values (their WAL sequence number)
6730    * @return the WALKey used for the empty append; it is not synced and carries no edits
6731    * @throws IOException
6732    */
6733   private WALKey appendEmptyEdit(final WAL wal, List<Cell> cells) throws IOException {
6734     // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
6735     WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(),
6736       WALKey.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
6737     // Call append but with an empty WALEdit.  The returned sequence id will not be associated
6738     // with any edit and we can be sure it went in after all outstanding appends.
6739     wal.append(getTableDesc(), getRegionInfo(), key,
6740       WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells);
6741     return key;
6742   }
6743 
6744   /**
6745    * Explicitly sync the WAL
6746    * @throws IOException
6747    */
6748   public void syncWal() throws IOException {
6749     if (this.wal != null) {
6750       this.wal.sync();
6751     }
6752   }
6753 
6754   /**
6755    * {@inheritDoc}
6756    */
6757   @Override
6758   public void onConfigurationChange(Configuration conf) {
6759     // Do nothing for now.
6760   }
6761 
6762   /**
6763    * {@inheritDoc}
6764    */
6765   @Override
6766   public void registerChildren(ConfigurationManager manager) {
6767     configurationManager = Optional.of(manager);
6768     for (Store s : this.stores.values()) {
6769       configurationManager.get().registerObserver(s);
6770     }
6771   }
6772 
6773   /**
6774    * {@inheritDoc}
6775    */
6776   @Override
6777   public void deregisterChildren(ConfigurationManager manager) {
6778     for (Store s : this.stores.values()) {
6779       configurationManager.get().deregisterObserver(s);
6780     }
6781   }
6782 }