View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.io.UnsupportedEncodingException;
25  import java.lang.reflect.Constructor;
26  import java.text.ParseException;
27  import java.util.AbstractList;
28  import java.util.ArrayList;
29  import java.util.Arrays;
30  import java.util.Collection;
31  import java.util.Collections;
32  import java.util.HashMap;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.NavigableMap;
36  import java.util.NavigableSet;
37  import java.util.Set;
38  import java.util.TreeMap;
39  import java.util.UUID;
40  import java.util.concurrent.Callable;
41  import java.util.concurrent.CompletionService;
42  import java.util.concurrent.ConcurrentHashMap;
43  import java.util.concurrent.ConcurrentSkipListMap;
44  import java.util.concurrent.CountDownLatch;
45  import java.util.concurrent.ExecutionException;
46  import java.util.concurrent.ExecutorCompletionService;
47  import java.util.concurrent.ExecutorService;
48  import java.util.concurrent.Executors;
49  import java.util.concurrent.Future;
50  import java.util.concurrent.FutureTask;
51  import java.util.concurrent.ThreadFactory;
52  import java.util.concurrent.ThreadPoolExecutor;
53  import java.util.concurrent.TimeUnit;
54  import java.util.concurrent.TimeoutException;
55  import java.util.concurrent.atomic.AtomicBoolean;
56  import java.util.concurrent.atomic.AtomicInteger;
57  import java.util.concurrent.atomic.AtomicLong;
58  import java.util.concurrent.locks.Lock;
59  import java.util.concurrent.locks.ReentrantReadWriteLock;
60  
61  import org.apache.commons.logging.Log;
62  import org.apache.commons.logging.LogFactory;
63  import org.apache.hadoop.classification.InterfaceAudience;
64  import org.apache.hadoop.conf.Configuration;
65  import org.apache.hadoop.fs.FileStatus;
66  import org.apache.hadoop.fs.FileSystem;
67  import org.apache.hadoop.fs.Path;
68  import org.apache.hadoop.hbase.Cell;
69  import org.apache.hadoop.hbase.CellUtil;
70  import org.apache.hadoop.hbase.CompoundConfiguration;
71  import org.apache.hadoop.hbase.DoNotRetryIOException;
72  import org.apache.hadoop.hbase.DroppedSnapshotException;
73  import org.apache.hadoop.hbase.HBaseConfiguration;
74  import org.apache.hadoop.hbase.HColumnDescriptor;
75  import org.apache.hadoop.hbase.HConstants;
76  import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
77  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
78  import org.apache.hadoop.hbase.HRegionInfo;
79  import org.apache.hadoop.hbase.HTableDescriptor;
80  import org.apache.hadoop.hbase.KeyValue;
81  import org.apache.hadoop.hbase.KeyValueUtil;
82  import org.apache.hadoop.hbase.NotServingRegionException;
83  import org.apache.hadoop.hbase.RegionTooBusyException;
84  import org.apache.hadoop.hbase.TableName;
85  import org.apache.hadoop.hbase.UnknownScannerException;
86  import org.apache.hadoop.hbase.backup.HFileArchiver;
87  import org.apache.hadoop.hbase.client.Append;
88  import org.apache.hadoop.hbase.client.Delete;
89  import org.apache.hadoop.hbase.client.Durability;
90  import org.apache.hadoop.hbase.client.Get;
91  import org.apache.hadoop.hbase.client.Increment;
92  import org.apache.hadoop.hbase.client.IsolationLevel;
93  import org.apache.hadoop.hbase.client.Mutation;
94  import org.apache.hadoop.hbase.client.Put;
95  import org.apache.hadoop.hbase.client.Result;
96  import org.apache.hadoop.hbase.client.RowMutations;
97  import org.apache.hadoop.hbase.client.Scan;
98  import org.apache.hadoop.hbase.coprocessor.RegionObserver;
99  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
100 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
101 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
102 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
103 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
104 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
105 import org.apache.hadoop.hbase.filter.Filter;
106 import org.apache.hadoop.hbase.filter.FilterWrapper;
107 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
108 import org.apache.hadoop.hbase.io.HeapSize;
109 import org.apache.hadoop.hbase.io.TimeRange;
110 import org.apache.hadoop.hbase.io.hfile.BlockCache;
111 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
112 import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
113 import org.apache.hadoop.hbase.ipc.RpcCallContext;
114 import org.apache.hadoop.hbase.ipc.RpcServer;
115 import org.apache.hadoop.hbase.master.AssignmentManager;
116 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
117 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
118 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
119 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
120 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
121 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
122 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
123 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
124 import org.apache.hadoop.hbase.regionserver.wal.HLog;
125 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
126 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
127 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
128 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.MutationReplay;
129 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
130 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
131 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
132 import org.apache.hadoop.hbase.util.Bytes;
133 import org.apache.hadoop.hbase.util.CancelableProgressable;
134 import org.apache.hadoop.hbase.util.ClassSize;
135 import org.apache.hadoop.hbase.util.CompressionTest;
136 import org.apache.hadoop.hbase.util.Counter;
137 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
138 import org.apache.hadoop.hbase.util.FSUtils;
139 import org.apache.hadoop.hbase.util.HashedBytes;
140 import org.apache.hadoop.hbase.util.Pair;
141 import org.apache.hadoop.hbase.util.Threads;
142 import org.apache.hadoop.io.MultipleIOException;
143 import org.apache.hadoop.util.StringUtils;
144 
145 import com.google.common.annotations.VisibleForTesting;
146 import com.google.common.base.Preconditions;
147 import com.google.common.collect.Lists;
148 import com.google.common.collect.Maps;
149 import com.google.common.io.Closeables;
150 import com.google.protobuf.Descriptors;
151 import com.google.protobuf.Message;
152 import com.google.protobuf.RpcCallback;
153 import com.google.protobuf.RpcController;
154 import com.google.protobuf.Service;
155 
156 /**
157  * HRegion stores data for a certain region of a table.  It stores all columns
158  * for each row. A given table consists of one or more HRegions.
159  *
160  * <p>We maintain multiple HStores for a single HRegion.
161  *
162  * <p>An Store is a set of rows with some column data; together,
163  * they make up all the data for the rows.
164  *
165  * <p>Each HRegion has a 'startKey' and 'endKey'.
166  * <p>The first is inclusive, the second is exclusive (except for
167  * the final region)  The endKey of region 0 is the same as
168  * startKey for region 1 (if it exists).  The startKey for the
169  * first region is null. The endKey for the final region is null.
170  *
171  * <p>Locking at the HRegion level serves only one purpose: preventing the
172  * region from being closed (and consequently split) while other operations
173  * are ongoing. Each row level operation obtains both a row lock and a region
174  * read lock for the duration of the operation. While a scanner is being
175  * constructed, getScanner holds a read lock. If the scanner is successfully
176  * constructed, it holds a read lock until it is closed. A close takes out a
177  * write lock and consequently will block for ongoing operations and will block
178  * new operations from starting while the close is in progress.
179  *
180  * <p>An HRegion is defined by its table and its key extent.
181  *
182  * <p>It consists of at least one Store.  The number of Stores should be
183  * configurable, so that data which is accessed together is stored in the same
184  * Store.  Right now, we approximate that by building a single Store for
185  * each column family.  (This config info will be communicated via the
186  * tabledesc.)
187  *
188  * <p>The HTableDescriptor contains metainfo about the HRegion's table.
189  * regionName is a unique identifier for this HRegion. (startKey, endKey]
190  * defines the keyspace for this HRegion.
191  */
192 @InterfaceAudience.Private
193 public class HRegion implements HeapSize { // , Writable{
194   public static final Log LOG = LogFactory.getLog(HRegion.class);
195 
196   public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
197       "hbase.hregion.scan.loadColumnFamiliesOnDemand";
198       
199   /**
200    * This is the global default value for durability. All tables/mutations not
201    * defining a durability or using USE_DEFAULT will default to this value.
202    */
203   private static final Durability DEFAULT_DURABLITY = Durability.SYNC_WAL;
204 
205   final AtomicBoolean closed = new AtomicBoolean(false);
206   /* Closing can take some time; use the closing flag if there is stuff we don't
207    * want to do while in closing state; e.g. like offer this region up to the
208    * master as a region to close if the carrying regionserver is overloaded.
209    * Once set, it is never cleared.
210    */
211   final AtomicBoolean closing = new AtomicBoolean(false);
212 
213   protected volatile long completeSequenceId = -1L;
214 
215   /**
216    * Region level sequence Id. It is used for appending WALEdits in HLog. Its default value is -1,
217    * as a marker that the region hasn't opened yet. Once it is opened, it is set to
218    * {@link #openSeqNum}.
219    */
220   private final AtomicLong sequenceId = new AtomicLong(-1L);
221 
222   /**
223    * Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context for
224    * startRegionOperation to possibly invoke different checks before any region operations. Not all
225    * operations have to be defined here. It's only needed when a special check is need in
226    * startRegionOperation
227    */
228   public enum Operation {
229     ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE,
230     REPLAY_BATCH_MUTATE, COMPACT_REGION
231   }
232 
233   //////////////////////////////////////////////////////////////////////////////
234   // Members
235   //////////////////////////////////////////////////////////////////////////////
236 
237   // map from a locked row to the context for that lock including:
238   // - CountDownLatch for threads waiting on that row
239   // - the thread that owns the lock (allow reentrancy)
240   // - reference count of (reentrant) locks held by the thread
241   // - the row itself
242   private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
243       new ConcurrentHashMap<HashedBytes, RowLockContext>();
244 
245   protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(
246       Bytes.BYTES_RAWCOMPARATOR);
247 
248   // TODO: account for each registered handler in HeapSize computation
249   private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
250 
251   public final AtomicLong memstoreSize = new AtomicLong(0);
252 
253   // Debug possible data loss due to WAL off
254   final Counter numMutationsWithoutWAL = new Counter();
255   final Counter dataInMemoryWithoutWAL = new Counter();
256 
257   // Debug why CAS operations are taking a while.
258   final Counter checkAndMutateChecksPassed = new Counter();
259   final Counter checkAndMutateChecksFailed = new Counter();
260 
261   //Number of requests
262   final Counter readRequestsCount = new Counter();
263   final Counter writeRequestsCount = new Counter();
264 
265   // Compaction counters
266   final AtomicLong compactionsFinished = new AtomicLong(0L);
267   final AtomicLong compactionNumFilesCompacted = new AtomicLong(0L);
268   final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L);
269 
270 
271   private final HLog log;
272   private final HRegionFileSystem fs;
273   protected final Configuration conf;
274   private final Configuration baseConf;
275   private final KeyValue.KVComparator comparator;
276   private final int rowLockWaitDuration;
277   static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
278 
279   // The internal wait duration to acquire a lock before read/update
280   // from the region. It is not per row. The purpose of this wait time
281   // is to avoid waiting a long time while the region is busy, so that
282   // we can release the IPC handler soon enough to improve the
283   // availability of the region server. It can be adjusted by
284   // tuning configuration "hbase.busy.wait.duration".
285   final long busyWaitDuration;
286   static final long DEFAULT_BUSY_WAIT_DURATION = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
287 
288   // If updating multiple rows in one call, wait longer,
289   // i.e. waiting for busyWaitDuration * # of rows. However,
290   // we can limit the max multiplier.
291   final int maxBusyWaitMultiplier;
292 
293   // Max busy wait duration. There is no point to wait longer than the RPC
294   // purge timeout, when a RPC call will be terminated by the RPC engine.
295   final long maxBusyWaitDuration;
296 
297   // negative number indicates infinite timeout
298   static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
299   final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();
300 
301   private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
302 
303   /**
304    * The sequence ID that was encountered when this region was opened.
305    */
306   private long openSeqNum = HConstants.NO_SEQNUM;
307 
308   /**
309    * The default setting for whether to enable on-demand CF loading for
310    * scan requests to this region. Requests can override it.
311    */
312   private boolean isLoadingCfsOnDemandDefault = false;
313 
314   private final AtomicInteger majorInProgress = new AtomicInteger(0);
315   private final AtomicInteger minorInProgress = new AtomicInteger(0);
316 
317   //
318   // Context: During replay we want to ensure that we do not lose any data. So, we
319   // have to be conservative in how we replay logs. For each store, we calculate
320   // the maxSeqId up to which the store was flushed. And, skip the edits which
321   // are equal to or lower than maxSeqId for each store.
322   // The following map is populated when opening the region
323   Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
324 
325   /**
326    * Config setting for whether to allow writes when a region is in recovering or not.
327    */
328   private boolean disallowWritesInRecovering = false;
329 
330   // when a region is in recovering state, it can only accept writes not reads
331   private volatile boolean isRecovering = false;
332 
333   /**
334    * @return The smallest mvcc readPoint across all the scanners in this
335    * region. Writes older than this readPoint, are included  in every
336    * read operation.
337    */
338   public long getSmallestReadPoint() {
339     long minimumReadPoint;
340     // We need to ensure that while we are calculating the smallestReadPoint
341     // no new RegionScanners can grab a readPoint that we are unaware of.
342     // We achieve this by synchronizing on the scannerReadPoints object.
343     synchronized(scannerReadPoints) {
344       minimumReadPoint = mvcc.memstoreReadPoint();
345 
346       for (Long readPoint: this.scannerReadPoints.values()) {
347         if (readPoint < minimumReadPoint) {
348           minimumReadPoint = readPoint;
349         }
350       }
351     }
352     return minimumReadPoint;
353   }
354   /*
355    * Data structure of write state flags used coordinating flushes,
356    * compactions and closes.
357    */
358   static class WriteState {
359     // Set while a memstore flush is happening.
360     volatile boolean flushing = false;
361     // Set when a flush has been requested.
362     volatile boolean flushRequested = false;
363     // Number of compactions running.
364     volatile int compacting = 0;
365     // Gets set in close. If set, cannot compact or flush again.
366     volatile boolean writesEnabled = true;
367     // Set if region is read-only
368     volatile boolean readOnly = false;
369 
370     /**
371      * Set flags that make this region read-only.
372      *
373      * @param onOff flip value for region r/o setting
374      */
375     synchronized void setReadOnly(final boolean onOff) {
376       this.writesEnabled = !onOff;
377       this.readOnly = onOff;
378     }
379 
380     boolean isReadOnly() {
381       return this.readOnly;
382     }
383 
384     boolean isFlushRequested() {
385       return this.flushRequested;
386     }
387 
388     static final long HEAP_SIZE = ClassSize.align(
389         ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
390   }
391 
392   final WriteState writestate = new WriteState();
393 
394   long memstoreFlushSize;
395   final long timestampSlop;
396   final long rowProcessorTimeout;
397   private volatile long lastFlushTime;
398   final RegionServerServices rsServices;
399   private RegionServerAccounting rsAccounting;
400   private List<Pair<Long, Long>> recentFlushes = new ArrayList<Pair<Long,Long>>();
401   private long flushCheckInterval;
402   // flushPerChanges is to prevent too many changes in memstore    
403   private long flushPerChanges;
404   private long blockingMemStoreSize;
405   final long threadWakeFrequency;
406   // Used to guard closes
407   final ReentrantReadWriteLock lock =
408     new ReentrantReadWriteLock();
409 
410   // Stop updates lock
411   private final ReentrantReadWriteLock updatesLock =
412     new ReentrantReadWriteLock();
413   private boolean splitRequest;
414   private byte[] explicitSplitPoint = null;
415 
416   private final MultiVersionConsistencyControl mvcc =
417       new MultiVersionConsistencyControl();
418 
419   // Coprocessor host
420   private RegionCoprocessorHost coprocessorHost;
421 
422   private HTableDescriptor htableDescriptor = null;
423   private RegionSplitPolicy splitPolicy;
424 
425   private final MetricsRegion metricsRegion;
426   private final MetricsRegionWrapperImpl metricsRegionWrapper;
427   private final Durability durability;
428 
429   /**
430    * HRegion constructor. This constructor should only be used for testing and
431    * extensions.  Instances of HRegion should be instantiated with the
432    * {@link HRegion#createHRegion} or {@link HRegion#openHRegion} method.
433    *
434    * @param tableDir qualified path of directory where region should be located,
435    * usually the table directory.
436    * @param log The HLog is the outbound log for any updates to the HRegion
437    * (There's a single HLog for all the HRegions on a single HRegionServer.)
438    * The log file is a logfile from the previous execution that's
439    * custom-computed for this HRegion. The HRegionServer computes and sorts the
440    * appropriate log info for this HRegion. If there is a previous log file
441    * (implying that the HRegion has been written-to before), then read it from
442    * the supplied path.
443    * @param fs is the filesystem.
444    * @param confParam is global configuration settings.
445    * @param regionInfo - HRegionInfo that describes the region
446    * is new), then read them from the supplied path.
447    * @param htd the table descriptor
448    * @param rsServices reference to {@link RegionServerServices} or null
449    */
450   @Deprecated
451   public HRegion(final Path tableDir, final HLog log, final FileSystem fs,
452       final Configuration confParam, final HRegionInfo regionInfo,
453       final HTableDescriptor htd, final RegionServerServices rsServices) {
454     this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
455       log, confParam, htd, rsServices);
456   }
457 
458   /**
459    * HRegion constructor. This constructor should only be used for testing and
460    * extensions.  Instances of HRegion should be instantiated with the
461    * {@link HRegion#createHRegion} or {@link HRegion#openHRegion} method.
462    *
463    * @param fs is the filesystem.
464    * @param log The HLog is the outbound log for any updates to the HRegion
465    * (There's a single HLog for all the HRegions on a single HRegionServer.)
466    * The log file is a logfile from the previous execution that's
467    * custom-computed for this HRegion. The HRegionServer computes and sorts the
468    * appropriate log info for this HRegion. If there is a previous log file
469    * (implying that the HRegion has been written-to before), then read it from
470    * the supplied path.
471    * @param confParam is global configuration settings.
472    * @param htd the table descriptor
473    * @param rsServices reference to {@link RegionServerServices} or null
474    */
475   public HRegion(final HRegionFileSystem fs, final HLog log, final Configuration confParam,
476       final HTableDescriptor htd, final RegionServerServices rsServices) {
477     if (htd == null) {
478       throw new IllegalArgumentException("Need table descriptor");
479     }
480 
481     if (confParam instanceof CompoundConfiguration) {
482       throw new IllegalArgumentException("Need original base configuration");
483     }
484 
485     this.comparator = fs.getRegionInfo().getComparator();
486     this.log = log;
487     this.fs = fs;
488 
489     // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
490     this.baseConf = confParam;
491     this.conf = new CompoundConfiguration()
492       .add(confParam)
493       .addStringMap(htd.getConfiguration())
494       .addWritableMap(htd.getValues());
495     this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
496         DEFAULT_CACHE_FLUSH_INTERVAL);
497     this.flushPerChanges = conf.getLong(MEMSTORE_FLUSH_PER_CHANGES, DEFAULT_FLUSH_PER_CHANGES);
498     if (this.flushPerChanges > MAX_FLUSH_PER_CHANGES) {
499       throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed "
500           + MAX_FLUSH_PER_CHANGES);
501     }
502     
503     this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
504                     DEFAULT_ROWLOCK_WAIT_DURATION);
505 
506     this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
507     this.htableDescriptor = htd;
508     this.rsServices = rsServices;
509     this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
510     setHTableSpecificConf();
511     this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
512 
513     this.busyWaitDuration = conf.getLong(
514       "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
515     this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
516     if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
517       throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
518         + busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
519         + maxBusyWaitMultiplier + "). Their product should be positive");
520     }
521     this.maxBusyWaitDuration = conf.getLong("ipc.client.call.purge.timeout",
522       2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
523 
524     /*
525      * timestamp.slop provides a server-side constraint on the timestamp. This
526      * assumes that you base your TS around currentTimeMillis(). In this case,
527      * throw an error to the user if the user-specified TS is newer than now +
528      * slop. LATEST_TIMESTAMP == don't use this functionality
529      */
530     this.timestampSlop = conf.getLong(
531         "hbase.hregion.keyvalue.timestamp.slop.millisecs",
532         HConstants.LATEST_TIMESTAMP);
533 
534     /**
535      * Timeout for the process time in processRowsWithLocks().
536      * Use -1 to switch off time bound.
537      */
538     this.rowProcessorTimeout = conf.getLong(
539         "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
540     this.durability = htd.getDurability() == Durability.USE_DEFAULT
541         ? DEFAULT_DURABLITY
542         : htd.getDurability();
543     if (rsServices != null) {
544       this.rsAccounting = this.rsServices.getRegionServerAccounting();
545       // don't initialize coprocessors if not running within a regionserver
546       // TODO: revisit if coprocessors should load in other cases
547       this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
548       this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
549       this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper);
550 
551       Map<String, HRegion> recoveringRegions = rsServices.getRecoveringRegions();
552       String encodedName = getRegionInfo().getEncodedName();
553       if (recoveringRegions != null && recoveringRegions.containsKey(encodedName)) {
554         this.isRecovering = true;
555         recoveringRegions.put(encodedName, this);
556       }
557     } else {
558       this.metricsRegionWrapper = null;
559       this.metricsRegion = null;
560     }
561     if (LOG.isDebugEnabled()) {
562       // Write out region name as string and its encoded name.
563       LOG.debug("Instantiated " + this);
564     }
565 
566     // by default, we allow writes against a region when it's in recovering
567     this.disallowWritesInRecovering =
568         conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING,
569           HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG);
570   }
571 
572   void setHTableSpecificConf() {
573     if (this.htableDescriptor == null) return;
574     long flushSize = this.htableDescriptor.getMemStoreFlushSize();
575 
576     if (flushSize <= 0) {
577       flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
578         HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
579     }
580     this.memstoreFlushSize = flushSize;
581     this.blockingMemStoreSize = this.memstoreFlushSize *
582         conf.getLong("hbase.hregion.memstore.block.multiplier", 2);
583   }
584 
585   /**
586    * Initialize this region.
587    * Used only by tests and SplitTransaction to reopen the region.
588    * You should use createHRegion() or openHRegion()
589    * @return What the next sequence (edit) id should be.
590    * @throws IOException e
591    * @deprecated use HRegion.createHRegion() or HRegion.openHRegion()
592    */
593   @Deprecated
594   public long initialize() throws IOException {
595     return initialize(null);
596   }
597 
598   /**
599    * Initialize this region.
600    *
601    * @param reporter Tickle every so often if initialize is taking a while.
602    * @return What the next sequence (edit) id should be.
603    * @throws IOException e
604    */
605   private long initialize(final CancelableProgressable reporter) throws IOException {
606     MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
607     long nextSeqId = -1;
608     try {
609       nextSeqId = initializeRegionInternals(reporter, status);
610       return nextSeqId;
611     } finally {
612       // nextSeqid will be -1 if the initialization fails.
613       // At least it will be 0 otherwise.
614       if (nextSeqId == -1) {
615         status
616             .abort("Exception during region " + this.getRegionNameAsString() + " initialization.");
617       }
618     }
619   }
620 
621   private long initializeRegionInternals(final CancelableProgressable reporter,
622       final MonitoredTask status) throws IOException, UnsupportedEncodingException {
623     if (coprocessorHost != null) {
624       status.setStatus("Running coprocessor pre-open hook");
625       coprocessorHost.preOpen();
626     }
627 
628     // Write HRI to a file in case we need to recover hbase:meta
629     status.setStatus("Writing region info on filesystem");
630     fs.checkRegionInfoOnFilesystem();
631 
632     // Remove temporary data left over from old regions
633     status.setStatus("Cleaning up temporary data from old regions");
634     fs.cleanupTempDir();
635 
636     // Initialize all the HStores
637     status.setStatus("Initializing all the Stores");
638     long maxSeqId = initializeRegionStores(reporter, status);
639 
640     status.setStatus("Cleaning up detritus from prior splits");
641     // Get rid of any splits or merges that were lost in-progress.  Clean out
642     // these directories here on open.  We may be opening a region that was
643     // being split but we crashed in the middle of it all.
644     fs.cleanupAnySplitDetritus();
645     fs.cleanupMergesDir();
646 
647     this.writestate.setReadOnly(this.htableDescriptor.isReadOnly());
648     this.writestate.flushRequested = false;
649     this.writestate.compacting = 0;
650 
651     // Initialize split policy
652     this.splitPolicy = RegionSplitPolicy.create(this, conf);
653 
654     this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
655     // Use maximum of log sequenceid or that which was found in stores
656     // (particularly if no recovered edits, seqid will be -1).
657     long nextSeqid = maxSeqId + 1;
658     if (this.isRecovering) {
659       // In distributedLogReplay mode, we don't know the last change sequence number because region
660       // is opened before recovery completes. So we add a safety bumper to avoid new sequence number
661       // overlaps used sequence numbers
662       nextSeqid += this.flushPerChanges + 10000000; // add another extra 10million
663     }
664     LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() +
665       "; next sequenceid=" + nextSeqid);
666 
667     // A region can be reopened if failed a split; reset flags
668     this.closing.set(false);
669     this.closed.set(false);
670 
671     this.completeSequenceId = nextSeqid;
672     if (coprocessorHost != null) {
673       status.setStatus("Running coprocessor post-open hooks");
674       coprocessorHost.postOpen();
675     }
676 
677     status.markComplete("Region opened successfully");
678     return nextSeqid;
679   }
680 
681   private long initializeRegionStores(final CancelableProgressable reporter, MonitoredTask status)
682       throws IOException, UnsupportedEncodingException {
683     // Load in all the HStores.
684 
685     long maxSeqId = -1;
686     // initialized to -1 so that we pick up MemstoreTS from column families
687     long maxMemstoreTS = -1;
688 
689     if (!htableDescriptor.getFamilies().isEmpty()) {
690       // initialize the thread pool for opening stores in parallel.
691       ThreadPoolExecutor storeOpenerThreadPool =
692         getStoreOpenAndCloseThreadPool("StoreOpener-" + this.getRegionInfo().getShortNameToLog());
693       CompletionService<HStore> completionService =
694         new ExecutorCompletionService<HStore>(storeOpenerThreadPool);
695 
696       // initialize each store in parallel
697       for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
698         status.setStatus("Instantiating store for column family " + family);
699         completionService.submit(new Callable<HStore>() {
700           @Override
701           public HStore call() throws IOException {
702             return instantiateHStore(family);
703           }
704         });
705       }
706       boolean allStoresOpened = false;
707       try {
708         for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
709           Future<HStore> future = completionService.take();
710           HStore store = future.get();
711 
712           this.stores.put(store.getColumnFamilyName().getBytes(), store);
713           // Do not include bulk loaded files when determining seqIdForReplay
714           long storeSeqIdForReplay = store.getMaxSequenceId(false);
715           maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
716               storeSeqIdForReplay);
717           // Include bulk loaded files when determining seqIdForAssignment
718           long storeSeqIdForAssignment = store.getMaxSequenceId(true);
719           if (maxSeqId == -1 || storeSeqIdForAssignment > maxSeqId) {
720             maxSeqId = storeSeqIdForAssignment;
721           }
722           long maxStoreMemstoreTS = store.getMaxMemstoreTS();
723           if (maxStoreMemstoreTS > maxMemstoreTS) {
724             maxMemstoreTS = maxStoreMemstoreTS;
725           }
726         }
727         allStoresOpened = true;
728       } catch (InterruptedException e) {
729         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
730       } catch (ExecutionException e) {
731         throw new IOException(e.getCause());
732       } finally {
733         storeOpenerThreadPool.shutdownNow();
734         if (!allStoresOpened) {
735           // something went wrong, close all opened stores
736           LOG.error("Could not initialize all stores for the region=" + this);
737           for (Store store : this.stores.values()) {
738             try {
739               store.close();
740             } catch (IOException e) { 
741               LOG.warn(e.getMessage());
742             }
743           }
744         }
745       }
746     }
747     mvcc.initialize(maxMemstoreTS + 1);
748     // Recover any edits if available.
749     maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
750         this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
751     return maxSeqId;
752   }
753 
754   /**
755    * @return True if this region has references.
756    */
757   public boolean hasReferences() {
758     for (Store store : this.stores.values()) {
759       if (store.hasReferences()) return true;
760     }
761     return false;
762   }
763 
764   /**
765    * This function will return the HDFS blocks distribution based on the data
766    * captured when HFile is created
767    * @return The HDFS blocks distribution for the region.
768    */
769   public HDFSBlocksDistribution getHDFSBlocksDistribution() {
770     HDFSBlocksDistribution hdfsBlocksDistribution =
771       new HDFSBlocksDistribution();
772     synchronized (this.stores) {
773       for (Store store : this.stores.values()) {
774         for (StoreFile sf : store.getStorefiles()) {
775           HDFSBlocksDistribution storeFileBlocksDistribution =
776             sf.getHDFSBlockDistribution();
777           hdfsBlocksDistribution.add(storeFileBlocksDistribution);
778         }
779       }
780     }
781     return hdfsBlocksDistribution;
782   }
783 
784   /**
785    * This is a helper function to compute HDFS block distribution on demand
786    * @param conf configuration
787    * @param tableDescriptor HTableDescriptor of the table
788    * @param regionInfo encoded name of the region
789    * @return The HDFS blocks distribution for the given region.
790    * @throws IOException
791    */
792   public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
793       final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException {
794     Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
795     return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath);
796   }
797 
798   /**
799    * This is a helper function to compute HDFS block distribution on demand
800    * @param conf configuration
801    * @param tableDescriptor HTableDescriptor of the table
802    * @param regionInfo encoded name of the region
803    * @param tablePath the table directory
804    * @return The HDFS blocks distribution for the given region.
805    * @throws IOException
806    */
807   public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
808       final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo,  Path tablePath)
809       throws IOException {
810     HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
811     FileSystem fs = tablePath.getFileSystem(conf);
812 
813     HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo);
814     for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
815       Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family.getNameAsString());
816       if (storeFiles == null) continue;
817 
818       for (StoreFileInfo storeFileInfo : storeFiles) {
819         hdfsBlocksDistribution.add(storeFileInfo.computeHDFSBlocksDistribution(fs));
820       }
821     }
822     return hdfsBlocksDistribution;
823   }
824 
825   public AtomicLong getMemstoreSize() {
826     return memstoreSize;
827   }
828 
829   /**
830    * Increase the size of mem store in this region and the size of global mem
831    * store
832    * @param memStoreSize
833    * @return the size of memstore in this region
834    */
835   public long addAndGetGlobalMemstoreSize(long memStoreSize) {
836     if (this.rsAccounting != null) {
837       rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
838     }
839     return this.memstoreSize.addAndGet(memStoreSize);
840   }
841 
842   /** @return a HRegionInfo object for this region */
843   public HRegionInfo getRegionInfo() {
844     return this.fs.getRegionInfo();
845   }
846 
847   /**
848    * @return Instance of {@link RegionServerServices} used by this HRegion.
849    * Can be null.
850    */
851   RegionServerServices getRegionServerServices() {
852     return this.rsServices;
853   }
854 
855   /** @return readRequestsCount for this region */
856   long getReadRequestsCount() {
857     return this.readRequestsCount.get();
858   }
859 
860   /** @return writeRequestsCount for this region */
861   long getWriteRequestsCount() {
862     return this.writeRequestsCount.get();
863   }
864 
865   MetricsRegion getMetrics() {
866     return metricsRegion;
867   }
868 
869   /** @return true if region is closed */
870   public boolean isClosed() {
871     return this.closed.get();
872   }
873 
874   /**
875    * @return True if closing process has started.
876    */
877   public boolean isClosing() {
878     return this.closing.get();
879   }
880 
881   /**
882    * Reset recovering state of current region
883    * @param newState
884    */
885   public void setRecovering(boolean newState) {
886     boolean wasRecovering = this.isRecovering;
887     this.isRecovering = newState;
888     if (wasRecovering && !isRecovering) {
889       // Call only when log replay is over.
890       coprocessorHost.postLogReplay();
891     }
892   }
893 
894   /**
895    * @return True if current region is in recovering
896    */
897   public boolean isRecovering() {
898     return this.isRecovering;
899   }
900 
901   /** @return true if region is available (not closed and not closing) */
902   public boolean isAvailable() {
903     return !isClosed() && !isClosing();
904   }
905 
906   /** @return true if region is splittable */
907   public boolean isSplittable() {
908     return isAvailable() && !hasReferences();
909   }
910 
911   /**
912    * @return true if region is mergeable
913    */
914   public boolean isMergeable() {
915     if (!isAvailable()) {
916       LOG.debug("Region " + this.getRegionNameAsString()
917           + " is not mergeable because it is closing or closed");
918       return false;
919     }
920     if (hasReferences()) {
921       LOG.debug("Region " + this.getRegionNameAsString()
922           + " is not mergeable because it has references");
923       return false;
924     }
925 
926     return true;
927   }
928 
929   public boolean areWritesEnabled() {
930     synchronized(this.writestate) {
931       return this.writestate.writesEnabled;
932     }
933   }
934 
935    public MultiVersionConsistencyControl getMVCC() {
936      return mvcc;
937    }
938 
939    /*
940     * Returns readpoint considering given IsolationLevel
941     */
942    public long getReadpoint(IsolationLevel isolationLevel) {
943      if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
944        // This scan can read even uncommitted transactions
945        return Long.MAX_VALUE;
946      }
947      return mvcc.memstoreReadPoint();
948    }
949 
950    public boolean isLoadingCfsOnDemandDefault() {
951      return this.isLoadingCfsOnDemandDefault;
952    }
953 
954   /**
955    * Close down this HRegion.  Flush the cache, shut down each HStore, don't
956    * service any more calls.
957    *
958    * <p>This method could take some time to execute, so don't call it from a
959    * time-sensitive thread.
960    *
961    * @return Vector of all the storage files that the HRegion's component
962    * HStores make use of.  It's a list of all HStoreFile objects. Returns empty
963    * vector if already closed and null if judged that it should not close.
964    *
965    * @throws IOException e
966    */
967   public Map<byte[], List<StoreFile>> close() throws IOException {
968     return close(false);
969   }
970 
971   private final Object closeLock = new Object();
972 
973   /** Conf key for the periodic flush interval */
974   public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL =
975       "hbase.regionserver.optionalcacheflushinterval";
976   /** Default interval for the memstore flush */
977   public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;
978 
979   /** Conf key to force a flush if there are already enough changes for one region in memstore */
980   public static final String MEMSTORE_FLUSH_PER_CHANGES =
981       "hbase.regionserver.flush.per.changes";
982   public static final long DEFAULT_FLUSH_PER_CHANGES = 30000000; // 30 millions
983   /**
984    * The following MAX_FLUSH_PER_CHANGES is large enough because each KeyValue has 20+ bytes
985    * overhead. Therefore, even 1G empty KVs occupy at least 20GB memstore size for a single region
986    */
987   public static final long MAX_FLUSH_PER_CHANGES = 1000000000; // 1G
988 
989   /**
990    * Close down this HRegion.  Flush the cache unless abort parameter is true,
991    * Shut down each HStore, don't service any more calls.
992    *
993    * This method could take some time to execute, so don't call it from a
994    * time-sensitive thread.
995    *
996    * @param abort true if server is aborting (only during testing)
997    * @return Vector of all the storage files that the HRegion's component
998    * HStores make use of.  It's a list of HStoreFile objects.  Can be null if
999    * we are not to close at this time or we are already closed.
1000    *
1001    * @throws IOException e
1002    */
1003   public Map<byte[], List<StoreFile>> close(final boolean abort) throws IOException {
1004     // Only allow one thread to close at a time. Serialize them so dual
1005     // threads attempting to close will run up against each other.
1006     MonitoredTask status = TaskMonitor.get().createStatus(
1007         "Closing region " + this +
1008         (abort ? " due to abort" : ""));
1009 
1010     status.setStatus("Waiting for close lock");
1011     try {
1012       synchronized (closeLock) {
1013         return doClose(abort, status);
1014       }
1015     } finally {
1016       status.cleanup();
1017     }
1018   }
1019 
1020   private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
1021       throws IOException {
1022     if (isClosed()) {
1023       LOG.warn("Region " + this + " already closed");
1024       return null;
1025     }
1026 
1027     if (coprocessorHost != null) {
1028       status.setStatus("Running coprocessor pre-close hooks");
1029       this.coprocessorHost.preClose(abort);
1030     }
1031 
1032     status.setStatus("Disabling compacts and flushes for region");
1033     synchronized (writestate) {
1034       // Disable compacting and flushing by background threads for this
1035       // region.
1036       writestate.writesEnabled = false;
1037       LOG.debug("Closing " + this + ": disabling compactions & flushes");
1038       waitForFlushesAndCompactions();
1039     }
1040     // If we were not just flushing, is it worth doing a preflush...one
1041     // that will clear out of the bulk of the memstore before we put up
1042     // the close flag?
1043     if (!abort && worthPreFlushing()) {
1044       status.setStatus("Pre-flushing region before close");
1045       LOG.info("Running close preflush of " + this.getRegionNameAsString());
1046       try {
1047         internalFlushcache(status);
1048       } catch (IOException ioe) {
1049         // Failed to flush the region. Keep going.
1050         status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
1051       }
1052     }
1053 
1054     this.closing.set(true);
1055     status.setStatus("Disabling writes for close");
1056     // block waiting for the lock for closing
1057     lock.writeLock().lock();
1058     try {
1059       if (this.isClosed()) {
1060         status.abort("Already got closed by another process");
1061         // SplitTransaction handles the null
1062         return null;
1063       }
1064       LOG.debug("Updates disabled for region " + this);
1065       // Don't flush the cache if we are aborting
1066       if (!abort) {
1067         int flushCount = 0;
1068         while (this.getMemstoreSize().get() > 0) {
1069           try {
1070             if (flushCount++ > 0) {
1071               int actualFlushes = flushCount - 1;
1072               if (actualFlushes > 5) {
1073                 // If we tried 5 times and are unable to clear memory, abort
1074                 // so we do not lose data
1075                 throw new DroppedSnapshotException("Failed clearing memory after " +
1076                   actualFlushes + " attempts on region: " + Bytes.toStringBinary(getRegionName()));
1077               } 
1078               LOG.info("Running extra flush, " + actualFlushes +
1079                 " (carrying snapshot?) " + this);
1080             }
1081             internalFlushcache(status);
1082           } catch (IOException ioe) {
1083             status.setStatus("Failed flush " + this + ", putting online again");
1084             synchronized (writestate) {
1085               writestate.writesEnabled = true;
1086             }
1087             // Have to throw to upper layers.  I can't abort server from here.
1088             throw ioe;
1089           }
1090         }
1091       }
1092 
1093       Map<byte[], List<StoreFile>> result =
1094         new TreeMap<byte[], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
1095       if (!stores.isEmpty()) {
1096         // initialize the thread pool for closing stores in parallel.
1097         ThreadPoolExecutor storeCloserThreadPool =
1098           getStoreOpenAndCloseThreadPool("StoreCloserThread-" + this.getRegionNameAsString());
1099         CompletionService<Pair<byte[], Collection<StoreFile>>> completionService =
1100           new ExecutorCompletionService<Pair<byte[], Collection<StoreFile>>>(storeCloserThreadPool);
1101 
1102         // close each store in parallel
1103         for (final Store store : stores.values()) {
1104           assert abort? true: store.getFlushableSize() == 0;
1105           completionService
1106               .submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
1107                 @Override
1108                 public Pair<byte[], Collection<StoreFile>> call() throws IOException {
1109                   return new Pair<byte[], Collection<StoreFile>>(
1110                     store.getFamily().getName(), store.close());
1111                 }
1112               });
1113         }
1114         try {
1115           for (int i = 0; i < stores.size(); i++) {
1116             Future<Pair<byte[], Collection<StoreFile>>> future = completionService.take();
1117             Pair<byte[], Collection<StoreFile>> storeFiles = future.get();
1118             List<StoreFile> familyFiles = result.get(storeFiles.getFirst());
1119             if (familyFiles == null) {
1120               familyFiles = new ArrayList<StoreFile>();
1121               result.put(storeFiles.getFirst(), familyFiles);
1122             }
1123             familyFiles.addAll(storeFiles.getSecond());
1124           }
1125         } catch (InterruptedException e) {
1126           throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1127         } catch (ExecutionException e) {
1128           throw new IOException(e.getCause());
1129         } finally {
1130           storeCloserThreadPool.shutdownNow();
1131         }
1132       }
1133       this.closed.set(true);
1134       if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get());
1135       if (coprocessorHost != null) {
1136         status.setStatus("Running coprocessor post-close hooks");
1137         this.coprocessorHost.postClose(abort);
1138       }
1139       if ( this.metricsRegion != null) {
1140         this.metricsRegion.close();
1141       }
1142       if ( this.metricsRegionWrapper != null) {
1143         Closeables.closeQuietly(this.metricsRegionWrapper);
1144       }
1145       status.markComplete("Closed");
1146       LOG.info("Closed " + this);
1147       return result;
1148     } finally {
1149       lock.writeLock().unlock();
1150     }
1151   }
1152 
1153   /**
1154    * Wait for all current flushes and compactions of the region to complete.
1155    * <p>
1156    * Exposed for TESTING.
1157    */
1158   public void waitForFlushesAndCompactions() {
1159     synchronized (writestate) {
1160       boolean interrupted = false;
1161       try {
1162         while (writestate.compacting > 0 || writestate.flushing) {
1163           LOG.debug("waiting for " + writestate.compacting + " compactions"
1164             + (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
1165           try {
1166             writestate.wait();
1167           } catch (InterruptedException iex) {
1168             // essentially ignore and propagate the interrupt back up
1169             LOG.warn("Interrupted while waiting");
1170             interrupted = true;
1171           }
1172         }
1173       } finally {
1174         if (interrupted) {
1175           Thread.currentThread().interrupt();
1176         }
1177       }
1178     }
1179   }
1180 
1181   protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
1182       final String threadNamePrefix) {
1183     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1184     int maxThreads = Math.min(numStores,
1185         conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1186             HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
1187     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1188   }
1189 
1190   protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
1191       final String threadNamePrefix) {
1192     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1193     int maxThreads = Math.max(1,
1194         conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1195             HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
1196             / numStores);
1197     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1198   }
1199 
1200   static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
1201       final String threadNamePrefix) {
1202     return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
1203       new ThreadFactory() {
1204         private int count = 1;
1205 
1206         @Override
1207         public Thread newThread(Runnable r) {
1208           return new Thread(r, threadNamePrefix + "-" + count++);
1209         }
1210       });
1211   }
1212 
1213    /**
1214     * @return True if its worth doing a flush before we put up the close flag.
1215     */
1216   private boolean worthPreFlushing() {
1217     return this.memstoreSize.get() >
1218       this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
1219   }
1220 
1221   //////////////////////////////////////////////////////////////////////////////
1222   // HRegion accessors
1223   //////////////////////////////////////////////////////////////////////////////
1224 
1225   /** @return start key for region */
1226   public byte [] getStartKey() {
1227     return this.getRegionInfo().getStartKey();
1228   }
1229 
1230   /** @return end key for region */
1231   public byte [] getEndKey() {
1232     return this.getRegionInfo().getEndKey();
1233   }
1234 
1235   /** @return region id */
1236   public long getRegionId() {
1237     return this.getRegionInfo().getRegionId();
1238   }
1239 
1240   /** @return region name */
1241   public byte [] getRegionName() {
1242     return this.getRegionInfo().getRegionName();
1243   }
1244 
1245   /** @return region name as string for logging */
1246   public String getRegionNameAsString() {
1247     return this.getRegionInfo().getRegionNameAsString();
1248   }
1249 
1250   /** @return HTableDescriptor for this region */
1251   public HTableDescriptor getTableDesc() {
1252     return this.htableDescriptor;
1253   }
1254 
1255   /** @return HLog in use for this region */
1256   public HLog getLog() {
1257     return this.log;
1258   }
1259 
1260   /**
1261    * A split takes the config from the parent region & passes it to the daughter
1262    * region's constructor. If 'conf' was passed, you would end up using the HTD
1263    * of the parent region in addition to the new daughter HTD. Pass 'baseConf'
1264    * to the daughter regions to avoid this tricky dedupe problem.
1265    * @return Configuration object
1266    */
1267   Configuration getBaseConf() {
1268     return this.baseConf;
1269   }
1270 
1271   /** @return {@link FileSystem} being used by this region */
1272   public FileSystem getFilesystem() {
1273     return fs.getFileSystem();
1274   }
1275 
1276   /** @return the {@link HRegionFileSystem} used by this region */
1277   public HRegionFileSystem getRegionFileSystem() {
1278     return this.fs;
1279   }
1280 
1281   /** @return the last time the region was flushed */
1282   public long getLastFlushTime() {
1283     return this.lastFlushTime;
1284   }
1285 
1286   //////////////////////////////////////////////////////////////////////////////
1287   // HRegion maintenance.
1288   //
1289   // These methods are meant to be called periodically by the HRegionServer for
1290   // upkeep.
1291   //////////////////////////////////////////////////////////////////////////////
1292 
1293   /** @return returns size of largest HStore. */
1294   public long getLargestHStoreSize() {
1295     long size = 0;
1296     for (Store h : stores.values()) {
1297       long storeSize = h.getSize();
1298       if (storeSize > size) {
1299         size = storeSize;
1300       }
1301     }
1302     return size;
1303   }
1304 
1305   /**
1306    * @return KeyValue Comparator
1307    */
1308   public KeyValue.KVComparator getComparator() {
1309     return this.comparator;
1310   }
1311 
1312   /*
1313    * Do preparation for pending compaction.
1314    * @throws IOException
1315    */
1316   protected void doRegionCompactionPrep() throws IOException {
1317   }
1318 
1319   void triggerMajorCompaction() {
1320     for (Store h : stores.values()) {
1321       h.triggerMajorCompaction();
1322     }
1323   }
1324 
1325   /**
1326    * This is a helper function that compact all the stores synchronously
1327    * It is used by utilities and testing
1328    *
1329    * @param majorCompaction True to force a major compaction regardless of thresholds
1330    * @throws IOException e
1331    */
1332   public void compactStores(final boolean majorCompaction)
1333   throws IOException {
1334     if (majorCompaction) {
1335       this.triggerMajorCompaction();
1336     }
1337     compactStores();
1338   }
1339 
1340   /**
1341    * This is a helper function that compact all the stores synchronously
1342    * It is used by utilities and testing
1343    *
1344    * @throws IOException e
1345    */
1346   public void compactStores() throws IOException {
1347     for (Store s : getStores().values()) {
1348       CompactionContext compaction = s.requestCompaction();
1349       if (compaction != null) {
1350         compact(compaction, s);
1351       }
1352     }
1353   }
1354 
1355   /*
1356    * Called by compaction thread and after region is opened to compact the
1357    * HStores if necessary.
1358    *
1359    * <p>This operation could block for a long time, so don't call it from a
1360    * time-sensitive thread.
1361    *
1362    * Note that no locking is necessary at this level because compaction only
1363    * conflicts with a region split, and that cannot happen because the region
1364    * server does them sequentially and not in parallel.
1365    *
1366    * @param cr Compaction details, obtained by requestCompaction()
1367    * @return whether the compaction completed
1368    * @throws IOException e
1369    */
1370   public boolean compact(CompactionContext compaction, Store store) throws IOException {
1371     assert compaction != null && compaction.hasSelection();
1372     assert !compaction.getRequest().getFiles().isEmpty();
1373     if (this.closing.get() || this.closed.get()) {
1374       LOG.debug("Skipping compaction on " + this + " because closing/closed");
1375       store.cancelRequestedCompaction(compaction);
1376       return false;
1377     }
1378     MonitoredTask status = null;
1379     boolean didPerformCompaction = false;
1380     // block waiting for the lock for compaction
1381     lock.readLock().lock();
1382     try {
1383       byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
1384       if (stores.get(cf) != store) {
1385         LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this
1386             + " has been re-instantiated, cancel this compaction request. "
1387             + " It may be caused by the roll back of split transaction");
1388         return false;
1389       }
1390 
1391       status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
1392       if (this.closed.get()) {
1393         String msg = "Skipping compaction on " + this + " because closed";
1394         LOG.debug(msg);
1395         status.abort(msg);
1396         return false;
1397       }
1398       boolean wasStateSet = false;
1399       try {
1400         synchronized (writestate) {
1401           if (writestate.writesEnabled) {
1402             wasStateSet = true;
1403             ++writestate.compacting;
1404           } else {
1405             String msg = "NOT compacting region " + this + ". Writes disabled.";
1406             LOG.info(msg);
1407             status.abort(msg);
1408             return false;
1409           }
1410         }
1411         LOG.info("Starting compaction on " + store + " in region " + this
1412             + (compaction.getRequest().isOffPeak()?" as an off-peak compaction":""));
1413         doRegionCompactionPrep();
1414         try {
1415           status.setStatus("Compacting store " + store);
1416           didPerformCompaction = true;
1417           store.compact(compaction);
1418         } catch (InterruptedIOException iioe) {
1419           String msg = "compaction interrupted";
1420           LOG.info(msg, iioe);
1421           status.abort(msg);
1422           return false;
1423         }
1424       } finally {
1425         if (wasStateSet) {
1426           synchronized (writestate) {
1427             --writestate.compacting;
1428             if (writestate.compacting <= 0) {
1429               writestate.notifyAll();
1430             }
1431           }
1432         }
1433       }
1434       status.markComplete("Compaction complete");
1435       return true;
1436     } finally {
1437       try {
1438         if (!didPerformCompaction) store.cancelRequestedCompaction(compaction);
1439         if (status != null) status.cleanup();
1440       } finally {
1441         lock.readLock().unlock();
1442       }
1443     }
1444   }
1445 
1446   /**
1447    * Flush the cache.
1448    *
1449    * When this method is called the cache will be flushed unless:
1450    * <ol>
1451    *   <li>the cache is empty</li>
1452    *   <li>the region is closed.</li>
1453    *   <li>a flush is already in progress</li>
1454    *   <li>writes are disabled</li>
1455    * </ol>
1456    *
1457    * <p>This method may block for some time, so it should not be called from a
1458    * time-sensitive thread.
1459    *
1460    * @return true if the region needs compacting
1461    *
1462    * @throws IOException general io exceptions
1463    * @throws DroppedSnapshotException Thrown when replay of hlog is required
1464    * because a Snapshot was not properly persisted.
1465    */
1466   public boolean flushcache() throws IOException {
1467     // fail-fast instead of waiting on the lock
1468     if (this.closing.get()) {
1469       LOG.debug("Skipping flush on " + this + " because closing");
1470       return false;
1471     }
1472     MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
1473     status.setStatus("Acquiring readlock on region");
1474     // block waiting for the lock for flushing cache
1475     lock.readLock().lock();
1476     try {
1477       if (this.closed.get()) {
1478         LOG.debug("Skipping flush on " + this + " because closed");
1479         status.abort("Skipped: closed");
1480         return false;
1481       }
1482       if (coprocessorHost != null) {
1483         status.setStatus("Running coprocessor pre-flush hooks");
1484         coprocessorHost.preFlush();
1485       }
1486       if (numMutationsWithoutWAL.get() > 0) {
1487         numMutationsWithoutWAL.set(0);
1488         dataInMemoryWithoutWAL.set(0);
1489       }
1490       synchronized (writestate) {
1491         if (!writestate.flushing && writestate.writesEnabled) {
1492           this.writestate.flushing = true;
1493         } else {
1494           if (LOG.isDebugEnabled()) {
1495             LOG.debug("NOT flushing memstore for region " + this
1496                 + ", flushing=" + writestate.flushing + ", writesEnabled="
1497                 + writestate.writesEnabled);
1498           }
1499           status.abort("Not flushing since "
1500               + (writestate.flushing ? "already flushing"
1501                   : "writes not enabled"));
1502           return false;
1503         }
1504       }
1505       try {
1506         boolean result = internalFlushcache(status);
1507 
1508         if (coprocessorHost != null) {
1509           status.setStatus("Running post-flush coprocessor hooks");
1510           coprocessorHost.postFlush();
1511         }
1512 
1513         status.markComplete("Flush successful");
1514         return result;
1515       } finally {
1516         synchronized (writestate) {
1517           writestate.flushing = false;
1518           this.writestate.flushRequested = false;
1519           writestate.notifyAll();
1520         }
1521       }
1522     } finally {
1523       lock.readLock().unlock();
1524       status.cleanup();
1525     }
1526   }
1527 
1528   /**
1529    * Should the memstore be flushed now
1530    */
1531   boolean shouldFlush() {
1532     if(this.completeSequenceId + this.flushPerChanges < this.sequenceId.get()) {
1533       return true;
1534     }
1535     if (flushCheckInterval <= 0) { //disabled
1536       return false;
1537     }
1538     long now = EnvironmentEdgeManager.currentTimeMillis();
1539     //if we flushed in the recent past, we don't need to do again now
1540     if ((now - getLastFlushTime() < flushCheckInterval)) {
1541       return false;
1542     }
1543     //since we didn't flush in the recent past, flush now if certain conditions
1544     //are met. Return true on first such memstore hit.
1545     for (Store s : this.getStores().values()) {
1546       if (s.timeOfOldestEdit() < now - flushCheckInterval) {
1547         // we have an old enough edit in the memstore, flush
1548         return true;
1549       }
1550     }
1551     return false;
1552   }
1553 
1554   /**
1555    * Flush the memstore.
1556    *
1557    * Flushing the memstore is a little tricky. We have a lot of updates in the
1558    * memstore, all of which have also been written to the log. We need to
1559    * write those updates in the memstore out to disk, while being able to
1560    * process reads/writes as much as possible during the flush operation. Also,
1561    * the log has to state clearly the point in time at which the memstore was
1562    * flushed. (That way, during recovery, we know when we can rely on the
1563    * on-disk flushed structures and when we have to recover the memstore from
1564    * the log.)
1565    *
1566    * <p>So, we have a three-step process:
1567    *
1568    * <ul><li>A. Flush the memstore to the on-disk stores, noting the current
1569    * sequence ID for the log.<li>
1570    *
1571    * <li>B. Write a FLUSHCACHE-COMPLETE message to the log, using the sequence
1572    * ID that was current at the time of memstore-flush.</li>
1573    *
1574    * <li>C. Get rid of the memstore structures that are now redundant, as
1575    * they've been flushed to the on-disk HStores.</li>
1576    * </ul>
1577    * <p>This method is protected, but can be accessed via several public
1578    * routes.
1579    *
1580    * <p> This method may block for some time.
1581    * @param status
1582    *
1583    * @return true if the region needs compacting
1584    *
1585    * @throws IOException general io exceptions
1586    * @throws DroppedSnapshotException Thrown when replay of hlog is required
1587    * because a Snapshot was not properly persisted.
1588    */
1589   protected boolean internalFlushcache(MonitoredTask status)
1590       throws IOException {
1591     return internalFlushcache(this.log, -1, status);
1592   }
1593 
1594   /**
1595    * @param wal Null if we're NOT to go via hlog/wal.
1596    * @param myseqid The seqid to use if <code>wal</code> is null writing out
1597    * flush file.
1598    * @param status
1599    * @return true if the region needs compacting
1600    * @throws IOException
1601    * @see #internalFlushcache(MonitoredTask)
1602    */
1603   protected boolean internalFlushcache(
1604       final HLog wal, final long myseqid, MonitoredTask status)
1605   throws IOException {
1606     if (this.rsServices != null && this.rsServices.isAborted()) {
1607       // Don't flush when server aborting, it's unsafe
1608       throw new IOException("Aborting flush because server is abortted...");
1609     }
1610     final long startTime = EnvironmentEdgeManager.currentTimeMillis();
1611     // Clear flush flag.
1612     // If nothing to flush, return and avoid logging start/stop flush.
1613     if (this.memstoreSize.get() <= 0) {
1614       return false;
1615     }
1616     if (LOG.isDebugEnabled()) {
1617       LOG.debug("Started memstore flush for " + this +
1618         ", current region memstore size " +
1619         StringUtils.humanReadableInt(this.memstoreSize.get()) +
1620         ((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid));
1621     }
1622 
1623     // Stop updates while we snapshot the memstore of all stores. We only have
1624     // to do this for a moment.  Its quick.  The subsequent sequence id that
1625     // goes into the HLog after we've flushed all these snapshots also goes
1626     // into the info file that sits beside the flushed files.
1627     // We also set the memstore size to zero here before we allow updates
1628     // again so its value will represent the size of the updates received
1629     // during the flush
1630     MultiVersionConsistencyControl.WriteEntry w = null;
1631 
1632     // We have to take a write lock during snapshot, or else a write could
1633     // end up in both snapshot and memstore (makes it difficult to do atomic
1634     // rows then)
1635     status.setStatus("Obtaining lock to block concurrent updates");
1636     // block waiting for the lock for internal flush
1637     this.updatesLock.writeLock().lock();
1638     long totalFlushableSize = 0;
1639     status.setStatus("Preparing to flush by snapshotting stores");
1640     List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
1641     long flushSeqId = -1L;
1642     try {
1643       // Record the mvcc for all transactions in progress.
1644       w = mvcc.beginMemstoreInsert();
1645       mvcc.advanceMemstore(w);
1646       // check if it is not closing.
1647       if (wal != null) {
1648         if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) {
1649           status.setStatus("Flush will not be started for ["
1650               + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.");
1651           return false;
1652         }
1653         flushSeqId = this.sequenceId.incrementAndGet();
1654       } else {
1655         // use the provided sequence Id as WAL is not being used for this flush.
1656         flushSeqId = myseqid;
1657       }
1658 
1659       for (Store s : stores.values()) {
1660         totalFlushableSize += s.getFlushableSize();
1661         storeFlushCtxs.add(s.createFlushContext(flushSeqId));
1662       }
1663 
1664       // prepare flush (take a snapshot)
1665       for (StoreFlushContext flush : storeFlushCtxs) {
1666         flush.prepare();
1667       }
1668     } finally {
1669       this.updatesLock.writeLock().unlock();
1670     }
1671     String s = "Finished memstore snapshotting " + this +
1672       ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize;
1673     status.setStatus(s);
1674     if (LOG.isTraceEnabled()) LOG.trace(s);
1675 
1676     // sync unflushed WAL changes when deferred log sync is enabled
1677     // see HBASE-8208 for details
1678     if (wal != null && !shouldSyncLog()) {
1679       wal.sync();
1680     }
1681 
1682     // wait for all in-progress transactions to commit to HLog before
1683     // we can start the flush. This prevents
1684     // uncommitted transactions from being written into HFiles.
1685     // We have to block before we start the flush, otherwise keys that
1686     // were removed via a rollbackMemstore could be written to Hfiles.
1687     mvcc.waitForRead(w);
1688 
1689     s = "Flushing stores of " + this;
1690     status.setStatus(s);
1691     if (LOG.isTraceEnabled()) LOG.trace(s);
1692 
1693     // Any failure from here on out will be catastrophic requiring server
1694     // restart so hlog content can be replayed and put back into the memstore.
1695     // Otherwise, the snapshot content while backed up in the hlog, it will not
1696     // be part of the current running servers state.
1697     boolean compactionRequested = false;
1698     try {
1699       // A.  Flush memstore to all the HStores.
1700       // Keep running vector of all store files that includes both old and the
1701       // just-made new flush store file. The new flushed file is still in the
1702       // tmp directory.
1703 
1704       for (StoreFlushContext flush : storeFlushCtxs) {
1705         flush.flushCache(status);
1706       }
1707 
1708       // Switch snapshot (in memstore) -> new hfile (thus causing
1709       // all the store scanners to reset/reseek).
1710       for (StoreFlushContext flush : storeFlushCtxs) {
1711         boolean needsCompaction = flush.commit(status);
1712         if (needsCompaction) {
1713           compactionRequested = true;
1714         }
1715       }
1716       storeFlushCtxs.clear();
1717 
1718       // Set down the memstore size by amount of flush.
1719       this.addAndGetGlobalMemstoreSize(-totalFlushableSize);
1720     } catch (Throwable t) {
1721       // An exception here means that the snapshot was not persisted.
1722       // The hlog needs to be replayed so its content is restored to memstore.
1723       // Currently, only a server restart will do this.
1724       // We used to only catch IOEs but its possible that we'd get other
1725       // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
1726       // all and sundry.
1727       if (wal != null) {
1728         wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
1729       }
1730       DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
1731           Bytes.toStringBinary(getRegionName()));
1732       dse.initCause(t);
1733       status.abort("Flush failed: " + StringUtils.stringifyException(t));
1734       throw dse;
1735     }
1736 
1737     // If we get to here, the HStores have been written.
1738     if (wal != null) {
1739       wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
1740     }
1741 
1742     // Record latest flush time
1743     this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
1744 
1745     // Update the last flushed sequence id for region
1746     completeSequenceId = flushSeqId;
1747 
1748     // C. Finally notify anyone waiting on memstore to clear:
1749     // e.g. checkResources().
1750     synchronized (this) {
1751       notifyAll(); // FindBugs NN_NAKED_NOTIFY
1752     }
1753 
1754     long time = EnvironmentEdgeManager.currentTimeMillis() - startTime;
1755     long memstoresize = this.memstoreSize.get();
1756     String msg = "Finished memstore flush of ~" +
1757       StringUtils.humanReadableInt(totalFlushableSize) + "/" + totalFlushableSize +
1758       ", currentsize=" +
1759       StringUtils.humanReadableInt(memstoresize) + "/" + memstoresize +
1760       " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId +
1761       ", compaction requested=" + compactionRequested +
1762       ((wal == null)? "; wal=null": "");
1763     LOG.info(msg);
1764     status.setStatus(msg);
1765     this.recentFlushes.add(new Pair<Long,Long>(time/1000, totalFlushableSize));
1766 
1767     return compactionRequested;
1768   }
1769 
1770   //////////////////////////////////////////////////////////////////////////////
1771   // get() methods for client use.
1772   //////////////////////////////////////////////////////////////////////////////
1773   /**
1774    * Return all the data for the row that matches <i>row</i> exactly,
1775    * or the one that immediately preceeds it, at or immediately before
1776    * <i>ts</i>.
1777    *
1778    * @param row row key
1779    * @return map of values
1780    * @throws IOException
1781    */
1782   Result getClosestRowBefore(final byte [] row)
1783   throws IOException{
1784     return getClosestRowBefore(row, HConstants.CATALOG_FAMILY);
1785   }
1786 
1787   /**
1788    * Return all the data for the row that matches <i>row</i> exactly,
1789    * or the one that immediately preceeds it, at or immediately before
1790    * <i>ts</i>.
1791    *
1792    * @param row row key
1793    * @param family column family to find on
1794    * @return map of values
1795    * @throws IOException read exceptions
1796    */
1797   public Result getClosestRowBefore(final byte [] row, final byte [] family)
1798   throws IOException {
1799     if (coprocessorHost != null) {
1800       Result result = new Result();
1801       if (coprocessorHost.preGetClosestRowBefore(row, family, result)) {
1802         return result;
1803       }
1804     }
1805     // look across all the HStores for this region and determine what the
1806     // closest key is across all column families, since the data may be sparse
1807     checkRow(row, "getClosestRowBefore");
1808     startRegionOperation(Operation.GET);
1809     this.readRequestsCount.increment();
1810     try {
1811       Store store = getStore(family);
1812       // get the closest key. (HStore.getRowKeyAtOrBefore can return null)
1813       KeyValue key = store.getRowKeyAtOrBefore(row);
1814       Result result = null;
1815       if (key != null) {
1816         Get get = new Get(key.getRow());
1817         get.addFamily(family);
1818         result = get(get);
1819       }
1820       if (coprocessorHost != null) {
1821         coprocessorHost.postGetClosestRowBefore(row, family, result);
1822       }
1823       return result;
1824     } finally {
1825       closeRegionOperation(Operation.GET);
1826     }
1827   }
1828 
1829   /**
1830    * Return an iterator that scans over the HRegion, returning the indicated
1831    * columns and rows specified by the {@link Scan}.
1832    * <p>
1833    * This Iterator must be closed by the caller.
1834    *
1835    * @param scan configured {@link Scan}
1836    * @return RegionScanner
1837    * @throws IOException read exceptions
1838    */
1839   public RegionScanner getScanner(Scan scan) throws IOException {
1840    return getScanner(scan, null);
1841   }
1842 
1843   void prepareScanner(Scan scan) throws IOException {
1844     if(!scan.hasFamilies()) {
1845       // Adding all families to scanner
1846       for(byte[] family: this.htableDescriptor.getFamiliesKeys()){
1847         scan.addFamily(family);
1848       }
1849     }
1850   }
1851 
1852   protected RegionScanner getScanner(Scan scan,
1853       List<KeyValueScanner> additionalScanners) throws IOException {
1854     startRegionOperation(Operation.SCAN);
1855     try {
1856       // Verify families are all valid
1857       prepareScanner(scan);
1858       if(scan.hasFamilies()) {
1859         for(byte [] family : scan.getFamilyMap().keySet()) {
1860           checkFamily(family);
1861         }
1862       }
1863       return instantiateRegionScanner(scan, additionalScanners);
1864     } finally {
1865       closeRegionOperation(Operation.SCAN);
1866     }
1867   }
1868 
1869   protected RegionScanner instantiateRegionScanner(Scan scan,
1870       List<KeyValueScanner> additionalScanners) throws IOException {
1871     if (scan.isReversed()) {
1872       if (scan.getFilter() != null) {
1873         scan.getFilter().setReversed(true);
1874       }
1875       return new ReversedRegionScannerImpl(scan, additionalScanners, this);
1876     }
1877     return new RegionScannerImpl(scan, additionalScanners, this);
1878   }
1879 
1880   /*
1881    * @param delete The passed delete is modified by this method. WARNING!
1882    */
1883   void prepareDelete(Delete delete) throws IOException {
1884     // Check to see if this is a deleteRow insert
1885     if(delete.getFamilyCellMap().isEmpty()){
1886       for(byte [] family : this.htableDescriptor.getFamiliesKeys()){
1887         // Don't eat the timestamp
1888         delete.deleteFamily(family, delete.getTimeStamp());
1889       }
1890     } else {
1891       for(byte [] family : delete.getFamilyCellMap().keySet()) {
1892         if(family == null) {
1893           throw new NoSuchColumnFamilyException("Empty family is invalid");
1894         }
1895         checkFamily(family);
1896       }
1897     }
1898   }
1899 
1900   //////////////////////////////////////////////////////////////////////////////
1901   // set() methods for client use.
1902   //////////////////////////////////////////////////////////////////////////////
1903   /**
1904    * @param delete delete object
1905    * @throws IOException read exceptions
1906    */
1907   public void delete(Delete delete)
1908   throws IOException {
1909     checkReadOnly();
1910     checkResources();
1911     startRegionOperation(Operation.DELETE);
1912     this.writeRequestsCount.increment();
1913     try {
1914       delete.getRow();
1915       // All edits for the given row (across all column families) must happen atomically.
1916       doBatchMutate(delete);
1917     } finally {
1918       closeRegionOperation(Operation.DELETE);
1919     }
1920   }
1921 
1922   /**
1923    * Row needed by below method.
1924    */
1925   private static final byte [] FOR_UNIT_TESTS_ONLY = Bytes.toBytes("ForUnitTestsOnly");
1926   /**
1927    * This is used only by unit tests. Not required to be a public API.
1928    * @param familyMap map of family to edits for the given family.
1929    * @param durability
1930    * @throws IOException
1931    */
1932   void delete(NavigableMap<byte[], List<Cell>> familyMap,
1933       Durability durability) throws IOException {
1934     Delete delete = new Delete(FOR_UNIT_TESTS_ONLY);
1935     delete.setFamilyCellMap(familyMap);
1936     delete.setDurability(durability);
1937     doBatchMutate(delete);
1938   }
1939 
1940   /**
1941    * Setup correct timestamps in the KVs in Delete object.
1942    * Caller should have the row and region locks.
1943    * @param familyMap
1944    * @param byteNow
1945    * @throws IOException
1946    */
1947   void prepareDeleteTimestamps(Map<byte[], List<Cell>> familyMap, byte[] byteNow)
1948       throws IOException {
1949     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
1950 
1951       byte[] family = e.getKey();
1952       List<Cell> cells = e.getValue();
1953       Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
1954 
1955       for (Cell cell: cells) {
1956         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
1957         //  Check if time is LATEST, change to time of most recent addition if so
1958         //  This is expensive.
1959         if (kv.isLatestTimestamp() && kv.isDeleteType()) {
1960           byte[] qual = kv.getQualifier();
1961           if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
1962 
1963           Integer count = kvCount.get(qual);
1964           if (count == null) {
1965             kvCount.put(qual, 1);
1966           } else {
1967             kvCount.put(qual, count + 1);
1968           }
1969           count = kvCount.get(qual);
1970 
1971           Get get = new Get(kv.getRow());
1972           get.setMaxVersions(count);
1973           get.addColumn(family, qual);
1974 
1975           List<Cell> result = get(get, false);
1976 
1977           if (result.size() < count) {
1978             // Nothing to delete
1979             kv.updateLatestStamp(byteNow);
1980             continue;
1981           }
1982           if (result.size() > count) {
1983             throw new RuntimeException("Unexpected size: " + result.size());
1984           }
1985           KeyValue getkv = KeyValueUtil.ensureKeyValue(result.get(count - 1));
1986           Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
1987               getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
1988         } else {
1989           kv.updateLatestStamp(byteNow);
1990         }
1991       }
1992     }
1993   }
1994 
1995   /**
1996    * @param put
1997    * @throws IOException
1998    */
1999   public void put(Put put)
2000   throws IOException {
2001     checkReadOnly();
2002 
2003     // Do a rough check that we have resources to accept a write.  The check is
2004     // 'rough' in that between the resource check and the call to obtain a
2005     // read lock, resources may run out.  For now, the thought is that this
2006     // will be extremely rare; we'll deal with it when it happens.
2007     checkResources();
2008     startRegionOperation(Operation.PUT);
2009     this.writeRequestsCount.increment();
2010     try {
2011       // All edits for the given row (across all column families) must happen atomically.
2012       doBatchMutate(put);
2013     } finally {
2014       closeRegionOperation(Operation.PUT);
2015     }
2016   }
2017 
2018   /**
2019    * Struct-like class that tracks the progress of a batch operation,
2020    * accumulating status codes and tracking the index at which processing
2021    * is proceeding.
2022    */
2023   private abstract static class BatchOperationInProgress<T> {
2024     T[] operations;
2025     int nextIndexToProcess = 0;
2026     OperationStatus[] retCodeDetails;
2027     WALEdit[] walEditsFromCoprocessors;
2028 
2029     public BatchOperationInProgress(T[] operations) {
2030       this.operations = operations;
2031       this.retCodeDetails = new OperationStatus[operations.length];
2032       this.walEditsFromCoprocessors = new WALEdit[operations.length];
2033       Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
2034     }
2035 
2036     public abstract Mutation getMutation(int index);
2037     public abstract long getNonceGroup(int index);
2038     public abstract long getNonce(int index);
2039     /** This method is potentially expensive and should only be used for non-replay CP path. */
2040     public abstract Mutation[] getMutationsForCoprocs();
2041     public abstract boolean isInReplay();
2042 
2043     public boolean isDone() {
2044       return nextIndexToProcess == operations.length;
2045     }
2046   }
2047 
2048   private static class MutationBatch extends BatchOperationInProgress<Mutation> {
2049     private long nonceGroup;
2050     private long nonce;
2051     public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) {
2052       super(operations);
2053       this.nonceGroup = nonceGroup;
2054       this.nonce = nonce;
2055     }
2056 
2057     public Mutation getMutation(int index) {
2058       return this.operations[index];
2059     }
2060 
2061     @Override
2062     public long getNonceGroup(int index) {
2063       return nonceGroup;
2064     }
2065 
2066     @Override
2067     public long getNonce(int index) {
2068       return nonce;
2069     }
2070 
2071     @Override
2072     public Mutation[] getMutationsForCoprocs() {
2073       return this.operations;
2074     }
2075 
2076     @Override
2077     public boolean isInReplay() {
2078       return false;
2079     }
2080   }
2081 
2082   private static class ReplayBatch extends BatchOperationInProgress<HLogSplitter.MutationReplay> {
2083     public ReplayBatch(MutationReplay[] operations) {
2084       super(operations);
2085     }
2086 
2087     @Override
2088     public Mutation getMutation(int index) {
2089       return this.operations[index].mutation;
2090     }
2091 
2092     @Override
2093     public long getNonceGroup(int index) {
2094       return this.operations[index].nonceGroup;
2095     }
2096 
2097     @Override
2098     public long getNonce(int index) {
2099       return this.operations[index].nonce;
2100     }
2101 
2102     @Override
2103     public Mutation[] getMutationsForCoprocs() {
2104       assert false;
2105       throw new RuntimeException("Should not be called for replay batch");
2106     }
2107 
2108     @Override
2109     public boolean isInReplay() {
2110       return true;
2111     }
2112   }
2113 
2114   /**
2115    * Perform a batch of mutations.
2116    * It supports only Put and Delete mutations and will ignore other types passed.
2117    * @param mutations the list of mutations
2118    * @return an array of OperationStatus which internally contains the
2119    *         OperationStatusCode and the exceptionMessage if any.
2120    * @throws IOException
2121    */
2122   public OperationStatus[] batchMutate(
2123       Mutation[] mutations, long nonceGroup, long nonce) throws IOException {
2124     // As it stands, this is used for 3 things
2125     //  * batchMutate with single mutation - put/delete, separate or from checkAndMutate.
2126     //  * coprocessor calls (see ex. BulkDeleteEndpoint).
2127     // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd...
2128     return batchMutate(new MutationBatch(mutations, nonceGroup, nonce));
2129   }
2130 
2131   public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
2132     return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
2133   }
2134 
2135   /**
2136    * Replay a batch of mutations.
2137    * @param mutations mutations to replay.
2138    * @return an array of OperationStatus which internally contains the
2139    *         OperationStatusCode and the exceptionMessage if any.
2140    * @throws IOException
2141    */
2142   public OperationStatus[] batchReplay(HLogSplitter.MutationReplay[] mutations)
2143       throws IOException {
2144     return batchMutate(new ReplayBatch(mutations));
2145   }
2146 
2147   /**
2148    * Perform a batch of mutations.
2149    * It supports only Put and Delete mutations and will ignore other types passed.
2150    * @param mutations the list of mutations
2151    * @return an array of OperationStatus which internally contains the
2152    *         OperationStatusCode and the exceptionMessage if any.
2153    * @throws IOException
2154    */
2155   OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
2156     boolean initialized = false;
2157     while (!batchOp.isDone()) {
2158       if (!batchOp.isInReplay()) {
2159         checkReadOnly();
2160       }
2161       checkResources();
2162 
2163       long newSize;
2164       Operation op = Operation.BATCH_MUTATE;
2165       if (batchOp.isInReplay()) op = Operation.REPLAY_BATCH_MUTATE;
2166       startRegionOperation(op);
2167 
2168       try {
2169         if (!initialized) {
2170           if (!batchOp.isInReplay()) {
2171             this.writeRequestsCount.increment();
2172             doPreMutationHook(batchOp);
2173           }
2174           initialized = true;
2175         }
2176         long addedSize = doMiniBatchMutation(batchOp);
2177         newSize = this.addAndGetGlobalMemstoreSize(addedSize);
2178       } finally {
2179         closeRegionOperation(op);
2180       }
2181       if (isFlushSize(newSize)) {
2182         requestFlush();
2183       }
2184     }
2185     return batchOp.retCodeDetails;
2186   }
2187 
2188 
2189   private void doPreMutationHook(BatchOperationInProgress<?> batchOp)
2190       throws IOException {
2191     /* Run coprocessor pre hook outside of locks to avoid deadlock */
2192     WALEdit walEdit = new WALEdit();
2193     if (coprocessorHost != null) {
2194       for (int i = 0 ; i < batchOp.operations.length; i++) {
2195         Mutation m = batchOp.getMutation(i);
2196         if (m instanceof Put) {
2197           if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
2198             // pre hook says skip this Put
2199             // mark as success and skip in doMiniBatchMutation
2200             batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2201           }
2202         } else if (m instanceof Delete) {
2203           Delete curDel = (Delete) m;
2204           if (curDel.getFamilyCellMap().isEmpty()) {
2205             // handle deleting a row case
2206             prepareDelete(curDel);
2207           }
2208           if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {
2209             // pre hook says skip this Delete
2210             // mark as success and skip in doMiniBatchMutation
2211             batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2212           }
2213         } else {
2214           // In case of passing Append mutations along with the Puts and Deletes in batchMutate
2215           // mark the operation return code as failure so that it will not be considered in
2216           // the doMiniBatchMutation
2217           batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
2218               "Put/Delete mutations only supported in batchMutate() now");
2219         }
2220         if (!walEdit.isEmpty()) {
2221           batchOp.walEditsFromCoprocessors[i] = walEdit;
2222           walEdit = new WALEdit();
2223         }
2224       }
2225     }
2226   }
2227 
2228   @SuppressWarnings("unchecked")
2229   private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp) throws IOException {
2230     boolean isInReplay = batchOp.isInReplay();
2231     // variable to note if all Put items are for the same CF -- metrics related
2232     boolean putsCfSetConsistent = true;
2233     //The set of columnFamilies first seen for Put.
2234     Set<byte[]> putsCfSet = null;
2235     // variable to note if all Delete items are for the same CF -- metrics related
2236     boolean deletesCfSetConsistent = true;
2237     //The set of columnFamilies first seen for Delete.
2238     Set<byte[]> deletesCfSet = null;
2239 
2240     long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
2241     WALEdit walEdit = new WALEdit(isInReplay);
2242     MultiVersionConsistencyControl.WriteEntry w = null;
2243     long txid = 0;
2244     boolean doRollBackMemstore = false;
2245     boolean locked = false;
2246 
2247     /** Keep track of the locks we hold so we can release them in finally clause */
2248     List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
2249     // reference family maps directly so coprocessors can mutate them if desired
2250     Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
2251     // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
2252     int firstIndex = batchOp.nextIndexToProcess;
2253     int lastIndexExclusive = firstIndex;
2254     boolean success = false;
2255     int noOfPuts = 0, noOfDeletes = 0;
2256     try {
2257       // ------------------------------------
2258       // STEP 1. Try to acquire as many locks as we can, and ensure
2259       // we acquire at least one.
2260       // ----------------------------------
2261       int numReadyToWrite = 0;
2262       long now = EnvironmentEdgeManager.currentTimeMillis();
2263       while (lastIndexExclusive < batchOp.operations.length) {
2264         Mutation mutation = batchOp.getMutation(lastIndexExclusive);
2265         boolean isPutMutation = mutation instanceof Put;
2266 
2267         Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
2268         // store the family map reference to allow for mutations
2269         familyMaps[lastIndexExclusive] = familyMap;
2270 
2271         // skip anything that "ran" already
2272         if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
2273             != OperationStatusCode.NOT_RUN) {
2274           lastIndexExclusive++;
2275           continue;
2276         }
2277 
2278         try {
2279           if (isPutMutation) {
2280             // Check the families in the put. If bad, skip this one.
2281             if (isInReplay) {
2282               removeNonExistentColumnFamilyForReplay(familyMap);
2283             } else {
2284               checkFamilies(familyMap.keySet());
2285             }
2286             checkTimestamps(mutation.getFamilyCellMap(), now);
2287           } else {
2288             prepareDelete((Delete) mutation);
2289           }
2290         } catch (NoSuchColumnFamilyException nscf) {
2291           LOG.warn("No such column family in batch mutation", nscf);
2292           batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2293               OperationStatusCode.BAD_FAMILY, nscf.getMessage());
2294           lastIndexExclusive++;
2295           continue;
2296         } catch (FailedSanityCheckException fsce) {
2297           LOG.warn("Batch Mutation did not pass sanity check", fsce);
2298           batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2299               OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
2300           lastIndexExclusive++;
2301           continue;
2302         }
2303 
2304         // If we haven't got any rows in our batch, we should block to
2305         // get the next one.
2306         boolean shouldBlock = numReadyToWrite == 0;
2307         RowLock rowLock = null;
2308         try {
2309           rowLock = getRowLock(mutation.getRow(), shouldBlock);
2310         } catch (IOException ioe) {
2311           LOG.warn("Failed getting lock in batch put, row="
2312             + Bytes.toStringBinary(mutation.getRow()), ioe);
2313         }
2314         if (rowLock == null) {
2315           // We failed to grab another lock
2316           assert !shouldBlock : "Should never fail to get lock when blocking";
2317           break; // stop acquiring more rows for this batch
2318         } else {
2319           acquiredRowLocks.add(rowLock);
2320         }
2321 
2322         lastIndexExclusive++;
2323         numReadyToWrite++;
2324 
2325         if (isPutMutation) {
2326           // If Column Families stay consistent through out all of the
2327           // individual puts then metrics can be reported as a mutliput across
2328           // column families in the first put.
2329           if (putsCfSet == null) {
2330             putsCfSet = mutation.getFamilyCellMap().keySet();
2331           } else {
2332             putsCfSetConsistent = putsCfSetConsistent
2333                 && mutation.getFamilyCellMap().keySet().equals(putsCfSet);
2334           }
2335         } else {
2336           if (deletesCfSet == null) {
2337             deletesCfSet = mutation.getFamilyCellMap().keySet();
2338           } else {
2339             deletesCfSetConsistent = deletesCfSetConsistent
2340                 && mutation.getFamilyCellMap().keySet().equals(deletesCfSet);
2341           }
2342         }
2343       }
2344 
2345       // we should record the timestamp only after we have acquired the rowLock,
2346       // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
2347       now = EnvironmentEdgeManager.currentTimeMillis();
2348       byte[] byteNow = Bytes.toBytes(now);
2349 
2350       // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
2351       if (numReadyToWrite <= 0) return 0L;
2352 
2353       // We've now grabbed as many mutations off the list as we can
2354 
2355       // ------------------------------------
2356       // STEP 2. Update any LATEST_TIMESTAMP timestamps
2357       // ----------------------------------
2358       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2359         // skip invalid
2360         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2361             != OperationStatusCode.NOT_RUN) continue;
2362 
2363         Mutation mutation = batchOp.getMutation(i);
2364         if (mutation instanceof Put) {
2365           updateKVTimestamps(familyMaps[i].values(), byteNow);
2366           noOfPuts++;
2367         } else {
2368           prepareDeleteTimestamps(familyMaps[i], byteNow);
2369           noOfDeletes++;
2370         }
2371       }
2372 
2373       lock(this.updatesLock.readLock(), numReadyToWrite);
2374       locked = true;
2375 
2376       //
2377       // ------------------------------------
2378       // Acquire the latest mvcc number
2379       // ----------------------------------
2380       w = mvcc.beginMemstoreInsert();
2381 
2382       // calling the pre CP hook for batch mutation
2383       if (!isInReplay && coprocessorHost != null) {
2384         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2385           new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2386           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2387         if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
2388       }
2389 
2390       // ------------------------------------
2391       // STEP 3. Write back to memstore
2392       // Write to memstore. It is ok to write to memstore
2393       // first without updating the HLog because we do not roll
2394       // forward the memstore MVCC. The MVCC will be moved up when
2395       // the complete operation is done. These changes are not yet
2396       // visible to scanners till we update the MVCC. The MVCC is
2397       // moved only when the sync is complete.
2398       // ----------------------------------
2399       long addedSize = 0;
2400       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2401         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2402             != OperationStatusCode.NOT_RUN) {
2403           continue;
2404         }
2405         doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
2406         addedSize += applyFamilyMapToMemstore(familyMaps[i], w);
2407       }
2408 
2409       // ------------------------------------
2410       // STEP 4. Build WAL edit
2411       // ----------------------------------
2412       boolean hasWalAppends = false;
2413       Durability durability = Durability.USE_DEFAULT;
2414       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2415         // Skip puts that were determined to be invalid during preprocessing
2416         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2417             != OperationStatusCode.NOT_RUN) {
2418           continue;
2419         }
2420         batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2421 
2422         Mutation m = batchOp.getMutation(i);
2423         Durability tmpDur = getEffectiveDurability(m.getDurability());
2424         if (tmpDur.ordinal() > durability.ordinal()) {
2425           durability = tmpDur;
2426         }
2427         if (tmpDur == Durability.SKIP_WAL) {
2428           recordMutationWithoutWal(m.getFamilyCellMap());
2429           continue;
2430         }
2431 
2432         long nonceGroup = batchOp.getNonceGroup(i), nonce = batchOp.getNonce(i);
2433         // In replay, the batch may contain multiple nonces. If so, write WALEdit for each.
2434         // Given how nonces are originally written, these should be contiguous.
2435         // They don't have to be, it will still work, just write more WALEdits than needed.
2436         if (nonceGroup != currentNonceGroup || nonce != currentNonce) {
2437           if (walEdit.size() > 0) {
2438             assert isInReplay;
2439             if (!isInReplay) {
2440               throw new IOException("Multiple nonces per batch and not in replay");
2441             }
2442             // txid should always increase, so having the one from the last call is ok.
2443             txid = this.log.appendNoSync(this.getRegionInfo(), htableDescriptor.getTableName(),
2444                   walEdit, m.getClusterIds(), now, htableDescriptor, this.sequenceId, true,
2445                   currentNonceGroup, currentNonce);
2446             hasWalAppends = true;
2447             walEdit = new WALEdit(isInReplay);
2448           }
2449           currentNonceGroup = nonceGroup;
2450           currentNonce = nonce;
2451         }
2452 
2453         // Add WAL edits by CP
2454         WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
2455         if (fromCP != null) {
2456           for (KeyValue kv : fromCP.getKeyValues()) {
2457             walEdit.add(kv);
2458           }
2459         }
2460         addFamilyMapToWALEdit(familyMaps[i], walEdit);
2461       }
2462 
2463       // -------------------------
2464       // STEP 5. Append the final edit to WAL. Do not sync wal.
2465       // -------------------------
2466       Mutation mutation = batchOp.getMutation(firstIndex);
2467       if (walEdit.size() > 0) {
2468         txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
2469               walEdit, mutation.getClusterIds(), now, this.htableDescriptor, this.sequenceId,
2470               true, currentNonceGroup, currentNonce);
2471         hasWalAppends = true;
2472       }
2473 
2474       // -------------------------------
2475       // STEP 6. Release row locks, etc.
2476       // -------------------------------
2477       if (locked) {
2478         this.updatesLock.readLock().unlock();
2479         locked = false;
2480       }
2481       releaseRowLocks(acquiredRowLocks);
2482 
2483       // -------------------------
2484       // STEP 7. Sync wal.
2485       // -------------------------
2486       if (hasWalAppends) {
2487         syncOrDefer(txid, durability);
2488       }
2489       doRollBackMemstore = false;
2490       // calling the post CP hook for batch mutation
2491       if (!isInReplay && coprocessorHost != null) {
2492         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2493           new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2494           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2495         coprocessorHost.postBatchMutate(miniBatchOp);
2496       }
2497 
2498       // ------------------------------------------------------------------
2499       // STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
2500       // ------------------------------------------------------------------
2501       if (w != null) {
2502         mvcc.completeMemstoreInsert(w);
2503         w = null;
2504       }
2505 
2506       // ------------------------------------
2507       // STEP 9. Run coprocessor post hooks. This should be done after the wal is
2508       // synced so that the coprocessor contract is adhered to.
2509       // ------------------------------------
2510       if (!isInReplay && coprocessorHost != null) {
2511         for (int i = firstIndex; i < lastIndexExclusive; i++) {
2512           // only for successful puts
2513           if (batchOp.retCodeDetails[i].getOperationStatusCode()
2514               != OperationStatusCode.SUCCESS) {
2515             continue;
2516           }
2517           Mutation m = batchOp.getMutation(i);
2518           if (m instanceof Put) {
2519             coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
2520           } else {
2521             coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
2522           }
2523         }
2524       }
2525 
2526       success = true;
2527       return addedSize;
2528     } finally {
2529 
2530       // if the wal sync was unsuccessful, remove keys from memstore
2531       if (doRollBackMemstore) {
2532         rollbackMemstore(batchOp, familyMaps, firstIndex, lastIndexExclusive);
2533       }
2534       if (w != null) mvcc.completeMemstoreInsert(w);
2535 
2536       if (locked) {
2537         this.updatesLock.readLock().unlock();
2538       }
2539       releaseRowLocks(acquiredRowLocks);
2540 
2541       // See if the column families were consistent through the whole thing.
2542       // if they were then keep them. If they were not then pass a null.
2543       // null will be treated as unknown.
2544       // Total time taken might be involving Puts and Deletes.
2545       // Split the time for puts and deletes based on the total number of Puts and Deletes.
2546 
2547       if (noOfPuts > 0) {
2548         // There were some Puts in the batch.
2549         if (this.metricsRegion != null) {
2550           this.metricsRegion.updatePut();
2551         }
2552       }
2553       if (noOfDeletes > 0) {
2554         // There were some Deletes in the batch.
2555         if (this.metricsRegion != null) {
2556           this.metricsRegion.updateDelete();
2557         }
2558       }
2559       if (!success) {
2560         for (int i = firstIndex; i < lastIndexExclusive; i++) {
2561           if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
2562             batchOp.retCodeDetails[i] = OperationStatus.FAILURE;
2563           }
2564         }
2565       }
2566       if (coprocessorHost != null && !batchOp.isInReplay()) {
2567         // call the coprocessor hook to do any finalization steps
2568         // after the put is done
2569         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2570             new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2571                 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex,
2572                 lastIndexExclusive);
2573         coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success);
2574       }
2575 
2576       batchOp.nextIndexToProcess = lastIndexExclusive;
2577     }
2578   }
2579 
2580   /**
2581    * Returns effective durability from the passed durability and
2582    * the table descriptor.
2583    */
2584   protected Durability getEffectiveDurability(Durability d) {
2585     return d == Durability.USE_DEFAULT ? this.durability : d;
2586   }
2587 
2588   //TODO, Think that gets/puts and deletes should be refactored a bit so that
2589   //the getting of the lock happens before, so that you would just pass it into
2590   //the methods. So in the case of checkAndMutate you could just do lockRow,
2591   //get, put, unlockRow or something
2592   /**
2593    *
2594    * @param row
2595    * @param family
2596    * @param qualifier
2597    * @param compareOp
2598    * @param comparator
2599    * @param w
2600    * @param writeToWAL
2601    * @throws IOException
2602    * @return true if the new put was executed, false otherwise
2603    */
2604   public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
2605       CompareOp compareOp, ByteArrayComparable comparator, Mutation w,
2606       boolean writeToWAL)
2607   throws IOException{
2608     checkReadOnly();
2609     //TODO, add check for value length or maybe even better move this to the
2610     //client if this becomes a global setting
2611     checkResources();
2612     boolean isPut = w instanceof Put;
2613     if (!isPut && !(w instanceof Delete))
2614       throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must " +
2615           "be Put or Delete");
2616     if (!Bytes.equals(row, w.getRow())) {
2617       throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " +
2618           "getRow must match the passed row");
2619     }
2620 
2621     startRegionOperation();
2622     try {
2623       Get get = new Get(row);
2624       checkFamily(family);
2625       get.addColumn(family, qualifier);
2626 
2627       // Lock row - note that doBatchMutate will relock this row if called
2628       RowLock rowLock = getRowLock(get.getRow());
2629       // wait for all previous transactions to complete (with lock held)
2630       mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
2631       List<Cell> result;
2632       try {
2633         result = get(get, false);
2634 
2635         boolean valueIsNull = comparator.getValue() == null ||
2636           comparator.getValue().length == 0;
2637         boolean matches = false;
2638         if (result.size() == 0 && valueIsNull) {
2639           matches = true;
2640         } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
2641             valueIsNull) {
2642           matches = true;
2643         } else if (result.size() == 1 && !valueIsNull) {
2644           Cell kv = result.get(0);
2645           int compareResult = comparator.compareTo(kv.getValueArray(),
2646               kv.getValueOffset(), kv.getValueLength());
2647           switch (compareOp) {
2648           case LESS:
2649             matches = compareResult < 0;
2650             break;
2651           case LESS_OR_EQUAL:
2652             matches = compareResult <= 0;
2653             break;
2654           case EQUAL:
2655             matches = compareResult == 0;
2656             break;
2657           case NOT_EQUAL:
2658             matches = compareResult != 0;
2659             break;
2660           case GREATER_OR_EQUAL:
2661             matches = compareResult >= 0;
2662             break;
2663           case GREATER:
2664             matches = compareResult > 0;
2665             break;
2666           default:
2667             throw new RuntimeException("Unknown Compare op " + compareOp.name());
2668           }
2669         }
2670         //If matches put the new put or delete the new delete
2671         if (matches) {
2672           // All edits for the given row (across all column families) must
2673           // happen atomically.
2674           doBatchMutate((Mutation)w);
2675           this.checkAndMutateChecksPassed.increment();
2676           return true;
2677         }
2678         this.checkAndMutateChecksFailed.increment();
2679         return false;
2680       } finally {
2681         rowLock.release();
2682       }
2683     } finally {
2684       closeRegionOperation();
2685     }
2686   }
2687 
2688   private void doBatchMutate(Mutation mutation) throws IOException, DoNotRetryIOException {
2689     // Currently this is only called for puts and deletes, so no nonces.
2690     OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation },
2691         HConstants.NO_NONCE, HConstants.NO_NONCE);
2692     if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
2693       throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
2694     } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
2695       throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
2696     }
2697   }
2698 
2699   /**
2700    * Complete taking the snapshot on the region. Writes the region info and adds references to the
2701    * working snapshot directory.
2702    *
2703    * TODO for api consistency, consider adding another version with no {@link ForeignExceptionSnare}
2704    * arg.  (In the future other cancellable HRegion methods could eventually add a
2705    * {@link ForeignExceptionSnare}, or we could do something fancier).
2706    *
2707    * @param desc snasphot description object
2708    * @param exnSnare ForeignExceptionSnare that captures external exeptions in case we need to
2709    *   bail out.  This is allowed to be null and will just be ignored in that case.
2710    * @throws IOException if there is an external or internal error causing the snapshot to fail
2711    */
2712   public void addRegionToSnapshot(SnapshotDescription desc,
2713       ForeignExceptionSnare exnSnare) throws IOException {
2714     // This should be "fast" since we don't rewrite store files but instead
2715     // back up the store files by creating a reference
2716     Path rootDir = FSUtils.getRootDir(this.rsServices.getConfiguration());
2717     Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);
2718 
2719     // 1. dump region meta info into the snapshot directory
2720     LOG.debug("Storing region-info for snapshot.");
2721     HRegionFileSystem snapshotRegionFs = HRegionFileSystem.createRegionOnFileSystem(conf,
2722         this.fs.getFileSystem(), snapshotDir, getRegionInfo());
2723 
2724     // 2. iterate through all the stores in the region
2725     LOG.debug("Creating references for hfiles");
2726 
2727     // This ensures that we have an atomic view of the directory as long as we have < ls limit
2728     // (batch size of the files in a directory) on the namenode. Otherwise, we get back the files in
2729     // batches and may miss files being added/deleted. This could be more robust (iteratively
2730     // checking to see if we have all the files until we are sure), but the limit is currently 1000
2731     // files/batch, far more than the number of store files under a single column family.
2732     for (Store store : stores.values()) {
2733       // 2.1. build the snapshot reference directory for the store
2734       Path dstStoreDir = snapshotRegionFs.getStoreDir(store.getFamily().getNameAsString());
2735       List<StoreFile> storeFiles = new ArrayList<StoreFile>(store.getStorefiles());
2736       if (LOG.isDebugEnabled()) {
2737         LOG.debug("Adding snapshot references for " + storeFiles  + " hfiles");
2738       }
2739 
2740       // 2.2. iterate through all the store's files and create "references".
2741       int sz = storeFiles.size();
2742       for (int i = 0; i < sz; i++) {
2743         if (exnSnare != null) {
2744           exnSnare.rethrowException();
2745         }
2746         StoreFile storeFile = storeFiles.get(i);
2747         Path file = storeFile.getPath();
2748 
2749         LOG.debug("Creating reference for file (" + (i+1) + "/" + sz + ") : " + file);
2750         Path referenceFile = new Path(dstStoreDir, file.getName());
2751         boolean success = true;
2752         if (storeFile.isReference()) {
2753           // write the Reference object to the snapshot
2754           storeFile.getFileInfo().getReference().write(fs.getFileSystem(), referenceFile);
2755         } else {
2756           // create "reference" to this store file.  It is intentionally an empty file -- all
2757           // necessary information is captured by its fs location and filename.  This allows us to
2758           // only figure out what needs to be done via a single nn operation (instead of having to
2759           // open and read the files as well).
2760           success = fs.getFileSystem().createNewFile(referenceFile);
2761         }
2762         if (!success) {
2763           throw new IOException("Failed to create reference file:" + referenceFile);
2764         }
2765       }
2766     }
2767   }
2768 
2769   /**
2770    * Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP} with the
2771    * provided current timestamp.
2772    */
2773   void updateKVTimestamps(final Iterable<List<Cell>> keyLists, final byte[] now) {
2774     for (List<Cell> cells: keyLists) {
2775       if (cells == null) continue;
2776       for (Cell cell : cells) {
2777         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
2778         kv.updateLatestStamp(now);
2779       }
2780     }
2781   }
2782 
2783   /*
2784    * Check if resources to support an update.
2785    *
2786    * We throw RegionTooBusyException if above memstore limit
2787    * and expect client to retry using some kind of backoff
2788   */
2789   private void checkResources()
2790     throws RegionTooBusyException {
2791     // If catalog region, do not impose resource constraints or block updates.
2792     if (this.getRegionInfo().isMetaRegion()) return;
2793 
2794     if (this.memstoreSize.get() > this.blockingMemStoreSize) {
2795       requestFlush();
2796       throw new RegionTooBusyException("Above memstore limit, " +
2797           "regionName=" + (this.getRegionInfo() == null ? "unknown" :
2798           this.getRegionInfo().getRegionNameAsString()) +
2799           ", server=" + (this.getRegionServerServices() == null ? "unknown" :
2800           this.getRegionServerServices().getServerName()) +
2801           ", memstoreSize=" + memstoreSize.get() +
2802           ", blockingMemStoreSize=" + blockingMemStoreSize);
2803     }
2804   }
2805 
2806   /**
2807    * @throws IOException Throws exception if region is in read-only mode.
2808    */
2809   protected void checkReadOnly() throws IOException {
2810     if (this.writestate.isReadOnly()) {
2811       throw new IOException("region is read only");
2812     }
2813   }
2814 
2815   /**
2816    * Add updates first to the hlog and then add values to memstore.
2817    * Warning: Assumption is caller has lock on passed in row.
2818    * @param family
2819    * @param edits Cell updates by column
2820    * @praram now
2821    * @throws IOException
2822    */
2823   private void put(final byte [] row, byte [] family, List<Cell> edits)
2824   throws IOException {
2825     NavigableMap<byte[], List<Cell>> familyMap;
2826     familyMap = new TreeMap<byte[], List<Cell>>(Bytes.BYTES_COMPARATOR);
2827 
2828     familyMap.put(family, edits);
2829     Put p = new Put(row);
2830     p.setFamilyCellMap(familyMap);
2831     doBatchMutate(p);
2832   }
2833 
2834   /**
2835    * Atomically apply the given map of family->edits to the memstore.
2836    * This handles the consistency control on its own, but the caller
2837    * should already have locked updatesLock.readLock(). This also does
2838    * <b>not</b> check the families for validity.
2839    *
2840    * @param familyMap Map of kvs per family
2841    * @param localizedWriteEntry The WriteEntry of the MVCC for this transaction.
2842    *        If null, then this method internally creates a mvcc transaction.
2843    * @return the additional memory usage of the memstore caused by the
2844    * new entries.
2845    */
2846   private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
2847     MultiVersionConsistencyControl.WriteEntry localizedWriteEntry) {
2848     long size = 0;
2849     boolean freemvcc = false;
2850 
2851     try {
2852       if (localizedWriteEntry == null) {
2853         localizedWriteEntry = mvcc.beginMemstoreInsert();
2854         freemvcc = true;
2855       }
2856 
2857       for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
2858         byte[] family = e.getKey();
2859         List<Cell> cells = e.getValue();
2860 
2861         Store store = getStore(family);
2862         for (Cell cell: cells) {
2863           KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
2864           kv.setMvccVersion(localizedWriteEntry.getWriteNumber());
2865           size += store.add(kv);
2866         }
2867       }
2868     } finally {
2869       if (freemvcc) {
2870         mvcc.completeMemstoreInsert(localizedWriteEntry);
2871       }
2872     }
2873 
2874      return size;
2875    }
2876 
2877   /**
2878    * Remove all the keys listed in the map from the memstore. This method is
2879    * called when a Put/Delete has updated memstore but subequently fails to update
2880    * the wal. This method is then invoked to rollback the memstore.
2881    */
2882   private void rollbackMemstore(BatchOperationInProgress<?> batchOp,
2883                                 Map<byte[], List<Cell>>[] familyMaps,
2884                                 int start, int end) {
2885     int kvsRolledback = 0;
2886     for (int i = start; i < end; i++) {
2887       // skip over request that never succeeded in the first place.
2888       if (batchOp.retCodeDetails[i].getOperationStatusCode()
2889             != OperationStatusCode.SUCCESS) {
2890         continue;
2891       }
2892 
2893       // Rollback all the kvs for this row.
2894       Map<byte[], List<Cell>> familyMap  = familyMaps[i];
2895       for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
2896         byte[] family = e.getKey();
2897         List<Cell> cells = e.getValue();
2898 
2899         // Remove those keys from the memstore that matches our
2900         // key's (row, cf, cq, timestamp, memstoreTS). The interesting part is
2901         // that even the memstoreTS has to match for keys that will be rolleded-back.
2902         Store store = getStore(family);
2903         for (Cell cell: cells) {
2904           store.rollback(KeyValueUtil.ensureKeyValue(cell));
2905           kvsRolledback++;
2906         }
2907       }
2908     }
2909     LOG.debug("rollbackMemstore rolled back " + kvsRolledback +
2910         " keyvalues from start:" + start + " to end:" + end);
2911   }
2912 
2913   /**
2914    * Check the collection of families for validity.
2915    * @throws NoSuchColumnFamilyException if a family does not exist.
2916    */
2917   void checkFamilies(Collection<byte[]> families)
2918   throws NoSuchColumnFamilyException {
2919     for (byte[] family : families) {
2920       checkFamily(family);
2921     }
2922   }
2923 
2924   /**
2925    * During replay, there could exist column families which are removed between region server
2926    * failure and replay
2927    */
2928   private void removeNonExistentColumnFamilyForReplay(
2929       final Map<byte[], List<Cell>> familyMap) {
2930     List<byte[]> nonExistentList = null;
2931     for (byte[] family : familyMap.keySet()) {
2932       if (!this.htableDescriptor.hasFamily(family)) {
2933         if (nonExistentList == null) {
2934           nonExistentList = new ArrayList<byte[]>();
2935         }
2936         nonExistentList.add(family);
2937       }
2938     }
2939     if (nonExistentList != null) {
2940       for (byte[] family : nonExistentList) {
2941         // Perhaps schema was changed between crash and replay
2942         LOG.info("No family for " + Bytes.toString(family) + " omit from reply.");
2943         familyMap.remove(family);
2944       }
2945     }
2946   }
2947 
2948   void checkTimestamps(final Map<byte[], List<Cell>> familyMap,
2949       long now) throws FailedSanityCheckException {
2950     if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
2951       return;
2952     }
2953     long maxTs = now + timestampSlop;
2954     for (List<Cell> kvs : familyMap.values()) {
2955       for (Cell cell : kvs) {
2956         // see if the user-side TS is out of range. latest = server-side
2957         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
2958         if (!kv.isLatestTimestamp() && kv.getTimestamp() > maxTs) {
2959           throw new FailedSanityCheckException("Timestamp for KV out of range "
2960               + cell + " (too.new=" + timestampSlop + ")");
2961         }
2962       }
2963     }
2964   }
2965 
2966   /**
2967    * Append the given map of family->edits to a WALEdit data structure.
2968    * This does not write to the HLog itself.
2969    * @param familyMap map of family->edits
2970    * @param walEdit the destination entry to append into
2971    */
2972   private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
2973       WALEdit walEdit) {
2974     for (List<Cell> edits : familyMap.values()) {
2975       for (Cell cell : edits) {
2976         walEdit.add(KeyValueUtil.ensureKeyValue(cell));
2977       }
2978     }
2979   }
2980 
2981   private void requestFlush() {
2982     if (this.rsServices == null) {
2983       return;
2984     }
2985     synchronized (writestate) {
2986       if (this.writestate.isFlushRequested()) {
2987         return;
2988       }
2989       writestate.flushRequested = true;
2990     }
2991     // Make request outside of synchronize block; HBASE-818.
2992     this.rsServices.getFlushRequester().requestFlush(this);
2993     if (LOG.isDebugEnabled()) {
2994       LOG.debug("Flush requested on " + this);
2995     }
2996   }
2997 
2998   /*
2999    * @param size
3000    * @return True if size is over the flush threshold
3001    */
3002   private boolean isFlushSize(final long size) {
3003     return size > this.memstoreFlushSize;
3004   }
3005 
3006   /**
3007    * Read the edits log put under this region by wal log splitting process.  Put
3008    * the recovered edits back up into this region.
3009    *
3010    * <p>We can ignore any log message that has a sequence ID that's equal to or
3011    * lower than minSeqId.  (Because we know such log messages are already
3012    * reflected in the HFiles.)
3013    *
3014    * <p>While this is running we are putting pressure on memory yet we are
3015    * outside of our usual accounting because we are not yet an onlined region
3016    * (this stuff is being run as part of Region initialization).  This means
3017    * that if we're up against global memory limits, we'll not be flagged to flush
3018    * because we are not online. We can't be flushed by usual mechanisms anyways;
3019    * we're not yet online so our relative sequenceids are not yet aligned with
3020    * HLog sequenceids -- not till we come up online, post processing of split
3021    * edits.
3022    *
3023    * <p>But to help relieve memory pressure, at least manage our own heap size
3024    * flushing if are in excess of per-region limits.  Flushing, though, we have
3025    * to be careful and avoid using the regionserver/hlog sequenceid.  Its running
3026    * on a different line to whats going on in here in this region context so if we
3027    * crashed replaying these edits, but in the midst had a flush that used the
3028    * regionserver log with a sequenceid in excess of whats going on in here
3029    * in this region and with its split editlogs, then we could miss edits the
3030    * next time we go to recover. So, we have to flush inline, using seqids that
3031    * make sense in a this single region context only -- until we online.
3032    *
3033    * @param regiondir
3034    * @param maxSeqIdInStores Any edit found in split editlogs needs to be in excess of
3035    * the maxSeqId for the store to be applied, else its skipped.
3036    * @param reporter
3037    * @return the sequence id of the last edit added to this region out of the
3038    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
3039    * @throws UnsupportedEncodingException
3040    * @throws IOException
3041    */
3042   protected long replayRecoveredEditsIfAny(final Path regiondir,
3043       Map<byte[], Long> maxSeqIdInStores,
3044       final CancelableProgressable reporter, final MonitoredTask status)
3045       throws UnsupportedEncodingException, IOException {
3046     long minSeqIdForTheRegion = -1;
3047     for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
3048       if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
3049         minSeqIdForTheRegion = maxSeqIdInStore;
3050       }
3051     }
3052     long seqid = minSeqIdForTheRegion;
3053 
3054     FileSystem fs = this.fs.getFileSystem();
3055     NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regiondir);
3056     if (LOG.isDebugEnabled()) {
3057       LOG.debug("Found " + (files == null ? 0 : files.size())
3058         + " recovered edits file(s) under " + regiondir);
3059     }
3060 
3061     if (files == null || files.isEmpty()) return seqid;
3062 
3063     for (Path edits: files) {
3064       if (edits == null || !fs.exists(edits)) {
3065         LOG.warn("Null or non-existent edits file: " + edits);
3066         continue;
3067       }
3068       if (isZeroLengthThenDelete(fs, edits)) continue;
3069 
3070       long maxSeqId;
3071       String fileName = edits.getName();
3072       maxSeqId = Math.abs(Long.parseLong(fileName));
3073       if (maxSeqId <= minSeqIdForTheRegion) {
3074         if (LOG.isDebugEnabled()) {
3075           String msg = "Maximum sequenceid for this log is " + maxSeqId
3076             + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
3077             + ", skipped the whole file, path=" + edits;
3078           LOG.debug(msg);
3079         }
3080         continue;
3081       }
3082 
3083       try {
3084         seqid = replayRecoveredEdits(edits, maxSeqIdInStores, reporter);
3085       } catch (IOException e) {
3086         boolean skipErrors = conf.getBoolean(
3087             HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
3088             conf.getBoolean(
3089                 "hbase.skip.errors",
3090                 HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS));
3091         if (conf.get("hbase.skip.errors") != null) {
3092           LOG.warn(
3093               "The property 'hbase.skip.errors' has been deprecated. Please use " +
3094               HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
3095         }
3096         if (skipErrors) {
3097           Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
3098           LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
3099               + "=true so continuing. Renamed " + edits +
3100               " as " + p, e);
3101         } else {
3102           throw e;
3103         }
3104       }
3105     }
3106     // The edits size added into rsAccounting during this replaying will not
3107     // be required any more. So just clear it.
3108     if (this.rsAccounting != null) {
3109       this.rsAccounting.clearRegionReplayEditsSize(this.getRegionName());
3110     }
3111     if (seqid > minSeqIdForTheRegion) {
3112       // Then we added some edits to memory. Flush and cleanup split edit files.
3113       internalFlushcache(null, seqid, status);
3114     }
3115     // Now delete the content of recovered edits.  We're done w/ them.
3116     for (Path file: files) {
3117       if (!fs.delete(file, false)) {
3118         LOG.error("Failed delete of " + file);
3119       } else {
3120         LOG.debug("Deleted recovered.edits file=" + file);
3121       }
3122     }
3123     return seqid;
3124   }
3125 
3126   /*
3127    * @param edits File of recovered edits.
3128    * @param maxSeqIdInStores Maximum sequenceid found in each store.  Edits in log
3129    * must be larger than this to be replayed for each store.
3130    * @param reporter
3131    * @return the sequence id of the last edit added to this region out of the
3132    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
3133    * @throws IOException
3134    */
3135   private long replayRecoveredEdits(final Path edits,
3136       Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
3137     throws IOException {
3138     String msg = "Replaying edits from " + edits;
3139     LOG.info(msg);
3140     MonitoredTask status = TaskMonitor.get().createStatus(msg);
3141     FileSystem fs = this.fs.getFileSystem();
3142 
3143     status.setStatus("Opening logs");
3144     HLog.Reader reader = null;
3145     try {
3146       reader = HLogFactory.createReader(fs, edits, conf);
3147       long currentEditSeqId = -1;
3148       long firstSeqIdInLog = -1;
3149       long skippedEdits = 0;
3150       long editsCount = 0;
3151       long intervalEdits = 0;
3152       HLog.Entry entry;
3153       Store store = null;
3154       boolean reported_once = false;
3155       ServerNonceManager ng = this.rsServices == null ? null : this.rsServices.getNonceManager();
3156 
3157       try {
3158         // How many edits seen before we check elapsed time
3159         int interval = this.conf.getInt("hbase.hstore.report.interval.edits",
3160             2000);
3161         // How often to send a progress report (default 1/2 master timeout)
3162         int period = this.conf.getInt("hbase.hstore.report.period",
3163           this.conf.getInt(AssignmentManager.ASSIGNMENT_TIMEOUT,
3164             AssignmentManager.DEFAULT_ASSIGNMENT_TIMEOUT_DEFAULT) / 2);
3165         long lastReport = EnvironmentEdgeManager.currentTimeMillis();
3166 
3167         while ((entry = reader.next()) != null) {
3168           HLogKey key = entry.getKey();
3169           WALEdit val = entry.getEdit();
3170 
3171           if (ng != null) { // some test, or nonces disabled
3172             ng.reportOperationFromWal(key.getNonceGroup(), key.getNonce(), key.getWriteTime());
3173           }
3174 
3175           if (reporter != null) {
3176             intervalEdits += val.size();
3177             if (intervalEdits >= interval) {
3178               // Number of edits interval reached
3179               intervalEdits = 0;
3180               long cur = EnvironmentEdgeManager.currentTimeMillis();
3181               if (lastReport + period <= cur) {
3182                 status.setStatus("Replaying edits..." +
3183                     " skipped=" + skippedEdits +
3184                     " edits=" + editsCount);
3185                 // Timeout reached
3186                 if(!reporter.progress()) {
3187                   msg = "Progressable reporter failed, stopping replay";
3188                   LOG.warn(msg);
3189                   status.abort(msg);
3190                   throw new IOException(msg);
3191                 }
3192                 reported_once = true;
3193                 lastReport = cur;
3194               }
3195             }
3196           }
3197 
3198           // Start coprocessor replay here. The coprocessor is for each WALEdit
3199           // instead of a KeyValue.
3200           if (coprocessorHost != null) {
3201             status.setStatus("Running pre-WAL-restore hook in coprocessors");
3202             if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
3203               // if bypass this log entry, ignore it ...
3204               continue;
3205             }
3206           }
3207 
3208           if (firstSeqIdInLog == -1) {
3209             firstSeqIdInLog = key.getLogSeqNum();
3210           }
3211           boolean flush = false;
3212           for (KeyValue kv: val.getKeyValues()) {
3213             // Check this edit is for me. Also, guard against writing the special
3214             // METACOLUMN info such as HBASE::CACHEFLUSH entries
3215             if (kv.matchingFamily(WALEdit.METAFAMILY) ||
3216                 !Bytes.equals(key.getEncodedRegionName(),
3217                   this.getRegionInfo().getEncodedNameAsBytes())) {
3218               //this is a special edit, we should handle it
3219               CompactionDescriptor compaction = WALEdit.getCompaction(kv);
3220               if (compaction != null) {
3221                 //replay the compaction
3222                 completeCompactionMarker(compaction);
3223               }
3224 
3225               skippedEdits++;
3226               continue;
3227             }
3228             // Figure which store the edit is meant for.
3229             if (store == null || !kv.matchingFamily(store.getFamily().getName())) {
3230               store = this.stores.get(kv.getFamily());
3231             }
3232             if (store == null) {
3233               // This should never happen.  Perhaps schema was changed between
3234               // crash and redeploy?
3235               LOG.warn("No family for " + kv);
3236               skippedEdits++;
3237               continue;
3238             }
3239             // Now, figure if we should skip this edit.
3240             if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily()
3241                 .getName())) {
3242               skippedEdits++;
3243               continue;
3244             }
3245             currentEditSeqId = key.getLogSeqNum();
3246             // Once we are over the limit, restoreEdit will keep returning true to
3247             // flush -- but don't flush until we've played all the kvs that make up
3248             // the WALEdit.
3249             flush = restoreEdit(store, kv);
3250             editsCount++;
3251           }
3252           if (flush) internalFlushcache(null, currentEditSeqId, status);
3253 
3254           if (coprocessorHost != null) {
3255             coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
3256           }
3257         }
3258       } catch (EOFException eof) {
3259         Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
3260         msg = "Encountered EOF. Most likely due to Master failure during " +
3261             "log spliting, so we have this data in another edit.  " +
3262             "Continuing, but renaming " + edits + " as " + p;
3263         LOG.warn(msg, eof);
3264         status.abort(msg);
3265       } catch (IOException ioe) {
3266         // If the IOE resulted from bad file format,
3267         // then this problem is idempotent and retrying won't help
3268         if (ioe.getCause() instanceof ParseException) {
3269           Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
3270           msg = "File corruption encountered!  " +
3271               "Continuing, but renaming " + edits + " as " + p;
3272           LOG.warn(msg, ioe);
3273           status.setStatus(msg);
3274         } else {
3275           status.abort(StringUtils.stringifyException(ioe));
3276           // other IO errors may be transient (bad network connection,
3277           // checksum exception on one datanode, etc).  throw & retry
3278           throw ioe;
3279         }
3280       }
3281       if (reporter != null && !reported_once) {
3282         reporter.progress();
3283       }
3284       msg = "Applied " + editsCount + ", skipped " + skippedEdits +
3285         ", firstSequenceidInLog=" + firstSeqIdInLog +
3286         ", maxSequenceidInLog=" + currentEditSeqId + ", path=" + edits;
3287       status.markComplete(msg);
3288       LOG.debug(msg);
3289       return currentEditSeqId;
3290     } finally {
3291       status.cleanup();
3292       if (reader != null) {
3293          reader.close();
3294       }
3295     }
3296   }
3297 
3298   /**
3299    * Call to complete a compaction. Its for the case where we find in the WAL a compaction
3300    * that was not finished.  We could find one recovering a WAL after a regionserver crash.
3301    * See HBASE-2331.
3302    * @param compaction
3303    */
3304   void completeCompactionMarker(CompactionDescriptor compaction)
3305       throws IOException {
3306     Store store = this.getStore(compaction.getFamilyName().toByteArray());
3307     if (store == null) {
3308       LOG.warn("Found Compaction WAL edit for deleted family:" +
3309           Bytes.toString(compaction.getFamilyName().toByteArray()));
3310       return;
3311     }
3312     store.completeCompactionMarker(compaction);
3313   }
3314 
3315   /**
3316    * Used by tests
3317    * @param s Store to add edit too.
3318    * @param kv KeyValue to add.
3319    * @return True if we should flush.
3320    */
3321   protected boolean restoreEdit(final Store s, final KeyValue kv) {
3322     long kvSize = s.add(kv);
3323     if (this.rsAccounting != null) {
3324       rsAccounting.addAndGetRegionReplayEditsSize(this.getRegionName(), kvSize);
3325     }
3326     return isFlushSize(this.addAndGetGlobalMemstoreSize(kvSize));
3327   }
3328 
3329   /*
3330    * @param fs
3331    * @param p File to check.
3332    * @return True if file was zero-length (and if so, we'll delete it in here).
3333    * @throws IOException
3334    */
3335   private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
3336       throws IOException {
3337     FileStatus stat = fs.getFileStatus(p);
3338     if (stat.getLen() > 0) return false;
3339     LOG.warn("File " + p + " is zero-length, deleting.");
3340     fs.delete(p, false);
3341     return true;
3342   }
3343 
3344   protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
3345     return new HStore(this, family, this.conf);
3346   }
3347 
3348   /**
3349    * Return HStore instance.
3350    * Use with caution.  Exposed for use of fixup utilities.
3351    * @param column Name of column family hosted by this region.
3352    * @return Store that goes with the family on passed <code>column</code>.
3353    * TODO: Make this lookup faster.
3354    */
3355   public Store getStore(final byte[] column) {
3356     return this.stores.get(column);
3357   }
3358 
3359   public Map<byte[], Store> getStores() {
3360     return this.stores;
3361   }
3362 
3363   /**
3364    * Return list of storeFiles for the set of CFs.
3365    * Uses closeLock to prevent the race condition where a region closes
3366    * in between the for loop - closing the stores one by one, some stores
3367    * will return 0 files.
3368    * @return List of storeFiles.
3369    */
3370   public List<String> getStoreFileList(final byte [][] columns)
3371     throws IllegalArgumentException {
3372     List<String> storeFileNames = new ArrayList<String>();
3373     synchronized(closeLock) {
3374       for(byte[] column : columns) {
3375         Store store = this.stores.get(column);
3376         if (store == null) {
3377           throw new IllegalArgumentException("No column family : " +
3378               new String(column) + " available");
3379         }
3380         for (StoreFile storeFile: store.getStorefiles()) {
3381           storeFileNames.add(storeFile.getPath().toString());
3382         }
3383       }
3384     }
3385     return storeFileNames;
3386   }
3387   //////////////////////////////////////////////////////////////////////////////
3388   // Support code
3389   //////////////////////////////////////////////////////////////////////////////
3390 
3391   /** Make sure this is a valid row for the HRegion */
3392   void checkRow(final byte [] row, String op) throws IOException {
3393     if (!rowIsInRange(getRegionInfo(), row)) {
3394       throw new WrongRegionException("Requested row out of range for " +
3395           op + " on HRegion " + this + ", startKey='" +
3396           Bytes.toStringBinary(getStartKey()) + "', getEndKey()='" +
3397           Bytes.toStringBinary(getEndKey()) + "', row='" +
3398           Bytes.toStringBinary(row) + "'");
3399     }
3400   }
3401 
3402   /**
3403    * Tries to acquire a lock on the given row.
3404    * @param waitForLock if true, will block until the lock is available.
3405    *        Otherwise, just tries to obtain the lock and returns
3406    *        false if unavailable.
3407    * @return the row lock if acquired,
3408    *   null if waitForLock was false and the lock was not acquired
3409    * @throws IOException if waitForLock was true and the lock could not be acquired after waiting
3410    */
3411   public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
3412     checkRow(row, "row lock");
3413     startRegionOperation();
3414     try {
3415       HashedBytes rowKey = new HashedBytes(row);
3416       RowLockContext rowLockContext = new RowLockContext(rowKey);
3417 
3418       // loop until we acquire the row lock (unless !waitForLock)
3419       while (true) {
3420         RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
3421         if (existingContext == null) {
3422           // Row is not already locked by any thread, use newly created context.
3423           break;
3424         } else if (existingContext.ownedByCurrentThread()) {
3425           // Row is already locked by current thread, reuse existing context instead.
3426           rowLockContext = existingContext;
3427           break;
3428         } else {
3429           // Row is already locked by some other thread, give up or wait for it
3430           if (!waitForLock) {
3431             return null;
3432           }
3433           try {
3434             if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
3435               throw new IOException("Timed out waiting for lock for row: " + rowKey);
3436             }
3437           } catch (InterruptedException ie) {
3438             LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
3439             InterruptedIOException iie = new InterruptedIOException();
3440             iie.initCause(ie);
3441             throw iie;
3442           }
3443         }
3444       }
3445 
3446       // allocate new lock for this thread
3447       return rowLockContext.newLock();
3448     } finally {
3449       closeRegionOperation();
3450     }
3451   }
3452 
3453   /**
3454    * Acqures a lock on the given row.
3455    * The same thread may acquire multiple locks on the same row.
3456    * @return the acquired row lock
3457    * @throws IOException if the lock could not be acquired after waiting
3458    */
3459   public RowLock getRowLock(byte[] row) throws IOException {
3460     return getRowLock(row, true);
3461   }
3462 
3463   /**
3464    * If the given list of row locks is not null, releases all locks.
3465    */
3466   public void releaseRowLocks(List<RowLock> rowLocks) {
3467     if (rowLocks != null) {
3468       for (RowLock rowLock : rowLocks) {
3469         rowLock.release();
3470       }
3471       rowLocks.clear();
3472     }
3473   }
3474 
3475   /**
3476    * Determines whether multiple column families are present
3477    * Precondition: familyPaths is not null
3478    *
3479    * @param familyPaths List of Pair<byte[] column family, String hfilePath>
3480    */
3481   private static boolean hasMultipleColumnFamilies(
3482       List<Pair<byte[], String>> familyPaths) {
3483     boolean multipleFamilies = false;
3484     byte[] family = null;
3485     for (Pair<byte[], String> pair : familyPaths) {
3486       byte[] fam = pair.getFirst();
3487       if (family == null) {
3488         family = fam;
3489       } else if (!Bytes.equals(family, fam)) {
3490         multipleFamilies = true;
3491         break;
3492       }
3493     }
3494     return multipleFamilies;
3495   }
3496 
3497 
3498   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
3499                                 boolean assignSeqId) throws IOException {
3500     return bulkLoadHFiles(familyPaths, assignSeqId, null);
3501   }
3502 
3503   /**
3504    * Attempts to atomically load a group of hfiles.  This is critical for loading
3505    * rows with multiple column families atomically.
3506    *
3507    * @param familyPaths List of Pair<byte[] column family, String hfilePath>
3508    * @param bulkLoadListener Internal hooks enabling massaging/preparation of a
3509    * file about to be bulk loaded
3510    * @param assignSeqId
3511    * @return true if successful, false if failed recoverably
3512    * @throws IOException if failed unrecoverably.
3513    */
3514   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, boolean assignSeqId,
3515       BulkLoadListener bulkLoadListener) throws IOException {
3516     Preconditions.checkNotNull(familyPaths);
3517     // we need writeLock for multi-family bulk load
3518     startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
3519     try {
3520       this.writeRequestsCount.increment();
3521 
3522       // There possibly was a split that happend between when the split keys
3523       // were gathered and before the HReiogn's write lock was taken.  We need
3524       // to validate the HFile region before attempting to bulk load all of them
3525       List<IOException> ioes = new ArrayList<IOException>();
3526       List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();
3527       for (Pair<byte[], String> p : familyPaths) {
3528         byte[] familyName = p.getFirst();
3529         String path = p.getSecond();
3530 
3531         Store store = getStore(familyName);
3532         if (store == null) {
3533           IOException ioe = new org.apache.hadoop.hbase.DoNotRetryIOException(
3534               "No such column family " + Bytes.toStringBinary(familyName));
3535           ioes.add(ioe);
3536         } else {
3537           try {
3538             store.assertBulkLoadHFileOk(new Path(path));
3539           } catch (WrongRegionException wre) {
3540             // recoverable (file doesn't fit in region)
3541             failures.add(p);
3542           } catch (IOException ioe) {
3543             // unrecoverable (hdfs problem)
3544             ioes.add(ioe);
3545           }
3546         }
3547       }
3548 
3549       // validation failed because of some sort of IO problem.
3550       if (ioes.size() != 0) {
3551         IOException e = MultipleIOException.createIOException(ioes);
3552         LOG.error("There were one or more IO errors when checking if the bulk load is ok.", e);
3553         throw e;
3554       }
3555 
3556       // validation failed, bail out before doing anything permanent.
3557       if (failures.size() != 0) {
3558         StringBuilder list = new StringBuilder();
3559         for (Pair<byte[], String> p : failures) {
3560           list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")
3561             .append(p.getSecond());
3562         }
3563         // problem when validating
3564         LOG.warn("There was a recoverable bulk load failure likely due to a" +
3565             " split.  These (family, HFile) pairs were not loaded: " + list);
3566         return false;
3567       }
3568 
3569       for (Pair<byte[], String> p : familyPaths) {
3570         byte[] familyName = p.getFirst();
3571         String path = p.getSecond();
3572         Store store = getStore(familyName);
3573         try {
3574           String finalPath = path;
3575           if(bulkLoadListener != null) {
3576             finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
3577           }
3578           store.bulkLoadHFile(finalPath, assignSeqId ? this.sequenceId.incrementAndGet() : -1);
3579           if(bulkLoadListener != null) {
3580             bulkLoadListener.doneBulkLoad(familyName, path);
3581           }
3582         } catch (IOException ioe) {
3583           // A failure here can cause an atomicity violation that we currently
3584           // cannot recover from since it is likely a failed HDFS operation.
3585 
3586           // TODO Need a better story for reverting partial failures due to HDFS.
3587           LOG.error("There was a partial failure due to IO when attempting to" +
3588               " load " + Bytes.toString(p.getFirst()) + " : "+ p.getSecond(), ioe);
3589           if(bulkLoadListener != null) {
3590             try {
3591               bulkLoadListener.failedBulkLoad(familyName, path);
3592             } catch (Exception ex) {
3593               LOG.error("Error while calling failedBulkLoad for family "+
3594                   Bytes.toString(familyName)+" with path "+path, ex);
3595             }
3596           }
3597           throw ioe;
3598         }
3599       }
3600       return true;
3601     } finally {
3602       closeBulkRegionOperation();
3603     }
3604   }
3605 
3606   @Override
3607   public boolean equals(Object o) {
3608     return o instanceof HRegion && Bytes.equals(this.getRegionName(),
3609                                                 ((HRegion) o).getRegionName());
3610   }
3611 
3612   @Override
3613   public int hashCode() {
3614     return Bytes.hashCode(this.getRegionName());
3615   }
3616 
3617   @Override
3618   public String toString() {
3619     return this.getRegionNameAsString();
3620   }
3621 
3622   /**
3623    * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
3624    */
3625   class RegionScannerImpl implements RegionScanner {
3626     // Package local for testability
3627     KeyValueHeap storeHeap = null;
3628     /** Heap of key-values that are not essential for the provided filters and are thus read
3629      * on demand, if on-demand column family loading is enabled.*/
3630     KeyValueHeap joinedHeap = null;
3631     /**
3632      * If the joined heap data gathering is interrupted due to scan limits, this will
3633      * contain the row for which we are populating the values.*/
3634     protected KeyValue joinedContinuationRow = null;
3635     // KeyValue indicating that limit is reached when scanning
3636     private final KeyValue KV_LIMIT = new KeyValue();
3637     protected final byte[] stopRow;
3638     private final Filter filter;
3639     private int batch;
3640     protected int isScan;
3641     private boolean filterClosed = false;
3642     private long readPt;
3643     private long maxResultSize;
3644     protected HRegion region;
3645 
3646     @Override
3647     public HRegionInfo getRegionInfo() {
3648       return region.getRegionInfo();
3649     }
3650 
3651     RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
3652         throws IOException {
3653 
3654       this.region = region;
3655       this.maxResultSize = scan.getMaxResultSize();
3656       if (scan.hasFilter()) {
3657         this.filter = new FilterWrapper(scan.getFilter());
3658       } else {
3659         this.filter = null;
3660       }
3661 
3662       this.batch = scan.getBatch();
3663       if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {
3664         this.stopRow = null;
3665       } else {
3666         this.stopRow = scan.getStopRow();
3667       }
3668       // If we are doing a get, we want to be [startRow,endRow] normally
3669       // it is [startRow,endRow) and if startRow=endRow we get nothing.
3670       this.isScan = scan.isGetScan() ? -1 : 0;
3671 
3672       // synchronize on scannerReadPoints so that nobody calculates
3673       // getSmallestReadPoint, before scannerReadPoints is updated.
3674       IsolationLevel isolationLevel = scan.getIsolationLevel();
3675       synchronized(scannerReadPoints) {
3676         this.readPt = getReadpoint(isolationLevel);
3677         scannerReadPoints.put(this, this.readPt);
3678       }
3679 
3680       // Here we separate all scanners into two lists - scanner that provide data required
3681       // by the filter to operate (scanners list) and all others (joinedScanners list).
3682       List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
3683       List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
3684       if (additionalScanners != null) {
3685         scanners.addAll(additionalScanners);
3686       }
3687 
3688       for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
3689           scan.getFamilyMap().entrySet()) {
3690         Store store = stores.get(entry.getKey());
3691         KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
3692         if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
3693           || this.filter.isFamilyEssential(entry.getKey())) {
3694           scanners.add(scanner);
3695         } else {
3696           joinedScanners.add(scanner);
3697         }
3698       }
3699       initializeKVHeap(scanners, joinedScanners, region);
3700     }
3701 
3702     RegionScannerImpl(Scan scan, HRegion region) throws IOException {
3703       this(scan, null, region);
3704     }
3705 
3706     protected void initializeKVHeap(List<KeyValueScanner> scanners,
3707         List<KeyValueScanner> joinedScanners, HRegion region)
3708         throws IOException {
3709       this.storeHeap = new KeyValueHeap(scanners, region.comparator);
3710       if (!joinedScanners.isEmpty()) {
3711         this.joinedHeap = new KeyValueHeap(joinedScanners, region.comparator);
3712       }
3713     }
3714 
3715     @Override
3716     public long getMaxResultSize() {
3717       return maxResultSize;
3718     }
3719 
3720     @Override
3721     public long getMvccReadPoint() {
3722       return this.readPt;
3723     }
3724 
3725     /**
3726      * Reset both the filter and the old filter.
3727      *
3728      * @throws IOException in case a filter raises an I/O exception.
3729      */
3730     protected void resetFilters() throws IOException {
3731       if (filter != null) {
3732         filter.reset();
3733       }
3734     }
3735 
3736     @Override
3737     public boolean next(List<Cell> outResults)
3738         throws IOException {
3739       // apply the batching limit by default
3740       return next(outResults, batch);
3741     }
3742 
3743     @Override
3744     public synchronized boolean next(List<Cell> outResults, int limit) throws IOException {
3745       if (this.filterClosed) {
3746         throw new UnknownScannerException("Scanner was closed (timed out?) " +
3747             "after we renewed it. Could be caused by a very slow scanner " +
3748             "or a lengthy garbage collection");
3749       }
3750       startRegionOperation(Operation.SCAN);
3751       readRequestsCount.increment();
3752       try {
3753         return nextRaw(outResults, limit);
3754       } finally {
3755         closeRegionOperation(Operation.SCAN);
3756       }
3757     }
3758 
3759     @Override
3760     public boolean nextRaw(List<Cell> outResults)
3761         throws IOException {
3762       return nextRaw(outResults, batch);
3763     }
3764 
3765     @Override
3766     public boolean nextRaw(List<Cell> outResults, int limit) throws IOException {
3767       boolean returnResult;
3768       if (outResults.isEmpty()) {
3769         // Usually outResults is empty. This is true when next is called
3770         // to handle scan or get operation.
3771         returnResult = nextInternal(outResults, limit);
3772       } else {
3773         List<Cell> tmpList = new ArrayList<Cell>();
3774         returnResult = nextInternal(tmpList, limit);
3775         outResults.addAll(tmpList);
3776       }
3777       resetFilters();
3778       if (isFilterDoneInternal()) {
3779         return false;
3780       }
3781       if (region != null && region.metricsRegion != null) {
3782         long totalSize = 0;
3783         for(Cell c:outResults) {
3784           // TODO clean up
3785           KeyValue kv = KeyValueUtil.ensureKeyValue(c);
3786           totalSize += kv.getLength();
3787         }
3788         region.metricsRegion.updateScanNext(totalSize);
3789       }
3790       return returnResult;
3791     }
3792 
3793 
3794     private void populateFromJoinedHeap(List<Cell> results, int limit)
3795         throws IOException {
3796       assert joinedContinuationRow != null;
3797       KeyValue kv = populateResult(results, this.joinedHeap, limit,
3798           joinedContinuationRow.getRowArray(), joinedContinuationRow.getRowOffset(),
3799           joinedContinuationRow.getRowLength());
3800       if (kv != KV_LIMIT) {
3801         // We are done with this row, reset the continuation.
3802         joinedContinuationRow = null;
3803       }
3804       // As the data is obtained from two independent heaps, we need to
3805       // ensure that result list is sorted, because Result relies on that.
3806       Collections.sort(results, comparator);
3807     }
3808 
3809     /**
3810      * Fetches records with currentRow into results list, until next row or limit (if not -1).
3811      * @param results
3812      * @param heap KeyValueHeap to fetch data from.It must be positioned on correct row before call.
3813      * @param limit Max amount of KVs to place in result list, -1 means no limit.
3814      * @param currentRow Byte array with key we are fetching.
3815      * @param offset offset for currentRow
3816      * @param length length for currentRow
3817      * @return KV_LIMIT if limit reached, next KeyValue otherwise.
3818      */
3819     private KeyValue populateResult(List<Cell> results, KeyValueHeap heap, int limit,
3820         byte[] currentRow, int offset, short length) throws IOException {
3821       KeyValue nextKv;
3822       do {
3823         heap.next(results, limit - results.size());
3824         if (limit > 0 && results.size() == limit) {
3825           return KV_LIMIT;
3826         }
3827         nextKv = heap.peek();
3828       } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
3829 
3830       return nextKv;
3831     }
3832 
3833     /*
3834      * @return True if a filter rules the scanner is over, done.
3835      */
3836     @Override
3837     public synchronized boolean isFilterDone() throws IOException {
3838       return isFilterDoneInternal();
3839     }
3840 
3841     private boolean isFilterDoneInternal() throws IOException {
3842       return this.filter != null && this.filter.filterAllRemaining();
3843     }
3844 
3845     private boolean nextInternal(List<Cell> results, int limit)
3846     throws IOException {
3847       if (!results.isEmpty()) {
3848         throw new IllegalArgumentException("First parameter should be an empty list");
3849       }
3850       RpcCallContext rpcCall = RpcServer.getCurrentCall();
3851       // The loop here is used only when at some point during the next we determine
3852       // that due to effects of filters or otherwise, we have an empty row in the result.
3853       // Then we loop and try again. Otherwise, we must get out on the first iteration via return,
3854       // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
3855       // and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow).
3856       while (true) {
3857         if (rpcCall != null) {
3858           // If a user specifies a too-restrictive or too-slow scanner, the
3859           // client might time out and disconnect while the server side
3860           // is still processing the request. We should abort aggressively
3861           // in that case.
3862           long afterTime = rpcCall.disconnectSince();
3863           if (afterTime >= 0) {
3864             throw new CallerDisconnectedException(
3865                 "Aborting on region " + getRegionNameAsString() + ", call " +
3866                     this + " after " + afterTime + " ms, since " +
3867                     "caller disconnected");
3868           }
3869         }
3870 
3871         // Let's see what we have in the storeHeap.
3872         KeyValue current = this.storeHeap.peek();
3873 
3874         byte[] currentRow = null;
3875         int offset = 0;
3876         short length = 0;
3877         if (current != null) {
3878           currentRow = current.getRowArray();
3879           offset = current.getRowOffset();
3880           length = current.getRowLength();
3881         }
3882         boolean stopRow = isStopRow(currentRow, offset, length);
3883         // Check if we were getting data from the joinedHeap and hit the limit.
3884         // If not, then it's main path - getting results from storeHeap.
3885         if (joinedContinuationRow == null) {
3886           // First, check if we are at a stop row. If so, there are no more results.
3887           if (stopRow) {
3888             if (filter != null && filter.hasFilterRow()) {
3889               filter.filterRowCells(results);
3890             }
3891             return false;
3892           }
3893 
3894           // Check if rowkey filter wants to exclude this row. If so, loop to next.
3895           // Technically, if we hit limits before on this row, we don't need this call.
3896           if (filterRowKey(currentRow, offset, length)) {
3897             boolean moreRows = nextRow(currentRow, offset, length);
3898             if (!moreRows) return false;
3899             results.clear();
3900             continue;
3901           }
3902 
3903           KeyValue nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset,
3904               length);
3905           // Ok, we are good, let's try to get some results from the main heap.
3906           if (nextKv == KV_LIMIT) {
3907             if (this.filter != null && filter.hasFilterRow()) {
3908               throw new IncompatibleFilterException(
3909                 "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
3910             }
3911             return true; // We hit the limit.
3912           }
3913 
3914           stopRow = nextKv == null ||
3915               isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength());
3916           // save that the row was empty before filters applied to it.
3917           final boolean isEmptyRow = results.isEmpty();
3918 
3919           // We have the part of the row necessary for filtering (all of it, usually).
3920           // First filter with the filterRow(List).
3921           if (filter != null && filter.hasFilterRow()) {
3922             filter.filterRowCells(results);
3923           }
3924           
3925           if (isEmptyRow || filterRow()) {
3926             results.clear();
3927             boolean moreRows = nextRow(currentRow, offset, length);
3928             if (!moreRows) return false;
3929 
3930             // This row was totally filtered out, if this is NOT the last row,
3931             // we should continue on. Otherwise, nothing else to do.
3932             if (!stopRow) continue;
3933             return false;
3934           }
3935 
3936           // Ok, we are done with storeHeap for this row.
3937           // Now we may need to fetch additional, non-essential data into row.
3938           // These values are not needed for filter to work, so we postpone their
3939           // fetch to (possibly) reduce amount of data loads from disk.
3940           if (this.joinedHeap != null) {
3941             KeyValue nextJoinedKv = joinedHeap.peek();
3942             // If joinedHeap is pointing to some other row, try to seek to a correct one.
3943             boolean mayHaveData =
3944               (nextJoinedKv != null && nextJoinedKv.matchingRow(currentRow, offset, length))
3945               || (this.joinedHeap.requestSeek(KeyValue.createFirstOnRow(currentRow, offset, length),
3946                 true, true)
3947                 && joinedHeap.peek() != null
3948                 && joinedHeap.peek().matchingRow(currentRow, offset, length));
3949             if (mayHaveData) {
3950               joinedContinuationRow = current;
3951               populateFromJoinedHeap(results, limit);
3952             }
3953           }
3954         } else {
3955           // Populating from the joined heap was stopped by limits, populate some more.
3956           populateFromJoinedHeap(results, limit);
3957         }
3958 
3959         // We may have just called populateFromJoinedMap and hit the limits. If that is
3960         // the case, we need to call it again on the next next() invocation.
3961         if (joinedContinuationRow != null) {
3962           return true;
3963         }
3964 
3965         // Finally, we are done with both joinedHeap and storeHeap.
3966         // Double check to prevent empty rows from appearing in result. It could be
3967         // the case when SingleColumnValueExcludeFilter is used.
3968         if (results.isEmpty()) {
3969           boolean moreRows = nextRow(currentRow, offset, length);
3970           if (!moreRows) return false;
3971           if (!stopRow) continue;
3972         }
3973 
3974         // We are done. Return the result.
3975         return !stopRow;
3976       }
3977     }
3978 
3979     /**
3980      * This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combines
3981      * both filterRow & filterRow(List<KeyValue> kvs) functions. While 0.94 code or older, it may
3982      * not implement hasFilterRow as HBase-6429 expects because 0.94 hasFilterRow() only returns
3983      * true when filterRow(List<KeyValue> kvs) is overridden not the filterRow(). Therefore, the
3984      * filterRow() will be skipped.
3985      */
3986     private boolean filterRow() throws IOException {
3987       // when hasFilterRow returns true, filter.filterRow() will be called automatically inside
3988       // filterRowCells(List<Cell> kvs) so we skip that scenario here.
3989       return filter != null && (!filter.hasFilterRow())
3990           && filter.filterRow();
3991     }
3992     
3993     private boolean filterRowKey(byte[] row, int offset, short length) throws IOException {
3994       return filter != null
3995           && filter.filterRowKey(row, offset, length);
3996     }
3997 
3998     protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException {
3999       assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read.";
4000       KeyValue next;
4001       while ((next = this.storeHeap.peek()) != null &&
4002              next.matchingRow(currentRow, offset, length)) {
4003         this.storeHeap.next(MOCKED_LIST);
4004       }
4005       resetFilters();
4006       // Calling the hook in CP which allows it to do a fast forward
4007       return this.region.getCoprocessorHost() == null
4008           || this.region.getCoprocessorHost()
4009               .postScannerFilterRow(this, currentRow, offset, length);
4010     }
4011 
4012     protected boolean isStopRow(byte[] currentRow, int offset, short length) {
4013       return currentRow == null ||
4014           (stopRow != null &&
4015           comparator.compareRows(stopRow, 0, stopRow.length,
4016             currentRow, offset, length) <= isScan);
4017     }
4018 
4019     @Override
4020     public synchronized void close() {
4021       if (storeHeap != null) {
4022         storeHeap.close();
4023         storeHeap = null;
4024       }
4025       if (joinedHeap != null) {
4026         joinedHeap.close();
4027         joinedHeap = null;
4028       }
4029       // no need to sychronize here.
4030       scannerReadPoints.remove(this);
4031       this.filterClosed = true;
4032     }
4033 
4034     KeyValueHeap getStoreHeapForTesting() {
4035       return storeHeap;
4036     }
4037 
4038     @Override
4039     public synchronized boolean reseek(byte[] row) throws IOException {
4040       if (row == null) {
4041         throw new IllegalArgumentException("Row cannot be null.");
4042       }
4043       boolean result = false;
4044       startRegionOperation();
4045       try {
4046         KeyValue kv = KeyValue.createFirstOnRow(row);
4047         // use request seek to make use of the lazy seek option. See HBASE-5520
4048         result = this.storeHeap.requestSeek(kv, true, true);
4049         if (this.joinedHeap != null) {
4050           result = this.joinedHeap.requestSeek(kv, true, true) || result;
4051         }
4052       } finally {
4053         closeRegionOperation();
4054       }
4055       return result;
4056     }
4057   }
4058 
4059   // Utility methods
4060   /**
4061    * A utility method to create new instances of HRegion based on the
4062    * {@link HConstants#REGION_IMPL} configuration property.
4063    * @param tableDir qualified path of directory where region should be located,
4064    * usually the table directory.
4065    * @param log The HLog is the outbound log for any updates to the HRegion
4066    * (There's a single HLog for all the HRegions on a single HRegionServer.)
4067    * The log file is a logfile from the previous execution that's
4068    * custom-computed for this HRegion. The HRegionServer computes and sorts the
4069    * appropriate log info for this HRegion. If there is a previous log file
4070    * (implying that the HRegion has been written-to before), then read it from
4071    * the supplied path.
4072    * @param fs is the filesystem.
4073    * @param conf is global configuration settings.
4074    * @param regionInfo - HRegionInfo that describes the region
4075    * is new), then read them from the supplied path.
4076    * @param htd the table descriptor
4077    * @param rsServices
4078    * @return the new instance
4079    */
4080   static HRegion newHRegion(Path tableDir, HLog log, FileSystem fs,
4081       Configuration conf, HRegionInfo regionInfo, final HTableDescriptor htd,
4082       RegionServerServices rsServices) {
4083     try {
4084       @SuppressWarnings("unchecked")
4085       Class<? extends HRegion> regionClass =
4086           (Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
4087 
4088       Constructor<? extends HRegion> c =
4089           regionClass.getConstructor(Path.class, HLog.class, FileSystem.class,
4090               Configuration.class, HRegionInfo.class, HTableDescriptor.class,
4091               RegionServerServices.class);
4092 
4093       return c.newInstance(tableDir, log, fs, conf, regionInfo, htd, rsServices);
4094     } catch (Throwable e) {
4095       // todo: what should I throw here?
4096       throw new IllegalStateException("Could not instantiate a region instance.", e);
4097     }
4098   }
4099 
4100   /**
4101    * Convenience method creating new HRegions. Used by createTable and by the
4102    * bootstrap code in the HMaster constructor.
4103    * Note, this method creates an {@link HLog} for the created region. It
4104    * needs to be closed explicitly.  Use {@link HRegion#getLog()} to get
4105    * access.  <b>When done with a region created using this method, you will
4106    * need to explicitly close the {@link HLog} it created too; it will not be
4107    * done for you.  Not closing the log will leave at least a daemon thread
4108    * running.</b>  Call {@link #closeHRegion(HRegion)} and it will do
4109    * necessary cleanup for you.
4110    * @param info Info for region to create.
4111    * @param rootDir Root directory for HBase instance
4112    * @param conf
4113    * @param hTableDescriptor
4114    * @return new HRegion
4115    *
4116    * @throws IOException
4117    */
4118   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4119       final Configuration conf, final HTableDescriptor hTableDescriptor)
4120   throws IOException {
4121     return createHRegion(info, rootDir, conf, hTableDescriptor, null);
4122   }
4123 
4124   /**
4125    * This will do the necessary cleanup a call to
4126    * {@link #createHRegion(HRegionInfo, Path, Configuration, HTableDescriptor)}
4127    * requires.  This method will close the region and then close its
4128    * associated {@link HLog} file.  You use it if you call the other createHRegion,
4129    * the one that takes an {@link HLog} instance but don't be surprised by the
4130    * call to the {@link HLog#closeAndDelete()} on the {@link HLog} the
4131    * HRegion was carrying.
4132    * @param r
4133    * @throws IOException
4134    */
4135   public static void closeHRegion(final HRegion r) throws IOException {
4136     if (r == null) return;
4137     r.close();
4138     if (r.getLog() == null) return;
4139     r.getLog().closeAndDelete();
4140   }
4141 
4142   /**
4143    * Convenience method creating new HRegions. Used by createTable.
4144    * The {@link HLog} for the created region needs to be closed explicitly.
4145    * Use {@link HRegion#getLog()} to get access.
4146    *
4147    * @param info Info for region to create.
4148    * @param rootDir Root directory for HBase instance
4149    * @param conf
4150    * @param hTableDescriptor
4151    * @param hlog shared HLog
4152    * @param initialize - true to initialize the region
4153    * @return new HRegion
4154    *
4155    * @throws IOException
4156    */
4157   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4158                                       final Configuration conf,
4159                                       final HTableDescriptor hTableDescriptor,
4160                                       final HLog hlog,
4161                                       final boolean initialize)
4162       throws IOException {
4163     return createHRegion(info, rootDir, conf, hTableDescriptor,
4164         hlog, initialize, false);
4165   }
4166 
4167   /**
4168    * Convenience method creating new HRegions. Used by createTable.
4169    * The {@link HLog} for the created region needs to be closed
4170    * explicitly, if it is not null.
4171    * Use {@link HRegion#getLog()} to get access.
4172    *
4173    * @param info Info for region to create.
4174    * @param rootDir Root directory for HBase instance
4175    * @param conf
4176    * @param hTableDescriptor
4177    * @param hlog shared HLog
4178    * @param initialize - true to initialize the region
4179    * @param ignoreHLog - true to skip generate new hlog if it is null, mostly for createTable
4180    * @return new HRegion
4181    * @throws IOException
4182    */
4183   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4184                                       final Configuration conf,
4185                                       final HTableDescriptor hTableDescriptor,
4186                                       final HLog hlog,
4187                                       final boolean initialize, final boolean ignoreHLog)
4188       throws IOException {
4189       Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
4190       return createHRegion(info, rootDir, tableDir, conf, hTableDescriptor, hlog, initialize, ignoreHLog);
4191   }
4192 
4193   /**
4194    * Convenience method creating new HRegions. Used by createTable.
4195    * The {@link HLog} for the created region needs to be closed
4196    * explicitly, if it is not null.
4197    * Use {@link HRegion#getLog()} to get access.
4198    *
4199    * @param info Info for region to create.
4200    * @param rootDir Root directory for HBase instance
4201    * @param tableDir table directory
4202    * @param conf
4203    * @param hTableDescriptor
4204    * @param hlog shared HLog
4205    * @param initialize - true to initialize the region
4206    * @param ignoreHLog - true to skip generate new hlog if it is null, mostly for createTable
4207    * @return new HRegion
4208    * @throws IOException
4209    */
4210   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Path tableDir,
4211                                       final Configuration conf,
4212                                       final HTableDescriptor hTableDescriptor,
4213                                       final HLog hlog,
4214                                       final boolean initialize, final boolean ignoreHLog)
4215       throws IOException {
4216     LOG.info("creating HRegion " + info.getTable().getNameAsString()
4217         + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
4218         " Table name == " + info.getTable().getNameAsString());
4219     FileSystem fs = FileSystem.get(conf);
4220     HRegionFileSystem rfs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
4221     HLog effectiveHLog = hlog;
4222     if (hlog == null && !ignoreHLog) {
4223       effectiveHLog = HLogFactory.createHLog(fs, rfs.getRegionDir(),
4224                                              HConstants.HREGION_LOGDIR_NAME, conf);
4225     }
4226     HRegion region = HRegion.newHRegion(tableDir,
4227         effectiveHLog, fs, conf, info, hTableDescriptor, null);
4228     if (initialize) {
4229       // If initializing, set the sequenceId. It is also required by HLogPerformanceEvaluation when
4230       // verifying the WALEdits.
4231       region.setSequenceId(region.initialize());
4232     }
4233     return region;
4234   }
4235 
4236   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4237                                       final Configuration conf,
4238                                       final HTableDescriptor hTableDescriptor,
4239                                       final HLog hlog)
4240     throws IOException {
4241     return createHRegion(info, rootDir, conf, hTableDescriptor, hlog, true);
4242   }
4243 
4244 
4245   /**
4246    * Open a Region.
4247    * @param info Info for region to be opened.
4248    * @param wal HLog for region to use. This method will call
4249    * HLog#setSequenceNumber(long) passing the result of the call to
4250    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4251    * up.  HRegionStore does this every time it opens a new region.
4252    * @param conf
4253    * @return new HRegion
4254    *
4255    * @throws IOException
4256    */
4257   public static HRegion openHRegion(final HRegionInfo info,
4258       final HTableDescriptor htd, final HLog wal,
4259       final Configuration conf)
4260   throws IOException {
4261     return openHRegion(info, htd, wal, conf, null, null);
4262   }
4263 
4264   /**
4265    * Open a Region.
4266    * @param info Info for region to be opened
4267    * @param htd the table descriptor
4268    * @param wal HLog for region to use. This method will call
4269    * HLog#setSequenceNumber(long) passing the result of the call to
4270    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4271    * up.  HRegionStore does this every time it opens a new region.
4272    * @param conf The Configuration object to use.
4273    * @param rsServices An interface we can request flushes against.
4274    * @param reporter An interface we can report progress against.
4275    * @return new HRegion
4276    *
4277    * @throws IOException
4278    */
4279   public static HRegion openHRegion(final HRegionInfo info,
4280     final HTableDescriptor htd, final HLog wal, final Configuration conf,
4281     final RegionServerServices rsServices,
4282     final CancelableProgressable reporter)
4283   throws IOException {
4284     return openHRegion(FSUtils.getRootDir(conf), info, htd, wal, conf, rsServices, reporter);
4285   }
4286 
4287   /**
4288    * Open a Region.
4289    * @param rootDir Root directory for HBase instance
4290    * @param info Info for region to be opened.
4291    * @param htd the table descriptor
4292    * @param wal HLog for region to use. This method will call
4293    * HLog#setSequenceNumber(long) passing the result of the call to
4294    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4295    * up.  HRegionStore does this every time it opens a new region.
4296    * @param conf The Configuration object to use.
4297    * @return new HRegion
4298    * @throws IOException
4299    */
4300   public static HRegion openHRegion(Path rootDir, final HRegionInfo info,
4301       final HTableDescriptor htd, final HLog wal, final Configuration conf)
4302   throws IOException {
4303     return openHRegion(rootDir, info, htd, wal, conf, null, null);
4304   }
4305 
4306   /**
4307    * Open a Region.
4308    * @param rootDir Root directory for HBase instance
4309    * @param info Info for region to be opened.
4310    * @param htd the table descriptor
4311    * @param wal HLog for region to use. This method will call
4312    * HLog#setSequenceNumber(long) passing the result of the call to
4313    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4314    * up.  HRegionStore does this every time it opens a new region.
4315    * @param conf The Configuration object to use.
4316    * @param rsServices An interface we can request flushes against.
4317    * @param reporter An interface we can report progress against.
4318    * @return new HRegion
4319    * @throws IOException
4320    */
4321   public static HRegion openHRegion(final Path rootDir, final HRegionInfo info,
4322       final HTableDescriptor htd, final HLog wal, final Configuration conf,
4323       final RegionServerServices rsServices,
4324       final CancelableProgressable reporter)
4325   throws IOException {
4326     FileSystem fs = null;
4327     if (rsServices != null) {
4328       fs = rsServices.getFileSystem();
4329     }
4330     if (fs == null) {
4331       fs = FileSystem.get(conf);
4332     }
4333     return openHRegion(conf, fs, rootDir, info, htd, wal, rsServices, reporter);
4334   }
4335 
4336   /**
4337    * Open a Region.
4338    * @param conf The Configuration object to use.
4339    * @param fs Filesystem to use
4340    * @param rootDir Root directory for HBase instance
4341    * @param info Info for region to be opened.
4342    * @param htd the table descriptor
4343    * @param wal HLog for region to use. This method will call
4344    * HLog#setSequenceNumber(long) passing the result of the call to
4345    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4346    * up.  HRegionStore does this every time it opens a new region.
4347    * @return new HRegion
4348    * @throws IOException
4349    */
4350   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4351       final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal)
4352       throws IOException {
4353     return openHRegion(conf, fs, rootDir, info, htd, wal, null, null);
4354   }
4355 
4356   /**
4357    * Open a Region.
4358    * @param conf The Configuration object to use.
4359    * @param fs Filesystem to use
4360    * @param rootDir Root directory for HBase instance
4361    * @param info Info for region to be opened.
4362    * @param htd the table descriptor
4363    * @param wal HLog for region to use. This method will call
4364    * HLog#setSequenceNumber(long) passing the result of the call to
4365    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4366    * up.  HRegionStore does this every time it opens a new region.
4367    * @param rsServices An interface we can request flushes against.
4368    * @param reporter An interface we can report progress against.
4369    * @return new HRegion
4370    * @throws IOException
4371    */
4372   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4373       final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
4374       final RegionServerServices rsServices, final CancelableProgressable reporter)
4375       throws IOException {
4376     Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
4377     return openHRegion(conf, fs, rootDir, tableDir, info, htd, wal, rsServices, reporter);
4378   }
4379 
4380   /**
4381    * Open a Region.
4382    * @param conf The Configuration object to use.
4383    * @param fs Filesystem to use
4384    * @param rootDir Root directory for HBase instance
4385    * @param info Info for region to be opened.
4386    * @param htd the table descriptor
4387    * @param wal HLog for region to use. This method will call
4388    * HLog#setSequenceNumber(long) passing the result of the call to
4389    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4390    * up.  HRegionStore does this every time it opens a new region.
4391    * @param rsServices An interface we can request flushes against.
4392    * @param reporter An interface we can report progress against.
4393    * @return new HRegion
4394    * @throws IOException
4395    */
4396   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4397       final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
4398       final RegionServerServices rsServices, final CancelableProgressable reporter)
4399       throws IOException {
4400     if (info == null) throw new NullPointerException("Passed region info is null");
4401     if (LOG.isDebugEnabled()) {
4402       LOG.debug("Opening region: " + info);
4403     }
4404     HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
4405     return r.openHRegion(reporter);
4406   }
4407 
4408 
4409   /**
4410    * Useful when reopening a closed region (normally for unit tests)
4411    * @param other original object
4412    * @param reporter An interface we can report progress against.
4413    * @return new HRegion
4414    * @throws IOException
4415    */
4416   public static HRegion openHRegion(final HRegion other, final CancelableProgressable reporter)
4417       throws IOException {
4418     HRegionFileSystem regionFs = other.getRegionFileSystem();
4419     HRegion r = newHRegion(regionFs.getTableDir(), other.getLog(), regionFs.getFileSystem(),
4420         other.baseConf, other.getRegionInfo(), other.getTableDesc(), null);
4421     return r.openHRegion(reporter);
4422   }
4423 
4424   /**
4425    * Open HRegion.
4426    * Calls initialize and sets sequenceid.
4427    * @param reporter
4428    * @return Returns <code>this</code>
4429    * @throws IOException
4430    */
4431   protected HRegion openHRegion(final CancelableProgressable reporter)
4432   throws IOException {
4433     checkCompressionCodecs();
4434 
4435     this.openSeqNum = initialize(reporter);
4436     this.setSequenceId(openSeqNum);
4437     return this;
4438   }
4439 
4440   private void checkCompressionCodecs() throws IOException {
4441     for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
4442       CompressionTest.testCompression(fam.getCompression());
4443       CompressionTest.testCompression(fam.getCompactionCompression());
4444     }
4445   }
4446 
4447   /**
4448    * Create a daughter region from given a temp directory with the region data.
4449    * @param hri Spec. for daughter region to open.
4450    * @throws IOException
4451    */
4452   HRegion createDaughterRegionFromSplits(final HRegionInfo hri) throws IOException {
4453     // Move the files from the temporary .splits to the final /table/region directory
4454     fs.commitDaughterRegion(hri);
4455 
4456     // Create the daughter HRegion instance
4457     HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(), fs.getFileSystem(),
4458         this.getBaseConf(), hri, this.getTableDesc(), rsServices);
4459     r.readRequestsCount.set(this.getReadRequestsCount() / 2);
4460     r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
4461     return r;
4462   }
4463 
4464   /**
4465    * Create a merged region given a temp directory with the region data.
4466    * @param mergedRegionInfo
4467    * @param region_b another merging region
4468    * @return merged hregion
4469    * @throws IOException
4470    */
4471   HRegion createMergedRegionFromMerges(final HRegionInfo mergedRegionInfo,
4472       final HRegion region_b) throws IOException {
4473     HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(),
4474         fs.getFileSystem(), this.getBaseConf(), mergedRegionInfo,
4475         this.getTableDesc(), this.rsServices);
4476     r.readRequestsCount.set(this.getReadRequestsCount()
4477         + region_b.getReadRequestsCount());
4478     r.writeRequestsCount.set(this.getWriteRequestsCount()
4479         + region_b.getWriteRequestsCount());
4480     this.fs.commitMergedRegion(mergedRegionInfo);
4481     return r;
4482   }
4483 
4484   /**
4485    * Inserts a new region's meta information into the passed
4486    * <code>meta</code> region. Used by the HMaster bootstrap code adding
4487    * new table to hbase:meta table.
4488    *
4489    * @param meta hbase:meta HRegion to be updated
4490    * @param r HRegion to add to <code>meta</code>
4491    *
4492    * @throws IOException
4493    */
4494   // TODO remove since only test and merge use this
4495   public static void addRegionToMETA(final HRegion meta, final HRegion r) throws IOException {
4496     meta.checkResources();
4497     // The row key is the region name
4498     byte[] row = r.getRegionName();
4499     final long now = EnvironmentEdgeManager.currentTimeMillis();
4500     final List<Cell> cells = new ArrayList<Cell>(2);
4501     cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
4502       HConstants.REGIONINFO_QUALIFIER, now,
4503       r.getRegionInfo().toByteArray()));
4504     // Set into the root table the version of the meta table.
4505     cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
4506       HConstants.META_VERSION_QUALIFIER, now,
4507       Bytes.toBytes(HConstants.META_VERSION)));
4508     meta.put(row, HConstants.CATALOG_FAMILY, cells);
4509   }
4510 
4511   /**
4512    * Computes the Path of the HRegion
4513    *
4514    * @param tabledir qualified path for table
4515    * @param name ENCODED region name
4516    * @return Path of HRegion directory
4517    */
4518   @Deprecated
4519   public static Path getRegionDir(final Path tabledir, final String name) {
4520     return new Path(tabledir, name);
4521   }
4522 
4523   /**
4524    * Computes the Path of the HRegion
4525    *
4526    * @param rootdir qualified path of HBase root directory
4527    * @param info HRegionInfo for the region
4528    * @return qualified path of region directory
4529    */
4530   @Deprecated
4531   public static Path getRegionDir(final Path rootdir, final HRegionInfo info) {
4532     return new Path(
4533       FSUtils.getTableDir(rootdir, info.getTable()), info.getEncodedName());
4534   }
4535 
4536   /**
4537    * Determines if the specified row is within the row range specified by the
4538    * specified HRegionInfo
4539    *
4540    * @param info HRegionInfo that specifies the row range
4541    * @param row row to be checked
4542    * @return true if the row is within the range specified by the HRegionInfo
4543    */
4544   public static boolean rowIsInRange(HRegionInfo info, final byte [] row) {
4545     return ((info.getStartKey().length == 0) ||
4546         (Bytes.compareTo(info.getStartKey(), row) <= 0)) &&
4547         ((info.getEndKey().length == 0) ||
4548             (Bytes.compareTo(info.getEndKey(), row) > 0));
4549   }
4550 
4551   /**
4552    * Merge two HRegions.  The regions must be adjacent and must not overlap.
4553    *
4554    * @param srcA
4555    * @param srcB
4556    * @return new merged HRegion
4557    * @throws IOException
4558    */
4559   public static HRegion mergeAdjacent(final HRegion srcA, final HRegion srcB)
4560   throws IOException {
4561     HRegion a = srcA;
4562     HRegion b = srcB;
4563 
4564     // Make sure that srcA comes first; important for key-ordering during
4565     // write of the merged file.
4566     if (srcA.getStartKey() == null) {
4567       if (srcB.getStartKey() == null) {
4568         throw new IOException("Cannot merge two regions with null start key");
4569       }
4570       // A's start key is null but B's isn't. Assume A comes before B
4571     } else if ((srcB.getStartKey() == null) ||
4572       (Bytes.compareTo(srcA.getStartKey(), srcB.getStartKey()) > 0)) {
4573       a = srcB;
4574       b = srcA;
4575     }
4576 
4577     if (!(Bytes.compareTo(a.getEndKey(), b.getStartKey()) == 0)) {
4578       throw new IOException("Cannot merge non-adjacent regions");
4579     }
4580     return merge(a, b);
4581   }
4582 
4583   /**
4584    * Merge two regions whether they are adjacent or not.
4585    *
4586    * @param a region a
4587    * @param b region b
4588    * @return new merged region
4589    * @throws IOException
4590    */
4591   public static HRegion merge(final HRegion a, final HRegion b) throws IOException {
4592     if (!a.getRegionInfo().getTable().equals(b.getRegionInfo().getTable())) {
4593       throw new IOException("Regions do not belong to the same table");
4594     }
4595 
4596     FileSystem fs = a.getRegionFileSystem().getFileSystem();
4597     // Make sure each region's cache is empty
4598     a.flushcache();
4599     b.flushcache();
4600 
4601     // Compact each region so we only have one store file per family
4602     a.compactStores(true);
4603     if (LOG.isDebugEnabled()) {
4604       LOG.debug("Files for region: " + a);
4605       a.getRegionFileSystem().logFileSystemState(LOG);
4606     }
4607     b.compactStores(true);
4608     if (LOG.isDebugEnabled()) {
4609       LOG.debug("Files for region: " + b);
4610       b.getRegionFileSystem().logFileSystemState(LOG);
4611     }
4612 
4613     RegionMergeTransaction rmt = new RegionMergeTransaction(a, b, true);
4614     if (!rmt.prepare(null)) {
4615       throw new IOException("Unable to merge regions " + a + " and " + b);
4616     }
4617     HRegionInfo mergedRegionInfo = rmt.getMergedRegionInfo();
4618     LOG.info("starting merge of regions: " + a + " and " + b
4619         + " into new region " + mergedRegionInfo.getRegionNameAsString()
4620         + " with start key <"
4621         + Bytes.toStringBinary(mergedRegionInfo.getStartKey())
4622         + "> and end key <"
4623         + Bytes.toStringBinary(mergedRegionInfo.getEndKey()) + ">");
4624     HRegion dstRegion;
4625     try {
4626       dstRegion = rmt.execute(null, null);
4627     } catch (IOException ioe) {
4628       rmt.rollback(null, null);
4629       throw new IOException("Failed merging region " + a + " and " + b
4630           + ", and succssfully rolled back");
4631     }
4632     dstRegion.compactStores(true);
4633 
4634     if (LOG.isDebugEnabled()) {
4635       LOG.debug("Files for new region");
4636       dstRegion.getRegionFileSystem().logFileSystemState(LOG);
4637     }
4638 
4639     if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
4640       throw new IOException("Merged region " + dstRegion
4641           + " still has references after the compaction, is compaction canceled?");
4642     }
4643 
4644     // Archiving the 'A' region
4645     HFileArchiver.archiveRegion(a.getBaseConf(), fs, a.getRegionInfo());
4646     // Archiving the 'B' region
4647     HFileArchiver.archiveRegion(b.getBaseConf(), fs, b.getRegionInfo());
4648 
4649     LOG.info("merge completed. New region is " + dstRegion);
4650     return dstRegion;
4651   }
4652 
4653   //
4654   // HBASE-880
4655   //
4656   /**
4657    * @param get get object
4658    * @return result
4659    * @throws IOException read exceptions
4660    */
4661   public Result get(final Get get) throws IOException {
4662     checkRow(get.getRow(), "Get");
4663     // Verify families are all valid
4664     if (get.hasFamilies()) {
4665       for (byte [] family: get.familySet()) {
4666         checkFamily(family);
4667       }
4668     } else { // Adding all families to scanner
4669       for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
4670         get.addFamily(family);
4671       }
4672     }
4673     List<Cell> results = get(get, true);
4674     return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null);
4675   }
4676 
4677   /*
4678    * Do a get based on the get parameter.
4679    * @param withCoprocessor invoke coprocessor or not. We don't want to
4680    * always invoke cp for this private method.
4681    */
4682   private List<Cell> get(Get get, boolean withCoprocessor)
4683   throws IOException {
4684 
4685     List<Cell> results = new ArrayList<Cell>();
4686 
4687     // pre-get CP hook
4688     if (withCoprocessor && (coprocessorHost != null)) {
4689        if (coprocessorHost.preGet(get, results)) {
4690          return results;
4691        }
4692     }
4693 
4694     Scan scan = new Scan(get);
4695 
4696     RegionScanner scanner = null;
4697     try {
4698       scanner = getScanner(scan);
4699       scanner.next(results);
4700     } finally {
4701       if (scanner != null)
4702         scanner.close();
4703     }
4704 
4705     // post-get CP hook
4706     if (withCoprocessor && (coprocessorHost != null)) {
4707       coprocessorHost.postGet(get, results);
4708     }
4709 
4710     // do after lock
4711     if (this.metricsRegion != null) {
4712       long totalSize = 0l;
4713       if (results != null) {
4714         for (Cell kv:results) {
4715           totalSize += KeyValueUtil.ensureKeyValue(kv).getLength();
4716         }
4717       }
4718       this.metricsRegion.updateGet(totalSize);
4719     }
4720 
4721     return results;
4722   }
4723 
4724   public void mutateRow(RowMutations rm) throws IOException {
4725     // Don't need nonces here - RowMutations only supports puts and deletes
4726     mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
4727   }
4728 
4729   /**
4730    * Perform atomic mutations within the region w/o nonces.
4731    * See {@link #mutateRowsWithLocks(Collection, Collection, long, long)}
4732    */
4733   public void mutateRowsWithLocks(Collection<Mutation> mutations,
4734       Collection<byte[]> rowsToLock) throws IOException {
4735     mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
4736   }
4737 
4738   /**
4739    * Perform atomic mutations within the region.
4740    * @param mutations The list of mutations to perform.
4741    * <code>mutations</code> can contain operations for multiple rows.
4742    * Caller has to ensure that all rows are contained in this region.
4743    * @param rowsToLock Rows to lock
4744    * @param nonceGroup Optional nonce group of the operation (client Id)
4745    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
4746    * If multiple rows are locked care should be taken that
4747    * <code>rowsToLock</code> is sorted in order to avoid deadlocks.
4748    * @throws IOException
4749    */
4750   public void mutateRowsWithLocks(Collection<Mutation> mutations,
4751       Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
4752     MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
4753     processRowsWithLocks(proc, -1, nonceGroup, nonce);
4754   }
4755 
4756   /**
4757    * Performs atomic multiple reads and writes on a given row.
4758    *
4759    * @param processor The object defines the reads and writes to a row.
4760    * @param nonceGroup Optional nonce group of the operation (client Id)
4761    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
4762    */
4763   public void processRowsWithLocks(RowProcessor<?,?> processor, long nonceGroup, long nonce)
4764       throws IOException {
4765     processRowsWithLocks(processor, rowProcessorTimeout, nonceGroup, nonce);
4766   }
4767 
4768   /**
4769    * Performs atomic multiple reads and writes on a given row.
4770    *
4771    * @param processor The object defines the reads and writes to a row.
4772    * @param timeout The timeout of the processor.process() execution
4773    *                Use a negative number to switch off the time bound
4774    * @param nonceGroup Optional nonce group of the operation (client Id)
4775    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
4776    */
4777   public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout,
4778       long nonceGroup, long nonce) throws IOException {
4779 
4780     for (byte[] row : processor.getRowsToLock()) {
4781       checkRow(row, "processRowsWithLocks");
4782     }
4783     if (!processor.readOnly()) {
4784       checkReadOnly();
4785     }
4786     checkResources();
4787 
4788     startRegionOperation();
4789     WALEdit walEdit = new WALEdit();
4790 
4791     // 1. Run pre-process hook
4792     processor.preProcess(this, walEdit);
4793 
4794     // Short circuit the read only case
4795     if (processor.readOnly()) {
4796       try {
4797         long now = EnvironmentEdgeManager.currentTimeMillis();
4798         doProcessRowWithTimeout(
4799             processor, now, this, null, null, timeout);
4800         processor.postProcess(this, walEdit);
4801       } catch (IOException e) {
4802         throw e;
4803       } finally {
4804         closeRegionOperation();
4805       }
4806       return;
4807     }
4808 
4809     MultiVersionConsistencyControl.WriteEntry writeEntry = null;
4810     boolean locked = false;
4811     boolean walSyncSuccessful = false;
4812     List<RowLock> acquiredRowLocks = null;
4813     long addedSize = 0;
4814     List<KeyValue> mutations = new ArrayList<KeyValue>();
4815     Collection<byte[]> rowsToLock = processor.getRowsToLock();
4816     try {
4817       // 2. Acquire the row lock(s)
4818       acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
4819       for (byte[] row : rowsToLock) {
4820         // Attempt to lock all involved rows, throw if any lock times out
4821         acquiredRowLocks.add(getRowLock(row));
4822       }
4823       // 3. Region lock
4824       lock(this.updatesLock.readLock(), acquiredRowLocks.size());
4825       locked = true;
4826 
4827       long now = EnvironmentEdgeManager.currentTimeMillis();
4828       try {
4829         // 4. Let the processor scan the rows, generate mutations and add
4830         //    waledits
4831         doProcessRowWithTimeout(
4832             processor, now, this, mutations, walEdit, timeout);
4833 
4834         if (!mutations.isEmpty()) {
4835           // 5. Get a mvcc write number
4836           writeEntry = mvcc.beginMemstoreInsert();
4837           // 6. Apply to memstore
4838           for (KeyValue kv : mutations) {
4839             kv.setMvccVersion(writeEntry.getWriteNumber());
4840             byte[] family = kv.getFamily();
4841             checkFamily(family);
4842             addedSize += stores.get(family).add(kv);
4843           }
4844 
4845           long txid = 0;
4846           // 7. Append no sync
4847           if (!walEdit.isEmpty()) {
4848             txid = this.log.appendNoSync(this.getRegionInfo(),
4849               this.htableDescriptor.getTableName(), walEdit, processor.getClusterIds(), now,
4850               this.htableDescriptor, this.sequenceId, true, nonceGroup, nonce);
4851           }
4852           // 8. Release region lock
4853           if (locked) {
4854             this.updatesLock.readLock().unlock();
4855             locked = false;
4856           }
4857 
4858           // 9. Release row lock(s)
4859           releaseRowLocks(acquiredRowLocks);
4860 
4861           // 10. Sync edit log
4862           if (txid != 0) {
4863             syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
4864           }
4865           walSyncSuccessful = true;
4866         }
4867       } finally {
4868         if (!mutations.isEmpty() && !walSyncSuccessful) {
4869           LOG.warn("Wal sync failed. Roll back " + mutations.size() +
4870               " memstore keyvalues for row(s):" +
4871               processor.getRowsToLock().iterator().next() + "...");
4872           for (KeyValue kv : mutations) {
4873             stores.get(kv.getFamily()).rollback(kv);
4874           }
4875         }
4876         // 11. Roll mvcc forward
4877         if (writeEntry != null) {
4878           mvcc.completeMemstoreInsert(writeEntry);
4879           writeEntry = null;
4880         }
4881         if (locked) {
4882           this.updatesLock.readLock().unlock();
4883           locked = false;
4884         }
4885         // release locks if some were acquired but another timed out
4886         releaseRowLocks(acquiredRowLocks);
4887       }
4888 
4889       // 12. Run post-process hook
4890       processor.postProcess(this, walEdit);
4891 
4892     } catch (IOException e) {
4893       throw e;
4894     } finally {
4895       closeRegionOperation();
4896       if (!mutations.isEmpty() &&
4897           isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
4898         requestFlush();
4899       }
4900     }
4901   }
4902 
4903   private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
4904                                        final long now,
4905                                        final HRegion region,
4906                                        final List<KeyValue> mutations,
4907                                        final WALEdit walEdit,
4908                                        final long timeout) throws IOException {
4909     // Short circuit the no time bound case.
4910     if (timeout < 0) {
4911       try {
4912         processor.process(now, region, mutations, walEdit);
4913       } catch (IOException e) {
4914         LOG.warn("RowProcessor:" + processor.getClass().getName() +
4915             " throws Exception on row(s):" +
4916             Bytes.toStringBinary(
4917               processor.getRowsToLock().iterator().next()) + "...", e);
4918         throw e;
4919       }
4920       return;
4921     }
4922 
4923     // Case with time bound
4924     FutureTask<Void> task =
4925       new FutureTask<Void>(new Callable<Void>() {
4926         @Override
4927         public Void call() throws IOException {
4928           try {
4929             processor.process(now, region, mutations, walEdit);
4930             return null;
4931           } catch (IOException e) {
4932             LOG.warn("RowProcessor:" + processor.getClass().getName() +
4933                 " throws Exception on row(s):" +
4934                 Bytes.toStringBinary(
4935                     processor.getRowsToLock().iterator().next()) + "...", e);
4936             throw e;
4937           }
4938         }
4939       });
4940     rowProcessorExecutor.execute(task);
4941     try {
4942       task.get(timeout, TimeUnit.MILLISECONDS);
4943     } catch (TimeoutException te) {
4944       LOG.error("RowProcessor timeout:" + timeout + " ms on row(s):" +
4945           Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) +
4946           "...");
4947       throw new IOException(te);
4948     } catch (Exception e) {
4949       throw new IOException(e);
4950     }
4951   }
4952 
4953   public Result append(Append append) throws IOException {
4954     return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
4955   }
4956 
4957   // TODO: There's a lot of boiler plate code identical
4958   // to increment... See how to better unify that.
4959   /**
4960    * Perform one or more append operations on a row.
4961    *
4962    * @param append
4963    * @return new keyvalues after increment
4964    * @throws IOException
4965    */
4966   public Result append(Append append, long nonceGroup, long nonce)
4967       throws IOException {
4968     byte[] row = append.getRow();
4969     checkRow(row, "append");
4970     boolean flush = false;
4971     Durability durability = getEffectiveDurability(append.getDurability());
4972     boolean writeToWAL = durability != Durability.SKIP_WAL;
4973     WALEdit walEdits = null;
4974     List<Cell> allKVs = new ArrayList<Cell>(append.size());
4975     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
4976 
4977     long size = 0;
4978     long txid = 0;
4979 
4980     checkReadOnly();
4981     checkResources();
4982     // Lock row
4983     startRegionOperation(Operation.APPEND);
4984     this.writeRequestsCount.increment();
4985     WriteEntry w = null;
4986     RowLock rowLock;
4987     try {
4988       rowLock = getRowLock(row);
4989       try {
4990         lock(this.updatesLock.readLock());
4991         // wait for all prior MVCC transactions to finish - while we hold the row lock
4992         // (so that we are guaranteed to see the latest state)
4993         mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
4994         // now start my own transaction
4995         w = mvcc.beginMemstoreInsert();
4996         try {
4997           long now = EnvironmentEdgeManager.currentTimeMillis();
4998           // Process each family
4999           for (Map.Entry<byte[], List<Cell>> family : append.getFamilyCellMap().entrySet()) {
5000 
5001             Store store = stores.get(family.getKey());
5002             List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
5003 
5004             // Sort the cells so that they match the order that they
5005             // appear in the Get results. Otherwise, we won't be able to
5006             // find the existing values if the cells are not specified
5007             // in order by the client since cells are in an array list.
5008             Collections.sort(family.getValue(), store.getComparator());
5009             // Get previous values for all columns in this family
5010             Get get = new Get(row);
5011             for (Cell cell : family.getValue()) {
5012               KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5013               get.addColumn(family.getKey(), kv.getQualifier());
5014             }
5015             List<Cell> results = get(get, false);
5016 
5017             // Iterate the input columns and update existing values if they were
5018             // found, otherwise add new column initialized to the append value
5019 
5020             // Avoid as much copying as possible. Every byte is copied at most
5021             // once.
5022             // Would be nice if KeyValue had scatter/gather logic
5023             int idx = 0;
5024             for (Cell cell : family.getValue()) {
5025               KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5026               KeyValue newKV;
5027               KeyValue oldKv = null;
5028               if (idx < results.size()
5029                   && CellUtil.matchingQualifier(results.get(idx),kv)) {
5030                 oldKv = KeyValueUtil.ensureKeyValue(results.get(idx));
5031                 // allocate an empty kv once
5032                 newKV = new KeyValue(row.length, kv.getFamilyLength(),
5033                     kv.getQualifierLength(), now, KeyValue.Type.Put,
5034                     oldKv.getValueLength() + kv.getValueLength(),
5035                     oldKv.getTagsLength() + kv.getTagsLength());
5036                 // copy in the value
5037                 System.arraycopy(oldKv.getValueArray(), oldKv.getValueOffset(),
5038                     newKV.getValueArray(), newKV.getValueOffset(),
5039                     oldKv.getValueLength());
5040                 System.arraycopy(kv.getValueArray(), kv.getValueOffset(),
5041                     newKV.getValueArray(),
5042                     newKV.getValueOffset() + oldKv.getValueLength(),
5043                     kv.getValueLength());
5044                 // copy in the tags
5045                 System.arraycopy(oldKv.getTagsArray(), oldKv.getTagsOffset(), newKV.getTagsArray(),
5046                     newKV.getTagsOffset(), oldKv.getTagsLength());
5047                 System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(),
5048                     newKV.getTagsOffset() + oldKv.getTagsLength(), kv.getTagsLength());
5049                 // copy in row, family, and qualifier
5050                 System.arraycopy(kv.getRowArray(), kv.getRowOffset(),
5051                     newKV.getRowArray(), newKV.getRowOffset(), kv.getRowLength());
5052                 System.arraycopy(kv.getFamilyArray(), kv.getFamilyOffset(),
5053                     newKV.getFamilyArray(), newKV.getFamilyOffset(),
5054                     kv.getFamilyLength());
5055                 System.arraycopy(kv.getQualifierArray(), kv.getQualifierOffset(),
5056                     newKV.getQualifierArray(), newKV.getQualifierOffset(),
5057                     kv.getQualifierLength());
5058                 idx++;
5059               } else {
5060                 newKV = kv;
5061                 // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP,
5062                 // so only need to update the timestamp to 'now'
5063                 newKV.updateLatestStamp(Bytes.toBytes(now));
5064              }
5065               newKV.setMvccVersion(w.getWriteNumber());
5066               // Give coprocessors a chance to update the new cell
5067               if (coprocessorHost != null) {
5068                 newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
5069                     RegionObserver.MutationType.APPEND, append, oldKv, (Cell) newKV));
5070               }
5071               kvs.add(newKV);
5072 
5073               // Append update to WAL
5074               if (writeToWAL) {
5075                 if (walEdits == null) {
5076                   walEdits = new WALEdit();
5077                 }
5078                 walEdits.add(newKV);
5079               }
5080             }
5081 
5082             //store the kvs to the temporary memstore before writing HLog
5083             tempMemstore.put(store, kvs);
5084           }
5085 
5086           // Actually write to WAL now
5087           if (writeToWAL) {
5088             // Using default cluster id, as this can only happen in the orginating
5089             // cluster. A slave cluster receives the final value (not the delta)
5090             // as a Put.
5091             txid = this.log.appendNoSync(this.getRegionInfo(),
5092               this.htableDescriptor.getTableName(), walEdits, new ArrayList<UUID>(),
5093               EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId,
5094               true, nonceGroup, nonce);
5095           } else {
5096             recordMutationWithoutWal(append.getFamilyCellMap());
5097           }
5098 
5099           //Actually write to Memstore now
5100           for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
5101             Store store = entry.getKey();
5102             if (store.getFamily().getMaxVersions() == 1) {
5103               // upsert if VERSIONS for this CF == 1
5104               size += store.upsert(entry.getValue(), getSmallestReadPoint());
5105             } else {
5106               // otherwise keep older versions around
5107               for (Cell cell: entry.getValue()) {
5108                 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5109                 size += store.add(kv);
5110               }
5111             }
5112             allKVs.addAll(entry.getValue());
5113           }
5114           size = this.addAndGetGlobalMemstoreSize(size);
5115           flush = isFlushSize(size);
5116         } finally {
5117           this.updatesLock.readLock().unlock();
5118         }
5119       } finally {
5120         rowLock.release();
5121       }
5122       if (writeToWAL) {
5123         // sync the transaction log outside the rowlock
5124         syncOrDefer(txid, durability);
5125       }
5126     } finally {
5127       if (w != null) {
5128         mvcc.completeMemstoreInsert(w);
5129       }
5130       closeRegionOperation(Operation.APPEND);
5131     }
5132 
5133     if (this.metricsRegion != null) {
5134       this.metricsRegion.updateAppend();
5135     }
5136 
5137     if (flush) {
5138       // Request a cache flush. Do it outside update lock.
5139       requestFlush();
5140     }
5141 
5142 
5143     return append.isReturnResults() ? Result.create(allKVs) : null;
5144   }
5145 
5146   public Result increment(Increment increment) throws IOException {
5147     return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
5148   }
5149 
5150   /**
5151    * Perform one or more increment operations on a row.
5152    * @param increment
5153    * @return new keyvalues after increment
5154    * @throws IOException
5155    */
5156   public Result increment(Increment increment, long nonceGroup, long nonce)
5157   throws IOException {
5158     byte [] row = increment.getRow();
5159     checkRow(row, "increment");
5160     TimeRange tr = increment.getTimeRange();
5161     boolean flush = false;
5162     Durability durability = getEffectiveDurability(increment.getDurability());
5163     boolean writeToWAL = durability != Durability.SKIP_WAL;
5164     WALEdit walEdits = null;
5165     List<Cell> allKVs = new ArrayList<Cell>(increment.size());
5166     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
5167 
5168     long size = 0;
5169     long txid = 0;
5170 
5171     checkReadOnly();
5172     checkResources();
5173     // Lock row
5174     startRegionOperation(Operation.INCREMENT);
5175     this.writeRequestsCount.increment();
5176     WriteEntry w = null;
5177     try {
5178       RowLock rowLock = getRowLock(row);
5179       try {
5180         lock(this.updatesLock.readLock());
5181         // wait for all prior MVCC transactions to finish - while we hold the row lock
5182         // (so that we are guaranteed to see the latest state)
5183         mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
5184         // now start my own transaction
5185         w = mvcc.beginMemstoreInsert();
5186         try {
5187           long now = EnvironmentEdgeManager.currentTimeMillis();
5188           // Process each family
5189           for (Map.Entry<byte [], List<Cell>> family:
5190               increment.getFamilyCellMap().entrySet()) {
5191 
5192             Store store = stores.get(family.getKey());
5193             List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
5194 
5195             // Sort the cells so that they match the order that they
5196             // appear in the Get results. Otherwise, we won't be able to
5197             // find the existing values if the cells are not specified
5198             // in order by the client since cells are in an array list.
5199             Collections.sort(family.getValue(), store.getComparator());
5200             // Get previous values for all columns in this family
5201             Get get = new Get(row);
5202             for (Cell cell: family.getValue()) {
5203               KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5204               get.addColumn(family.getKey(), kv.getQualifier());
5205             }
5206             get.setTimeRange(tr.getMin(), tr.getMax());
5207             List<Cell> results = get(get, false);
5208 
5209             // Iterate the input columns and update existing values if they were
5210             // found, otherwise add new column initialized to the increment amount
5211             int idx = 0;
5212             for (Cell kv: family.getValue()) {
5213               long amount = Bytes.toLong(CellUtil.cloneValue(kv));
5214               boolean noWriteBack = (amount == 0);
5215 
5216               Cell c = null;
5217               if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), kv)) {
5218                 c = results.get(idx);
5219                 if(c.getValueLength() == Bytes.SIZEOF_LONG) {
5220                   amount += Bytes.toLong(c.getValueArray(), c.getValueOffset(), Bytes.SIZEOF_LONG);
5221                 } else {
5222                   // throw DoNotRetryIOException instead of IllegalArgumentException
5223                   throw new org.apache.hadoop.hbase.DoNotRetryIOException(
5224                       "Attempted to increment field that isn't 64 bits wide");
5225                 }
5226                 idx++;
5227               }
5228 
5229               // Append new incremented KeyValue to list
5230               byte[] q = CellUtil.cloneQualifier(kv);
5231               byte[] val = Bytes.toBytes(amount);
5232               int oldCellTagsLen = (c == null) ? 0 : c.getTagsLength();
5233               int incCellTagsLen = kv.getTagsLength();
5234               KeyValue newKV = new KeyValue(row.length, family.getKey().length, q.length, now,
5235                   KeyValue.Type.Put, val.length, oldCellTagsLen + incCellTagsLen);
5236               System.arraycopy(row, 0, newKV.getRowArray(), newKV.getRowOffset(), row.length);
5237               System.arraycopy(family.getKey(), 0, newKV.getFamilyArray(), newKV.getFamilyOffset(),
5238                   family.getKey().length);
5239               System.arraycopy(q, 0, newKV.getQualifierArray(), newKV.getQualifierOffset(), q.length);
5240               // copy in the value
5241               System.arraycopy(val, 0, newKV.getValueArray(), newKV.getValueOffset(), val.length);
5242               // copy tags
5243               if (oldCellTagsLen > 0) {
5244                 System.arraycopy(c.getTagsArray(), c.getTagsOffset(), newKV.getTagsArray(),
5245                     newKV.getTagsOffset(), oldCellTagsLen);
5246               }
5247               if (incCellTagsLen > 0) {
5248                 System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(),
5249                     newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
5250               }
5251               newKV.setMvccVersion(w.getWriteNumber());
5252               // Give coprocessors a chance to update the new cell
5253               if (coprocessorHost != null) {
5254                 newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
5255                     RegionObserver.MutationType.INCREMENT, increment, c, (Cell) newKV));
5256               }
5257               allKVs.add(newKV);
5258 
5259               if (!noWriteBack) {
5260                 kvs.add(newKV);
5261 
5262                 // Prepare WAL updates
5263                 if (writeToWAL) {
5264                   if (walEdits == null) {
5265                     walEdits = new WALEdit();
5266                   }
5267                   walEdits.add(newKV);
5268                 }
5269               }
5270             }
5271 
5272             //store the kvs to the temporary memstore before writing HLog
5273             if (!kvs.isEmpty()) {
5274               tempMemstore.put(store, kvs);
5275             }
5276           }
5277 
5278           // Actually write to WAL now
5279           if (walEdits != null && !walEdits.isEmpty()) {
5280             if (writeToWAL) {
5281               // Using default cluster id, as this can only happen in the orginating
5282               // cluster. A slave cluster receives the final value (not the delta)
5283               // as a Put.
5284               txid = this.log.appendNoSync(this.getRegionInfo(),
5285                   this.htableDescriptor.getTableName(), walEdits, new ArrayList<UUID>(),
5286                   EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId,
5287                   true, nonceGroup, nonce);
5288             } else {
5289               recordMutationWithoutWal(increment.getFamilyCellMap());
5290             }
5291           }
5292           //Actually write to Memstore now
5293           if (!tempMemstore.isEmpty()) {
5294             for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
5295               Store store = entry.getKey();
5296               if (store.getFamily().getMaxVersions() == 1) {
5297                 // upsert if VERSIONS for this CF == 1
5298                 size += store.upsert(entry.getValue(), getSmallestReadPoint());
5299               } else {
5300                 // otherwise keep older versions around
5301                 for (Cell cell : entry.getValue()) {
5302                   KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5303                   size += store.add(kv);
5304                 }
5305               }
5306             }
5307             size = this.addAndGetGlobalMemstoreSize(size);
5308             flush = isFlushSize(size);
5309           }
5310         } finally {
5311           this.updatesLock.readLock().unlock();
5312         }
5313       } finally {
5314         rowLock.release();
5315       }
5316       if (writeToWAL && (walEdits != null) && !walEdits.isEmpty()) {
5317         // sync the transaction log outside the rowlock
5318         syncOrDefer(txid, durability);
5319       }
5320     } finally {
5321       if (w != null) {
5322         mvcc.completeMemstoreInsert(w);
5323       }
5324       closeRegionOperation(Operation.INCREMENT);
5325       if (this.metricsRegion != null) {
5326         this.metricsRegion.updateIncrement();
5327       }
5328     }
5329 
5330     if (flush) {
5331       // Request a cache flush.  Do it outside update lock.
5332       requestFlush();
5333     }
5334 
5335     return Result.create(allKVs);
5336   }
5337 
5338   //
5339   // New HBASE-880 Helpers
5340   //
5341 
5342   private void checkFamily(final byte [] family)
5343   throws NoSuchColumnFamilyException {
5344     if (!this.htableDescriptor.hasFamily(family)) {
5345       throw new NoSuchColumnFamilyException("Column family " +
5346           Bytes.toString(family) + " does not exist in region " + this
5347           + " in table " + this.htableDescriptor);
5348     }
5349   }
5350 
5351   public static final long FIXED_OVERHEAD = ClassSize.align(
5352       ClassSize.OBJECT +
5353       ClassSize.ARRAY +
5354       41 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
5355       (12 * Bytes.SIZEOF_LONG) +
5356       4 * Bytes.SIZEOF_BOOLEAN);
5357 
5358   // woefully out of date - currently missing:
5359   // 1 x HashMap - coprocessorServiceHandlers
5360   // 6 x Counter - numMutationsWithoutWAL, dataInMemoryWithoutWAL,
5361   //   checkAndMutateChecksPassed, checkAndMutateChecksFailed, readRequestsCount,
5362   //   writeRequestsCount
5363   // 1 x HRegion$WriteState - writestate
5364   // 1 x RegionCoprocessorHost - coprocessorHost
5365   // 1 x RegionSplitPolicy - splitPolicy
5366   // 1 x MetricsRegion - metricsRegion
5367   // 1 x MetricsRegionWrapperImpl - metricsRegionWrapper
5368   public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
5369       ClassSize.OBJECT + // closeLock
5370       (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
5371       (3 * ClassSize.ATOMIC_LONG) + // memStoreSize, numPutsWithoutWAL, dataInMemoryWithoutWAL
5372       (2 * ClassSize.CONCURRENT_HASHMAP) +  // lockedRows, scannerReadPoints
5373       WriteState.HEAP_SIZE + // writestate
5374       ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
5375       (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
5376       ClassSize.ARRAYLIST + // recentFlushes
5377       MultiVersionConsistencyControl.FIXED_SIZE // mvcc
5378       + ClassSize.TREEMAP // maxSeqIdInStores
5379       + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
5380       ;
5381 
5382   @Override
5383   public long heapSize() {
5384     long heapSize = DEEP_OVERHEAD;
5385     for (Store store : this.stores.values()) {
5386       heapSize += store.heapSize();
5387     }
5388     // this does not take into account row locks, recent flushes, mvcc entries, and more
5389     return heapSize;
5390   }
5391 
5392   /*
5393    * This method calls System.exit.
5394    * @param message Message to print out.  May be null.
5395    */
5396   private static void printUsageAndExit(final String message) {
5397     if (message != null && message.length() > 0) System.out.println(message);
5398     System.out.println("Usage: HRegion CATLALOG_TABLE_DIR [major_compact]");
5399     System.out.println("Options:");
5400     System.out.println(" major_compact  Pass this option to major compact " +
5401       "passed region.");
5402     System.out.println("Default outputs scan of passed region.");
5403     System.exit(1);
5404   }
5405 
5406   /**
5407    * Registers a new protocol buffer {@link Service} subclass as a coprocessor endpoint to
5408    * be available for handling
5409    * {@link HRegion#execService(com.google.protobuf.RpcController,
5410    *    org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall)}} calls.
5411    *
5412    * <p>
5413    * Only a single instance may be registered per region for a given {@link Service} subclass (the
5414    * instances are keyed on {@link com.google.protobuf.Descriptors.ServiceDescriptor#getFullName()}.
5415    * After the first registration, subsequent calls with the same service name will fail with
5416    * a return value of {@code false}.
5417    * </p>
5418    * @param instance the {@code Service} subclass instance to expose as a coprocessor endpoint
5419    * @return {@code true} if the registration was successful, {@code false}
5420    * otherwise
5421    */
5422   public boolean registerService(Service instance) {
5423     /*
5424      * No stacking of instances is allowed for a single service name
5425      */
5426     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
5427     if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
5428       LOG.error("Coprocessor service "+serviceDesc.getFullName()+
5429           " already registered, rejecting request from "+instance
5430       );
5431       return false;
5432     }
5433 
5434     coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
5435     if (LOG.isDebugEnabled()) {
5436       LOG.debug("Registered coprocessor service: region="+
5437           Bytes.toStringBinary(getRegionName())+" service="+serviceDesc.getFullName());
5438     }
5439     return true;
5440   }
5441 
5442   /**
5443    * Executes a single protocol buffer coprocessor endpoint {@link Service} method using
5444    * the registered protocol handlers.  {@link Service} implementations must be registered via the
5445    * {@link HRegion#registerService(com.google.protobuf.Service)}
5446    * method before they are available.
5447    *
5448    * @param controller an {@code RpcContoller} implementation to pass to the invoked service
5449    * @param call a {@code CoprocessorServiceCall} instance identifying the service, method,
5450    *     and parameters for the method invocation
5451    * @return a protocol buffer {@code Message} instance containing the method's result
5452    * @throws IOException if no registered service handler is found or an error
5453    *     occurs during the invocation
5454    * @see org.apache.hadoop.hbase.regionserver.HRegion#registerService(com.google.protobuf.Service)
5455    */
5456   public Message execService(RpcController controller, CoprocessorServiceCall call)
5457       throws IOException {
5458     String serviceName = call.getServiceName();
5459     String methodName = call.getMethodName();
5460     if (!coprocessorServiceHandlers.containsKey(serviceName)) {
5461       throw new UnknownProtocolException(null,
5462           "No registered coprocessor service found for name "+serviceName+
5463           " in region "+Bytes.toStringBinary(getRegionName()));
5464     }
5465 
5466     Service service = coprocessorServiceHandlers.get(serviceName);
5467     Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
5468     Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
5469     if (methodDesc == null) {
5470       throw new UnknownProtocolException(service.getClass(),
5471           "Unknown method "+methodName+" called on service "+serviceName+
5472               " in region "+Bytes.toStringBinary(getRegionName()));
5473     }
5474 
5475     Message request = service.getRequestPrototype(methodDesc).newBuilderForType()
5476         .mergeFrom(call.getRequest()).build();
5477 
5478     if (coprocessorHost != null) {
5479       request = coprocessorHost.preEndpointInvocation(service, methodName, request);
5480     }
5481 
5482     final Message.Builder responseBuilder =
5483         service.getResponsePrototype(methodDesc).newBuilderForType();
5484     service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
5485       @Override
5486       public void run(Message message) {
5487         if (message != null) {
5488           responseBuilder.mergeFrom(message);
5489         }
5490       }
5491     });
5492 
5493     if (coprocessorHost != null) {
5494       coprocessorHost.postEndpointInvocation(service, methodName, request, responseBuilder);
5495     }
5496 
5497     return responseBuilder.build();
5498   }
5499 
5500   /*
5501    * Process table.
5502    * Do major compaction or list content.
5503    * @param fs
5504    * @param p
5505    * @param log
5506    * @param c
5507    * @param majorCompact
5508    * @throws IOException
5509    */
5510   private static void processTable(final FileSystem fs, final Path p,
5511       final HLog log, final Configuration c,
5512       final boolean majorCompact)
5513   throws IOException {
5514     HRegion region = null;
5515     // Currently expects tables have one region only.
5516     if (FSUtils.getTableName(p).equals(TableName.META_TABLE_NAME)) {
5517       region = HRegion.newHRegion(p, log, fs, c,
5518         HRegionInfo.FIRST_META_REGIONINFO, HTableDescriptor.META_TABLEDESC, null);
5519     } else {
5520       throw new IOException("Not a known catalog table: " + p.toString());
5521     }
5522     try {
5523       region.initialize();
5524       if (majorCompact) {
5525         region.compactStores(true);
5526       } else {
5527         // Default behavior
5528         Scan scan = new Scan();
5529         // scan.addFamily(HConstants.CATALOG_FAMILY);
5530         RegionScanner scanner = region.getScanner(scan);
5531         try {
5532           List<Cell> kvs = new ArrayList<Cell>();
5533           boolean done;
5534           do {
5535             kvs.clear();
5536             done = scanner.next(kvs);
5537             if (kvs.size() > 0) LOG.info(kvs);
5538           } while (done);
5539         } finally {
5540           scanner.close();
5541         }
5542       }
5543     } finally {
5544       region.close();
5545     }
5546   }
5547 
5548   boolean shouldForceSplit() {
5549     return this.splitRequest;
5550   }
5551 
5552   byte[] getExplicitSplitPoint() {
5553     return this.explicitSplitPoint;
5554   }
5555 
5556   void forceSplit(byte[] sp) {
5557     // NOTE : this HRegion will go away after the forced split is successfull
5558     //        therefore, no reason to clear this value
5559     this.splitRequest = true;
5560     if (sp != null) {
5561       this.explicitSplitPoint = sp;
5562     }
5563   }
5564 
5565   void clearSplit_TESTS_ONLY() {
5566     this.splitRequest = false;
5567   }
5568 
5569   /**
5570    * Give the region a chance to prepare before it is split.
5571    */
5572   protected void prepareToSplit() {
5573     // nothing
5574   }
5575 
5576   /**
5577    * Return the splitpoint. null indicates the region isn't splittable
5578    * If the splitpoint isn't explicitly specified, it will go over the stores
5579    * to find the best splitpoint. Currently the criteria of best splitpoint
5580    * is based on the size of the store.
5581    */
5582   public byte[] checkSplit() {
5583     // Can't split META
5584     if (this.getRegionInfo().isMetaTable() ||
5585         TableName.NAMESPACE_TABLE_NAME.equals(this.getRegionInfo().getTable())) {
5586       if (shouldForceSplit()) {
5587         LOG.warn("Cannot split meta region in HBase 0.20 and above");
5588       }
5589       return null;
5590     }
5591 
5592     // Can't split region which is in recovering state
5593     if (this.isRecovering()) {
5594       LOG.info("Cannot split region " + this.getRegionInfo().getEncodedName() + " in recovery.");
5595       return null;
5596     }
5597 
5598     if (!splitPolicy.shouldSplit()) {
5599       return null;
5600     }
5601 
5602     byte[] ret = splitPolicy.getSplitPoint();
5603 
5604     if (ret != null) {
5605       try {
5606         checkRow(ret, "calculated split");
5607       } catch (IOException e) {
5608         LOG.error("Ignoring invalid split", e);
5609         return null;
5610       }
5611     }
5612     return ret;
5613   }
5614 
5615   /**
5616    * @return The priority that this region should have in the compaction queue
5617    */
5618   public int getCompactPriority() {
5619     int count = Integer.MAX_VALUE;
5620     for (Store store : stores.values()) {
5621       count = Math.min(count, store.getCompactPriority());
5622     }
5623     return count;
5624   }
5625 
5626   /**
5627    * Checks every store to see if one has too many
5628    * store files
5629    * @return true if any store has too many store files
5630    */
5631   public boolean needsCompaction() {
5632     for (Store store : stores.values()) {
5633       if(store.needsCompaction()) {
5634         return true;
5635       }
5636     }
5637     return false;
5638   }
5639 
5640   /** @return the coprocessor host */
5641   public RegionCoprocessorHost getCoprocessorHost() {
5642     return coprocessorHost;
5643   }
5644 
5645   /** @param coprocessorHost the new coprocessor host */
5646   public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {
5647     this.coprocessorHost = coprocessorHost;
5648   }
5649 
5650   /**
5651    * This method needs to be called before any public call that reads or
5652    * modifies data. It has to be called just before a try.
5653    * #closeRegionOperation needs to be called in the try's finally block
5654    * Acquires a read lock and checks if the region is closing or closed.
5655    * @throws IOException 
5656    */
5657   public void startRegionOperation() throws IOException {
5658     startRegionOperation(Operation.ANY);
5659   }
5660 
5661   /**
5662    * @param op The operation is about to be taken on the region
5663    * @throws IOException 
5664    */
5665   protected void startRegionOperation(Operation op) throws IOException {
5666     switch (op) {
5667     case INCREMENT:
5668     case APPEND:
5669     case GET:
5670     case SCAN:
5671     case SPLIT_REGION:
5672     case MERGE_REGION:
5673     case PUT:
5674     case DELETE:
5675     case BATCH_MUTATE:
5676     case COMPACT_REGION:
5677       // when a region is in recovering state, no read, split or merge is allowed
5678       if (this.isRecovering() && (this.disallowWritesInRecovering ||
5679               (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) {
5680         throw new RegionInRecoveryException(this.getRegionNameAsString() + " is recovering");
5681       }
5682       break;
5683     default:
5684       break;
5685     }
5686     if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION
5687         || op == Operation.COMPACT_REGION) {
5688       // split, merge or compact region doesn't need to check the closing/closed state or lock the
5689       // region
5690       return;
5691     }
5692     if (this.closing.get()) {
5693       throw new NotServingRegionException(getRegionNameAsString() + " is closing");
5694     }
5695     lock(lock.readLock());
5696     if (this.closed.get()) {
5697       lock.readLock().unlock();
5698       throw new NotServingRegionException(getRegionNameAsString() + " is closed");
5699     }
5700     try {
5701       if (coprocessorHost != null) {
5702         coprocessorHost.postStartRegionOperation(op);
5703       }
5704     } catch (Exception e) {
5705       lock.readLock().unlock();
5706       throw new IOException(e);
5707     }
5708   }
5709 
5710   /**
5711    * Closes the lock. This needs to be called in the finally block corresponding
5712    * to the try block of #startRegionOperation
5713    * @throws IOException 
5714    */
5715   public void closeRegionOperation() throws IOException {
5716     closeRegionOperation(Operation.ANY);
5717   }
5718 
5719   /**
5720    * Closes the lock. This needs to be called in the finally block corresponding
5721    * to the try block of {@link #startRegionOperation(Operation)}
5722    * @param operation
5723    * @throws IOException
5724    */
5725   public void closeRegionOperation(Operation operation) throws IOException {
5726     lock.readLock().unlock();
5727     if (coprocessorHost != null) {
5728       coprocessorHost.postCloseRegionOperation(operation);
5729     }
5730   }
5731 
5732   /**
5733    * This method needs to be called before any public call that reads or
5734    * modifies stores in bulk. It has to be called just before a try.
5735    * #closeBulkRegionOperation needs to be called in the try's finally block
5736    * Acquires a writelock and checks if the region is closing or closed.
5737    * @throws NotServingRegionException when the region is closing or closed
5738    * @throws RegionTooBusyException if failed to get the lock in time
5739    * @throws InterruptedIOException if interrupted while waiting for a lock
5740    */
5741   private void startBulkRegionOperation(boolean writeLockNeeded)
5742       throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
5743     if (this.closing.get()) {
5744       throw new NotServingRegionException(getRegionNameAsString() + " is closing");
5745     }
5746     if (writeLockNeeded) lock(lock.writeLock());
5747     else lock(lock.readLock());
5748     if (this.closed.get()) {
5749       if (writeLockNeeded) lock.writeLock().unlock();
5750       else lock.readLock().unlock();
5751       throw new NotServingRegionException(getRegionNameAsString() + " is closed");
5752     }
5753   }
5754 
5755   /**
5756    * Closes the lock. This needs to be called in the finally block corresponding
5757    * to the try block of #startRegionOperation
5758    */
5759   private void closeBulkRegionOperation(){
5760     if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
5761     else lock.readLock().unlock();
5762   }
5763 
5764   /**
5765    * Update counters for numer of puts without wal and the size of possible data loss.
5766    * These information are exposed by the region server metrics.
5767    */
5768   private void recordMutationWithoutWal(final Map<byte [], List<Cell>> familyMap) {
5769     numMutationsWithoutWAL.increment();
5770     if (numMutationsWithoutWAL.get() <= 1) {
5771       LOG.info("writing data to region " + this +
5772                " with WAL disabled. Data may be lost in the event of a crash.");
5773     }
5774 
5775     long mutationSize = 0;
5776     for (List<Cell> cells: familyMap.values()) {
5777       for (Cell cell : cells) {
5778         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5779         mutationSize += kv.getKeyLength() + kv.getValueLength();
5780       }
5781     }
5782 
5783     dataInMemoryWithoutWAL.add(mutationSize);
5784   }
5785 
5786   private void lock(final Lock lock)
5787       throws RegionTooBusyException, InterruptedIOException {
5788     lock(lock, 1);
5789   }
5790 
5791   /**
5792    * Try to acquire a lock.  Throw RegionTooBusyException
5793    * if failed to get the lock in time. Throw InterruptedIOException
5794    * if interrupted while waiting for the lock.
5795    */
5796   private void lock(final Lock lock, final int multiplier)
5797       throws RegionTooBusyException, InterruptedIOException {
5798     try {
5799       final long waitTime = Math.min(maxBusyWaitDuration,
5800           busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
5801       if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
5802         throw new RegionTooBusyException(
5803             "failed to get a lock in " + waitTime + " ms. " +
5804                 "regionName=" + (this.getRegionInfo() == null ? "unknown" :
5805                 this.getRegionInfo().getRegionNameAsString()) +
5806                 ", server=" + (this.getRegionServerServices() == null ? "unknown" :
5807                 this.getRegionServerServices().getServerName()));
5808       }
5809     } catch (InterruptedException ie) {
5810       LOG.info("Interrupted while waiting for a lock");
5811       InterruptedIOException iie = new InterruptedIOException();
5812       iie.initCause(ie);
5813       throw iie;
5814     }
5815   }
5816 
5817   /**
5818    * Calls sync with the given transaction ID if the region's table is not
5819    * deferring it.
5820    * @param txid should sync up to which transaction
5821    * @throws IOException If anything goes wrong with DFS
5822    */
5823   private void syncOrDefer(long txid, Durability durability) throws IOException {
5824     if (this.getRegionInfo().isMetaRegion()) {
5825       this.log.sync(txid);
5826     } else {
5827       switch(durability) {
5828       case USE_DEFAULT:
5829         // do what table defaults to
5830         if (shouldSyncLog()) {
5831           this.log.sync(txid);
5832         }
5833         break;
5834       case SKIP_WAL:
5835         // nothing do to
5836         break;
5837       case ASYNC_WAL:
5838         // nothing do to
5839         break;
5840       case SYNC_WAL:
5841       case FSYNC_WAL:
5842         // sync the WAL edit (SYNC and FSYNC treated the same for now)
5843         this.log.sync(txid);
5844         break;
5845       }
5846     }
5847   }
5848 
5849   /**
5850    * Check whether we should sync the log from the table's durability settings
5851    */
5852   private boolean shouldSyncLog() {
5853     return durability.ordinal() >  Durability.ASYNC_WAL.ordinal();
5854   }
5855 
5856   /**
5857    * A mocked list implementaion - discards all updates.
5858    */
5859   private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {
5860 
5861     @Override
5862     public void add(int index, Cell element) {
5863       // do nothing
5864     }
5865 
5866     @Override
5867     public boolean addAll(int index, Collection<? extends Cell> c) {
5868       return false; // this list is never changed as a result of an update
5869     }
5870 
5871     @Override
5872     public KeyValue get(int index) {
5873       throw new UnsupportedOperationException();
5874     }
5875 
5876     @Override
5877     public int size() {
5878       return 0;
5879     }
5880   };
5881 
5882   /**
5883    * Facility for dumping and compacting catalog tables.
5884    * Only does catalog tables since these are only tables we for sure know
5885    * schema on.  For usage run:
5886    * <pre>
5887    *   ./bin/hbase org.apache.hadoop.hbase.regionserver.HRegion
5888    * </pre>
5889    * @param args
5890    * @throws IOException
5891    */
5892   public static void main(String[] args) throws IOException {
5893     if (args.length < 1) {
5894       printUsageAndExit(null);
5895     }
5896     boolean majorCompact = false;
5897     if (args.length > 1) {
5898       if (!args[1].toLowerCase().startsWith("major")) {
5899         printUsageAndExit("ERROR: Unrecognized option <" + args[1] + ">");
5900       }
5901       majorCompact = true;
5902     }
5903     final Path tableDir = new Path(args[0]);
5904     final Configuration c = HBaseConfiguration.create();
5905     final FileSystem fs = FileSystem.get(c);
5906     final Path logdir = new Path(c.get("hbase.tmp.dir"));
5907     final String logname = "hlog" + FSUtils.getTableName(tableDir) + System.currentTimeMillis();
5908 
5909     final HLog log = HLogFactory.createHLog(fs, logdir, logname, c);
5910     try {
5911       processTable(fs, tableDir, log, c, majorCompact);
5912     } finally {
5913        log.close();
5914        // TODO: is this still right?
5915        BlockCache bc = new CacheConfig(c).getBlockCache();
5916        if (bc != null) bc.shutdown();
5917     }
5918   }
5919 
5920   /**
5921    * Gets the latest sequence number that was read from storage when this region was opened.
5922    */
5923   public long getOpenSeqNum() {
5924     return this.openSeqNum;
5925   }
5926 
5927   /**
5928    * Gets max sequence ids of stores that was read from storage when this region was opened. WAL
5929    * Edits with smaller or equal sequence number will be skipped from replay.
5930    */
5931   public Map<byte[], Long> getMaxStoreSeqIdForLogReplay() {
5932     return this.maxSeqIdInStores;
5933   }
5934 
5935   /**
5936    * @return if a given region is in compaction now.
5937    */
5938   public CompactionState getCompactionState() {
5939     boolean hasMajor = majorInProgress.get() > 0, hasMinor = minorInProgress.get() > 0;
5940     return (hasMajor ? (hasMinor ? CompactionState.MAJOR_AND_MINOR : CompactionState.MAJOR)
5941         : (hasMinor ? CompactionState.MINOR : CompactionState.NONE));
5942   }
5943 
5944   public void reportCompactionRequestStart(boolean isMajor){
5945     (isMajor ? majorInProgress : minorInProgress).incrementAndGet();
5946   }
5947 
5948   public void reportCompactionRequestEnd(boolean isMajor, int numFiles, long filesSizeCompacted) {
5949     int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();
5950 
5951     // metrics
5952     compactionsFinished.incrementAndGet();
5953     compactionNumFilesCompacted.addAndGet(numFiles);
5954     compactionNumBytesCompacted.addAndGet(filesSizeCompacted);
5955 
5956     assert newValue >= 0;
5957   }
5958 
5959   /**
5960    * @return sequenceId.
5961    */
5962   public AtomicLong getSequenceId() {
5963     return this.sequenceId;
5964   }
5965 
5966   /**
5967    * sets this region's sequenceId.
5968    * @param value new value
5969    */
5970   private void setSequenceId(long value) {
5971     this.sequenceId.set(value);
5972   }
5973 
5974   /**
5975    * Listener class to enable callers of
5976    * bulkLoadHFile() to perform any necessary
5977    * pre/post processing of a given bulkload call
5978    */
5979   public interface BulkLoadListener {
5980 
5981     /**
5982      * Called before an HFile is actually loaded
5983      * @param family family being loaded to
5984      * @param srcPath path of HFile
5985      * @return final path to be used for actual loading
5986      * @throws IOException
5987      */
5988     String prepareBulkLoad(byte[] family, String srcPath) throws IOException;
5989 
5990     /**
5991      * Called after a successful HFile load
5992      * @param family family being loaded to
5993      * @param srcPath path of HFile
5994      * @throws IOException
5995      */
5996     void doneBulkLoad(byte[] family, String srcPath) throws IOException;
5997 
5998     /**
5999      * Called after a failed HFile load
6000      * @param family family being loaded to
6001      * @param srcPath path of HFile
6002      * @throws IOException
6003      */
6004     void failedBulkLoad(byte[] family, String srcPath) throws IOException;
6005   }
6006 
6007   @VisibleForTesting class RowLockContext {
6008     private final HashedBytes row;
6009     private final CountDownLatch latch = new CountDownLatch(1);
6010     private final Thread thread;
6011     private int lockCount = 0;
6012 
6013     RowLockContext(HashedBytes row) {
6014       this.row = row;
6015       this.thread = Thread.currentThread();
6016     }
6017 
6018     boolean ownedByCurrentThread() {
6019       return thread == Thread.currentThread();
6020     }
6021 
6022     RowLock newLock() {
6023       lockCount++;
6024       return new RowLock(this);
6025     }
6026 
6027     void releaseLock() {
6028       if (!ownedByCurrentThread()) {
6029         throw new IllegalArgumentException("Lock held by thread: " + thread
6030           + " cannot be released by different thread: " + Thread.currentThread());
6031       }
6032       lockCount--;
6033       if (lockCount == 0) {
6034         // no remaining locks by the thread, unlock and allow other threads to access
6035         RowLockContext existingContext = lockedRows.remove(row);
6036         if (existingContext != this) {
6037           throw new RuntimeException(
6038               "Internal row lock state inconsistent, should not happen, row: " + row);
6039         }
6040         latch.countDown();
6041       }
6042     }
6043   }
6044 
6045   /**
6046    * Row lock held by a given thread.
6047    * One thread may acquire multiple locks on the same row simultaneously.
6048    * The locks must be released by calling release() from the same thread.
6049    */
6050   public static class RowLock {
6051     @VisibleForTesting final RowLockContext context;
6052     private boolean released = false;
6053 
6054     @VisibleForTesting RowLock(RowLockContext context) {
6055       this.context = context;
6056     }
6057 
6058     /**
6059      * Release the given lock.  If there are no remaining locks held by the current thread
6060      * then unlock the row and allow other threads to acquire the lock.
6061      * @throws IllegalArgumentException if called by a different thread than the lock owning thread
6062      */
6063     public void release() {
6064       if (!released) {
6065         context.releaseLock();
6066         released = true;
6067       }
6068     }
6069   }
6070 
6071   /**
6072    * Lock the updates' readLock first, so that we could safely append logs in coprocessors.
6073    * @throws RegionTooBusyException
6074    * @throws InterruptedIOException
6075    */
6076   public void updatesLock() throws RegionTooBusyException, InterruptedIOException {
6077     lock(updatesLock.readLock());
6078   }
6079 
6080   /**
6081    * Unlock the updates' readLock after appending logs in coprocessors.
6082    * @throws InterruptedIOException
6083    */
6084   public void updatesUnlock() throws InterruptedIOException {
6085     updatesLock.readLock().unlock();
6086   }
6087 }