1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.io.UnsupportedEncodingException;
25  import java.lang.reflect.Constructor;
26  import java.text.ParseException;
27  import java.util.AbstractList;
28  import java.util.ArrayList;
29  import java.util.Arrays;
30  import java.util.Collection;
31  import java.util.Collections;
32  import java.util.HashMap;
33  import java.util.Iterator;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.NavigableMap;
37  import java.util.NavigableSet;
38  import java.util.RandomAccess;
39  import java.util.Set;
40  import java.util.TreeMap;
41  import java.util.concurrent.Callable;
42  import java.util.concurrent.CompletionService;
43  import java.util.concurrent.ConcurrentHashMap;
44  import java.util.concurrent.ConcurrentSkipListMap;
45  import java.util.concurrent.CountDownLatch;
46  import java.util.concurrent.ExecutionException;
47  import java.util.concurrent.ExecutorCompletionService;
48  import java.util.concurrent.ExecutorService;
49  import java.util.concurrent.Executors;
50  import java.util.concurrent.Future;
51  import java.util.concurrent.FutureTask;
52  import java.util.concurrent.ThreadFactory;
53  import java.util.concurrent.ThreadPoolExecutor;
54  import java.util.concurrent.TimeUnit;
55  import java.util.concurrent.TimeoutException;
56  import java.util.concurrent.atomic.AtomicBoolean;
57  import java.util.concurrent.atomic.AtomicInteger;
58  import java.util.concurrent.atomic.AtomicLong;
59  import java.util.concurrent.locks.Lock;
60  import java.util.concurrent.locks.ReentrantReadWriteLock;
61  
62  import org.apache.commons.logging.Log;
63  import org.apache.commons.logging.LogFactory;
64  import org.apache.hadoop.hbase.classification.InterfaceAudience;
65  import org.apache.hadoop.conf.Configuration;
66  import org.apache.hadoop.fs.FileStatus;
67  import org.apache.hadoop.fs.FileSystem;
68  import org.apache.hadoop.fs.Path;
69  import org.apache.hadoop.hbase.Cell;
70  import org.apache.hadoop.hbase.CellScanner;
71  import org.apache.hadoop.hbase.CellUtil;
72  import org.apache.hadoop.hbase.CompoundConfiguration;
73  import org.apache.hadoop.hbase.DoNotRetryIOException;
74  import org.apache.hadoop.hbase.DroppedSnapshotException;
75  import org.apache.hadoop.hbase.HBaseConfiguration;
76  import org.apache.hadoop.hbase.HColumnDescriptor;
77  import org.apache.hadoop.hbase.HConstants;
78  import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
79  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
80  import org.apache.hadoop.hbase.HRegionInfo;
81  import org.apache.hadoop.hbase.HTableDescriptor;
82  import org.apache.hadoop.hbase.KeyValue;
83  import org.apache.hadoop.hbase.KeyValueUtil;
84  import org.apache.hadoop.hbase.NotServingRegionException;
85  import org.apache.hadoop.hbase.RegionTooBusyException;
86  import org.apache.hadoop.hbase.TableName;
87  import org.apache.hadoop.hbase.UnknownScannerException;
88  import org.apache.hadoop.hbase.backup.HFileArchiver;
89  import org.apache.hadoop.hbase.client.Append;
90  import org.apache.hadoop.hbase.client.Delete;
91  import org.apache.hadoop.hbase.client.Durability;
92  import org.apache.hadoop.hbase.client.Get;
93  import org.apache.hadoop.hbase.client.Increment;
94  import org.apache.hadoop.hbase.client.IsolationLevel;
95  import org.apache.hadoop.hbase.client.Mutation;
96  import org.apache.hadoop.hbase.client.Put;
97  import org.apache.hadoop.hbase.client.Result;
98  import org.apache.hadoop.hbase.client.RowMutations;
99  import org.apache.hadoop.hbase.client.Scan;
100 import org.apache.hadoop.hbase.conf.ConfigurationManager;
101 import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
102 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
103 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
104 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
105 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
106 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
107 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
108 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
109 import org.apache.hadoop.hbase.filter.FilterWrapper;
110 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
111 import org.apache.hadoop.hbase.io.HeapSize;
112 import org.apache.hadoop.hbase.io.TimeRange;
113 import org.apache.hadoop.hbase.io.hfile.BlockCache;
114 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
115 import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
116 import org.apache.hadoop.hbase.ipc.RpcCallContext;
117 import org.apache.hadoop.hbase.ipc.RpcServer;
118 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
119 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
120 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
121 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
122 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
123 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
124 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
125 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
126 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
127 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
128 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
129 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
130 import org.apache.hadoop.hbase.regionserver.wal.HLog;
131 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
132 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
133 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
134 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.MutationReplay;
135 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
136 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
137 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
138 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
139 import org.apache.hadoop.hbase.util.Bytes;
140 import org.apache.hadoop.hbase.util.CancelableProgressable;
141 import org.apache.hadoop.hbase.util.ClassSize;
142 import org.apache.hadoop.hbase.util.CompressionTest;
143 import org.apache.hadoop.hbase.util.Counter;
144 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
145 import org.apache.hadoop.hbase.util.FSTableDescriptors;
146 import org.apache.hadoop.hbase.util.FSUtils;
147 import org.apache.hadoop.hbase.util.HashedBytes;
148 import org.apache.hadoop.hbase.util.Pair;
149 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
150 import org.apache.hadoop.hbase.util.Threads;
151 import org.apache.hadoop.io.MultipleIOException;
152 import org.apache.hadoop.util.StringUtils;
153 
154 import com.google.common.annotations.VisibleForTesting;
155 import com.google.common.base.Optional;
156 import com.google.common.base.Preconditions;
157 import com.google.common.collect.Lists;
158 import com.google.common.collect.Maps;
159 import com.google.common.io.Closeables;
160 import com.google.protobuf.Descriptors;
161 import com.google.protobuf.Message;
162 import com.google.protobuf.RpcCallback;
163 import com.google.protobuf.RpcController;
164 import com.google.protobuf.Service;
165 
166 /**
167  * HRegion stores data for a certain region of a table.  It stores all columns
168  * for each row. A given table consists of one or more HRegions.
169  *
170  * <p>We maintain multiple HStores for a single HRegion.
171  *
172  * <p>A Store is a set of rows with some column data; together,
173  * the Stores make up all the data for the rows.
174  *
175  * <p>Each HRegion has a 'startKey' and 'endKey'.
176  * <p>The first is inclusive, the second is exclusive (except for
177  * the final region).  The endKey of region 0 is the same as the
178  * startKey for region 1 (if it exists).  The startKey for the
179  * first region is null. The endKey for the final region is null.
180  *
181  * <p>Locking at the HRegion level serves only one purpose: preventing the
182  * region from being closed (and consequently split) while other operations
183  * are ongoing. Each row level operation obtains both a row lock and a region
184  * read lock for the duration of the operation. While a scanner is being
185  * constructed, getScanner holds a read lock. If the scanner is successfully
186  * constructed, it holds a read lock until it is closed. A close takes out a
187  * write lock and consequently will block for ongoing operations and will block
188  * new operations from starting while the close is in progress.
189  *
190  * <p>An HRegion is defined by its table and its key extent.
191  *
192  * <p>It consists of at least one Store.  The number of Stores should be
193  * configurable, so that data which is accessed together is stored in the same
194  * Store.  Right now, we approximate that by building a single Store for
195  * each column family.  (This config info will be communicated via the
196  * tabledesc.)
197  *
198  * <p>The HTableDescriptor contains metainfo about the HRegion's table.
199  * regionName is a unique identifier for this HRegion. [startKey, endKey)
200  * defines the keyspace for this HRegion.
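 *
 * <p>Below is a minimal usage sketch (illustrative only: the variable names and
 * setup are assumptions, and real clients normally go through an HTable rather
 * than an HRegion directly):
 * <pre>
 * // Assumes conf, info (HRegionInfo), htd (HTableDescriptor) and log (HLog)
 * // have already been created.
 * HRegion region = HRegion.openHRegion(info, htd, log, conf);
 * try {
 *   Put p = new Put(Bytes.toBytes("row1"));
 *   p.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
 *   region.put(p);
 *   Result r = region.get(new Get(Bytes.toBytes("row1")));
 * } finally {
 *   region.close();
 * }
 * </pre>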
201  */
202 @InterfaceAudience.Private
203 public class HRegion implements HeapSize, PropagatingConfigurationObserver { // , Writable{
204   public static final Log LOG = LogFactory.getLog(HRegion.class);
205 
206   public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
207       "hbase.hregion.scan.loadColumnFamiliesOnDemand";
208 
209   /**
210    * This is the global default value for durability. All tables/mutations not
211    * defining a durability or using USE_DEFAULT will default to this value.
212    */
213   private static final Durability DEFAULT_DURABLITY = Durability.SYNC_WAL;
214 
215   final AtomicBoolean closed = new AtomicBoolean(false);
216   /* Closing can take some time; use the closing flag if there is stuff we don't
217    * want to do while in closing state; e.g. offering this region up to the
218    * master as a region to close if the carrying regionserver is overloaded.
219    * Once set, it is never cleared.
220    */
221   final AtomicBoolean closing = new AtomicBoolean(false);
222 
223   /**
224    * The sequence id of the last flush on this region.  Used when doing some rough calculations on
225    * whether it is time to flush or not.
226    */
227   protected volatile long lastFlushSeqId = -1L;
228 
229   /**
230    * Region scoped edit sequence Id. Edits to this region are GUARANTEED to appear in the WAL/HLog
231    * file in this sequence id's order; i.e. edit #2 will be in the WAL after edit #1.
232    * Its default value is -1L. This default is used as a marker to indicate
233    * that the region hasn't opened yet. Once it is opened, it is set to the derived
234    * {@link #openSeqNum}, the largest sequence id of all hfiles opened under this Region.
235    *
236    * <p>Control of this sequence is handed off to the WAL/HLog implementation.  It is responsible
237    * for tagging edits with the correct sequence id since it is responsible for getting the
238    * edits into the WAL files. It controls updating the sequence id value.  DO NOT UPDATE IT
239    * OUTSIDE OF THE WAL.  The value you get will not be what you think it is.
240    */
241   private final AtomicLong sequenceId = new AtomicLong(-1L);
242 
243   /**
244    * The Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context,
245    * so that startRegionOperation can invoke different checks before any region operation. Not all
246    * operations have to be defined here; an entry is only needed when a special check is required in
247    * startRegionOperation.
248    */
249   public enum Operation {
250     ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE,
251     REPLAY_BATCH_MUTATE, COMPACT_REGION
252   }
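
  // Illustrative pattern only (a sketch, not code that is invoked here):
  // internal operations bracket their work with the matching Operation
  // context, e.g. for a scan:
  //
  //   startRegionOperation(Operation.SCAN);
  //   try {
  //     // ... perform the scan against the region ...
  //   } finally {
  //     closeRegionOperation();
  //   }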
253 
254   //////////////////////////////////////////////////////////////////////////////
255   // Members
256   //////////////////////////////////////////////////////////////////////////////
257 
258   // map from a locked row to the context for that lock including:
259   // - CountDownLatch for threads waiting on that row
260   // - the thread that owns the lock (allow reentrancy)
261   // - reference count of (reentrant) locks held by the thread
262   // - the row itself
263   private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
264       new ConcurrentHashMap<HashedBytes, RowLockContext>();
265 
266   protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(
267       Bytes.BYTES_RAWCOMPARATOR);
268 
269   // TODO: account for each registered handler in HeapSize computation
270   private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
271 
272   public final AtomicLong memstoreSize = new AtomicLong(0);
273 
274   // Debug possible data loss due to WAL off
275   final Counter numMutationsWithoutWAL = new Counter();
276   final Counter dataInMemoryWithoutWAL = new Counter();
277 
278   // Debug why CAS operations are taking a while.
279   final Counter checkAndMutateChecksPassed = new Counter();
280   final Counter checkAndMutateChecksFailed = new Counter();
281 
282   // Number of requests
283   final Counter readRequestsCount = new Counter();
284   final Counter writeRequestsCount = new Counter();
285 
286   // Compaction counters
287   final AtomicLong compactionsFinished = new AtomicLong(0L);
288   final AtomicLong compactionNumFilesCompacted = new AtomicLong(0L);
289   final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L);
290 
291 
292   private final HLog log;
293   private final HRegionFileSystem fs;
294   protected final Configuration conf;
295   private final Configuration baseConf;
296   private final KeyValue.KVComparator comparator;
297   private final int rowLockWaitDuration;
298   static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
299 
300   // The internal wait duration to acquire a lock before read/update
301   // from the region. It is not per row. The purpose of this wait time
302   // is to avoid waiting a long time while the region is busy, so that
303   // we can release the IPC handler soon enough to improve the
304   // availability of the region server. It can be adjusted by
305   // tuning configuration "hbase.busy.wait.duration".
306   final long busyWaitDuration;
307   static final long DEFAULT_BUSY_WAIT_DURATION = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
308 
309   // If updating multiple rows in one call, wait longer,
310   // i.e. waiting for busyWaitDuration * # of rows. However,
311   // we can limit the max multiplier.
312   final int maxBusyWaitMultiplier;
313 
314   // Max busy wait duration. There is no point in waiting longer than the RPC
315   // purge timeout, after which an RPC call will be terminated by the RPC engine.
316   final long maxBusyWaitDuration;
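
  // Worked example (illustrative defaults, not guarantees): with
  // busyWaitDuration = 60000 ms and maxBusyWaitMultiplier = 2, a multi-row
  // batch waits at most 60000 * min(rowCount, 2) ms, further capped by
  // maxBusyWaitDuration, before giving up with a RegionTooBusyException.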
317 
318   // negative number indicates infinite timeout
319   static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
320   final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();
321 
322   private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
323 
324   /**
325    * The sequence ID that was encountered when this region was opened.
326    */
327   private long openSeqNum = HConstants.NO_SEQNUM;
328 
329   /**
330    * The default setting for whether to enable on-demand CF loading for
331    * scan requests to this region. Requests can override it.
332    */
333   private boolean isLoadingCfsOnDemandDefault = false;
334 
335   private final AtomicInteger majorInProgress = new AtomicInteger(0);
336   private final AtomicInteger minorInProgress = new AtomicInteger(0);
337 
338   //
339   // Context: During replay we want to ensure that we do not lose any data. So, we
340   // have to be conservative in how we replay logs. For each store, we calculate
341   // the maxSeqId up to which the store was flushed. And, skip the edits which
342   // are equal to or lower than maxSeqId for each store.
343   // The following map is populated when opening the region
344   Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
345 
346   /**
347    * Config setting for whether to allow writes when a region is in recovering or not.
348    */
349   private boolean disallowWritesInRecovering = false;
350 
351   // when a region is in recovering state, it can only accept writes not reads
352   private volatile boolean isRecovering = false;
353 
354   private volatile Optional<ConfigurationManager> configurationManager;
355 
356   /**
357    * @return The smallest mvcc readPoint across all the scanners in this
358    * region. Writes older than this readPoint are included in every
359    * read operation.
360    */
361   public long getSmallestReadPoint() {
362     long minimumReadPoint;
363     // We need to ensure that while we are calculating the smallestReadPoint
364     // no new RegionScanners can grab a readPoint that we are unaware of.
365     // We achieve this by synchronizing on the scannerReadPoints object.
366     synchronized(scannerReadPoints) {
367       minimumReadPoint = mvcc.memstoreReadPoint();
368 
369       for (Long readPoint: this.scannerReadPoints.values()) {
370         if (readPoint < minimumReadPoint) {
371           minimumReadPoint = readPoint;
372         }
373       }
374     }
375     return minimumReadPoint;
376   }
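
  // Worked example (illustrative numbers): if mvcc.memstoreReadPoint() is 120
  // but an open scanner registered a read point of 100, this method returns
  // 100; cell versions written after mvcc point 100 must therefore be kept
  // around for that scanner even though newer readers no longer need them.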
377   /*
378    * Data structure of write state flags used for coordinating flushes,
379    * compactions and closes.
380    */
381   static class WriteState {
382     // Set while a memstore flush is happening.
383     volatile boolean flushing = false;
384     // Set when a flush has been requested.
385     volatile boolean flushRequested = false;
386     // Number of compactions running.
387     volatile int compacting = 0;
388     // Set to false in close. Once false, we cannot compact or flush again.
389     volatile boolean writesEnabled = true;
390     // Set if region is read-only
391     volatile boolean readOnly = false;
392     // whether the reads are enabled. This is different from readOnly, because readOnly is
393     // static in the lifetime of the region, while readsEnabled is dynamic
394     volatile boolean readsEnabled = true;
395 
396     /**
397      * Set flags that make this region read-only.
398      *
399      * @param onOff flip value for region r/o setting
400      */
401     synchronized void setReadOnly(final boolean onOff) {
402       this.writesEnabled = !onOff;
403       this.readOnly = onOff;
404     }
405 
406     boolean isReadOnly() {
407       return this.readOnly;
408     }
409 
410     boolean isFlushRequested() {
411       return this.flushRequested;
412     }
413 
414     void setReadsEnabled(boolean readsEnabled) {
415       this.readsEnabled = readsEnabled;
416     }
417 
418     static final long HEAP_SIZE = ClassSize.align(
419         ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
420   }
421 
422   /**
423    * Objects from this class are created when flushing to describe all the different states a
424    * flush can end up in. The Result enum describes those states. The sequence id should only
425    * be specified if the flush was successful, and the failure message should only be specified
426    * if it didn't flush.
427    */
428   public static class FlushResult {
429     enum Result {
430       FLUSHED_NO_COMPACTION_NEEDED,
431       FLUSHED_COMPACTION_NEEDED,
432       // Special case where a flush didn't run because there's nothing in the memstores. Used when
433       // bulk loading to know when we can still load even if a flush didn't happen.
434       CANNOT_FLUSH_MEMSTORE_EMPTY,
435       CANNOT_FLUSH
436       // Be careful adding more to this enum; check the methods below to make sure they still cover every case
437     }
438 
439     final Result result;
440     final String failureReason;
441     final long flushSequenceId;
442 
443     /**
444      * Convenience constructor to use when the flush is successful; the failure message is set to
445      * null.
446      * @param result Expecting FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
447      * @param flushSequenceId Generated sequence id that comes right after the edits in the
448      *                        memstores.
449      */
450     FlushResult(Result result, long flushSequenceId) {
451       this(result, flushSequenceId, null);
452       assert result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
453           .FLUSHED_COMPACTION_NEEDED;
454     }
455 
456     /**
457      * Convenience constructor to use when we cannot flush.
458      * @param result Expecting CANNOT_FLUSH_MEMSTORE_EMPTY or CANNOT_FLUSH.
459      * @param failureReason Reason why we couldn't flush.
460      */
461     FlushResult(Result result, String failureReason) {
462       this(result, -1, failureReason);
463       assert result == Result.CANNOT_FLUSH_MEMSTORE_EMPTY || result == Result.CANNOT_FLUSH;
464     }
465 
466     /**
467      * Constructor with all the parameters.
468      * @param result Any of the Result.
469      * @param flushSequenceId Generated sequence id if the memstores were flushed else -1.
470      * @param failureReason Reason why we couldn't flush, or null.
471      */
472     FlushResult(Result result, long flushSequenceId, String failureReason) {
473       this.result = result;
474       this.flushSequenceId = flushSequenceId;
475       this.failureReason = failureReason;
476     }
477 
478     /**
479      * Convenience method, the equivalent of checking if result is
480      * FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
481      * @return true if the memstores were flushed, else false.
482      */
483     public boolean isFlushSucceeded() {
484       return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
485           .FLUSHED_COMPACTION_NEEDED;
486     }
487 
488     /**
489      * Convenience method, the equivalent of checking if result is FLUSHED_COMPACTION_NEEDED.
490      * @return True if the flush requested a compaction, else false (a false return does not even imply that a flush happened).
491      */
492     public boolean isCompactionNeeded() {
493       return result == Result.FLUSHED_COMPACTION_NEEDED;
494     }
495   }
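
  // Sketch of how a caller might consume a FlushResult (illustrative only;
  // flushcache() is the flush entry point defined later in this class):
  //
  //   FlushResult fr = region.flushcache();
  //   if (fr.isFlushSucceeded() && fr.isCompactionNeeded()) {
  //     // ask the compaction machinery to look at this region
  //   }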
496 
497   final WriteState writestate = new WriteState();
498 
499   long memstoreFlushSize;
500   final long timestampSlop;
501   final long rowProcessorTimeout;
502   private volatile long lastFlushTime;
503   final RegionServerServices rsServices;
504   private RegionServerAccounting rsAccounting;
505   private long flushCheckInterval;
506   // flushPerChanges is used to prevent accumulating too many changes in the memstore
507   private long flushPerChanges;
508   private long blockingMemStoreSize;
509   final long threadWakeFrequency;
510   // Used to guard closes
511   final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
512 
513   // Stop updates lock
514   private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock();
515   private boolean splitRequest;
516   private byte[] explicitSplitPoint = null;
517 
518   private final MultiVersionConsistencyControl mvcc =
519       new MultiVersionConsistencyControl();
520 
521   // Coprocessor host
522   private RegionCoprocessorHost coprocessorHost;
523 
524   private HTableDescriptor htableDescriptor = null;
525   private RegionSplitPolicy splitPolicy;
526 
527   private final MetricsRegion metricsRegion;
528   private final MetricsRegionWrapperImpl metricsRegionWrapper;
529   private final Durability durability;
530 
531   /**
532    * HRegion constructor. This constructor should only be used for testing and
533    * extensions.  Instances of HRegion should be instantiated with the
534    * {@link HRegion#createHRegion} or {@link HRegion#openHRegion} method.
535    *
536    * @param tableDir qualified path of directory where region should be located,
537    * usually the table directory.
538    * @param log The HLog is the outbound log for any updates to the HRegion
539    * (There's a single HLog for all the HRegions on a single HRegionServer.)
540    * The log file is a logfile from the previous execution that's
541    * custom-computed for this HRegion. The HRegionServer computes and sorts the
542    * appropriate log info for this HRegion. If there is a previous log file
543    * (implying that the HRegion has been written-to before), then read it from
544    * the supplied path.
545    * @param fs is the filesystem.
546    * @param confParam is global configuration settings.
547    * @param regionInfo - HRegionInfo that describes the region
549    * @param htd the table descriptor
550    * @param rsServices reference to {@link RegionServerServices} or null
551    */
552   @Deprecated
553   public HRegion(final Path tableDir, final HLog log, final FileSystem fs,
554       final Configuration confParam, final HRegionInfo regionInfo,
555       final HTableDescriptor htd, final RegionServerServices rsServices) {
556     this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
557       log, confParam, htd, rsServices);
558   }
559 
560   /**
561    * HRegion constructor. This constructor should only be used for testing and
562    * extensions.  Instances of HRegion should be instantiated with the
563    * {@link HRegion#createHRegion} or {@link HRegion#openHRegion} method.
564    *
565    * @param fs is the filesystem.
566    * @param log The HLog is the outbound log for any updates to the HRegion
567    * (There's a single HLog for all the HRegions on a single HRegionServer.)
568    * The log file is a logfile from the previous execution that's
569    * custom-computed for this HRegion. The HRegionServer computes and sorts the
570    * appropriate log info for this HRegion. If there is a previous log file
571    * (implying that the HRegion has been written-to before), then read it from
572    * the supplied path.
573    * @param confParam is global configuration settings.
574    * @param htd the table descriptor
575    * @param rsServices reference to {@link RegionServerServices} or null
576    */
577   public HRegion(final HRegionFileSystem fs, final HLog log, final Configuration confParam,
578       final HTableDescriptor htd, final RegionServerServices rsServices) {
579     if (htd == null) {
580       throw new IllegalArgumentException("Need table descriptor");
581     }
582 
583     if (confParam instanceof CompoundConfiguration) {
584       throw new IllegalArgumentException("Need original base configuration");
585     }
586 
587     this.comparator = fs.getRegionInfo().getComparator();
588     this.log = log;
589     this.fs = fs;
590 
591     // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
592     this.baseConf = confParam;
593     this.conf = new CompoundConfiguration()
594       .add(confParam)
595       .addStringMap(htd.getConfiguration())
596       .addBytesMap(htd.getValues());
597     this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
598         DEFAULT_CACHE_FLUSH_INTERVAL);
599     this.flushPerChanges = conf.getLong(MEMSTORE_FLUSH_PER_CHANGES, DEFAULT_FLUSH_PER_CHANGES);
600     if (this.flushPerChanges > MAX_FLUSH_PER_CHANGES) {
601       throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed "
602           + MAX_FLUSH_PER_CHANGES);
603     }
604 
605     this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
606                     DEFAULT_ROWLOCK_WAIT_DURATION);
607 
608     this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
609     this.htableDescriptor = htd;
610     this.rsServices = rsServices;
611     this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
612     setHTableSpecificConf();
613     this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
614 
615     this.busyWaitDuration = conf.getLong(
616       "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
617     this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
618     if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
619       throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
620         + busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
621         + maxBusyWaitMultiplier + "). Their product should be positive");
622     }
623     this.maxBusyWaitDuration = conf.getLong("hbase.ipc.client.call.purge.timeout",
624       2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
625 
626     /*
627      * timestamp.slop provides a server-side constraint on the timestamp. This
628      * assumes that you base your TS around currentTimeMillis(). In this case,
629      * throw an error to the user if the user-specified TS is newer than now +
630      * slop. LATEST_TIMESTAMP == don't use this functionality
631      */
632     this.timestampSlop = conf.getLong(
633         "hbase.hregion.keyvalue.timestamp.slop.millisecs",
634         HConstants.LATEST_TIMESTAMP);
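
    // Worked example of the slop check (illustrative; the value is an
    // assumption, not a recommended setting): with
    // "hbase.hregion.keyvalue.timestamp.slop.millisecs" set to 2000, a
    // mutation whose timestamp is more than 2 seconds ahead of the server's
    // current time fails the sanity check and is rejected.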
635 
636     /**
637      * Timeout for the process time in processRowsWithLocks().
638      * Use -1 to switch off time bound.
639      */
640     this.rowProcessorTimeout = conf.getLong(
641         "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
642     this.durability = htd.getDurability() == Durability.USE_DEFAULT
643         ? DEFAULT_DURABLITY
644         : htd.getDurability();
645     if (rsServices != null) {
646       this.rsAccounting = this.rsServices.getRegionServerAccounting();
647       // don't initialize coprocessors if not running within a regionserver
648       // TODO: revisit if coprocessors should load in other cases
649       this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
650       this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
651       this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper);
652 
653       Map<String, HRegion> recoveringRegions = rsServices.getRecoveringRegions();
654       String encodedName = getRegionInfo().getEncodedName();
655       if (recoveringRegions != null && recoveringRegions.containsKey(encodedName)) {
656         this.isRecovering = true;
657         recoveringRegions.put(encodedName, this);
658       }
659     } else {
660       this.metricsRegionWrapper = null;
661       this.metricsRegion = null;
662     }
663     if (LOG.isDebugEnabled()) {
664       // Write out region name as string and its encoded name.
665       LOG.debug("Instantiated " + this);
666     }
667 
668     // by default, we allow writes against a region when it's in recovering
669     this.disallowWritesInRecovering =
670         conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING,
671           HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG);
672     configurationManager = Optional.absent();
673   }
674 
675   void setHTableSpecificConf() {
676     if (this.htableDescriptor == null) return;
677     long flushSize = this.htableDescriptor.getMemStoreFlushSize();
678 
679     if (flushSize <= 0) {
680       flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
681         HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
682     }
683     this.memstoreFlushSize = flushSize;
684     this.blockingMemStoreSize = this.memstoreFlushSize *
685         conf.getLong("hbase.hregion.memstore.block.multiplier", 2);
686   }
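
  // Worked example of the sizing above (the numbers are illustrative
  // defaults, not guarantees): with a 128 MB memstore flush size and a block
  // multiplier of 2, updates to this region start blocking once its memstore
  // reaches 128 MB * 2 = 256 MB.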
687 
688   /**
689    * Initialize this region.
690    * Used only by tests and SplitTransaction to reopen the region.
691    * You should use createHRegion() or openHRegion() instead.
692    * @return What the next sequence (edit) id should be.
693    * @throws IOException e
694    * @deprecated use HRegion.createHRegion() or HRegion.openHRegion()
695    */
696   @Deprecated
697   public long initialize() throws IOException {
698     return initialize(null);
699   }
700 
701   /**
702    * Initialize this region.
703    *
704    * @param reporter Tickle every so often if initialize is taking a while.
705    * @return What the next sequence (edit) id should be.
706    * @throws IOException e
707    */
708   private long initialize(final CancelableProgressable reporter) throws IOException {
709     MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
710     long nextSeqId = -1;
711     try {
712       nextSeqId = initializeRegionInternals(reporter, status);
713       return nextSeqId;
714     } finally {
715       // nextSeqid will be -1 if the initialization fails.
716       // At least it will be 0 otherwise.
717       if (nextSeqId == -1) {
718         status
719             .abort("Exception during region " + this.getRegionNameAsString() + " initialization.");
720       }
721     }
722   }
723 
724   private long initializeRegionInternals(final CancelableProgressable reporter,
725       final MonitoredTask status) throws IOException, UnsupportedEncodingException {
726     if (coprocessorHost != null) {
727       status.setStatus("Running coprocessor pre-open hook");
728       coprocessorHost.preOpen();
729     }
730 
731     // Write HRI to a file in case we need to recover hbase:meta
732     status.setStatus("Writing region info on filesystem");
733     fs.checkRegionInfoOnFilesystem();
734 
735 
736 
737     // Initialize all the HStores
738     status.setStatus("Initializing all the Stores");
739     long maxSeqId = initializeRegionStores(reporter, status);
740 
741     this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this));
742     this.writestate.flushRequested = false;
743     this.writestate.compacting = 0;
744 
745     if (this.writestate.writesEnabled) {
746       // Remove temporary data left over from old regions
747       status.setStatus("Cleaning up temporary data from old regions");
748       fs.cleanupTempDir();
749     }
750 
751     if (this.writestate.writesEnabled) {
752       status.setStatus("Cleaning up detritus from prior splits");
753       // Get rid of any splits or merges that were lost in-progress.  Clean out
754       // these directories here on open.  We may be opening a region that was
755       // being split but we crashed in the middle of it all.
756       fs.cleanupAnySplitDetritus();
757       fs.cleanupMergesDir();
758     }
759 
760     // Initialize split policy
761     this.splitPolicy = RegionSplitPolicy.create(this, conf);
762 
763     this.lastFlushTime = EnvironmentEdgeManager.currentTime();
764     // Use maximum of log sequenceid or that which was found in stores
765     // (particularly if no recovered edits, seqid will be -1).
766     long nextSeqid = maxSeqId + 1;
767     if (this.isRecovering) {
768       // In distributedLogReplay mode, we don't know the last change sequence number because region
769       // is opened before recovery completes. So we add a safety bumper to avoid new sequence
770       // numbers overlapping already-used sequence numbers.
771       nextSeqid = HLogUtil.writeRegionOpenSequenceIdFile(this.fs.getFileSystem(), 
772             this.fs.getRegionDir(), nextSeqid, (this.flushPerChanges + 10000000));
773     }
774 
775     LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() +
776       "; next sequenceid=" + nextSeqid);
777 
778     // A region can be reopened if it failed a split; reset flags
779     this.closing.set(false);
780     this.closed.set(false);
781 
782     if (coprocessorHost != null) {
783       status.setStatus("Running coprocessor post-open hooks");
784       coprocessorHost.postOpen();
785     }
786 
787     status.markComplete("Region opened successfully");
788     return nextSeqid;
789   }
790 
791   private long initializeRegionStores(final CancelableProgressable reporter, MonitoredTask status)
792       throws IOException, UnsupportedEncodingException {
793     // Load in all the HStores.
794 
795     long maxSeqId = -1;
796     // initialized to -1 so that we pick up MemstoreTS from column families
797     long maxMemstoreTS = -1;
798 
799     if (!htableDescriptor.getFamilies().isEmpty()) {
800       // initialize the thread pool for opening stores in parallel.
801       ThreadPoolExecutor storeOpenerThreadPool =
802         getStoreOpenAndCloseThreadPool("StoreOpener-" + this.getRegionInfo().getShortNameToLog());
803       CompletionService<HStore> completionService =
804         new ExecutorCompletionService<HStore>(storeOpenerThreadPool);
805 
806       // initialize each store in parallel
807       for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
808         status.setStatus("Instantiating store for column family " + family);
809         completionService.submit(new Callable<HStore>() {
810           @Override
811           public HStore call() throws IOException {
812             return instantiateHStore(family);
813           }
814         });
815       }
816       boolean allStoresOpened = false;
817       try {
818         for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
819           Future<HStore> future = completionService.take();
820           HStore store = future.get();
821           this.stores.put(store.getColumnFamilyName().getBytes(), store);
822 
823           long storeMaxSequenceId = store.getMaxSequenceId();
824           maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
825               storeMaxSequenceId);
826           if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) {
827             maxSeqId = storeMaxSequenceId;
828           }
829           long maxStoreMemstoreTS = store.getMaxMemstoreTS();
830           if (maxStoreMemstoreTS > maxMemstoreTS) {
831             maxMemstoreTS = maxStoreMemstoreTS;
832           }
833         }
834         allStoresOpened = true;
835       } catch (InterruptedException e) {
836         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
837       } catch (ExecutionException e) {
838         throw new IOException(e.getCause());
839       } finally {
840         storeOpenerThreadPool.shutdownNow();
841         if (!allStoresOpened) {
842           // something went wrong, close all opened stores
843           LOG.error("Could not initialize all stores for the region=" + this);
844           for (Store store : this.stores.values()) {
845             try {
846               store.close();
847             } catch (IOException e) {
848               LOG.warn(e.getMessage());
849             }
850           }
851         }
852       }
853     }
854     if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
855       // Recover any edits if available.
856       maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
857           this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
858     }
859     maxSeqId = Math.max(maxSeqId, maxMemstoreTS + 1);
860     mvcc.initialize(maxSeqId);
861     return maxSeqId;
862   }
863 
864   private void writeRegionOpenMarker(HLog log, long openSeqId) throws IOException {
865     Map<byte[], List<Path>> storeFiles =
866         new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
867     for (Map.Entry<byte[], Store> entry : getStores().entrySet()) {
868       Store store = entry.getValue();
869       ArrayList<Path> storeFileNames = new ArrayList<Path>();
870       for (StoreFile storeFile: store.getStorefiles()) {
871         storeFileNames.add(storeFile.getPath());
872       }
873       storeFiles.put(entry.getKey(), storeFileNames);
874     }
875 
876     RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
877       RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
878       getRegionServerServices().getServerName(), storeFiles);
879     HLogUtil.writeRegionEventMarker(log, getTableDesc(), getRegionInfo(), regionOpenDesc,
880       getSequenceId());
881   }
882 
883   private void writeRegionCloseMarker(HLog log) throws IOException {
884     Map<byte[], List<Path>> storeFiles =
885         new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
886     for (Map.Entry<byte[], Store> entry : getStores().entrySet()) {
887       Store store = entry.getValue();
888       ArrayList<Path> storeFileNames = new ArrayList<Path>();
889       for (StoreFile storeFile: store.getStorefiles()) {
890         storeFileNames.add(storeFile.getPath());
891       }
892       storeFiles.put(entry.getKey(), storeFileNames);
893     }
894 
895     RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor(
896       RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), getSequenceId().get(),
897       getRegionServerServices().getServerName(), storeFiles);
898     HLogUtil.writeRegionEventMarker(log, getTableDesc(), getRegionInfo(), regionEventDesc,
899       getSequenceId());
900   }
901 
902   /**
903    * @return True if this region has references.
904    */
905   public boolean hasReferences() {
906     for (Store store : this.stores.values()) {
907       if (store.hasReferences()) return true;
908     }
909     return false;
910   }
911 
912   /**
913    * This function will return the HDFS blocks distribution based on the data
914    * captured when the HFiles were created.
915    * @return The HDFS blocks distribution for the region.
916    */
917   public HDFSBlocksDistribution getHDFSBlocksDistribution() {
918     HDFSBlocksDistribution hdfsBlocksDistribution =
919       new HDFSBlocksDistribution();
920     synchronized (this.stores) {
921       for (Store store : this.stores.values()) {
922         for (StoreFile sf : store.getStorefiles()) {
923           HDFSBlocksDistribution storeFileBlocksDistribution =
924             sf.getHDFSBlockDistribution();
925           hdfsBlocksDistribution.add(storeFileBlocksDistribution);
926         }
927       }
928     }
929     return hdfsBlocksDistribution;
930   }
931 
932   /**
933    * This is a helper function to compute HDFS block distribution on demand
934    * @param conf configuration
935    * @param tableDescriptor HTableDescriptor of the table
936    * @param regionInfo the HRegionInfo of the region
937    * @return The HDFS blocks distribution for the given region.
938    * @throws IOException
939    */
940   public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
941       final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException {
942     Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
943     return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath);
944   }
945 
946   /**
947    * This is a helper function to compute HDFS block distribution on demand
948    * @param conf configuration
949    * @param tableDescriptor HTableDescriptor of the table
950    * @param regionInfo the HRegionInfo of the region
951    * @param tablePath the table directory
952    * @return The HDFS blocks distribution for the given region.
953    * @throws IOException
954    */
955   public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
956       final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo,  Path tablePath)
957       throws IOException {
958     HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
959     FileSystem fs = tablePath.getFileSystem(conf);
960 
961     HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo);
962     for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
963       Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family.getNameAsString());
964       if (storeFiles == null) continue;
965 
966       for (StoreFileInfo storeFileInfo : storeFiles) {
967         hdfsBlocksDistribution.add(storeFileInfo.computeHDFSBlocksDistribution(fs));
968       }
969     }
970     return hdfsBlocksDistribution;
971   }
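
  // Sketch: computing block locality for a region without opening it (the
  // variable names here are assumptions):
  //
  //   HDFSBlocksDistribution dist =
  //       HRegion.computeHDFSBlocksDistribution(conf, htd, regionInfo);
  //   float locality = dist.getBlockLocalityIndex(hostName);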
972 
973   public AtomicLong getMemstoreSize() {
974     return memstoreSize;
975   }
976 
977   /**
978    * Increase the size of mem store in this region and the size of global mem
979    * store.
980    * @return the new size of the memstore in this region
981    */
982   public long addAndGetGlobalMemstoreSize(long memStoreSize) {
983     if (this.rsAccounting != null) {
984       rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
985     }
986     return this.memstoreSize.addAndGet(memStoreSize);
987   }
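
  // Illustrative call pattern (a sketch of how the write path uses this; the
  // variable names are assumptions): after applying a batch of edits,
  //
  //   long newSize = addAndGetGlobalMemstoreSize(batchSizeInBytes);
  //   // ...and a flush is requested once newSize crosses memstoreFlushSize.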
988 
989   /** @return a HRegionInfo object for this region */
990   public HRegionInfo getRegionInfo() {
991     return this.fs.getRegionInfo();
992   }
993 
994   /**
995    * @return Instance of {@link RegionServerServices} used by this HRegion.
996    * Can be null.
997    */
998   RegionServerServices getRegionServerServices() {
999     return this.rsServices;
1000   }
1001 
1002   /** @return readRequestsCount for this region */
1003   long getReadRequestsCount() {
1004     return this.readRequestsCount.get();
1005   }
1006 
1007   /** @return writeRequestsCount for this region */
1008   long getWriteRequestsCount() {
1009     return this.writeRequestsCount.get();
1010   }
1011 
1012   public MetricsRegion getMetrics() {
1013     return metricsRegion;
1014   }
1015 
1016   /** @return true if region is closed */
1017   public boolean isClosed() {
1018     return this.closed.get();
1019   }
1020 
1021   /**
1022    * @return True if closing process has started.
1023    */
1024   public boolean isClosing() {
1025     return this.closing.get();
1026   }
1027 
1028   /**
1029    * Reset recovering state of current region
1030    */
1031   public void setRecovering(boolean newState) {
1032     boolean wasRecovering = this.isRecovering;
1033     this.isRecovering = newState;
1034     if (wasRecovering && !isRecovering) {
1035       // Call only when log replay is over.
1036       coprocessorHost.postLogReplay();
1037     }
1038   }
1039 
1040   /**
1041    * @return True if the current region is in the recovering state
1042    */
1043   public boolean isRecovering() {
1044     return this.isRecovering;
1045   }
1046 
1047   /** @return true if region is available (not closed and not closing) */
1048   public boolean isAvailable() {
1049     return !isClosed() && !isClosing();
1050   }
1051 
1052   /** @return true if region is splittable */
1053   public boolean isSplittable() {
1054     return isAvailable() && !hasReferences();
1055   }
1056 
1057   /**
1058    * @return true if region is mergeable
1059    */
1060   public boolean isMergeable() {
1061     if (!isAvailable()) {
1062       LOG.debug("Region " + this.getRegionNameAsString()
1063           + " is not mergeable because it is closing or closed");
1064       return false;
1065     }
1066     if (hasReferences()) {
1067       LOG.debug("Region " + this.getRegionNameAsString()
1068           + " is not mergeable because it has references");
1069       return false;
1070     }
1071 
1072     return true;
1073   }
1074 
1075   public boolean areWritesEnabled() {
1076     synchronized(this.writestate) {
1077       return this.writestate.writesEnabled;
1078     }
1079   }
1080 
1081    public MultiVersionConsistencyControl getMVCC() {
1082      return mvcc;
1083    }
1084 
1085    /*
1086     * Returns readpoint considering given IsolationLevel
1087     */
1088    public long getReadpoint(IsolationLevel isolationLevel) {
1089      if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
1090        // This scan can read even uncommitted transactions
1091        return Long.MAX_VALUE;
1092      }
1093      return mvcc.memstoreReadPoint();
1094    }
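
   // Illustrative: a scan created with
   //   scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED)
   // reads past MVCC and may observe writes that are still in flight, while
   // the default READ_COMMITTED level only sees edits up to the memstore
   // read point returned above.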
1095 
1096    public boolean isLoadingCfsOnDemandDefault() {
1097      return this.isLoadingCfsOnDemandDefault;
1098    }
1099 
1100   /**
1101    * Close down this HRegion.  Flush the cache, shut down each HStore, don't
1102    * service any more calls.
1103    *
1104    * <p>This method could take some time to execute, so don't call it from a
1105    * time-sensitive thread.
1106    *
1107    * @return Map of all the storage files that the HRegion's component
1108    * HStores make use of, keyed by column family name.  Can be null if we are
1109    * not to close at this time or we are already closed.
1110    *
1111    * @throws IOException e
1112    */
1113   public Map<byte[], List<StoreFile>> close() throws IOException {
1114     return close(false);
1115   }
1116 
1117   private final Object closeLock = new Object();
1118 
1119   /** Conf key for the periodic flush interval */
1120   public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL =
1121       "hbase.regionserver.optionalcacheflushinterval";
1122   /** Default interval for the memstore flush */
1123   public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;
1124 
1125   /** Conf key to force a flush if there are already enough changes for one region in memstore */
1126   public static final String MEMSTORE_FLUSH_PER_CHANGES =
1127       "hbase.regionserver.flush.per.changes";
1128 public static final long DEFAULT_FLUSH_PER_CHANGES = 30000000; // 30 million
1129   /**
1130    * The following MAX_FLUSH_PER_CHANGES is large enough because each KeyValue has 20+ bytes
1131    * overhead. Therefore, even 1G (one billion) empty KVs would occupy at least 20GB of memstore for a single region.
1132    */
1133   public static final long MAX_FLUSH_PER_CHANGES = 1000000000; // 1G
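
  // Example hbase-site.xml override for the flush-per-changes threshold above
  // (the value shown is illustrative, not a recommendation):
  //
  //   <property>
  //     <name>hbase.regionserver.flush.per.changes</name>
  //     <value>50000000</value>
  //   </property>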
1134 
1135   /**
1136    * Close down this HRegion.  Flush the cache unless abort parameter is true,
1137    * shut down each HStore, and don't service any more calls.
1138    *
1139    * This method could take some time to execute, so don't call it from a
1140    * time-sensitive thread.
1141    *
1142    * @param abort true if server is aborting (only during testing)
1143    * @return Map of all the storage files that the HRegion's component
1144    * HStores make use of, keyed by column family name.  Can be null if
1145    * we are not to close at this time or we are already closed.
1146    *
1147    * @throws IOException e
1148    */
1149   public Map<byte[], List<StoreFile>> close(final boolean abort) throws IOException {
1150     // Only allow one thread to close at a time. Serialize them so dual
1151     // threads attempting to close will run up against each other.
1152     MonitoredTask status = TaskMonitor.get().createStatus(
1153         "Closing region " + this +
1154         (abort ? " due to abort" : ""));
1155 
1156     status.setStatus("Waiting for close lock");
1157     try {
1158       synchronized (closeLock) {
1159         return doClose(abort, status);
1160       }
1161     } finally {
1162       status.cleanup();
1163     }
1164   }
1165 
1166   private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
1167       throws IOException {
1168     if (isClosed()) {
1169       LOG.warn("Region " + this + " already closed");
1170       return null;
1171     }
1172 
1173     if (coprocessorHost != null) {
1174       status.setStatus("Running coprocessor pre-close hooks");
1175       this.coprocessorHost.preClose(abort);
1176     }
1177 
1178     status.setStatus("Disabling compacts and flushes for region");
1179     synchronized (writestate) {
1180       // Disable compacting and flushing by background threads for this
1181       // region.
1182       writestate.writesEnabled = false;
1183       LOG.debug("Closing " + this + ": disabling compactions & flushes");
1184       waitForFlushesAndCompactions();
1185     }
1186     // If we were not just flushing, is it worth doing a preflush...one
1187     // that will clear out the bulk of the memstore before we put up
1188     // the close flag?
1189     if (!abort && worthPreFlushing()) {
1190       status.setStatus("Pre-flushing region before close");
1191       LOG.info("Running close preflush of " + this.getRegionNameAsString());
1192       try {
1193         internalFlushcache(status);
1194       } catch (IOException ioe) {
1195         // Failed to flush the region. Keep going.
1196         status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
1197       }
1198     }
1199 
1200     this.closing.set(true);
1201     status.setStatus("Disabling writes for close");
1202     // block waiting for the lock for closing
1203     lock.writeLock().lock();
1204     try {
1205       if (this.isClosed()) {
1206         status.abort("Already got closed by another process");
1207         // SplitTransaction handles the null
1208         return null;
1209       }
1210       LOG.debug("Updates disabled for region " + this);
1211       // Don't flush the cache if we are aborting
1212       if (!abort) {
1213         int flushCount = 0;
1214         while (this.getMemstoreSize().get() > 0) {
1215           try {
1216             if (flushCount++ > 0) {
1217               int actualFlushes = flushCount - 1;
1218               if (actualFlushes > 5) {
1219                 // If we tried 5 times and are unable to clear memory, abort
1220                 // so we do not lose data
1221                 throw new DroppedSnapshotException("Failed clearing memory after " +
1222                   actualFlushes + " attempts on region: " + Bytes.toStringBinary(getRegionName()));
1223               }
1224               LOG.info("Running extra flush, " + actualFlushes +
1225                 " (carrying snapshot?) " + this);
1226             }
1227             internalFlushcache(status);
1228           } catch (IOException ioe) {
1229             status.setStatus("Failed flush " + this + ", putting online again");
1230             synchronized (writestate) {
1231               writestate.writesEnabled = true;
1232             }
1233             // Have to throw to upper layers.  I can't abort server from here.
1234             throw ioe;
1235           }
1236         }
1237       }
1238 
1239       Map<byte[], List<StoreFile>> result =
1240         new TreeMap<byte[], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
1241       if (!stores.isEmpty()) {
1242         // initialize the thread pool for closing stores in parallel.
1243         ThreadPoolExecutor storeCloserThreadPool =
1244           getStoreOpenAndCloseThreadPool("StoreCloserThread-" + this.getRegionNameAsString());
1245         CompletionService<Pair<byte[], Collection<StoreFile>>> completionService =
1246           new ExecutorCompletionService<Pair<byte[], Collection<StoreFile>>>(storeCloserThreadPool);
1247 
1248         // close each store in parallel
1249         for (final Store store : stores.values()) {
1250           assert abort || store.getFlushableSize() == 0;
1251           completionService
1252               .submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
1253                 @Override
1254                 public Pair<byte[], Collection<StoreFile>> call() throws IOException {
1255                   return new Pair<byte[], Collection<StoreFile>>(
1256                     store.getFamily().getName(), store.close());
1257                 }
1258               });
1259         }
1260         try {
1261           for (int i = 0; i < stores.size(); i++) {
1262             Future<Pair<byte[], Collection<StoreFile>>> future = completionService.take();
1263             Pair<byte[], Collection<StoreFile>> storeFiles = future.get();
1264             List<StoreFile> familyFiles = result.get(storeFiles.getFirst());
1265             if (familyFiles == null) {
1266               familyFiles = new ArrayList<StoreFile>();
1267               result.put(storeFiles.getFirst(), familyFiles);
1268             }
1269             familyFiles.addAll(storeFiles.getSecond());
1270           }
1271         } catch (InterruptedException e) {
1272           throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1273         } catch (ExecutionException e) {
1274           throw new IOException(e.getCause());
1275         } finally {
1276           storeCloserThreadPool.shutdownNow();
1277         }
1278       }
1279 
1280       status.setStatus("Writing region close event to WAL");
1281       if (!abort && log != null && getRegionServerServices() != null) {
1282         writeRegionCloseMarker(log);
1283       }
1284 
1285       this.closed.set(true);
1286       if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get());
1287       if (coprocessorHost != null) {
1288         status.setStatus("Running coprocessor post-close hooks");
1289         this.coprocessorHost.postClose(abort);
1290       }
1291       if (this.metricsRegion != null) {
1292         this.metricsRegion.close();
1293       }
1294       if (this.metricsRegionWrapper != null) {
1295         Closeables.closeQuietly(this.metricsRegionWrapper);
1296       }
1297       status.markComplete("Closed");
1298       LOG.info("Closed " + this);
1299       return result;
1300     } finally {
1301       lock.writeLock().unlock();
1302     }
1303   }
1304 
1305   /**
1306    * Wait for all current flushes and compactions of the region to complete.
1307    * <p>
1308    * Exposed for TESTING.
1309    */
1310   public void waitForFlushesAndCompactions() {
1311     synchronized (writestate) {
1312       boolean interrupted = false;
1313       try {
1314         while (writestate.compacting > 0 || writestate.flushing) {
1315           LOG.debug("waiting for " + writestate.compacting + " compactions"
1316             + (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
1317           try {
1318             writestate.wait();
1319           } catch (InterruptedException iex) {
1320             // essentially ignore and propagate the interrupt back up
1321             LOG.warn("Interrupted while waiting");
1322             interrupted = true;
1323           }
1324         }
1325       } finally {
1326         if (interrupted) {
1327           Thread.currentThread().interrupt();
1328         }
1329       }
1330     }
1331   }
1332 
1333   protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
1334       final String threadNamePrefix) {
1335     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1336     int maxThreads = Math.min(numStores,
1337         conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1338             HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
1339     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1340   }
1341 
1342   protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
1343       final String threadNamePrefix) {
1344     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1345     int maxThreads = Math.max(1,
1346         conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1347             HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
1348             / numStores);
1349     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1350   }
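       // Worked example (illustrative; key name per HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX):
       // with hbase.hstore.open.and.close.threads.max set to 16 and a table of 4 column
       // families, getStoreOpenAndCloseThreadPool sizes its pool at min(4, 16) = 4 threads,
       // while getStoreFileOpenAndCloseThreadPool gets max(1, 16 / 4) = 4 threads per store.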
1351 
1352   static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
1353       final String threadNamePrefix) {
1354     return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
1355       new ThreadFactory() {
1356         private int count = 1;
1357 
1358         @Override
1359         public Thread newThread(Runnable r) {
1360           return new Thread(r, threadNamePrefix + "-" + count++);
1361         }
1362       });
1363   }
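       // For reference, a minimal sketch of what the bounded cached pool above amounts to,
       // assuming Threads.getBoundedCachedThreadPool wraps a plain ThreadPoolExecutor
       // (verify against the Threads utility actually in use):
       //
       //   ThreadPoolExecutor pool = new ThreadPoolExecutor(maxThreads, maxThreads,
       //     30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory);
       //   pool.allowCoreThreadTimeOut(true); // let idle threads exit after 30s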
1364 
1365   /**
1366    * @return True if it's worth doing a flush before we put up the close flag.
1367    */
1368   private boolean worthPreFlushing() {
1369     return this.memstoreSize.get() >
1370       this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
1371   }
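       // The pre-close flush threshold defaults to 5 MB, per the lookup above, and can be
       // tuned through the configuration (the value below is illustrative only):
       //
       //   Configuration conf = HBaseConfiguration.create();
       //   conf.setLong("hbase.hregion.preclose.flush.size", 16 * 1024 * 1024);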
1372 
1373   //////////////////////////////////////////////////////////////////////////////
1374   // HRegion accessors
1375   //////////////////////////////////////////////////////////////////////////////
1376 
1377   /** @return start key for region */
1378   public byte [] getStartKey() {
1379     return this.getRegionInfo().getStartKey();
1380   }
1381 
1382   /** @return end key for region */
1383   public byte [] getEndKey() {
1384     return this.getRegionInfo().getEndKey();
1385   }
1386 
1387   /** @return region id */
1388   public long getRegionId() {
1389     return this.getRegionInfo().getRegionId();
1390   }
1391 
1392   /** @return region name */
1393   public byte [] getRegionName() {
1394     return this.getRegionInfo().getRegionName();
1395   }
1396 
1397   /** @return region name as string for logging */
1398   public String getRegionNameAsString() {
1399     return this.getRegionInfo().getRegionNameAsString();
1400   }
1401 
1402   /** @return HTableDescriptor for this region */
1403   public HTableDescriptor getTableDesc() {
1404     return this.htableDescriptor;
1405   }
1406 
1407   /** @return HLog in use for this region */
1408   public HLog getLog() {
1409     return this.log;
1410   }
1411 
1412   /**
1413    * A split takes the config from the parent region and passes it to the daughter
1414    * region's constructor. If 'conf' was passed, you would end up using the HTD
1415    * of the parent region in addition to the new daughter HTD. Pass 'baseConf'
1416    * to the daughter regions to avoid this tricky dedupe problem.
1417    * @return Configuration object
1418    */
1419   Configuration getBaseConf() {
1420     return this.baseConf;
1421   }
1422 
1423   /** @return {@link FileSystem} being used by this region */
1424   public FileSystem getFilesystem() {
1425     return fs.getFileSystem();
1426   }
1427 
1428   /** @return the {@link HRegionFileSystem} used by this region */
1429   public HRegionFileSystem getRegionFileSystem() {
1430     return this.fs;
1431   }
1432 
1433   /** @return the last time the region was flushed */
1434   public long getLastFlushTime() {
1435     return this.lastFlushTime;
1436   }
1437 
1438   //////////////////////////////////////////////////////////////////////////////
1439   // HRegion maintenance.
1440   //
1441   // These methods are meant to be called periodically by the HRegionServer for
1442   // upkeep.
1443   //////////////////////////////////////////////////////////////////////////////
1444 
1445   /** @return size of the largest HStore. */
1446   public long getLargestHStoreSize() {
1447     long size = 0;
1448     for (Store h : stores.values()) {
1449       long storeSize = h.getSize();
1450       if (storeSize > size) {
1451         size = storeSize;
1452       }
1453     }
1454     return size;
1455   }
1456 
1457   /**
1458    * @return KeyValue Comparator
1459    */
1460   public KeyValue.KVComparator getComparator() {
1461     return this.comparator;
1462   }
1463 
1464   /*
1465    * Do preparation for pending compaction.
1466    * @throws IOException
1467    */
1468   protected void doRegionCompactionPrep() throws IOException {
1469   }
1470 
1471   void triggerMajorCompaction() {
1472     for (Store h : stores.values()) {
1473       h.triggerMajorCompaction();
1474     }
1475   }
1476 
1477   /**
1478    * This is a helper function that compacts all the stores synchronously.
1479    * It is used by utilities and testing.
1480    *
1481    * @param majorCompaction True to force a major compaction regardless of thresholds
1482    * @throws IOException e
1483    */
1484   public void compactStores(final boolean majorCompaction)
1485   throws IOException {
1486     if (majorCompaction) {
1487       this.triggerMajorCompaction();
1488     }
1489     compactStores();
1490   }
1491 
1492   /**
1493    * This is a helper function that compacts all the stores synchronously.
1494    * It is used by utilities and testing.
1495    *
1496    * @throws IOException e
1497    */
1498   public void compactStores() throws IOException {
1499     for (Store s : getStores().values()) {
1500       CompactionContext compaction = s.requestCompaction();
1501       if (compaction != null) {
1502         compact(compaction, s);
1503       }
1504     }
1505   }
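       // Usage sketch for utilities/tests (illustrative; assumes "region" is an open
       // HRegion with data already written):
       //
       //   region.flushcache();          // turn memstore contents into store files
       //   region.compactStores(true);   // force a synchronous major compaction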
1506 
1507   /*
1508    * Called by compaction thread and after region is opened to compact the
1509    * HStores if necessary.
1510    *
1511    * <p>This operation could block for a long time, so don't call it from a
1512    * time-sensitive thread.
1513    *
1514    * Note that no locking is necessary at this level because compaction only
1515    * conflicts with a region split, and that cannot happen because the region
1516    * server does them sequentially and not in parallel.
1517    *
1518    * @param compaction Compaction details, obtained by requestCompaction()
1519    * @return whether the compaction completed
1520    * @throws IOException e
1521    */
1522   public boolean compact(CompactionContext compaction, Store store) throws IOException {
1523     assert compaction != null && compaction.hasSelection();
1524     assert !compaction.getRequest().getFiles().isEmpty();
1525     if (this.closing.get() || this.closed.get()) {
1526       LOG.debug("Skipping compaction on " + this + " because closing/closed");
1527       store.cancelRequestedCompaction(compaction);
1528       return false;
1529     }
1530     MonitoredTask status = null;
1531     boolean didPerformCompaction = false;
1532     // block waiting for the lock for compaction
1533     lock.readLock().lock();
1534     try {
1535       byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
1536       if (stores.get(cf) != store) {
1537         LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this
1538             + " has been re-instantiated, cancel this compaction request. "
1539             + " It may be caused by the roll back of split transaction");
1540         return false;
1541       }
1542 
1543       status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
1544       if (this.closed.get()) {
1545         String msg = "Skipping compaction on " + this + " because closed";
1546         LOG.debug(msg);
1547         status.abort(msg);
1548         return false;
1549       }
1550       boolean wasStateSet = false;
1551       try {
1552         synchronized (writestate) {
1553           if (writestate.writesEnabled) {
1554             wasStateSet = true;
1555             ++writestate.compacting;
1556           } else {
1557             String msg = "NOT compacting region " + this + ". Writes disabled.";
1558             LOG.info(msg);
1559             status.abort(msg);
1560             return false;
1561           }
1562         }
1563         LOG.info("Starting compaction on " + store + " in region " + this
1564             + (compaction.getRequest().isOffPeak() ? " as an off-peak compaction" : ""));
1565         doRegionCompactionPrep();
1566         try {
1567           status.setStatus("Compacting store " + store);
1568           didPerformCompaction = true;
1569           store.compact(compaction);
1570         } catch (InterruptedIOException iioe) {
1571           String msg = "compaction interrupted";
1572           LOG.info(msg, iioe);
1573           status.abort(msg);
1574           return false;
1575         }
1576       } finally {
1577         if (wasStateSet) {
1578           synchronized (writestate) {
1579             --writestate.compacting;
1580             if (writestate.compacting <= 0) {
1581               writestate.notifyAll();
1582             }
1583           }
1584         }
1585       }
1586       status.markComplete("Compaction complete");
1587       return true;
1588     } finally {
1589       try {
1590         if (!didPerformCompaction) store.cancelRequestedCompaction(compaction);
1591         if (status != null) status.cleanup();
1592       } finally {
1593         lock.readLock().unlock();
1594       }
1595     }
1596   }
1597 
1598   /**
1599    * Flush the cache.
1600    *
1601    * When this method is called the cache will be flushed unless:
1602    * <ol>
1603    *   <li>the cache is empty</li>
1604    *   <li>the region is closed</li>
1605    *   <li>a flush is already in progress</li>
1606    *   <li>writes are disabled</li>
1607    * </ol>
1608    *
1609    * <p>This method may block for some time, so it should not be called from a
1610    * time-sensitive thread.
1611    *
1612    * @return true if the region needs compacting
1613    *
1614    * @throws IOException general io exceptions
1615    * @throws DroppedSnapshotException Thrown when replay of hlog is required
1616    * because a Snapshot was not properly persisted.
1617    */
1618   public FlushResult flushcache() throws IOException {
1619     // fail-fast instead of waiting on the lock
1620     if (this.closing.get()) {
1621       String msg = "Skipping flush on " + this + " because closing";
1622       LOG.debug(msg);
1623       return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1624     }
1625     MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
1626     status.setStatus("Acquiring readlock on region");
1627     // block waiting for the lock for flushing cache
1628     lock.readLock().lock();
1629     try {
1630       if (this.closed.get()) {
1631         String msg = "Skipping flush on " + this + " because closed";
1632         LOG.debug(msg);
1633         status.abort(msg);
1634         return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1635       }
1636       if (coprocessorHost != null) {
1637         status.setStatus("Running coprocessor pre-flush hooks");
1638         coprocessorHost.preFlush();
1639       }
1640       if (numMutationsWithoutWAL.get() > 0) {
1641         numMutationsWithoutWAL.set(0);
1642         dataInMemoryWithoutWAL.set(0);
1643       }
1644       synchronized (writestate) {
1645         if (!writestate.flushing && writestate.writesEnabled) {
1646           this.writestate.flushing = true;
1647         } else {
1648           if (LOG.isDebugEnabled()) {
1649             LOG.debug("NOT flushing memstore for region " + this
1650                 + ", flushing=" + writestate.flushing + ", writesEnabled="
1651                 + writestate.writesEnabled);
1652           }
1653           String msg = "Not flushing since "
1654               + (writestate.flushing ? "already flushing"
1655               : "writes not enabled");
1656           status.abort(msg);
1657           return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1658         }
1659       }
1660       try {
1661         FlushResult fs = internalFlushcache(status);
1662 
1663         if (coprocessorHost != null) {
1664           status.setStatus("Running post-flush coprocessor hooks");
1665           coprocessorHost.postFlush();
1666         }
1667 
1668         status.markComplete("Flush successful");
1669         return fs;
1670       } finally {
1671         synchronized (writestate) {
1672           writestate.flushing = false;
1673           this.writestate.flushRequested = false;
1674           writestate.notifyAll();
1675         }
1676       }
1677     } finally {
1678       lock.readLock().unlock();
1679       status.cleanup();
1680     }
1681   }
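       // Usage sketch (illustrative; assumes "region" is an open HRegion and that
       // FlushResult exposes isFlushSucceeded()/isCompactionNeeded() helpers, as in
       // contemporary HBase versions):
       //
       //   FlushResult fr = region.flushcache();
       //   if (fr.isFlushSucceeded() && fr.isCompactionNeeded()) {
       //     region.compactStores();   // follow a fat flush with a compaction
       //   }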
1682 
1683   /**
1684    * Should the memstore be flushed now
1685    */
1686   boolean shouldFlush() {
1687     // This is a rough measure.
1688     if (this.lastFlushSeqId > 0
1689           && (this.lastFlushSeqId + this.flushPerChanges < this.sequenceId.get())) {
1690       return true;
1691     }
1692     if (flushCheckInterval <= 0) { //disabled
1693       return false;
1694     }
1695     long now = EnvironmentEdgeManager.currentTime();
1696     // if we flushed in the recent past, we don't need to do it again now
1697     if ((now - getLastFlushTime() < flushCheckInterval)) {
1698       return false;
1699     }
1700     // since we didn't flush in the recent past, flush now if certain conditions
1701     // are met. Return true on first such memstore hit.
1702     for (Store s : this.getStores().values()) {
1703       if (s.timeOfOldestEdit() < now - flushCheckInterval) {
1704         // we have an old enough edit in the memstore, flush
1705         return true;
1706       }
1707     }
1708     return false;
1709   }
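       // The two knobs consulted above map to configuration entries (key names and
       // defaults assumed from contemporary HBase; values below are illustrative):
       //
       //   // periodic check interval backing flushCheckInterval, in ms
       //   conf.setInt("hbase.regionserver.optionalcacheflushinterval", 3600000);
       //   // max changes between flushes backing flushPerChanges
       //   conf.setLong("hbase.regionserver.flush.per.changes", 30000000L);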
1710 
1711   /**
1712    * Flush the memstore. Flushing the memstore is a little tricky. We have a lot of updates in the
1713    * memstore, all of which have also been written to the log. We need to write those updates in the
1714    * memstore out to disk, while being able to process reads/writes as much as possible during the
1715    * flush operation.
1716    * <p>This method may block for some time.  Every time you call it, we up the region's
1717    * sequence id even if we don't flush; i.e. the returned sequence id will be at least one larger
1718    * than the last edit applied to this region. The returned id does not refer to an actual edit.
1719    * The returned id can be used, for example, to install a bulk-loaded file just ahead of the
1720    * last hfile that was the result of this flush, etc.
1721    * @return object describing the flush's state
1722    *
1723    * @throws IOException general io exceptions
1724    * @throws DroppedSnapshotException Thrown when replay of hlog is required
1725    * because a Snapshot was not properly persisted.
1726    */
1727   protected FlushResult internalFlushcache(MonitoredTask status)
1728       throws IOException {
1729     return internalFlushcache(this.log, -1, status);
1730   }
1731 
1732   /**
1733    * @param wal Null if we're NOT to go via hlog/wal.
1734    * @param myseqid The seqid to use when writing out the flush file if <code>wal</code> is null.
1735    * @return object describing the flush's state
1736    * @throws IOException
1737    * @see #internalFlushcache(MonitoredTask)
1738    */
1739   protected FlushResult internalFlushcache(
1740       final HLog wal, final long myseqid, MonitoredTask status)
1741   throws IOException {
1742     if (this.rsServices != null && this.rsServices.isAborted()) {
1743       // Don't flush when server aborting, it's unsafe
1744       throw new IOException("Aborting flush because server is aborted...");
1745     }
1746     final long startTime = EnvironmentEdgeManager.currentTime();
1747     // If nothing to flush, return, but we need to safely update the region sequence id
1748     if (this.memstoreSize.get() <= 0) {
1749       // Take an update lock because we are about to change the sequence id and we want the sequence id
1750       // to be at the border of the empty memstore.
1751       MultiVersionConsistencyControl.WriteEntry w = null;
1752       this.updatesLock.writeLock().lock();
1753       try {
1754         if (this.memstoreSize.get() <= 0) {
1755           // Presume that if there are still no edits in the memstore, then there are no edits for
1756           // this region out in the WAL/HLog subsystem so no need to do any trickery clearing out
1757           // edits in the WAL system. Up the sequence number so the resulting flush id is for
1758           // sure just beyond the last appended region edit (useful as a marker when bulk loading,
1759           // etc.)
1760           // wal can be null replaying edits.
1761           if (wal != null) {
1762             w = mvcc.beginMemstoreInsert();
1763             long flushSeqId = getNextSequenceId(wal);
1764             FlushResult flushResult = new FlushResult(
1765               FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushSeqId, "Nothing to flush");
1766             w.setWriteNumber(flushSeqId);
1767             mvcc.waitForPreviousTransactionsComplete(w);
1768             w = null;
1769             return flushResult;
1770           } else {
1771             return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
1772               "Nothing to flush");
1773           }
1774         }
1775       } finally {
1776         this.updatesLock.writeLock().unlock();
1777         if (w != null) {
1778           mvcc.advanceMemstore(w);
1779         }
1780       }
1781     }
1782 
1783     LOG.info("Started memstore flush for " + this +
1784       ", current region memstore size " +
1785       StringUtils.byteDesc(this.memstoreSize.get()) +
1786       ((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid));
1787 
1788     // Stop updates while we snapshot the memstore of all of this region's stores. We only have
1789     // to do this for a moment.  It is quick. We also set the memstore size to zero here before we
1790     // allow updates again so its value will represent the size of the updates received
1791     // during flush
1792     MultiVersionConsistencyControl.WriteEntry w = null;
1793 
1794     // We have to take an update lock during snapshot, or else a write could end up in both snapshot
1795     // and memstore (which would make atomic row updates difficult)
1796     status.setStatus("Obtaining lock to block concurrent updates");
1797     // block waiting for the lock for internal flush
1798     this.updatesLock.writeLock().lock();
1799     long totalFlushableSize = 0;
1800     status.setStatus("Preparing to flush by snapshotting stores in " +
1801       getRegionInfo().getEncodedName());
1802     List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
1803     TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
1804         Bytes.BYTES_COMPARATOR);
1805     long flushSeqId = -1L;
1806 
1807     long trxId = 0;
1808     try {
1809       try {
1810         w = mvcc.beginMemstoreInsert();
1811         if (wal != null) {
1812           if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) {
1813             // This should never happen.
1814             String msg = "Flush will not be started for ["
1815                 + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
1816             status.setStatus(msg);
1817             return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1818           }
1819           // Get a sequence id that we can use to denote the flush. It will be one beyond the last
1820           // edit that made it into the hfile (the call below does not add an edit, it just asks the
1821           // WAL system to return the next sequence id).
1822           flushSeqId = getNextSequenceId(wal);
1823         } else {
1824           // use the provided sequence Id as WAL is not being used for this flush.
1825           flushSeqId = myseqid;
1826         }
1827 
1828         for (Store s : stores.values()) {
1829           totalFlushableSize += s.getFlushableSize();
1830           storeFlushCtxs.add(s.createFlushContext(flushSeqId));
1831           committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL
1832         }
1833 
1834         // write the snapshot start to WAL
1835         if (wal != null) {
1836           FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
1837             getRegionInfo(), flushSeqId, committedFiles);
1838           trxId = HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
1839             desc, sequenceId, false); // no sync. Sync is below where we do not hold the updates lock
1840         }
1841 
1842         // Prepare flush (take a snapshot)
1843         for (StoreFlushContext flush : storeFlushCtxs) {
1844           flush.prepare();
1845         }
1846       } catch (IOException ex) {
1847         if (wal != null) {
1848           if (trxId > 0) { // check whether we have already written START_FLUSH to WAL
1849             try {
1850               FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
1851                 getRegionInfo(), flushSeqId, committedFiles);
1852               HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
1853                 desc, sequenceId, false);
1854             } catch (Throwable t) {
1855               LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
1856                   StringUtils.stringifyException(t));
1857               // ignore this since we will be aborting the RS with DSE.
1858             }
1859           }
1860           // we have called wal.startCacheFlush(), now we have to abort it
1861           wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
1862           throw ex; // let upper layers deal with it.
1863         }
1864       } finally {
1865         this.updatesLock.writeLock().unlock();
1866       }
1867       String s = "Finished memstore snapshotting " + this +
1868         ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize;
1869       status.setStatus(s);
1870       if (LOG.isTraceEnabled()) LOG.trace(s);
1871       // sync unflushed WAL changes
1872       // see HBASE-8208 for details
1873       if (wal != null) {
1874         try {
1875           wal.sync(); // ensure that flush marker is sync'ed
1876         } catch (IOException ioe) {
1877           LOG.warn("Unexpected exception while log.sync(), ignoring. Exception: "
1878               + StringUtils.stringifyException(ioe));
1879         }
1880       }
1881 
1882       // wait for all in-progress transactions to commit to HLog before
1883       // we can start the flush. This prevents
1884       // uncommitted transactions from being written into HFiles.
1885       // We have to block before we start the flush, otherwise keys that
1886       // were removed via a rollbackMemstore could be written to Hfiles.
1887       w.setWriteNumber(flushSeqId);
1888       mvcc.waitForPreviousTransactionsComplete(w);
1889       // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block
1890       w = null;
1891       s = "Flushing stores of " + this;
1892       status.setStatus(s);
1893       if (LOG.isTraceEnabled()) LOG.trace(s);
1894     } finally {
1895       if (w != null) {
1896         // in case of failure just mark current w as complete
1897         mvcc.advanceMemstore(w);
1898       }
1899     }
1900 
1901     // Any failure from here on out will be catastrophic, requiring a server
1902     // restart so the hlog content can be replayed and put back into the memstore.
1903     // Otherwise, the snapshot content, while backed up in the hlog, will not
1904     // be part of the running server's state.
1905     boolean compactionRequested = false;
1906     try {
1907       // A.  Flush memstore to all the HStores.
1908       // Keep running vector of all store files that includes both old and the
1909       // just-made new flush store file. The new flushed file is still in the
1910       // tmp directory.
1911 
1912       for (StoreFlushContext flush : storeFlushCtxs) {
1913         flush.flushCache(status);
1914       }
1915 
1916       // Switch snapshot (in memstore) -> new hfile (thus causing
1917       // all the store scanners to reset/reseek).
1918       Iterator<Store> it = stores.values().iterator(); // stores.values() and storeFlushCtxs have
1919       // same order
1920       for (StoreFlushContext flush : storeFlushCtxs) {
1921         boolean needsCompaction = flush.commit(status);
1922         if (needsCompaction) {
1923           compactionRequested = true;
1924         }
1925         committedFiles.put(it.next().getFamily().getName(), flush.getCommittedFiles());
1926       }
1927       storeFlushCtxs.clear();
1928 
1929       // Set down the memstore size by amount of flush.
1930       this.addAndGetGlobalMemstoreSize(-totalFlushableSize);
1931 
1932       if (wal != null) {
1933         // write flush marker to WAL. If fail, we should throw DroppedSnapshotException
1934         FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
1935           getRegionInfo(), flushSeqId, committedFiles);
1936         HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
1937           desc, sequenceId, true);
1938       }
1939     } catch (Throwable t) {
1940       // An exception here means that the snapshot was not persisted.
1941       // The hlog needs to be replayed so its content is restored to memstore.
1942       // Currently, only a server restart will do this.
1943       // We used to only catch IOEs but it's possible that we'd get other
1944       // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
1945       // all and sundry.
1946       if (wal != null) {
1947         try {
1948           FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
1949             getRegionInfo(), flushSeqId, committedFiles);
1950           HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
1951             desc, sequenceId, false);
1952         } catch (Throwable ex) {
1953           LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
1954               StringUtils.stringifyException(ex));
1955           // ignore this since we will be aborting the RS with DSE.
1956         }
1957         wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
1958       }
1959       DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
1960           Bytes.toStringBinary(getRegionName()));
1961       dse.initCause(t);
1962       status.abort("Flush failed: " + StringUtils.stringifyException(t));
1963       throw dse;
1964     }
1965 
1966     // If we get to here, the HStores have been written.
1967     if (wal != null) {
1968       wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
1969     }
1970 
1971     // Record latest flush time
1972     this.lastFlushTime = EnvironmentEdgeManager.currentTime();
1973 
1974     // Update the last flushed sequence id for region. TODO: This is dup'd inside the WAL/FSHlog.
1975     this.lastFlushSeqId = flushSeqId;
1976 
1977     // C. Finally notify anyone waiting on memstore to clear:
1978     // e.g. checkResources().
1979     synchronized (this) {
1980       notifyAll(); // FindBugs NN_NAKED_NOTIFY
1981     }
1982 
1983     long time = EnvironmentEdgeManager.currentTime() - startTime;
1984     long memstoresize = this.memstoreSize.get();
1985     String msg = "Finished memstore flush of ~" +
1986       StringUtils.byteDesc(totalFlushableSize) + "/" + totalFlushableSize +
1987       ", currentsize=" +
1988       StringUtils.byteDesc(memstoresize) + "/" + memstoresize +
1989       " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId +
1990       ", compaction requested=" + compactionRequested +
1991       ((wal == null)? "; wal=null": "");
1992     LOG.info(msg);
1993     status.setStatus(msg);
1994 
1995     return new FlushResult(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
1996         FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushSeqId);
1997   }
1998 
1999   /**
2000    * Method to safely get the next sequence number.
2001    * @return Next sequence number unassociated with any actual edit.
2002    * @throws IOException
2003    */
2004   private long getNextSequenceId(final HLog wal) throws IOException {
2005     HLogKey key = this.appendNoSyncNoAppend(wal, null);
2006     return key.getSequenceId();
2007   }
2008 
2009   //////////////////////////////////////////////////////////////////////////////
2010   // get() methods for client use.
2011   //////////////////////////////////////////////////////////////////////////////
2012   /**
2013    * Return all the data for the row that matches <i>row</i> exactly,
2014    * or the one that immediately precedes it, at or immediately before
2015    * <i>ts</i>.
2016    *
2017    * @param row row key
2018    * @return map of values
2019    * @throws IOException
2020    */
2021   Result getClosestRowBefore(final byte [] row)
2022   throws IOException {
2023     return getClosestRowBefore(row, HConstants.CATALOG_FAMILY);
2024   }
2025 
2026   /**
2027    * Return all the data for the row that matches <i>row</i> exactly,
2028    * or the one that immediately precedes it, at or immediately before
2029    * <i>ts</i>.
2030    *
2031    * @param row row key
2032    * @param family column family to find on
2033    * @return map of values
2034    * @throws IOException read exceptions
2035    */
2036   public Result getClosestRowBefore(final byte [] row, final byte [] family)
2037   throws IOException {
2038     if (coprocessorHost != null) {
2039       Result result = new Result();
2040       if (coprocessorHost.preGetClosestRowBefore(row, family, result)) {
2041         return result;
2042       }
2043     }
2044     // look across all the HStores for this region and determine what the
2045     // closest key is across all column families, since the data may be sparse
2046     checkRow(row, "getClosestRowBefore");
2047     startRegionOperation(Operation.GET);
2048     this.readRequestsCount.increment();
2049     try {
2050       Store store = getStore(family);
2051       // get the closest key. (HStore.getRowKeyAtOrBefore can return null)
2052       Cell key = store.getRowKeyAtOrBefore(row);
2053       Result result = null;
2054       if (key != null) {
2055         Get get = new Get(CellUtil.cloneRow(key));
2056         get.addFamily(family);
2057         result = get(get);
2058       }
2059       if (coprocessorHost != null) {
2060         coprocessorHost.postGetClosestRowBefore(row, family, result);
2061       }
2062       return result;
2063     } finally {
2064       closeRegionOperation(Operation.GET);
2065     }
2066   }
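       // Usage sketch (illustrative; assumes "region" hosts the family being probed):
       //
       //   Result r = region.getClosestRowBefore(Bytes.toBytes("row-0005"),
       //     HConstants.CATALOG_FAMILY);
       //   if (r != null && !r.isEmpty()) {
       //     byte[] matchedRow = r.getRow();   // the exact or closest-preceding row
       //   }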
2067 
2068   /**
2069    * Return an iterator that scans over the HRegion, returning the indicated
2070    * columns and rows specified by the {@link Scan}.
2071    * <p>
2072    * This Iterator must be closed by the caller.
2073    *
2074    * @param scan configured {@link Scan}
2075    * @return RegionScanner
2076    * @throws IOException read exceptions
2077    */
2078   public RegionScanner getScanner(Scan scan) throws IOException {
2079     return getScanner(scan, null);
2080   }
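       // Usage sketch (illustrative); the caller owns the scanner and must close it:
       //
       //   RegionScanner scanner = region.getScanner(new Scan());
       //   try {
       //     List<Cell> cells = new ArrayList<Cell>();
       //     boolean moreRows;
       //     do {
       //       moreRows = scanner.next(cells);   // one row's cells per call
       //       // ... consume cells ...
       //       cells.clear();
       //     } while (moreRows);
       //   } finally {
       //     scanner.close();
       //   }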
2081 
2082   void prepareScanner(Scan scan) throws IOException {
2083     if(!scan.hasFamilies()) {
2084       // Adding all families to scanner
2085       for(byte[] family: this.htableDescriptor.getFamiliesKeys()){
2086         scan.addFamily(family);
2087       }
2088     }
2089   }
2090 
2091   protected RegionScanner getScanner(Scan scan,
2092       List<KeyValueScanner> additionalScanners) throws IOException {
2093     startRegionOperation(Operation.SCAN);
2094     try {
2095       // Verify families are all valid
2096       prepareScanner(scan);
2097       if(scan.hasFamilies()) {
2098         for(byte [] family : scan.getFamilyMap().keySet()) {
2099           checkFamily(family);
2100         }
2101       }
2102       return instantiateRegionScanner(scan, additionalScanners);
2103     } finally {
2104       closeRegionOperation(Operation.SCAN);
2105     }
2106   }
2107 
2108   protected RegionScanner instantiateRegionScanner(Scan scan,
2109       List<KeyValueScanner> additionalScanners) throws IOException {
2110     if (scan.isReversed()) {
2111       if (scan.getFilter() != null) {
2112         scan.getFilter().setReversed(true);
2113       }
2114       return new ReversedRegionScannerImpl(scan, additionalScanners, this);
2115     }
2116     return new RegionScannerImpl(scan, additionalScanners, this);
2117   }
2118 
2119   /*
2120    * @param delete The passed delete is modified by this method. WARNING!
2121    */
2122   void prepareDelete(Delete delete) throws IOException {
2123     // Check to see if this is a deleteRow insert
2124     if(delete.getFamilyCellMap().isEmpty()){
2125       for(byte [] family : this.htableDescriptor.getFamiliesKeys()){
2126         // Don't eat the timestamp
2127         delete.deleteFamily(family, delete.getTimeStamp());
2128       }
2129     } else {
2130       for(byte [] family : delete.getFamilyCellMap().keySet()) {
2131         if(family == null) {
2132           throw new NoSuchColumnFamilyException("Empty family is invalid");
2133         }
2134         checkFamily(family);
2135       }
2136     }
2137   }
2138 
2139   //////////////////////////////////////////////////////////////////////////////
2140   // set() methods for client use.
2141   //////////////////////////////////////////////////////////////////////////////
2142   /**
2143    * @param delete delete object
2144    * @throws IOException read exceptions
2145    */
2146   public void delete(Delete delete)
2147   throws IOException {
2148     checkReadOnly();
2149     checkResources();
2150     startRegionOperation(Operation.DELETE);
2151     try {
2152       delete.getRow();
2153       // All edits for the given row (across all column families) must happen atomically.
2154       doBatchMutate(delete);
2155     } finally {
2156       closeRegionOperation(Operation.DELETE);
2157     }
2158   }
2159 
2160   /**
2161    * Row key needed by the method below.
2162    */
2163   private static final byte [] FOR_UNIT_TESTS_ONLY = Bytes.toBytes("ForUnitTestsOnly");
2164   /**
2165    * This is used only by unit tests. Not required to be a public API.
2166    * @param familyMap map of family to edits for the given family.
2167    * @throws IOException
2168    */
2169   void delete(NavigableMap<byte[], List<Cell>> familyMap,
2170       Durability durability) throws IOException {
2171     Delete delete = new Delete(FOR_UNIT_TESTS_ONLY);
2172     delete.setFamilyCellMap(familyMap);
2173     delete.setDurability(durability);
2174     doBatchMutate(delete);
2175   }
2176 
2177   /**
2178    * Set up correct timestamps in the KVs in the Delete object.
2179    * Caller should have the row and region locks.
2180    * @param mutation
2181    * @param familyMap
2182    * @param byteNow
2183    * @throws IOException
2184    */
2185   void prepareDeleteTimestamps(Mutation mutation, Map<byte[], List<Cell>> familyMap,
2186       byte[] byteNow) throws IOException {
2187     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
2188 
2189       byte[] family = e.getKey();
2190       List<Cell> cells = e.getValue();
2191       assert cells instanceof RandomAccess;
2192 
2193       Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
2194       int listSize = cells.size();
2195       for (int i=0; i < listSize; i++) {
2196         Cell cell = cells.get(i);
2197         //  Check if time is LATEST, change to time of most recent addition if so
2198         //  This is expensive.
2199         if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP && CellUtil.isDeleteType(cell)) {
2200           byte[] qual = CellUtil.cloneQualifier(cell);
2201           if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
2202 
2203           Integer count = kvCount.get(qual);
2204           if (count == null) {
2205             kvCount.put(qual, 1);
2206           } else {
2207             kvCount.put(qual, count + 1);
2208           }
2209           count = kvCount.get(qual);
2210 
2211           Get get = new Get(CellUtil.cloneRow(cell));
2212           get.setMaxVersions(count);
2213           get.addColumn(family, qual);
2214           if (coprocessorHost != null) {
2215             if (!coprocessorHost.prePrepareTimeStampForDeleteVersion(mutation, cell,
2216                 byteNow, get)) {
2217               updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow);
2218             }
2219           } else {
2220             updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow);
2221           }
2222         } else {
2223           CellUtil.updateLatestStamp(cell, byteNow, 0);
2224         }
2225       }
2226     }
2227   }
2228 
2229   void updateDeleteLatestVersionTimeStamp(Cell cell, Get get, int count, byte[] byteNow)
2230       throws IOException {
2231     List<Cell> result = get(get, false);
2232 
2233     if (result.size() < count) {
2234       // Nothing to delete
2235       CellUtil.updateLatestStamp(cell, byteNow, 0);
2236       return;
2237     }
2238     if (result.size() > count) {
2239       throw new RuntimeException("Unexpected size: " + result.size());
2240     }
2241     Cell getCell = result.get(count - 1);
2242     CellUtil.setTimestamp(cell, getCell.getTimestamp());
2243   }
2244 
2245   /**
2246    * @throws IOException
2247    */
2248   public void put(Put put)
2249   throws IOException {
2250     checkReadOnly();
2251 
2252     // Do a rough check that we have resources to accept a write.  The check is
2253     // 'rough' in that between the resource check and the call to obtain a
2254     // read lock, resources may run out.  For now, the thought is that this
2255     // will be extremely rare; we'll deal with it when it happens.
2256     checkResources();
2257     startRegionOperation(Operation.PUT);
2258     try {
2259       // All edits for the given row (across all column families) must happen atomically.
2260       doBatchMutate(put);
2261     } finally {
2262       closeRegionOperation(Operation.PUT);
2263     }
2264   }
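       // Usage sketch (illustrative; "family"/"qualifier" are assumed valid for the
       // table, and Put.add is the cell-adding method of this API era):
       //
       //   Put p = new Put(Bytes.toBytes("row1"));
       //   p.add(family, qualifier, Bytes.toBytes("value"));
       //   region.put(p);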
2265 
2266   /**
2267    * Struct-like class that tracks the progress of a batch operation,
2268    * accumulating status codes and tracking the index at which processing
2269    * is proceeding.
2270    */
2271   private abstract static class BatchOperationInProgress<T> {
2272     T[] operations;
2273     int nextIndexToProcess = 0;
2274     OperationStatus[] retCodeDetails;
2275     WALEdit[] walEditsFromCoprocessors;
2276 
2277     public BatchOperationInProgress(T[] operations) {
2278       this.operations = operations;
2279       this.retCodeDetails = new OperationStatus[operations.length];
2280       this.walEditsFromCoprocessors = new WALEdit[operations.length];
2281       Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
2282     }
2283 
2284     public abstract Mutation getMutation(int index);
2285     public abstract long getNonceGroup(int index);
2286     public abstract long getNonce(int index);
2287     /** This method is potentially expensive and should only be used for the non-replay CP path. */
2288     public abstract Mutation[] getMutationsForCoprocs();
2289     public abstract boolean isInReplay();
2290     public abstract long getReplaySequenceId();
2291 
2292     public boolean isDone() {
2293       return nextIndexToProcess == operations.length;
2294     }
2295   }
2296 
2297   private static class MutationBatch extends BatchOperationInProgress<Mutation> {
2298     private long nonceGroup;
2299     private long nonce;
2300     public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) {
2301       super(operations);
2302       this.nonceGroup = nonceGroup;
2303       this.nonce = nonce;
2304     }
2305 
2306     @Override
2307     public Mutation getMutation(int index) {
2308       return this.operations[index];
2309     }
2310 
2311     @Override
2312     public long getNonceGroup(int index) {
2313       return nonceGroup;
2314     }
2315 
2316     @Override
2317     public long getNonce(int index) {
2318       return nonce;
2319     }
2320 
2321     @Override
2322     public Mutation[] getMutationsForCoprocs() {
2323       return this.operations;
2324     }
2325 
2326     @Override
2327     public boolean isInReplay() {
2328       return false;
2329     }
2330 
2331     @Override
2332     public long getReplaySequenceId() {
2333       return 0;
2334     }
2335   }
2336 
2337   private static class ReplayBatch extends BatchOperationInProgress<HLogSplitter.MutationReplay> {
2338     private long replaySeqId = 0;
2339     public ReplayBatch(MutationReplay[] operations, long seqId) {
2340       super(operations);
2341       this.replaySeqId = seqId;
2342     }
2343 
2344     @Override
2345     public Mutation getMutation(int index) {
2346       return this.operations[index].mutation;
2347     }
2348 
2349     @Override
2350     public long getNonceGroup(int index) {
2351       return this.operations[index].nonceGroup;
2352     }
2353 
2354     @Override
2355     public long getNonce(int index) {
2356       return this.operations[index].nonce;
2357     }
2358 
2359     @Override
2360     public Mutation[] getMutationsForCoprocs() {
2361       assert false;
2362       throw new RuntimeException("Should not be called for replay batch");
2363     }
2364 
2365     @Override
2366     public boolean isInReplay() {
2367       return true;
2368     }
2369 
2370     @Override
2371     public long getReplaySequenceId() {
2372       return this.replaySeqId;
2373     }
2374   }
2375 
2376   /**
2377    * Perform a batch of mutations.
2378    * It supports only Put and Delete mutations; other types are marked as failures.
2379    * @param mutations the list of mutations
2380    * @return an array of OperationStatus which internally contains the
2381    *         OperationStatusCode and the exceptionMessage if any.
2382    * @throws IOException
2383    */
2384   public OperationStatus[] batchMutate(
2385       Mutation[] mutations, long nonceGroup, long nonce) throws IOException {
2386     // As it stands, this is used for two things:
2387     //  * batchMutate with single mutation - put/delete, separate or from checkAndMutate.
2388     //  * coprocessor calls (see ex. BulkDeleteEndpoint).
2389     // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd...
2390     return batchMutate(new MutationBatch(mutations, nonceGroup, nonce));
2391   }
2392 
2393   public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
2394     return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
2395   }
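       // Usage sketch: submit several mutations at once and inspect per-operation
       // status (illustrative; put1/put2/delete1 assumed prepared elsewhere):
       //
       //   OperationStatus[] statuses =
       //     region.batchMutate(new Mutation[] { put1, put2, delete1 });
       //   for (int i = 0; i < statuses.length; i++) {
       //     if (statuses[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
       //       // handle the failed mutation at index i
       //     }
       //   }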
2396 
2397   /**
2398    * Replay a batch of mutations.
2399    * @param mutations mutations to replay.
2400    * @param replaySeqId SeqId for current mutations
2401    * @return an array of OperationStatus which internally contains the
2402    *         OperationStatusCode and the exceptionMessage if any.
2403    * @throws IOException
2404    */
2405   public OperationStatus[] batchReplay(HLogSplitter.MutationReplay[] mutations, long replaySeqId)
2406       throws IOException {
2407     return batchMutate(new ReplayBatch(mutations, replaySeqId));
2408   }
2409 
2410   /**
2411    * Perform a batch of mutations.
2412    * It supports only Put and Delete mutations; other types are marked as failures.
2413    * @param batchOp the batch of mutations to apply, wrapped with progress and status state
2414    * @return an array of OperationStatus which internally contains the
2415    *         OperationStatusCode and the exceptionMessage if any.
2416    * @throws IOException
2417    */
2418   OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
2419     boolean initialized = false;
2420     while (!batchOp.isDone()) {
2421       if (!batchOp.isInReplay()) {
2422         checkReadOnly();
2423       }
2424       checkResources();
2425 
2426       long newSize;
2427       Operation op = Operation.BATCH_MUTATE;
2428       if (batchOp.isInReplay()) op = Operation.REPLAY_BATCH_MUTATE;
2429       startRegionOperation(op);
2430 
2431       try {
2432         if (!initialized) {
2433           this.writeRequestsCount.add(batchOp.operations.length);
2434           if (!batchOp.isInReplay()) {
2435             doPreMutationHook(batchOp);
2436           }
2437           initialized = true;
2438         }
2439         long addedSize = doMiniBatchMutation(batchOp);
2440         newSize = this.addAndGetGlobalMemstoreSize(addedSize);
2441       } finally {
2442         closeRegionOperation(op);
2443       }
2444       if (isFlushSize(newSize)) {
2445         requestFlush();
2446       }
2447     }
2448     return batchOp.retCodeDetails;
2449   }
2450 
2451 
2452   private void doPreMutationHook(BatchOperationInProgress<?> batchOp)
2453       throws IOException {
2454     /* Run coprocessor pre hook outside of locks to avoid deadlock */
2455     WALEdit walEdit = new WALEdit();
2456     if (coprocessorHost != null) {
2457       for (int i = 0 ; i < batchOp.operations.length; i++) {
2458         Mutation m = batchOp.getMutation(i);
2459         if (m instanceof Put) {
2460           if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
2461             // pre hook says skip this Put
2462             // mark as success and skip in doMiniBatchMutation
2463             batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2464           }
2465         } else if (m instanceof Delete) {
2466           Delete curDel = (Delete) m;
2467           if (curDel.getFamilyCellMap().isEmpty()) {
2468             // handle deleting a row case
2469             prepareDelete(curDel);
2470           }
2471           if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {
2472             // pre hook says skip this Delete
2473             // mark as success and skip in doMiniBatchMutation
2474             batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2475           }
2476         } else {
2477           // In case of passing Append mutations along with the Puts and Deletes in batchMutate
2478           // mark the operation return code as failure so that it will not be considered in
2479           // the doMiniBatchMutation
2480           batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
2481               "Put/Delete mutations only supported in batchMutate() now");
2482         }
2483         if (!walEdit.isEmpty()) {
2484           batchOp.walEditsFromCoprocessors[i] = walEdit;
2485           walEdit = new WALEdit();
2486         }
2487       }
2488     }
2489   }
2490 
2491   @SuppressWarnings("unchecked")
2492   private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp) throws IOException {
2493     boolean isInReplay = batchOp.isInReplay();
2494     // variable to note if all Put items are for the same CF -- metrics related
2495     boolean putsCfSetConsistent = true;
2496     //The set of columnFamilies first seen for Put.
2497     Set<byte[]> putsCfSet = null;
2498     // variable to note if all Delete items are for the same CF -- metrics related
2499     boolean deletesCfSetConsistent = true;
2500     //The set of columnFamilies first seen for Delete.
2501     Set<byte[]> deletesCfSet = null;
2502 
2503     long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
2504     WALEdit walEdit = new WALEdit(isInReplay);
2505     MultiVersionConsistencyControl.WriteEntry w = null;
2506     long txid = 0;
2507     boolean doRollBackMemstore = false;
2508     boolean locked = false;
2509 
2510     /** Keep track of the locks we hold so we can release them in the finally clause */
2511     List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
2512     // reference family maps directly so coprocessors can mutate them if desired
2513     Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
2514     List<Cell> memstoreCells = new ArrayList<Cell>();
2515     // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
2516     int firstIndex = batchOp.nextIndexToProcess;
2517     int lastIndexExclusive = firstIndex;
2518     boolean success = false;
2519     int noOfPuts = 0, noOfDeletes = 0;
2520     HLogKey walKey = null;
2521     long mvccNum = 0;
2522     try {
2523       // ------------------------------------
2524       // STEP 1. Try to acquire as many locks as we can, and ensure
2525       // we acquire at least one.
2526       // ----------------------------------
2527       int numReadyToWrite = 0;
2528       long now = EnvironmentEdgeManager.currentTime();
2529       while (lastIndexExclusive < batchOp.operations.length) {
2530         Mutation mutation = batchOp.getMutation(lastIndexExclusive);
2531         boolean isPutMutation = mutation instanceof Put;
2532 
2533         Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
2534         // store the family map reference to allow for mutations
2535         familyMaps[lastIndexExclusive] = familyMap;
2536 
2537         // skip anything that "ran" already
2538         if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
2539             != OperationStatusCode.NOT_RUN) {
2540           lastIndexExclusive++;
2541           continue;
2542         }
2543 
2544         try {
2545           if (isPutMutation) {
2546             // Check the families in the put. If bad, skip this one.
2547             if (isInReplay) {
2548               removeNonExistentColumnFamilyForReplay(familyMap);
2549             } else {
2550               checkFamilies(familyMap.keySet());
2551             }
2552             checkTimestamps(mutation.getFamilyCellMap(), now);
2553           } else {
2554             prepareDelete((Delete) mutation);
2555           }
2556         } catch (NoSuchColumnFamilyException nscf) {
2557           LOG.warn("No such column family in batch mutation", nscf);
2558           batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2559               OperationStatusCode.BAD_FAMILY, nscf.getMessage());
2560           lastIndexExclusive++;
2561           continue;
2562         } catch (FailedSanityCheckException fsce) {
2563           LOG.warn("Batch Mutation did not pass sanity check", fsce);
2564           batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2565               OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
2566           lastIndexExclusive++;
2567           continue;
2568         }
2569 
2570         // If we haven't got any rows in our batch, we should block to
2571         // get the next one.
2572         boolean shouldBlock = numReadyToWrite == 0;
2573         RowLock rowLock = null;
2574         try {
2575           rowLock = getRowLock(mutation.getRow(), shouldBlock);
2576         } catch (IOException ioe) {
2577           LOG.warn("Failed getting lock in batch put, row="
2578             + Bytes.toStringBinary(mutation.getRow()), ioe);
2579         }
2580         if (rowLock == null) {
2581           // We failed to grab another lock
2582           assert !shouldBlock : "Should never fail to get lock when blocking";
2583           break; // stop acquiring more rows for this batch
2584         } else {
2585           acquiredRowLocks.add(rowLock);
2586         }
2587 
2588         lastIndexExclusive++;
2589         numReadyToWrite++;
2590 
2591         if (isPutMutation) {
2592           // If column families stay consistent throughout all of the
2593           // individual puts then metrics can be reported as a multiput across
2594           // column families in the first put.
2595           if (putsCfSet == null) {
2596             putsCfSet = mutation.getFamilyCellMap().keySet();
2597           } else {
2598             putsCfSetConsistent = putsCfSetConsistent
2599                 && mutation.getFamilyCellMap().keySet().equals(putsCfSet);
2600           }
2601         } else {
2602           if (deletesCfSet == null) {
2603             deletesCfSet = mutation.getFamilyCellMap().keySet();
2604           } else {
2605             deletesCfSetConsistent = deletesCfSetConsistent
2606                 && mutation.getFamilyCellMap().keySet().equals(deletesCfSet);
2607           }
2608         }
2609       }
2610 
2611       // we should record the timestamp only after we have acquired the rowLock,
2612       // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
2613       now = EnvironmentEdgeManager.currentTime();
2614       byte[] byteNow = Bytes.toBytes(now);
2615 
2616       // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
2617       if (numReadyToWrite <= 0) return 0L;
2618 
2619       // We've now grabbed as many mutations off the list as we can
2620 
2621       // ------------------------------------
2622       // STEP 2. Update any LATEST_TIMESTAMP timestamps
2623       // ----------------------------------
2624       for (int i = firstIndex; !isInReplay && i < lastIndexExclusive; i++) {
2625         // skip invalid
2626         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2627             != OperationStatusCode.NOT_RUN) continue;
2628 
2629         Mutation mutation = batchOp.getMutation(i);
2630         if (mutation instanceof Put) {
2631           updateCellTimestamps(familyMaps[i].values(), byteNow);
2632           noOfPuts++;
2633         } else {
2634           prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
2635           noOfDeletes++;
2636         }
2637       }
2638 
2639       lock(this.updatesLock.readLock(), numReadyToWrite);
2640       locked = true;
2641       if(isInReplay) {
2642         mvccNum = batchOp.getReplaySequenceId();
2643       } else {
2644         mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
2645       }
2646       //
2647       // ------------------------------------
2648       // Acquire the latest mvcc number
2649       // ----------------------------------
2650       w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
2651 
2652       // calling the pre CP hook for batch mutation
2653       if (!isInReplay && coprocessorHost != null) {
2654         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2655           new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2656           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2657         if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
2658       }
2659 
2660       // ------------------------------------
2661       // STEP 3. Write back to memstore
2662       // Write to memstore. It is ok to write to memstore
2663       // first without updating the HLog because we do not roll
2664       // forward the memstore MVCC. The MVCC will be moved up when
2665       // the complete operation is done. These changes are not yet
2666       // visible to scanners till we update the MVCC. The MVCC is
2667       // moved only when the sync is complete.
2668       // ----------------------------------
2669       long addedSize = 0;
2670       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2671         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2672             != OperationStatusCode.NOT_RUN) {
2673           continue;
2674         }
2675         doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
2676         addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells);
2677       }
2678 
2679       // ------------------------------------
2680       // STEP 4. Build WAL edit
2681       // ----------------------------------
2682       Durability durability = Durability.USE_DEFAULT;
2683       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2684         // Skip puts that were determined to be invalid during preprocessing
2685         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2686             != OperationStatusCode.NOT_RUN) {
2687           continue;
2688         }
2689         batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2690 
2691         Mutation m = batchOp.getMutation(i);
2692         Durability tmpDur = getEffectiveDurability(m.getDurability());
2693         if (tmpDur.ordinal() > durability.ordinal()) {
2694           durability = tmpDur;
2695         }
2696         if (tmpDur == Durability.SKIP_WAL) {
2697           recordMutationWithoutWal(m.getFamilyCellMap());
2698           continue;
2699         }
2700 
2701         long nonceGroup = batchOp.getNonceGroup(i), nonce = batchOp.getNonce(i);
2702         // In replay, the batch may contain multiple nonces. If so, write WALEdit for each.
2703         // Given how nonces are originally written, these should be contiguous.
2704         // They don't have to be; it will still work, we just write more WALEdits than needed.
2705         if (nonceGroup != currentNonceGroup || nonce != currentNonce) {
2706           if (walEdit.size() > 0) {
2707             assert isInReplay;
2708             if (!isInReplay) {
2709               throw new IOException("Multiple nonces per batch and not in replay");
2710             }
2711             // txid should always increase, so having the one from the last call is ok.
2712             walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
2713               this.htableDescriptor.getTableName(), now, m.getClusterIds(),
2714               currentNonceGroup, currentNonce);
2715             txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey,
2716               walEdit, getSequenceId(), true, null);
2717             walEdit = new WALEdit(isInReplay);
2718             walKey = null;
2719           }
2720           currentNonceGroup = nonceGroup;
2721           currentNonce = nonce;
2722         }
2723 
2724         // Add WAL edits by CP
2725         WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
2726         if (fromCP != null) {
2727           for (Cell cell : fromCP.getCells()) {
2728             walEdit.add(cell);
2729           }
2730         }
2731         addFamilyMapToWALEdit(familyMaps[i], walEdit);
2732       }
2733 
2734       // -------------------------
2735       // STEP 5. Append the final edit to WAL. Do not sync wal.
2736       // -------------------------
2737       Mutation mutation = batchOp.getMutation(firstIndex);
2738       if (walEdit.size() > 0) {
2739         walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
2740             this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now,
2741             mutation.getClusterIds(), currentNonceGroup, currentNonce);
2742         if (isInReplay) {
2743           walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId());
2744         }
2745         txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
2746           getSequenceId(), true, memstoreCells);
2747       }
2748       if (walKey == null) {
2749         // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
2750         walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
2751       }
2752 
2753       // -------------------------------
2754       // STEP 6. Release row locks, etc.
2755       // -------------------------------
2756       if (locked) {
2757         this.updatesLock.readLock().unlock();
2758         locked = false;
2759       }
2760       releaseRowLocks(acquiredRowLocks);
2761 
2762       // -------------------------
2763       // STEP 7. Sync wal.
2764       // -------------------------
2765       if (txid != 0) {
2766         syncOrDefer(txid, durability);
2767       }
2768 
2769       doRollBackMemstore = false;
2770       // calling the post CP hook for batch mutation
2771       if (!isInReplay && coprocessorHost != null) {
2772         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2773           new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2774           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2775         coprocessorHost.postBatchMutate(miniBatchOp);
2776       }
2777 
2778       // ------------------------------------------------------------------
2779       // STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
2780       // ------------------------------------------------------------------
2781       if (w != null) {
2782         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
2783         w = null;
2784       }
2785 
2786       // ------------------------------------
2787       // STEP 9. Run coprocessor post hooks. This should be done after the wal is
2788       // synced so that the coprocessor contract is adhered to.
2789       // ------------------------------------
2790       if (!isInReplay && coprocessorHost != null) {
2791         for (int i = firstIndex; i < lastIndexExclusive; i++) {
2792           // only for successful puts
2793           if (batchOp.retCodeDetails[i].getOperationStatusCode()
2794               != OperationStatusCode.SUCCESS) {
2795             continue;
2796           }
2797           Mutation m = batchOp.getMutation(i);
2798           if (m instanceof Put) {
2799             coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
2800           } else {
2801             coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
2802           }
2803         }
2804       }
2805 
2806       success = true;
2807       return addedSize;
2808     } finally {
2809 
2810       // if the wal sync was unsuccessful, remove keys from memstore
2811       if (doRollBackMemstore) {
2812         rollbackMemstore(memstoreCells);
2813       }
2814       if (w != null) {
2815         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
2816       }
2817 
2818       if (locked) {
2819         this.updatesLock.readLock().unlock();
2820       }
2821       releaseRowLocks(acquiredRowLocks);
2822 
2823       // See if the column families were consistent through the whole thing.
2824       // if they were then keep them. If they were not then pass a null.
2825       // null will be treated as unknown.
2826       // Total time taken might be involving Puts and Deletes.
2827       // Split the time for puts and deletes based on the total number of Puts and Deletes.
2828 
2829       if (noOfPuts > 0) {
2830         // There were some Puts in the batch.
2831         if (this.metricsRegion != null) {
2832           this.metricsRegion.updatePut();
2833         }
2834       }
2835       if (noOfDeletes > 0) {
2836         // There were some Deletes in the batch.
2837         if (this.metricsRegion != null) {
2838           this.metricsRegion.updateDelete();
2839         }
2840       }
2841       if (!success) {
2842         for (int i = firstIndex; i < lastIndexExclusive; i++) {
2843           if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
2844             batchOp.retCodeDetails[i] = OperationStatus.FAILURE;
2845           }
2846         }
2847       }
2848       if (coprocessorHost != null && !batchOp.isInReplay()) {
2849         // call the coprocessor hook to do any finalization steps
2850         // after the put is done
2851         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2852             new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2853                 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex,
2854                 lastIndexExclusive);
2855         coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success);
2856       }
2857 
2858       batchOp.nextIndexToProcess = lastIndexExclusive;
2859     }
2860   }
2861 
2862   /**
2863    * Returns effective durability from the passed durability and
2864    * the table descriptor.
2865    */
2866   protected Durability getEffectiveDurability(Durability d) {
2867     return d == Durability.USE_DEFAULT ? this.durability : d;
2868   }
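
       /*
        * A minimal sketch (values below are illustrative, not from this file) of
        * how the resolution above combines with the batch path: USE_DEFAULT falls
        * back to the table's configured durability, and the batch later syncs with
        * the strongest durability seen across its mutations (see STEP 4 and 7).
        *
        *   Durability tableDefault = Durability.SYNC_WAL; // stands in for this.durability
        *   Durability[] perMutation = { Durability.USE_DEFAULT, Durability.ASYNC_WAL };
        *   Durability strongest = Durability.USE_DEFAULT;
        *   for (Durability d : perMutation) {
        *     Durability resolved = (d == Durability.USE_DEFAULT) ? tableDefault : d;
        *     if (resolved.ordinal() > strongest.ordinal()) strongest = resolved;
        *   }
        *   // strongest == SYNC_WAL: the USE_DEFAULT entry resolved to the table
        *   // default, which outranks ASYNC_WAL in the Durability enum ordering.
        */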
2869 
2870   //TODO, Think that gets/puts and deletes should be refactored a bit so that
2871   //the getting of the lock happens before, so that you would just pass it into
2872   //the methods. So in the case of checkAndMutate you could just do lockRow,
2873   //get, put, unlockRow or something
2874   /**
2875    * Atomically checks a cell value against the given comparator and, on a match, applies the mutation.
2876    * @return true if the mutation was applied, false otherwise
2877    * @throws IOException
2878    */
2879   public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
2880       CompareOp compareOp, ByteArrayComparable comparator, Mutation w,
2881       boolean writeToWAL)
2882   throws IOException{
2883     checkReadOnly();
2884     //TODO, add check for value length or maybe even better move this to the
2885     //client if this becomes a global setting
2886     checkResources();
2887     boolean isPut = w instanceof Put;
2888     if (!isPut && !(w instanceof Delete))
2889       throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must " +
2890           "be Put or Delete");
2891     if (!Bytes.equals(row, w.getRow())) {
2892       throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " +
2893           "getRow must match the passed row");
2894     }
2895 
2896     startRegionOperation();
2897     try {
2898       Get get = new Get(row);
2899       checkFamily(family);
2900       get.addColumn(family, qualifier);
2901 
2902       // Lock row - note that doBatchMutate will relock this row if called
2903       RowLock rowLock = getRowLock(get.getRow());
2904       // wait for all previous transactions to complete (with lock held)
2905       mvcc.waitForPreviousTransactionsComplete();
2906       try {
2907         if (this.getCoprocessorHost() != null) {
2908           Boolean processed = null;
2909           if (w instanceof Put) {
2910             processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family,
2911                 qualifier, compareOp, comparator, (Put) w);
2912           } else if (w instanceof Delete) {
2913             processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family,
2914                 qualifier, compareOp, comparator, (Delete) w);
2915           }
2916           if (processed != null) {
2917             return processed;
2918           }
2919         }
2920         List<Cell> result = get(get, false);
2921 
2922         boolean valueIsNull = comparator.getValue() == null ||
2923           comparator.getValue().length == 0;
2924         boolean matches = false;
2925         if (result.size() == 0 && valueIsNull) {
2926           matches = true;
2927         } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
2928             valueIsNull) {
2929           matches = true;
2930         } else if (result.size() == 1 && !valueIsNull) {
2931           Cell kv = result.get(0);
2932           int compareResult = comparator.compareTo(kv.getValueArray(),
2933               kv.getValueOffset(), kv.getValueLength());
2934           switch (compareOp) {
2935           case LESS:
2936             matches = compareResult < 0;
2937             break;
2938           case LESS_OR_EQUAL:
2939             matches = compareResult <= 0;
2940             break;
2941           case EQUAL:
2942             matches = compareResult == 0;
2943             break;
2944           case NOT_EQUAL:
2945             matches = compareResult != 0;
2946             break;
2947           case GREATER_OR_EQUAL:
2948             matches = compareResult >= 0;
2949             break;
2950           case GREATER:
2951             matches = compareResult > 0;
2952             break;
2953           default:
2954             throw new RuntimeException("Unknown Compare op " + compareOp.name());
2955           }
2956         }
2957         //If matches put the new put or delete the new delete
2958         if (matches) {
2959           // All edits for the given row (across all column families) must
2960           // happen atomically.
2961           doBatchMutate(w);
2962           this.checkAndMutateChecksPassed.increment();
2963           return true;
2964         }
2965         this.checkAndMutateChecksFailed.increment();
2966         return false;
2967       } finally {
2968         rowLock.release();
2969       }
2970     } finally {
2971       closeRegionOperation();
2972     }
2973   }
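
       /*
        * A minimal caller-side sketch of the method above (row/family/qualifier
        * values are placeholders): the Put is applied only when the stored cell
        * value equals the expected bytes, mirroring the EQUAL branch of the
        * switch. BinaryComparator is one ByteArrayComparable implementation.
        *
        *   byte[] row = Bytes.toBytes("r1");
        *   byte[] family = Bytes.toBytes("f");
        *   byte[] qualifier = Bytes.toBytes("q");
        *   Put put = new Put(row);
        *   put.add(family, qualifier, Bytes.toBytes("new"));
        *   boolean applied = region.checkAndMutate(row, family, qualifier,
        *       CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("old")), put, true);
        *   // applied is true only if the current value was exactly "old".
        */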
2974 
2975   //TODO, Think that gets/puts and deletes should be refactored a bit so that
2976   //the getting of the lock happens before, so that you would just pass it into
2977   //the methods. So in the case of checkAndMutate you could just do lockRow,
2978   //get, put, unlockRow or something
2979   /**
2980    * Atomically checks a cell value against the given comparator and, on a match, applies the RowMutations.
2981    * @return true if the mutations were applied, false otherwise
2982    * @throws IOException
2983    */
2984   public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier,
2985       CompareOp compareOp, ByteArrayComparable comparator, RowMutations rm,
2986       boolean writeToWAL)
2987       throws IOException{
2988     checkReadOnly();
2989     //TODO, add check for value length or maybe even better move this to the
2990     //client if this becomes a global setting
2991     checkResources();
2992 
2993     startRegionOperation();
2994     try {
2995       Get get = new Get(row);
2996       checkFamily(family);
2997       get.addColumn(family, qualifier);
2998 
2999       // Lock row - note that doBatchMutate will relock this row if called
3000       RowLock rowLock = getRowLock(get.getRow());
3001       // wait for all previous transactions to complete (with lock held)
3002       mvcc.waitForPreviousTransactionsComplete();
3003       try {
3004         List<Cell> result = get(get, false);
3005 
3006         boolean valueIsNull = comparator.getValue() == null ||
3007             comparator.getValue().length == 0;
3008         boolean matches = false;
3009         if (result.size() == 0 && valueIsNull) {
3010           matches = true;
3011         } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
3012             valueIsNull) {
3013           matches = true;
3014         } else if (result.size() == 1 && !valueIsNull) {
3015           Cell kv = result.get(0);
3016           int compareResult = comparator.compareTo(kv.getValueArray(),
3017               kv.getValueOffset(), kv.getValueLength());
3018           switch (compareOp) {
3019           case LESS:
3020             matches = compareResult < 0;
3021             break;
3022           case LESS_OR_EQUAL:
3023             matches = compareResult <= 0;
3024             break;
3025           case EQUAL:
3026             matches = compareResult == 0;
3027             break;
3028           case NOT_EQUAL:
3029             matches = compareResult != 0;
3030             break;
3031           case GREATER_OR_EQUAL:
3032             matches = compareResult >= 0;
3033             break;
3034           case GREATER:
3035             matches = compareResult > 0;
3036             break;
3037           default:
3038             throw new RuntimeException("Unknown Compare op " + compareOp.name());
3039           }
3040         }
3041         //If matches put the new put or delete the new delete
3042         if (matches) {
3043           // All edits for the given row (across all column families) must
3044           // happen atomically.
3045           mutateRow(rm);
3046           this.checkAndMutateChecksPassed.increment();
3047           return true;
3048         }
3049         this.checkAndMutateChecksFailed.increment();
3050         return false;
3051       } finally {
3052         rowLock.release();
3053       }
3054     } finally {
3055       closeRegionOperation();
3056     }
3057   }
3058   private void doBatchMutate(Mutation mutation) throws IOException, DoNotRetryIOException {
3059     // Currently this is only called for puts and deletes, so no nonces.
3060     OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation },
3061         HConstants.NO_NONCE, HConstants.NO_NONCE);
3062     if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
3063       throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
3064     } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
3065       throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
3066     }
3067   }
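
       /*
        * A short sketch of consuming batchMutate results the way doBatchMutate
        * does above (put1/put2 are hypothetical mutations): statuses line up by
        * index with the input array.
        *
        *   OperationStatus[] statuses = region.batchMutate(
        *       new Mutation[] { put1, put2 }, HConstants.NO_NONCE, HConstants.NO_NONCE);
        *   for (int i = 0; i < statuses.length; i++) {
        *     if (statuses[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
        *       // e.g. SANITY_CHECK_FAILURE or BAD_FAMILY; doBatchMutate above
        *       // translates these into the corresponding exceptions.
        *     }
        *   }
        */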
3068 
3069   /**
3070    * Complete taking the snapshot on the region. Writes the region info and adds references to the
3071    * working snapshot directory.
3072    *
3073    * TODO for api consistency, consider adding another version with no {@link ForeignExceptionSnare}
3074    * arg.  (In the future other cancellable HRegion methods could eventually add a
3075    * {@link ForeignExceptionSnare}, or we could do something fancier).
3076    *
3077    * @param desc snapshot description object
3078    * @param exnSnare ForeignExceptionSnare that captures external exceptions in case we need to
3079    *   bail out.  This is allowed to be null and will just be ignored in that case.
3080    * @throws IOException if there is an external or internal error causing the snapshot to fail
3081    */
3082   public void addRegionToSnapshot(SnapshotDescription desc,
3083       ForeignExceptionSnare exnSnare) throws IOException {
3084     Path rootDir = FSUtils.getRootDir(conf);
3085     Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);
3086 
3087     SnapshotManifest manifest = SnapshotManifest.create(conf, getFilesystem(),
3088                                                         snapshotDir, desc, exnSnare);
3089     manifest.addRegion(this);
3090   }
3091 
3092   /**
3093    * Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP} with the
3094    * provided current timestamp.
3095    * @throws IOException
3096    */
3097   void updateCellTimestamps(final Iterable<List<Cell>> cellItr, final byte[] now)
3098       throws IOException {
3099     for (List<Cell> cells: cellItr) {
3100       if (cells == null) continue;
3101       assert cells instanceof RandomAccess;
3102       int listSize = cells.size();
3103       for (int i = 0; i < listSize; i++) {
3104         CellUtil.updateLatestStamp(cells.get(i), now, 0);
3105       }
3106     }
3107   }
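
       /*
        * Effect sketch, with placeholder names: a Put built without an explicit
        * timestamp carries HConstants.LATEST_TIMESTAMP on its cells, and the loop
        * above rewrites that sentinel to the server clock captured at write time.
        *
        *   Put p = new Put(row);
        *   p.add(family, qualifier, value);   // cell ts == LATEST_TIMESTAMP
        *   byte[] now = Bytes.toBytes(EnvironmentEdgeManager.currentTime());
        *   updateCellTimestamps(p.getFamilyCellMap().values(), now);
        *   // the cell's timestamp is now the server time, not the sentinel.
        */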
3108 
3109   /*
3110    * Check that we have resources to support an update.
3111    *
3112    * We throw RegionTooBusyException if we are above the memstore limit
3113    * and expect the client to retry using some kind of backoff.
3114    */
3115   private void checkResources()
3116     throws RegionTooBusyException {
3117     // If catalog region, do not impose resource constraints or block updates.
3118     if (this.getRegionInfo().isMetaRegion()) return;
3119 
3120     if (this.memstoreSize.get() > this.blockingMemStoreSize) {
3121       requestFlush();
3122       throw new RegionTooBusyException("Above memstore limit, " +
3123           "regionName=" + (this.getRegionInfo() == null ? "unknown" :
3124           this.getRegionInfo().getRegionNameAsString()) +
3125           ", server=" + (this.getRegionServerServices() == null ? "unknown" :
3126           this.getRegionServerServices().getServerName()) +
3127           ", memstoreSize=" + memstoreSize.get() +
3128           ", blockingMemStoreSize=" + blockingMemStoreSize);
3129     }
3130   }
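
       /*
        * A hedged client-side sketch of the retry-with-backoff the comment above
        * expects (attempt counts and sleep bounds are arbitrary, not from this
        * file; InterruptedException handling is omitted):
        *
        *   for (int attempt = 0; attempt < maxAttempts; attempt++) {
        *     try {
        *       table.put(put);
        *       break;
        *     } catch (RegionTooBusyException rtbe) {
        *       // memstore is above the blocking limit; a flush was requested
        *       // server-side, so back off exponentially and retry
        *       Thread.sleep(Math.min(1000L << attempt, 30000L));
        *     }
        *   }
        */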
3131 
3132   /**
3133    * @throws IOException Throws exception if region is in read-only mode.
3134    */
3135   protected void checkReadOnly() throws IOException {
3136     if (this.writestate.isReadOnly()) {
3137       throw new IOException("region is read only");
3138     }
3139   }
3140 
3141   protected void checkReadsEnabled() throws IOException {
3142     if (!this.writestate.readsEnabled) {
3143       throw new IOException ("The region's reads are disabled. Cannot serve the request");
3144     }
3145   }
3146 
3147   public void setReadsEnabled(boolean readsEnabled) {
3148     this.writestate.setReadsEnabled(readsEnabled);
3149   }
3150 
3151   /**
3152    * Add updates first to the hlog and then add values to memstore.
3153    * Warning: Assumption is caller has lock on passed in row.
3154    * @param edits Cell updates by column
3155    * @throws IOException
3156    */
3157   private void put(final byte [] row, byte [] family, List<Cell> edits)
3158   throws IOException {
3159     NavigableMap<byte[], List<Cell>> familyMap;
3160     familyMap = new TreeMap<byte[], List<Cell>>(Bytes.BYTES_COMPARATOR);
3161 
3162     familyMap.put(family, edits);
3163     Put p = new Put(row);
3164     p.setFamilyCellMap(familyMap);
3165     doBatchMutate(p);
3166   }
3167 
3168   /**
3169    * Atomically apply the given map of family->edits to the memstore.
3170    * This handles the consistency control on its own, but the caller
3171    * should already have locked updatesLock.readLock(). This also does
3172    * <b>not</b> check the families for validity.
3173    *
3174    * @param familyMap Map of kvs per family
3175    * @param mvccNum the MVCC write number to assign to the newly added cells
3176    * @param memstoreCells receives the cells actually added to the memstore, for
3177    *        rollback if the subsequent WAL append fails
3178    * @return the additional memory usage of the memstore caused by the
3179    * new entries.
3180    * @throws IOException
3181    */
3182   private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
3183     long mvccNum, List<Cell> memstoreCells) throws IOException {
3184     long size = 0;
3185 
3186     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
3187       byte[] family = e.getKey();
3188       List<Cell> cells = e.getValue();
3189       assert cells instanceof RandomAccess;
3190       Store store = getStore(family);
3191       int listSize = cells.size();
3192       for (int i=0; i < listSize; i++) {
3193         Cell cell = cells.get(i);  
3194         CellUtil.setSequenceId(cell, mvccNum);
3195         Pair<Long, Cell> ret = store.add(cell);
3196         size += ret.getFirst();
3197         memstoreCells.add(ret.getSecond());
3198       }
3199     }
3200 
3201     return size;
3202   }
3203 
3204   /**
3205    * Remove the given cells from the memstore. This method is
3206    * called when a Put/Delete has updated the memstore but subsequently failed
3207    * to update the WAL; it rolls those edits back out of the memstore.
3208    */
3209   private void rollbackMemstore(List<Cell> memstoreCells) {
3210     int kvsRolledback = 0;
3211 
3212     for (Cell cell : memstoreCells) {
3213       byte[] family = CellUtil.cloneFamily(cell);
3214       Store store = getStore(family);
3215       store.rollback(cell);
3216       kvsRolledback++;
3217     }
3218     LOG.debug("rollbackMemstore rolled back " + kvsRolledback);
3219   }
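
       /*
        * How the rollback pairs with the apply step, in outline (this mirrors
        * STEP 3 and the finally block of the batch path above):
        *
        *   List<Cell> memstoreCells = new ArrayList<Cell>();
        *   boolean doRollBackMemstore = false;
        *   try {
        *     doRollBackMemstore = true;                  // memstore is now dirty
        *     applyFamilyMapToMemstore(familyMap, mvccNum, memstoreCells);
        *     // ... append and sync the WAL ...
        *     doRollBackMemstore = false;                 // edits are durable
        *   } finally {
        *     if (doRollBackMemstore) rollbackMemstore(memstoreCells);
        *   }
        */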
3220 
3221   /**
3222    * Check the collection of families for validity.
3223    * @throws NoSuchColumnFamilyException if a family does not exist.
3224    */
3225   void checkFamilies(Collection<byte[]> families)
3226   throws NoSuchColumnFamilyException {
3227     for (byte[] family : families) {
3228       checkFamily(family);
3229     }
3230   }
3231 
3232   /**
3233    * During replay, there could exist column families which were removed between the
3234    * region server failure and the replay.
3235    */
3236   private void removeNonExistentColumnFamilyForReplay(
3237       final Map<byte[], List<Cell>> familyMap) {
3238     List<byte[]> nonExistentList = null;
3239     for (byte[] family : familyMap.keySet()) {
3240       if (!this.htableDescriptor.hasFamily(family)) {
3241         if (nonExistentList == null) {
3242           nonExistentList = new ArrayList<byte[]>();
3243         }
3244         nonExistentList.add(family);
3245       }
3246     }
3247     if (nonExistentList != null) {
3248       for (byte[] family : nonExistentList) {
3249         // Perhaps schema was changed between crash and replay
3250         LOG.info("No family for " + Bytes.toString(family) + ", omitting from replay.");
3251         familyMap.remove(family);
3252       }
3253     }
3254   }
3255 
3256   void checkTimestamps(final Map<byte[], List<Cell>> familyMap,
3257       long now) throws FailedSanityCheckException {
3258     if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
3259       return;
3260     }
3261     long maxTs = now + timestampSlop;
3262     for (List<Cell> kvs : familyMap.values()) {
3263       assert kvs instanceof RandomAccess;
3264       int listSize  = kvs.size();
3265       for (int i=0; i < listSize; i++) {
3266         Cell cell = kvs.get(i);
3267         // See if the client-supplied timestamp is out of range; LATEST_TIMESTAMP means server-side time.
3268         long ts = cell.getTimestamp();
3269         if (ts != HConstants.LATEST_TIMESTAMP && ts > maxTs) {
3270           throw new FailedSanityCheckException("Timestamp for KV out of range "
3271               + cell + " (too.new=" + timestampSlop + ")");
3272         }
3273       }
3274     }
3275   }
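
       /*
        * Worked example of the slop check above (numbers are illustrative): with
        * timestampSlop = 2000 ms and now = 1000000, any client-supplied timestamp
        * above 1002000 fails the sanity check, while LATEST_TIMESTAMP is always
        * allowed by the check above.
        *
        *   long now = 1000000L, slop = 2000L;
        *   long maxTs = now + slop;       // 1002000
        *   // ts = 1001500 -> accepted (ts <= maxTs)
        *   // ts = 1002500 -> rejected with FailedSanityCheckException
        */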
3276 
3277   /**
3278    * Append the given map of family->edits to a WALEdit data structure.
3279    * This does not write to the HLog itself.
3280    * @param familyMap map of family->edits
3281    * @param walEdit the destination entry to append into
3282    */
3283   private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
3284       WALEdit walEdit) {
3285     for (List<Cell> edits : familyMap.values()) {
3286       assert edits instanceof RandomAccess;
3287       int listSize = edits.size();
3288       for (int i=0; i < listSize; i++) {
3289         Cell cell = edits.get(i);
3290         walEdit.add(cell);
3291       }
3292     }
3293   }
3294 
3295   private void requestFlush() {
3296     if (this.rsServices == null) {
3297       return;
3298     }
3299     synchronized (writestate) {
3300       if (this.writestate.isFlushRequested()) {
3301         return;
3302       }
3303       writestate.flushRequested = true;
3304     }
3305     // Make request outside of synchronize block; HBASE-818.
3306     this.rsServices.getFlushRequester().requestFlush(this);
3307     if (LOG.isDebugEnabled()) {
3308       LOG.debug("Flush requested on " + this);
3309     }
3310   }
3311 
3312   /*
3313    * @param size
3314    * @return True if size is over the flush threshold
3315    */
3316   private boolean isFlushSize(final long size) {
3317     return size > this.memstoreFlushSize;
3318   }
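
       /*
        * The threshold compared above comes from the table configuration; a
        * hedged sketch of setting it (the 128 MB value is arbitrary):
        *
        *   HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t"));
        *   htd.setMemStoreFlushSize(128L * 1024 * 1024);
        *   // once a region's memstore exceeds this size, a flush is requested.
        */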
3319 
3320   /**
3321    * Read the edits log put under this region by wal log splitting process.  Put
3322    * the recovered edits back up into this region.
3323    *
3324    * <p>We can ignore any log message that has a sequence ID that's equal to or
3325    * lower than minSeqId.  (Because we know such log messages are already
3326    * reflected in the HFiles.)
3327    *
3328    * <p>While this is running we are putting pressure on memory yet we are
3329    * outside of our usual accounting because we are not yet an onlined region
3330    * (this stuff is being run as part of Region initialization).  This means
3331    * that if we're up against global memory limits, we'll not be flagged to flush
3332    * because we are not online. We can't be flushed by usual mechanisms anyways;
3333    * we're not yet online so our relative sequenceids are not yet aligned with
3334    * HLog sequenceids -- not till we come up online, post processing of split
3335    * edits.
3336    *
3337    * <p>But to help relieve memory pressure, at least manage our own heap size
3338    * flushing if we are in excess of per-region limits.  When flushing, though, we
3339    * have to be careful to avoid using the regionserver/hlog sequenceid.  It runs
3340    * on a different timeline from what is going on here in this region context, so
3341    * if we crashed replaying these edits but in the midst had a flush that used the
3342    * regionserver log with a sequenceid in excess of what is going on here in this
3343    * region and its split editlogs, then we could miss edits the next time we go
3344    * to recover.  So we have to flush inline, using seqids that make sense in this
3345    * single region's context only -- until we come online.
3346    *
3347    * @param maxSeqIdInStores Any edit found in split editlogs needs to be in excess of
3348    * the maxSeqId for the store to be applied, else its skipped.
3349    * @return the sequence id of the last edit added to this region out of the
3350    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
3351    * @throws UnsupportedEncodingException
3352    * @throws IOException
3353    */
3354   protected long replayRecoveredEditsIfAny(final Path regiondir,
3355       Map<byte[], Long> maxSeqIdInStores,
3356       final CancelableProgressable reporter, final MonitoredTask status)
3357       throws UnsupportedEncodingException, IOException {
3358     long minSeqIdForTheRegion = -1;
3359     for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
3360       if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
3361         minSeqIdForTheRegion = maxSeqIdInStore;
3362       }
3363     }
3364     long seqid = minSeqIdForTheRegion;
3365 
3366     FileSystem fs = this.fs.getFileSystem();
3367     NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regiondir);
3368     if (LOG.isDebugEnabled()) {
3369       LOG.debug("Found " + (files == null ? 0 : files.size())
3370         + " recovered edits file(s) under " + regiondir);
3371     }
3372 
3373     if (files == null || files.isEmpty()) return seqid;
3374 
3375     for (Path edits: files) {
3376       if (edits == null || !fs.exists(edits)) {
3377         LOG.warn("Null or non-existent edits file: " + edits);
3378         continue;
3379       }
3380       if (isZeroLengthThenDelete(fs, edits)) continue;
3381 
3382       long maxSeqId;
3383       String fileName = edits.getName();
3384       maxSeqId = Math.abs(Long.parseLong(fileName));
3385       if (maxSeqId <= minSeqIdForTheRegion) {
3386         if (LOG.isDebugEnabled()) {
3387           String msg = "Maximum sequenceid for this log is " + maxSeqId
3388             + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
3389             + ", skipped the whole file, path=" + edits;
3390           LOG.debug(msg);
3391         }
3392         continue;
3393       }
3394 
3395       try {
3396         // Replay the edits. Replay can return -1 if everything is skipped; only update if the returned seqid is greater.
3397         seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
3398       } catch (IOException e) {
3399         boolean skipErrors = conf.getBoolean(
3400             HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
3401             conf.getBoolean(
3402                 "hbase.skip.errors",
3403                 HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS));
3404         if (conf.get("hbase.skip.errors") != null) {
3405           LOG.warn(
3406               "The property 'hbase.skip.errors' has been deprecated. Please use " +
3407               HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
3408         }
3409         if (skipErrors) {
3410           Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
3411           LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
3412               + "=true so continuing. Renamed " + edits +
3413               " as " + p, e);
3414         } else {
3415           throw e;
3416         }
3417       }
3418     }
3419     // The edits size added into rsAccounting during this replaying will not
3420     // be required any more. So just clear it.
3421     if (this.rsAccounting != null) {
3422       this.rsAccounting.clearRegionReplayEditsSize(this.getRegionName());
3423     }
3424     if (seqid > minSeqIdForTheRegion) {
3425       // Then we added some edits to memory. Flush and cleanup split edit files.
3426       internalFlushcache(null, seqid, status);
3427     }
3428     // Now delete the content of recovered edits.  We're done w/ them.
3429     for (Path file: files) {
3430       if (!fs.delete(file, false)) {
3431         LOG.error("Failed delete of " + file);
3432       } else {
3433         LOG.debug("Deleted recovered.edits file=" + file);
3434       }
3435     }
3436     return seqid;
3437   }
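
       /*
        * Sketch of the per-file skip decision used above: a recovered-edits file
        * is named for the highest sequence id it contains, so a whole file can be
        * skipped when that id is not newer than what the stores already persist
        * (the values below are illustrative):
        *
        *   long minSeqIdForTheRegion = 150L;        // min of the store max-seqids
        *   String fileName = "0000000000000000120"; // hypothetical edits file name
        *   long maxSeqIdInFile = Math.abs(Long.parseLong(fileName));
        *   boolean skipWholeFile = maxSeqIdInFile <= minSeqIdForTheRegion; // true
        */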
3438 
3439   /*
3440    * @param edits File of recovered edits.
3441    * @param maxSeqIdInStores Maximum sequenceid found in each store.  Edits in log
3442    * must be larger than this to be replayed for each store.
3443    * @param reporter
3444    * @return the sequence id of the last edit added to this region out of the
3445    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
3446    * @throws IOException
3447    */
3448   private long replayRecoveredEdits(final Path edits,
3449       Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
3450     throws IOException {
3451     String msg = "Replaying edits from " + edits;
3452     LOG.info(msg);
3453     MonitoredTask status = TaskMonitor.get().createStatus(msg);
3454     FileSystem fs = this.fs.getFileSystem();
3455 
3456     status.setStatus("Opening logs");
3457     HLog.Reader reader = null;
3458     try {
3459       reader = HLogFactory.createReader(fs, edits, conf);
3460       long currentEditSeqId = -1;
3461       long currentReplaySeqId = -1;
3462       long firstSeqIdInLog = -1;
3463       long skippedEdits = 0;
3464       long editsCount = 0;
3465       long intervalEdits = 0;
3466       HLog.Entry entry;
3467       Store store = null;
3468       boolean reported_once = false;
3469       ServerNonceManager ng = this.rsServices == null ? null : this.rsServices.getNonceManager();
3470 
3471       try {
3472         // How many edits seen before we check elapsed time
3473         int interval = this.conf.getInt("hbase.hstore.report.interval.edits",
3474             2000);
3475         // How often to send a progress report (default 1/2 master timeout)
3476         int period = this.conf.getInt("hbase.hstore.report.period", 300000);
3477         long lastReport = EnvironmentEdgeManager.currentTime();
3478 
3479         while ((entry = reader.next()) != null) {
3480           HLogKey key = entry.getKey();
3481           WALEdit val = entry.getEdit();
3482 
3483           if (ng != null) { // ng is null in some tests, or when nonces are disabled
3484             ng.reportOperationFromWal(key.getNonceGroup(), key.getNonce(), key.getWriteTime());
3485           }
3486 
3487           if (reporter != null) {
3488             intervalEdits += val.size();
3489             if (intervalEdits >= interval) {
3490               // Number of edits interval reached
3491               intervalEdits = 0;
3492               long cur = EnvironmentEdgeManager.currentTime();
3493               if (lastReport + period <= cur) {
3494                 status.setStatus("Replaying edits..." +
3495                     " skipped=" + skippedEdits +
3496                     " edits=" + editsCount);
3497                 // Timeout reached
3498                 if (!reporter.progress()) {
3499                   msg = "Progressable reporter failed, stopping replay";
3500                   LOG.warn(msg);
3501                   status.abort(msg);
3502                   throw new IOException(msg);
3503                 }
3504                 reported_once = true;
3505                 lastReport = cur;
3506               }
3507             }
3508           }
3509 
3510           // Start coprocessor replay here. The coprocessor is for each WALEdit
3511           // instead of a KeyValue.
3512           if (coprocessorHost != null) {
3513             status.setStatus("Running pre-WAL-restore hook in coprocessors");
3514             if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
3515               // if bypass this log entry, ignore it ...
3516               continue;
3517             }
3518           }
3519 
3520           if (firstSeqIdInLog == -1) {
3521             firstSeqIdInLog = key.getLogSeqNum();
3522           }
3523           currentEditSeqId = key.getLogSeqNum();
3524           currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ?
3525             key.getOrigLogSeqNum() : currentEditSeqId;
3526           boolean flush = false;
3527           for (Cell cell: val.getCells()) {
3528             // Check this edit is for me. Also, guard against writing the special
3529             // METACOLUMN info such as HBASE::CACHEFLUSH entries
3530             if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY) ||
3531                 !Bytes.equals(key.getEncodedRegionName(),
3532                   this.getRegionInfo().getEncodedNameAsBytes())) {
3533               // This is a special edit; we should handle it
3534               CompactionDescriptor compaction = WALEdit.getCompaction(cell);
3535               if (compaction != null) {
3536                 //replay the compaction
3537                 completeCompactionMarker(compaction);
3538               }
3539 
3540               skippedEdits++;
3541               continue;
3542             }
3543             // Figure which store the edit is meant for.
3544             if (store == null || !CellUtil.matchingFamily(cell, store.getFamily().getName())) {
3545               store = getStore(cell);
3546             }
3547             if (store == null) {
3548               // This should never happen.  Perhaps schema was changed between
3549               // crash and redeploy?
3550               LOG.warn("No family for " + cell);
3551               skippedEdits++;
3552               continue;
3553             }
3554             // Now, figure if we should skip this edit.
3555             if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily()
3556                 .getName())) {
3557               skippedEdits++;
3558               continue;
3559             }
3560             CellUtil.setSequenceId(cell, currentReplaySeqId);
3561             // Once we are over the limit, restoreEdit will keep returning true to
3562             // flush -- but don't flush until we've played all the kvs that make up
3563             // the WALEdit.
3564             flush = restoreEdit(store, cell);
3565             editsCount++;
3566           }
3567           if (flush) internalFlushcache(null, currentEditSeqId, status);
3568 
3569           if (coprocessorHost != null) {
3570             coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
3571           }
3572         }
3573       } catch (EOFException eof) {
3574         Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
3575         msg = "Encountered EOF. Most likely due to Master failure during " +
3576             "log splitting, so we have this data in another edit.  " +
3577             "Continuing, but renaming " + edits + " as " + p;
3578         LOG.warn(msg, eof);
3579         status.abort(msg);
3580       } catch (IOException ioe) {
3581         // If the IOE resulted from bad file format,
3582         // then this problem is idempotent and retrying won't help
3583         if (ioe.getCause() instanceof ParseException) {
3584           Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
3585           msg = "File corruption encountered!  " +
3586               "Continuing, but renaming " + edits + " as " + p;
3587           LOG.warn(msg, ioe);
3588           status.setStatus(msg);
3589         } else {
3590           status.abort(StringUtils.stringifyException(ioe));
3591           // other IO errors may be transient (bad network connection,
3592           // checksum exception on one datanode, etc).  throw & retry
3593           throw ioe;
3594         }
3595       }
3596       if (reporter != null && !reported_once) {
3597         reporter.progress();
3598       }
3599       msg = "Applied " + editsCount + ", skipped " + skippedEdits +
3600         ", firstSequenceIdInLog=" + firstSeqIdInLog +
3601         ", maxSequenceIdInLog=" + currentEditSeqId + ", path=" + edits;
3602       status.markComplete(msg);
3603       LOG.debug(msg);
3604       return currentEditSeqId;
3605     } finally {
3606       status.cleanup();
3607       if (reader != null) {
3608          reader.close();
3609       }
3610     }
3611   }
3612 
3613   /**
3614    * Call to complete a compaction. It's for the case where we find in the WAL a compaction
3615    * that was not finished.  We could find one when recovering a WAL after a regionserver crash.
3616    * See HBASE-2331.
3617    */
3618   void completeCompactionMarker(CompactionDescriptor compaction)
3619       throws IOException {
3620     Store store = this.getStore(compaction.getFamilyName().toByteArray());
3621     if (store == null) {
3622       LOG.warn("Found Compaction WAL edit for deleted family:" +
3623           Bytes.toString(compaction.getFamilyName().toByteArray()));
3624       return;
3625     }
3626     store.completeCompactionMarker(compaction);
3627   }
3628 
3629   /**
3630    * Used by tests
3631    * @param s Store to add the edit to.
3632    * @param cell Cell to add.
3633    * @return True if we should flush.
3634    */
3635   protected boolean restoreEdit(final Store s, final Cell cell) {
3636     long kvSize = s.add(cell).getFirst();
3637     if (this.rsAccounting != null) {
3638       rsAccounting.addAndGetRegionReplayEditsSize(this.getRegionName(), kvSize);
3639     }
3640     return isFlushSize(this.addAndGetGlobalMemstoreSize(kvSize));
3641   }
3642 
3643   /*
3644    * @param fs
3645    * @param p File to check.
3646    * @return True if file was zero-length (and if so, we'll delete it in here).
3647    * @throws IOException
3648    */
3649   private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
3650       throws IOException {
3651     FileStatus stat = fs.getFileStatus(p);
3652     if (stat.getLen() > 0) return false;
3653     LOG.warn("File " + p + " is zero-length, deleting.");
3654     fs.delete(p, false);
3655     return true;
3656   }
3657 
3658   protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
3659     return new HStore(this, family, this.conf);
3660   }
3661 
3662   /**
3663    * Return HStore instance.
3664    * Use with caution.  Exposed for use of fixup utilities.
3665    * @param column Name of column family hosted by this region.
3666    * @return Store that goes with the family on passed <code>column</code>.
3667    * TODO: Make this lookup faster.
3668    */
3669   public Store getStore(final byte[] column) {
3670     return this.stores.get(column);
3671   }
3672 
3673   /**
3674    * Return HStore instance. Does not do any copy: as the number of stores is limited, we
3675    * iterate over them.
3676    */
3677   private Store getStore(Cell cell) {
3678     for (Map.Entry<byte[], Store> famStore : stores.entrySet()) {
3679       if (Bytes.equals(
3680           cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
3681           famStore.getKey(), 0, famStore.getKey().length)) {
3682         return famStore.getValue();
3683       }
3684     }
3685 
3686     return null;
3687   }
3688 
3689   public Map<byte[], Store> getStores() {
3690     return this.stores;
3691   }
3692 
3693   /**
3694    * Return list of storeFiles for the set of CFs.
3695    * Uses closeLock to prevent the race condition where a region closes
3696    * in the middle of the loop: if the stores were closed one by one, some
3697    * stores would return 0 files.
3698    * @return List of storeFiles.
3699    */
3700   public List<String> getStoreFileList(final byte [][] columns)
3701     throws IllegalArgumentException {
3702     List<String> storeFileNames = new ArrayList<String>();
3703     synchronized(closeLock) {
3704       for(byte[] column : columns) {
3705         Store store = this.stores.get(column);
3706         if (store == null) {
3707           throw new IllegalArgumentException("No column family: " +
3708               Bytes.toString(column) + " available");
3709         }
3710         for (StoreFile storeFile: store.getStorefiles()) {
3711           storeFileNames.add(storeFile.getPath().toString());
3712         }
3713       }
3714     }
3715     return storeFileNames;
3716   }
3717 
3718   //////////////////////////////////////////////////////////////////////////////
3719   // Support code
3720   //////////////////////////////////////////////////////////////////////////////
3721 
3722   /** Make sure this is a valid row for the HRegion */
3723   void checkRow(final byte [] row, String op) throws IOException {
3724     if (!rowIsInRange(getRegionInfo(), row)) {
3725       throw new WrongRegionException("Requested row out of range for " +
3726           op + " on HRegion " + this + ", startKey='" +
3727           Bytes.toStringBinary(getStartKey()) + "', endKey='" +
3728           Bytes.toStringBinary(getEndKey()) + "', row='" +
3729           Bytes.toStringBinary(row) + "'");
3730     }
3731   }
3732 
3733   /**
3734    * Tries to acquire a lock on the given row.
3735    * @param waitForLock if true, will block until the lock is available.
3736    *        Otherwise, just tries to obtain the lock and returns
3737    *        false if unavailable.
3738    * @return the row lock if acquired,
3739    *   null if waitForLock was false and the lock was not acquired
3740    * @throws IOException if waitForLock was true and the lock could not be acquired after waiting
3741    */
3742   public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
3743     checkRow(row, "row lock");
3744     startRegionOperation();
3745     try {
3746       HashedBytes rowKey = new HashedBytes(row);
3747       RowLockContext rowLockContext = new RowLockContext(rowKey);
3748 
3749       // loop until we acquire the row lock (unless !waitForLock)
3750       while (true) {
3751         RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
3752         if (existingContext == null) {
3753           // Row is not already locked by any thread, use newly created context.
3754           break;
3755         } else if (existingContext.ownedByCurrentThread()) {
3756           // Row is already locked by current thread, reuse existing context instead.
3757           rowLockContext = existingContext;
3758           break;
3759         } else {
3760           if (!waitForLock) {
3761             return null;
3762           }
3763           try {
3764             // Row is already locked by some other thread, give up or wait for it
3765             if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
3766               throw new IOException("Timed out waiting for lock for row: " + rowKey);
3767             }
3768           } catch (InterruptedException ie) {
3769             LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
3770             InterruptedIOException iie = new InterruptedIOException();
3771             iie.initCause(ie);
3772             throw iie;
3773           }
3774         }
3775       }
3776 
3777       // allocate new lock for this thread
3778       return rowLockContext.newLock();
3779     } finally {
3780       closeRegionOperation();
3781     }
3782   }
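
       /*
        * A minimal usage sketch of the locking protocol above; the try/finally
        * mirrors how the batch path releases the locks it acquired:
        *
        *   RowLock lock = region.getRowLock(row);        // blocks until acquired
        *   try {
        *     // read-modify-write the row; the same thread may take further
        *     // locks on this row (the existing context is reused above)
        *   } finally {
        *     lock.release();
        *   }
        *
        *   RowLock maybe = region.getRowLock(row, false); // non-blocking variant
        *   if (maybe == null) {
        *     // lock held by another thread; the caller backs off
        *   }
        */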
3783 
3784   /**
3785    * Acquires a lock on the given row.
3786    * The same thread may acquire multiple locks on the same row.
3787    * @return the acquired row lock
3788    * @throws IOException if the lock could not be acquired after waiting
3789    */
3790   public RowLock getRowLock(byte[] row) throws IOException {
3791     return getRowLock(row, true);
3792   }
3793 
3794   /**
3795    * If the given list of row locks is not null, releases all locks.
3796    */
3797   public void releaseRowLocks(List<RowLock> rowLocks) {
3798     if (rowLocks != null) {
3799       for (RowLock rowLock : rowLocks) {
3800         rowLock.release();
3801       }
3802       rowLocks.clear();
3803     }
3804   }
3805 
3806   /**
3807    * Determines whether multiple column families are present.
3808    * Precondition: familyPaths is not null.
3809    *
3810    * @param familyPaths List of Pair<byte[] column family, String hfilePath>
3811    */
3812   private static boolean hasMultipleColumnFamilies(
3813       List<Pair<byte[], String>> familyPaths) {
3814     boolean multipleFamilies = false;
3815     byte[] family = null;
3816     for (Pair<byte[], String> pair : familyPaths) {
3817       byte[] fam = pair.getFirst();
3818       if (family == null) {
3819         family = fam;
3820       } else if (!Bytes.equals(family, fam)) {
3821         multipleFamilies = true;
3822         break;
3823       }
3824     }
3825     return multipleFamilies;
3826   }
3827 
3828 
3829   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
3830                                 boolean assignSeqId) throws IOException {
3831     return bulkLoadHFiles(familyPaths, assignSeqId, null);
3832   }
3833 
3834   /**
3835    * Attempts to atomically load a group of hfiles.  This is critical for loading
3836    * rows with multiple column families atomically.
3837    *
3838    * @param familyPaths List of Pair<byte[] column family, String hfilePath>
3839    * @param bulkLoadListener Internal hooks enabling massaging/preparation of a
3840    * file about to be bulk loaded
3841    * @return true if successful, false if failed recoverably
3842    * @throws IOException if failed unrecoverably.
3843    */
3844   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, boolean assignSeqId,
3845       BulkLoadListener bulkLoadListener) throws IOException {
3846     Preconditions.checkNotNull(familyPaths);
3847     // we need writeLock for multi-family bulk load
3848     startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
3849     try {
3850       this.writeRequestsCount.increment();
3851 
3852       // A split may have happened after the split keys were gathered but
3853       // before the HRegion's write lock was taken.  We need to validate each
3854       // HFile's region before attempting to bulk load any of them.
3855       List<IOException> ioes = new ArrayList<IOException>();
3856       List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();
3857       for (Pair<byte[], String> p : familyPaths) {
3858         byte[] familyName = p.getFirst();
3859         String path = p.getSecond();
3860 
3861         Store store = getStore(familyName);
3862         if (store == null) {
3863           IOException ioe = new org.apache.hadoop.hbase.DoNotRetryIOException(
3864               "No such column family " + Bytes.toStringBinary(familyName));
3865           ioes.add(ioe);
3866         } else {
3867           try {
3868             store.assertBulkLoadHFileOk(new Path(path));
3869           } catch (WrongRegionException wre) {
3870             // recoverable (file doesn't fit in region)
3871             failures.add(p);
3872           } catch (IOException ioe) {
3873             // unrecoverable (hdfs problem)
3874             ioes.add(ioe);
3875           }
3876         }
3877       }
3878 
3879       // validation failed because of some sort of IO problem.
3880       if (ioes.size() != 0) {
3881         IOException e = MultipleIOException.createIOException(ioes);
3882         LOG.error("There were one or more IO errors when checking if the bulk load is ok.", e);
3883         throw e;
3884       }
3885 
3886       // validation failed, bail out before doing anything permanent.
3887       if (failures.size() != 0) {
3888         StringBuilder list = new StringBuilder();
3889         for (Pair<byte[], String> p : failures) {
3890           list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")
3891             .append(p.getSecond());
3892         }
3893         // problem when validating
3894         LOG.warn("There was a recoverable bulk load failure likely due to a" +
3895             " split.  These (family, HFile) pairs were not loaded: " + list);
3896         return false;
3897       }
3898 
3899       long seqId = -1;
3900       // We need to assign a sequential ID that's in between two memstores in order to preserve
3901       // the guarantee that all the edits lower than the highest sequential ID from all the
3902       // HFiles are flushed on disk. See HBASE-10958.  The sequence id returned when we flush is
3903       // guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is
3904       // a sequence id that we can be sure is beyond the last hfile written).
3905       if (assignSeqId) {
3906         FlushResult fs = this.flushcache();
3907         if (fs.isFlushSucceeded()) {
3908           seqId = fs.flushSequenceId;
3909         } else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
3910           seqId = fs.flushSequenceId;
3911         } else {
3912           throw new IOException("Could not bulk load with an assigned sequential ID because the " +
3913               "flush didn't run. Reason for not flushing: " + fs.failureReason);
3914         }
3915       }
3916 
3917       for (Pair<byte[], String> p : familyPaths) {
3918         byte[] familyName = p.getFirst();
3919         String path = p.getSecond();
3920         Store store = getStore(familyName);
3921         try {
3922           String finalPath = path;
3923           if (bulkLoadListener != null) {
3924             finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
3925           }
3926           store.bulkLoadHFile(finalPath, seqId);
3927           if (bulkLoadListener != null) {
3928             bulkLoadListener.doneBulkLoad(familyName, path);
3929           }
3930         } catch (IOException ioe) {
3931           // A failure here can cause an atomicity violation that we currently
3932           // cannot recover from since it is likely a failed HDFS operation.
3933 
3934           // TODO Need a better story for reverting partial failures due to HDFS.
3935           LOG.error("There was a partial failure due to IO when attempting to" +
3936               " load " + Bytes.toString(p.getFirst()) + " : "+ p.getSecond(), ioe);
3937           if (bulkLoadListener != null) {
3938             try {
3939               bulkLoadListener.failedBulkLoad(familyName, path);
3940             } catch (Exception ex) {
3941               LOG.error("Error while calling failedBulkLoad for family "+
3942                   Bytes.toString(familyName)+" with path "+path, ex);
3943             }
3944           }
3945           throw ioe;
3946         }
3947       }
3948       return true;
3949     } finally {
3950       closeBulkRegionOperation();
3951     }
3952   }
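
  // Usage sketch (illustrative, not part of the original source): loading one
  // pre-staged HFile into family "cf" of an open HRegion "region". The staging
  // path is hypothetical; assignSeqId=true triggers the flush described above
  // (HBASE-10958) so the bulk-loaded cells sort after everything already in
  // the memstore.
  //
  //   List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
  //   familyPaths.add(new Pair<byte[], String>(Bytes.toBytes("cf"), "/staging/cf/hfile1"));
  //   boolean loaded = region.bulkLoadHFiles(familyPaths, true, null);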
3953 
3954   @Override
3955   public boolean equals(Object o) {
3956     return o instanceof HRegion && Bytes.equals(this.getRegionName(),
3957                                                 ((HRegion) o).getRegionName());
3958   }
3959 
3960   @Override
3961   public int hashCode() {
3962     return Bytes.hashCode(this.getRegionName());
3963   }
3964 
3965   @Override
3966   public String toString() {
3967     return this.getRegionNameAsString();
3968   }
3969 
3970   /**
3971    * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
3972    */
3973   class RegionScannerImpl implements RegionScanner {
3974     // Package local for testability
3975     KeyValueHeap storeHeap = null;
3976     /** Heap of key-values that are not essential for the provided filters and are thus read
3977      * on demand, if on-demand column family loading is enabled. */
3978     KeyValueHeap joinedHeap = null;
3979     /**
3980      * If the joined heap data gathering is interrupted due to scan limits, this will
3981      * contain the row for which we are populating the values. */
3982     protected Cell joinedContinuationRow = null;
3983     // KeyValue indicating that limit is reached when scanning
3984     private final KeyValue KV_LIMIT = new KeyValue();
3985     protected final byte[] stopRow;
3986     private final FilterWrapper filter;
3987     private int batch;
3988     protected int isScan;
3989     private boolean filterClosed = false;
3990     private long readPt;
3991     private long maxResultSize;
3992     protected HRegion region;
3993 
3994     @Override
3995     public HRegionInfo getRegionInfo() {
3996       return region.getRegionInfo();
3997     }
3998 
3999     RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
4000         throws IOException {
4001 
4002       this.region = region;
4003       this.maxResultSize = scan.getMaxResultSize();
4004       if (scan.hasFilter()) {
4005         this.filter = new FilterWrapper(scan.getFilter());
4006       } else {
4007         this.filter = null;
4008       }
4009 
4010       this.batch = scan.getBatch();
4011       if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {
4012         this.stopRow = null;
4013       } else {
4014         this.stopRow = scan.getStopRow();
4015       }
4016       // If we are doing a get, we want the range to be [startRow,endRow].
4017       // Normally it is [startRow,endRow) and if startRow=endRow we get nothing.
4018       this.isScan = scan.isGetScan() ? -1 : 0;
4019 
4020       // Synchronize on scannerReadPoints so that nobody calculates
4021       // getSmallestReadPoint before scannerReadPoints is updated.
4022       IsolationLevel isolationLevel = scan.getIsolationLevel();
4023       synchronized(scannerReadPoints) {
4024         this.readPt = getReadpoint(isolationLevel);
4025         scannerReadPoints.put(this, this.readPt);
4026       }
4027 
4028       // Here we separate all scanners into two lists - scanners that provide data required
4029       // by the filter to operate (scanners list) and all others (joinedScanners list).
4030       List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
4031       List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
4032       if (additionalScanners != null) {
4033         scanners.addAll(additionalScanners);
4034       }
4035 
4036       for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
4037           scan.getFamilyMap().entrySet()) {
4038         Store store = stores.get(entry.getKey());
4039         KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
4040         if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
4041           || this.filter.isFamilyEssential(entry.getKey())) {
4042           scanners.add(scanner);
4043         } else {
4044           joinedScanners.add(scanner);
4045         }
4046       }
4047       initializeKVHeap(scanners, joinedScanners, region);
4048     }
4049 
4050     protected void initializeKVHeap(List<KeyValueScanner> scanners,
4051         List<KeyValueScanner> joinedScanners, HRegion region)
4052         throws IOException {
4053       this.storeHeap = new KeyValueHeap(scanners, region.comparator);
4054       if (!joinedScanners.isEmpty()) {
4055         this.joinedHeap = new KeyValueHeap(joinedScanners, region.comparator);
4056       }
4057     }
4058 
4059     @Override
4060     public long getMaxResultSize() {
4061       return maxResultSize;
4062     }
4063 
4064     @Override
4065     public long getMvccReadPoint() {
4066       return this.readPt;
4067     }
4068 
4069     /**
4070      * Reset both the filter and the old filter.
4071      *
4072      * @throws IOException in case a filter raises an I/O exception.
4073      */
4074     protected void resetFilters() throws IOException {
4075       if (filter != null) {
4076         filter.reset();
4077       }
4078     }
4079 
4080     @Override
4081     public boolean next(List<Cell> outResults)
4082         throws IOException {
4083       // apply the batching limit by default
4084       return next(outResults, batch);
4085     }
4086 
4087     @Override
4088     public synchronized boolean next(List<Cell> outResults, int limit) throws IOException {
4089       if (this.filterClosed) {
4090         throw new UnknownScannerException("Scanner was closed (timed out?) " +
4091             "after we renewed it. Could be caused by a very slow scanner " +
4092             "or a lengthy garbage collection");
4093       }
4094       startRegionOperation(Operation.SCAN);
4095       readRequestsCount.increment();
4096       try {
4097         return nextRaw(outResults, limit);
4098       } finally {
4099         closeRegionOperation(Operation.SCAN);
4100       }
4101     }
4102 
4103     @Override
4104     public boolean nextRaw(List<Cell> outResults)
4105         throws IOException {
4106       return nextRaw(outResults, batch);
4107     }
4108 
4109     @Override
4110     public boolean nextRaw(List<Cell> outResults, int limit) throws IOException {
4111       if (storeHeap == null) {
4112         // scanner is closed
4113         throw new UnknownScannerException("Scanner was closed");
4114       }
4115       boolean returnResult;
4116       if (outResults.isEmpty()) {
4117         // Usually outResults is empty. This is true when next is called
4118         // to handle a scan or get operation.
4119         returnResult = nextInternal(outResults, limit);
4120       } else {
4121         List<Cell> tmpList = new ArrayList<Cell>();
4122         returnResult = nextInternal(tmpList, limit);
4123         outResults.addAll(tmpList);
4124       }
4125       resetFilters();
4126       if (isFilterDoneInternal()) {
4127         returnResult = false;
4128       }
4129       return returnResult;
4130     }
4131 
4132     private void populateFromJoinedHeap(List<Cell> results, int limit)
4133         throws IOException {
4134       assert joinedContinuationRow != null;
4135       Cell kv = populateResult(results, this.joinedHeap, limit,
4136           joinedContinuationRow.getRowArray(), joinedContinuationRow.getRowOffset(),
4137           joinedContinuationRow.getRowLength());
4138       if (kv != KV_LIMIT) {
4139         // We are done with this row, reset the continuation.
4140         joinedContinuationRow = null;
4141       }
4142       // As the data is obtained from two independent heaps, we need to
4143       // ensure that the result list is sorted, because Result relies on that.
4144       Collections.sort(results, comparator);
4145     }
4146 
4147     /**
4148      * Fetches records with currentRow into the results list, until the next row or the limit (if not -1).
4149      * @param heap KeyValueHeap to fetch data from. It must be positioned on the correct row before the call.
4150      * @param limit Max number of KVs to place in the result list, -1 means no limit.
4151      * @param currentRow Byte array with the key we are fetching.
4152      * @param offset offset for currentRow
4153      * @param length length for currentRow
4154      * @return KV_LIMIT if the limit was reached, the next KeyValue otherwise.
4155      */
4156     private Cell populateResult(List<Cell> results, KeyValueHeap heap, int limit,
4157         byte[] currentRow, int offset, short length) throws IOException {
4158       Cell nextKv;
4159       do {
4160         heap.next(results, limit - results.size());
4161         if (limit > 0 && results.size() == limit) {
4162           return KV_LIMIT;
4163         }
4164         nextKv = heap.peek();
4165       } while (nextKv != null && CellUtil.matchingRow(nextKv, currentRow, offset, length));
4166 
4167       return nextKv;
4168     }
4169 
4170     /*
4171      * @return True if a filter has ruled that the scanner is done; no more to scan.
4172      */
4173     @Override
4174     public synchronized boolean isFilterDone() throws IOException {
4175       return isFilterDoneInternal();
4176     }
4177 
4178     private boolean isFilterDoneInternal() throws IOException {
4179       return this.filter != null && this.filter.filterAllRemaining();
4180     }
4181 
4182     private boolean nextInternal(List<Cell> results, int limit)
4183     throws IOException {
4184       if (!results.isEmpty()) {
4185         throw new IllegalArgumentException("First parameter should be an empty list");
4186       }
4187       RpcCallContext rpcCall = RpcServer.getCurrentCall();
4188       // The loop here is used only when, at some point during the next() call, we determine
4189       // that due to effects of filters or otherwise we have an empty row in the result.
4190       // Then we loop and try again. Otherwise, we must get out on the first iteration via return:
4191       // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
4192       // and joinedHeap has no more data to read for the last row, joinedContinuationRow, if set).
4193       while (true) {
4194         if (rpcCall != null) {
4195           // If a user specifies a too-restrictive or too-slow scanner, the
4196           // client might time out and disconnect while the server side
4197           // is still processing the request. We should abort aggressively
4198           // in that case.
4199           long afterTime = rpcCall.disconnectSince();
4200           if (afterTime >= 0) {
4201             throw new CallerDisconnectedException(
4202                 "Aborting on region " + getRegionNameAsString() + ", call " +
4203                     this + " after " + afterTime + " ms, since " +
4204                     "caller disconnected");
4205           }
4206         }
4207 
4208         // Let's see what we have in the storeHeap.
4209         Cell current = this.storeHeap.peek();
4210 
4211         byte[] currentRow = null;
4212         int offset = 0;
4213         short length = 0;
4214         if (current != null) {
4215           currentRow = current.getRowArray();
4216           offset = current.getRowOffset();
4217           length = current.getRowLength();
4218         }
4219         boolean stopRow = isStopRow(currentRow, offset, length);
4220         // Check if we were getting data from the joinedHeap and hit the limit.
4221         // If not, then it's the main path - getting results from storeHeap.
4222         if (joinedContinuationRow == null) {
4223           // First, check if we are at a stop row. If so, there are no more results.
4224           if (stopRow) {
4225             if (filter != null && filter.hasFilterRow()) {
4226               filter.filterRowCells(results);
4227             }
4228             return false;
4229           }
4230 
4231           // Check if rowkey filter wants to exclude this row. If so, loop to next.
4232           // Technically, if we hit limits before on this row, we don't need this call.
4233           if (filterRowKey(currentRow, offset, length)) {
4234             boolean moreRows = nextRow(currentRow, offset, length);
4235             if (!moreRows) return false;
4236             results.clear();
4237             continue;
4238           }
4239 
4240           Cell nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset,
4241               length);
4242           // Ok, we are good, let's try to get some results from the main heap.
4243           if (nextKv == KV_LIMIT) {
4244             if (this.filter != null && filter.hasFilterRow()) {
4245               throw new IncompatibleFilterException(
4246                 "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
4247             }
4248             return true; // We hit the limit.
4249           }
4250 
4251           stopRow = nextKv == null ||
4252               isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength());
4253           // Save whether the row was empty before filters were applied to it.
4254           final boolean isEmptyRow = results.isEmpty();
4255 
4256           // We have the part of the row necessary for filtering (all of it, usually).
4257           // First filter with the filterRow(List).
4258           FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
4259           if (filter != null && filter.hasFilterRow()) {
4260             ret = filter.filterRowCellsWithRet(results);
4261           }
4262 
4263           if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) {
4264             results.clear();
4265             boolean moreRows = nextRow(currentRow, offset, length);
4266             if (!moreRows) return false;
4267 
4268             // This row was totally filtered out. If this is NOT the last row,
4269             // we should continue on. Otherwise, there is nothing else to do.
4270             if (!stopRow) continue;
4271             return false;
4272           }
4273 
4274           // Ok, we are done with storeHeap for this row.
4275           // Now we may need to fetch additional, non-essential data into row.
4276           // These values are not needed for filter to work, so we postpone their
4277           // fetch to (possibly) reduce amount of data loads from disk.
4278           if (this.joinedHeap != null) {
4279             Cell nextJoinedKv = joinedHeap.peek();
4280             // If joinedHeap is pointing to some other row, try to seek to a correct one.
4281             boolean mayHaveData = (nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv,
4282                 currentRow, offset, length))
4283                 || (this.joinedHeap.requestSeek(
4284                     KeyValueUtil.createFirstOnRow(currentRow, offset, length), true, true)
4285                     && joinedHeap.peek() != null && CellUtil.matchingRow(joinedHeap.peek(),
4286                     currentRow, offset, length));
4287             if (mayHaveData) {
4288               joinedContinuationRow = current;
4289               populateFromJoinedHeap(results, limit);
4290             }
4291           }
4292         } else {
4293           // Populating from the joined heap was stopped by limits, populate some more.
4294           populateFromJoinedHeap(results, limit);
4295         }
4296 
4297         // We may have just called populateFromJoinedHeap and hit the limits. If that is
4298         // the case, we need to call it again on the next next() invocation.
4299         if (joinedContinuationRow != null) {
4300           return true;
4301         }
4302 
4303         // Finally, we are done with both joinedHeap and storeHeap.
4304         // Double check to prevent empty rows from appearing in result. It could be
4305         // the case when SingleColumnValueExcludeFilter is used.
4306         if (results.isEmpty()) {
4307           boolean moreRows = nextRow(currentRow, offset, length);
4308           if (!moreRows) return false;
4309           if (!stopRow) continue;
4310         }
4311 
4312         // We are done. Return the result.
4313         return !stopRow;
4314       }
4315     }
4316 
4317     /**
4318      * This function maintains backward compatibility for 0.94 filters. HBASE-6429 combines
4319      * the filterRow() and filterRow(List<KeyValue> kvs) functions. Code from 0.94 or older may
4320      * not implement hasFilterRow() as HBASE-6429 expects, because in 0.94 hasFilterRow() only
4321      * returned true when filterRow(List<KeyValue> kvs) was overridden, not filterRow(). In that
4322      * case filterRow() would be skipped.
4323      */
4324     private boolean filterRow() throws IOException {
4325       // when hasFilterRow returns true, filter.filterRow() will be called automatically inside
4326       // filterRowCells(List<Cell> kvs) so we skip that scenario here.
4327       return filter != null && (!filter.hasFilterRow())
4328           && filter.filterRow();
4329     }
4330 
4331     private boolean filterRowKey(byte[] row, int offset, short length) throws IOException {
4332       return filter != null
4333           && filter.filterRowKey(row, offset, length);
4334     }
4335 
4336     protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException {
4337       assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read.";
4338       Cell next;
4339       while ((next = this.storeHeap.peek()) != null &&
4340              CellUtil.matchingRow(next, currentRow, offset, length)) {
4341         this.storeHeap.next(MOCKED_LIST);
4342       }
4343       resetFilters();
4344       // Calling the hook in CP which allows it to do a fast forward
4345       return this.region.getCoprocessorHost() == null
4346           || this.region.getCoprocessorHost()
4347               .postScannerFilterRow(this, currentRow, offset, length);
4348     }
4349 
4350     protected boolean isStopRow(byte[] currentRow, int offset, short length) {
4351       return currentRow == null ||
4352           (stopRow != null &&
4353           comparator.compareRows(stopRow, 0, stopRow.length,
4354             currentRow, offset, length) <= isScan);
4355     }
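
    // Worked example of the isScan offset: suppose stopRow and currentRow are
    // both "r2", so compareRows(...) == 0. For a scan, isScan == 0 and 0 <= 0
    // holds, so "r2" is excluded (half-open [startRow,endRow)). For a get,
    // isScan == -1 and 0 <= -1 fails, so "r2" itself is still returned
    // (closed [startRow,endRow]).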
4356 
4357     @Override
4358     public synchronized void close() {
4359       if (storeHeap != null) {
4360         storeHeap.close();
4361         storeHeap = null;
4362       }
4363       if (joinedHeap != null) {
4364         joinedHeap.close();
4365         joinedHeap = null;
4366       }
4367       // no need to synchronize here.
4368       scannerReadPoints.remove(this);
4369       this.filterClosed = true;
4370     }
4371 
4372     KeyValueHeap getStoreHeapForTesting() {
4373       return storeHeap;
4374     }
4375 
4376     @Override
4377     public synchronized boolean reseek(byte[] row) throws IOException {
4378       if (row == null) {
4379         throw new IllegalArgumentException("Row cannot be null.");
4380       }
4381       boolean result = false;
4382       startRegionOperation();
4383       try {
4384         KeyValue kv = KeyValueUtil.createFirstOnRow(row);
4385         // use request seek to make use of the lazy seek option. See HBASE-5520
4386         result = this.storeHeap.requestSeek(kv, true, true);
4387         if (this.joinedHeap != null) {
4388           result = this.joinedHeap.requestSeek(kv, true, true) || result;
4389         }
4390       } finally {
4391         closeRegionOperation();
4392       }
4393       return result;
4394     }
4395   }
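
  // Usage sketch (illustrative): scanning all rows of one family through the
  // public next() path, which handles region-operation bookkeeping itself.
  // Assumes an open HRegion "region"; the other names are local to the example.
  //
  //   Scan scan = new Scan();
  //   scan.addFamily(Bytes.toBytes("cf"));
  //   RegionScanner scanner = region.getScanner(scan);
  //   try {
  //     List<Cell> cells = new ArrayList<Cell>();
  //     boolean moreRows;
  //     do {
  //       moreRows = scanner.next(cells);  // fetches one row (up to batch) per call
  //       // ... consume cells ...
  //       cells.clear();
  //     } while (moreRows);
  //   } finally {
  //     scanner.close();
  //   }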
4396 
4397   // Utility methods
4398   /**
4399    * A utility method to create new instances of HRegion based on the
4400    * {@link HConstants#REGION_IMPL} configuration property.
4401    * @param tableDir qualified path of directory where region should be located,
4402    * usually the table directory.
4403    * @param log The HLog is the outbound log for any updates to the HRegion
4404    * (There's a single HLog for all the HRegions on a single HRegionServer.)
4405    * The log file is a logfile from the previous execution that's
4406    * custom-computed for this HRegion. The HRegionServer computes and sorts the
4407    * appropriate log info for this HRegion. If there is a previous log file
4408    * (implying that the HRegion has been written-to before), then read it from
4409    * the supplied path.
4410    * @param fs is the filesystem.
4411    * @param conf is global configuration settings.
4412    * @param regionInfo - HRegionInfo that describes the region
4414    * @param htd the table descriptor
4415    * @return the new instance
4416    */
4417   static HRegion newHRegion(Path tableDir, HLog log, FileSystem fs,
4418       Configuration conf, HRegionInfo regionInfo, final HTableDescriptor htd,
4419       RegionServerServices rsServices) {
4420     try {
4421       @SuppressWarnings("unchecked")
4422       Class<? extends HRegion> regionClass =
4423           (Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
4424 
4425       Constructor<? extends HRegion> c =
4426           regionClass.getConstructor(Path.class, HLog.class, FileSystem.class,
4427               Configuration.class, HRegionInfo.class, HTableDescriptor.class,
4428               RegionServerServices.class);
4429 
4430       return c.newInstance(tableDir, log, fs, conf, regionInfo, htd, rsServices);
4431     } catch (Throwable e) {
4432       // todo: what should I throw here?
4433       throw new IllegalStateException("Could not instantiate a region instance.", e);
4434     }
4435   }
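
  // Configuration sketch (illustrative): plugging in a custom region class via
  // HConstants.REGION_IMPL. "MyRegion" is a hypothetical HRegion subclass that
  // must declare the same seven-argument constructor reflected on above.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setClass(HConstants.REGION_IMPL, MyRegion.class, HRegion.class);
  //   // newHRegion(...) calls made with this conf now instantiate MyRegion.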
4436 
4437   /**
4438    * Convenience method creating new HRegions. Used by createTable and by the
4439    * bootstrap code in the HMaster constructor.
4440    * Note, this method creates an {@link HLog} for the created region. It
4441    * needs to be closed explicitly.  Use {@link HRegion#getLog()} to get
4442    * access.  <b>When done with a region created using this method, you will
4443    * need to explicitly close the {@link HLog} it created too; it will not be
4444    * done for you.  Not closing the log will leave at least a daemon thread
4445    * running.</b>  Call {@link #closeHRegion(HRegion)} and it will do the
4446    * necessary cleanup for you.
4447    * @param info Info for region to create.
4448    * @param rootDir Root directory for HBase instance
4449    * @return new HRegion
4450    *
4451    * @throws IOException
4452    */
4453   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4454       final Configuration conf, final HTableDescriptor hTableDescriptor)
4455   throws IOException {
4456     return createHRegion(info, rootDir, conf, hTableDescriptor, null);
4457   }
4458 
4459   /**
4460    * This will do the necessary cleanup that a call to
4461    * {@link #createHRegion(HRegionInfo, Path, Configuration, HTableDescriptor)}
4462    * requires.  This method will close the region and then close its
4463    * associated {@link HLog} file.  You can use it even if you called the other
4464    * createHRegion, the one that takes an {@link HLog} instance, but don't be
4465    * surprised by the call to {@link HLog#closeAndDelete()} on the {@link HLog}
4466    * the HRegion was carrying.
4467    * @throws IOException
4468    */
4469   public static void closeHRegion(final HRegion r) throws IOException {
4470     if (r == null) return;
4471     r.close();
4472     if (r.getLog() == null) return;
4473     r.getLog().closeAndDelete();
4474   }
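
  // Usage sketch (illustrative): pairing createHRegion with closeHRegion so the
  // region's private HLog is shut down too. Assumes "rootDir" and "conf" point
  // at a writable filesystem; the table and family names are examples.
  //
  //   HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t1"));
  //   htd.addFamily(new HColumnDescriptor("cf"));
  //   HRegionInfo hri = new HRegionInfo(htd.getTableName());
  //   HRegion region = createHRegion(hri, rootDir, conf, htd);
  //   try {
  //     // ... use the region ...
  //   } finally {
  //     closeHRegion(region);  // closes the region and its HLog
  //   }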
4475 
4476   /**
4477    * Convenience method creating new HRegions. Used by createTable.
4478    * The {@link HLog} for the created region needs to be closed explicitly.
4479    * Use {@link HRegion#getLog()} to get access.
4480    *
4481    * @param info Info for region to create.
4482    * @param rootDir Root directory for HBase instance
4483    * @param hlog shared HLog
4484    * @param initialize - true to initialize the region
4485    * @return new HRegion
4486    *
4487    * @throws IOException
4488    */
4489   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4490                                       final Configuration conf,
4491                                       final HTableDescriptor hTableDescriptor,
4492                                       final HLog hlog,
4493                                       final boolean initialize)
4494       throws IOException {
4495     return createHRegion(info, rootDir, conf, hTableDescriptor,
4496         hlog, initialize, false);
4497   }
4498 
4499   /**
4500    * Convenience method creating new HRegions. Used by createTable.
4501    * The {@link HLog} for the created region needs to be closed
4502    * explicitly, if it is not null.
4503    * Use {@link HRegion#getLog()} to get access.
4504    *
4505    * @param info Info for region to create.
4506    * @param rootDir Root directory for HBase instance
4507    * @param hlog shared HLog
4508    * @param initialize - true to initialize the region
4509    * @param ignoreHLog - true to skip generating a new hlog if it is null, mostly for createTable
4510    * @return new HRegion
4511    * @throws IOException
4512    */
4513   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4514                                       final Configuration conf,
4515                                       final HTableDescriptor hTableDescriptor,
4516                                       final HLog hlog,
4517                                       final boolean initialize, final boolean ignoreHLog)
4518       throws IOException {
4519     Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
4520     return createHRegion(info, rootDir, tableDir, conf, hTableDescriptor, hlog, initialize, ignoreHLog);
4521   }
4522 
4523   /**
4524    * Convenience method creating new HRegions. Used by createTable.
4525    * The {@link HLog} for the created region needs to be closed
4526    * explicitly, if it is not null.
4527    * Use {@link HRegion#getLog()} to get access.
4528    *
4529    * @param info Info for region to create.
4530    * @param rootDir Root directory for HBase instance
4531    * @param tableDir table directory
4532    * @param hlog shared HLog
4533    * @param initialize - true to initialize the region
4534    * @param ignoreHLog - true to skip generating a new hlog if it is null, mostly for createTable
4535    * @return new HRegion
4536    * @throws IOException
4537    */
4538   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Path tableDir,
4539                                       final Configuration conf,
4540                                       final HTableDescriptor hTableDescriptor,
4541                                       final HLog hlog,
4542                                       final boolean initialize, final boolean ignoreHLog)
4543       throws IOException {
4544     LOG.info("creating HRegion for table " + info.getTable().getNameAsString()
4545         + " HTD == " + hTableDescriptor + " RootDir = " + rootDir);
4547     FileSystem fs = FileSystem.get(conf);
4548     HRegionFileSystem rfs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
4549     HLog effectiveHLog = hlog;
4550     if (hlog == null && !ignoreHLog) {
4551       effectiveHLog = HLogFactory.createHLog(fs, rfs.getRegionDir(),
4552                                              HConstants.HREGION_LOGDIR_NAME, conf);
4553     }
4554     HRegion region = HRegion.newHRegion(tableDir,
4555         effectiveHLog, fs, conf, info, hTableDescriptor, null);
4556     if (initialize) {
4557       // If initializing, set the sequenceId. It is also required by HLogPerformanceEvaluation when
4558       // verifying the WALEdits.
4559       region.setSequenceId(region.initialize(null));
4560     }
4561     return region;
4562   }
4563 
4564   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4565                                       final Configuration conf,
4566                                       final HTableDescriptor hTableDescriptor,
4567                                       final HLog hlog)
4568     throws IOException {
4569     return createHRegion(info, rootDir, conf, hTableDescriptor, hlog, true);
4570   }
4571 
4572 
4573   /**
4574    * Open a Region.
4575    * @param info Info for region to be opened.
4576    * @param wal HLog for region to use. This method will call
4577    * HLog#setSequenceNumber(long) passing the result of the call to
4578    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4579    * up.  HRegionServer does this every time it opens a new region.
4580    * @return new HRegion
4581    *
4582    * @throws IOException
4583    */
4584   public static HRegion openHRegion(final HRegionInfo info,
4585       final HTableDescriptor htd, final HLog wal,
4586       final Configuration conf)
4587   throws IOException {
4588     return openHRegion(info, htd, wal, conf, null, null);
4589   }
4590 
4591   /**
4592    * Open a Region.
4593    * @param info Info for region to be opened
4594    * @param htd the table descriptor
4595    * @param wal HLog for region to use. This method will call
4596    * HLog#setSequenceNumber(long) passing the result of the call to
4597    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4598    * up.  HRegionServer does this every time it opens a new region.
4599    * @param conf The Configuration object to use.
4600    * @param rsServices An interface we can request flushes against.
4601    * @param reporter An interface we can report progress against.
4602    * @return new HRegion
4603    *
4604    * @throws IOException
4605    */
4606   public static HRegion openHRegion(final HRegionInfo info,
4607     final HTableDescriptor htd, final HLog wal, final Configuration conf,
4608     final RegionServerServices rsServices,
4609     final CancelableProgressable reporter)
4610   throws IOException {
4611     return openHRegion(FSUtils.getRootDir(conf), info, htd, wal, conf, rsServices, reporter);
4612   }
4613 
4614   /**
4615    * Open a Region.
4616    * @param rootDir Root directory for HBase instance
4617    * @param info Info for region to be opened.
4618    * @param htd the table descriptor
4619    * @param wal HLog for region to use. This method will call
4620    * HLog#setSequenceNumber(long) passing the result of the call to
4621    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4622    * up.  HRegionServer does this every time it opens a new region.
4623    * @param conf The Configuration object to use.
4624    * @return new HRegion
4625    * @throws IOException
4626    */
4627   public static HRegion openHRegion(Path rootDir, final HRegionInfo info,
4628       final HTableDescriptor htd, final HLog wal, final Configuration conf)
4629   throws IOException {
4630     return openHRegion(rootDir, info, htd, wal, conf, null, null);
4631   }
4632 
4633   /**
4634    * Open a Region.
4635    * @param rootDir Root directory for HBase instance
4636    * @param info Info for region to be opened.
4637    * @param htd the table descriptor
4638    * @param wal HLog for region to use. This method will call
4639    * HLog#setSequenceNumber(long) passing the result of the call to
4640    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4641    * up.  HRegionServer does this every time it opens a new region.
4642    * @param conf The Configuration object to use.
4643    * @param rsServices An interface we can request flushes against.
4644    * @param reporter An interface we can report progress against.
4645    * @return new HRegion
4646    * @throws IOException
4647    */
4648   public static HRegion openHRegion(final Path rootDir, final HRegionInfo info,
4649       final HTableDescriptor htd, final HLog wal, final Configuration conf,
4650       final RegionServerServices rsServices,
4651       final CancelableProgressable reporter)
4652   throws IOException {
4653     FileSystem fs = null;
4654     if (rsServices != null) {
4655       fs = rsServices.getFileSystem();
4656     }
4657     if (fs == null) {
4658       fs = FileSystem.get(conf);
4659     }
4660     return openHRegion(conf, fs, rootDir, info, htd, wal, rsServices, reporter);
4661   }
4662 
4663   /**
4664    * Open a Region.
4665    * @param conf The Configuration object to use.
4666    * @param fs Filesystem to use
4667    * @param rootDir Root directory for HBase instance
4668    * @param info Info for region to be opened.
4669    * @param htd the table descriptor
4670    * @param wal HLog for region to use. This method will call
4671    * HLog#setSequenceNumber(long) passing the result of the call to
4672    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4673    * up.  HRegionServer does this every time it opens a new region.
4674    * @return new HRegion
4675    * @throws IOException
4676    */
4677   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4678       final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal)
4679       throws IOException {
4680     return openHRegion(conf, fs, rootDir, info, htd, wal, null, null);
4681   }
4682 
4683   /**
4684    * Open a Region.
4685    * @param conf The Configuration object to use.
4686    * @param fs Filesystem to use
4687    * @param rootDir Root directory for HBase instance
4688    * @param info Info for region to be opened.
4689    * @param htd the table descriptor
4690    * @param wal HLog for region to use. This method will call
4691    * HLog#setSequenceNumber(long) passing the result of the call to
4692    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4693    * up.  HRegionServer does this every time it opens a new region.
4694    * @param rsServices An interface we can request flushes against.
4695    * @param reporter An interface we can report progress against.
4696    * @return new HRegion
4697    * @throws IOException
4698    */
4699   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4700       final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
4701       final RegionServerServices rsServices, final CancelableProgressable reporter)
4702       throws IOException {
4703     Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
4704     return openHRegion(conf, fs, rootDir, tableDir, info, htd, wal, rsServices, reporter);
4705   }
4706 
4707   /**
4708    * Open a Region.
4709    * @param conf The Configuration object to use.
4710    * @param fs Filesystem to use
4711    * @param rootDir Root directory for HBase instance
4712    * @param info Info for region to be opened.
4713    * @param htd the table descriptor
4714    * @param wal HLog for region to use. This method will call
4715    * HLog#setSequenceNumber(long) passing the result of the call to
4716    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4717    * up.  HRegionServer does this every time it opens a new region.
4718    * @param rsServices An interface we can request flushes against.
4719    * @param reporter An interface we can report progress against.
4720    * @return new HRegion
4721    * @throws IOException
4722    */
4723   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4724       final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
4725       final RegionServerServices rsServices, final CancelableProgressable reporter)
4726       throws IOException {
4727     if (info == null) throw new NullPointerException("Passed region info is null");
4728     if (LOG.isDebugEnabled()) {
4729       LOG.debug("Opening region: " + info);
4730     }
4731     HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
4732     return r.openHRegion(reporter);
4733   }
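
  // Usage sketch (illustrative): opening an existing region. Assumes the
  // region directory already exists under "rootDir" for "hri", and that "wal"
  // is the server's shared HLog.
  //
  //   HRegion region = openHRegion(conf, fs, rootDir, hri, htd, wal, null, null);
  //   try {
  //     // ... serve reads and writes ...
  //   } finally {
  //     region.close();  // the shared HLog is deliberately left open
  //   }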
4734 
4735 
4736   /**
4737    * Useful when reopening a closed region (normally for unit tests)
4738    * @param other original object
4739    * @param reporter An interface we can report progress against.
4740    * @return new HRegion
4741    * @throws IOException
4742    */
4743   public static HRegion openHRegion(final HRegion other, final CancelableProgressable reporter)
4744       throws IOException {
4745     HRegionFileSystem regionFs = other.getRegionFileSystem();
4746     HRegion r = newHRegion(regionFs.getTableDir(), other.getLog(), regionFs.getFileSystem(),
4747         other.baseConf, other.getRegionInfo(), other.getTableDesc(), null);
4748     return r.openHRegion(reporter);
4749   }
4750 
4751   /**
4752    * Open HRegion.
4753    * Calls initialize and sets sequenceId.
4754    * @return Returns <code>this</code>
4755    * @throws IOException
4756    */
4757   protected HRegion openHRegion(final CancelableProgressable reporter)
4758   throws IOException {
4759     checkCompressionCodecs();
4760 
4761     this.openSeqNum = initialize(reporter);
4762     this.setSequenceId(openSeqNum);
4763     if (log != null && getRegionServerServices() != null) {
4764       writeRegionOpenMarker(log, openSeqNum);
4765     }
4766     return this;
4767   }
4768 
4769   private void checkCompressionCodecs() throws IOException {
4770     for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
4771       CompressionTest.testCompression(fam.getCompression());
4772       CompressionTest.testCompression(fam.getCompactionCompression());
4773     }
4774   }
4775 
4776   /**
4777    * Create a daughter region given a temp directory with the region data.
4778    * @param hri Spec. for daughter region to open.
4779    * @throws IOException
4780    */
4781   HRegion createDaughterRegionFromSplits(final HRegionInfo hri) throws IOException {
4782     // Move the files from the temporary .splits to the final /table/region directory
4783     fs.commitDaughterRegion(hri);
4784 
4785     // Create the daughter HRegion instance
4786     HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(), fs.getFileSystem(),
4787         this.getBaseConf(), hri, this.getTableDesc(), rsServices);
4788     r.readRequestsCount.set(this.getReadRequestsCount() / 2);
4789     r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
4790     return r;
4791   }
4792 
4793   /**
4794    * Create a merged region given a temp directory with the region data.
4795    * @param region_b another merging region
4796    * @return merged HRegion
4797    * @throws IOException
4798    */
4799   HRegion createMergedRegionFromMerges(final HRegionInfo mergedRegionInfo,
4800       final HRegion region_b) throws IOException {
4801     HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(),
4802         fs.getFileSystem(), this.getBaseConf(), mergedRegionInfo,
4803         this.getTableDesc(), this.rsServices);
4804     r.readRequestsCount.set(this.getReadRequestsCount()
4805         + region_b.getReadRequestsCount());
4806     r.writeRequestsCount.set(this.getWriteRequestsCount()
4807         + region_b.getWriteRequestsCount());
4808     this.fs.commitMergedRegion(mergedRegionInfo);
4809     return r;
4810   }
4811 
4812   /**
4813    * Inserts a new region's meta information into the passed
4814    * <code>meta</code> region. Used by the HMaster bootstrap code when adding
4815    * a new table to the hbase:meta table.
4816    *
4817    * @param meta hbase:meta HRegion to be updated
4818    * @param r HRegion to add to <code>meta</code>
4819    *
4820    * @throws IOException
4821    */
4822   // TODO remove since only test and merge use this
4823   public static void addRegionToMETA(final HRegion meta, final HRegion r) throws IOException {
4824     meta.checkResources();
4825     // The row key is the region name
4826     byte[] row = r.getRegionName();
4827     final long now = EnvironmentEdgeManager.currentTime();
4828     final List<Cell> cells = new ArrayList<Cell>(2);
4829     cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
4830       HConstants.REGIONINFO_QUALIFIER, now,
4831       r.getRegionInfo().toByteArray()));
4832     // Record the version of the meta table in the row as well.
4833     cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
4834       HConstants.META_VERSION_QUALIFIER, now,
4835       Bytes.toBytes(HConstants.META_VERSION)));
4836     meta.put(row, HConstants.CATALOG_FAMILY, cells);
4837   }
4838 
4839   /**
4840    * Computes the Path of the HRegion
4841    *
4842    * @param tabledir qualified path for table
4843    * @param name ENCODED region name
4844    * @return Path of HRegion directory
4845    */
4846   @Deprecated
4847   public static Path getRegionDir(final Path tabledir, final String name) {
4848     return new Path(tabledir, name);
4849   }
4850 
4851   /**
4852    * Computes the Path of the HRegion
4853    *
4854    * @param rootdir qualified path of HBase root directory
4855    * @param info HRegionInfo for the region
4856    * @return qualified path of region directory
4857    */
4858   @Deprecated
4859   public static Path getRegionDir(final Path rootdir, final HRegionInfo info) {
4860     return new Path(
4861       FSUtils.getTableDir(rootdir, info.getTable()), info.getEncodedName());
4862   }
4863 
4864   /**
4865    * Determines if the specified row is within the row range specified by the
4866    * specified HRegionInfo
4867    *
4868    * @param info HRegionInfo that specifies the row range
4869    * @param row row to be checked
4870    * @return true if the row is within the range specified by the HRegionInfo
4871    */
4872   public static boolean rowIsInRange(HRegionInfo info, final byte [] row) {
4873     return ((info.getStartKey().length == 0) ||
4874         (Bytes.compareTo(info.getStartKey(), row) <= 0)) &&
4875         ((info.getEndKey().length == 0) ||
4876             (Bytes.compareTo(info.getEndKey(), row) > 0));
4877   }
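
  // Worked example: for a region with startKey "b" and endKey "d",
  // rowIsInRange returns true for rows "b" and "c" but false for "d" (the end
  // key is exclusive) and false for "a". A zero-length startKey or endKey
  // matches everything on that side, which is how the first and last regions
  // of a table absorb all lower and higher rows.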
4878 
4879   /**
4880    * Merge two HRegions.  The regions must be adjacent and must not overlap.
4881    *
4882    * @return new merged HRegion
4883    * @throws IOException
4884    */
4885   public static HRegion mergeAdjacent(final HRegion srcA, final HRegion srcB)
4886   throws IOException {
4887     HRegion a = srcA;
4888     HRegion b = srcB;
4889 
4890     // Make sure that srcA comes first; important for key-ordering during
4891     // write of the merged file.
4892     if (srcA.getStartKey() == null) {
4893       if (srcB.getStartKey() == null) {
4894         throw new IOException("Cannot merge two regions with null start key");
4895       }
4896       // A's start key is null but B's isn't. Assume A comes before B
4897     } else if ((srcB.getStartKey() == null) ||
4898       (Bytes.compareTo(srcA.getStartKey(), srcB.getStartKey()) > 0)) {
4899       a = srcB;
4900       b = srcA;
4901     }
4902 
4903     if (Bytes.compareTo(a.getEndKey(), b.getStartKey()) != 0) {
4904       throw new IOException("Cannot merge non-adjacent regions");
4905     }
4906     return merge(a, b);
4907   }
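
  // Usage sketch (illustrative): merging two regions of the same table that
  // are open in this JVM and not serving client traffic. The helper flushes
  // and compacts both regions before merging, as merge(HRegion, HRegion)
  // below shows.
  //
  //   HRegion merged = mergeAdjacent(regionA, regionB);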
4908 
4909   /**
4910    * Merge two regions whether they are adjacent or not.
4911    *
4912    * @param a region a
4913    * @param b region b
4914    * @return new merged region
4915    * @throws IOException
4916    */
4917   public static HRegion merge(final HRegion a, final HRegion b) throws IOException {
4918     if (!a.getRegionInfo().getTable().equals(b.getRegionInfo().getTable())) {
4919       throw new IOException("Regions do not belong to the same table");
4920     }
4921 
4922     FileSystem fs = a.getRegionFileSystem().getFileSystem();
4923     // Make sure each region's cache is empty
4924     a.flushcache();
4925     b.flushcache();
4926 
4927     // Compact each region so we only have one store file per family
4928     a.compactStores(true);
4929     if (LOG.isDebugEnabled()) {
4930       LOG.debug("Files for region: " + a);
4931       a.getRegionFileSystem().logFileSystemState(LOG);
4932     }
4933     b.compactStores(true);
4934     if (LOG.isDebugEnabled()) {
4935       LOG.debug("Files for region: " + b);
4936       b.getRegionFileSystem().logFileSystemState(LOG);
4937     }
4938 
4939     RegionMergeTransaction rmt = new RegionMergeTransaction(a, b, true);
4940     if (!rmt.prepare(null)) {
4941       throw new IOException("Unable to merge regions " + a + " and " + b);
4942     }
4943     HRegionInfo mergedRegionInfo = rmt.getMergedRegionInfo();
4944     LOG.info("starting merge of regions: " + a + " and " + b
4945         + " into new region " + mergedRegionInfo.getRegionNameAsString()
4946         + " with start key <"
4947         + Bytes.toStringBinary(mergedRegionInfo.getStartKey())
4948         + "> and end key <"
4949         + Bytes.toStringBinary(mergedRegionInfo.getEndKey()) + ">");
4950     HRegion dstRegion;
4951     try {
4952       dstRegion = rmt.execute(null, null);
4953     } catch (IOException ioe) {
4954       rmt.rollback(null, null);
4955       throw new IOException("Failed merging regions " + a + " and " + b
4956           + "; successfully rolled back", ioe);
4957     }
4958     dstRegion.compactStores(true);
4959 
4960     if (LOG.isDebugEnabled()) {
4961       LOG.debug("Files for new region");
4962       dstRegion.getRegionFileSystem().logFileSystemState(LOG);
4963     }
4964 
4965     if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
4966       throw new IOException("Merged region " + dstRegion
4967           + " still has references after the compaction, is compaction canceled?");
4968     }
4969 
4970     // Archiving the 'A' region
4971     HFileArchiver.archiveRegion(a.getBaseConf(), fs, a.getRegionInfo());
4972     // Archiving the 'B' region
4973     HFileArchiver.archiveRegion(b.getBaseConf(), fs, b.getRegionInfo());
4974 
4975     LOG.info("merge completed. New region is " + dstRegion);
4976     return dstRegion;
4977   }
4978 
4979   //
4980   // HBASE-880
4981   //
4982   /**
4983    * @param get get object
4984    * @return result
4985    * @throws IOException read exceptions
4986    */
4987   public Result get(final Get get) throws IOException {
4988     checkRow(get.getRow(), "Get");
4989     // Verify families are all valid
4990     if (get.hasFamilies()) {
4991       for (byte [] family: get.familySet()) {
4992         checkFamily(family);
4993       }
4994     } else { // Adding all families to scanner
4995       for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
4996         get.addFamily(family);
4997       }
4998     }
4999     List<Cell> results = get(get, true);
5000     boolean stale = this.getRegionInfo().getReplicaId() != 0;
5001     return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
5002   }
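
  // Usage sketch (illustrative): a single-row point read against an open
  // HRegion "region". Family and qualifier names are examples.
  //
  //   Get g = new Get(Bytes.toBytes("row1"));
  //   g.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"));
  //   Result result = region.get(g);
  //   byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("q1"));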
5003 
5004   /*
5005    * Do a get based on the get parameter.
5006    * @param withCoprocessor invoke coprocessor or not. We don't want to
5007    * always invoke cp for internal calls to this method.
5008    */
5009   public List<Cell> get(Get get, boolean withCoprocessor)
5010   throws IOException {
5011 
5012     List<Cell> results = new ArrayList<Cell>();
5013 
5014     // pre-get CP hook
5015     if (withCoprocessor && (coprocessorHost != null)) {
5016        if (coprocessorHost.preGet(get, results)) {
5017          return results;
5018        }
5019     }
5020 
5021     Scan scan = new Scan(get);
5022 
5023     RegionScanner scanner = null;
5024     try {
5025       scanner = getScanner(scan);
5026       scanner.next(results);
5027     } finally {
5028       if (scanner != null)
5029         scanner.close();
5030     }
5031 
5032     // post-get CP hook
5033     if (withCoprocessor && (coprocessorHost != null)) {
5034       coprocessorHost.postGet(get, results);
5035     }
5036 
5037     // do after lock
5038     if (this.metricsRegion != null) {
5039       long totalSize = 0L;
5040       for (Cell cell : results) {
5041         totalSize += CellUtil.estimatedLengthOf(cell);
5042       }
5043       this.metricsRegion.updateGet(totalSize);
5044     }
5045 
5046     return results;
5047   }
5048 
5049   public void mutateRow(RowMutations rm) throws IOException {
5050     // Don't need nonces here - RowMutations only supports puts and deletes
5051     mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
5052   }
5053 
5054   /**
5055    * Perform atomic mutations within the region w/o nonces.
5056    * See {@link #mutateRowsWithLocks(Collection, Collection, long, long)}
5057    */
5058   public void mutateRowsWithLocks(Collection<Mutation> mutations,
5059       Collection<byte[]> rowsToLock) throws IOException {
5060     mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
5061   }
5062 
5063   /**
5064    * Perform atomic mutations within the region.
5065    * @param mutations The list of mutations to perform.
5066    * <code>mutations</code> can contain operations for multiple rows.
5067    * Caller has to ensure that all rows are contained in this region.
5068    * @param rowsToLock Rows to lock. If multiple rows are locked, care should be
5069    * taken that <code>rowsToLock</code> is sorted in order to avoid deadlocks.
5070    * @param nonceGroup Optional nonce group of the operation (client Id)
5071    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
5073    * @throws IOException
5074    */
5075   public void mutateRowsWithLocks(Collection<Mutation> mutations,
5076       Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
5077     MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
5078     processRowsWithLocks(proc, -1, nonceGroup, nonce);
5079   }
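
  // Usage sketch (illustrative): atomically applying a Put and a Delete to two
  // rows of an open HRegion "region". Both rows must live in this region, and
  // rowsToLock is kept sorted (per the javadoc above) to avoid deadlocks.
  //
  //   Put p = new Put(Bytes.toBytes("row1"));
  //   p.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
  //   Delete d = new Delete(Bytes.toBytes("row2"));
  //   List<Mutation> mutations = Arrays.<Mutation>asList(p, d);
  //   SortedSet<byte[]> rowsToLock = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
  //   rowsToLock.add(p.getRow());
  //   rowsToLock.add(d.getRow());
  //   region.mutateRowsWithLocks(mutations, rowsToLock);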
5080 
5081   /**
5082    * Performs atomic multiple reads and writes on a given row.
5083    *
5084    * @param processor The object that defines the reads and writes to a row.
5085    * @param nonceGroup Optional nonce group of the operation (client Id)
5086    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
5087    */
5088   public void processRowsWithLocks(RowProcessor<?,?> processor, long nonceGroup, long nonce)
5089       throws IOException {
5090     processRowsWithLocks(processor, rowProcessorTimeout, nonceGroup, nonce);
5091   }
5092 
5093   /**
5094    * Performs atomic multiple reads and writes on a given row.
5095    *
5096    * @param processor The object that defines the reads and writes to a row.
5097    * @param timeout The timeout of the processor.process() execution.
5098    *                Use a negative number to switch off the time bound.
5099    * @param nonceGroup Optional nonce group of the operation (client Id)
5100    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
5101    */
5102   public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout,
5103       long nonceGroup, long nonce) throws IOException {
5104 
5105     for (byte[] row : processor.getRowsToLock()) {
5106       checkRow(row, "processRowsWithLocks");
5107     }
5108     if (!processor.readOnly()) {
5109       checkReadOnly();
5110     }
5111     checkResources();
5112 
5113     startRegionOperation();
5114     WALEdit walEdit = new WALEdit();
5115 
5116     // 1. Run pre-process hook
5117     try {
5118       processor.preProcess(this, walEdit);
5119     } catch (IOException e) {
5120       closeRegionOperation();
5121       throw e;
5122     }
5123     // Short circuit the read only case
5124     if (processor.readOnly()) {
5125       try {
5126         long now = EnvironmentEdgeManager.currentTime();
5127         doProcessRowWithTimeout(
5128             processor, now, this, null, null, timeout);
5129         processor.postProcess(this, walEdit, true);
5130       } finally {
5133         closeRegionOperation();
5134       }
5135       return;
5136     }
5137 
5138     MultiVersionConsistencyControl.WriteEntry writeEntry = null;
5139     boolean locked;
5140     boolean walSyncSuccessful = false;
5141     List<RowLock> acquiredRowLocks;
5142     long addedSize = 0;
5143     List<Mutation> mutations = new ArrayList<Mutation>();
5144     List<Cell> memstoreCells = new ArrayList<Cell>();
5145     Collection<byte[]> rowsToLock = processor.getRowsToLock();
5146     long mvccNum = 0;
5147     HLogKey walKey = null;
5148     try {
5149       // 2. Acquire the row lock(s)
5150       acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
5151       for (byte[] row : rowsToLock) {
5152         // Attempt to lock all involved rows, throw if any lock times out
5153         acquiredRowLocks.add(getRowLock(row));
5154       }
5155       // 3. Region lock
5156       lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
5157       locked = true;
5158       // Get a mvcc write number
5159       mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
5160 
5161       long now = EnvironmentEdgeManager.currentTime();
5162       try {
5163         // 4. Let the processor scan the rows, generate mutations and add
5164         //    waledits
5165         doProcessRowWithTimeout(
5166             processor, now, this, mutations, walEdit, timeout);
5167 
5168         if (!mutations.isEmpty()) {
5169           // 5. Start mvcc transaction
5170           writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
5171           // 6. Call the preBatchMutate hook
5172           processor.preBatchMutate(this, walEdit);
5173           // 7. Apply to memstore
5174           for (Mutation m : mutations) {
5175             for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
5176               Cell cell = cellScanner.current();
5177               CellUtil.setSequenceId(cell, mvccNum);
5178               Store store = getStore(cell);
5179               if (store == null) {
5180                 checkFamily(CellUtil.cloneFamily(cell));
5181                 // unreachable
5182               }
5183               Pair<Long, Cell> ret = store.add(cell);
5184               addedSize += ret.getFirst();
5185               memstoreCells.add(ret.getSecond());
5186             }
5187           }
5188 
5189           long txid = 0;
5190           // 8. Append no sync
5191           if (!walEdit.isEmpty()) {
5192             walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
5193               this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now,
5194               processor.getClusterIds(), nonceGroup, nonce);
5195             txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(),
5196               walKey, walEdit, getSequenceId(), true, memstoreCells);
5197           }
5198           if (walKey == null) {
5199             // since we use log sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
5200             // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
5201             walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
5202           }
5203 
5204           // 9. Release region lock
5205           if (locked) {
5206             this.updatesLock.readLock().unlock();
5207             locked = false;
5208           }
5209 
5210           // 10. Release row lock(s)
5211           releaseRowLocks(acquiredRowLocks);
5212 
5213           // 11. Sync edit log
5214           if (txid != 0) {
5215             syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
5216           }
5217           walSyncSuccessful = true;
5218           // 12. call postBatchMutate hook
5219           processor.postBatchMutate(this);
5220         }
5221       } finally {
5222         if (!mutations.isEmpty() && !walSyncSuccessful) {
5223           LOG.warn("Wal sync failed. Roll back " + mutations.size() +
5224               " memstore keyvalues for row(s):" + StringUtils.byteToHexString(
5225               processor.getRowsToLock().iterator().next()) + "...");
5226           for (Mutation m : mutations) {
5227             for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
5228               Cell cell = cellScanner.current();
5229               getStore(cell).rollback(cell);
5230             }
5231           }
5232         }
5233         // 13. Roll mvcc forward
5234         if (writeEntry != null) {
5235           mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
5236         }
5237         if (locked) {
5238           this.updatesLock.readLock().unlock();
5239         }
5240         // release locks if some were acquired but another timed out
5241         releaseRowLocks(acquiredRowLocks);
5242       }
5243 
5244       // 14. Run post-process hook
5245       processor.postProcess(this, walEdit, walSyncSuccessful);
5246 
5247     } catch (IOException e) {
5248       throw e;
5249     } finally {
5250       closeRegionOperation();
5251       if (!mutations.isEmpty() &&
5252           isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
5253         requestFlush();
5254       }
5255     }
5256   }
5257 
5258   private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
5259                                        final long now,
5260                                        final HRegion region,
5261                                        final List<Mutation> mutations,
5262                                        final WALEdit walEdit,
5263                                        final long timeout) throws IOException {
5264     // Short circuit the no time bound case.
5265     if (timeout < 0) {
5266       try {
5267         processor.process(now, region, mutations, walEdit);
5268       } catch (IOException e) {
5269         LOG.warn("RowProcessor:" + processor.getClass().getName() +
5270             " throws Exception on row(s):" +
5271             Bytes.toStringBinary(
5272               processor.getRowsToLock().iterator().next()) + "...", e);
5273         throw e;
5274       }
5275       return;
5276     }
5277 
5278     // Case with time bound
5279     FutureTask<Void> task =
5280       new FutureTask<Void>(new Callable<Void>() {
5281         @Override
5282         public Void call() throws IOException {
5283           try {
5284             processor.process(now, region, mutations, walEdit);
5285             return null;
5286           } catch (IOException e) {
5287             LOG.warn("RowProcessor:" + processor.getClass().getName() +
5288                 " throws Exception on row(s):" +
5289                 Bytes.toStringBinary(
5290                     processor.getRowsToLock().iterator().next()) + "...", e);
5291             throw e;
5292           }
5293         }
5294       });
5295     rowProcessorExecutor.execute(task);
5296     try {
5297       task.get(timeout, TimeUnit.MILLISECONDS);
5298     } catch (TimeoutException te) {
5299       LOG.error("RowProcessor timeout:" + timeout + " ms on row(s):" +
5300           Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) +
5301           "...");
5302       throw new IOException(te);
5303     } catch (Exception e) {
5304       throw new IOException(e);
5305     }
5306   }
5307 
5308   public Result append(Append append) throws IOException {
5309     return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
5310   }
5311 
5312   // TODO: There's a lot of boilerplate code identical
5313   // to increment... See how to better unify that.
5314   /**
5315    * Perform one or more append operations on a row.
5316    *
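        * <p>
        * A minimal caller-side sketch (row, family, and qualifier names here are hypothetical,
        * for illustration only):
        * <pre>
        *   Append a = new Append(Bytes.toBytes("row1"));
        *   a.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("-suffix"));
        *   Result r = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
        * </pre>
        *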
5317    * @return new keyvalues after append
5318    * @throws IOException
5319    */
5320   public Result append(Append append, long nonceGroup, long nonce)
5321       throws IOException {
5322     byte[] row = append.getRow();
5323     checkRow(row, "append");
5324     boolean flush = false;
5325     Durability durability = getEffectiveDurability(append.getDurability());
5326     boolean writeToWAL = durability != Durability.SKIP_WAL;
5327     WALEdit walEdits = null;
5328     List<Cell> allKVs = new ArrayList<Cell>(append.size());
5329     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
5330 
5331     long size = 0;
5332     long txid = 0;
5333 
5334     checkReadOnly();
5335     checkResources();
5336     // Lock row
5337     startRegionOperation(Operation.APPEND);
5338     this.writeRequestsCount.increment();
5339     long mvccNum = 0;
5340     WriteEntry w = null;
5341     HLogKey walKey = null;
5342     RowLock rowLock = null;
5343     List<Cell> memstoreCells = new ArrayList<Cell>();
5344     boolean doRollBackMemstore = false;
5345     try {
5346       rowLock = getRowLock(row);
5347       try {
5348         lock(this.updatesLock.readLock());
5349         try {
5350           // wait for all prior MVCC transactions to finish - while we hold the row lock
5351           // (so that we are guaranteed to see the latest state)
5352           mvcc.waitForPreviousTransactionsComplete();
5353           if (this.coprocessorHost != null) {
5354             Result r = this.coprocessorHost.preAppendAfterRowLock(append);
5355             if (r != null) {
5356               return r;
5357             }
5358           }
5359           // now start my own transaction
5360           mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
5361           w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
5362           long now = EnvironmentEdgeManager.currentTime();
5363           // Process each family
5364           for (Map.Entry<byte[], List<Cell>> family : append.getFamilyCellMap().entrySet()) {
5365 
5366             Store store = stores.get(family.getKey());
5367             List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
5368 
5369             // Sort the cells so that they match the order that they
5370             // appear in the Get results. Otherwise, we won't be able to
5371             // find the existing values if the cells are not specified
5372             // in order by the client since cells are in an array list.
5373             Collections.sort(family.getValue(), store.getComparator());
5374             // Get previous values for all columns in this family
5375             Get get = new Get(row);
5376             for (Cell cell : family.getValue()) {
5377               get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));
5378             }
5379             List<Cell> results = get(get, false);
5380             // Iterate the input columns and update existing values if they were
5381             // found, otherwise add new column initialized to the append value
5382 
5383             // Avoid as much copying as possible. Every byte is copied at most
5384             // once.
5385             // Would be nice if KeyValue had scatter/gather logic
5386             int idx = 0;
5387             for (Cell cell : family.getValue()) {
5388               Cell newCell;
5389               Cell oldCell = null;
5390               if (idx < results.size()
5391                   && CellUtil.matchingQualifier(results.get(idx), cell)) {
5392                 oldCell = results.get(idx);
5393                 // allocate an empty kv once
5394                 newCell = new KeyValue(row.length, cell.getFamilyLength(),
5395                     cell.getQualifierLength(), now, KeyValue.Type.Put,
5396                     oldCell.getValueLength() + cell.getValueLength(),
5397                     oldCell.getTagsLength() + cell.getTagsLength());
5398                 // copy in the value
5399                 System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(),
5400                     newCell.getValueArray(), newCell.getValueOffset(),
5401                     oldCell.getValueLength());
5402                 System.arraycopy(cell.getValueArray(), cell.getValueOffset(),
5403                     newCell.getValueArray(),
5404                     newCell.getValueOffset() + oldCell.getValueLength(),
5405                     cell.getValueLength());
5406                 // copy in the tags
5407                 System.arraycopy(oldCell.getTagsArray(), oldCell.getTagsOffset(),
5408                     newCell.getTagsArray(), newCell.getTagsOffset(), oldCell.getTagsLength());
5409                 System.arraycopy(cell.getTagsArray(), cell.getTagsOffset(), newCell.getTagsArray(),
5410                     newCell.getTagsOffset() + oldCell.getTagsLength(), cell.getTagsLength());
5411                 // copy in row, family, and qualifier
5412                 System.arraycopy(cell.getRowArray(), cell.getRowOffset(),
5413                     newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());
5414                 System.arraycopy(cell.getFamilyArray(), cell.getFamilyOffset(),
5415                     newCell.getFamilyArray(), newCell.getFamilyOffset(),
5416                     cell.getFamilyLength());
5417                 System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(),
5418                     newCell.getQualifierArray(), newCell.getQualifierOffset(),
5419                     cell.getQualifierLength());
5420                 idx++;
5421               } else {
5422                 // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP,
5423                 // so only need to update the timestamp to 'now'
5424                 CellUtil.updateLatestStamp(cell, now);
5425                 newCell = cell;
5426               }
5427               CellUtil.setSequenceId(newCell, mvccNum);
5428               // Give coprocessors a chance to update the new cell
5429               if (coprocessorHost != null) {
5430                 newCell = coprocessorHost.postMutationBeforeWAL(RegionObserver.MutationType.APPEND,
5431                     append, oldCell, newCell);
5432               }
5433               kvs.add(newCell);
5434 
5435               // Append update to WAL
5436               if (writeToWAL) {
5437                 if (walEdits == null) {
5438                   walEdits = new WALEdit();
5439                 }
5440                 walEdits.add(newCell);
5441               }
5442             }
5443 
5444             // store the kvs to the temporary memstore before writing HLog
5445             tempMemstore.put(store, kvs);
5446           }
5447 
5448           // Actually write to Memstore now
5449           for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
5450             Store store = entry.getKey();
5451             if (store.getFamily().getMaxVersions() == 1) {
5452               // upsert if VERSIONS for this CF == 1
5453               size += store.upsert(entry.getValue(), getSmallestReadPoint());
5454               memstoreCells.addAll(entry.getValue());
5455             } else {
5456               // otherwise keep older versions around
5457               for (Cell cell: entry.getValue()) {
5458                 Pair<Long, Cell> ret = store.add(cell);
5459                 size += ret.getFirst();
5460                 memstoreCells.add(ret.getSecond());
5461                 doRollBackMemstore = true;
5462               }
5463             }
5464             allKVs.addAll(entry.getValue());
5465           }
5466 
5467           // Actually write to WAL now
5468           if (writeToWAL) {
5469             // Using default cluster id, as this can only happen in the originating
5470             // cluster. A slave cluster receives the final value (not the delta)
5471             // as a Put.
5472             walKey = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
5473               this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, nonceGroup, nonce);
5474             txid = this.log.appendNoSync(this.htableDescriptor, getRegionInfo(), walKey, walEdits,
5475               this.sequenceId, true, memstoreCells);
5476           } else {
5477             recordMutationWithoutWal(append.getFamilyCellMap());
5478           }
5479           if (walKey == null) {
5480             // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
5481             walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
5482           }
5483 
5484           size = this.addAndGetGlobalMemstoreSize(size);
5485           flush = isFlushSize(size);
5486         } finally {
5487           this.updatesLock.readLock().unlock();
5488         }
5489       } finally {
5490         rowLock.release();
5491         rowLock = null;
5492       }
5493       // sync the transaction log outside the rowlock
5494       if (txid != 0) {
5495         syncOrDefer(txid, durability);
5496       }
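           // the WAL sync (if any) succeeded; clear the flag so the finally block below
           // does not roll the new cells back out of the memstore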
5497       doRollBackMemstore = false;
5498     } finally {
5499       if (rowLock != null) {
5500         rowLock.release();
5501       }
5502       // if the wal sync was unsuccessful, remove keys from memstore
5503       if (doRollBackMemstore) {
5504         rollbackMemstore(memstoreCells);
5505       }
5506       if (w != null) {
5507         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
5508       }
5509       closeRegionOperation(Operation.APPEND);
5510     }
5511 
5512     if (this.metricsRegion != null) {
5513       this.metricsRegion.updateAppend();
5514     }
5515 
5516     if (flush) {
5517       // Request a cache flush. Do it outside update lock.
5518       requestFlush();
5519     }
5520 
5521 
5522     return append.isReturnResults() ? Result.create(allKVs) : null;
5523   }
5524 
5525   public Result increment(Increment increment) throws IOException {
5526     return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
5527   }
5528 
5529   /**
5530    * Perform one or more increment operations on a row.
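        * <p>
        * A minimal caller-side sketch (row, family, and qualifier names here are hypothetical,
        * for illustration only):
        * <pre>
        *   Increment inc = new Increment(Bytes.toBytes("row1"));
        *   inc.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("counter"), 1L);
        *   Result r = region.increment(inc, HConstants.NO_NONCE, HConstants.NO_NONCE);
        * </pre>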
5531    * @return new keyvalues after increment
5532    * @throws IOException
5533    */
5534   public Result increment(Increment increment, long nonceGroup, long nonce)
5535   throws IOException {
5536     byte [] row = increment.getRow();
5537     checkRow(row, "increment");
5538     TimeRange tr = increment.getTimeRange();
5539     boolean flush = false;
5540     Durability durability = getEffectiveDurability(increment.getDurability());
5541     boolean writeToWAL = durability != Durability.SKIP_WAL;
5542     WALEdit walEdits = null;
5543     List<Cell> allKVs = new ArrayList<Cell>(increment.size());
5544     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
5545 
5546     long size = 0;
5547     long txid = 0;
5548 
5549     checkReadOnly();
5550     checkResources();
5551     // Lock row
5552     startRegionOperation(Operation.INCREMENT);
5553     this.writeRequestsCount.increment();
5554     RowLock rowLock = null;
5555     WriteEntry w = null;
5556     HLogKey walKey = null;
5557     long mvccNum = 0;
5558     List<Cell> memstoreCells = new ArrayList<Cell>();
5559     boolean doRollBackMemstore = false;
5560     try {
5561       rowLock = getRowLock(row);
5562       try {
5563         lock(this.updatesLock.readLock());
5564         try {
5565           // wait for all prior MVCC transactions to finish - while we hold the row lock
5566           // (so that we are guaranteed to see the latest state)
5567           mvcc.waitForPreviousTransactionsComplete();
5568           if (this.coprocessorHost != null) {
5569             Result r = this.coprocessorHost.preIncrementAfterRowLock(increment);
5570             if (r != null) {
5571               return r;
5572             }
5573           }
5574           // now start my own transaction
5575           mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
5576           w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
5577           long now = EnvironmentEdgeManager.currentTime();
5578           // Process each family
5579           for (Map.Entry<byte [], List<Cell>> family:
5580               increment.getFamilyCellMap().entrySet()) {
5581 
5582             Store store = stores.get(family.getKey());
5583             List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
5584 
5585             // Sort the cells so that they match the order that they
5586             // appear in the Get results. Otherwise, we won't be able to
5587             // find the existing values if the cells are not specified
5588             // in order by the client since cells are in an array list.
5589             Collections.sort(family.getValue(), store.getComparator());
5590             // Get previous values for all columns in this family
5591             Get get = new Get(row);
5592             for (Cell cell: family.getValue()) {
5593               get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));
5594             }
5595             get.setTimeRange(tr.getMin(), tr.getMax());
5596             List<Cell> results = get(get, false);
5597 
5598             // Iterate the input columns and update existing values if they were
5599             // found, otherwise add new column initialized to the increment amount
5600             int idx = 0;
5601             for (Cell kv: family.getValue()) {
5602               long amount = Bytes.toLong(CellUtil.cloneValue(kv));
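                   // an increment amount of zero acts as a read: the current value is
                   // returned to the client but nothing is written back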
5603               boolean noWriteBack = (amount == 0);
5604 
5605               Cell c = null;
5606               if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), kv)) {
5607                 c = results.get(idx);
5608                 if (c.getValueLength() == Bytes.SIZEOF_LONG) {
5609                   amount += Bytes.toLong(c.getValueArray(), c.getValueOffset(), Bytes.SIZEOF_LONG);
5610                 } else {
5611                   // throw DoNotRetryIOException instead of IllegalArgumentException
5612                   throw new org.apache.hadoop.hbase.DoNotRetryIOException(
5613                       "Attempted to increment field that isn't 64 bits wide");
5614                 }
5615                 idx++;
5616               }
5617 
5618               // Append new incremented KeyValue to list
5619               byte[] q = CellUtil.cloneQualifier(kv);
5620               byte[] val = Bytes.toBytes(amount);
5621               int oldCellTagsLen = (c == null) ? 0 : c.getTagsLength();
5622               int incCellTagsLen = kv.getTagsLength();
5623               Cell newKV = new KeyValue(row.length, family.getKey().length, q.length, now,
5624                   KeyValue.Type.Put, val.length, oldCellTagsLen + incCellTagsLen);
5625               System.arraycopy(row, 0, newKV.getRowArray(), newKV.getRowOffset(), row.length);
5626               System.arraycopy(family.getKey(), 0, newKV.getFamilyArray(), newKV.getFamilyOffset(),
5627                   family.getKey().length);
5628               System.arraycopy(q, 0, newKV.getQualifierArray(), newKV.getQualifierOffset(), q.length);
5629               // copy in the value
5630               System.arraycopy(val, 0, newKV.getValueArray(), newKV.getValueOffset(), val.length);
5631               // copy tags
5632               if (oldCellTagsLen > 0) {
5633                 System.arraycopy(c.getTagsArray(), c.getTagsOffset(), newKV.getTagsArray(),
5634                     newKV.getTagsOffset(), oldCellTagsLen);
5635               }
5636               if (incCellTagsLen > 0) {
5637                 System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(),
5638                     newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
5639               }
5640               CellUtil.setSequenceId(newKV, mvccNum);
5641               // Give coprocessors a chance to update the new cell
5642               if (coprocessorHost != null) {
5643                 newKV = coprocessorHost.postMutationBeforeWAL(
5644                     RegionObserver.MutationType.INCREMENT, increment, c, newKV);
5645               }
5646               allKVs.add(newKV);
5647 
5648               if (!noWriteBack) {
5649                 kvs.add(newKV);
5650 
5651                 // Prepare WAL updates
5652                 if (writeToWAL) {
5653                   if (walEdits == null) {
5654                     walEdits = new WALEdit();
5655                   }
5656                   walEdits.add(newKV);
5657                 }
5658               }
5659             }
5660 
5661             // store the kvs to the temporary memstore before writing HLog
5662             if (!kvs.isEmpty()) {
5663               tempMemstore.put(store, kvs);
5664             }
5665           }
5666 
5667           // Actually write to Memstore now
5668           if (!tempMemstore.isEmpty()) {
5669             for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
5670               Store store = entry.getKey();
5671               if (store.getFamily().getMaxVersions() == 1) {
5672                 // upsert if VERSIONS for this CF == 1
5673                 size += store.upsert(entry.getValue(), getSmallestReadPoint());
5674                 memstoreCells.addAll(entry.getValue());
5675               } else {
5676                 // otherwise keep older versions around
5677                 for (Cell cell : entry.getValue()) {
5678                   Pair<Long, Cell> ret = store.add(cell);
5679                   size += ret.getFirst();
5680                   memstoreCells.add(ret.getSecond());
5681                   doRollBackMemstore = true;
5682                 }
5683               }
5684             }
5685             size = this.addAndGetGlobalMemstoreSize(size);
5686             flush = isFlushSize(size);
5687           }
5688 
5689           // Actually write to WAL now
5690           if (walEdits != null && !walEdits.isEmpty()) {
5691             if (writeToWAL) {
5692               // Using default cluster id, as this can only happen in the originating
5693               // cluster. A slave cluster receives the final value (not the delta)
5694               // as a Put.
5695               walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
5696                 this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, nonceGroup, nonce);
5697               txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(),
5698                 walKey, walEdits, getSequenceId(), true, memstoreCells);
5699             } else {
5700               recordMutationWithoutWal(increment.getFamilyCellMap());
5701             }
5702           }
5703           if (walKey == null) {
5704             // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
5705             walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
5706           }
5707         } finally {
5708           this.updatesLock.readLock().unlock();
5709         }
5710       } finally {
5711         rowLock.release();
5712         rowLock = null;
5713       }
5714       // sync the transaction log outside the rowlock
5715       if (txid != 0) {
5716         syncOrDefer(txid, durability);
5717       }
5718       doRollBackMemstore = false;
5719     } finally {
5720       if (rowLock != null) {
5721         rowLock.release();
5722       }
5723       // if the wal sync was unsuccessful, remove keys from memstore
5724       if (doRollBackMemstore) {
5725         rollbackMemstore(memstoreCells);
5726       }
5727       if (w != null) {
5728         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
5729       }
5730       closeRegionOperation(Operation.INCREMENT);
5731       if (this.metricsRegion != null) {
5732         this.metricsRegion.updateIncrement();
5733       }
5734     }
5735 
5736     if (flush) {
5737       // Request a cache flush.  Do it outside update lock.
5738       requestFlush();
5739     }
5740 
5741     return Result.create(allKVs);
5742   }
5743 
5744   //
5745   // New HBASE-880 Helpers
5746   //
5747 
5748   private void checkFamily(final byte [] family)
5749   throws NoSuchColumnFamilyException {
5750     if (!this.htableDescriptor.hasFamily(family)) {
5751       throw new NoSuchColumnFamilyException("Column family " +
5752           Bytes.toString(family) + " does not exist in region " + this
5753           + " in table " + this.htableDescriptor);
5754     }
5755   }
5756 
5757   public static final long FIXED_OVERHEAD = ClassSize.align(
5758       ClassSize.OBJECT +
5759       ClassSize.ARRAY +
5760       41 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
5761       (12 * Bytes.SIZEOF_LONG) +
5762       4 * Bytes.SIZEOF_BOOLEAN);
5763 
5764   // woefully out of date - currently missing:
5765   // 1 x HashMap - coprocessorServiceHandlers
5766   // 6 x Counter - numMutationsWithoutWAL, dataInMemoryWithoutWAL,
5767   //   checkAndMutateChecksPassed, checkAndMutateChecksFailed, readRequestsCount,
5768   //   writeRequestsCount
5769   // 1 x HRegion$WriteState - writestate
5770   // 1 x RegionCoprocessorHost - coprocessorHost
5771   // 1 x RegionSplitPolicy - splitPolicy
5772   // 1 x MetricsRegion - metricsRegion
5773   // 1 x MetricsRegionWrapperImpl - metricsRegionWrapper
5774   public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
5775       ClassSize.OBJECT + // closeLock
5776       (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
5777       (3 * ClassSize.ATOMIC_LONG) + // memStoreSize, numPutsWithoutWAL, dataInMemoryWithoutWAL
5778       (2 * ClassSize.CONCURRENT_HASHMAP) +  // lockedRows, scannerReadPoints
5779       WriteState.HEAP_SIZE + // writestate
5780       ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
5781       (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
5782       MultiVersionConsistencyControl.FIXED_SIZE // mvcc
5783       + ClassSize.TREEMAP // maxSeqIdInStores
5784       + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
5785       ;
5786 
5787   @Override
5788   public long heapSize() {
5789     long heapSize = DEEP_OVERHEAD;
5790     for (Store store : this.stores.values()) {
5791       heapSize += store.heapSize();
5792     }
5793     // this does not take into account row locks, recent flushes, mvcc entries, and more
5794     return heapSize;
5795   }
5796 
5797   /*
5798    * This method calls System.exit.
5799    * @param message Message to print out.  May be null.
5800    */
5801   private static void printUsageAndExit(final String message) {
5802     if (message != null && message.length() > 0) System.out.println(message);
5803     System.out.println("Usage: HRegion CATALOG_TABLE_DIR [major_compact]");
5804     System.out.println("Options:");
5805     System.out.println(" major_compact  Pass this option to major compact " +
5806       "passed region.");
5807     System.out.println("Default outputs scan of passed region.");
5808     System.exit(1);
5809   }
5810 
5811   /**
5812    * Registers a new protocol buffer {@link Service} subclass as a coprocessor endpoint to
5813    * be available for handling
5814    * {@link HRegion#execService(com.google.protobuf.RpcController,
5815    *    org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall)} calls.
5816    *
5817    * <p>
5818    * Only a single instance may be registered per region for a given {@link Service} subclass (the
5819    * instances are keyed on {@link com.google.protobuf.Descriptors.ServiceDescriptor#getFullName()}).
5820    * After the first registration, subsequent calls with the same service name will fail with
5821    * a return value of {@code false}.
5822    * </p>
5823    * @param instance the {@code Service} subclass instance to expose as a coprocessor endpoint
5824    * @return {@code true} if the registration was successful, {@code false}
5825    * otherwise
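        * <p>
        * A minimal registration sketch, assuming {@code MyRowCountService} is a protobuf-generated
        * {@code Service} implementation (the name is hypothetical):
        * <pre>
        *   Service endpoint = new MyRowCountService();
        *   boolean registered = region.registerService(endpoint);
        * </pre>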
5826    */
5827   public boolean registerService(Service instance) {
5828     /*
5829      * No stacking of instances is allowed for a single service name
5830      */
5831     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
5832     if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
5833       LOG.error("Coprocessor service "+serviceDesc.getFullName()+
5834           " already registered, rejecting request from "+instance
5835       );
5836       return false;
5837     }
5838 
5839     coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
5840     if (LOG.isDebugEnabled()) {
5841       LOG.debug("Registered coprocessor service: region="+
5842           Bytes.toStringBinary(getRegionName())+" service="+serviceDesc.getFullName());
5843     }
5844     return true;
5845   }
5846 
5847   /**
5848    * Executes a single protocol buffer coprocessor endpoint {@link Service} method using
5849    * the registered protocol handlers.  {@link Service} implementations must be registered via the
5850    * {@link HRegion#registerService(com.google.protobuf.Service)}
5851    * method before they are available.
5852    *
5853    * @param controller an {@code RpcController} implementation to pass to the invoked service
5854    * @param call a {@code CoprocessorServiceCall} instance identifying the service, method,
5855    *     and parameters for the method invocation
5856    * @return a protocol buffer {@code Message} instance containing the method's result
5857    * @throws IOException if no registered service handler is found or an error
5858    *     occurs during the invocation
5859    * @see org.apache.hadoop.hbase.regionserver.HRegion#registerService(com.google.protobuf.Service)
5860    */
5861   public Message execService(RpcController controller, CoprocessorServiceCall call)
5862       throws IOException {
5863     String serviceName = call.getServiceName();
5864     String methodName = call.getMethodName();
5865     if (!coprocessorServiceHandlers.containsKey(serviceName)) {
5866       throw new UnknownProtocolException(null,
5867           "No registered coprocessor service found for name "+serviceName+
5868           " in region "+Bytes.toStringBinary(getRegionName()));
5869     }
5870 
5871     Service service = coprocessorServiceHandlers.get(serviceName);
5872     Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
5873     Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
5874     if (methodDesc == null) {
5875       throw new UnknownProtocolException(service.getClass(),
5876           "Unknown method "+methodName+" called on service "+serviceName+
5877               " in region "+Bytes.toStringBinary(getRegionName()));
5878     }
5879 
5880     Message request = service.getRequestPrototype(methodDesc).newBuilderForType()
5881         .mergeFrom(call.getRequest()).build();
5882 
5883     if (coprocessorHost != null) {
5884       request = coprocessorHost.preEndpointInvocation(service, methodName, request);
5885     }
5886 
5887     final Message.Builder responseBuilder =
5888         service.getResponsePrototype(methodDesc).newBuilderForType();
5889     service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
5890       @Override
5891       public void run(Message message) {
5892         if (message != null) {
5893           responseBuilder.mergeFrom(message);
5894         }
5895       }
5896     });
5897 
5898     if (coprocessorHost != null) {
5899       coprocessorHost.postEndpointInvocation(service, methodName, request, responseBuilder);
5900     }
5901 
5902     return responseBuilder.build();
5903   }
5904 
5905   /*
5906    * Process table.
5907    * Do major compaction or list content.
5908    * @throws IOException
5909    */
5910   private static void processTable(final FileSystem fs, final Path p,
5911       final HLog log, final Configuration c,
5912       final boolean majorCompact)
5913   throws IOException {
5914     HRegion region;
5915     FSTableDescriptors fst = new FSTableDescriptors(c);
5916     // Currently expects tables to have one region only.
5917     if (FSUtils.getTableName(p).equals(TableName.META_TABLE_NAME)) {
5918       region = HRegion.newHRegion(p, log, fs, c,
5919         HRegionInfo.FIRST_META_REGIONINFO,
5920           fst.get(TableName.META_TABLE_NAME), null);
5921     } else {
5922       throw new IOException("Not a known catalog table: " + p.toString());
5923     }
5924     try {
5925       region.initialize(null);
5926       if (majorCompact) {
5927         region.compactStores(true);
5928       } else {
5929         // Default behavior
5930         Scan scan = new Scan();
5931         // scan.addFamily(HConstants.CATALOG_FAMILY);
5932         RegionScanner scanner = region.getScanner(scan);
5933         try {
5934           List<Cell> kvs = new ArrayList<Cell>();
5935           boolean done;
5936           do {
5937             kvs.clear();
5938             done = scanner.next(kvs);
5939             if (kvs.size() > 0) LOG.info(kvs);
5940           } while (done);
5941         } finally {
5942           scanner.close();
5943         }
5944       }
5945     } finally {
5946       region.close();
5947     }
5948   }
5949 
5950   boolean shouldForceSplit() {
5951     return this.splitRequest;
5952   }
5953 
5954   byte[] getExplicitSplitPoint() {
5955     return this.explicitSplitPoint;
5956   }
5957 
5958   void forceSplit(byte[] sp) {
5959     // This HRegion will go away after the forced split is successful
5960     // But if a forced split fails, we need to clear forced split.
5961     this.splitRequest = true;
5962     if (sp != null) {
5963       this.explicitSplitPoint = sp;
5964     }
5965   }
5966 
5967   void clearSplit() {
5968     this.splitRequest = false;
5969     this.explicitSplitPoint = null;
5970   }
5971 
5972   /**
5973    * Give the region a chance to prepare before it is split.
5974    */
5975   protected void prepareToSplit() {
5976     // nothing
5977   }
5978 
5979   /**
5980    * Return the split point. A null return value indicates the region isn't splittable.
5981    * If the split point isn't explicitly specified, it will go over the stores
5982    * to find the best split point. Currently the criterion for the best split point
5983    * is based on the size of the store.
5984    */
5985   public byte[] checkSplit() {
5986     // Can't split META
5987     if (this.getRegionInfo().isMetaTable() ||
5988         TableName.NAMESPACE_TABLE_NAME.equals(this.getRegionInfo().getTable())) {
5989       if (shouldForceSplit()) {
5990         LOG.warn("Cannot split meta region in HBase 0.20 and above");
5991       }
5992       return null;
5993     }
5994 
5995     // Can't split region which is in recovering state
5996     if (this.isRecovering()) {
5997       LOG.info("Cannot split region " + this.getRegionInfo().getEncodedName() + " in recovery.");
5998       return null;
5999     }
6000 
6001     if (!splitPolicy.shouldSplit()) {
6002       return null;
6003     }
6004 
6005     byte[] ret = splitPolicy.getSplitPoint();
6006 
6007     if (ret != null) {
6008       try {
6009         checkRow(ret, "calculated split");
6010       } catch (IOException e) {
6011         LOG.error("Ignoring invalid split", e);
6012         return null;
6013       }
6014     }
6015     return ret;
6016   }
6017 
6018   /**
6019    * @return The priority that this region should have in the compaction queue
6020    */
6021   public int getCompactPriority() {
6022     int count = Integer.MAX_VALUE;
6023     for (Store store : stores.values()) {
6024       count = Math.min(count, store.getCompactPriority());
6025     }
6026     return count;
6027   }
6028 
6029 
6030   /** @return the coprocessor host */
6031   public RegionCoprocessorHost getCoprocessorHost() {
6032     return coprocessorHost;
6033   }
6034 
6035   /** @param coprocessorHost the new coprocessor host */
6036   public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {
6037     this.coprocessorHost = coprocessorHost;
6038   }
6039 
6040   /**
6041    * This method needs to be called before any public call that reads or
6042    * modifies data. It has to be called just before a try.
6043    * #closeRegionOperation needs to be called in the try's finally block.
6044    * Acquires a read lock and checks if the region is closing or closed.
6045    * @throws IOException
6046    */
6047   public void startRegionOperation() throws IOException {
6048     startRegionOperation(Operation.ANY);
6049   }
6050 
6051   /**
6052    * @param op The operation that is about to be performed on the region
6053    * @throws IOException
6054    */
6055   protected void startRegionOperation(Operation op) throws IOException {
6056     switch (op) {
6057     case GET:  // read operations
6058     case SCAN:
6059       checkReadsEnabled();
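           // fall through: read operations share the recovering-state check below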
6060     case INCREMENT: // write operations
6061     case APPEND:
6062     case SPLIT_REGION:
6063     case MERGE_REGION:
6064     case PUT:
6065     case DELETE:
6066     case BATCH_MUTATE:
6067     case COMPACT_REGION:
6068       // when a region is in recovering state, no read, split or merge is allowed
6069       if (isRecovering() && (this.disallowWritesInRecovering ||
6070               (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) {
6071         throw new RegionInRecoveryException(this.getRegionNameAsString() +
6072           " is recovering; cannot take reads");
6073       }
6074       break;
6075     default:
6076       break;
6077     }
6078     if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION
6079         || op == Operation.COMPACT_REGION) {
6080       // split, merge or compact region doesn't need to check the closing/closed state or lock the
6081       // region
6082       return;
6083     }
6084     if (this.closing.get()) {
6085       throw new NotServingRegionException(getRegionNameAsString() + " is closing");
6086     }
6087     lock(lock.readLock());
6088     if (this.closed.get()) {
6089       lock.readLock().unlock();
6090       throw new NotServingRegionException(getRegionNameAsString() + " is closed");
6091     }
6092     try {
6093       if (coprocessorHost != null) {
6094         coprocessorHost.postStartRegionOperation(op);
6095       }
6096     } catch (Exception e) {
6097       lock.readLock().unlock();
6098       throw new IOException(e);
6099     }
6100   }
6101 
6102   /**
6103    * Closes the lock. This needs to be called in the finally block corresponding
6104    * to the try block of #startRegionOperation
6105    * @throws IOException
6106    */
6107   public void closeRegionOperation() throws IOException {
6108     closeRegionOperation(Operation.ANY);
6109   }
6110 
6111   /**
6112    * Closes the lock. This needs to be called in the finally block corresponding
6113    * to the try block of {@link #startRegionOperation(Operation)}
6114    * @throws IOException
6115    */
6116   public void closeRegionOperation(Operation operation) throws IOException {
6117     lock.readLock().unlock();
6118     if (coprocessorHost != null) {
6119       coprocessorHost.postCloseRegionOperation(operation);
6120     }
6121   }
6122 
6123   /**
6124    * This method needs to be called before any public call that reads or
6125    * modifies stores in bulk. It has to be called just before a try.
6126    * #closeBulkRegionOperation needs to be called in the try's finally block.
6127    * Acquires a write lock and checks if the region is closing or closed.
6128    * @throws NotServingRegionException when the region is closing or closed
6129    * @throws RegionTooBusyException if failed to get the lock in time
6130    * @throws InterruptedIOException if interrupted while waiting for a lock
6131    */
6132   private void startBulkRegionOperation(boolean writeLockNeeded)
6133       throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
6134     if (this.closing.get()) {
6135       throw new NotServingRegionException(getRegionNameAsString() + " is closing");
6136     }
6137     if (writeLockNeeded) lock(lock.writeLock());
6138     else lock(lock.readLock());
6139     if (this.closed.get()) {
6140       if (writeLockNeeded) lock.writeLock().unlock();
6141       else lock.readLock().unlock();
6142       throw new NotServingRegionException(getRegionNameAsString() + " is closed");
6143     }
6144   }
6145 
6146   /**
6147    * Closes the lock. This needs to be called in the finally block corresponding
6148    * to the try block of #startBulkRegionOperation
6149    */
6150   private void closeBulkRegionOperation() {
6151     if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
6152     else lock.readLock().unlock();
6153   }
6154 
6155   /**
6156    * Update counters for the number of mutations without WAL and the size of possible data loss.
6157    * This information is exposed by the region server metrics.
6158    */
6159   private void recordMutationWithoutWal(final Map<byte [], List<Cell>> familyMap) {
6160     numMutationsWithoutWAL.increment();
6161     if (numMutationsWithoutWAL.get() <= 1) {
6162       LOG.info("writing data to region " + this +
6163                " with WAL disabled. Data may be lost in the event of a crash.");
6164     }
6165 
6166     long mutationSize = 0;
6167     for (List<Cell> cells: familyMap.values()) {
6168       assert cells instanceof RandomAccess;
6169       int listSize = cells.size();
6170       for (int i=0; i < listSize; i++) {
6171         Cell cell = cells.get(i);
6172         // TODO we need include tags length also here.
6173         mutationSize += KeyValueUtil.keyLength(cell) + cell.getValueLength();
6174       }
6175     }
6176 
6177     dataInMemoryWithoutWAL.add(mutationSize);
6178   }
6179 
6180   private void lock(final Lock lock)
6181       throws RegionTooBusyException, InterruptedIOException {
6182     lock(lock, 1);
6183   }
6184 
6185   /**
6186    * Try to acquire a lock.  Throw RegionTooBusyException
6187    * if failed to get the lock in time. Throw InterruptedIOException
6188    * if interrupted while waiting for the lock.
6189    */
6190   private void lock(final Lock lock, final int multiplier)
6191       throws RegionTooBusyException, InterruptedIOException {
6192     try {
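           // back off linearly: scale the base busy-wait duration by the multiplier,
           // capped by the configured maxBusyWaitMultiplier and maxBusyWaitDuration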
6193       final long waitTime = Math.min(maxBusyWaitDuration,
6194           busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
6195       if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
6196         throw new RegionTooBusyException(
6197             "failed to get a lock in " + waitTime + " ms. " +
6198                 "regionName=" + (this.getRegionInfo() == null ? "unknown" :
6199                 this.getRegionInfo().getRegionNameAsString()) +
6200                 ", server=" + (this.getRegionServerServices() == null ? "unknown" :
6201                 this.getRegionServerServices().getServerName()));
6202       }
6203     } catch (InterruptedException ie) {
6204       LOG.info("Interrupted while waiting for a lock");
6205       InterruptedIOException iie = new InterruptedIOException();
6206       iie.initCause(ie);
6207       throw iie;
6208     }
6209   }
6210 
6211   /**
6212    * Calls sync with the given transaction ID if the region's table is not
6213    * deferring it.
6214    * @param txid should sync up to which transaction
6215    * @throws IOException If anything goes wrong with DFS
6216    */
6217   private void syncOrDefer(long txid, Durability durability) throws IOException {
6218     if (this.getRegionInfo().isMetaRegion()) {
6219       this.log.sync(txid);
6220     } else {
6221       switch(durability) {
6222       case USE_DEFAULT:
6223         // do what table defaults to
6224         if (shouldSyncLog()) {
6225           this.log.sync(txid);
6226         }
6227         break;
6228       case SKIP_WAL:
6229         // nothing to do
6230         break;
6231       case ASYNC_WAL:
6232         // nothing to do
6233         break;
6234       case SYNC_WAL:
6235       case FSYNC_WAL:
6236         // sync the WAL edit (SYNC and FSYNC treated the same for now)
6237         this.log.sync(txid);
6238         break;
6239       }
6240     }
6241   }
6242 
6243   /**
6244    * Check whether we should sync the log from the table's durability settings
6245    */
6246   private boolean shouldSyncLog() {
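         // Durability declares USE_DEFAULT, SKIP_WAL, ASYNC_WAL, SYNC_WAL, FSYNC_WAL in that
         // order, so this is true only for SYNC_WAL and FSYNC_WAL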
6247     return durability.ordinal() >  Durability.ASYNC_WAL.ordinal();
6248   }
6249 
6250   /**
6251    * A mocked list implementation - discards all updates.
6252    */
6253   private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {
6254 
6255     @Override
6256     public void add(int index, Cell element) {
6257       // do nothing
6258     }
6259 
6260     @Override
6261     public boolean addAll(int index, Collection<? extends Cell> c) {
6262       return false; // this list is never changed as a result of an update
6263     }
6264 
6265     @Override
6266     public KeyValue get(int index) {
6267       throw new UnsupportedOperationException();
6268     }
6269 
6270     @Override
6271     public int size() {
6272       return 0;
6273     }
6274   };
6275 
6276   /**
6277    * Facility for dumping and compacting catalog tables.
6278    * Only handles catalog tables, since those are the only tables whose
6279    * schema we know for sure.  For usage run:
6280    * <pre>
6281    *   ./bin/hbase org.apache.hadoop.hbase.regionserver.HRegion
6282    * </pre>
6283    * @throws IOException
6284    */
6285   public static void main(String[] args) throws IOException {
6286     if (args.length < 1) {
6287       printUsageAndExit(null);
6288     }
6289     boolean majorCompact = false;
6290     if (args.length > 1) {
6291       if (!args[1].toLowerCase().startsWith("major")) {
6292         printUsageAndExit("ERROR: Unrecognized option <" + args[1] + ">");
6293       }
6294       majorCompact = true;
6295     }
6296     final Path tableDir = new Path(args[0]);
6297     final Configuration c = HBaseConfiguration.create();
6298     final FileSystem fs = FileSystem.get(c);
6299     final Path logdir = new Path(c.get("hbase.tmp.dir"));
6300     final String logname = "hlog" + FSUtils.getTableName(tableDir) + System.currentTimeMillis();
6301 
6302     final HLog log = HLogFactory.createHLog(fs, logdir, logname, c);
6303     try {
6304       processTable(fs, tableDir, log, c, majorCompact);
6305     } finally {
6306        log.close();
6307        // TODO: is this still right?
6308        BlockCache bc = new CacheConfig(c).getBlockCache();
6309        if (bc != null) bc.shutdown();
6310     }
6311   }
6312 
6313   /**
6314    * Gets the latest sequence number that was read from storage when this region was opened.
6315    */
6316   public long getOpenSeqNum() {
6317     return this.openSeqNum;
6318   }
6319 
6320   /**
6321    * Gets max sequence ids of stores that were read from storage when this region was opened. WAL
6322    * edits with a smaller or equal sequence number will be skipped during replay.
6323    */
6324   public Map<byte[], Long> getMaxStoreSeqIdForLogReplay() {
6325     return this.maxSeqIdInStores;
6326   }
6327 
6328   /**
6329    * @return the current compaction state of this region.
6330    */
6331   public CompactionState getCompactionState() {
6332     boolean hasMajor = majorInProgress.get() > 0, hasMinor = minorInProgress.get() > 0;
6333     return (hasMajor ? (hasMinor ? CompactionState.MAJOR_AND_MINOR : CompactionState.MAJOR)
6334         : (hasMinor ? CompactionState.MINOR : CompactionState.NONE));
6335   }
6336 
6337   public void reportCompactionRequestStart(boolean isMajor){
6338     (isMajor ? majorInProgress : minorInProgress).incrementAndGet();
6339   }
6340 
6341   public void reportCompactionRequestEnd(boolean isMajor, int numFiles, long filesSizeCompacted) {
6342     int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();
6343 
6344     // metrics
6345     compactionsFinished.incrementAndGet();
6346     compactionNumFilesCompacted.addAndGet(numFiles);
6347     compactionNumBytesCompacted.addAndGet(filesSizeCompacted);
6348 
6349     assert newValue >= 0;
6350   }
6351 
6352   /**
6353    * Do not change this sequence id. See {@link #sequenceId} comment.
6354    * @return sequenceId
6355    */
6356   @VisibleForTesting
6357   public AtomicLong getSequenceId() {
6358     return this.sequenceId;
6359   }
6360 
6361   /**
6362    * sets this region's sequenceId.
6363    * @param value new value
6364    */
6365   private void setSequenceId(long value) {
6366     this.sequenceId.set(value);
6367   }
6368 
6369   /**
6370    * Listener class to enable callers of
6371    * bulkLoadHFile() to perform any necessary
6372    * pre/post processing of a given bulkload call
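        * <p>
        * A minimal pass-through sketch of the contract:
        * <pre>
        *   BulkLoadListener passThrough = new BulkLoadListener() {
        *     public String prepareBulkLoad(byte[] family, String srcPath) { return srcPath; }
        *     public void doneBulkLoad(byte[] family, String srcPath) { }
        *     public void failedBulkLoad(byte[] family, String srcPath) { }
        *   };
        * </pre>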
6373    */
6374   public interface BulkLoadListener {
6375 
6376     /**
6377      * Called before an HFile is actually loaded
6378      * @param family family being loaded to
6379      * @param srcPath path of HFile
6380      * @return final path to be used for actual loading
6381      * @throws IOException
6382      */
6383     String prepareBulkLoad(byte[] family, String srcPath) throws IOException;
6384 
6385     /**
6386      * Called after a successful HFile load
6387      * @param family family being loaded to
6388      * @param srcPath path of HFile
6389      * @throws IOException
6390      */
6391     void doneBulkLoad(byte[] family, String srcPath) throws IOException;
6392 
6393     /**
6394      * Called after a failed HFile load
6395      * @param family family being loaded to
6396      * @param srcPath path of HFile
6397      * @throws IOException
6398      */
6399     void failedBulkLoad(byte[] family, String srcPath) throws IOException;
6400   }
6401 
6402   @VisibleForTesting class RowLockContext {
6403     private final HashedBytes row;
6404     private final CountDownLatch latch = new CountDownLatch(1);
6405     private final Thread thread;
6406     private int lockCount = 0;
6407 
6408     RowLockContext(HashedBytes row) {
6409       this.row = row;
6410       this.thread = Thread.currentThread();
6411     }
6412 
6413     boolean ownedByCurrentThread() {
6414       return thread == Thread.currentThread();
6415     }
6416 
6417     RowLock newLock() {
6418       lockCount++;
6419       return new RowLock(this);
6420     }
6421 
6422     @Override
6423     public String toString() {
6424       Thread t = this.thread;
6425       return "Thread=" + (t == null? "null": t.getName()) + ", row=" + this.row +
6426         ", lockCount=" + this.lockCount;
6427     }
6428 
6429     void releaseLock() {
6430       if (!ownedByCurrentThread()) {
6431         throw new IllegalArgumentException("Lock held by thread: " + thread
6432           + " cannot be released by different thread: " + Thread.currentThread());
6433       }
6434       lockCount--;
6435       if (lockCount == 0) {
6436         // no remaining locks by the thread, unlock and allow other threads to access
6437         RowLockContext existingContext = lockedRows.remove(row);
6438         if (existingContext != this) {
6439           throw new RuntimeException(
6440               "Internal row lock state inconsistent, should not happen, row: " + row);
6441         }
6442         latch.countDown();
6443       }
6444     }
6445   }
6446 
6447   /**
6448    * Row lock held by a given thread.
6449    * One thread may acquire multiple locks on the same row simultaneously.
6450    * The locks must be released by calling release() from the same thread.
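        * <p>
        * Typical use, sketched:
        * <pre>
        *   RowLock lock = region.getRowLock(row);
        *   try {
        *     // ... mutate the row ...
        *   } finally {
        *     lock.release();
        *   }
        * </pre>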
6451    */
6452   public static class RowLock {
6453     @VisibleForTesting final RowLockContext context;
6454     private boolean released = false;
6455 
6456     @VisibleForTesting RowLock(RowLockContext context) {
6457       this.context = context;
6458     }
6459 
6460     /**
6461      * Release the given lock.  If there are no remaining locks held by the current thread
6462      * then unlock the row and allow other threads to acquire the lock.
6463      * @throws IllegalArgumentException if called by a different thread than the lock owning thread
6464      */
6465     public void release() {
6466       if (!released) {
6467         context.releaseLock();
6468         released = true;
6469       }
6470     }
6471   }
6472 
6473   /**
6474    * Append a faked WALEdit in order to get a long sequence number; the log syncer will just
6475    * ignore the WALEdit append later.
6476    * @param wal the log to append to
6477    * @param cells list of Cells inserted into memstore. Those Cells are passed so that they can
6478    *        be updated with the right mvcc values (their log sequence number)
6479    * @return the key used for appending with no sync and no append
6480    * @throws IOException
6481    */
6482   private HLogKey appendNoSyncNoAppend(final HLog wal, List<Cell> cells) throws IOException {
6483     HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(),
6484       HLog.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
6485     // Call append but with an empty WALEdit.  The returned sequence id will not be associated
6486     // with any edit and we can be sure it went in after all outstanding appends.
6487     wal.appendNoSync(getTableDesc(), getRegionInfo(), key,
6488       WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells);
6489     return key;
6490   }
6491 
6492   /**
6493    * Explicitly sync the WAL
6494    * @throws IOException
6495    */
6496   public void syncWal() throws IOException {
6497     if(this.log != null) {
6498       this.log.sync();
6499     }
6500   }
6501 
6502   /**
6503    * {@inheritDoc}
6504    */
6505   @Override
6506   public void onConfigurationChange(Configuration conf) {
6507     // Do nothing for now.
6508   }
6509 
6510   /**
6511    * {@inheritDoc}
6512    */
6513   @Override
6514   public void registerChildren(ConfigurationManager manager) {
6515     configurationManager = Optional.of(manager);
6516     for (Store s : this.stores.values()) {
6517       configurationManager.get().registerObserver(s);
6518     }
6519   }
6520 
6521   /**
6522    * {@inheritDoc}
6523    */
6524   @Override
6525   public void deregisterChildren(ConfigurationManager manager) {
6526     for (Store s : this.stores.values()) {
6527       configurationManager.get().deregisterObserver(s);
6528     }
6529   }
6530 }