1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.io.UnsupportedEncodingException;
25  import java.lang.reflect.Constructor;
26  import java.text.ParseException;
27  import java.util.AbstractList;
28  import java.util.ArrayList;
29  import java.util.Arrays;
30  import java.util.Collection;
31  import java.util.Collections;
32  import java.util.HashMap;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.NavigableMap;
36  import java.util.NavigableSet;
37  import java.util.Random;
38  import java.util.Set;
39  import java.util.TreeMap;
40  import java.util.UUID;
41  import java.util.concurrent.Callable;
42  import java.util.concurrent.CompletionService;
43  import java.util.concurrent.ConcurrentHashMap;
44  import java.util.concurrent.ConcurrentSkipListMap;
45  import java.util.concurrent.CountDownLatch;
46  import java.util.concurrent.ExecutionException;
47  import java.util.concurrent.ExecutorCompletionService;
48  import java.util.concurrent.ExecutorService;
49  import java.util.concurrent.Executors;
50  import java.util.concurrent.Future;
51  import java.util.concurrent.FutureTask;
52  import java.util.concurrent.ThreadFactory;
53  import java.util.concurrent.ThreadPoolExecutor;
54  import java.util.concurrent.TimeUnit;
55  import java.util.concurrent.TimeoutException;
56  import java.util.concurrent.atomic.AtomicBoolean;
57  import java.util.concurrent.atomic.AtomicInteger;
58  import java.util.concurrent.atomic.AtomicLong;
59  import java.util.concurrent.locks.Lock;
60  import java.util.concurrent.locks.ReentrantReadWriteLock;
61  
62  import org.apache.commons.logging.Log;
63  import org.apache.commons.logging.LogFactory;
64  import org.apache.hadoop.classification.InterfaceAudience;
65  import org.apache.hadoop.conf.Configuration;
66  import org.apache.hadoop.fs.FileStatus;
67  import org.apache.hadoop.fs.FileSystem;
68  import org.apache.hadoop.fs.Path;
69  import org.apache.hadoop.hbase.Cell;
70  import org.apache.hadoop.hbase.CompoundConfiguration;
71  import org.apache.hadoop.hbase.HBaseConfiguration;
72  import org.apache.hadoop.hbase.HColumnDescriptor;
73  import org.apache.hadoop.hbase.HConstants;
74  import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
75  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
76  import org.apache.hadoop.hbase.HRegionInfo;
77  import org.apache.hadoop.hbase.HTableDescriptor;
78  import org.apache.hadoop.hbase.KeyValue;
79  import org.apache.hadoop.hbase.KeyValueUtil;
80  import org.apache.hadoop.hbase.backup.HFileArchiver;
81  import org.apache.hadoop.hbase.client.Append;
82  import org.apache.hadoop.hbase.client.Delete;
83  import org.apache.hadoop.hbase.client.Durability;
84  import org.apache.hadoop.hbase.client.Get;
85  import org.apache.hadoop.hbase.client.Increment;
86  import org.apache.hadoop.hbase.client.IsolationLevel;
87  import org.apache.hadoop.hbase.client.Mutation;
88  import org.apache.hadoop.hbase.client.Put;
89  import org.apache.hadoop.hbase.client.Result;
90  import org.apache.hadoop.hbase.client.Row;
91  import org.apache.hadoop.hbase.client.RowMutations;
92  import org.apache.hadoop.hbase.client.Scan;
93  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
94  import org.apache.hadoop.hbase.exceptions.DroppedSnapshotException;
95  import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
96  import org.apache.hadoop.hbase.exceptions.NoSuchColumnFamilyException;
97  import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
98  import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
99  import org.apache.hadoop.hbase.exceptions.RegionTooBusyException;
100 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
101 import org.apache.hadoop.hbase.exceptions.UnknownScannerException;
102 import org.apache.hadoop.hbase.exceptions.WrongRegionException;
103 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
104 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
105 import org.apache.hadoop.hbase.filter.Filter;
106 import org.apache.hadoop.hbase.filter.FilterWrapper;
107 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
108 import org.apache.hadoop.hbase.io.HeapSize;
109 import org.apache.hadoop.hbase.io.TimeRange;
110 import org.apache.hadoop.hbase.io.hfile.BlockCache;
111 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
112 import org.apache.hadoop.hbase.ipc.RpcServer;
113 import org.apache.hadoop.hbase.ipc.RpcCallContext;
114 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
115 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
116 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
117 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
118 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
119 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
120 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
121 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
122 import org.apache.hadoop.hbase.regionserver.wal.HLog;
123 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
124 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
125 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
126 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
127 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
128 import org.apache.hadoop.hbase.util.Bytes;
129 import org.apache.hadoop.hbase.util.CancelableProgressable;
130 import org.apache.hadoop.hbase.util.ClassSize;
131 import org.apache.hadoop.hbase.util.CompressionTest;
132 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
133 import org.apache.hadoop.hbase.util.FSUtils;
134 import org.apache.hadoop.hbase.util.HashedBytes;
135 import org.apache.hadoop.hbase.util.Pair;
136 import org.apache.hadoop.hbase.util.Threads;
137 import org.apache.hadoop.io.MultipleIOException;
138 import org.apache.hadoop.util.StringUtils;
139 import org.cliffc.high_scale_lib.Counter;
140 
141 import com.google.common.base.Preconditions;
142 import com.google.common.collect.Lists;
143 import com.google.common.collect.Maps;
144 import com.google.common.io.Closeables;
145 import com.google.protobuf.Descriptors;
146 import com.google.protobuf.Message;
147 import com.google.protobuf.RpcCallback;
148 import com.google.protobuf.RpcController;
149 import com.google.protobuf.Service;
150 
151 /**
152  * HRegion stores data for a certain region of a table.  It stores all columns
153  * for each row. A given table consists of one or more HRegions.
154  *
155  * <p>We maintain multiple HStores for a single HRegion.
156  *
 * <p>A Store is a set of rows with some column data; together,
 * they make up all the data for the rows.
159  *
160  * <p>Each HRegion has a 'startKey' and 'endKey'.
 * <p>The first is inclusive, the second is exclusive (except for
 * the final region).  The endKey of region 0 is the same as
163  * startKey for region 1 (if it exists).  The startKey for the
164  * first region is null. The endKey for the final region is null.
165  *
166  * <p>Locking at the HRegion level serves only one purpose: preventing the
167  * region from being closed (and consequently split) while other operations
168  * are ongoing. Each row level operation obtains both a row lock and a region
169  * read lock for the duration of the operation. While a scanner is being
170  * constructed, getScanner holds a read lock. If the scanner is successfully
171  * constructed, it holds a read lock until it is closed. A close takes out a
172  * write lock and consequently will block for ongoing operations and will block
173  * new operations from starting while the close is in progress.
174  *
175  * <p>An HRegion is defined by its table and its key extent.
176  *
177  * <p>It consists of at least one Store.  The number of Stores should be
178  * configurable, so that data which is accessed together is stored in the same
179  * Store.  Right now, we approximate that by building a single Store for
180  * each column family.  (This config info will be communicated via the
181  * tabledesc.)
182  *
 * <p>The HTableDescriptor contains metainfo about the HRegion's table.
 * regionName is a unique identifier for this HRegion. [startKey, endKey)
 * defines the keyspace for this HRegion.
186  */
187 @InterfaceAudience.Private
188 public class HRegion implements HeapSize { // , Writable{
189   public static final Log LOG = LogFactory.getLog(HRegion.class);
190 
191   public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
192       "hbase.hregion.scan.loadColumnFamiliesOnDemand";
193 
194   final AtomicBoolean closed = new AtomicBoolean(false);
  /* Closing can take some time; use the closing flag if there is stuff we don't
   * want to do while in closing state; e.g. like offer this region up to the
   * master as a region to close if the carrying regionserver is overloaded.
   * Once set, it is only cleared again when the region is re-initialized (for
   * example when it is reopened after a failed split).
   */
200   final AtomicBoolean closing = new AtomicBoolean(false);
201 
202   protected long completeSequenceId = -1L;
203 
204   /**
205    * Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context for
206    * startRegionOperation to possibly invoke different checks before any region operations. Not all
207    * operations have to be defined here. It's only needed when a special check is need in
208    * startRegionOperation
209    */
210   protected enum Operation {
211     ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION
212   }
213 
214   //////////////////////////////////////////////////////////////////////////////
215   // Members
216   //////////////////////////////////////////////////////////////////////////////
217 
218   private final ConcurrentHashMap<HashedBytes, CountDownLatch> lockedRows =
219     new ConcurrentHashMap<HashedBytes, CountDownLatch>();
220   private final ConcurrentHashMap<Integer, HashedBytes> lockIds =
221     new ConcurrentHashMap<Integer, HashedBytes>();
222   private final AtomicInteger lockIdGenerator = new AtomicInteger(1);
223   static private Random rand = new Random();
224 
225   protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(
226       Bytes.BYTES_RAWCOMPARATOR);
227 
228   // TODO: account for each registered handler in HeapSize computation
229   private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
230 
231   public final AtomicLong memstoreSize = new AtomicLong(0);
232 
233   // Debug possible data loss due to WAL off
234   final Counter numPutsWithoutWAL = new Counter();
235   final Counter dataInMemoryWithoutWAL = new Counter();
236 
237   // Debug why CAS operations are taking a while.
238   final Counter checkAndMutateChecksPassed = new Counter();
239   final Counter checkAndMutateChecksFailed = new Counter();
240 
241   //Number of requests
242   final Counter readRequestsCount = new Counter();
243   final Counter writeRequestsCount = new Counter();
244 
245   //How long operations were blocked by a memstore over highwater.
246   final Counter updatesBlockedMs = new Counter();
247 
248   private final HLog log;
249   private final HRegionFileSystem fs;
250   protected final Configuration conf;
251   private final Configuration baseConf;
252   private final KeyValue.KVComparator comparator;
253   private final int rowLockWaitDuration;
254   static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
255 
256   // The internal wait duration to acquire a lock before read/update
257   // from the region. It is not per row. The purpose of this wait time
258   // is to avoid waiting a long time while the region is busy, so that
259   // we can release the IPC handler soon enough to improve the
260   // availability of the region server. It can be adjusted by
261   // tuning configuration "hbase.busy.wait.duration".
262   final long busyWaitDuration;
263   static final long DEFAULT_BUSY_WAIT_DURATION = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
264 
265   // If updating multiple rows in one call, wait longer,
266   // i.e. waiting for busyWaitDuration * # of rows. However,
267   // we can limit the max multiplier.
268   final int maxBusyWaitMultiplier;
269 
270   // Max busy wait duration. There is no point to wait longer than the RPC
271   // purge timeout, when a RPC call will be terminated by the RPC engine.
272   final long maxBusyWaitDuration;
273 
274   // negative number indicates infinite timeout
275   static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
276   final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();
277 
278   private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
279 
280   /**
281    * The sequence ID that was encountered when this region was opened.
282    */
283   private long openSeqNum = HConstants.NO_SEQNUM;
284 
285   /**
286    * The default setting for whether to enable on-demand CF loading for
287    * scan requests to this region. Requests can override it.
288    */
289   private boolean isLoadingCfsOnDemandDefault = false;
290 
291   private final AtomicInteger majorInProgress = new AtomicInteger(0);
292   private final AtomicInteger minorInProgress = new AtomicInteger(0);
293 
294   /**
295    * Min sequence id stored in store files of a region when opening the region
296    */
297   private long minSeqIdForLogReplay = -1;
298 
299   /**
300    * @return The smallest mvcc readPoint across all the scanners in this
301    * region. Writes older than this readPoint, are included  in every
302    * read operation.
303    */
304   public long getSmallestReadPoint() {
305     long minimumReadPoint;
306     // We need to ensure that while we are calculating the smallestReadPoint
307     // no new RegionScanners can grab a readPoint that we are unaware of.
308     // We achieve this by synchronizing on the scannerReadPoints object.
309     synchronized(scannerReadPoints) {
310       minimumReadPoint = mvcc.memstoreReadPoint();
311 
312       for (Long readPoint: this.scannerReadPoints.values()) {
313         if (readPoint < minimumReadPoint) {
314           minimumReadPoint = readPoint;
315         }
316       }
317     }
318     return minimumReadPoint;
319   }
320   /*
321    * Data structure of write state flags used coordinating flushes,
322    * compactions and closes.
323    */
324   static class WriteState {
325     // Set while a memstore flush is happening.
326     volatile boolean flushing = false;
327     // Set when a flush has been requested.
328     volatile boolean flushRequested = false;
329     // Number of compactions running.
330     volatile int compacting = 0;
331     // Gets set in close. If set, cannot compact or flush again.
332     volatile boolean writesEnabled = true;
333     // Set if region is read-only
334     volatile boolean readOnly = false;
335 
336     /**
337      * Set flags that make this region read-only.
338      *
339      * @param onOff flip value for region r/o setting
340      */
341     synchronized void setReadOnly(final boolean onOff) {
342       this.writesEnabled = !onOff;
343       this.readOnly = onOff;
344     }
345 
346     boolean isReadOnly() {
347       return this.readOnly;
348     }
349 
350     boolean isFlushRequested() {
351       return this.flushRequested;
352     }
353 
354     static final long HEAP_SIZE = ClassSize.align(
355         ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
356   }
357 
358   final WriteState writestate = new WriteState();
359 
360   long memstoreFlushSize;
361   final long timestampSlop;
362   final long rowProcessorTimeout;
363   private volatile long lastFlushTime;
364   final RegionServerServices rsServices;
365   private RegionServerAccounting rsAccounting;
366   private List<Pair<Long, Long>> recentFlushes = new ArrayList<Pair<Long,Long>>();
367   private long flushCheckInterval;
368   private long blockingMemStoreSize;
369   final long threadWakeFrequency;
370   // Used to guard closes
371   final ReentrantReadWriteLock lock =
372     new ReentrantReadWriteLock();
373 
374   // Stop updates lock
375   private final ReentrantReadWriteLock updatesLock =
376     new ReentrantReadWriteLock();
377   private boolean splitRequest;
378   private byte[] explicitSplitPoint = null;
379 
380   private final MultiVersionConsistencyControl mvcc =
381       new MultiVersionConsistencyControl();
382 
383   // Coprocessor host
384   private RegionCoprocessorHost coprocessorHost;
385 
386   private HTableDescriptor htableDescriptor = null;
387   private RegionSplitPolicy splitPolicy;
388 
389   private final MetricsRegion metricsRegion;
390   private final MetricsRegionWrapperImpl metricsRegionWrapper;
391   private final boolean deferredLogSyncDisabled;
392 
393   /**
394    * HRegion constructor. This constructor should only be used for testing and
395    * extensions.  Instances of HRegion should be instantiated with the
396    * {@link HRegion#createHRegion} or {@link HRegion#openHRegion} method.
397    *
398    * @param tableDir qualified path of directory where region should be located,
399    * usually the table directory.
400    * @param log The HLog is the outbound log for any updates to the HRegion
401    * (There's a single HLog for all the HRegions on a single HRegionServer.)
402    * The log file is a logfile from the previous execution that's
403    * custom-computed for this HRegion. The HRegionServer computes and sorts the
404    * appropriate log info for this HRegion. If there is a previous log file
405    * (implying that the HRegion has been written-to before), then read it from
406    * the supplied path.
407    * @param fs is the filesystem.
408    * @param confParam is global configuration settings.
409    * @param regionInfo - HRegionInfo that describes the region
410    * is new), then read them from the supplied path.
411    * @param htd the table descriptor
412    * @param rsServices reference to {@link RegionServerServices} or null
413    */
414   @Deprecated
415   public HRegion(final Path tableDir, final HLog log, final FileSystem fs,
416       final Configuration confParam, final HRegionInfo regionInfo,
417       final HTableDescriptor htd, final RegionServerServices rsServices) {
418     this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
419       log, confParam, htd, rsServices);
420   }
421 
422   /**
423    * HRegion constructor. This constructor should only be used for testing and
424    * extensions.  Instances of HRegion should be instantiated with the
425    * {@link HRegion#createHRegion} or {@link HRegion#openHRegion} method.
426    *
427    * @param fs is the filesystem.
428    * @param log The HLog is the outbound log for any updates to the HRegion
429    * (There's a single HLog for all the HRegions on a single HRegionServer.)
430    * The log file is a logfile from the previous execution that's
431    * custom-computed for this HRegion. The HRegionServer computes and sorts the
432    * appropriate log info for this HRegion. If there is a previous log file
433    * (implying that the HRegion has been written-to before), then read it from
434    * the supplied path.
435    * @param confParam is global configuration settings.
436    * @param htd the table descriptor
437    * @param rsServices reference to {@link RegionServerServices} or null
438    */
439   public HRegion(final HRegionFileSystem fs, final HLog log, final Configuration confParam,
440       final HTableDescriptor htd, final RegionServerServices rsServices) {
441     if (htd == null) {
442       throw new IllegalArgumentException("Need table descriptor");
443     }
444 
445     if (confParam instanceof CompoundConfiguration) {
446       throw new IllegalArgumentException("Need original base configuration");
447     }
448 
449     this.comparator = fs.getRegionInfo().getComparator();
450     this.log = log;
451     this.fs = fs;
452 
453     // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
454     this.baseConf = confParam;
455     this.conf = new CompoundConfiguration()
456       .add(confParam)
457       .addStringMap(htd.getConfiguration())
458       .addWritableMap(htd.getValues());
459     this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
460         DEFAULT_CACHE_FLUSH_INTERVAL);
461     this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
462                     DEFAULT_ROWLOCK_WAIT_DURATION);
463 
464     this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
465     this.htableDescriptor = htd;
466     this.rsServices = rsServices;
467     this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
468     setHTableSpecificConf();
469     this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
470 
471     this.busyWaitDuration = conf.getLong(
472       "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
473     this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
474     if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
475       throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
476         + busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
477         + maxBusyWaitMultiplier + "). Their product should be positive");
478     }
479     this.maxBusyWaitDuration = conf.getLong("ipc.client.call.purge.timeout",
480       2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
481 
482     /*
483      * timestamp.slop provides a server-side constraint on the timestamp. This
484      * assumes that you base your TS around currentTimeMillis(). In this case,
485      * throw an error to the user if the user-specified TS is newer than now +
486      * slop. LATEST_TIMESTAMP == don't use this functionality
487      */
488     this.timestampSlop = conf.getLong(
489         "hbase.hregion.keyvalue.timestamp.slop.millisecs",
490         HConstants.LATEST_TIMESTAMP);
491 
492     /**
493      * Timeout for the process time in processRowsWithLocks().
494      * Use -1 to switch off time bound.
495      */
496     this.rowProcessorTimeout = conf.getLong(
497         "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
498     // When hbase.regionserver.optionallogflushinterval <= 0 , deferred log sync is disabled.
499     this.deferredLogSyncDisabled = conf.getLong("hbase.regionserver.optionallogflushinterval",
500         1 * 1000) <= 0;
501 
502     if (rsServices != null) {
503       this.rsAccounting = this.rsServices.getRegionServerAccounting();
504       // don't initialize coprocessors if not running within a regionserver
505       // TODO: revisit if coprocessors should load in other cases
506       this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
507       this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
508       this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper);
509     } else {
510       this.metricsRegionWrapper = null;
511       this.metricsRegion = null;
512     }
513     if (LOG.isDebugEnabled()) {
514       // Write out region name as string and its encoded name.
515       LOG.debug("Instantiated " + this);
516     }
517   }
518 
519   void setHTableSpecificConf() {
520     if (this.htableDescriptor == null) return;
521     long flushSize = this.htableDescriptor.getMemStoreFlushSize();
522 
523     if (flushSize <= 0) {
524       flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
525         HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
526     }
527     this.memstoreFlushSize = flushSize;
528     this.blockingMemStoreSize = this.memstoreFlushSize *
529         conf.getLong("hbase.hregion.memstore.block.multiplier", 2);
530   }
531 
532   /**
533    * Initialize this region.
534    * Used only by tests and SplitTransaction to reopen the region.
535    * You should use createHRegion() or openHRegion()
536    * @return What the next sequence (edit) id should be.
537    * @throws IOException e
538    * @deprecated use HRegion.createHRegion() or HRegion.openHRegion()
539    */
540   @Deprecated
541   public long initialize() throws IOException {
542     return initialize(null);
543   }
544 
545   /**
546    * Initialize this region.
547    *
548    * @param reporter Tickle every so often if initialize is taking a while.
549    * @return What the next sequence (edit) id should be.
550    * @throws IOException e
551    */
552   private long initialize(final CancelableProgressable reporter) throws IOException {
553     MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
554     long nextSeqId = -1;
555     try {
556       nextSeqId = initializeRegionInternals(reporter, status);
557       return nextSeqId;
558     } finally {
559       // nextSeqid will be -1 if the initialization fails.
560       // At least it will be 0 otherwise.
561       if (nextSeqId == -1) {
562         status
563             .abort("Exception during region " + this.getRegionNameAsString() + " initialization.");
564       }
565     }
566   }
567 
568   private long initializeRegionInternals(final CancelableProgressable reporter,
569       final MonitoredTask status) throws IOException, UnsupportedEncodingException {
570     if (coprocessorHost != null) {
571       status.setStatus("Running coprocessor pre-open hook");
572       coprocessorHost.preOpen();
573     }
574 
575     // Write HRI to a file in case we need to recover .META.
576     status.setStatus("Writing region info on filesystem");
577     fs.checkRegionInfoOnFilesystem();
578 
579     // Remove temporary data left over from old regions
580     status.setStatus("Cleaning up temporary data from old regions");
581     fs.cleanupTempDir();
582 
583     // Initialize all the HStores
584     status.setStatus("Initializing all the Stores");
585     long maxSeqId = initializeRegionStores(reporter, status);
586 
587     status.setStatus("Cleaning up detritus from prior splits");
588     // Get rid of any splits or merges that were lost in-progress.  Clean out
589     // these directories here on open.  We may be opening a region that was
590     // being split but we crashed in the middle of it all.
591     fs.cleanupAnySplitDetritus();
592     fs.cleanupMergesDir();
593 
594     this.writestate.setReadOnly(this.htableDescriptor.isReadOnly());
595     this.writestate.flushRequested = false;
596     this.writestate.compacting = 0;
597 
598     // Initialize split policy
599     this.splitPolicy = RegionSplitPolicy.create(this, conf);
600 
601     this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
602     // Use maximum of log sequenceid or that which was found in stores
603     // (particularly if no recovered edits, seqid will be -1).
604     long nextSeqid = maxSeqId + 1;
605     LOG.info("Onlined " + this.toString() + "; next sequenceid=" + nextSeqid);
606 
607     // A region can be reopened if failed a split; reset flags
608     this.closing.set(false);
609     this.closed.set(false);
610 
611     if (coprocessorHost != null) {
612       status.setStatus("Running coprocessor post-open hooks");
613       coprocessorHost.postOpen();
614     }
615 
616     status.markComplete("Region opened successfully");
617     return nextSeqid;
618   }
619 
620   private long initializeRegionStores(final CancelableProgressable reporter, MonitoredTask status)
621       throws IOException, UnsupportedEncodingException {
622     // Load in all the HStores.
623     //
624     // Context: During replay we want to ensure that we do not lose any data. So, we
625     // have to be conservative in how we replay logs. For each store, we calculate
626     // the maxSeqId up to which the store was flushed. And, skip the edits which
627     // is equal to or lower than maxSeqId for each store.
628     Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
629     long maxSeqId = -1;
630     // initialized to -1 so that we pick up MemstoreTS from column families
631     long maxMemstoreTS = -1;
632 
633     if (!htableDescriptor.getFamilies().isEmpty()) {
634       // initialize the thread pool for opening stores in parallel.
635       ThreadPoolExecutor storeOpenerThreadPool =
636         getStoreOpenAndCloseThreadPool(
637           "StoreOpenerThread-" + this.getRegionNameAsString());
638       CompletionService<HStore> completionService =
639         new ExecutorCompletionService<HStore>(storeOpenerThreadPool);
640 
641       // initialize each store in parallel
642       for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
643         status.setStatus("Instantiating store for column family " + family);
644         completionService.submit(new Callable<HStore>() {
645           public HStore call() throws IOException {
646             return instantiateHStore(family);
647           }
648         });
649       }
650       try {
651         for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
652           Future<HStore> future = completionService.take();
653           HStore store = future.get();
654 
655           this.stores.put(store.getColumnFamilyName().getBytes(), store);
656           // Do not include bulk loaded files when determining seqIdForReplay
657           long storeSeqIdForReplay = store.getMaxSequenceId(false);
658           maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
659               storeSeqIdForReplay);
660           if (this.minSeqIdForLogReplay == -1 || storeSeqIdForReplay < this.minSeqIdForLogReplay) {
661             this.minSeqIdForLogReplay = storeSeqIdForReplay;
662           }
663           // Include bulk loaded files when determining seqIdForAssignment
664           long storeSeqIdForAssignment = store.getMaxSequenceId(true);
665           if (maxSeqId == -1 || storeSeqIdForAssignment > maxSeqId) {
666             maxSeqId = storeSeqIdForAssignment;
667           }
668           long maxStoreMemstoreTS = store.getMaxMemstoreTS();
669           if (maxStoreMemstoreTS > maxMemstoreTS) {
670             maxMemstoreTS = maxStoreMemstoreTS;
671           }
672         }
673       } catch (InterruptedException e) {
674         throw new IOException(e);
675       } catch (ExecutionException e) {
676         throw new IOException(e.getCause());
677       } finally {
678         storeOpenerThreadPool.shutdownNow();
679       }
680     }
681     mvcc.initialize(maxMemstoreTS + 1);
682     // Recover any edits if available.
683     maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
684         this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
685     return maxSeqId;
686   }
687 
688   /**
689    * @return True if this region has references.
690    */
691   public boolean hasReferences() {
692     for (Store store : this.stores.values()) {
693       if (store.hasReferences()) return true;
694     }
695     return false;
696   }
697 
698   /**
699    * This function will return the HDFS blocks distribution based on the data
700    * captured when HFile is created
701    * @return The HDFS blocks distribution for the region.
702    */
703   public HDFSBlocksDistribution getHDFSBlocksDistribution() {
704     HDFSBlocksDistribution hdfsBlocksDistribution =
705       new HDFSBlocksDistribution();
706     synchronized (this.stores) {
707       for (Store store : this.stores.values()) {
708         for (StoreFile sf : store.getStorefiles()) {
709           HDFSBlocksDistribution storeFileBlocksDistribution =
710             sf.getHDFSBlockDistribution();
711           hdfsBlocksDistribution.add(storeFileBlocksDistribution);
712         }
713       }
714     }
715     return hdfsBlocksDistribution;
716   }
717 
718   /**
719    * This is a helper function to compute HDFS block distribution on demand
720    * @param conf configuration
721    * @param tableDescriptor HTableDescriptor of the table
   * @param regionInfo HRegionInfo describing the region
723    * @return The HDFS blocks distribution for the given region.
724    * @throws IOException
725    */
726   public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
727       final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException {
728     HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
729     Path tablePath = FSUtils.getTablePath(FSUtils.getRootDir(conf), tableDescriptor.getName());
730     FileSystem fs = tablePath.getFileSystem(conf);
731 
732     HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo);
733     for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
734       Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family.getNameAsString());
735       if (storeFiles == null) continue;
736 
737       for (StoreFileInfo storeFileInfo : storeFiles) {
738         hdfsBlocksDistribution.add(storeFileInfo.computeHDFSBlocksDistribution(fs));
739       }
740     }
741     return hdfsBlocksDistribution;
742   }
743 
744   public AtomicLong getMemstoreSize() {
745     return memstoreSize;
746   }
747 
748   /**
749    * Increase the size of mem store in this region and the size of global mem
750    * store
751    * @param memStoreSize
752    * @return the size of memstore in this region
753    */
754   public long addAndGetGlobalMemstoreSize(long memStoreSize) {
755     if (this.rsAccounting != null) {
756       rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
757     }
758     return this.memstoreSize.getAndAdd(memStoreSize);
759   }
760 
761   /** @return a HRegionInfo object for this region */
762   public HRegionInfo getRegionInfo() {
763     return this.fs.getRegionInfo();
764   }
765 
766   /**
767    * @return Instance of {@link RegionServerServices} used by this HRegion.
768    * Can be null.
769    */
770   RegionServerServices getRegionServerServices() {
771     return this.rsServices;
772   }
773 
774   /** @return readRequestsCount for this region */
775   long getReadRequestsCount() {
776     return this.readRequestsCount.get();
777   }
778 
779   /** @return writeRequestsCount for this region */
780   long getWriteRequestsCount() {
781     return this.writeRequestsCount.get();
782   }
783 
784   MetricsRegion getMetrics() {
785     return metricsRegion;
786   }
787 
788   /** @return true if region is closed */
789   public boolean isClosed() {
790     return this.closed.get();
791   }
792 
793   /**
794    * @return True if closing process has started.
795    */
796   public boolean isClosing() {
797     return this.closing.get();
798   }
799 
800   /**
801    * Reset recovering state of current region
802    * @param newState
803    */
804   public void setRecovering(boolean newState) {
805     this.getRegionInfo().setRecovering(newState);
806   }
807   
808   /**
809    * @return True if current region is in recovering
810    */
811   public boolean isRecovering() {
812     return this.getRegionInfo().isRecovering();
813   }
814 
815   /** @return true if region is available (not closed and not closing) */
816   public boolean isAvailable() {
817     return !isClosed() && !isClosing();
818   }
819 
820   /** @return true if region is splittable */
821   public boolean isSplittable() {
822     return isAvailable() && !hasReferences();
823   }
824 
825   /**
826    * @return true if region is mergeable
827    */
828   public boolean isMergeable() {
829     if (!isAvailable()) {
830       LOG.debug("Region " + this.getRegionNameAsString()
831           + " is not mergeable because it is closing or closed");
832       return false;
833     }
834     if (hasReferences()) {
835       LOG.debug("Region " + this.getRegionNameAsString()
836           + " is not mergeable because it has references");
837       return false;
838     }
839 
840     return true;
841   }
842 
843   public boolean areWritesEnabled() {
844     synchronized(this.writestate) {
845       return this.writestate.writesEnabled;
846     }
847   }
848 
849    public MultiVersionConsistencyControl getMVCC() {
850      return mvcc;
851    }
852 
853    public boolean isLoadingCfsOnDemandDefault() {
854      return this.isLoadingCfsOnDemandDefault;
855    }
856 
857   /**
858    * Close down this HRegion.  Flush the cache, shut down each HStore, don't
859    * service any more calls.
860    *
861    * <p>This method could take some time to execute, so don't call it from a
862    * time-sensitive thread.
863    *
   * @return Map, keyed by column family name, of all the storage files that the
   * HRegion's component HStores make use of.  Returns null if already closed or
   * if judged that it should not close.
867    *
868    * @throws IOException e
869    */
870   public Map<byte[], List<StoreFile>> close() throws IOException {
871     return close(false);
872   }
873 
  // Monitor taken by close(boolean) around doClose, so that only one thread
  // runs the close sequence at a time.
  private final Object closeLock = new Object();

  /** Conf key for the periodic flush interval */
  public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL = 
      "hbase.regionserver.optionalcacheflushinterval";
  /**
   * Default interval for the memstore flush.
   * NOTE(review): presumably milliseconds (i.e. one hour) -- confirm where the
   * interval is read.
   */
  public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;
881 
882   /**
883    * Close down this HRegion.  Flush the cache unless abort parameter is true,
884    * Shut down each HStore, don't service any more calls.
885    *
886    * This method could take some time to execute, so don't call it from a
887    * time-sensitive thread.
888    *
889    * @param abort true if server is aborting (only during testing)
   * @return Map, keyed by column family name, of all the storage files that the
   * HRegion's component HStores make use of.  Can be null if
   * we are not to close at this time or we are already closed.
893    *
894    * @throws IOException e
895    */
896   public Map<byte[], List<StoreFile>> close(final boolean abort) throws IOException {
897     // Only allow one thread to close at a time. Serialize them so dual
898     // threads attempting to close will run up against each other.
899     MonitoredTask status = TaskMonitor.get().createStatus(
900         "Closing region " + this +
901         (abort ? " due to abort" : ""));
902 
903     status.setStatus("Waiting for close lock");
904     try {
905       synchronized (closeLock) {
906         return doClose(abort, status);
907       }
908     } finally {
909       status.cleanup();
910     }
911   }
912 
913   private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
914       throws IOException {
915     if (isClosed()) {
916       LOG.warn("Region " + this + " already closed");
917       return null;
918     }
919 
920     if (coprocessorHost != null) {
921       status.setStatus("Running coprocessor pre-close hooks");
922       this.coprocessorHost.preClose(abort);
923     }
924 
925     status.setStatus("Disabling compacts and flushes for region");
926     boolean wasFlushing = false;
927     synchronized (writestate) {
928       // Disable compacting and flushing by background threads for this
929       // region.
930       writestate.writesEnabled = false;
931       wasFlushing = writestate.flushing;
932       LOG.debug("Closing " + this + ": disabling compactions & flushes");
933       waitForFlushesAndCompactions();
934     }
935     // If we were not just flushing, is it worth doing a preflush...one
936     // that will clear out of the bulk of the memstore before we put up
937     // the close flag?
938     if (!abort && !wasFlushing && worthPreFlushing()) {
939       status.setStatus("Pre-flushing region before close");
940       LOG.info("Running close preflush of " + this.getRegionNameAsString());
941       internalFlushcache(status);
942     }
943 
944     this.closing.set(true);
945     status.setStatus("Disabling writes for close");
946     // block waiting for the lock for closing
947     lock.writeLock().lock();
948     try {
949       if (this.isClosed()) {
950         status.abort("Already got closed by another process");
951         // SplitTransaction handles the null
952         return null;
953       }
954       LOG.debug("Updates disabled for region " + this);
955       // Don't flush the cache if we are aborting
956       if (!abort) {
957         internalFlushcache(status);
958       }
959 
960       Map<byte[], List<StoreFile>> result =
961         new TreeMap<byte[], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
962       if (!stores.isEmpty()) {
963         // initialize the thread pool for closing stores in parallel.
964         ThreadPoolExecutor storeCloserThreadPool =
965           getStoreOpenAndCloseThreadPool("StoreCloserThread-" + this.getRegionNameAsString());
966         CompletionService<Pair<byte[], Collection<StoreFile>>> completionService =
967           new ExecutorCompletionService<Pair<byte[], Collection<StoreFile>>>(storeCloserThreadPool);
968 
969         // close each store in parallel
970         for (final Store store : stores.values()) {
971           completionService
972               .submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
973                 public Pair<byte[], Collection<StoreFile>> call() throws IOException {
974                   return new Pair<byte[], Collection<StoreFile>>(
975                     store.getFamily().getName(), store.close());
976                 }
977               });
978         }
979         try {
980           for (int i = 0; i < stores.size(); i++) {
981             Future<Pair<byte[], Collection<StoreFile>>> future = completionService.take();
982             Pair<byte[], Collection<StoreFile>> storeFiles = future.get();
983             List<StoreFile> familyFiles = result.get(storeFiles.getFirst());
984             if (familyFiles == null) {
985               familyFiles = new ArrayList<StoreFile>();
986               result.put(storeFiles.getFirst(), familyFiles);
987             }
988             familyFiles.addAll(storeFiles.getSecond());
989           }
990         } catch (InterruptedException e) {
991           throw new IOException(e);
992         } catch (ExecutionException e) {
993           throw new IOException(e.getCause());
994         } finally {
995           storeCloserThreadPool.shutdownNow();
996         }
997       }
998       this.closed.set(true);
999 
1000       if (coprocessorHost != null) {
1001         status.setStatus("Running coprocessor post-close hooks");
1002         this.coprocessorHost.postClose(abort);
1003       }
1004       if ( this.metricsRegion != null) {
1005         this.metricsRegion.close();
1006       }
1007       if ( this.metricsRegionWrapper != null) {
1008         Closeables.closeQuietly(this.metricsRegionWrapper);
1009       }
1010       status.markComplete("Closed");
1011       LOG.info("Closed " + this);
1012       return result;
1013     } finally {
1014       lock.writeLock().unlock();
1015     }
1016   }
1017 
1018   /**
1019    * Wait for all current flushes and compactions of the region to complete.
1020    * <p>
1021    * Exposed for TESTING.
1022    */
1023   public void waitForFlushesAndCompactions() {
1024     synchronized (writestate) {
1025       while (writestate.compacting > 0 || writestate.flushing) {
1026         LOG.debug("waiting for " + writestate.compacting + " compactions"
1027             + (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
1028         try {
1029           writestate.wait();
1030         } catch (InterruptedException iex) {
1031           // essentially ignore and propagate the interrupt back up
1032           Thread.currentThread().interrupt();
1033         }
1034       }
1035     }
1036   }
1037 
1038   protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
1039       final String threadNamePrefix) {
1040     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1041     int maxThreads = Math.min(numStores,
1042         conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1043             HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
1044     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1045   }
1046 
1047   protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
1048       final String threadNamePrefix) {
1049     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1050     int maxThreads = Math.max(1,
1051         conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1052             HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
1053             / numStores);
1054     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1055   }
1056 
1057   static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
1058       final String threadNamePrefix) {
1059     return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
1060       new ThreadFactory() {
1061         private int count = 1;
1062 
1063         public Thread newThread(Runnable r) {
1064           return new Thread(r, threadNamePrefix + "-" + count++);
1065         }
1066       });
1067   }
1068 
1069    /**
1070     * @return True if its worth doing a flush before we put up the close flag.
1071     */
1072   private boolean worthPreFlushing() {
1073     return this.memstoreSize.get() >
1074       this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
1075   }
1076 
1077   //////////////////////////////////////////////////////////////////////////////
1078   // HRegion accessors
1079   //////////////////////////////////////////////////////////////////////////////
1080 
1081   /** @return start key for region */
1082   public byte [] getStartKey() {
1083     return this.getRegionInfo().getStartKey();
1084   }
1085 
1086   /** @return end key for region */
1087   public byte [] getEndKey() {
1088     return this.getRegionInfo().getEndKey();
1089   }
1090 
1091   /** @return region id */
1092   public long getRegionId() {
1093     return this.getRegionInfo().getRegionId();
1094   }
1095 
1096   /** @return region name */
1097   public byte [] getRegionName() {
1098     return this.getRegionInfo().getRegionName();
1099   }
1100 
1101   /** @return region name as string for logging */
1102   public String getRegionNameAsString() {
1103     return this.getRegionInfo().getRegionNameAsString();
1104   }
1105 
1106   /** @return HTableDescriptor for this region */
1107   public HTableDescriptor getTableDesc() {
1108     return this.htableDescriptor;
1109   }
1110 
1111   /** @return HLog in use for this region */
1112   public HLog getLog() {
1113     return this.log;
1114   }
1115 
1116   /**
1117    * A split takes the config from the parent region & passes it to the daughter
1118    * region's constructor. If 'conf' was passed, you would end up using the HTD
1119    * of the parent region in addition to the new daughter HTD. Pass 'baseConf'
1120    * to the daughter regions to avoid this tricky dedupe problem.
1121    * @return Configuration object
1122    */
1123   Configuration getBaseConf() {
1124     return this.baseConf;
1125   }
1126 
1127   /** @return {@link FileSystem} being used by this region */
1128   public FileSystem getFilesystem() {
1129     return fs.getFileSystem();
1130   }
1131 
1132   /** @return the {@link HRegionFileSystem} used by this region */
1133   public HRegionFileSystem getRegionFileSystem() {
1134     return this.fs;
1135   }
1136 
1137   /** @return the last time the region was flushed */
1138   public long getLastFlushTime() {
1139     return this.lastFlushTime;
1140   }
1141 
1142   //////////////////////////////////////////////////////////////////////////////
1143   // HRegion maintenance.
1144   //
1145   // These methods are meant to be called periodically by the HRegionServer for
1146   // upkeep.
1147   //////////////////////////////////////////////////////////////////////////////
1148 
1149   /** @return returns size of largest HStore. */
1150   public long getLargestHStoreSize() {
1151     long size = 0;
1152     for (Store h : stores.values()) {
1153       long storeSize = h.getSize();
1154       if (storeSize > size) {
1155         size = storeSize;
1156       }
1157     }
1158     return size;
1159   }
1160 
1161   /*
1162    * Do preparation for pending compaction.
1163    * @throws IOException
1164    */
  protected void doRegionCompactionPrep() throws IOException {
    // Intentionally empty here. NOTE(review): being protected, this is
    // presumably an extension point for subclasses -- confirm.
  }
1167 
1168   void triggerMajorCompaction() {
1169     for (Store h : stores.values()) {
1170       h.triggerMajorCompaction();
1171     }
1172   }
1173 
1174   /**
1175    * This is a helper function that compact all the stores synchronously
1176    * It is used by utilities and testing
1177    *
1178    * @param majorCompaction True to force a major compaction regardless of thresholds
1179    * @throws IOException e
1180    */
1181   public void compactStores(final boolean majorCompaction)
1182   throws IOException {
1183     if (majorCompaction) {
1184       this.triggerMajorCompaction();
1185     }
1186     compactStores();
1187   }
1188 
1189   /**
1190    * This is a helper function that compact all the stores synchronously
1191    * It is used by utilities and testing
1192    *
1193    * @throws IOException e
1194    */
1195   public void compactStores() throws IOException {
1196     for (Store s : getStores().values()) {
1197       CompactionContext compaction = s.requestCompaction();
1198       if (compaction != null) {
1199         compact(compaction, s);
1200       }
1201     }
1202   }
1203 
1204   /*
1205    * Called by compaction thread and after region is opened to compact the
1206    * HStores if necessary.
1207    *
1208    * <p>This operation could block for a long time, so don't call it from a
1209    * time-sensitive thread.
1210    *
1211    * Note that no locking is necessary at this level because compaction only
1212    * conflicts with a region split, and that cannot happen because the region
1213    * server does them sequentially and not in parallel.
1214    *
   * @param compaction Compaction details, obtained by requestCompaction()
   * @param store the store to compact
1216    * @return whether the compaction completed
1217    * @throws IOException e
1218    */
  public boolean compact(CompactionContext compaction, Store store) throws IOException {
    assert compaction != null && compaction.hasSelection();
    assert !compaction.getRequest().getFiles().isEmpty();
    // Fail fast, before taking the region lock, when the region is going away.
    // The requested compaction is cancelled explicitly because the finally
    // block further down is not reached from here.
    if (this.closing.get() || this.closed.get()) {
      LOG.debug("Skipping compaction on " + this + " because closing/closed");
      store.cancelRequestedCompaction(compaction);
      return false;
    }
    MonitoredTask status = null;
    // Set just before store.compact() is invoked; while false, the outer
    // finally cancels the requested compaction.
    boolean didPerformCompaction = false;
    // block waiting for the lock for compaction
    lock.readLock().lock();
    try {
      status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
      // Re-check under the lock: the region may have been closed while waiting.
      if (this.closed.get()) {
        String msg = "Skipping compaction on " + this + " because closed";
        LOG.debug(msg);
        status.abort(msg);
        return false;
      }
      // True once writestate.compacting has been incremented, so that the
      // finally block below only decrements what was actually counted.
      boolean wasStateSet = false;
      try {
        synchronized (writestate) {
          if (writestate.writesEnabled) {
            wasStateSet = true;
            ++writestate.compacting;
          } else {
            String msg = "NOT compacting region " + this + ". Writes disabled.";
            LOG.info(msg);
            status.abort(msg);
            return false;
          }
        }
        LOG.info("Starting compaction on " + store + " in region " + this
            + (compaction.getRequest().isOffPeak()?" as an off-peak compaction":""));
        doRegionCompactionPrep();
        try {
          status.setStatus("Compacting store " + store);
          didPerformCompaction = true;
          store.compact(compaction);
        } catch (InterruptedIOException iioe) {
          // An interrupted compaction is reported as "not completed" rather
          // than propagated as an error.
          String msg = "compaction interrupted";
          LOG.info(msg, iioe);
          status.abort(msg);
          return false;
        }
      } finally {
        if (wasStateSet) {
          synchronized (writestate) {
            --writestate.compacting;
            // Wake up threads waiting on writestate (see
            // waitForFlushesAndCompactions) once no compaction is running.
            if (writestate.compacting <= 0) {
              writestate.notifyAll();
            }
          }
        }
      }
      status.markComplete("Compaction complete");
      return true;
    } finally {
      try {
        // Cancel the request when store.compact() was never reached.
        if (!didPerformCompaction) store.cancelRequestedCompaction(compaction);
        if (status != null) status.cleanup();
      } finally {
        // Nested finally: the read lock is released even if the cleanup throws.
        lock.readLock().unlock();
      }
    }
  }
1286 
1287   /**
1288    * Flush the cache.
1289    *
1290    * When this method is called the cache will be flushed unless:
1291    * <ol>
1292    *   <li>the cache is empty</li>
1293    *   <li>the region is closed.</li>
1294    *   <li>a flush is already in progress</li>
1295    *   <li>writes are disabled</li>
1296    * </ol>
1297    *
1298    * <p>This method may block for some time, so it should not be called from a
1299    * time-sensitive thread.
1300    *
1301    * @return true if cache was flushed
1302    *
1303    * @throws IOException general io exceptions
1304    * @throws DroppedSnapshotException Thrown when replay of hlog is required
1305    * because a Snapshot was not properly persisted.
1306    */
  public boolean flushcache() throws IOException {
    // fail-fast instead of waiting on the lock
    if (this.closing.get()) {
      LOG.debug("Skipping flush on " + this + " because closing");
      return false;
    }
    MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
    status.setStatus("Acquiring readlock on region");
    // block waiting for the lock for flushing cache
    lock.readLock().lock();
    try {
      // Re-check under the lock: the region may have been closed while waiting.
      if (this.closed.get()) {
        LOG.debug("Skipping flush on " + this + " because closed");
        status.abort("Skipped: closed");
        return false;
      }
      if (coprocessorHost != null) {
        status.setStatus("Running coprocessor pre-flush hooks");
        coprocessorHost.preFlush();
      }
      // Reset the counters tracking puts made without WAL.
      if (numPutsWithoutWAL.get() > 0) {
        numPutsWithoutWAL.set(0);
        dataInMemoryWithoutWAL.set(0);
      }
      // Claim the flushing flag; bail out if another flush already holds it or
      // if writes have been disabled.
      synchronized (writestate) {
        if (!writestate.flushing && writestate.writesEnabled) {
          this.writestate.flushing = true;
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("NOT flushing memstore for region " + this
                + ", flushing=" + writestate.flushing + ", writesEnabled="
                + writestate.writesEnabled);
          }
          status.abort("Not flushing since "
              + (writestate.flushing ? "already flushing"
                  : "writes not enabled"));
          return false;
        }
      }
      try {
        boolean result = internalFlushcache(status);

        if (coprocessorHost != null) {
          status.setStatus("Running post-flush coprocessor hooks");
          coprocessorHost.postFlush();
        }

        status.markComplete("Flush successful");
        return result;
      } finally {
        // Whether the flush succeeded or threw: clear the flags claimed above
        // and wake up threads waiting on writestate.
        synchronized (writestate) {
          writestate.flushing = false;
          this.writestate.flushRequested = false;
          writestate.notifyAll();
        }
      }
    } finally {
      lock.readLock().unlock();
      status.cleanup();
    }
  }
1368 
1369   /**
1370    * Should the memstore be flushed now
1371    */
1372   boolean shouldFlush() {
1373     if (flushCheckInterval <= 0) { //disabled
1374       return false;
1375     }
1376     long now = EnvironmentEdgeManager.currentTimeMillis();
1377     //if we flushed in the recent past, we don't need to do again now
1378     if ((now - getLastFlushTime() < flushCheckInterval)) {
1379       return false;
1380     }
1381     //since we didn't flush in the recent past, flush now if certain conditions
1382     //are met. Return true on first such memstore hit.
1383     for (Store s : this.getStores().values()) {
1384       if (s.timeOfOldestEdit() < now - flushCheckInterval) {
1385         // we have an old enough edit in the memstore, flush
1386         return true;
1387       }
1388     }
1389     return false;
1390   }
1391 
1392   /**
1393    * Flush the memstore.
1394    *
1395    * Flushing the memstore is a little tricky. We have a lot of updates in the
1396    * memstore, all of which have also been written to the log. We need to
1397    * write those updates in the memstore out to disk, while being able to
1398    * process reads/writes as much as possible during the flush operation. Also,
1399    * the log has to state clearly the point in time at which the memstore was
1400    * flushed. (That way, during recovery, we know when we can rely on the
1401    * on-disk flushed structures and when we have to recover the memstore from
1402    * the log.)
1403    *
1404    * <p>So, we have a three-step process:
1405    *
1406    * <ul><li>A. Flush the memstore to the on-disk stores, noting the current
   * sequence ID for the log.</li>
1408    *
1409    * <li>B. Write a FLUSHCACHE-COMPLETE message to the log, using the sequence
1410    * ID that was current at the time of memstore-flush.</li>
1411    *
1412    * <li>C. Get rid of the memstore structures that are now redundant, as
1413    * they've been flushed to the on-disk HStores.</li>
1414    * </ul>
1415    * <p>This method is protected, but can be accessed via several public
1416    * routes.
1417    *
1418    * <p> This method may block for some time.
1419    * @param status
1420    *
1421    * @return true if the region needs compacting
1422    *
1423    * @throws IOException general io exceptions
1424    * @throws DroppedSnapshotException Thrown when replay of hlog is required
1425    * because a Snapshot was not properly persisted.
1426    */
1427   protected boolean internalFlushcache(MonitoredTask status)
1428       throws IOException {
1429     return internalFlushcache(this.log, -1, status);
1430   }
1431 
1432   /**
1433    * @param wal Null if we're NOT to go via hlog/wal.
1434    * @param myseqid The seqid to use if <code>wal</code> is null writing out
1435    * flush file.
1436    * @param status
1437    * @return true if the region needs compacting
1438    * @throws IOException
1439    * @see #internalFlushcache(MonitoredTask)
1440    */
1441   protected boolean internalFlushcache(
1442       final HLog wal, final long myseqid, MonitoredTask status)
1443   throws IOException {
1444     if (this.rsServices != null && this.rsServices.isAborted()) {
1445       // Don't flush when server aborting, it's unsafe
1446       throw new IOException("Aborting flush because server is abortted...");
1447     }
1448     final long startTime = EnvironmentEdgeManager.currentTimeMillis();
1449     // Clear flush flag.
1450     // Record latest flush time
1451     this.lastFlushTime = startTime;
1452     // If nothing to flush, return and avoid logging start/stop flush.
1453     if (this.memstoreSize.get() <= 0) {
1454       return false;
1455     }
1456     if (LOG.isDebugEnabled()) {
1457       LOG.debug("Started memstore flush for " + this +
1458         ", current region memstore size " +
1459         StringUtils.humanReadableInt(this.memstoreSize.get()) +
1460         ((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid));
1461     }
1462 
1463     // Stop updates while we snapshot the memstore of all stores. We only have
1464     // to do this for a moment.  Its quick.  The subsequent sequence id that
1465     // goes into the HLog after we've flushed all these snapshots also goes
1466     // into the info file that sits beside the flushed files.
1467     // We also set the memstore size to zero here before we allow updates
1468     // again so its value will represent the size of the updates received
1469     // during the flush
1470     MultiVersionConsistencyControl.WriteEntry w = null;
1471 
1472     // We have to take a write lock during snapshot, or else a write could
1473     // end up in both snapshot and memstore (makes it difficult to do atomic
1474     // rows then)
1475     status.setStatus("Obtaining lock to block concurrent updates");
1476     // block waiting for the lock for internal flush
1477     this.updatesLock.writeLock().lock();
1478     long flushsize = this.memstoreSize.get();
1479     status.setStatus("Preparing to flush by snapshotting stores");
1480     List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
1481     long flushSeqId = -1L;
1482     try {
1483       // Record the mvcc for all transactions in progress.
1484       w = mvcc.beginMemstoreInsert();
1485       mvcc.advanceMemstore(w);
1486 
1487       if (wal != null) {
1488         Long startSeqId = wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
1489         if (startSeqId == null) {
1490           status.setStatus("Flush will not be started for [" + this.getRegionInfo().getEncodedName()
1491               + "] - WAL is going away");
1492           return false;
1493         }
1494         flushSeqId = startSeqId.longValue();
1495       } else {
1496         flushSeqId = myseqid;
1497       }
1498 
1499       for (Store s : stores.values()) {
1500         storeFlushCtxs.add(s.createFlushContext(flushSeqId));
1501       }
1502 
1503       // prepare flush (take a snapshot)
1504       for (StoreFlushContext flush : storeFlushCtxs) {
1505         flush.prepare();
1506       }
1507     } finally {
1508       this.updatesLock.writeLock().unlock();
1509     }
1510     String s = "Finished snapshotting " + this +
1511       ", commencing wait for mvcc, flushsize=" + flushsize;
1512     status.setStatus(s);
1513     LOG.debug(s);
1514 
1515     // sync unflushed WAL changes when deferred log sync is enabled
1516     // see HBASE-8208 for details
1517     if (wal != null && isDeferredLogSyncEnabled()) {
1518       wal.sync();
1519     }
1520 
1521     // wait for all in-progress transactions to commit to HLog before
1522     // we can start the flush. This prevents
1523     // uncommitted transactions from being written into HFiles.
1524     // We have to block before we start the flush, otherwise keys that
1525     // were removed via a rollbackMemstore could be written to Hfiles.
1526     mvcc.waitForRead(w);
1527 
1528     status.setStatus("Flushing stores");
1529     LOG.debug("Finished snapshotting, commencing flushing stores");
1530 
1531     // Any failure from here on out will be catastrophic requiring server
1532     // restart so hlog content can be replayed and put back into the memstore.
1533     // Otherwise, the snapshot content while backed up in the hlog, it will not
1534     // be part of the current running servers state.
1535     boolean compactionRequested = false;
1536     try {
1537       // A.  Flush memstore to all the HStores.
1538       // Keep running vector of all store files that includes both old and the
1539       // just-made new flush store file. The new flushed file is still in the
1540       // tmp directory.
1541 
1542       for (StoreFlushContext flush : storeFlushCtxs) {
1543         flush.flushCache(status);
1544       }
1545 
1546       // Switch snapshot (in memstore) -> new hfile (thus causing
1547       // all the store scanners to reset/reseek).
1548       for (StoreFlushContext flush : storeFlushCtxs) {
1549         boolean needsCompaction = flush.commit(status);
1550         if (needsCompaction) {
1551           compactionRequested = true;
1552         }
1553       }
1554       storeFlushCtxs.clear();
1555 
1556       // Set down the memstore size by amount of flush.
1557       this.addAndGetGlobalMemstoreSize(-flushsize);
1558     } catch (Throwable t) {
1559       // An exception here means that the snapshot was not persisted.
1560       // The hlog needs to be replayed so its content is restored to memstore.
1561       // Currently, only a server restart will do this.
1562       // We used to only catch IOEs but its possible that we'd get other
1563       // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
1564       // all and sundry.
1565       if (wal != null) {
1566         wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
1567       }
1568       DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
1569           Bytes.toStringBinary(getRegionName()));
1570       dse.initCause(t);
1571       status.abort("Flush failed: " + StringUtils.stringifyException(t));
1572       throw dse;
1573     }
1574 
1575     // If we get to here, the HStores have been written.
1576     if (wal != null) {
1577       wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
1578     }
1579 
1580     // Update the last flushed sequence id for region
1581     if (this.rsServices != null) {
1582       completeSequenceId = flushSeqId;
1583     }
1584 
1585     // C. Finally notify anyone waiting on memstore to clear:
1586     // e.g. checkResources().
1587     synchronized (this) {
1588       notifyAll(); // FindBugs NN_NAKED_NOTIFY
1589     }
1590 
1591     long time = EnvironmentEdgeManager.currentTimeMillis() - startTime;
1592     long memstoresize = this.memstoreSize.get();
1593     String msg = "Finished memstore flush of ~" +
1594       StringUtils.humanReadableInt(flushsize) + "/" + flushsize +
1595       ", currentsize=" +
1596       StringUtils.humanReadableInt(memstoresize) + "/" + memstoresize +
1597       " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId +
1598       ", compaction requested=" + compactionRequested +
1599       ((wal == null)? "; wal=null": "");
1600     LOG.info(msg);
1601     status.setStatus(msg);
1602     this.recentFlushes.add(new Pair<Long,Long>(time/1000, flushsize));
1603 
1604     return compactionRequested;
1605   }
1606 
1607   //////////////////////////////////////////////////////////////////////////////
1608   // get() methods for client use.
1609   //////////////////////////////////////////////////////////////////////////////
1610   /**
1611    * Return all the data for the row that matches <i>row</i> exactly,
1612    * or the one that immediately preceeds it, at or immediately before
1613    * <i>ts</i>.
1614    *
1615    * @param row row key
1616    * @return map of values
1617    * @throws IOException
1618    */
1619   Result getClosestRowBefore(final byte [] row)
1620   throws IOException{
1621     return getClosestRowBefore(row, HConstants.CATALOG_FAMILY);
1622   }
1623 
1624   /**
1625    * Return all the data for the row that matches <i>row</i> exactly,
1626    * or the one that immediately preceeds it, at or immediately before
1627    * <i>ts</i>.
1628    *
1629    * @param row row key
1630    * @param family column family to find on
1631    * @return map of values
1632    * @throws IOException read exceptions
1633    */
1634   public Result getClosestRowBefore(final byte [] row, final byte [] family)
1635   throws IOException {
1636     if (coprocessorHost != null) {
1637       Result result = new Result();
1638       if (coprocessorHost.preGetClosestRowBefore(row, family, result)) {
1639         return result;
1640       }
1641     }
1642     // look across all the HStores for this region and determine what the
1643     // closest key is across all column families, since the data may be sparse
1644     checkRow(row, "getClosestRowBefore");
1645     startRegionOperation(Operation.GET);
1646     this.readRequestsCount.increment();
1647     try {
1648       Store store = getStore(family);
1649       // get the closest key. (HStore.getRowKeyAtOrBefore can return null)
1650       KeyValue key = store.getRowKeyAtOrBefore(row);
1651       Result result = null;
1652       if (key != null) {
1653         Get get = new Get(key.getRow());
1654         get.addFamily(family);
1655         result = get(get);
1656       }
1657       if (coprocessorHost != null) {
1658         coprocessorHost.postGetClosestRowBefore(row, family, result);
1659       }
1660       return result;
1661     } finally {
1662       closeRegionOperation();
1663     }
1664   }
1665 
1666   /**
1667    * Return an iterator that scans over the HRegion, returning the indicated
1668    * columns and rows specified by the {@link Scan}.
1669    * <p>
1670    * This Iterator must be closed by the caller.
1671    *
1672    * @param scan configured {@link Scan}
1673    * @return RegionScanner
1674    * @throws IOException read exceptions
1675    */
1676   public RegionScanner getScanner(Scan scan) throws IOException {
1677    return getScanner(scan, null);
1678   }
1679 
1680   void prepareScanner(Scan scan) throws IOException {
1681     if(!scan.hasFamilies()) {
1682       // Adding all families to scanner
1683       for(byte[] family: this.htableDescriptor.getFamiliesKeys()){
1684         scan.addFamily(family);
1685       }
1686     }
1687   }
1688 
1689   protected RegionScanner getScanner(Scan scan,
1690       List<KeyValueScanner> additionalScanners) throws IOException {
1691     startRegionOperation(Operation.SCAN);
1692     try {
1693       // Verify families are all valid
1694       prepareScanner(scan);
1695       if(scan.hasFamilies()) {
1696         for(byte [] family : scan.getFamilyMap().keySet()) {
1697           checkFamily(family);
1698         }
1699       }
1700       return instantiateRegionScanner(scan, additionalScanners);
1701     } finally {
1702       closeRegionOperation();
1703     }
1704   }
1705 
1706   protected RegionScanner instantiateRegionScanner(Scan scan,
1707       List<KeyValueScanner> additionalScanners) throws IOException {
1708     return new RegionScannerImpl(scan, additionalScanners, this);
1709   }
1710 
1711   /*
1712    * @param delete The passed delete is modified by this method. WARNING!
1713    */
1714   void prepareDelete(Delete delete) throws IOException {
1715     // Check to see if this is a deleteRow insert
1716     if(delete.getFamilyMap().isEmpty()){
1717       for(byte [] family : this.htableDescriptor.getFamiliesKeys()){
1718         // Don't eat the timestamp
1719         delete.deleteFamily(family, delete.getTimeStamp());
1720       }
1721     } else {
1722       for(byte [] family : delete.getFamilyMap().keySet()) {
1723         if(family == null) {
1724           throw new NoSuchColumnFamilyException("Empty family is invalid");
1725         }
1726         checkFamily(family);
1727       }
1728     }
1729   }
1730 
1731   //////////////////////////////////////////////////////////////////////////////
1732   // set() methods for client use.
1733   //////////////////////////////////////////////////////////////////////////////
1734   /**
1735    * @param delete delete object
1736    * @throws IOException read exceptions
1737    */
1738   public void delete(Delete delete)
1739   throws IOException {
1740     checkReadOnly();
1741     checkResources();
1742     startRegionOperation(Operation.DELETE);
1743     this.writeRequestsCount.increment();
1744     try {
1745       delete.getRow();
1746       // All edits for the given row (across all column families) must happen atomically.
1747       doBatchMutate(delete, null);
1748     } finally {
1749       closeRegionOperation();
1750     }
1751   }
1752 
1753   /**
1754    * Row needed by below method.
1755    */
1756   private static final byte [] FOR_UNIT_TESTS_ONLY = Bytes.toBytes("ForUnitTestsOnly");
1757   /**
1758    * This is used only by unit tests. Not required to be a public API.
1759    * @param familyMap map of family to edits for the given family.
1760    * @param clusterId
1761    * @param durability
1762    * @throws IOException
1763    */
1764   void delete(NavigableMap<byte[], List<? extends Cell>> familyMap, UUID clusterId,
1765       Durability durability) throws IOException {
1766     Delete delete = new Delete(FOR_UNIT_TESTS_ONLY);
1767     delete.setFamilyMap(familyMap);
1768     delete.setClusterId(clusterId);
1769     delete.setDurability(durability);
1770     doBatchMutate(delete, null);
1771   }
1772 
1773   /**
1774    * Setup correct timestamps in the KVs in Delete object.
1775    * Caller should have the row and region locks.
1776    * @param familyMap
1777    * @param byteNow
1778    * @throws IOException
1779    */
1780   void prepareDeleteTimestamps(Map<byte[], List<? extends Cell>> familyMap, byte[] byteNow)
1781       throws IOException {
1782     for (Map.Entry<byte[], List<? extends Cell>> e : familyMap.entrySet()) {
1783 
1784       byte[] family = e.getKey();
1785       List<? extends Cell> cells = e.getValue();
1786       Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
1787 
1788       for (Cell cell: cells) {
1789         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
1790         //  Check if time is LATEST, change to time of most recent addition if so
1791         //  This is expensive.
1792         if (kv.isLatestTimestamp() && kv.isDeleteType()) {
1793           byte[] qual = kv.getQualifier();
1794           if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
1795 
1796           Integer count = kvCount.get(qual);
1797           if (count == null) {
1798             kvCount.put(qual, 1);
1799           } else {
1800             kvCount.put(qual, count + 1);
1801           }
1802           count = kvCount.get(qual);
1803 
1804           Get get = new Get(kv.getRow());
1805           get.setMaxVersions(count);
1806           get.addColumn(family, qual);
1807 
1808           List<KeyValue> result = get(get, false);
1809 
1810           if (result.size() < count) {
1811             // Nothing to delete
1812             kv.updateLatestStamp(byteNow);
1813             continue;
1814           }
1815           if (result.size() > count) {
1816             throw new RuntimeException("Unexpected size: " + result.size());
1817           }
1818           KeyValue getkv = result.get(count - 1);
1819           Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
1820               getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
1821         } else {
1822           kv.updateLatestStamp(byteNow);
1823         }
1824       }
1825     }
1826   }
1827 
1828   /**
1829    * @param put
1830    * @throws IOException
1831    */
1832   public void put(Put put)
1833   throws IOException {
1834     checkReadOnly();
1835 
1836     // Do a rough check that we have resources to accept a write.  The check is
1837     // 'rough' in that between the resource check and the call to obtain a
1838     // read lock, resources may run out.  For now, the thought is that this
1839     // will be extremely rare; we'll deal with it when it happens.
1840     checkResources();
1841     startRegionOperation(Operation.PUT);
1842     this.writeRequestsCount.increment();
1843     try {
1844       // All edits for the given row (across all column families) must happen atomically.
1845       doBatchMutate(put, null);
1846     } finally {
1847       closeRegionOperation();
1848     }
1849   }
1850 
1851   /**
1852    * Struct-like class that tracks the progress of a batch operation,
1853    * accumulating status codes and tracking the index at which processing
1854    * is proceeding.
1855    */
1856   private static class BatchOperationInProgress<T> {
1857     T[] operations;
1858     int nextIndexToProcess = 0;
1859     OperationStatus[] retCodeDetails;
1860     WALEdit[] walEditsFromCoprocessors;
1861 
1862     public BatchOperationInProgress(T[] operations) {
1863       this.operations = operations;
1864       this.retCodeDetails = new OperationStatus[operations.length];
1865       this.walEditsFromCoprocessors = new WALEdit[operations.length];
1866       Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
1867     }
1868 
1869     public boolean isDone() {
1870       return nextIndexToProcess == operations.length;
1871     }
1872   }
1873 
1874   /**
1875    * Perform a batch put with no pre-specified locks
1876    * @see HRegion#batchMutate(Pair[])
1877    */
1878   public OperationStatus[] put(Put[] puts) throws IOException {
1879     @SuppressWarnings("unchecked")
1880     Pair<Mutation, Integer> putsAndLocks[] = new Pair[puts.length];
1881 
1882     for (int i = 0; i < puts.length; i++) {
1883       putsAndLocks[i] = new Pair<Mutation, Integer>(puts[i], null);
1884     }
1885     return batchMutate(putsAndLocks);
1886   }
1887 
1888   /**
1889    * Perform a batch of mutations.
1890    * It supports only Put and Delete mutations and will ignore other types passed.
1891    * @param mutationsAndLocks
1892    *          the list of mutations paired with their requested lock IDs.
1893    * @return an array of OperationStatus which internally contains the
1894    *         OperationStatusCode and the exceptionMessage if any.
1895    * @throws IOException
1896    */
1897   public OperationStatus[] batchMutate(
1898       Pair<Mutation, Integer>[] mutationsAndLocks) throws IOException {
1899     return batchMutate(mutationsAndLocks, false);
1900   }
1901  
1902   /**
1903    * Perform a batch of mutations.
1904    * It supports only Put and Delete mutations and will ignore other types passed.
1905    * @param mutationsAndLocks
1906    *          the list of mutations paired with their requested lock IDs.
1907    * @return an array of OperationStatus which internally contains the
1908    *         OperationStatusCode and the exceptionMessage if any.
1909    * @throws IOException
1910    */
1911   OperationStatus[] batchMutate(Pair<Mutation, Integer>[] mutationsAndLocks, boolean isReplay)
1912       throws IOException {
1913     BatchOperationInProgress<Pair<Mutation, Integer>> batchOp =
1914       new BatchOperationInProgress<Pair<Mutation,Integer>>(mutationsAndLocks);
1915 
1916     boolean initialized = false;
1917 
1918     while (!batchOp.isDone()) {
1919       if (!isReplay) {
1920         checkReadOnly();
1921       }
1922       checkResources();
1923 
1924       long newSize;
1925       startRegionOperation();
1926 
1927       try {
1928         if (!initialized) {
1929           if (!isReplay) {
1930             this.writeRequestsCount.increment();
1931             doPreMutationHook(batchOp);
1932           }
1933           initialized = true;
1934         }
1935         long addedSize = doMiniBatchMutation(batchOp, isReplay);
1936         newSize = this.addAndGetGlobalMemstoreSize(addedSize);
1937       } finally {
1938         closeRegionOperation();
1939       }
1940       if (isFlushSize(newSize)) {
1941         requestFlush();
1942       }
1943     }
1944     return batchOp.retCodeDetails;
1945   }
1946   
1947 
1948   private void doPreMutationHook(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp)
1949       throws IOException {
1950     /* Run coprocessor pre hook outside of locks to avoid deadlock */
1951     WALEdit walEdit = new WALEdit();
1952     if (coprocessorHost != null) {
1953       for (int i = 0 ; i < batchOp.operations.length; i++) {
1954         Pair<Mutation, Integer> nextPair = batchOp.operations[i];
1955         Mutation m = nextPair.getFirst();
1956         if (m instanceof Put) {
1957           if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
1958             // pre hook says skip this Put
1959             // mark as success and skip in doMiniBatchMutation
1960             batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
1961           }
1962         } else if (m instanceof Delete) {
1963           if (coprocessorHost.preDelete((Delete) m, walEdit, m.getDurability())) {
1964             // pre hook says skip this Delete
1965             // mark as success and skip in doMiniBatchMutation
1966             batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
1967           }
1968         } else {
1969           // In case of passing Append mutations along with the Puts and Deletes in batchMutate
1970           // mark the operation return code as failure so that it will not be considered in
1971           // the doMiniBatchMutation
1972           batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
1973               "Put/Delete mutations only supported in batchMutate() now");
1974         }
1975         if (!walEdit.isEmpty()) {
1976           batchOp.walEditsFromCoprocessors[i] = walEdit;
1977           walEdit = new WALEdit();
1978         }
1979       }
1980     }
1981   }
1982 
1983   @SuppressWarnings("unchecked")
1984   private long doMiniBatchMutation(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp,
1985       boolean isInReplay) throws IOException {
1986 
1987     // variable to note if all Put items are for the same CF -- metrics related
1988     boolean putsCfSetConsistent = true;
1989     //The set of columnFamilies first seen for Put.
1990     Set<byte[]> putsCfSet = null;
1991     // variable to note if all Delete items are for the same CF -- metrics related
1992     boolean deletesCfSetConsistent = true;
1993     //The set of columnFamilies first seen for Delete.
1994     Set<byte[]> deletesCfSet = null;
1995 
1996     WALEdit walEdit = new WALEdit(isInReplay);
1997     MultiVersionConsistencyControl.WriteEntry w = null;
1998     long txid = 0;
1999     boolean walSyncSuccessful = false;
2000     boolean locked = false;
2001 
2002     /** Keep track of the locks we hold so we can release them in finally clause */
2003     List<Integer> acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
2004     // reference family maps directly so coprocessors can mutate them if desired
2005     Map<byte[], List<? extends Cell>>[] familyMaps = new Map[batchOp.operations.length];
2006     // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
2007     int firstIndex = batchOp.nextIndexToProcess;
2008     int lastIndexExclusive = firstIndex;
2009     boolean success = false;
2010     int noOfPuts = 0, noOfDeletes = 0;
2011     try {
2012       // ------------------------------------
2013       // STEP 1. Try to acquire as many locks as we can, and ensure
2014       // we acquire at least one.
2015       // ----------------------------------
2016       int numReadyToWrite = 0;
2017       long now = EnvironmentEdgeManager.currentTimeMillis();
2018       while (lastIndexExclusive < batchOp.operations.length) {
2019         Pair<Mutation, Integer> nextPair = batchOp.operations[lastIndexExclusive];
2020         Mutation mutation = nextPair.getFirst();
2021         boolean isPutMutation = mutation instanceof Put;
2022         Integer providedLockId = nextPair.getSecond();
2023 
2024         Map<byte[], List<? extends Cell>> familyMap = mutation.getFamilyMap();
2025         // store the family map reference to allow for mutations
2026         familyMaps[lastIndexExclusive] = familyMap;
2027 
2028         // skip anything that "ran" already
2029         if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
2030             != OperationStatusCode.NOT_RUN) {
2031           lastIndexExclusive++;
2032           continue;
2033         }
2034 
2035         try {
2036           if (isPutMutation) {
2037             // Check the families in the put. If bad, skip this one.
2038             if (isInReplay) {
2039               removeNonExistentColumnFamilyForReplay(familyMap);
2040             } else {
2041               checkFamilies(familyMap.keySet());
2042             }
2043             checkTimestamps(mutation.getFamilyMap(), now);
2044           } else {
2045             prepareDelete((Delete) mutation);
2046           }
2047         } catch (NoSuchColumnFamilyException nscf) {
2048           LOG.warn("No such column family in batch mutation", nscf);
2049           batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2050               OperationStatusCode.BAD_FAMILY, nscf.getMessage());
2051           lastIndexExclusive++;
2052           continue;
2053         } catch (FailedSanityCheckException fsce) {
2054           LOG.warn("Batch Mutation did not pass sanity check", fsce);
2055           batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2056               OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
2057           lastIndexExclusive++;
2058           continue;
2059         }
2060         // If we haven't got any rows in our batch, we should block to
2061         // get the next one.
2062         boolean shouldBlock = numReadyToWrite == 0;
2063         Integer acquiredLockId = null;
2064         try {
2065           acquiredLockId = getLock(providedLockId, mutation.getRow(),
2066               shouldBlock);
2067         } catch (IOException ioe) {
2068           LOG.warn("Failed getting lock in batch put, row="
2069                   + Bytes.toStringBinary(mutation.getRow()), ioe);
2070         }
2071         if (acquiredLockId == null) {
2072           // We failed to grab another lock
2073           assert !shouldBlock : "Should never fail to get lock when blocking";
2074           break; // stop acquiring more rows for this batch
2075         }
2076         if (providedLockId == null) {
2077           acquiredLocks.add(acquiredLockId);
2078         }
2079         lastIndexExclusive++;
2080         numReadyToWrite++;
2081 
2082         if (isPutMutation) {
2083           // If Column Families stay consistent through out all of the
2084           // individual puts then metrics can be reported as a mutliput across
2085           // column families in the first put.
2086           if (putsCfSet == null) {
2087             putsCfSet = mutation.getFamilyMap().keySet();
2088           } else {
2089             putsCfSetConsistent = putsCfSetConsistent
2090                 && mutation.getFamilyMap().keySet().equals(putsCfSet);
2091           }
2092         } else {
2093           if (deletesCfSet == null) {
2094             deletesCfSet = mutation.getFamilyMap().keySet();
2095           } else {
2096             deletesCfSetConsistent = deletesCfSetConsistent
2097                 && mutation.getFamilyMap().keySet().equals(deletesCfSet);
2098           }
2099         }
2100       }
2101       
2102       // we should record the timestamp only after we have acquired the rowLock,
2103       // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
2104       now = EnvironmentEdgeManager.currentTimeMillis();
2105       byte[] byteNow = Bytes.toBytes(now);
2106 
2107       // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
2108       if (numReadyToWrite <= 0) return 0L;
2109 
2110       // We've now grabbed as many mutations off the list as we can
2111 
2112       // ------------------------------------
2113       // STEP 2. Update any LATEST_TIMESTAMP timestamps
2114       // ----------------------------------
2115       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2116         // skip invalid
2117         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2118             != OperationStatusCode.NOT_RUN) continue;
2119 
2120         Mutation mutation = batchOp.operations[i].getFirst();
2121         if (mutation instanceof Put) {
2122           updateKVTimestamps(familyMaps[i].values(), byteNow);
2123           noOfPuts++;
2124         } else {
2125           prepareDeleteTimestamps(familyMaps[i], byteNow);
2126           noOfDeletes++;
2127         }
2128       }
2129 
2130       lock(this.updatesLock.readLock(), numReadyToWrite);
2131       locked = true;
2132 
2133       //
2134       // ------------------------------------
2135       // Acquire the latest mvcc number
2136       // ----------------------------------
2137       w = mvcc.beginMemstoreInsert();
2138 
2139       // calling the pre CP hook for batch mutation
2140       if (!isInReplay && coprocessorHost != null) {
2141         MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp = 
2142           new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations, 
2143           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2144         if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
2145       }
2146 
2147       // ------------------------------------
2148       // STEP 3. Write back to memstore
2149       // Write to memstore. It is ok to write to memstore
2150       // first without updating the HLog because we do not roll
2151       // forward the memstore MVCC. The MVCC will be moved up when
2152       // the complete operation is done. These changes are not yet
2153       // visible to scanners till we update the MVCC. The MVCC is
2154       // moved only when the sync is complete.
2155       // ----------------------------------
2156       long addedSize = 0;
2157       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2158         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2159             != OperationStatusCode.NOT_RUN) {
2160           continue;
2161         }
2162         addedSize += applyFamilyMapToMemstore(familyMaps[i], w);
2163       }
2164 
2165       // ------------------------------------
2166       // STEP 4. Build WAL edit
2167       // ----------------------------------
2168       Durability durability = Durability.USE_DEFAULT;
2169       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2170         // Skip puts that were determined to be invalid during preprocessing
2171         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2172             != OperationStatusCode.NOT_RUN) {
2173           continue;
2174         }
2175         batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2176 
2177         Mutation m = batchOp.operations[i].getFirst();
2178         Durability tmpDur = m.getDurability();
2179         if (tmpDur.ordinal() > durability.ordinal()) {
2180           durability = tmpDur;
2181         }
2182         if (tmpDur == Durability.SKIP_WAL) {
2183           if (m instanceof Put) {
2184             recordPutWithoutWal(m.getFamilyMap());
2185           }
2186           continue;
2187         }
2188 
2189         // Add WAL edits by CP
2190         WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
2191         if (fromCP != null) {
2192           for (KeyValue kv : fromCP.getKeyValues()) {
2193             walEdit.add(kv);
2194           }
2195         }
2196         addFamilyMapToWALEdit(familyMaps[i], walEdit);
2197       }
2198 
2199       // -------------------------
2200       // STEP 5. Append the edit to WAL. Do not sync wal.
2201       // -------------------------
2202       Mutation first = batchOp.operations[firstIndex].getFirst();
2203       txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(),
2204                walEdit, first.getClusterId(), now, this.htableDescriptor);
2205 
2206       // -------------------------------
2207       // STEP 6. Release row locks, etc.
2208       // -------------------------------
2209       if (locked) {
2210         this.updatesLock.readLock().unlock();
2211         locked = false;
2212       }
2213       if (acquiredLocks != null) {
2214         for (Integer toRelease : acquiredLocks) {
2215           releaseRowLock(toRelease);
2216         }
2217         acquiredLocks = null;
2218       }
2219       // -------------------------
2220       // STEP 7. Sync wal.
2221       // -------------------------
2222       if (walEdit.size() > 0) {
2223         syncOrDefer(txid, durability);
2224       }
2225       walSyncSuccessful = true;
2226       // calling the post CP hook for batch mutation
2227       if (!isInReplay && coprocessorHost != null) {
2228         MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp = 
2229           new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations, 
2230           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2231         coprocessorHost.postBatchMutate(miniBatchOp);
2232       }
2233 
2234       // ------------------------------------------------------------------
2235       // STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
2236       // ------------------------------------------------------------------
2237       if (w != null) {
2238         mvcc.completeMemstoreInsert(w);
2239         w = null;
2240       }
2241 
2242       // ------------------------------------
2243       // STEP 9. Run coprocessor post hooks. This should be done after the wal is
2244       // synced so that the coprocessor contract is adhered to.
2245       // ------------------------------------
2246       if (!isInReplay && coprocessorHost != null) {
2247         for (int i = firstIndex; i < lastIndexExclusive; i++) {
2248           // only for successful puts
2249           if (batchOp.retCodeDetails[i].getOperationStatusCode()
2250               != OperationStatusCode.SUCCESS) {
2251             continue;
2252           }
2253           Mutation m = batchOp.operations[i].getFirst();
2254           if (m instanceof Put) {
2255             coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
2256           } else {
2257             coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
2258           }
2259         }
2260       }
2261 
2262       success = true;
2263       return addedSize;
2264     } finally {
2265 
2266       // if the wal sync was unsuccessful, remove keys from memstore
2267       if (!walSyncSuccessful) {
2268         rollbackMemstore(batchOp, familyMaps, firstIndex, lastIndexExclusive);
2269       }
2270       if (w != null) mvcc.completeMemstoreInsert(w);
2271 
2272       if (locked) {
2273         this.updatesLock.readLock().unlock();
2274       }
2275 
2276       if (acquiredLocks != null) {
2277         for (Integer toRelease : acquiredLocks) {
2278           releaseRowLock(toRelease);
2279         }
2280       }
2281 
2282       // See if the column families were consistent through the whole thing.
2283       // if they were then keep them. If they were not then pass a null.
2284       // null will be treated as unknown.
2285       // Total time taken might be involving Puts and Deletes.
2286       // Split the time for puts and deletes based on the total number of Puts and Deletes.
2287 
2288       if (noOfPuts > 0) {
2289         // There were some Puts in the batch.
2290         if (this.metricsRegion != null) {
2291           this.metricsRegion.updatePut();
2292         }
2293       }
2294       if (noOfDeletes > 0) {
2295         // There were some Deletes in the batch.
2296         if (this.metricsRegion != null) {
2297           this.metricsRegion.updateDelete();
2298         }
2299       }
2300       if (!success) {
2301         for (int i = firstIndex; i < lastIndexExclusive; i++) {
2302           if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
2303             batchOp.retCodeDetails[i] = OperationStatus.FAILURE;
2304           }
2305         }
2306       }
2307       batchOp.nextIndexToProcess = lastIndexExclusive;
2308     }
2309   }
2310 
2311   //TODO, Think that gets/puts and deletes should be refactored a bit so that
2312   //the getting of the lock happens before, so that you would just pass it into
2313   //the methods. So in the case of checkAndMutate you could just do lockRow,
2314   //get, put, unlockRow or something
2315   /**
2316    *
2317    * @param row
2318    * @param family
2319    * @param qualifier
2320    * @param compareOp
2321    * @param comparator
2322    * @param w
2323    * @param writeToWAL
2324    * @throws IOException
2325    * @return true if the new put was executed, false otherwise
2326    */
2327   public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
2328       CompareOp compareOp, ByteArrayComparable comparator, Mutation w,
2329       boolean writeToWAL)
2330   throws IOException{
2331     checkReadOnly();
2332     //TODO, add check for value length or maybe even better move this to the
2333     //client if this becomes a global setting
2334     checkResources();
2335     boolean isPut = w instanceof Put;
2336     if (!isPut && !(w instanceof Delete))
2337       throw new org.apache.hadoop.hbase.exceptions.DoNotRetryIOException("Action must be Put or Delete");
2338     Row r = (Row)w;
2339     if (!Bytes.equals(row, r.getRow())) {
2340       throw new org.apache.hadoop.hbase.exceptions.DoNotRetryIOException("Action's getRow must match the passed row");
2341     }
2342 
2343     startRegionOperation();
2344     try {
2345       Get get = new Get(row);
2346       checkFamily(family);
2347       get.addColumn(family, qualifier);
2348 
2349       // Lock row
2350       Integer lid = getLock(null, get.getRow(), true);
2351       // wait for all previous transactions to complete (with lock held)
2352       mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
2353       List<KeyValue> result = null;
2354       try {
2355         result = get(get, false);
2356 
2357         boolean valueIsNull = comparator.getValue() == null ||
2358           comparator.getValue().length == 0;
2359         boolean matches = false;
2360         if (result.size() == 0 && valueIsNull) {
2361           matches = true;
2362         } else if (result.size() > 0 && result.get(0).getValue().length == 0 &&
2363             valueIsNull) {
2364           matches = true;
2365         } else if (result.size() == 1 && !valueIsNull) {
2366           KeyValue kv = result.get(0);
2367           int compareResult = comparator.compareTo(kv.getBuffer(),
2368               kv.getValueOffset(), kv.getValueLength());
2369           switch (compareOp) {
2370           case LESS:
2371             matches = compareResult <= 0;
2372             break;
2373           case LESS_OR_EQUAL:
2374             matches = compareResult < 0;
2375             break;
2376           case EQUAL:
2377             matches = compareResult == 0;
2378             break;
2379           case NOT_EQUAL:
2380             matches = compareResult != 0;
2381             break;
2382           case GREATER_OR_EQUAL:
2383             matches = compareResult > 0;
2384             break;
2385           case GREATER:
2386             matches = compareResult >= 0;
2387             break;
2388           default:
2389             throw new RuntimeException("Unknown Compare op " + compareOp.name());
2390           }
2391         }
2392         //If matches put the new put or delete the new delete
2393         if (matches) {
2394           // All edits for the given row (across all column families) must
2395           // happen atomically.
2396           doBatchMutate((Mutation)w, lid);
2397           this.checkAndMutateChecksPassed.increment();
2398           return true;
2399         }
2400         this.checkAndMutateChecksFailed.increment();
2401         return false;
2402       } finally {
2403         releaseRowLock(lid);
2404       }
2405     } finally {
2406       closeRegionOperation();
2407     }
2408   }
2409 
2410   @SuppressWarnings("unchecked")
2411   private void doBatchMutate(Mutation mutation, Integer lid) throws IOException,
2412       org.apache.hadoop.hbase.exceptions.DoNotRetryIOException {
2413     Pair<Mutation, Integer>[] mutateWithLocks = new Pair[] {
2414       new Pair<Mutation, Integer>(mutation, lid)
2415     };
2416     OperationStatus[] batchMutate = this.batchMutate(mutateWithLocks);
2417     if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
2418       throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
2419     } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
2420       throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
2421     }
2422   }
2423 
2424   /**
2425    * Complete taking the snapshot on the region. Writes the region info and adds references to the
2426    * working snapshot directory.
2427    *
2428    * TODO for api consistency, consider adding another version with no {@link ForeignExceptionSnare}
2429    * arg.  (In the future other cancellable HRegion methods could eventually add a
2430    * {@link ForeignExceptionSnare}, or we could do something fancier).
2431    *
2432    * @param desc snasphot description object
2433    * @param exnSnare ForeignExceptionSnare that captures external exeptions in case we need to
2434    *   bail out.  This is allowed to be null and will just be ignored in that case.
2435    * @throws IOException if there is an external or internal error causing the snapshot to fail
2436    */
2437   public void addRegionToSnapshot(SnapshotDescription desc,
2438       ForeignExceptionSnare exnSnare) throws IOException {
2439     // This should be "fast" since we don't rewrite store files but instead
2440     // back up the store files by creating a reference
2441     Path rootDir = FSUtils.getRootDir(this.rsServices.getConfiguration());
2442     Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);
2443 
2444     // 1. dump region meta info into the snapshot directory
2445     LOG.debug("Storing region-info for snapshot.");
2446     HRegionFileSystem snapshotRegionFs = HRegionFileSystem.createRegionOnFileSystem(conf,
2447         this.fs.getFileSystem(), snapshotDir, getRegionInfo());
2448 
2449     // 2. iterate through all the stores in the region
2450     LOG.debug("Creating references for hfiles");
2451 
2452     // This ensures that we have an atomic view of the directory as long as we have < ls limit
2453     // (batch size of the files in a directory) on the namenode. Otherwise, we get back the files in
2454     // batches and may miss files being added/deleted. This could be more robust (iteratively
2455     // checking to see if we have all the files until we are sure), but the limit is currently 1000
2456     // files/batch, far more than the number of store files under a single column family.
2457     for (Store store : stores.values()) {
2458       // 2.1. build the snapshot reference directory for the store
2459       Path dstStoreDir = snapshotRegionFs.getStoreDir(store.getFamily().getNameAsString());
2460       List<StoreFile> storeFiles = new ArrayList<StoreFile>(store.getStorefiles());
2461       if (LOG.isDebugEnabled()) {
2462         LOG.debug("Adding snapshot references for " + storeFiles  + " hfiles");
2463       }
2464 
2465       // 2.2. iterate through all the store's files and create "references".
2466       int sz = storeFiles.size();
2467       for (int i = 0; i < sz; i++) {
2468         if (exnSnare != null) {
2469           exnSnare.rethrowException();
2470         }
2471         Path file = storeFiles.get(i).getPath();
2472         // create "reference" to this store file.  It is intentionally an empty file -- all
2473         // necessary infomration is captured by its fs location and filename.  This allows us to
2474         // only figure out what needs to be done via a single nn operation (instead of having to
2475         // open and read the files as well).
2476         LOG.debug("Creating reference for file (" + (i+1) + "/" + sz + ") : " + file);
2477         Path referenceFile = new Path(dstStoreDir, file.getName());
2478         boolean success = fs.getFileSystem().createNewFile(referenceFile);
2479         if (!success) {
2480           throw new IOException("Failed to create reference file:" + referenceFile);
2481         }
2482       }
2483     }
2484   }
2485 
2486   /**
2487    * Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP} with the
2488    * provided current timestamp.
2489    */
2490   void updateKVTimestamps(final Iterable<List<? extends Cell>> keyLists, final byte[] now) {
2491     for (List<? extends Cell> cells: keyLists) {
2492       if (cells == null) continue;
2493       for (Cell cell : cells) {
2494         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
2495         kv.updateLatestStamp(now);
2496       }
2497     }
2498   }
2499 
2500   /*
2501    * Check if resources to support an update.
2502    *
2503    * Here we synchronize on HRegion, a broad scoped lock.  Its appropriate
2504    * given we're figuring in here whether this region is able to take on
2505    * writes.  This is only method with a synchronize (at time of writing),
2506    * this and the synchronize on 'this' inside in internalFlushCache to send
2507    * the notify.
2508    */
2509   private void checkResources()
2510       throws RegionTooBusyException, InterruptedIOException {
2511 
2512     // If catalog region, do not impose resource constraints or block updates.
2513     if (this.getRegionInfo().isMetaRegion()) return;
2514 
2515     boolean blocked = false;
2516     long startTime = 0;
2517     while (this.memstoreSize.get() > this.blockingMemStoreSize) {
2518       requestFlush();
2519       if (!blocked) {
2520         startTime = EnvironmentEdgeManager.currentTimeMillis();
2521         LOG.info("Blocking updates for '" + Thread.currentThread().getName() +
2522           "' on region " + Bytes.toStringBinary(getRegionName()) +
2523           ": memstore size " +
2524           StringUtils.humanReadableInt(this.memstoreSize.get()) +
2525           " is >= than blocking " +
2526           StringUtils.humanReadableInt(this.blockingMemStoreSize) + " size");
2527       }
2528       long now = EnvironmentEdgeManager.currentTimeMillis();
2529       long timeToWait = startTime + busyWaitDuration - now;
2530       if (timeToWait <= 0L) {
2531         final long totalTime = now - startTime;
2532         this.updatesBlockedMs.add(totalTime);
2533         LOG.info("Failed to unblock updates for region " + this + " '"
2534           + Thread.currentThread().getName() + "' in " + totalTime
2535           + "ms. The region is still busy.");
2536         throw new RegionTooBusyException("region is flushing");
2537       }
2538       blocked = true;
2539       synchronized(this) {
2540         try {
2541           wait(Math.min(timeToWait, threadWakeFrequency));
2542         } catch (InterruptedException ie) {
2543           final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
2544           if (totalTime > 0) {
2545             this.updatesBlockedMs.add(totalTime);
2546           }
2547           LOG.info("Interrupted while waiting to unblock updates for region "
2548             + this + " '" + Thread.currentThread().getName() + "'");
2549           InterruptedIOException iie = new InterruptedIOException();
2550           iie.initCause(ie);
2551           throw iie;
2552         }
2553       }
2554     }
2555     if (blocked) {
2556       // Add in the blocked time if appropriate
2557       final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
2558       if(totalTime > 0 ){
2559         this.updatesBlockedMs.add(totalTime);
2560       }
2561       LOG.info("Unblocking updates for region " + this + " '"
2562           + Thread.currentThread().getName() + "'");
2563     }
2564   }
2565 
2566   /**
2567    * @throws IOException Throws exception if region is in read-only mode.
2568    */
2569   protected void checkReadOnly() throws IOException {
2570     if (this.writestate.isReadOnly()) {
2571       throw new IOException("region is read only");
2572     }
2573   }
2574 
2575   /**
2576    * Add updates first to the hlog and then add values to memstore.
2577    * Warning: Assumption is caller has lock on passed in row.
2578    * @param family
2579    * @param edits Cell updates by column
2580    * @praram now
2581    * @throws IOException
2582    */
2583   private void put(final byte [] row, byte [] family, List<? extends Cell> edits)
2584   throws IOException {
2585     NavigableMap<byte[], List<? extends Cell>> familyMap;
2586     familyMap = new TreeMap<byte[], List<? extends Cell>>(Bytes.BYTES_COMPARATOR);
2587 
2588     familyMap.put(family, edits);
2589     Put p = new Put(row);
2590     p.setFamilyMap(familyMap);
2591     p.setClusterId(HConstants.DEFAULT_CLUSTER_ID);
2592     doBatchMutate(p, null);
2593   }
2594 
2595   /**
2596    * Atomically apply the given map of family->edits to the memstore.
2597    * This handles the consistency control on its own, but the caller
2598    * should already have locked updatesLock.readLock(). This also does
2599    * <b>not</b> check the families for validity.
2600    *
2601    * @param familyMap Map of kvs per family
2602    * @param localizedWriteEntry The WriteEntry of the MVCC for this transaction.
2603    *        If null, then this method internally creates a mvcc transaction.
2604    * @return the additional memory usage of the memstore caused by the
2605    * new entries.
2606    */
2607   private long applyFamilyMapToMemstore(Map<byte[], List<? extends Cell>> familyMap,
2608     MultiVersionConsistencyControl.WriteEntry localizedWriteEntry) {
2609     long size = 0;
2610     boolean freemvcc = false;
2611 
2612     try {
2613       if (localizedWriteEntry == null) {
2614         localizedWriteEntry = mvcc.beginMemstoreInsert();
2615         freemvcc = true;
2616       }
2617 
2618       for (Map.Entry<byte[], List<? extends Cell>> e : familyMap.entrySet()) {
2619         byte[] family = e.getKey();
2620         List<? extends Cell> cells = e.getValue();
2621 
2622         Store store = getStore(family);
2623         for (Cell cell: cells) {
2624           KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
2625           kv.setMemstoreTS(localizedWriteEntry.getWriteNumber());
2626           size += store.add(kv);
2627         }
2628       }
2629     } finally {
2630       if (freemvcc) {
2631         mvcc.completeMemstoreInsert(localizedWriteEntry);
2632       }
2633     }
2634 
2635      return size;
2636    }
2637 
2638   /**
2639    * Remove all the keys listed in the map from the memstore. This method is
2640    * called when a Put/Delete has updated memstore but subequently fails to update
2641    * the wal. This method is then invoked to rollback the memstore.
2642    */
2643   private void rollbackMemstore(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp,
2644                                 Map<byte[], List<? extends Cell>>[] familyMaps,
2645                                 int start, int end) {
2646     int kvsRolledback = 0;
2647     for (int i = start; i < end; i++) {
2648       // skip over request that never succeeded in the first place.
2649       if (batchOp.retCodeDetails[i].getOperationStatusCode()
2650             != OperationStatusCode.SUCCESS) {
2651         continue;
2652       }
2653 
2654       // Rollback all the kvs for this row.
2655       Map<byte[], List<? extends Cell>> familyMap  = familyMaps[i];
2656       for (Map.Entry<byte[], List<? extends Cell>> e : familyMap.entrySet()) {
2657         byte[] family = e.getKey();
2658         List<? extends Cell> cells = e.getValue();
2659 
2660         // Remove those keys from the memstore that matches our
2661         // key's (row, cf, cq, timestamp, memstoreTS). The interesting part is
2662         // that even the memstoreTS has to match for keys that will be rolleded-back.
2663         Store store = getStore(family);
2664         for (Cell cell: cells) {
2665           store.rollback(KeyValueUtil.ensureKeyValue(cell));
2666           kvsRolledback++;
2667         }
2668       }
2669     }
2670     LOG.debug("rollbackMemstore rolled back " + kvsRolledback +
2671         " keyvalues from start:" + start + " to end:" + end);
2672   }
2673 
2674   /**
2675    * Check the collection of families for validity.
2676    * @throws NoSuchColumnFamilyException if a family does not exist.
2677    */
2678   void checkFamilies(Collection<byte[]> families)
2679   throws NoSuchColumnFamilyException {
2680     for (byte[] family : families) {
2681       checkFamily(family);
2682     }
2683   }
2684 
2685   /**
2686    * During replay, there could exist column families which are removed between region server
2687    * failure and replay
2688    */
2689   private void removeNonExistentColumnFamilyForReplay(
2690       final Map<byte[], List<? extends Cell>> familyMap) {
2691     List<byte[]> nonExistentList = null;
2692     for (byte[] family : familyMap.keySet()) {
2693       if (!this.htableDescriptor.hasFamily(family)) {
2694         if (nonExistentList == null) {
2695           nonExistentList = new ArrayList<byte[]>();
2696         }
2697         nonExistentList.add(family);
2698       }
2699     }
2700     if (nonExistentList != null) {
2701       for (byte[] family : nonExistentList) {
2702         // Perhaps schema was changed between crash and replay
2703         LOG.info("No family for " + Bytes.toString(family) + " omit from reply.");
2704         familyMap.remove(family);
2705       }
2706     }
2707   }
2708 
2709   void checkTimestamps(final Map<byte[], List<? extends Cell>> familyMap,
2710       long now) throws FailedSanityCheckException {
2711     if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
2712       return;
2713     }
2714     long maxTs = now + timestampSlop;
2715     for (List<? extends Cell> kvs : familyMap.values()) {
2716       for (Cell cell : kvs) {
2717         // see if the user-side TS is out of range. latest = server-side
2718         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
2719         if (!kv.isLatestTimestamp() && kv.getTimestamp() > maxTs) {
2720           throw new FailedSanityCheckException("Timestamp for KV out of range "
2721               + cell + " (too.new=" + timestampSlop + ")");
2722         }
2723       }
2724     }
2725   }
2726 
2727   /**
2728    * Append the given map of family->edits to a WALEdit data structure.
2729    * This does not write to the HLog itself.
2730    * @param familyMap map of family->edits
2731    * @param walEdit the destination entry to append into
2732    */
2733   private void addFamilyMapToWALEdit(Map<byte[], List<? extends Cell>> familyMap,
2734       WALEdit walEdit) {
2735     for (List<? extends Cell> edits : familyMap.values()) {
2736       for (Cell cell : edits) {
2737         walEdit.add(KeyValueUtil.ensureKeyValue(cell));
2738       }
2739     }
2740   }
2741 
2742   private void requestFlush() {
2743     if (this.rsServices == null) {
2744       return;
2745     }
2746     synchronized (writestate) {
2747       if (this.writestate.isFlushRequested()) {
2748         return;
2749       }
2750       writestate.flushRequested = true;
2751     }
2752     // Make request outside of synchronize block; HBASE-818.
2753     this.rsServices.getFlushRequester().requestFlush(this);
2754     if (LOG.isDebugEnabled()) {
2755       LOG.debug("Flush requested on " + this);
2756     }
2757   }
2758 
2759   /*
2760    * @param size
2761    * @return True if size is over the flush threshold
2762    */
2763   private boolean isFlushSize(final long size) {
2764     return size > this.memstoreFlushSize;
2765   }
2766 
2767   /**
2768    * Read the edits log put under this region by wal log splitting process.  Put
2769    * the recovered edits back up into this region.
2770    *
2771    * <p>We can ignore any log message that has a sequence ID that's equal to or
2772    * lower than minSeqId.  (Because we know such log messages are already
2773    * reflected in the HFiles.)
2774    *
2775    * <p>While this is running we are putting pressure on memory yet we are
2776    * outside of our usual accounting because we are not yet an onlined region
2777    * (this stuff is being run as part of Region initialization).  This means
2778    * that if we're up against global memory limits, we'll not be flagged to flush
2779    * because we are not online. We can't be flushed by usual mechanisms anyways;
2780    * we're not yet online so our relative sequenceids are not yet aligned with
2781    * HLog sequenceids -- not till we come up online, post processing of split
2782    * edits.
2783    *
2784    * <p>But to help relieve memory pressure, at least manage our own heap size
2785    * flushing if are in excess of per-region limits.  Flushing, though, we have
2786    * to be careful and avoid using the regionserver/hlog sequenceid.  Its running
2787    * on a different line to whats going on in here in this region context so if we
2788    * crashed replaying these edits, but in the midst had a flush that used the
2789    * regionserver log with a sequenceid in excess of whats going on in here
2790    * in this region and with its split editlogs, then we could miss edits the
2791    * next time we go to recover. So, we have to flush inline, using seqids that
2792    * make sense in a this single region context only -- until we online.
2793    *
2794    * @param regiondir
2795    * @param maxSeqIdInStores Any edit found in split editlogs needs to be in excess of
2796    * the maxSeqId for the store to be applied, else its skipped.
2797    * @param reporter
2798    * @return the sequence id of the last edit added to this region out of the
2799    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
2800    * @throws UnsupportedEncodingException
2801    * @throws IOException
2802    */
2803   protected long replayRecoveredEditsIfAny(final Path regiondir,
2804       Map<byte[], Long> maxSeqIdInStores,
2805       final CancelableProgressable reporter, final MonitoredTask status)
2806       throws UnsupportedEncodingException, IOException {
2807     long minSeqIdForTheRegion = -1;
2808     for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
2809       if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
2810         minSeqIdForTheRegion = maxSeqIdInStore;
2811       }
2812     }
2813     long seqid = minSeqIdForTheRegion;
2814 
2815     FileSystem fs = this.fs.getFileSystem();
2816     NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regiondir);
2817     if (files == null || files.isEmpty()) return seqid;
2818 
2819     for (Path edits: files) {
2820       if (edits == null || !fs.exists(edits)) {
2821         LOG.warn("Null or non-existent edits file: " + edits);
2822         continue;
2823       }
2824       if (isZeroLengthThenDelete(fs, edits)) continue;
2825 
2826       long maxSeqId = Long.MAX_VALUE;
2827       String fileName = edits.getName();
2828       maxSeqId = Math.abs(Long.parseLong(fileName));
2829       if (maxSeqId <= minSeqIdForTheRegion) {
2830         String msg = "Maximum sequenceid for this log is " + maxSeqId
2831             + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
2832             + ", skipped the whole file, path=" + edits;
2833         LOG.debug(msg);
2834         continue;
2835       }
2836 
2837       try {
2838         seqid = replayRecoveredEdits(edits, maxSeqIdInStores, reporter);
2839       } catch (IOException e) {
2840         boolean skipErrors = conf.getBoolean(
2841             HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
2842             conf.getBoolean(
2843                 "hbase.skip.errors",
2844                 HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS));
2845         if (conf.get("hbase.skip.errors") != null) {
2846           LOG.warn(
2847               "The property 'hbase.skip.errors' has been deprecated. Please use " +
2848               HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
2849         }
2850         if (skipErrors) {
2851           Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
2852           LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
2853               + "=true so continuing. Renamed " + edits +
2854               " as " + p, e);
2855         } else {
2856           throw e;
2857         }
2858       }
2859       // The edits size added into rsAccounting during this replaying will not
2860       // be required any more. So just clear it.
2861       if (this.rsAccounting != null) {
2862         this.rsAccounting.clearRegionReplayEditsSize(this.getRegionName());
2863       }
2864     }
2865     if (seqid > minSeqIdForTheRegion) {
2866       // Then we added some edits to memory. Flush and cleanup split edit files.
2867       internalFlushcache(null, seqid, status);
2868     }
2869     // Now delete the content of recovered edits.  We're done w/ them.
2870     for (Path file: files) {
2871       if (!fs.delete(file, false)) {
2872         LOG.error("Failed delete of " + file);
2873       } else {
2874         LOG.debug("Deleted recovered.edits file=" + file);
2875       }
2876     }
2877     return seqid;
2878   }
2879 
2880   /*
2881    * @param edits File of recovered edits.
2882    * @param maxSeqIdInStores Maximum sequenceid found in each store.  Edits in log
2883    * must be larger than this to be replayed for each store.
2884    * @param reporter
2885    * @return the sequence id of the last edit added to this region out of the
2886    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
2887    * @throws IOException
2888    */
2889   private long replayRecoveredEdits(final Path edits,
2890       Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
2891     throws IOException {
2892     String msg = "Replaying edits from " + edits;
2893     LOG.info(msg);
2894     MonitoredTask status = TaskMonitor.get().createStatus(msg);
2895     FileSystem fs = this.fs.getFileSystem();
2896 
2897     status.setStatus("Opening logs");
2898     HLog.Reader reader = null;
2899     try {
2900       reader = HLogFactory.createReader(fs, edits, conf);
2901       long currentEditSeqId = -1;
2902       long firstSeqIdInLog = -1;
2903       long skippedEdits = 0;
2904       long editsCount = 0;
2905       long intervalEdits = 0;
2906       HLog.Entry entry;
2907       Store store = null;
2908       boolean reported_once = false;
2909 
2910       try {
2911         // How many edits seen before we check elapsed time
2912         int interval = this.conf.getInt("hbase.hstore.report.interval.edits",
2913             2000);
2914         // How often to send a progress report (default 1/2 master timeout)
2915         int period = this.conf.getInt("hbase.hstore.report.period",
2916             this.conf.getInt("hbase.master.assignment.timeoutmonitor.timeout",
2917                 180000) / 2);
2918         long lastReport = EnvironmentEdgeManager.currentTimeMillis();
2919 
2920         while ((entry = reader.next()) != null) {
2921           HLogKey key = entry.getKey();
2922           WALEdit val = entry.getEdit();
2923 
2924           if (reporter != null) {
2925             intervalEdits += val.size();
2926             if (intervalEdits >= interval) {
2927               // Number of edits interval reached
2928               intervalEdits = 0;
2929               long cur = EnvironmentEdgeManager.currentTimeMillis();
2930               if (lastReport + period <= cur) {
2931                 status.setStatus("Replaying edits..." +
2932                     " skipped=" + skippedEdits +
2933                     " edits=" + editsCount);
2934                 // Timeout reached
2935                 if(!reporter.progress()) {
2936                   msg = "Progressable reporter failed, stopping replay";
2937                   LOG.warn(msg);
2938                   status.abort(msg);
2939                   throw new IOException(msg);
2940                 }
2941                 reported_once = true;
2942                 lastReport = cur;
2943               }
2944             }
2945           }
2946 
2947           // Start coprocessor replay here. The coprocessor is for each WALEdit
2948           // instead of a KeyValue.
2949           if (coprocessorHost != null) {
2950             status.setStatus("Running pre-WAL-restore hook in coprocessors");
2951             if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
2952               // if bypass this log entry, ignore it ...
2953               continue;
2954             }
2955           }
2956 
2957           if (firstSeqIdInLog == -1) {
2958             firstSeqIdInLog = key.getLogSeqNum();
2959           }
2960           boolean flush = false;
2961           for (KeyValue kv: val.getKeyValues()) {
2962             // Check this edit is for me. Also, guard against writing the special
2963             // METACOLUMN info such as HBASE::CACHEFLUSH entries
2964             if (kv.matchingFamily(WALEdit.METAFAMILY) ||
2965                 !Bytes.equals(key.getEncodedRegionName(),
2966                   this.getRegionInfo().getEncodedNameAsBytes())) {
2967               //this is a special edit, we should handle it
2968               CompactionDescriptor compaction = WALEdit.getCompaction(kv);
2969               if (compaction != null) {
2970                 //replay the compaction
2971                 completeCompactionMarker(compaction);
2972               }
2973 
2974               skippedEdits++;
2975               continue;
2976             }
2977             // Figure which store the edit is meant for.
2978             if (store == null || !kv.matchingFamily(store.getFamily().getName())) {
2979               store = this.stores.get(kv.getFamily());
2980             }
2981             if (store == null) {
2982               // This should never happen.  Perhaps schema was changed between
2983               // crash and redeploy?
2984               LOG.warn("No family for " + kv);
2985               skippedEdits++;
2986               continue;
2987             }
2988             // Now, figure if we should skip this edit.
2989             if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily()
2990                 .getName())) {
2991               skippedEdits++;
2992               continue;
2993             }
2994             currentEditSeqId = key.getLogSeqNum();
2995             // Once we are over the limit, restoreEdit will keep returning true to
2996             // flush -- but don't flush until we've played all the kvs that make up
2997             // the WALEdit.
2998             flush = restoreEdit(store, kv);
2999             editsCount++;
3000           }
3001           if (flush) internalFlushcache(null, currentEditSeqId, status);
3002 
3003           if (coprocessorHost != null) {
3004             coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
3005           }
3006         }
3007       } catch (EOFException eof) {
3008         Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
3009         msg = "Encountered EOF. Most likely due to Master failure during " +
3010             "log spliting, so we have this data in another edit.  " +
3011             "Continuing, but renaming " + edits + " as " + p;
3012         LOG.warn(msg, eof);
3013         status.abort(msg);
3014       } catch (IOException ioe) {
3015         // If the IOE resulted from bad file format,
3016         // then this problem is idempotent and retrying won't help
3017         if (ioe.getCause() instanceof ParseException) {
3018           Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
3019           msg = "File corruption encountered!  " +
3020               "Continuing, but renaming " + edits + " as " + p;
3021           LOG.warn(msg, ioe);
3022           status.setStatus(msg);
3023         } else {
3024           status.abort(StringUtils.stringifyException(ioe));
3025           // other IO errors may be transient (bad network connection,
3026           // checksum exception on one datanode, etc).  throw & retry
3027           throw ioe;
3028         }
3029       }
3030       if (reporter != null && !reported_once) {
3031         reporter.progress();
3032       }
3033       msg = "Applied " + editsCount + ", skipped " + skippedEdits +
3034         ", firstSequenceidInLog=" + firstSeqIdInLog +
3035         ", maxSequenceidInLog=" + currentEditSeqId + ", path=" + edits;
3036       status.markComplete(msg);
3037       LOG.debug(msg);
3038       return currentEditSeqId;
3039     } finally {
3040       status.cleanup();
3041       if (reader != null) {
3042          reader.close();
3043       }
3044     }
3045   }
3046 
3047   /**
3048    * Call to complete a compaction. Its for the case where we find in the WAL a compaction
3049    * that was not finished.  We could find one recovering a WAL after a regionserver crash.
3050    * See HBASE-2331.
3051    * @param fs
3052    * @param compaction
3053    */
3054   void completeCompactionMarker(CompactionDescriptor compaction)
3055       throws IOException {
3056     Store store = this.getStore(compaction.getFamilyName().toByteArray());
3057     if (store == null) {
3058       LOG.warn("Found Compaction WAL edit for deleted family:" +
3059           Bytes.toString(compaction.getFamilyName().toByteArray()));
3060       return;
3061     }
3062     store.completeCompactionMarker(compaction);
3063   }
3064 
3065   /**
3066    * Used by tests
3067    * @param s Store to add edit too.
3068    * @param kv KeyValue to add.
3069    * @return True if we should flush.
3070    */
3071   protected boolean restoreEdit(final Store s, final KeyValue kv) {
3072     long kvSize = s.add(kv);
3073     if (this.rsAccounting != null) {
3074       rsAccounting.addAndGetRegionReplayEditsSize(this.getRegionName(), kvSize);
3075     }
3076     return isFlushSize(this.addAndGetGlobalMemstoreSize(kvSize));
3077   }
3078 
3079   /*
3080    * @param fs
3081    * @param p File to check.
3082    * @return True if file was zero-length (and if so, we'll delete it in here).
3083    * @throws IOException
3084    */
3085   private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
3086       throws IOException {
3087     FileStatus stat = fs.getFileStatus(p);
3088     if (stat.getLen() > 0) return false;
3089     LOG.warn("File " + p + " is zero-length, deleting.");
3090     fs.delete(p, false);
3091     return true;
3092   }
3093 
3094   protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
3095     return new HStore(this, family, this.conf);
3096   }
3097 
3098   /**
3099    * Return HStore instance.
3100    * Use with caution.  Exposed for use of fixup utilities.
3101    * @param column Name of column family hosted by this region.
3102    * @return Store that goes with the family on passed <code>column</code>.
3103    * TODO: Make this lookup faster.
3104    */
3105   public Store getStore(final byte[] column) {
3106     return this.stores.get(column);
3107   }
3108 
  /**
   * Returns the stores of this region.
   * @return this region's own store map (not a copy); other code in this class looks stores
   * up in it by column family name.
   */
  public Map<byte[], Store> getStores() {
    return this.stores;
  }
3112 
3113   /**
3114    * Return list of storeFiles for the set of CFs.
3115    * Uses closeLock to prevent the race condition where a region closes
3116    * in between the for loop - closing the stores one by one, some stores
3117    * will return 0 files.
3118    * @return List of storeFiles.
3119    */
3120   public List<String> getStoreFileList(final byte [][] columns)
3121     throws IllegalArgumentException {
3122     List<String> storeFileNames = new ArrayList<String>();
3123     synchronized(closeLock) {
3124       for(byte[] column : columns) {
3125         Store store = this.stores.get(column);
3126         if (store == null) {
3127           throw new IllegalArgumentException("No column family : " +
3128               new String(column) + " available");
3129         }
3130         for (StoreFile storeFile: store.getStorefiles()) {
3131           storeFileNames.add(storeFile.getPath().toString());
3132         }
3133       }
3134     }
3135     return storeFileNames;
3136   }
3137   //////////////////////////////////////////////////////////////////////////////
3138   // Support code
3139   //////////////////////////////////////////////////////////////////////////////
3140 
3141   /** Make sure this is a valid row for the HRegion */
3142   void checkRow(final byte [] row, String op) throws IOException {
3143     if (!rowIsInRange(getRegionInfo(), row)) {
3144       throw new WrongRegionException("Requested row out of range for " +
3145           op + " on HRegion " + this + ", startKey='" +
3146           Bytes.toStringBinary(getStartKey()) + "', getEndKey()='" +
3147           Bytes.toStringBinary(getEndKey()) + "', row='" +
3148           Bytes.toStringBinary(row) + "'");
3149     }
3150   }
3151 
3152   /**
3153    * Obtain a lock on the given row.  Blocks until success.
3154    *
3155    * I know it's strange to have two mappings:
3156    * <pre>
3157    *   ROWS  ==> LOCKS
3158    * </pre>
3159    * as well as
3160    * <pre>
3161    *   LOCKS ==> ROWS
3162    * </pre>
3163    * <p>It would be more memory-efficient to just have one mapping;
3164    * maybe we'll do that in the future.
3165    *
3166    * @param row Name of row to lock.
3167    * @throws IOException
3168    * @return The id of the held lock.
3169    */
3170   public Integer obtainRowLock(final byte [] row) throws IOException {
3171     startRegionOperation();
3172     this.writeRequestsCount.increment();
3173     try {
3174       return internalObtainRowLock(row, true);
3175     } finally {
3176       closeRegionOperation();
3177     }
3178   }
3179 
3180   /**
3181    * Obtains or tries to obtain the given row lock.
3182    * @param waitForLock if true, will block until the lock is available.
3183    *        Otherwise, just tries to obtain the lock and returns
3184    *        null if unavailable.
3185    */
3186   private Integer internalObtainRowLock(final byte[] row, boolean waitForLock)
3187   throws IOException {
3188     checkRow(row, "row lock");
3189     startRegionOperation();
3190     try {
3191       HashedBytes rowKey = new HashedBytes(row);
3192       CountDownLatch rowLatch = new CountDownLatch(1);
3193 
3194       // loop until we acquire the row lock (unless !waitForLock)
3195       while (true) {
3196         CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch);
3197         if (existingLatch == null) {
3198           break;
3199         } else {
3200           // row already locked
3201           if (!waitForLock) {
3202             return null;
3203           }
3204           try {
3205             if (!existingLatch.await(this.rowLockWaitDuration,
3206                             TimeUnit.MILLISECONDS)) {
3207               throw new IOException("Timed out on getting lock for row="
3208                   + Bytes.toStringBinary(row));
3209             }
3210           } catch (InterruptedException ie) {
3211             LOG.warn("internalObtainRowLock interrupted for row=" + Bytes.toStringBinary(row));
3212             InterruptedIOException iie = new InterruptedIOException();
3213             iie.initCause(ie);
3214             throw iie;
3215           }
3216         }
3217       }
3218 
3219       // loop until we generate an unused lock id
3220       while (true) {
3221         Integer lockId = lockIdGenerator.incrementAndGet();
3222         HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, rowKey);
3223         if (existingRowKey == null) {
3224           return lockId;
3225         } else {
3226           // lockId already in use, jump generator to a new spot
3227           lockIdGenerator.set(rand.nextInt());
3228         }
3229       }
3230     } finally {
3231       closeRegionOperation();
3232     }
3233   }
3234 
3235   /**
3236    * Release the row lock!
3237    * @param lockId  The lock ID to release.
3238    */
3239   public void releaseRowLock(final Integer lockId) {
3240     if (lockId == null) return; // null lock id, do nothing
3241     HashedBytes rowKey = lockIds.remove(lockId);
3242     if (rowKey == null) {
3243       LOG.warn("Release unknown lockId: " + lockId);
3244       return;
3245     }
3246     CountDownLatch rowLatch = lockedRows.remove(rowKey);
3247     if (rowLatch == null) {
3248       LOG.error("Releases row not locked, lockId: " + lockId + " row: "
3249           + rowKey);
3250       return;
3251     }
3252     rowLatch.countDown();
3253   }
3254 
3255   /**
3256    * See if row is currently locked.
3257    * @param lockId
3258    * @return boolean
3259    */
3260   boolean isRowLocked(final Integer lockId) {
3261     return lockIds.containsKey(lockId);
3262   }
3263 
3264   /**
3265    * Returns existing row lock if found, otherwise
3266    * obtains a new row lock and returns it.
3267    * @param lockid requested by the user, or null if the user didn't already hold lock
3268    * @param row the row to lock
3269    * @param waitForLock if true, will block until the lock is available, otherwise will
3270    * simply return null if it could not acquire the lock.
3271    * @return lockid or null if waitForLock is false and the lock was unavailable.
3272    */
3273   public Integer getLock(Integer lockid, byte [] row, boolean waitForLock)
3274   throws IOException {
3275     Integer lid = null;
3276     if (lockid == null) {
3277       lid = internalObtainRowLock(row, waitForLock);
3278     } else {
3279       if (!isRowLocked(lockid)) {
3280         throw new IOException("Invalid row lock");
3281       }
3282       lid = lockid;
3283     }
3284     return lid;
3285   }
3286 
3287   /**
3288    * Determines whether multiple column families are present
3289    * Precondition: familyPaths is not null
3290    *
3291    * @param familyPaths List of Pair<byte[] column family, String hfilePath>
3292    */
3293   private static boolean hasMultipleColumnFamilies(
3294       List<Pair<byte[], String>> familyPaths) {
3295     boolean multipleFamilies = false;
3296     byte[] family = null;
3297     for (Pair<byte[], String> pair : familyPaths) {
3298       byte[] fam = pair.getFirst();
3299       if (family == null) {
3300         family = fam;
3301       } else if (!Bytes.equals(family, fam)) {
3302         multipleFamilies = true;
3303         break;
3304       }
3305     }
3306     return multipleFamilies;
3307   }
3308 
3309 
3310   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
3311                                 boolean assignSeqId) throws IOException {
3312     return bulkLoadHFiles(familyPaths, assignSeqId, null);
3313   }
3314 
  /**
   * Attempts to atomically load a group of hfiles.  This is critical for loading
   * rows with multiple column families atomically.
   *
   * <p>Works in two passes: every (family, hfile) pair is validated first, and only if all
   * of them pass is each file handed to its store. Nothing is loaded when validation fails.
   *
   * @param familyPaths List of Pair<byte[] column family, String hfilePath>; must not be null
   * @param assignSeqId if true, a sequence number obtained from the log is passed to the
   * store for each file; otherwise -1 is passed
   * @param bulkLoadListener Internal hooks enabling massaging/preparation of a
   * file about to be bulk loaded; may be null
   * @return true if successful, false if failed recoverably
   * @throws IOException if failed unrecoverably.
   */
  public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, boolean assignSeqId,
      BulkLoadListener bulkLoadListener) throws IOException {
    Preconditions.checkNotNull(familyPaths);
    // we need writeLock for multi-family bulk load
    startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
    try {
      this.writeRequestsCount.increment();

      // There possibly was a split that happened between when the split keys
      // were gathered and before the HRegion's write lock was taken.  We need
      // to validate the HFile region before attempting to bulk load all of them
      List<IOException> ioes = new ArrayList<IOException>();
      List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();
      for (Pair<byte[], String> p : familyPaths) {
        byte[] familyName = p.getFirst();
        String path = p.getSecond();

        Store store = getStore(familyName);
        if (store == null) {
          IOException ioe = new org.apache.hadoop.hbase.exceptions.DoNotRetryIOException(
              "No such column family " + Bytes.toStringBinary(familyName));
          ioes.add(ioe);
        } else {
          try {
            store.assertBulkLoadHFileOk(new Path(path));
          } catch (WrongRegionException wre) {
            // recoverable (file doesn't fit in region)
            failures.add(p);
          } catch (IOException ioe) {
            // unrecoverable (hdfs problem)
            ioes.add(ioe);
          }
        }
      }

      // validation failed because of some sort of IO problem.
      // This is checked before the recoverable failures, so IO errors take precedence.
      if (ioes.size() != 0) {
        IOException e = MultipleIOException.createIOException(ioes);
        LOG.error("There were one or more IO errors when checking if the bulk load is ok.", e);
        throw e;
      }

      // validation failed, bail out before doing anything permanent.
      if (failures.size() != 0) {
        StringBuilder list = new StringBuilder();
        for (Pair<byte[], String> p : failures) {
          list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")
            .append(p.getSecond());
        }
        // problem when validating
        LOG.warn("There was a recoverable bulk load failure likely due to a" +
            " split.  These (family, HFile) pairs were not loaded: " + list);
        return false;
      }

      // Second pass: all pairs validated, hand each file to its store.
      for (Pair<byte[], String> p : familyPaths) {
        byte[] familyName = p.getFirst();
        String path = p.getSecond();
        Store store = getStore(familyName);
        try {
          // The listener may substitute a different path for the actual load; the
          // done/failed callbacks are still given the original path.
          String finalPath = path;
          if(bulkLoadListener != null) {
            finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
          }
          store.bulkLoadHFile(finalPath, assignSeqId ? this.log.obtainSeqNum() : -1);
          if(bulkLoadListener != null) {
            bulkLoadListener.doneBulkLoad(familyName, path);
          }
        } catch (IOException ioe) {
          // A failure here can cause an atomicity violation that we currently
          // cannot recover from since it is likely a failed HDFS operation.

          // TODO Need a better story for reverting partial failures due to HDFS.
          LOG.error("There was a partial failure due to IO when attempting to" +
              " load " + Bytes.toString(p.getFirst()) + " : "+ p.getSecond(), ioe);
          if(bulkLoadListener != null) {
            try {
              bulkLoadListener.failedBulkLoad(familyName, path);
            } catch (Exception ex) {
              // Best effort: a failing callback is logged so the original IOException
              // is the one that reaches the caller.
              LOG.error("Error while calling failedBulkLoad for family "+
                  Bytes.toString(familyName)+" with path "+path, ex);
            }
          }
          throw ioe;
        }
      }
      return true;
    } finally {
      closeBulkRegionOperation();
    }
  }
3417 
3418   @Override
3419   public boolean equals(Object o) {
3420     if (!(o instanceof HRegion)) {
3421       return false;
3422     }
3423     return Bytes.equals(this.getRegionName(), ((HRegion) o).getRegionName());
3424   }
3425 
3426   @Override
3427   public int hashCode() {
3428     return Bytes.hashCode(this.getRegionName());
3429   }
3430 
3431   @Override
3432   public String toString() {
3433     return this.getRegionNameAsString();
3434   }
3435 
3436   /**
3437    * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
3438    */
3439   class RegionScannerImpl implements RegionScanner {
    // Package local for testability
    // Heap over the scanners that are read first for each row: any additional scanners
    // passed to the constructor plus the store scanners not deferred to joinedHeap.
    KeyValueHeap storeHeap = null;
    /** Heap of key-values that are not essential for the provided filters and are thus read
     * on demand, if on-demand column family loading is enabled.*/
    KeyValueHeap joinedHeap = null;
    /**
     * If the joined heap data gathering is interrupted due to scan limits, this will
     * contain the row for which we are populating the values.*/
    private KeyValue joinedContinuationRow = null;
    // KeyValue indicating that limit is reached when scanning.
    // Used as a sentinel: callers compare against it by reference.
    private final KeyValue KV_LIMIT = new KeyValue();
    // Stop row of the scan; null when the scan has an empty stop row and is not a get.
    private final byte [] stopRow;
    // The scan's filter wrapped in a FilterWrapper, or null when the scan has no filter.
    private Filter filter;
    // Batch size of the scan; the default limit for next()/nextRaw() without a limit.
    private int batch;
    // Comparison threshold used by isStopRow: -1 for a get scan, 0 otherwise.
    private int isScan;
    // When true, next() refuses to run and throws UnknownScannerException.
    // NOTE(review): presumably set when the scanner is closed - the writer is not in view.
    private boolean filterClosed = false;
    // MVCC read point of this scanner; re-applied to the calling thread in next().
    private long readPt;
    // Taken from the scan's max result size.
    private long maxResultSize;
    // Region handed to the constructor; used for region info, metrics and coprocessor hooks.
    private HRegion region;
3459 
3460     public HRegionInfo getRegionInfo() {
3461       return region.getRegionInfo();
3462     }
3463 
3464     RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
3465         throws IOException {
3466       // DebugPrint.println("HRegionScanner.<init>");
3467       this.region = region;
3468       this.maxResultSize = scan.getMaxResultSize();
3469       if (scan.hasFilter()) {
3470         this.filter = new FilterWrapper(scan.getFilter());
3471       } else {
3472         this.filter = null;
3473       }
3474 
3475       this.batch = scan.getBatch();
3476       if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {
3477         this.stopRow = null;
3478       } else {
3479         this.stopRow = scan.getStopRow();
3480       }
3481       // If we are doing a get, we want to be [startRow,endRow] normally
3482       // it is [startRow,endRow) and if startRow=endRow we get nothing.
3483       this.isScan = scan.isGetScan() ? -1 : 0;
3484 
3485       // synchronize on scannerReadPoints so that nobody calculates
3486       // getSmallestReadPoint, before scannerReadPoints is updated.
3487       IsolationLevel isolationLevel = scan.getIsolationLevel();
3488       synchronized(scannerReadPoints) {
3489         if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
3490           // This scan can read even uncommitted transactions
3491           this.readPt = Long.MAX_VALUE;
3492           MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
3493         } else {
3494           this.readPt = MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
3495         }
3496         scannerReadPoints.put(this, this.readPt);
3497       }
3498 
3499       // Here we separate all scanners into two lists - scanner that provide data required
3500       // by the filter to operate (scanners list) and all others (joinedScanners list).
3501       List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
3502       List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
3503       if (additionalScanners != null) {
3504         scanners.addAll(additionalScanners);
3505       }
3506 
3507       for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
3508           scan.getFamilyMap().entrySet()) {
3509         Store store = stores.get(entry.getKey());
3510         KeyValueScanner scanner = store.getScanner(scan, entry.getValue());
3511         if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
3512           || this.filter.isFamilyEssential(entry.getKey())) {
3513           scanners.add(scanner);
3514         } else {
3515           joinedScanners.add(scanner);
3516         }
3517       }
3518       this.storeHeap = new KeyValueHeap(scanners, comparator);
3519       if (!joinedScanners.isEmpty()) {
3520         this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
3521       }
3522     }
3523 
    /**
     * Creates a scanner with no additional scanners.
     * @param scan the scan to serve
     * @param region region kept by this scanner
     * @throws IOException
     */
    RegionScannerImpl(Scan scan, HRegion region) throws IOException {
      this(scan, null, region);
    }
3527 
3528     @Override
3529     public long getMaxResultSize() {
3530       return maxResultSize;
3531     }
3532 
3533     @Override
3534     public long getMvccReadPoint() {
3535       return this.readPt;
3536     }
3537 
3538     /**
3539      * Reset both the filter and the old filter.
3540      *
3541      * @throws IOException in case a filter raises an I/O exception.
3542      */
3543     protected void resetFilters() throws IOException {
3544       if (filter != null) {
3545         filter.reset();
3546       }
3547     }
3548 
3549     @Override
3550     public boolean next(List<KeyValue> outResults)
3551         throws IOException {
3552       // apply the batching limit by default
3553       return next(outResults, batch);
3554     }
3555 
3556     @Override
3557     public synchronized boolean next(List<KeyValue> outResults, int limit) throws IOException {
3558       if (this.filterClosed) {
3559         throw new UnknownScannerException("Scanner was closed (timed out?) " +
3560             "after we renewed it. Could be caused by a very slow scanner " +
3561             "or a lengthy garbage collection");
3562       }
3563       startRegionOperation(Operation.SCAN);
3564       readRequestsCount.increment();
3565       try {
3566 
3567         // This could be a new thread from the last time we called next().
3568         MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
3569 
3570         return nextRaw(outResults, limit);
3571       } finally {
3572         closeRegionOperation();
3573       }
3574     }
3575 
3576     @Override
3577     public boolean nextRaw(List<KeyValue> outResults)
3578         throws IOException {
3579       return nextRaw(outResults, batch);
3580     }
3581 
3582     @Override
3583     public boolean nextRaw(List<KeyValue> outResults, int limit) throws IOException {
3584       boolean returnResult;
3585       if (outResults.isEmpty()) {
3586         // Usually outResults is empty. This is true when next is called
3587         // to handle scan or get operation.
3588         returnResult = nextInternal(outResults, limit);
3589       } else {
3590         List<KeyValue> tmpList = new ArrayList<KeyValue>();
3591         returnResult = nextInternal(tmpList, limit);
3592         outResults.addAll(tmpList);
3593       }
3594       resetFilters();
3595       if (isFilterDone()) {
3596         return false;
3597       }
3598       if (region != null && region.metricsRegion != null) {
3599         long totalSize = 0;
3600         if (outResults != null) {
3601           for(KeyValue kv:outResults) {
3602             totalSize += kv.getLength();
3603           }
3604         }
3605         region.metricsRegion.updateScanNext(totalSize);
3606       }
3607       return returnResult;
3608     }
3609 
3610 
3611     private void populateFromJoinedHeap(List<KeyValue> results, int limit)
3612         throws IOException {
3613       assert joinedContinuationRow != null;
3614       KeyValue kv = populateResult(results, this.joinedHeap, limit,
3615           joinedContinuationRow.getBuffer(), joinedContinuationRow.getRowOffset(),
3616           joinedContinuationRow.getRowLength());
3617       if (kv != KV_LIMIT) {
3618         // We are done with this row, reset the continuation.
3619         joinedContinuationRow = null;
3620       }
3621       // As the data is obtained from two independent heaps, we need to
3622       // ensure that result list is sorted, because Result relies on that.
3623       Collections.sort(results, comparator);
3624     }
3625 
3626     /**
3627      * Fetches records with currentRow into results list, until next row or limit (if not -1).
3628      * @param results
3629      * @param heap KeyValueHeap to fetch data from.It must be positioned on correct row before call.
3630      * @param limit Max amount of KVs to place in result list, -1 means no limit.
3631      * @param currentRow Byte array with key we are fetching.
3632      * @param offset offset for currentRow
3633      * @param length length for currentRow
3634      * @return KV_LIMIT if limit reached, next KeyValue otherwise.
3635      */
3636     private KeyValue populateResult(List<KeyValue> results, KeyValueHeap heap, int limit,
3637         byte[] currentRow, int offset, short length) throws IOException {
3638       KeyValue nextKv;
3639       do {
3640         heap.next(results, limit - results.size());
3641         if (limit > 0 && results.size() == limit) {
3642           return KV_LIMIT;
3643         }
3644         nextKv = heap.peek();
3645       } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
3646 
3647       return nextKv;
3648     }
3649 
3650     /*
3651      * @return True if a filter rules the scanner is over, done.
3652      */
3653     public synchronized boolean isFilterDone() throws IOException {
3654       return this.filter != null && this.filter.filterAllRemaining();
3655     }
3656 
    /**
     * Core of the scanner: fills <code>results</code> with the values of the next row that
     * survives the filters, reading first from the store heap and then, if present, from
     * the joined heap.
     *
     * @param results list to fill; must be empty on entry
     * @param limit limit handed on to populateResult
     * @return true if there is more data to read, false if there isn't
     * @throws IllegalArgumentException if <code>results</code> is not empty
     * @throws IOException an IncompatibleFilterException if the limit is hit on the store
     * heap while the filter's hasFilterRow() returns true
     */
    private boolean nextInternal(List<KeyValue> results, int limit)
    throws IOException {
      if (!results.isEmpty()) {
        throw new IllegalArgumentException("First parameter should be an empty list");
      }
      RpcCallContext rpcCall = RpcServer.getCurrentCall();
      // The loop here is used only when at some point during the next we determine
      // that due to effects of filters or otherwise, we have an empty row in the result.
      // Then we loop and try again. Otherwise, we must get out on the first iteration via return,
      // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
      // and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow).
      while (true) {
        if (rpcCall != null) {
          // If a user specifies a too-restrictive or too-slow scanner, the
          // client might time out and disconnect while the server side
          // is still processing the request. We should abort aggressively
          // in that case.
          rpcCall.throwExceptionIfCallerDisconnected();
        }

        // Let's see what we have in the storeHeap.
        KeyValue current = this.storeHeap.peek();

        // The row is addressed as (buffer, offset, length) inside the KeyValue's buffer.
        byte[] currentRow = null;
        int offset = 0;
        short length = 0;
        if (current != null) {
          currentRow = current.getBuffer();
          offset = current.getRowOffset();
          length = current.getRowLength();
        }
        // Also true when the store heap is exhausted (currentRow == null).
        boolean stopRow = isStopRow(currentRow, offset, length);
        // Check if we were getting data from the joinedHeap and hit the limit.
        // If not, then it's main path - getting results from storeHeap.
        if (joinedContinuationRow == null) {
          // First, check if we are at a stop row. If so, there are no more results.
          if (stopRow) {
            if (filter != null && filter.hasFilterRow()) {
              filter.filterRow(results);
            }
            return false;
          }

          // Check if rowkey filter wants to exclude this row. If so, loop to next.
          // Technically, if we hit limits before on this row, we don't need this call.
          if (filterRowKey(currentRow, offset, length)) {
            boolean moreRows = nextRow(currentRow, offset, length);
            if (!moreRows) return false;
            results.clear();
            continue;
          }

          // Ok, we are good, let's try to get some results from the main heap.
          KeyValue nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset,
              length);
          if (nextKv == KV_LIMIT) {
            if (this.filter != null && filter.hasFilterRow()) {
              throw new IncompatibleFilterException(
                "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
            }
            return true; // We hit the limit.
          }

          // Re-evaluate against the row that follows the one just read.
          stopRow = nextKv == null ||
              isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());
          // save that the row was empty before filters applied to it.
          final boolean isEmptyRow = results.isEmpty();

          // We have the part of the row necessary for filtering (all of it, usually).
          // First filter with the filterRow(List).
          if (filter != null && filter.hasFilterRow()) {
            filter.filterRow(results);
          }
          if (isEmptyRow) {
            boolean moreRows = nextRow(currentRow, offset, length);
            if (!moreRows) return false;
            results.clear();
            // This row was totally filtered out, if this is NOT the last row,
            // we should continue on. Otherwise, nothing else to do.
            if (!stopRow) continue;
            return false;
          }

          // Ok, we are done with storeHeap for this row.
          // Now we may need to fetch additional, non-essential data into row.
          // These values are not needed for filter to work, so we postpone their
          // fetch to (possibly) reduce amount of data loads from disk.
          if (this.joinedHeap != null) {
            KeyValue nextJoinedKv = joinedHeap.peek();
            // If joinedHeap is pointing to some other row, try to seek to a correct one.
            boolean mayHaveData =
              (nextJoinedKv != null && nextJoinedKv.matchingRow(currentRow, offset, length))
              || (this.joinedHeap.requestSeek(KeyValue.createFirstOnRow(currentRow, offset, length),
                true, true)
                && joinedHeap.peek() != null
                && joinedHeap.peek().matchingRow(currentRow, offset, length));
            if (mayHaveData) {
              joinedContinuationRow = current;
              populateFromJoinedHeap(results, limit);
            }
          }
        } else {
          // Populating from the joined heap was stopped by limits, populate some more.
          populateFromJoinedHeap(results, limit);
        }

        // We may have just called populateFromJoinedMap and hit the limits. If that is
        // the case, we need to call it again on the next next() invocation.
        if (joinedContinuationRow != null) {
          return true;
        }

        // Finally, we are done with both joinedHeap and storeHeap.
        // Double check to prevent empty rows from appearing in result. It could be
        // the case when SingleColumnValueExcludeFilter is used.
        if (results.isEmpty()) {
          boolean moreRows = nextRow(currentRow, offset, length);
          if (!moreRows) return false;
          if (!stopRow) continue;
        }

        // We are done. Return the result.
        return !stopRow;
      }
    }
3782 
3783     private boolean filterRowKey(byte[] row, int offset, short length) throws IOException {
3784       return filter != null
3785           && filter.filterRowKey(row, offset, length);
3786     }
3787 
3788     protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException {
3789       assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read.";
3790       KeyValue next;
3791       while ((next = this.storeHeap.peek()) != null &&
3792              next.matchingRow(currentRow, offset, length)) {
3793         this.storeHeap.next(MOCKED_LIST);
3794       }
3795       resetFilters();
3796       // Calling the hook in CP which allows it to do a fast forward
3797       if (this.region.getCoprocessorHost() != null) {
3798         return this.region.getCoprocessorHost().postScannerFilterRow(this, currentRow);
3799       }
3800       return true;
3801     }
3802 
3803     private boolean isStopRow(byte [] currentRow, int offset, short length) {
3804       return currentRow == null ||
3805           (stopRow != null &&
3806           comparator.compareRows(stopRow, 0, stopRow.length,
3807             currentRow, offset, length) <= isScan);
3808     }
3809 
3810     @Override
3811     public synchronized void close() {
3812       if (storeHeap != null) {
3813         storeHeap.close();
3814         storeHeap = null;
3815       }
3816       if (joinedHeap != null) {
3817         joinedHeap.close();
3818         joinedHeap = null;
3819       }
3820       // no need to sychronize here.
3821       scannerReadPoints.remove(this);
3822       this.filterClosed = true;
3823     }
3824 
3825     KeyValueHeap getStoreHeapForTesting() {
3826       return storeHeap;
3827     }
3828 
3829     @Override
3830     public synchronized boolean reseek(byte[] row) throws IOException {
3831       if (row == null) {
3832         throw new IllegalArgumentException("Row cannot be null.");
3833       }
3834       boolean result = false;
3835       startRegionOperation();
3836       try {
3837         // This could be a new thread from the last time we called next().
3838         MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
3839         KeyValue kv = KeyValue.createFirstOnRow(row);
3840         // use request seek to make use of the lazy seek option. See HBASE-5520
3841         result = this.storeHeap.requestSeek(kv, true, true);
3842         if (this.joinedHeap != null) {
3843           result = this.joinedHeap.requestSeek(kv, true, true) || result;
3844         }
3845       } finally {
3846         closeRegionOperation();
3847       }
3848       return result;
3849     }
3850   }
3851 
3852   // Utility methods
3853   /**
3854    * A utility method to create new instances of HRegion based on the
3855    * {@link HConstants#REGION_IMPL} configuration property.
3856    * @param tableDir qualified path of directory where region should be located,
3857    * usually the table directory.
3858    * @param log The HLog is the outbound log for any updates to the HRegion
3859    * (There's a single HLog for all the HRegions on a single HRegionServer.)
3860    * The log file is a logfile from the previous execution that's
3861    * custom-computed for this HRegion. The HRegionServer computes and sorts the
3862    * appropriate log info for this HRegion. If there is a previous log file
3863    * (implying that the HRegion has been written-to before), then read it from
3864    * the supplied path.
3865    * @param fs is the filesystem.
3866    * @param conf is global configuration settings.
3867    * @param regionInfo - HRegionInfo that describes the region
3868    * is new), then read them from the supplied path.
3869    * @param htd the table descriptor
3870    * @param rsServices
3871    * @return the new instance
3872    */
3873   static HRegion newHRegion(Path tableDir, HLog log, FileSystem fs,
3874       Configuration conf, HRegionInfo regionInfo, final HTableDescriptor htd,
3875       RegionServerServices rsServices) {
3876     try {
3877       @SuppressWarnings("unchecked")
3878       Class<? extends HRegion> regionClass =
3879           (Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
3880 
3881       Constructor<? extends HRegion> c =
3882           regionClass.getConstructor(Path.class, HLog.class, FileSystem.class,
3883               Configuration.class, HRegionInfo.class, HTableDescriptor.class,
3884               RegionServerServices.class);
3885 
3886       return c.newInstance(tableDir, log, fs, conf, regionInfo, htd, rsServices);
3887     } catch (Throwable e) {
3888       // todo: what should I throw here?
3889       throw new IllegalStateException("Could not instantiate a region instance.", e);
3890     }
3891   }
3892 
3893   /**
3894    * Convenience method creating new HRegions. Used by createTable and by the
3895    * bootstrap code in the HMaster constructor.
3896    * Note, this method creates an {@link HLog} for the created region. It
3897    * needs to be closed explicitly.  Use {@link HRegion#getLog()} to get
3898    * access.  <b>When done with a region created using this method, you will
3899    * need to explicitly close the {@link HLog} it created too; it will not be
3900    * done for you.  Not closing the log will leave at least a daemon thread
3901    * running.</b>  Call {@link #closeHRegion(HRegion)} and it will do
3902    * necessary cleanup for you.
3903    * @param info Info for region to create.
3904    * @param rootDir Root directory for HBase instance
3905    * @param conf
3906    * @param hTableDescriptor
3907    * @return new HRegion
3908    *
3909    * @throws IOException
3910    */
3911   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
3912       final Configuration conf, final HTableDescriptor hTableDescriptor)
3913   throws IOException {
3914     return createHRegion(info, rootDir, conf, hTableDescriptor, null);
3915   }
3916 
3917   /**
3918    * This will do the necessary cleanup a call to
3919    * {@link #createHRegion(HRegionInfo, Path, Configuration, HTableDescriptor)}
3920    * requires.  This method will close the region and then close its
3921    * associated {@link HLog} file.  You use it if you call the other createHRegion,
3922    * the one that takes an {@link HLog} instance but don't be surprised by the
3923    * call to the {@link HLog#closeAndDelete()} on the {@link HLog} the
3924    * HRegion was carrying.
3925    * @param r
3926    * @throws IOException
3927    */
3928   public static void closeHRegion(final HRegion r) throws IOException {
3929     if (r == null) return;
3930     r.close();
3931     if (r.getLog() == null) return;
3932     r.getLog().closeAndDelete();
3933   }
3934 
3935   /**
3936    * Convenience method creating new HRegions. Used by createTable.
3937    * The {@link HLog} for the created region needs to be closed explicitly.
3938    * Use {@link HRegion#getLog()} to get access.
3939    *
3940    * @param info Info for region to create.
3941    * @param rootDir Root directory for HBase instance
3942    * @param conf
3943    * @param hTableDescriptor
3944    * @param hlog shared HLog
3945    * @param initialize - true to initialize the region
3946    * @return new HRegion
3947    *
3948    * @throws IOException
3949    */
3950   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
3951                                       final Configuration conf,
3952                                       final HTableDescriptor hTableDescriptor,
3953                                       final HLog hlog,
3954                                       final boolean initialize)
3955       throws IOException {
3956     return createHRegion(info, rootDir, conf, hTableDescriptor,
3957         hlog, initialize, false);
3958   }
3959 
3960   /**
3961    * Convenience method creating new HRegions. Used by createTable.
3962    * The {@link HLog} for the created region needs to be closed
3963    * explicitly, if it is not null.
3964    * Use {@link HRegion#getLog()} to get access.
3965    *
3966    * @param info Info for region to create.
3967    * @param rootDir Root directory for HBase instance
3968    * @param conf
3969    * @param hTableDescriptor
3970    * @param hlog shared HLog
3971    * @param initialize - true to initialize the region
3972    * @param ignoreHLog - true to skip generate new hlog if it is null, mostly for createTable
3973    * @return new HRegion
3974    * @throws IOException
3975    */
3976   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
3977                                       final Configuration conf,
3978                                       final HTableDescriptor hTableDescriptor,
3979                                       final HLog hlog,
3980                                       final boolean initialize, final boolean ignoreHLog)
3981       throws IOException {
3982     LOG.info("creating HRegion " + info.getTableNameAsString()
3983         + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
3984         " Table name == " + info.getTableNameAsString());
3985 
3986     Path tableDir = HTableDescriptor.getTableDir(rootDir, info.getTableName());
3987     FileSystem fs = FileSystem.get(conf);
3988     HRegionFileSystem rfs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
3989     HLog effectiveHLog = hlog;
3990     if (hlog == null && !ignoreHLog) {
3991       effectiveHLog = HLogFactory.createHLog(fs, rfs.getRegionDir(),
3992                                              HConstants.HREGION_LOGDIR_NAME, conf);
3993     }
3994     HRegion region = HRegion.newHRegion(tableDir,
3995         effectiveHLog, fs, conf, info, hTableDescriptor, null);
3996     if (initialize) {
3997       region.initialize();
3998     }
3999     return region;
4000   }
4001 
4002   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4003                                       final Configuration conf,
4004                                       final HTableDescriptor hTableDescriptor,
4005                                       final HLog hlog)
4006     throws IOException {
4007     return createHRegion(info, rootDir, conf, hTableDescriptor, hlog, true);
4008   }
4009 
4010 
4011   /**
4012    * Open a Region.
4013    * @param info Info for region to be opened.
4014    * @param wal HLog for region to use. This method will call
4015    * HLog#setSequenceNumber(long) passing the result of the call to
4016    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4017    * up.  HRegionStore does this every time it opens a new region.
4018    * @param conf
4019    * @return new HRegion
4020    *
4021    * @throws IOException
4022    */
4023   public static HRegion openHRegion(final HRegionInfo info,
4024       final HTableDescriptor htd, final HLog wal,
4025       final Configuration conf)
4026   throws IOException {
4027     return openHRegion(info, htd, wal, conf, null, null);
4028   }
4029 
4030   /**
4031    * Open a Region.
4032    * @param info Info for region to be opened
4033    * @param htd the table descriptor
4034    * @param wal HLog for region to use. This method will call
4035    * HLog#setSequenceNumber(long) passing the result of the call to
4036    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4037    * up.  HRegionStore does this every time it opens a new region.
4038    * @param conf The Configuration object to use.
4039    * @param rsServices An interface we can request flushes against.
4040    * @param reporter An interface we can report progress against.
4041    * @return new HRegion
4042    *
4043    * @throws IOException
4044    */
4045   public static HRegion openHRegion(final HRegionInfo info,
4046     final HTableDescriptor htd, final HLog wal, final Configuration conf,
4047     final RegionServerServices rsServices,
4048     final CancelableProgressable reporter)
4049   throws IOException {
4050     return openHRegion(FSUtils.getRootDir(conf), info, htd, wal, conf, rsServices, reporter);
4051   }
4052 
4053   /**
4054    * Open a Region.
4055    * @param rootDir Root directory for HBase instance
4056    * @param info Info for region to be opened.
4057    * @param htd the table descriptor
4058    * @param wal HLog for region to use. This method will call
4059    * HLog#setSequenceNumber(long) passing the result of the call to
4060    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4061    * up.  HRegionStore does this every time it opens a new region.
4062    * @param conf The Configuration object to use.
4063    * @return new HRegion
4064    * @throws IOException
4065    */
4066   public static HRegion openHRegion(Path rootDir, final HRegionInfo info,
4067       final HTableDescriptor htd, final HLog wal, final Configuration conf)
4068   throws IOException {
4069     return openHRegion(rootDir, info, htd, wal, conf, null, null);
4070   }
4071 
4072   /**
4073    * Open a Region.
4074    * @param rootDir Root directory for HBase instance
4075    * @param info Info for region to be opened.
4076    * @param htd the table descriptor
4077    * @param wal HLog for region to use. This method will call
4078    * HLog#setSequenceNumber(long) passing the result of the call to
4079    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4080    * up.  HRegionStore does this every time it opens a new region.
4081    * @param conf The Configuration object to use.
4082    * @param rsServices An interface we can request flushes against.
4083    * @param reporter An interface we can report progress against.
4084    * @return new HRegion
4085    * @throws IOException
4086    */
4087   public static HRegion openHRegion(final Path rootDir, final HRegionInfo info,
4088       final HTableDescriptor htd, final HLog wal, final Configuration conf,
4089       final RegionServerServices rsServices,
4090       final CancelableProgressable reporter)
4091   throws IOException {
4092     FileSystem fs = null;
4093     if (rsServices != null) {
4094       fs = rsServices.getFileSystem();
4095     }
4096     if (fs == null) {
4097       fs = FileSystem.get(conf);
4098     }
4099     return openHRegion(conf, fs, rootDir, info, htd, wal, rsServices, reporter);
4100   }
4101 
4102   /**
4103    * Open a Region.
4104    * @param conf The Configuration object to use.
4105    * @param fs Filesystem to use
4106    * @param rootDir Root directory for HBase instance
4107    * @param info Info for region to be opened.
4108    * @param htd the table descriptor
4109    * @param wal HLog for region to use. This method will call
4110    * HLog#setSequenceNumber(long) passing the result of the call to
4111    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4112    * up.  HRegionStore does this every time it opens a new region.
4113    * @return new HRegion
4114    * @throws IOException
4115    */
4116   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4117       final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal)
4118       throws IOException {
4119     return openHRegion(conf, fs, rootDir, info, htd, wal, null, null);
4120   }
4121 
4122   /**
4123    * Open a Region.
4124    * @param conf The Configuration object to use.
4125    * @param fs Filesystem to use
4126    * @param rootDir Root directory for HBase instance
4127    * @param info Info for region to be opened.
4128    * @param htd the table descriptor
4129    * @param wal HLog for region to use. This method will call
4130    * HLog#setSequenceNumber(long) passing the result of the call to
4131    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4132    * up.  HRegionStore does this every time it opens a new region.
4133    * @param rsServices An interface we can request flushes against.
4134    * @param reporter An interface we can report progress against.
4135    * @return new HRegion
4136    * @throws IOException
4137    */
4138   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4139       final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
4140       final RegionServerServices rsServices, final CancelableProgressable reporter)
4141       throws IOException {
4142     if (info == null) throw new NullPointerException("Passed region info is null");
4143     LOG.info("HRegion.openHRegion Region name ==" + info.getRegionNameAsString());
4144     if (LOG.isDebugEnabled()) {
4145       LOG.debug("Opening region: " + info);
4146     }
4147     Path dir = HTableDescriptor.getTableDir(rootDir, info.getTableName());
4148     HRegion r = HRegion.newHRegion(dir, wal, fs, conf, info, htd, rsServices);
4149     return r.openHRegion(reporter);
4150   }
4151 
4152   /**
4153    * Useful when reopening a closed region (normally for unit tests)
4154    * @param other original object
4155    * @param reporter An interface we can report progress against.
4156    * @return new HRegion
4157    * @throws IOException
4158    */
4159   public static HRegion openHRegion(final HRegion other, final CancelableProgressable reporter)
4160       throws IOException {
4161     HRegionFileSystem regionFs = other.getRegionFileSystem();
4162     HRegion r = newHRegion(regionFs.getTableDir(), other.getLog(), regionFs.getFileSystem(),
4163         other.baseConf, other.getRegionInfo(), other.getTableDesc(), null);
4164     return r.openHRegion(reporter);
4165   }
4166 
4167   /**
4168    * Open HRegion.
4169    * Calls initialize and sets sequenceid.
4170    * @param reporter
4171    * @return Returns <code>this</code>
4172    * @throws IOException
4173    */
4174   protected HRegion openHRegion(final CancelableProgressable reporter)
4175   throws IOException {
4176     checkCompressionCodecs();
4177 
4178     this.openSeqNum = initialize(reporter);
4179     if (this.log != null) {
4180       this.log.setSequenceNumber(this.openSeqNum);
4181     }
4182 
4183     return this;
4184   }
4185 
4186   private void checkCompressionCodecs() throws IOException {
4187     for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
4188       CompressionTest.testCompression(fam.getCompression());
4189       CompressionTest.testCompression(fam.getCompactionCompression());
4190     }
4191   }
4192 
4193   /**
4194    * Create a daughter region from given a temp directory with the region data.
4195    * @param hri Spec. for daughter region to open.
4196    * @throws IOException
4197    */
4198   HRegion createDaughterRegionFromSplits(final HRegionInfo hri) throws IOException {
4199     HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(), fs.getFileSystem(),
4200         this.getBaseConf(), hri, this.getTableDesc(), rsServices);
4201     r.readRequestsCount.set(this.getReadRequestsCount() / 2);
4202     r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
4203     fs.commitDaughterRegion(hri);
4204     return r;
4205   }
4206 
4207   /**
4208    * Create a merged region given a temp directory with the region data.
4209    * @param mergedRegionInfo
4210    * @param region_b another merging region
4211    * @return merged hregion
4212    * @throws IOException
4213    */
4214   HRegion createMergedRegionFromMerges(final HRegionInfo mergedRegionInfo,
4215       final HRegion region_b) throws IOException {
4216     HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(),
4217         fs.getFileSystem(), this.getBaseConf(), mergedRegionInfo,
4218         this.getTableDesc(), this.rsServices);
4219     r.readRequestsCount.set(this.getReadRequestsCount()
4220         + region_b.getReadRequestsCount());
4221     r.writeRequestsCount.set(this.getWriteRequestsCount()
4222         + region_b.getWriteRequestsCount());
4223     this.fs.commitMergedRegion(mergedRegionInfo);
4224     return r;
4225   }
4226 
4227   /**
4228    * Inserts a new region's meta information into the passed
4229    * <code>meta</code> region. Used by the HMaster bootstrap code adding
4230    * new table to META table.
4231    *
4232    * @param meta META HRegion to be updated
4233    * @param r HRegion to add to <code>meta</code>
4234    *
4235    * @throws IOException
4236    */
4237   // TODO remove since only test and merge use this
4238   public static void addRegionToMETA(final HRegion meta, final HRegion r) throws IOException {
4239     meta.checkResources();
4240     // The row key is the region name
4241     byte[] row = r.getRegionName();
4242     final long now = EnvironmentEdgeManager.currentTimeMillis();
4243     final List<KeyValue> cells = new ArrayList<KeyValue>(2);
4244     cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
4245       HConstants.REGIONINFO_QUALIFIER, now,
4246       r.getRegionInfo().toByteArray()));
4247     // Set into the root table the version of the meta table.
4248     cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
4249       HConstants.META_VERSION_QUALIFIER, now,
4250       Bytes.toBytes(HConstants.META_VERSION)));
4251     meta.put(row, HConstants.CATALOG_FAMILY, cells);
4252   }
4253 
4254   /**
4255    * Computes the Path of the HRegion
4256    *
4257    * @param tabledir qualified path for table
4258    * @param name ENCODED region name
4259    * @return Path of HRegion directory
4260    */
4261   @Deprecated
4262   public static Path getRegionDir(final Path tabledir, final String name) {
4263     return new Path(tabledir, name);
4264   }
4265 
4266   /**
4267    * Computes the Path of the HRegion
4268    *
4269    * @param rootdir qualified path of HBase root directory
4270    * @param info HRegionInfo for the region
4271    * @return qualified path of region directory
4272    */
4273   @Deprecated
4274   public static Path getRegionDir(final Path rootdir, final HRegionInfo info) {
4275     return new Path(
4276       HTableDescriptor.getTableDir(rootdir, info.getTableName()),
4277                                    info.getEncodedName());
4278   }
4279 
4280   /**
4281    * Determines if the specified row is within the row range specified by the
4282    * specified HRegionInfo
4283    *
4284    * @param info HRegionInfo that specifies the row range
4285    * @param row row to be checked
4286    * @return true if the row is within the range specified by the HRegionInfo
4287    */
4288   public static boolean rowIsInRange(HRegionInfo info, final byte [] row) {
4289     return ((info.getStartKey().length == 0) ||
4290         (Bytes.compareTo(info.getStartKey(), row) <= 0)) &&
4291         ((info.getEndKey().length == 0) ||
4292             (Bytes.compareTo(info.getEndKey(), row) > 0));
4293   }
4294 
4295   /**
4296    * Merge two HRegions.  The regions must be adjacent and must not overlap.
4297    *
4298    * @param srcA
4299    * @param srcB
4300    * @return new merged HRegion
4301    * @throws IOException
4302    */
4303   public static HRegion mergeAdjacent(final HRegion srcA, final HRegion srcB)
4304   throws IOException {
4305     HRegion a = srcA;
4306     HRegion b = srcB;
4307 
4308     // Make sure that srcA comes first; important for key-ordering during
4309     // write of the merged file.
4310     if (srcA.getStartKey() == null) {
4311       if (srcB.getStartKey() == null) {
4312         throw new IOException("Cannot merge two regions with null start key");
4313       }
4314       // A's start key is null but B's isn't. Assume A comes before B
4315     } else if ((srcB.getStartKey() == null) ||
4316       (Bytes.compareTo(srcA.getStartKey(), srcB.getStartKey()) > 0)) {
4317       a = srcB;
4318       b = srcA;
4319     }
4320 
4321     if (!(Bytes.compareTo(a.getEndKey(), b.getStartKey()) == 0)) {
4322       throw new IOException("Cannot merge non-adjacent regions");
4323     }
4324     return merge(a, b);
4325   }
4326 
4327   /**
4328    * Merge two regions whether they are adjacent or not.
4329    *
4330    * @param a region a
4331    * @param b region b
4332    * @return new merged region
4333    * @throws IOException
4334    */
4335   public static HRegion merge(final HRegion a, final HRegion b) throws IOException {
4336     if (!a.getRegionInfo().getTableNameAsString().equals(
4337         b.getRegionInfo().getTableNameAsString())) {
4338       throw new IOException("Regions do not belong to the same table");
4339     }
4340 
4341     FileSystem fs = a.getRegionFileSystem().getFileSystem();
4342     // Make sure each region's cache is empty
4343     a.flushcache();
4344     b.flushcache();
4345 
4346     // Compact each region so we only have one store file per family
4347     a.compactStores(true);
4348     if (LOG.isDebugEnabled()) {
4349       LOG.debug("Files for region: " + a);
4350       a.getRegionFileSystem().logFileSystemState(LOG);
4351     }
4352     b.compactStores(true);
4353     if (LOG.isDebugEnabled()) {
4354       LOG.debug("Files for region: " + b);
4355       b.getRegionFileSystem().logFileSystemState(LOG);
4356     }
4357 
4358     RegionMergeTransaction rmt = new RegionMergeTransaction(a, b, true);
4359     if (!rmt.prepare(null)) {
4360       throw new IOException("Unable to merge regions " + a + " and " + b);
4361     }
4362     HRegionInfo mergedRegionInfo = rmt.getMergedRegionInfo();
4363     LOG.info("starting merge of regions: " + a + " and " + b
4364         + " into new region " + mergedRegionInfo.getRegionNameAsString()
4365         + " with start key <"
4366         + Bytes.toStringBinary(mergedRegionInfo.getStartKey())
4367         + "> and end key <"
4368         + Bytes.toStringBinary(mergedRegionInfo.getEndKey()) + ">");
4369     HRegion dstRegion = null;
4370     try {
4371       dstRegion = rmt.execute(null, null);
4372     } catch (IOException ioe) {
4373       rmt.rollback(null, null);
4374       throw new IOException("Failed merging region " + a + " and " + b
4375           + ", and succssfully rolled back");
4376     }
4377     dstRegion.compactStores(true);
4378 
4379     if (LOG.isDebugEnabled()) {
4380       LOG.debug("Files for new region");
4381       dstRegion.getRegionFileSystem().logFileSystemState(LOG);
4382     }
4383 
4384     if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
4385       throw new IOException("Merged region " + dstRegion
4386           + " still has references after the compaction, is compaction canceled?");
4387     }
4388 
4389     // Archiving the 'A' region
4390     HFileArchiver.archiveRegion(a.getBaseConf(), fs, a.getRegionInfo());
4391     // Archiving the 'B' region
4392     HFileArchiver.archiveRegion(b.getBaseConf(), fs, b.getRegionInfo());
4393 
4394     LOG.info("merge completed. New region is " + dstRegion);
4395     return dstRegion;
4396   }
4397 
4398   /**
4399    * @return True if needs a major compaction.
4400    * @throws IOException
4401    */
4402   boolean isMajorCompaction() throws IOException {
4403     for (Store store : this.stores.values()) {
4404       if (store.isMajorCompaction()) {
4405         return true;
4406       }
4407     }
4408     return false;
4409   }
4410 
4411   //
4412   // HBASE-880
4413   //
4414   /**
4415    * @param get get object
4416    * @return result
4417    * @throws IOException read exceptions
4418    */
4419   public Result get(final Get get) throws IOException {
4420     checkRow(get.getRow(), "Get");
4421     // Verify families are all valid
4422     if (get.hasFamilies()) {
4423       for (byte [] family: get.familySet()) {
4424         checkFamily(family);
4425       }
4426     } else { // Adding all families to scanner
4427       for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
4428         get.addFamily(family);
4429       }
4430     }
4431     List<KeyValue> results = get(get, true);
4432     return new Result(results);
4433   }
4434 
4435   /*
4436    * Do a get based on the get parameter.
4437    * @param withCoprocessor invoke coprocessor or not. We don't want to
4438    * always invoke cp for this private method.
4439    */
4440   private List<KeyValue> get(Get get, boolean withCoprocessor)
4441   throws IOException {
4442 
4443     List<KeyValue> results = new ArrayList<KeyValue>();
4444 
4445     // pre-get CP hook
4446     if (withCoprocessor && (coprocessorHost != null)) {
4447        if (coprocessorHost.preGet(get, results)) {
4448          return results;
4449        }
4450     }
4451 
4452     Scan scan = new Scan(get);
4453 
4454     RegionScanner scanner = null;
4455     try {
4456       scanner = getScanner(scan);
4457       scanner.next(results);
4458     } finally {
4459       if (scanner != null)
4460         scanner.close();
4461     }
4462 
4463     // post-get CP hook
4464     if (withCoprocessor && (coprocessorHost != null)) {
4465       coprocessorHost.postGet(get, results);
4466     }
4467 
4468     // do after lock
4469     if (this.metricsRegion != null) {
4470       long totalSize = 0l;
4471       if (results != null) {
4472         for (KeyValue kv:results) {
4473           totalSize += kv.getLength();
4474         }
4475       }
4476       this.metricsRegion.updateGet(totalSize);
4477     }
4478 
4479     return results;
4480   }
4481 
4482   public void mutateRow(RowMutations rm) throws IOException {
4483     mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
4484   }
4485 
4486   /**
4487    * Perform atomic mutations within the region.
4488    * @param mutations The list of mutations to perform.
4489    * <code>mutations</code> can contain operations for multiple rows.
4490    * Caller has to ensure that all rows are contained in this region.
4491    * @param rowsToLock Rows to lock
4492    * If multiple rows are locked care should be taken that
4493    * <code>rowsToLock</code> is sorted in order to avoid deadlocks.
4494    * @throws IOException
4495    */
4496   public void mutateRowsWithLocks(Collection<Mutation> mutations,
4497       Collection<byte[]> rowsToLock) throws IOException {
4498 
4499     MultiRowMutationProcessor proc =
4500         new MultiRowMutationProcessor(mutations, rowsToLock);
4501     processRowsWithLocks(proc, -1);
4502   }
4503 
4504   /**
4505    * Performs atomic multiple reads and writes on a given row.
4506    *
4507    * @param processor The object defines the reads and writes to a row.
4508    */
4509   public void processRowsWithLocks(RowProcessor<?,?> processor)
4510       throws IOException {
4511     processRowsWithLocks(processor, rowProcessorTimeout);
4512   }
4513 
4514   /**
4515    * Performs atomic multiple reads and writes on a given row.
4516    *
4517    * @param processor The object defines the reads and writes to a row.
4518    * @param timeout The timeout of the processor.process() execution
4519    *                Use a negative number to switch off the time bound
4520    */
4521   public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout)
4522       throws IOException {
4523 
4524     for (byte[] row : processor.getRowsToLock()) {
4525       checkRow(row, "processRowsWithLocks");
4526     }
4527     if (!processor.readOnly()) {
4528       checkReadOnly();
4529     }
4530     checkResources();
4531 
4532     startRegionOperation();
4533     WALEdit walEdit = new WALEdit();
4534 
4535     // 1. Run pre-process hook
4536     processor.preProcess(this, walEdit);
4537 
4538     // Short circuit the read only case
4539     if (processor.readOnly()) {
4540       try {
4541         long now = EnvironmentEdgeManager.currentTimeMillis();
4542         doProcessRowWithTimeout(
4543             processor, now, this, null, null, timeout);
4544         processor.postProcess(this, walEdit);
4545       } catch (IOException e) {
4546         throw e;
4547       } finally {
4548         closeRegionOperation();
4549       }
4550       return;
4551     }
4552 
4553     MultiVersionConsistencyControl.WriteEntry writeEntry = null;
4554     boolean locked = false;
4555     boolean walSyncSuccessful = false;
4556     List<Integer> acquiredLocks = null;
4557     long addedSize = 0;
4558     List<KeyValue> mutations = new ArrayList<KeyValue>();
4559     Collection<byte[]> rowsToLock = processor.getRowsToLock();
4560     try {
4561       // 2. Acquire the row lock(s)
4562       acquiredLocks = new ArrayList<Integer>(rowsToLock.size());
4563       for (byte[] row : rowsToLock) {
4564         // Attempt to lock all involved rows, fail if one lock times out
4565         Integer lid = getLock(null, row, true);
4566         if (lid == null) {
4567           throw new IOException("Failed to acquire lock on "
4568               + Bytes.toStringBinary(row));
4569         }
4570         acquiredLocks.add(lid);
4571       }
4572       // 3. Region lock
4573       lock(this.updatesLock.readLock(), acquiredLocks.size());
4574       locked = true;
4575 
4576       long now = EnvironmentEdgeManager.currentTimeMillis();
4577       try {
4578         // 4. Let the processor scan the rows, generate mutations and add
4579         //    waledits
4580         doProcessRowWithTimeout(
4581             processor, now, this, mutations, walEdit, timeout);
4582 
4583         if (!mutations.isEmpty()) {
4584           // 5. Get a mvcc write number
4585           writeEntry = mvcc.beginMemstoreInsert();
4586           // 6. Apply to memstore
4587           for (KeyValue kv : mutations) {
4588             kv.setMemstoreTS(writeEntry.getWriteNumber());
4589             byte[] family = kv.getFamily();
4590             checkFamily(family);
4591             addedSize += stores.get(family).add(kv);
4592           }
4593 
4594           long txid = 0;
4595           // 7. Append no sync
4596           if (!walEdit.isEmpty()) {
4597             txid = this.log.appendNoSync(this.getRegionInfo(),
4598                 this.htableDescriptor.getName(), walEdit,
4599                 processor.getClusterId(), now, this.htableDescriptor);
4600           }
4601           // 8. Release region lock
4602           if (locked) {
4603             this.updatesLock.readLock().unlock();
4604             locked = false;
4605           }
4606 
4607           // 9. Release row lock(s)
4608           if (acquiredLocks != null) {
4609             for (Integer lid : acquiredLocks) {
4610               releaseRowLock(lid);
4611             }
4612             acquiredLocks = null;
4613           }
4614           // 10. Sync edit log
4615           if (txid != 0) {
4616             syncOrDefer(txid, processor.useDurability());
4617           }
4618           walSyncSuccessful = true;
4619         }
4620       } finally {
4621         if (!mutations.isEmpty() && !walSyncSuccessful) {
4622           LOG.warn("Wal sync failed. Roll back " + mutations.size() +
4623               " memstore keyvalues for row(s):" +
4624               processor.getRowsToLock().iterator().next() + "...");
4625           for (KeyValue kv : mutations) {
4626             stores.get(kv.getFamily()).rollback(kv);
4627           }
4628         }
4629         // 11. Roll mvcc forward
4630         if (writeEntry != null) {
4631           mvcc.completeMemstoreInsert(writeEntry);
4632           writeEntry = null;
4633         }
4634         if (locked) {
4635           this.updatesLock.readLock().unlock();
4636           locked = false;
4637         }
4638         if (acquiredLocks != null) {
4639           for (Integer lid : acquiredLocks) {
4640             releaseRowLock(lid);
4641           }
4642         }
4643 
4644       }
4645 
4646       // 12. Run post-process hook
4647       processor.postProcess(this, walEdit);
4648 
4649     } catch (IOException e) {
4650       throw e;
4651     } finally {
4652       closeRegionOperation();
4653       if (!mutations.isEmpty() &&
4654           isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
4655         requestFlush();
4656       }
4657     }
4658   }
4659 
4660   private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
4661                                        final long now,
4662                                        final HRegion region,
4663                                        final List<KeyValue> mutations,
4664                                        final WALEdit walEdit,
4665                                        final long timeout) throws IOException {
4666     // Short circuit the no time bound case.
4667     if (timeout < 0) {
4668       try {
4669         processor.process(now, region, mutations, walEdit);
4670       } catch (IOException e) {
4671         LOG.warn("RowProcessor:" + processor.getClass().getName() +
4672             " throws Exception on row(s):" +
4673             Bytes.toStringBinary(
4674               processor.getRowsToLock().iterator().next()) + "...", e);
4675         throw e;
4676       }
4677       return;
4678     }
4679 
4680     // Case with time bound
4681     FutureTask<Void> task =
4682       new FutureTask<Void>(new Callable<Void>() {
4683         @Override
4684         public Void call() throws IOException {
4685           try {
4686             processor.process(now, region, mutations, walEdit);
4687             return null;
4688           } catch (IOException e) {
4689             LOG.warn("RowProcessor:" + processor.getClass().getName() +
4690                 " throws Exception on row(s):" +
4691                 Bytes.toStringBinary(
4692                     processor.getRowsToLock().iterator().next()) + "...", e);
4693             throw e;
4694           }
4695         }
4696       });
4697     rowProcessorExecutor.execute(task);
4698     try {
4699       task.get(timeout, TimeUnit.MILLISECONDS);
4700     } catch (TimeoutException te) {
4701       LOG.error("RowProcessor timeout:" + timeout + " ms on row(s):" +
4702           Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) +
4703           "...");
4704       throw new IOException(te);
4705     } catch (Exception e) {
4706       throw new IOException(e);
4707     }
4708   }
4709 
4710   // TODO: There's a lot of boiler plate code identical
4711   // to increment... See how to better unify that.
4712   /**
4713    * Perform one or more append operations on a row.
4714    *
4715    * @param append
4716    * @return new keyvalues after increment
4717    * @throws IOException
4718    */
4719   public Result append(Append append)
4720       throws IOException {
4721     byte[] row = append.getRow();
4722     checkRow(row, "append");
4723     boolean flush = false;
4724     boolean writeToWAL = append.getDurability() != Durability.SKIP_WAL;
4725     WALEdit walEdits = null;
4726     List<KeyValue> allKVs = new ArrayList<KeyValue>(append.size());
4727     Map<Store, List<KeyValue>> tempMemstore = new HashMap<Store, List<KeyValue>>();
4728 
4729     long size = 0;
4730     long txid = 0;
4731 
4732     checkReadOnly();
4733     // Lock row
4734     startRegionOperation(Operation.APPEND);
4735     this.writeRequestsCount.increment();
4736     WriteEntry w = null;
4737     try {
4738       Integer lid = getLock(null, row, true);
4739       lock(this.updatesLock.readLock());
4740       // wait for all prior MVCC transactions to finish - while we hold the row lock
4741       // (so that we are guaranteed to see the latest state)
4742       mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
4743       // now start my own transaction
4744       w = mvcc.beginMemstoreInsert();
4745       try {
4746         long now = EnvironmentEdgeManager.currentTimeMillis();
4747         // Process each family
4748         for (Map.Entry<byte[], List<? extends Cell>> family : append.getFamilyMap().entrySet()) {
4749 
4750           Store store = stores.get(family.getKey());
4751           List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
4752 
4753           // Get previous values for all columns in this family
4754           Get get = new Get(row);
4755           for (Cell cell : family.getValue()) {
4756             KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
4757             get.addColumn(family.getKey(), kv.getQualifier());
4758           }
4759           List<KeyValue> results = get(get, false);
4760 
4761           // Iterate the input columns and update existing values if they were
4762           // found, otherwise add new column initialized to the append value
4763 
4764           // Avoid as much copying as possible. Every byte is copied at most
4765           // once.
4766           // Would be nice if KeyValue had scatter/gather logic
4767           int idx = 0;
4768           for (Cell cell : family.getValue()) {
4769             KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
4770             KeyValue newKV;
4771             if (idx < results.size()
4772                 && results.get(idx).matchingQualifier(kv.getBuffer(),
4773                     kv.getQualifierOffset(), kv.getQualifierLength())) {
4774               KeyValue oldKv = results.get(idx);
4775               // allocate an empty kv once
4776               newKV = new KeyValue(row.length, kv.getFamilyLength(),
4777                   kv.getQualifierLength(), now, KeyValue.Type.Put,
4778                   oldKv.getValueLength() + kv.getValueLength());
4779               // copy in the value
4780               System.arraycopy(oldKv.getBuffer(), oldKv.getValueOffset(),
4781                   newKV.getBuffer(), newKV.getValueOffset(),
4782                   oldKv.getValueLength());
4783               System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
4784                   newKV.getBuffer(),
4785                   newKV.getValueOffset() + oldKv.getValueLength(),
4786                   kv.getValueLength());
4787               idx++;
4788             } else {
4789               // allocate an empty kv once
4790               newKV = new KeyValue(row.length, kv.getFamilyLength(),
4791                   kv.getQualifierLength(), now, KeyValue.Type.Put,
4792                   kv.getValueLength());
4793               // copy in the value
4794               System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
4795                   newKV.getBuffer(), newKV.getValueOffset(),
4796                   kv.getValueLength());
4797             }
4798             // copy in row, family, and qualifier
4799             System.arraycopy(kv.getBuffer(), kv.getRowOffset(),
4800                 newKV.getBuffer(), newKV.getRowOffset(), kv.getRowLength());
4801             System.arraycopy(kv.getBuffer(), kv.getFamilyOffset(),
4802                 newKV.getBuffer(), newKV.getFamilyOffset(),
4803                 kv.getFamilyLength());
4804             System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(),
4805                 newKV.getBuffer(), newKV.getQualifierOffset(),
4806                 kv.getQualifierLength());
4807 
4808             newKV.setMemstoreTS(w.getWriteNumber());
4809             kvs.add(newKV);
4810 
4811             // Append update to WAL
4812             if (writeToWAL) {
4813               if (walEdits == null) {
4814                 walEdits = new WALEdit();
4815               }
4816               walEdits.add(newKV);
4817             }
4818           }
4819 
4820           //store the kvs to the temporary memstore before writing HLog
4821           tempMemstore.put(store, kvs);
4822         }
4823 
4824         // Actually write to WAL now
4825         if (writeToWAL) {
4826           // Using default cluster id, as this can only happen in the orginating
4827           // cluster. A slave cluster receives the final value (not the delta)
4828           // as a Put.
4829           txid = this.log.appendNoSync(this.getRegionInfo(),
4830               this.htableDescriptor.getName(), walEdits,
4831               HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
4832               this.htableDescriptor);
4833         }
4834 
4835         //Actually write to Memstore now
4836         for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
4837           Store store = entry.getKey();
4838           if (store.getFamily().getMaxVersions() == 1) {
4839             // upsert if VERSIONS for this CF == 1
4840             size += store.upsert(entry.getValue(), getSmallestReadPoint());
4841           } else {
4842             // otherwise keep older versions around
4843             for (Cell cell: entry.getValue()) {
4844               KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
4845               size += store.add(kv);
4846             }
4847           }
4848           allKVs.addAll(entry.getValue());
4849         }
4850         size = this.addAndGetGlobalMemstoreSize(size);
4851         flush = isFlushSize(size);
4852       } finally {
4853         this.updatesLock.readLock().unlock();
4854         releaseRowLock(lid);
4855       }
4856       if (writeToWAL) {
4857         // sync the transaction log outside the rowlock
4858         syncOrDefer(txid, append.getDurability());
4859       }
4860     } finally {
4861       if (w != null) {
4862         mvcc.completeMemstoreInsert(w);
4863       }
4864       closeRegionOperation();
4865     }
4866 
4867     if (this.metricsRegion != null) {
4868       this.metricsRegion.updateAppend();
4869     }
4870 
4871     if (flush) {
4872       // Request a cache flush. Do it outside update lock.
4873       requestFlush();
4874     }
4875 
4876 
4877     return append.isReturnResults() ? new Result(allKVs) : null;
4878   }
4879 
4880   /**
4881    * Perform one or more increment operations on a row.
4882    * @param increment
4883    * @return new keyvalues after increment
4884    * @throws IOException
4885    */
4886   public Result increment(Increment increment)
4887   throws IOException {
4888     byte [] row = increment.getRow();
4889     checkRow(row, "increment");
4890     TimeRange tr = increment.getTimeRange();
4891     boolean flush = false;
4892     boolean writeToWAL = increment.getDurability() != Durability.SKIP_WAL;
4893     WALEdit walEdits = null;
4894     List<KeyValue> allKVs = new ArrayList<KeyValue>(increment.size());
4895     Map<Store, List<KeyValue>> tempMemstore = new HashMap<Store, List<KeyValue>>();
4896 
4897     long size = 0;
4898     long txid = 0;
4899 
4900     checkReadOnly();
4901     // Lock row
4902     startRegionOperation(Operation.INCREMENT);
4903     this.writeRequestsCount.increment();
4904     WriteEntry w = null;
4905     try {
4906       Integer lid = getLock(null, row, true);
4907       lock(this.updatesLock.readLock());
4908       // wait for all prior MVCC transactions to finish - while we hold the row lock
4909       // (so that we are guaranteed to see the latest state)
4910       mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
4911       // now start my own transaction
4912       w = mvcc.beginMemstoreInsert();
4913       try {
4914         long now = EnvironmentEdgeManager.currentTimeMillis();
4915         // Process each family
4916         for (Map.Entry<byte [], List<? extends Cell>> family:
4917             increment.getFamilyMap().entrySet()) {
4918 
4919           Store store = stores.get(family.getKey());
4920           List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
4921 
4922           // Get previous values for all columns in this family
4923           Get get = new Get(row);
4924           for (Cell cell: family.getValue()) {
4925             KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
4926             get.addColumn(family.getKey(), kv.getQualifier());
4927           }
4928           get.setTimeRange(tr.getMin(), tr.getMax());
4929           List<KeyValue> results = get(get, false);
4930 
4931           // Iterate the input columns and update existing values if they were
4932           // found, otherwise add new column initialized to the increment amount
4933           int idx = 0;
4934           for (Cell cell: family.getValue()) {
4935             KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
4936             long amount = Bytes.toLong(kv.getValue());
4937             byte [] qualifier = kv.getQualifier();
4938             if (idx < results.size() && results.get(idx).matchingQualifier(qualifier)) {
4939               kv = results.get(idx);
4940               if(kv.getValueLength() == Bytes.SIZEOF_LONG) {
4941                 amount += Bytes.toLong(kv.getBuffer(), kv.getValueOffset(), Bytes.SIZEOF_LONG);
4942               } else {
4943                 // throw DoNotRetryIOException instead of IllegalArgumentException
4944                 throw new org.apache.hadoop.hbase.exceptions.DoNotRetryIOException(
4945                     "Attempted to increment field that isn't 64 bits wide");
4946               }
4947               idx++;
4948             }
4949 
4950             // Append new incremented KeyValue to list
4951             KeyValue newKV =
4952               new KeyValue(row, family.getKey(), qualifier, now, Bytes.toBytes(amount));
4953             newKV.setMemstoreTS(w.getWriteNumber());
4954             kvs.add(newKV);
4955 
4956             // Prepare WAL updates
4957             if (writeToWAL) {
4958               if (walEdits == null) {
4959                 walEdits = new WALEdit();
4960               }
4961               walEdits.add(newKV);
4962             }
4963           }
4964 
4965           //store the kvs to the temporary memstore before writing HLog
4966           tempMemstore.put(store, kvs);
4967         }
4968 
4969         // Actually write to WAL now
4970         if (writeToWAL) {
4971           // Using default cluster id, as this can only happen in the orginating
4972           // cluster. A slave cluster receives the final value (not the delta)
4973           // as a Put.
4974           txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(),
4975               walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
4976               this.htableDescriptor);
4977         }
4978 
4979         //Actually write to Memstore now
4980         for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
4981           Store store = entry.getKey();
4982           if (store.getFamily().getMaxVersions() == 1) {
4983             // upsert if VERSIONS for this CF == 1
4984             size += store.upsert(entry.getValue(), getSmallestReadPoint());
4985           } else {
4986             // otherwise keep older versions around
4987             for (Cell cell : entry.getValue()) {
4988               KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
4989               size += store.add(kv);
4990             }
4991           }
4992           allKVs.addAll(entry.getValue());
4993         }
4994         size = this.addAndGetGlobalMemstoreSize(size);
4995         flush = isFlushSize(size);
4996       } finally {
4997         this.updatesLock.readLock().unlock();
4998         releaseRowLock(lid);
4999       }
5000       if (writeToWAL) {
5001         // sync the transaction log outside the rowlock
5002         syncOrDefer(txid, increment.getDurability());
5003       }
5004     } finally {
5005       if (w != null) {
5006         mvcc.completeMemstoreInsert(w);
5007       }
5008       closeRegionOperation();
5009       if (this.metricsRegion != null) {
5010         this.metricsRegion.updateIncrement();
5011       }
5012     }
5013 
5014     if (flush) {
5015       // Request a cache flush.  Do it outside update lock.
5016       requestFlush();
5017     }
5018 
5019     return new Result(allKVs);
5020   }
5021 
5022   //
5023   // New HBASE-880 Helpers
5024   //
5025 
5026   private void checkFamily(final byte [] family)
5027   throws NoSuchColumnFamilyException {
5028     if (!this.htableDescriptor.hasFamily(family)) {
5029       throw new NoSuchColumnFamilyException("Column family " +
5030           Bytes.toString(family) + " does not exist in region " + this
5031           + " in table " + this.htableDescriptor);
5032     }
5033   }
5034 
5035   public static final long FIXED_OVERHEAD = ClassSize.align(
5036       ClassSize.OBJECT +
5037       ClassSize.ARRAY +
5038       38 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
5039       (12 * Bytes.SIZEOF_LONG) +
5040       Bytes.SIZEOF_BOOLEAN);
5041 
5042   public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
5043       ClassSize.OBJECT + // closeLock
5044       (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
5045       (3 * ClassSize.ATOMIC_LONG) + // memStoreSize, numPutsWithoutWAL, dataInMemoryWithoutWAL
5046       ClassSize.ATOMIC_INTEGER + // lockIdGenerator
5047       (3 * ClassSize.CONCURRENT_HASHMAP) +  // lockedRows, lockIds, scannerReadPoints
5048       WriteState.HEAP_SIZE + // writestate
5049       ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
5050       (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
5051       ClassSize.ARRAYLIST + // recentFlushes
5052       MultiVersionConsistencyControl.FIXED_SIZE // mvcc
5053       ;
5054 
5055   @Override
5056   public long heapSize() {
5057     long heapSize = DEEP_OVERHEAD;
5058     for (Store store : this.stores.values()) {
5059       heapSize += store.heapSize();
5060     }
5061     // this does not take into account row locks, recent flushes, mvcc entries
5062     return heapSize;
5063   }
5064 
5065   /*
5066    * This method calls System.exit.
5067    * @param message Message to print out.  May be null.
5068    */
5069   private static void printUsageAndExit(final String message) {
5070     if (message != null && message.length() > 0) System.out.println(message);
5071     System.out.println("Usage: HRegion CATLALOG_TABLE_DIR [major_compact]");
5072     System.out.println("Options:");
5073     System.out.println(" major_compact  Pass this option to major compact " +
5074       "passed region.");
5075     System.out.println("Default outputs scan of passed region.");
5076     System.exit(1);
5077   }
5078 
5079   /**
5080    * Registers a new protocol buffer {@link Service} subclass as a coprocessor endpoint to
5081    * be available for handling
5082    * {@link HRegion#execService(com.google.protobuf.RpcController,
5083    *    org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall)}} calls.
5084    *
5085    * <p>
5086    * Only a single instance may be registered per region for a given {@link Service} subclass (the
5087    * instances are keyed on {@link com.google.protobuf.Descriptors.ServiceDescriptor#getFullName()}.
5088    * After the first registration, subsequent calls with the same service name will fail with
5089    * a return value of {@code false}.
5090    * </p>
5091    * @param instance the {@code Service} subclass instance to expose as a coprocessor endpoint
5092    * @return {@code true} if the registration was successful, {@code false}
5093    * otherwise
5094    */
5095   public boolean registerService(Service instance) {
5096     /*
5097      * No stacking of instances is allowed for a single service name
5098      */
5099     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
5100     if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
5101       LOG.error("Coprocessor service "+serviceDesc.getFullName()+
5102           " already registered, rejecting request from "+instance
5103       );
5104       return false;
5105     }
5106 
5107     coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
5108     if (LOG.isDebugEnabled()) {
5109       LOG.debug("Registered coprocessor service: region="+
5110           Bytes.toStringBinary(getRegionName())+" service="+serviceDesc.getFullName());
5111     }
5112     return true;
5113   }
5114 
5115   /**
5116    * Executes a single protocol buffer coprocessor endpoint {@link Service} method using
5117    * the registered protocol handlers.  {@link Service} implementations must be registered via the
5118    * {@link HRegion#registerService(com.google.protobuf.Service)}
5119    * method before they are available.
5120    *
5121    * @param controller an {@code RpcContoller} implementation to pass to the invoked service
5122    * @param call a {@code CoprocessorServiceCall} instance identifying the service, method,
5123    *     and parameters for the method invocation
5124    * @return a protocol buffer {@code Message} instance containing the method's result
5125    * @throws IOException if no registered service handler is found or an error
5126    *     occurs during the invocation
5127    * @see org.apache.hadoop.hbase.regionserver.HRegion#registerService(com.google.protobuf.Service)
5128    */
5129   public Message execService(RpcController controller, CoprocessorServiceCall call)
5130       throws IOException {
5131     String serviceName = call.getServiceName();
5132     String methodName = call.getMethodName();
5133     if (!coprocessorServiceHandlers.containsKey(serviceName)) {
5134       throw new UnknownProtocolException(null,
5135           "No registered coprocessor service found for name "+serviceName+
5136           " in region "+Bytes.toStringBinary(getRegionName()));
5137     }
5138 
5139     Service service = coprocessorServiceHandlers.get(serviceName);
5140     Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
5141     Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
5142     if (methodDesc == null) {
5143       throw new UnknownProtocolException(service.getClass(),
5144           "Unknown method "+methodName+" called on service "+serviceName+
5145               " in region "+Bytes.toStringBinary(getRegionName()));
5146     }
5147 
5148     Message request = service.getRequestPrototype(methodDesc).newBuilderForType()
5149         .mergeFrom(call.getRequest()).build();
5150     final Message.Builder responseBuilder =
5151         service.getResponsePrototype(methodDesc).newBuilderForType();
5152     service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
5153       @Override
5154       public void run(Message message) {
5155         if (message != null) {
5156           responseBuilder.mergeFrom(message);
5157         }
5158       }
5159     });
5160 
5161     return responseBuilder.build();
5162   }
5163 
5164   /*
5165    * Process table.
5166    * Do major compaction or list content.
5167    * @param fs
5168    * @param p
5169    * @param log
5170    * @param c
5171    * @param majorCompact
5172    * @throws IOException
5173    */
5174   private static void processTable(final FileSystem fs, final Path p,
5175       final HLog log, final Configuration c,
5176       final boolean majorCompact)
5177   throws IOException {
5178     HRegion region = null;
5179     String metaStr = Bytes.toString(HConstants.META_TABLE_NAME);
5180     // Currently expects tables have one region only.
5181     if (p.getName().startsWith(metaStr)) {
5182       region = HRegion.newHRegion(p, log, fs, c,
5183         HRegionInfo.FIRST_META_REGIONINFO, HTableDescriptor.META_TABLEDESC, null);
5184     } else {
5185       throw new IOException("Not a known catalog table: " + p.toString());
5186     }
5187     try {
5188       region.initialize();
5189       if (majorCompact) {
5190         region.compactStores(true);
5191       } else {
5192         // Default behavior
5193         Scan scan = new Scan();
5194         // scan.addFamily(HConstants.CATALOG_FAMILY);
5195         RegionScanner scanner = region.getScanner(scan);
5196         try {
5197           List<KeyValue> kvs = new ArrayList<KeyValue>();
5198           boolean done = false;
5199           do {
5200             kvs.clear();
5201             done = scanner.next(kvs);
5202             if (kvs.size() > 0) LOG.info(kvs);
5203           } while (done);
5204         } finally {
5205           scanner.close();
5206         }
5207       }
5208     } finally {
5209       region.close();
5210     }
5211   }
5212 
5213   boolean shouldForceSplit() {
5214     return this.splitRequest;
5215   }
5216 
5217   byte[] getExplicitSplitPoint() {
5218     return this.explicitSplitPoint;
5219   }
5220 
5221   void forceSplit(byte[] sp) {
5222     // NOTE : this HRegion will go away after the forced split is successfull
5223     //        therefore, no reason to clear this value
5224     this.splitRequest = true;
5225     if (sp != null) {
5226       this.explicitSplitPoint = sp;
5227     }
5228   }
5229 
5230   void clearSplit_TESTS_ONLY() {
5231     this.splitRequest = false;
5232   }
5233 
5234   /**
5235    * Give the region a chance to prepare before it is split.
5236    */
5237   protected void prepareToSplit() {
5238     // nothing
5239   }
5240 
5241   /**
5242    * Return the splitpoint. null indicates the region isn't splittable
5243    * If the splitpoint isn't explicitly specified, it will go over the stores
5244    * to find the best splitpoint. Currently the criteria of best splitpoint
5245    * is based on the size of the store.
5246    */
5247   public byte[] checkSplit() {
5248     // Can't split META
5249     if (this.getRegionInfo().isMetaTable()) {
5250       if (shouldForceSplit()) {
5251         LOG.warn("Cannot split meta region in HBase 0.20 and above");
5252       }
5253       return null;
5254     }
5255 
5256     if (!splitPolicy.shouldSplit()) {
5257       return null;
5258     }
5259 
5260     byte[] ret = splitPolicy.getSplitPoint();
5261 
5262     if (ret != null) {
5263       try {
5264         checkRow(ret, "calculated split");
5265       } catch (IOException e) {
5266         LOG.error("Ignoring invalid split", e);
5267         return null;
5268       }
5269     }
5270     return ret;
5271   }
5272 
5273   /**
5274    * @return The priority that this region should have in the compaction queue
5275    */
5276   public int getCompactPriority() {
5277     int count = Integer.MAX_VALUE;
5278     for (Store store : stores.values()) {
5279       count = Math.min(count, store.getCompactPriority());
5280     }
5281     return count;
5282   }
5283 
5284   /**
5285    * Checks every store to see if one has too many
5286    * store files
5287    * @return true if any store has too many store files
5288    */
5289   public boolean needsCompaction() {
5290     for (Store store : stores.values()) {
5291       if(store.needsCompaction()) {
5292         return true;
5293       }
5294     }
5295     return false;
5296   }
5297 
5298   /** @return the coprocessor host */
5299   public RegionCoprocessorHost getCoprocessorHost() {
5300     return coprocessorHost;
5301   }
5302 
  /**
   * Replaces the coprocessor host of this region.
   * @param coprocessorHost the new coprocessor host
   */
  public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {
    this.coprocessorHost = coprocessorHost;
  }
5307 
5308   /**
5309    * This method needs to be called before any public call that reads or
5310    * modifies data. It has to be called just before a try.
5311    * #closeRegionOperation needs to be called in the try's finally block
5312    * Acquires a read lock and checks if the region is closing or closed.
5313    * @throws NotServingRegionException when the region is closing or closed
5314    * @throws RegionTooBusyException if failed to get the lock in time
5315    * @throws InterruptedIOException if interrupted while waiting for a lock
5316    */
  public void startRegionOperation()
      throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
    // Delegate to the Operation-aware overload with the generic ANY operation.
    startRegionOperation(Operation.ANY);
  }
5321 
5322   /**
5323    * @param op The operation is about to be taken on the region
5324    * @throws NotServingRegionException
5325    * @throws RegionTooBusyException
5326    * @throws InterruptedIOException
5327    */
5328   protected void startRegionOperation(Operation op) throws NotServingRegionException,
5329       RegionTooBusyException, InterruptedIOException {
5330     switch (op) {
5331     case INCREMENT:
5332     case APPEND:
5333     case GET:
5334     case SCAN:
5335     case SPLIT_REGION:
5336     case MERGE_REGION:
5337       // when a region is in recovering state, no read, split or merge is allowed
5338       if (this.isRecovering()) {
5339         throw new RegionInRecoveryException(this.getRegionNameAsString()
5340             + " is recovering");
5341       }
5342       break;
5343       default:
5344         break;
5345     }
5346     if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION) {
5347       // split or merge region doesn't need to check the closing/closed state or lock the region
5348       return;
5349     }
5350     if (this.closing.get()) {
5351       throw new NotServingRegionException(getRegionNameAsString() + " is closing");
5352     }
5353     lock(lock.readLock());
5354     if (this.closed.get()) {
5355       lock.readLock().unlock();
5356       throw new NotServingRegionException(getRegionNameAsString() + " is closed");
5357     }
5358   }
5359 
5360   /**
5361    * Closes the lock. This needs to be called in the finally block corresponding
5362    * to the try block of #startRegionOperation
5363    */
  public void closeRegionOperation() {
    // Releases one hold on the region's read lock.
    // NOTE(review): startRegionOperation(Operation) returns without locking for
    // SPLIT_REGION and MERGE_REGION, so this must not be paired with those calls.
    lock.readLock().unlock();
  }
5367 
5368   /**
5369    * This method needs to be called before any public call that reads or
5370    * modifies stores in bulk. It has to be called just before a try.
5371    * #closeBulkRegionOperation needs to be called in the try's finally block
   * Acquires the write lock when <code>writeLockNeeded</code> is true, otherwise the
   * read lock, and checks if the region is closing or closed.
5373    * @throws NotServingRegionException when the region is closing or closed
5374    * @throws RegionTooBusyException if failed to get the lock in time
5375    * @throws InterruptedIOException if interrupted while waiting for a lock
5376    */
5377   private void startBulkRegionOperation(boolean writeLockNeeded)
5378       throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
5379     if (this.closing.get()) {
5380       throw new NotServingRegionException(getRegionNameAsString() + " is closing");
5381     }
5382     if (writeLockNeeded) lock(lock.writeLock());
5383     else lock(lock.readLock());
5384     if (this.closed.get()) {
5385       if (writeLockNeeded) lock.writeLock().unlock();
5386       else lock.readLock().unlock();
5387       throw new NotServingRegionException(getRegionNameAsString() + " is closed");
5388     }
5389   }
5390 
5391   /**
5392    * Closes the lock. This needs to be called in the finally block corresponding
   * to the try block of #startBulkRegionOperation
5394    */
5395   private void closeBulkRegionOperation(){
5396     if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
5397     else lock.readLock().unlock();
5398   }
5399 
5400   /**
   * Updates the counters for the number of puts without WAL and the size of possible
   * data loss. This information is exposed by the region server metrics.
5403    */
5404   private void recordPutWithoutWal(final Map<byte [], List<? extends Cell>> familyMap) {
5405     numPutsWithoutWAL.increment();
5406     if (numPutsWithoutWAL.get() <= 1) {
5407       LOG.info("writing data to region " + this +
5408                " with WAL disabled. Data may be lost in the event of a crash.");
5409     }
5410 
5411     long putSize = 0;
5412     for (List<? extends Cell> cells: familyMap.values()) {
5413       for (Cell cell : cells) {
5414         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5415         putSize += kv.getKeyLength() + kv.getValueLength();
5416       }
5417     }
5418 
5419     dataInMemoryWithoutWAL.add(putSize);
5420   }
5421 
  private void lock(final Lock lock)
      throws RegionTooBusyException, InterruptedIOException {
    // Delegate to lock(Lock, int) with a wait multiplier of 1.
    lock(lock, 1);
  }
5426 
5427   /**
5428    * Try to acquire a lock.  Throw RegionTooBusyException
5429    * if failed to get the lock in time. Throw InterruptedIOException
5430    * if interrupted while waiting for the lock.
5431    */
5432   private void lock(final Lock lock, final int multiplier)
5433       throws RegionTooBusyException, InterruptedIOException {
5434     try {
5435       final long waitTime = Math.min(maxBusyWaitDuration,
5436         busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
5437       if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
5438         throw new RegionTooBusyException(
5439           "failed to get a lock in " + waitTime + "ms");
5440       }
5441     } catch (InterruptedException ie) {
5442       LOG.info("Interrupted while waiting for a lock");
5443       InterruptedIOException iie = new InterruptedIOException();
5444       iie.initCause(ie);
5445       throw iie;
5446     }
5447   }
5448 
5449   /**
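   * Calls sync with the given transaction ID unless the sync may be deferred or
   * skipped. Meta regions are always synced. For other regions the durability
   * decides: USE_DEFAULT syncs unless deferred log sync is enabled for the region,
   * SKIP_WAL never syncs, ASYNC_WAL syncs only when deferred log sync is disabled,
   * and SYNC_WAL and FSYNC_WAL always sync.
   * @param txid should sync up to which transaction
   * @param durability the durability level that decides whether to sync now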
5453    * @throws IOException If anything goes wrong with DFS
5454    */
5455   private void syncOrDefer(long txid, Durability durability) throws IOException {
5456     if (this.getRegionInfo().isMetaRegion()) {
5457       this.log.sync(txid);
5458     } else {
5459       switch(durability) {
5460       case USE_DEFAULT:
5461         // do what CF defaults to
5462         if (!isDeferredLogSyncEnabled()) {
5463           this.log.sync(txid);
5464         }
5465         break;
5466       case SKIP_WAL:
5467         // nothing do to
5468         break;
5469       case ASYNC_WAL:
5470         // defer the sync, unless we globally can't
5471         if (this.deferredLogSyncDisabled) {
5472           this.log.sync(txid);
5473         }
5474         break;
5475       case SYNC_WAL:
5476       case FSYNC_WAL:
5477         // sync the WAL edit (SYNC and FSYNC treated the same for now)
5478         this.log.sync(txid);
5479         break;
5480       }
5481     }
5482   }
5483 
5484   /**
5485    * check if current region is deferred sync enabled.
5486    */
5487   private boolean isDeferredLogSyncEnabled() {
5488     return (this.htableDescriptor.isDeferredLogFlush() && !this.deferredLogSyncDisabled);
5489   }
5490 
5491   /**
   * A mocked list implementation - discards all updates.
5493    */
5494   private static final List<KeyValue> MOCKED_LIST = new AbstractList<KeyValue>() {
5495 
5496     @Override
5497     public void add(int index, KeyValue element) {
5498       // do nothing
5499     }
5500 
5501     @Override
5502     public boolean addAll(int index, Collection<? extends KeyValue> c) {
5503       return false; // this list is never changed as a result of an update
5504     }
5505 
5506     @Override
5507     public KeyValue get(int index) {
5508       throw new UnsupportedOperationException();
5509     }
5510 
5511     @Override
5512     public int size() {
5513       return 0;
5514     }
5515   };
5516 
5517   /**
5518    * Facility for dumping and compacting catalog tables.
5519    * Only does catalog tables since these are only tables we for sure know
5520    * schema on.  For usage run:
5521    * <pre>
5522    *   ./bin/hbase org.apache.hadoop.hbase.regionserver.HRegion
5523    * </pre>
5524    * @param args
5525    * @throws IOException
5526    */
5527   public static void main(String[] args) throws IOException {
5528     if (args.length < 1) {
5529       printUsageAndExit(null);
5530     }
5531     boolean majorCompact = false;
5532     if (args.length > 1) {
5533       if (!args[1].toLowerCase().startsWith("major")) {
5534         printUsageAndExit("ERROR: Unrecognized option <" + args[1] + ">");
5535       }
5536       majorCompact = true;
5537     }
5538     final Path tableDir = new Path(args[0]);
5539     final Configuration c = HBaseConfiguration.create();
5540     final FileSystem fs = FileSystem.get(c);
5541     final Path logdir = new Path(c.get("hbase.tmp.dir"));
5542     final String logname = "hlog" + tableDir.getName()
5543       + EnvironmentEdgeManager.currentTimeMillis();
5544 
5545     final HLog log = HLogFactory.createHLog(fs, logdir, logname, c);
5546     try {
5547       processTable(fs, tableDir, log, c, majorCompact);
5548     } finally {
5549        log.close();
5550        // TODO: is this still right?
5551        BlockCache bc = new CacheConfig(c).getBlockCache();
5552        if (bc != null) bc.shutdown();
5553     }
5554   }
5555 
5556   /**
5557    * Gets the latest sequence number that was read from storage when this region was opened.
5558    */
  public long getOpenSeqNum() {
    // Plain field read; nothing is recomputed here.
    return this.openSeqNum;
  }
5562 
5563   /**
5564    * Gets the min sequence number that was read from storage when this region was opened. WAL Edits
5565    * with smaller sequence number will be skipped from replay.
5566    */
  public long getMinSeqIdForLogReplay() {
    // Plain field read; nothing is recomputed here.
    return this.minSeqIdForLogReplay;
  }
5570 
5571   /**
   * @return the current compaction state of this region: major, minor, both or none
5573    */
5574   public CompactionState getCompactionState() {
5575     boolean hasMajor = majorInProgress.get() > 0, hasMinor = minorInProgress.get() > 0;
5576     return (hasMajor ? (hasMinor ? CompactionState.MAJOR_AND_MINOR : CompactionState.MAJOR)
5577         : (hasMinor ? CompactionState.MINOR : CompactionState.NONE));
5578   }
5579 
5580   public void reportCompactionRequestStart(boolean isMajor){
5581     (isMajor ? majorInProgress : minorInProgress).incrementAndGet();
5582   }
5583 
5584   public void reportCompactionRequestEnd(boolean isMajor){
5585     int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();
5586     assert newValue >= 0;
5587   }
5588 
5589   /**
5590    * Listener class to enable callers of
5591    * bulkLoadHFile() to perform any necessary
5592    * pre/post processing of a given bulkload call
5593    */
  public static interface BulkLoadListener {

    /**
     * Called before an HFile is actually loaded. The returned path, not
     * necessarily <code>srcPath</code>, is the one used for the actual load.
     * @param family family being loaded to
     * @param srcPath path of HFile
     * @return final path to be used for actual loading
     * @throws IOException if the implementation reports an I/O failure
     */
    String prepareBulkLoad(byte[] family, String srcPath) throws IOException;

    /**
     * Called after a successful HFile load
     * @param family family being loaded to
     * @param srcPath path of HFile
     * @throws IOException if the implementation reports an I/O failure
     */
    void doneBulkLoad(byte[] family, String srcPath) throws IOException;

    /**
     * Called after a failed HFile load
     * @param family family being loaded to
     * @param srcPath path of HFile
     * @throws IOException if the implementation reports an I/O failure
     */
    void failedBulkLoad(byte[] family, String srcPath) throws IOException;
  }
5621 }