1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.net.InetSocketAddress;
24  import java.security.Key;
25  import java.security.KeyException;
26  import java.util.ArrayList;
27  import java.util.Collection;
28  import java.util.Collections;
29  import java.util.Iterator;
30  import java.util.List;
31  import java.util.NavigableSet;
32  import java.util.Set;
33  import java.util.concurrent.Callable;
34  import java.util.concurrent.CompletionService;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.concurrent.ExecutionException;
37  import java.util.concurrent.ExecutorCompletionService;
38  import java.util.concurrent.Future;
39  import java.util.concurrent.ThreadPoolExecutor;
40  import java.util.concurrent.atomic.AtomicBoolean;
41  import java.util.concurrent.locks.ReentrantReadWriteLock;
42  
43  import org.apache.commons.logging.Log;
44  import org.apache.commons.logging.LogFactory;
45  import org.apache.hadoop.classification.InterfaceAudience;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.fs.FileSystem;
48  import org.apache.hadoop.fs.Path;
49  import org.apache.hadoop.hbase.Cell;
50  import org.apache.hadoop.hbase.CompoundConfiguration;
51  import org.apache.hadoop.hbase.HColumnDescriptor;
52  import org.apache.hadoop.hbase.HConstants;
53  import org.apache.hadoop.hbase.HRegionInfo;
54  import org.apache.hadoop.hbase.KeyValue;
55  import org.apache.hadoop.hbase.RemoteExceptionHandler;
56  import org.apache.hadoop.hbase.TableName;
57  import org.apache.hadoop.hbase.client.Scan;
58  import org.apache.hadoop.hbase.io.compress.Compression;
59  import org.apache.hadoop.hbase.io.crypto.Cipher;
60  import org.apache.hadoop.hbase.io.crypto.Encryption;
61  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
62  import org.apache.hadoop.hbase.io.hfile.HFile;
63  import org.apache.hadoop.hbase.io.hfile.HFileContext;
64  import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
65  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
66  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
67  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
68  import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
69  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
70  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
71  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
72  import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
73  import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
74  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
75  import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
76  import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
77  import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
78  import org.apache.hadoop.hbase.security.EncryptionUtil;
79  import org.apache.hadoop.hbase.security.User;
80  import org.apache.hadoop.hbase.util.Bytes;
81  import org.apache.hadoop.hbase.util.ChecksumType;
82  import org.apache.hadoop.hbase.util.ClassSize;
83  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
84  import org.apache.hadoop.hbase.util.ReflectionUtils;
85  import org.apache.hadoop.util.StringUtils;
86  
87  import com.google.common.annotations.VisibleForTesting;
88  import com.google.common.base.Preconditions;
89  import com.google.common.collect.ImmutableCollection;
90  import com.google.common.collect.ImmutableList;
91  import com.google.common.collect.Lists;
92  
93  /**
94   * A Store holds a column family in a Region.  It's a memstore and a set of zero
95   * or more StoreFiles, which stretch backwards over time.
96   *
97   * <p>There's no reason to consider append-logging at this level; all logging
98   * and locking are handled at the HRegion level.  Store just provides
99   * services to manage sets of StoreFiles.  One of the most important of those
100  * services is compaction, where files are aggregated once they pass
101  * a configurable threshold.
102  *
103  * <p>The only thing having to do with logs that Store needs to deal with is
104  * the reconstructionLog.  This is a segment of an HRegion's log that might
105  * NOT be present upon startup.  If the param is NULL, there's nothing to do.
106  * If the param is non-NULL, we need to process the log to reconstruct
107  * a TreeMap that might not have been written to disk before the process
108  * died.
109  *
110  * <p>It's assumed that after this constructor returns, the reconstructionLog
111  * file will be deleted (by whoever has instantiated the Store).
112  *
113  * <p>Locking and transactions are handled at a higher level.  This API should
114  * not be called directly but by an HRegion manager.
115  */
116 @InterfaceAudience.Private
117 public class HStore implements Store {
118   private static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
119   public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
120       "hbase.server.compactchecker.interval.multiplier";
121   public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
122   public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
123   public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7;
124 
125   static final Log LOG = LogFactory.getLog(HStore.class);
126 
127   protected final MemStore memstore;
128   // This store's directory in the filesystem.
129   private final HRegion region;
130   private final HColumnDescriptor family;
131   private final HRegionFileSystem fs;
132   private final Configuration conf;
133   private final CacheConfig cacheConf;
134   private long lastCompactSize = 0;
135   volatile boolean forceMajor = false;
136   /* how many bytes to write between status checks */
137   static int closeCheckInterval = 0;
138   private volatile long storeSize = 0L;
139   private volatile long totalUncompressedBytes = 0L;
140 
141   /**
142    * RWLock for store operations.
143    * Locked in shared mode when the list of component stores is looked at:
144    *   - all reads/writes to table data
145    *   - checking for split
146    * Locked in exclusive mode when the list of component stores is modified:
147    *   - closing
148    *   - completing a compaction
149    */
150   final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
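  // A minimal sketch (not part of this class) of the lock discipline described
  // above, as it recurs throughout this file: readers of the store file list
  // take the shared lock, structural changes take the exclusive lock.
  //
  //   lock.readLock().lock();        // e.g. add(), getScanners()
  //   try {
  //     // read table data / examine the component file list
  //   } finally {
  //     lock.readLock().unlock();
  //   }
  //
  //   lock.writeLock().lock();       // e.g. close(), replaceStoreFiles()
  //   try {
  //     // swap store files in or out
  //   } finally {
  //     lock.writeLock().unlock();
  //   }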
151   private final boolean verifyBulkLoads;
152 
153   private ScanInfo scanInfo;
154 
155   final List<StoreFile> filesCompacting = Lists.newArrayList();
156 
157   // All access must be synchronized.
158   private final Set<ChangedReadersObserver> changedReaderObservers =
159     Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());
160 
161   private final int blocksize;
162   private HFileDataBlockEncoder dataBlockEncoder;
163 
164   /** Checksum configuration */
165   private ChecksumType checksumType;
166   private int bytesPerChecksum;
167 
168   // Comparing KeyValues
169   private final KeyValue.KVComparator comparator;
170 
171   final StoreEngine<?, ?, ?, ?> storeEngine;
172 
173   private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
174   private final OffPeakHours offPeakHours;
175 
176   private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
177   private int flushRetriesNumber;
178   private int pauseTime;
179 
180   private long blockingFileCount;
181   private int compactionCheckMultiplier;
182 
183   private Encryption.Context cryptoContext = Encryption.Context.NONE;
184 
185   /**
186    * Constructor
187    * @param region the HRegion this store belongs to
188    * @param family HColumnDescriptor for this column family
189    * @param confParam configuration object; table and column family
190    *   overrides are layered on top of it
191    * @throws IOException
192    */
193   protected HStore(final HRegion region, final HColumnDescriptor family,
194       final Configuration confParam) throws IOException {
195 
196     HRegionInfo info = region.getRegionInfo();
197     this.fs = region.getRegionFileSystem();
198 
199     // Assemble the store's home directory and ensure it exists.
200     fs.createStoreDir(family.getNameAsString());
201     this.region = region;
202     this.family = family;
203     // 'conf' renamed to 'confParam' because we use this.conf in the constructor
204     // CompoundConfiguration will look for keys in reverse order of addition, so we'd
205     // add global config first, then table and cf overrides, then cf metadata.
206     this.conf = new CompoundConfiguration()
207       .add(confParam)
208       .addStringMap(region.getTableDesc().getConfiguration())
209       .addStringMap(family.getConfiguration())
210       .addWritableMap(family.getValues());
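    // Illustrative example (hypothetical values) of the precedence this
    // produces: if hbase.hstore.blockingStoreFiles is 7 in confParam, 12 in
    // the table's configuration map and 20 in the column family's, then
    // this.conf.getInt("hbase.hstore.blockingStoreFiles", ...) returns 20,
    // because lookups consult the most recently added map first.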
211     this.blocksize = family.getBlocksize();
212 
213     this.dataBlockEncoder =
214         new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());
215 
216     this.comparator = info.getComparator();
217     // used by ScanQueryMatcher
218     long timeToPurgeDeletes =
219         Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
220     LOG.trace("Time to purge deletes set to " + timeToPurgeDeletes +
221         "ms in store " + this);
222     // Get TTL
223     long ttl = determineTTLFromFamily(family);
224     // Why not just pass a HColumnDescriptor in here altogether?  Even if we
225     // have to clone it?
226     scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
227     String className = conf.get(MEMSTORE_CLASS_NAME, DefaultMemStore.class.getName());
228     this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
229         Configuration.class, KeyValue.KVComparator.class }, new Object[] { conf, this.comparator });
230     this.offPeakHours = OffPeakHours.getInstance(conf);
231 
232     // Setting up cache configuration for this family
233     this.cacheConf = new CacheConfig(conf, family);
234 
235     this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
236 
237     this.blockingFileCount =
238         conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
239     this.compactionCheckMultiplier = conf.getInt(
240         COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
241     if (this.compactionCheckMultiplier <= 0) {
242       LOG.error("Compaction check period multiplier must be positive, setting default: "
243           + DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
244       this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
245     }
246 
247     if (HStore.closeCheckInterval == 0) {
248       HStore.closeCheckInterval = conf.getInt(
249           "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
250     }
251 
252     this.storeEngine = StoreEngine.create(this, this.conf, this.comparator);
253     this.storeEngine.getStoreFileManager().loadFiles(loadStoreFiles());
254 
255     // Initialize checksum type from name. The names are CRC32, CRC32C, etc.
256     this.checksumType = getChecksumType(conf);
257     // Initialize bytes per checksum
258     this.bytesPerChecksum = getBytesPerChecksum(conf);
259     flushRetriesNumber = conf.getInt(
260         "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
261     pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
262     if (flushRetriesNumber <= 0) {
263       throw new IllegalArgumentException(
264           "hbase.hstore.flush.retries.number must be > 0, not "
265               + flushRetriesNumber);
266     }
267 
268     // Crypto context for new store files
269     String cipherName = family.getEncryptionType();
270     if (cipherName != null) {
271       Cipher cipher;
272       Key key;
273       byte[] keyBytes = family.getEncryptionKey();
274       if (keyBytes != null) {
275         // Family provides specific key material
276         String masterKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
277           User.getCurrent().getShortName());
278         try {
279           // First try the master key
280           key = EncryptionUtil.unwrapKey(conf, masterKeyName, keyBytes);
281         } catch (KeyException e) {
282           // If the current master key fails to unwrap, try the alternate, if
283           // one is configured
284           if (LOG.isDebugEnabled()) {
285             LOG.debug("Unable to unwrap key with current master key '" + masterKeyName + "'");
286           }
287           String alternateKeyName =
288             conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY);
289           if (alternateKeyName != null) {
290             try {
291               key = EncryptionUtil.unwrapKey(conf, alternateKeyName, keyBytes);
292             } catch (KeyException ex) {
293               throw new IOException(ex);
294             }
295           } else {
296             throw new IOException(e);
297           }
298         }
299         // Use the algorithm the key wants
300         cipher = Encryption.getCipher(conf, key.getAlgorithm());
301         if (cipher == null) {
302           throw new RuntimeException("Cipher '" + key.getAlgorithm() + "' is not available");
303         }
304         // Fail if misconfigured
305         // We use the encryption type specified in the column schema as a sanity check on
306         // what the wrapped key is telling us
307         if (!cipher.getName().equalsIgnoreCase(cipherName)) {
308           throw new RuntimeException("Encryption for family '" + family.getNameAsString() +
309             "' configured with type '" + cipherName +
310             "' but key specifies algorithm '" + cipher.getName() + "'");
311         }
312       } else {
313         // Family does not provide key material, create a random key
314         cipher = Encryption.getCipher(conf, cipherName);
315         if (cipher == null) {
316           throw new RuntimeException("Cipher '" + cipherName + "' is not available");
317         }
318         key = cipher.getRandomKey();
319       }
320       cryptoContext = Encryption.newContext(conf);
321       cryptoContext.setCipher(cipher);
322       cryptoContext.setKey(key);
323     }
324   }
325 
326   /**
327    * @param family the column family descriptor
328    * @return TTL in milliseconds of the specified family
329    */
330   private static long determineTTLFromFamily(final HColumnDescriptor family) {
331     // HCD.getTimeToLive returns ttl in seconds.  Convert to milliseconds.
332     long ttl = family.getTimeToLive();
333     if (ttl == HConstants.FOREVER) {
334       // Default is unlimited ttl.
335       ttl = Long.MAX_VALUE;
336     } else if (ttl == -1) {
337       ttl = Long.MAX_VALUE;
338     } else {
339       // Seconds -> milliseconds adjustment for user data
340       ttl *= 1000;
341     }
342     return ttl;
343   }
344 
345   @Override
346   public String getColumnFamilyName() {
347     return this.family.getNameAsString();
348   }
349 
350   @Override
351   public TableName getTableName() {
352     return this.getRegionInfo().getTable();
353   }
354 
355   @Override
356   public FileSystem getFileSystem() {
357     return this.fs.getFileSystem();
358   }
359 
360   public HRegionFileSystem getRegionFileSystem() {
361     return this.fs;
362   }
363 
364   /* Implementation of StoreConfigInformation */
365   @Override
366   public long getStoreFileTtl() {
367     // TTL only applies if there's no MIN_VERSIONs setting on the column.
368     return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
369   }
370 
371   @Override
372   public long getMemstoreFlushSize() {
373     // TODO: Why is this in here?  The flushsize of the region rather than the store?  St.Ack
374     return this.region.memstoreFlushSize;
375   }
376 
377   @Override
378   public long getFlushableSize() {
379     return this.memstore.getFlushableSize();
380   }
381 
382   @Override
383   public long getCompactionCheckMultiplier() {
384     return this.compactionCheckMultiplier;
385   }
386 
387   @Override
388   public long getBlockingFileCount() {
389     return blockingFileCount;
390   }
391   /* End implementation of StoreConfigInformation */
392 
393   /**
394    * Returns the configured bytesPerChecksum value.
395    * @param conf The configuration
396    * @return The bytesPerChecksum that is set in the configuration
397    */
398   public static int getBytesPerChecksum(Configuration conf) {
399     return conf.getInt(HConstants.BYTES_PER_CHECKSUM,
400                        HFile.DEFAULT_BYTES_PER_CHECKSUM);
401   }
402 
403   /**
404    * Returns the configured checksum algorithm.
405    * @param conf The configuration
406    * @return The checksum algorithm that is set in the configuration
407    */
408   public static ChecksumType getChecksumType(Configuration conf) {
409     String checksumName = conf.get(HConstants.CHECKSUM_TYPE_NAME);
410     if (checksumName == null) {
411       return HFile.DEFAULT_CHECKSUM_TYPE;
412     } else {
413       return ChecksumType.nameToType(checksumName);
414     }
415   }
416 
417   /**
418    * @return how many bytes to write between status checks
419    */
420   public static int getCloseCheckInterval() {
421     return closeCheckInterval;
422   }
423 
424   @Override
425   public HColumnDescriptor getFamily() {
426     return this.family;
427   }
428 
429   /**
430    * @return The maximum sequence id in all store files. Used for log replay.
431    */
432   long getMaxSequenceId(boolean includeBulkFiles) {
433     return StoreFile.getMaxSequenceIdInList(this.getStorefiles(), includeBulkFiles);
434   }
435 
436   @Override
437   public long getMaxMemstoreTS() {
438     return StoreFile.getMaxMemstoreTSInList(this.getStorefiles());
439   }
440 
441   /**
442    * @param tabledir {@link Path} to where the table is being stored
443    * @param hri {@link HRegionInfo} for the region.
444    * @param family the column family name as bytes
445    * @return Path to family/Store home directory.
446    */
447   @Deprecated
448   public static Path getStoreHomedir(final Path tabledir,
449       final HRegionInfo hri, final byte[] family) {
450     return getStoreHomedir(tabledir, hri.getEncodedName(), family);
451   }
452 
453   /**
454    * @param tabledir {@link Path} to where the table is being stored
455    * @param encodedName Encoded region name.
456    * @param family the column family name as bytes
457    * @return Path to family/Store home directory.
458    */
459   @Deprecated
460   public static Path getStoreHomedir(final Path tabledir,
461       final String encodedName, final byte[] family) {
462     return new Path(tabledir, new Path(encodedName, Bytes.toString(family)));
463   }
464 
465   @Override
466   public HFileDataBlockEncoder getDataBlockEncoder() {
467     return dataBlockEncoder;
468   }
469 
470   /**
471    * Should be used only in tests.
472    * @param blockEncoder the data block encoder to use
473    */
474   void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
475     this.dataBlockEncoder = blockEncoder;
476   }
477 
478   /**
479    * Creates an unsorted list of StoreFiles loaded in parallel
480    * from the given directory.
481    * @throws IOException
482    */
483   private List<StoreFile> loadStoreFiles() throws IOException {
484     Collection<StoreFileInfo> files = fs.getStoreFiles(getColumnFamilyName());
485     if (files == null || files.size() == 0) {
486       return new ArrayList<StoreFile>();
487     }
488 
489     // Initialize the thread pool for opening store files in parallel.
490     ThreadPoolExecutor storeFileOpenerThreadPool =
491       this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
492           this.getColumnFamilyName());
493     CompletionService<StoreFile> completionService =
494       new ExecutorCompletionService<StoreFile>(storeFileOpenerThreadPool);
495 
496     int totalValidStoreFile = 0;
497     for (final StoreFileInfo storeFileInfo: files) {
498       // open each store file in parallel
499       completionService.submit(new Callable<StoreFile>() {
500         @Override
501         public StoreFile call() throws IOException {
502           StoreFile storeFile = createStoreFileAndReader(storeFileInfo.getPath());
503           return storeFile;
504         }
505       });
506       totalValidStoreFile++;
507     }
508 
509     ArrayList<StoreFile> results = new ArrayList<StoreFile>(files.size());
510     IOException ioe = null;
511     try {
512       for (int i = 0; i < totalValidStoreFile; i++) {
513         try {
514           Future<StoreFile> future = completionService.take();
515           StoreFile storeFile = future.get();
516           long length = storeFile.getReader().length();
517           this.storeSize += length;
518           this.totalUncompressedBytes +=
519               storeFile.getReader().getTotalUncompressedBytes();
520           if (LOG.isDebugEnabled()) {
521             LOG.debug("loaded " + storeFile.toStringDetailed());
522           }
523           results.add(storeFile);
524         } catch (InterruptedException e) {
525           if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
526         } catch (ExecutionException e) {
527           if (ioe == null) ioe = new IOException(e.getCause());
528         }
529       }
530     } finally {
531       storeFileOpenerThreadPool.shutdownNow();
532     }
533     if (ioe != null) {
534       // close StoreFile readers
535       for (StoreFile file : results) {
536         try {
537           if (file != null) file.closeReader(true);
538         } catch (IOException e) { 
539           LOG.warn(e.getMessage());
540         }
541       }
542       throw ioe;
543     }
544 
545     return results;
546   }
547 
548   private StoreFile createStoreFileAndReader(final Path p) throws IOException {
549     StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p);
550     info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
551     StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
552       this.family.getBloomFilterType());
553     storeFile.createReader();
554     return storeFile;
555   }
556 
557   @Override
558   public long add(final KeyValue kv) {
559     lock.readLock().lock();
560     try {
561       return this.memstore.add(kv);
562     } finally {
563       lock.readLock().unlock();
564     }
565   }
566 
567   @Override
568   public long timeOfOldestEdit() {
569     return memstore.timeOfOldestEdit();
570   }
571 
572   /**
573    * Adds a delete KeyValue to the memstore
574    *
575    * @param kv the delete to add
576    * @return memstore size delta
577    */
578   protected long delete(final KeyValue kv) {
579     lock.readLock().lock();
580     try {
581       return this.memstore.delete(kv);
582     } finally {
583       lock.readLock().unlock();
584     }
585   }
586 
587   @Override
588   public void rollback(final KeyValue kv) {
589     lock.readLock().lock();
590     try {
591       this.memstore.rollback(kv);
592     } finally {
593       lock.readLock().unlock();
594     }
595   }
596 
597   /**
598    * @return All store files.
599    */
600   @Override
601   public Collection<StoreFile> getStorefiles() {
602     return this.storeEngine.getStoreFileManager().getStorefiles();
603   }
604 
605   @Override
606   public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
607     HFile.Reader reader  = null;
608     try {
609       LOG.info("Validating hfile at " + srcPath + " for inclusion in "
610           + "store " + this + " region " + this.getRegionInfo().getRegionNameAsString());
611       reader = HFile.createReader(srcPath.getFileSystem(conf),
612           srcPath, cacheConf, conf);
613       reader.loadFileInfo();
614 
615       byte[] firstKey = reader.getFirstRowKey();
616       Preconditions.checkState(firstKey != null, "First key can not be null");
617       byte[] lk = reader.getLastKey();
618       Preconditions.checkState(lk != null, "Last key can not be null");
619       byte[] lastKey =  KeyValue.createKeyValueFromKey(lk).getRow();
620 
621       LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey) +
622           " last=" + Bytes.toStringBinary(lastKey));
623       LOG.debug("Region bounds: first=" +
624           Bytes.toStringBinary(getRegionInfo().getStartKey()) +
625           " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
626 
627       if (!this.getRegionInfo().containsRange(firstKey, lastKey)) {
628         throw new WrongRegionException(
629             "Bulk load file " + srcPath.toString() + " does not fit inside region "
630             + this.getRegionInfo().getRegionNameAsString());
631       }
632 
633       if (verifyBulkLoads) {
634         KeyValue prevKV = null;
635         HFileScanner scanner = reader.getScanner(false, false, false);
636         scanner.seekTo();
637         do {
638           KeyValue kv = scanner.getKeyValue();
639           if (prevKV != null) {
640             if (Bytes.compareTo(prevKV.getRowArray(), prevKV.getRowOffset(),
641                 prevKV.getRowLength(), kv.getRowArray(), kv.getRowOffset(),
642                 kv.getRowLength()) > 0) {
643               throw new InvalidHFileException("Previous row is greater than"
644                   + " current row: path=" + srcPath + " previous="
645                   + Bytes.toStringBinary(prevKV.getKey()) + " current="
646                   + Bytes.toStringBinary(kv.getKey()));
647             }
648             if (Bytes.compareTo(prevKV.getFamilyArray(), prevKV.getFamilyOffset(),
649                 prevKV.getFamilyLength(), kv.getFamilyArray(), kv.getFamilyOffset(),
650                 kv.getFamilyLength()) != 0) {
651               throw new InvalidHFileException("Previous key had different"
652                   + " family compared to current key: path=" + srcPath
653                   + " previous=" + Bytes.toStringBinary(prevKV.getFamily())
654                   + " current=" + Bytes.toStringBinary(kv.getFamily()));
655             }
656           }
657           prevKV = kv;
658         } while (scanner.next());
659       }
660     } finally {
661       if (reader != null) reader.close();
662     }
663   }
664 
665   @Override
666   public void bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
667     Path srcPath = new Path(srcPathStr);
668     Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
669 
670     StoreFile sf = createStoreFileAndReader(dstPath);
671 
672     StoreFile.Reader r = sf.getReader();
673     this.storeSize += r.length();
674     this.totalUncompressedBytes += r.getTotalUncompressedBytes();
675 
676     LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() +
677         "' as " + dstPath + " - updating store file list.");
678 
679     // Append the new storefile into the list
680     this.lock.writeLock().lock();
681     try {
682       this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
683     } finally {
684       // We need the lock, as long as we are updating the storeFiles
685       // or changing the memstore. Let us release it before calling
686       // notifyChangedReadersObservers. See HBASE-4485 for a possible
687       // deadlock scenario that could have happened if we continued to hold
688       // the lock.
689       this.lock.writeLock().unlock();
690     }
691     notifyChangedReadersObservers();
692     LOG.info("Successfully loaded store file " + srcPath
693         + " into store " + this + " (new location: " + dstPath + ")");
694     if (LOG.isTraceEnabled()) {
695       String traceMessage = "BULK LOAD time,size,store size,store files ["
696           + EnvironmentEdgeManager.currentTimeMillis() + "," + r.length() + "," + storeSize
697           + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
698       LOG.trace(traceMessage);
699     }
700   }
701 
702   @Override
703   public ImmutableCollection<StoreFile> close() throws IOException {
704     this.lock.writeLock().lock();
705     try {
706       // Clear so metrics don't find them.
707       ImmutableCollection<StoreFile> result = storeEngine.getStoreFileManager().clearFiles();
708 
709       if (!result.isEmpty()) {
710         // initialize the thread pool for closing store files in parallel.
711         ThreadPoolExecutor storeFileCloserThreadPool = this.region
712             .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
713                 + this.getColumnFamilyName());
714 
715         // close each store file in parallel
716         CompletionService<Void> completionService =
717           new ExecutorCompletionService<Void>(storeFileCloserThreadPool);
718         for (final StoreFile f : result) {
719           completionService.submit(new Callable<Void>() {
720             @Override
721             public Void call() throws IOException {
722               f.closeReader(true);
723               return null;
724             }
725           });
726         }
727 
728         IOException ioe = null;
729         try {
730           for (int i = 0; i < result.size(); i++) {
731             try {
732               Future<Void> future = completionService.take();
733               future.get();
734             } catch (InterruptedException e) {
735               if (ioe == null) {
736                 ioe = new InterruptedIOException();
737                 ioe.initCause(e);
738               }
739             } catch (ExecutionException e) {
740               if (ioe == null) ioe = new IOException(e.getCause());
741             }
742           }
743         } finally {
744           storeFileCloserThreadPool.shutdownNow();
745         }
746         if (ioe != null) throw ioe;
747       }
748       LOG.info("Closed " + this);
749       return result;
750     } finally {
751       this.lock.writeLock().unlock();
752     }
753   }
754 
755   /**
756    * Snapshot this store's memstore. Call before running
757    * {@link #flushCache(long, MemStoreSnapshot, MonitoredTask)}
758    *  so it has some work to do.
759    */
760   void snapshot() {
761     this.lock.writeLock().lock();
762     try {
763       this.memstore.snapshot();
764     } finally {
765       this.lock.writeLock().unlock();
766     }
767   }
768 
769   /**
770    * Write out current snapshot.  Presumes {@link #snapshot()} has been called
771    * previously.
772    * @param logCacheFlushId flush sequence number
773    * @param snapshot
774    * @param status
775    * @return The path names of the tmp files to which the store was flushed
776    * @throws IOException
777    */
778   protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
779       MonitoredTask status) throws IOException {
780     // If an exception happens flushing, we let it out without clearing
781     // the memstore snapshot.  The old snapshot will be returned again by the
782     // next call to snapshot(), the next time a flush comes around.
783     // Retry after catching an exception when flushing, otherwise the server
784     // will abort itself.
785     StoreFlusher flusher = storeEngine.getStoreFlusher();
786     IOException lastException = null;
787     for (int i = 0; i < flushRetriesNumber; i++) {
788       try {
789         List<Path> pathNames = flusher.flushSnapshot(snapshot, logCacheFlushId, status);
790         Path lastPathName = null;
791         try {
792           for (Path pathName : pathNames) {
793             lastPathName = pathName;
794             validateStoreFile(pathName);
795           }
796           return pathNames;
797         } catch (Exception e) {
798           LOG.warn("Failed validating store file " + lastPathName + ", retrying num=" + i, e);
799           if (e instanceof IOException) {
800             lastException = (IOException) e;
801           } else {
802             lastException = new IOException(e);
803           }
804         }
805       } catch (IOException e) {
806         LOG.warn("Failed flushing store file, retrying num=" + i, e);
807         lastException = e;
808       }
809       if (lastException != null && i < (flushRetriesNumber - 1)) {
810         try {
811           Thread.sleep(pauseTime);
812         } catch (InterruptedException e) {
813           IOException iie = new InterruptedIOException();
814           iie.initCause(e);
815           throw iie;
816         }
817       }
818     }
819     throw lastException;
820   }
821 
822   /*
823    * @param path The pathname of the tmp file into which the store was flushed
824    * @param logCacheFlushId flush sequence number
825    * @param status
826    * @return StoreFile created.
827    * @throws IOException
828    */
829   private StoreFile commitFile(final Path path, final long logCacheFlushId, MonitoredTask status)
830       throws IOException {
831     // Write-out finished successfully, move into the right spot
832     Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);
833 
834     status.setStatus("Flushing " + this + ": reopening flushed file");
835     StoreFile sf = createStoreFileAndReader(dstPath);
836 
837     StoreFile.Reader r = sf.getReader();
838     this.storeSize += r.length();
839     this.totalUncompressedBytes += r.getTotalUncompressedBytes();
840 
841     if (LOG.isInfoEnabled()) {
842       LOG.info("Added " + sf + ", entries=" + r.getEntries() +
843         ", sequenceid=" + logCacheFlushId +
844         ", filesize=" + StringUtils.humanReadableInt(r.length()));
845     }
846     return sf;
847   }
848 
849   /*
850    * @param maxKeyCount estimated maximum number of entries to be written
851    * @param compression Compression algorithm to use
852    * @param isCompaction whether we are creating a new file in a compaction
853    * @param includeMVCCReadpoint whether to include the MVCC read point or not
854    * @param includesTag whether to include tags or not
855    * @return Writer for a new StoreFile in the tmp dir.
856    */
857   @Override
858   public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
859       boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag)
860   throws IOException {
861     final CacheConfig writerCacheConf;
862     if (isCompaction) {
863       // Don't cache data on write on compactions.
864       writerCacheConf = new CacheConfig(cacheConf);
865       writerCacheConf.setCacheDataOnWrite(false);
866     } else {
867       writerCacheConf = cacheConf;
868     }
869     InetSocketAddress[] favoredNodes = null;
870     if (region.getRegionServerServices() != null) {
871       favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
872           region.getRegionInfo().getEncodedName());
873     }
874     HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
875       cryptoContext);
876     StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
877         this.getFileSystem())
878             .withFilePath(fs.createTempName())
879             .withComparator(comparator)
880             .withBloomType(family.getBloomFilterType())
881             .withMaxKeyCount(maxKeyCount)
882             .withFavoredNodes(favoredNodes)
883             .withFileContext(hFileContext)
884             .build();
885     return w;
886   }
887 
888   private HFileContext createFileContext(Compression.Algorithm compression,
889       boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context cryptoContext) {
890     if (compression == null) {
891       compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
892     }
893     HFileContext hFileContext = new HFileContextBuilder()
894                                 .withIncludesMvcc(includeMVCCReadpoint)
895                                 .withIncludesTags(includesTag)
896                                 .withCompression(compression)
897                                 .withCompressTags(family.shouldCompressTags())
898                                 .withChecksumType(checksumType)
899                                 .withBytesPerCheckSum(bytesPerChecksum)
900                                 .withBlockSize(blocksize)
901                                 .withHBaseCheckSum(true)
902                                 .withDataBlockEncoding(family.getDataBlockEncoding())
903                                 .withEncryptionContext(cryptoContext)
904                                 .build();
905     return hFileContext;
906   }
907 
908 
909   /*
910    * Change storeFiles, adding into place the store files produced by this new flush.
911    * @param sfs Store files
912    * @param snapshotId
913    * @throws IOException
914    * @return Whether compaction is required.
915    */
916   private boolean updateStorefiles(final List<StoreFile> sfs, final long snapshotId)
917       throws IOException {
918     this.lock.writeLock().lock();
919     try {
920       this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
921       this.memstore.clearSnapshot(snapshotId);
922     } finally {
923       // We need the lock, as long as we are updating the storeFiles
924       // or changing the memstore. Let us release it before calling
925       // notifyChangedReadersObservers. See HBASE-4485 for a possible
926       // deadlock scenario that could have happened if we continued to hold
927       // the lock.
928       this.lock.writeLock().unlock();
929     }
930 
931     // Tell listeners of the change in readers.
932     notifyChangedReadersObservers();
933 
934     if (LOG.isTraceEnabled()) {
935       long totalSize = 0;
936       for (StoreFile sf : sfs) {
937         totalSize += sf.getReader().length();
938       }
939       String traceMessage = "FLUSH time,count,size,store size,store files ["
940           + EnvironmentEdgeManager.currentTimeMillis() + "," + sfs.size() + "," + totalSize
941           + "," + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
942       LOG.trace(traceMessage);
943     }
944     return needsCompaction();
945   }
946 
947   /*
948    * Notify all observers that the set of Readers has changed.
949    * @throws IOException
950    */
951   private void notifyChangedReadersObservers() throws IOException {
952     for (ChangedReadersObserver o: this.changedReaderObservers) {
953       o.updateReaders();
954     }
955   }
956 
957   /**
958    * Get all scanners with no filtering based on TTL (that happens further down
959    * the line).
960    * @return all scanners for this store
961    */
962   @Override
963   public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet,
964       boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
965       byte[] stopRow, long readPt) throws IOException {
966     Collection<StoreFile> storeFilesToScan;
967     List<KeyValueScanner> memStoreScanners;
968     this.lock.readLock().lock();
969     try {
970       storeFilesToScan =
971           this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow);
972       memStoreScanners = this.memstore.getScanners(readPt);
973     } finally {
974       this.lock.readLock().unlock();
975     }
976 
977     // First the store file scanners
978 
979     // TODO this used to get the store files in descending order,
980     // but now we get them in ascending order, which I think is
981     // actually more correct, since the memstore scanners get put at the end.
982     List<StoreFileScanner> sfScanners = StoreFileScanner
983       .getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, matcher,
984         readPt);
985     List<KeyValueScanner> scanners =
986       new ArrayList<KeyValueScanner>(sfScanners.size()+1);
987     scanners.addAll(sfScanners);
988     // Then the memstore scanners
989     scanners.addAll(memStoreScanners);
990     return scanners;
991   }
992 
993   @Override
994   public void addChangedReaderObserver(ChangedReadersObserver o) {
995     this.changedReaderObservers.add(o);
996   }
997 
998   @Override
999   public void deleteChangedReaderObserver(ChangedReadersObserver o) {
1000     // We don't check if the observer is present; it may legitimately be absent.
1001     this.changedReaderObservers.remove(o);
1002   }
1003 
1004   //////////////////////////////////////////////////////////////////////////////
1005   // Compaction
1006   //////////////////////////////////////////////////////////////////////////////
1007 
1008   /**
1009    * Compact the StoreFiles.  This method may take some time, so the calling
1010    * thread must be able to block for long periods.
1011    *
1012    * <p>During this time, the Store can work as usual, getting values from
1013    * StoreFiles and writing new StoreFiles from the memstore.
1014    *
1015    * Existing StoreFiles are not destroyed until the new compacted StoreFile is
1016    * completely written-out to disk.
1017    *
1018    * <p>The compactLock prevents multiple simultaneous compactions.
1019    * The structureLock prevents us from interfering with other write operations.
1020    *
1021    * <p>We don't want to hold the structureLock for the whole time, as a compact()
1022    * can be lengthy and we want to allow cache-flushes during this period.
1023    *
1024    * <p> Compaction event should be idempotent, since there is no IO Fencing for
1025    * the region directory in hdfs. A region server might still try to complete the
1026    * compaction after it lost the region. That is why the following events are carefully
1027    * ordered for a compaction:
1028    *  1. Compaction writes new files under region/.tmp directory (compaction output)
1029    *  2. Compaction atomically moves the temporary file under region directory
1030    *  3. Compaction appends a WAL edit containing the compaction input and output files.
1031    *  Forces sync on WAL.
1032    *  4. Compaction deletes the input files from the region directory.
1033    *
1034    * Failure conditions are handled like this:
1035    *  - If RS fails before 2, compaction won't complete. Even if RS lives on and finishes
1036    *  the compaction later, it will only write the new data file to the region directory.
1037    *  Since we already have this data, this will be idempotent, but we will have a redundant
1038    *  copy of the data.
1039    *  - If RS fails between 2 and 3, the region will have a redundant copy of the data. The
1040    *  RS that failed won't be able to finish sync() for WAL because of lease recovery in WAL.
1041    *  - If RS fails after 3, the region server that opens the region will pick up the
1042    *  compaction marker from the WAL and replay it by removing the compaction input files.
1043    *  Failed RS can also attempt to delete those files, but the operation will be idempotent.
1044    *
1045    * See HBASE-2231 for details.
1046    *
1047    * @param compaction compaction details obtained from requestCompaction()
1048    * @throws IOException
1049    * @return Storefiles we compacted into or null if we failed or opted out early.
1050    */
1051   @Override
1052   public List<StoreFile> compact(CompactionContext compaction) throws IOException {
1053     assert compaction != null && compaction.hasSelection();
1054     CompactionRequest cr = compaction.getRequest();
1055     Collection<StoreFile> filesToCompact = cr.getFiles();
1056     assert !filesToCompact.isEmpty();
1057     synchronized (filesCompacting) {
1058       // sanity check: we're compacting files that this store knows about
1059       // TODO: change this to LOG.error() after more debugging
1060       Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
1061     }
1062 
1063     // Ready to go. Have list of files to compact.
1064     LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
1065         + this + " of " + this.getRegionInfo().getRegionNameAsString()
1066         + " into tmpdir=" + fs.getTempDir() + ", totalSize="
1067         + StringUtils.humanReadableInt(cr.getSize()));
1068 
1069     long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
1070     List<StoreFile> sfs = null;
1071     try {
1072       // Commence the compaction.
1073       List<Path> newFiles = compaction.compact();
1074 
1075       // TODO: get rid of this!
1076       if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
1077         LOG.warn("hbase.hstore.compaction.complete is set to false");
1078         sfs = new ArrayList<StoreFile>();
1079         for (Path newFile : newFiles) {
1080           // Create storefile around what we wrote with a reader on it.
1081           StoreFile sf = createStoreFileAndReader(newFile);
1082           sf.closeReader(true);
1083           sfs.add(sf);
1084         }
1085         return sfs;
1086       }
1087       // Do the steps necessary to complete the compaction.
1088       sfs = moveCompactedFilesIntoPlace(cr, newFiles);
1089       writeCompactionWalRecord(filesToCompact, sfs);
1090       replaceStoreFiles(filesToCompact, sfs);
1091       // At this point the store will use new files for all new scanners.
1092       completeCompaction(filesToCompact); // Archive old files & update store size.
1093     } finally {
1094       finishCompactionRequest(cr);
1095     }
1096     logCompactionEndMessage(cr, sfs, compactionStartTime);
1097     return sfs;
1098   }
1099 
1100   private List<StoreFile> moveCompactedFilesIntoPlace(
1101       CompactionRequest cr, List<Path> newFiles) throws IOException {
1102     List<StoreFile> sfs = new ArrayList<StoreFile>();
1103     for (Path newFile : newFiles) {
1104       assert newFile != null;
1105       StoreFile sf = moveFileIntoPlace(newFile);
1106       if (this.getCoprocessorHost() != null) {
1107         this.getCoprocessorHost().postCompact(this, sf, cr);
1108       }
1109       assert sf != null;
1110       sfs.add(sf);
1111     }
1112     return sfs;
1113   }
1114 
1115   // Package-visible for tests
1116   StoreFile moveFileIntoPlace(final Path newFile) throws IOException {
1117     validateStoreFile(newFile);
1118     // Move the file into the right spot
1119     Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile);
1120     return createStoreFileAndReader(destPath);
1121   }
1122 
1123   /**
1124    * Writes the compaction WAL record.
1125    * @param filesCompacted Files compacted (input).
1126    * @param newFiles Files from compaction.
1127    */
1128   private void writeCompactionWalRecord(Collection<StoreFile> filesCompacted,
1129       Collection<StoreFile> newFiles) throws IOException {
1130     if (region.getLog() == null) return;
1131     List<Path> inputPaths = new ArrayList<Path>();
1132     for (StoreFile f : filesCompacted) {
1133       inputPaths.add(f.getPath());
1134     }
1135     List<Path> outputPaths = new ArrayList<Path>(newFiles.size());
1136     for (StoreFile f : newFiles) {
1137       outputPaths.add(f.getPath());
1138     }
1139     HRegionInfo info = this.region.getRegionInfo();
1140     CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
1141         family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));
1142     HLogUtil.writeCompactionMarker(region.getLog(), this.region.getTableDesc(),
1143         this.region.getRegionInfo(), compactionDescriptor, this.region.getSequenceId());
1144   }
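
  // Schematically (field names as in WALProtos.CompactionDescriptor), the
  // marker written above carries everything needed to replay the compaction:
  //
  //   CompactionDescriptor {
  //     table_name: ..., encoded_region_name: ...,
  //     family_name: "cf",
  //     compaction_input:  ["f1", "f2"]   // files consumed
  //     compaction_output: ["f3"]         // files produced
  //     store_home_dir: "<region>/cf"
  //   }
  //
  // completeCompactionMarker() below replays exactly such a record after a
  // crash, removing leftover inputs and re-adding surviving outputs.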
1145 
1146   private void replaceStoreFiles(final Collection<StoreFile> compactedFiles,
1147       final Collection<StoreFile> result) throws IOException {
1148     this.lock.writeLock().lock();
1149     try {
1150       this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result);
1151       filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock();
1152     } finally {
1153       this.lock.writeLock().unlock();
1154     }
1155   }
1156 
1157   /**
1158    * Log a very elaborate compaction completion message.
1159    * @param cr Request.
1160    * @param sfs Resulting files.
1161    * @param compactionStartTime Start time.
1162    */
1163   private void logCompactionEndMessage(
1164       CompactionRequest cr, List<StoreFile> sfs, long compactionStartTime) {
1165     long now = EnvironmentEdgeManager.currentTimeMillis();
1166     StringBuilder message = new StringBuilder(
1167       "Completed" + (cr.isMajor() ? " major" : "") + " compaction of "
1168       + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "") + " file(s) in "
1169       + this + " of " + this.getRegionInfo().getRegionNameAsString() + " into ");
1170     if (sfs.isEmpty()) {
1171       message.append("none, ");
1172     } else {
1173       for (StoreFile sf: sfs) {
1174         message.append(sf.getPath().getName());
1175         message.append("(size=");
1176         message.append(StringUtils.humanReadableInt(sf.getReader().length()));
1177         message.append("), ");
1178       }
1179     }
1180     message.append("total size for store is ")
1181       .append(StringUtils.humanReadableInt(storeSize))
1182       .append(". This selection was in queue for ")
1183       .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
1184       .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
1185       .append(" to execute.");
1186     LOG.info(message.toString());
1187     if (LOG.isTraceEnabled()) {
1188       int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
1189       long resultSize = 0;
1190       for (StoreFile sf : sfs) {
1191         resultSize += sf.getReader().length();
1192       }
1193       String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
1194         + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
1195           + cr.getFiles().size() + "," + sfs.size() + "," +  storeSize + "," + fileCount + "]";
1196       LOG.trace(traceMessage);
1197     }
1198   }
1199 
1200   /**
1201    * Call to complete a compaction. It's for the case where we find in the WAL a compaction
1202    * that was not finished.  We could find one while recovering a WAL after a region server
1203    * crash. See HBASE-2331.
1204    * @param compaction the compaction descriptor recovered from the WAL
1205    */
1206   @Override
1207   public void completeCompactionMarker(CompactionDescriptor compaction)
1208       throws IOException {
1209     LOG.debug("Completing compaction from the WAL marker");
1210     List<String> compactionInputs = compaction.getCompactionInputList();
1211     List<String> compactionOutputs = compaction.getCompactionOutputList();
1212 
1213     List<StoreFile> outputStoreFiles = new ArrayList<StoreFile>(compactionOutputs.size());
1214     for (String compactionOutput : compactionOutputs) {
1215       // we should have this store file already
1216       boolean found = false;
1217       Path outputPath = new Path(fs.getStoreDir(family.getNameAsString()), compactionOutput);
1218       outputPath = outputPath.makeQualified(fs.getFileSystem());
1219       for (StoreFile sf : this.getStorefiles()) {
1220         if (sf.getPath().makeQualified(sf.getPath().getFileSystem(conf)).equals(outputPath)) {
1221           found = true;
1222           break;
1223         }
1224       }
1225       if (!found) {
1226         if (getFileSystem().exists(outputPath)) {
1227           outputStoreFiles.add(createStoreFileAndReader(outputPath));
1228         }
1229       }
1230     }
1231 
1232     List<Path> inputPaths = new ArrayList<Path>(compactionInputs.size());
1233     for (String compactionInput : compactionInputs) {
1234       Path inputPath = new Path(fs.getStoreDir(family.getNameAsString()), compactionInput);
1235       inputPath = inputPath.makeQualified(fs.getFileSystem());
1236       inputPaths.add(inputPath);
1237     }
1238 
1239     // some of the input files might already be deleted
1240     List<StoreFile> inputStoreFiles = new ArrayList<StoreFile>(compactionInputs.size());
1241     for (StoreFile sf : this.getStorefiles()) {
1242       if (inputPaths.contains(sf.getPath().makeQualified(fs.getFileSystem()))) {
1243         inputStoreFiles.add(sf);
1244       }
1245     }
1246 
1247     this.replaceStoreFiles(inputStoreFiles, outputStoreFiles);
1248     this.completeCompaction(inputStoreFiles);
1249   }
1250 
1251   /**
1252    * This method tries to compact N recent files for testing.
1253    * Note that because compacting "recent" files only makes sense for some policies,
1254    * e.g. the default one, it assumes the default policy is used. It doesn't use the policy,
1255    * but instead makes a compaction candidate list by itself.
1256    * @param N Number of files.
1257    */
1258   public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException {
1259     List<StoreFile> filesToCompact;
1260     boolean isMajor;
1261 
1262     this.lock.readLock().lock();
1263     try {
1264       synchronized (filesCompacting) {
1265         filesToCompact = Lists.newArrayList(storeEngine.getStoreFileManager().getStorefiles());
1266         if (!filesCompacting.isEmpty()) {
1267           // exclude all files older than the newest file we're currently
1268           // compacting. this allows us to preserve contiguity (HBASE-2856)
1269           StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
1270           int idx = filesToCompact.indexOf(last);
1271           Preconditions.checkArgument(idx != -1);
1272           filesToCompact.subList(0, idx + 1).clear();
1273         }
1274         int count = filesToCompact.size();
1275         if (N > count) {
1276           throw new RuntimeException("Not enough files");
1277         }
1278 
1279         filesToCompact = filesToCompact.subList(count - N, count);
1280         isMajor = (filesToCompact.size() == storeEngine.getStoreFileManager().getStorefileCount());
1281         filesCompacting.addAll(filesToCompact);
1282         Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
1283       }
1284     } finally {
1285       this.lock.readLock().unlock();
1286     }
1287 
1288     try {
1289       // Ready to go. Have list of files to compact.
1290       List<Path> newFiles = ((DefaultCompactor)this.storeEngine.getCompactor())
1291           .compactForTesting(filesToCompact, isMajor);
1292       for (Path newFile: newFiles) {
1293         // Move the compaction into place.
1294         StoreFile sf = moveFileIntoPlace(newFile);
1295         if (this.getCoprocessorHost() != null) {
1296           this.getCoprocessorHost().postCompact(this, sf, null);
1297         }
1298         replaceStoreFiles(filesToCompact, Lists.newArrayList(sf));
1299         completeCompaction(filesToCompact);
1300       }
1301     } finally {
1302       synchronized (filesCompacting) {
1303         filesCompacting.removeAll(filesToCompact);
1304       }
1305     }
1306   }
1307 
1308   @Override
1309   public boolean hasReferences() {
1310     return StoreUtils.hasReferences(this.storeEngine.getStoreFileManager().getStorefiles());
1311   }
1312 
1313   @Override
1314   public CompactionProgress getCompactionProgress() {
1315     return this.storeEngine.getCompactor().getProgress();
1316   }
1317 
1318   @Override
1319   public boolean isMajorCompaction() throws IOException {
1320     for (StoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1321       // TODO: what are these reader checks all over the place?
1322       if (sf.getReader() == null) {
1323         LOG.debug("StoreFile " + sf + " has null Reader");
1324         return false;
1325       }
1326     }
1327     return storeEngine.getCompactionPolicy().isMajorCompaction(
1328         this.storeEngine.getStoreFileManager().getStorefiles());
1329   }
1330 
1331   @Override
1332   public CompactionContext requestCompaction() throws IOException {
1333     return requestCompaction(Store.NO_PRIORITY, null);
1334   }
1335 
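       /**
        * A simplified sketch of the compaction lifecycle around this method, under the
        * assumption that the caller runs the returned context on a compaction thread
        * (this abbreviates what the region server's compaction machinery does):
        * <pre>{@code
        * CompactionContext context = store.requestCompaction(Store.PRIORITY_USER, null);
        * if (context != null) {
        *   try {
        *     store.compact(context);  // rewrite the selected files
        *   } catch (IOException e) {
        *     store.cancelRequestedCompaction(context);  // release filesCompacting
        *     throw e;
        *   }
        * }
        * }</pre>
        */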
1336   @Override
1337   public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
1338       throws IOException {
1339     // don't even select for compaction if writes are disabled
1340     if (!this.areWritesEnabled()) {
1341       return null;
1342     }
1343 
1344     CompactionContext compaction = storeEngine.createCompaction();
1345     CompactionRequest request = null;
1346     this.lock.readLock().lock();
1347     try {
1348       synchronized (filesCompacting) {
1349         // First, see if coprocessor would want to override selection.
1350         if (this.getCoprocessorHost() != null) {
1351           List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
1352           boolean override = this.getCoprocessorHost().preCompactSelection(
1353               this, candidatesForCoproc, baseRequest);
1354           if (override) {
1355             // Coprocessor is overriding normal file selection.
1356             compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
1357           }
1358         }
1359 
1360         // Normal case - coprocessor is not overriding file selection.
1361         if (!compaction.hasSelection()) {
1362           boolean isUserCompaction = priority == Store.PRIORITY_USER;
1363           boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
1364               offPeakCompactionTracker.compareAndSet(false, true);
1365           try {
1366             compaction.select(this.filesCompacting, isUserCompaction,
1367               mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
1368           } catch (IOException e) {
1369             if (mayUseOffPeak) {
1370               offPeakCompactionTracker.set(false);
1371             }
1372             throw e;
1373           }
1374           assert compaction.hasSelection();
1375           if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
1376             // Compaction policy doesn't want to take advantage of off-peak.
1377             offPeakCompactionTracker.set(false);
1378           }
1379         }
1380         if (this.getCoprocessorHost() != null) {
1381           this.getCoprocessorHost().postCompactSelection(
1382               this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
1383         }
1384 
1385         // Selected files; see if we have a compaction with some custom base request.
1386         if (baseRequest != null) {
1387           // Update the request with what the system thinks the request should be;
1388           // it's up to the request if it wants to listen.
1389           compaction.forceSelect(
1390               baseRequest.combineWith(compaction.getRequest()));
1391         }
1392         // Finally, we have the resulting files list. Check if we have any files at all.
1393         request = compaction.getRequest();
1394         final Collection<StoreFile> selectedFiles = request.getFiles();
1395         if (selectedFiles.isEmpty()) {
1396           return null;
1397         }
1398 
1399         // Update filesCompacting (check that we do not try to compact the same StoreFile twice).
1400         if (!Collections.disjoint(filesCompacting, selectedFiles)) {
1401           Preconditions.checkArgument(false, "%s overlaps with %s",
1402               selectedFiles, filesCompacting);
1403         }
1404         filesCompacting.addAll(selectedFiles);
1405         Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
1406 
1407         // If we're enqueuing a major, clear the force flag.
1408         this.forceMajor = this.forceMajor && !request.isMajor();
1409 
1410         // Set common request properties.
1411         // Set priority, either override value supplied by caller or from store.
1412         request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
1413         request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
1414       }
1415     } finally {
1416       this.lock.readLock().unlock();
1417     }
1418 
1419     LOG.debug(getRegionInfo().getEncodedName() + " - "  + getColumnFamilyName()
1420         + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
1421         + (request.isAllFiles() ? " (all files)" : ""));
1422     this.region.reportCompactionRequestStart(request.isMajor());
1423     return compaction;
1424   }
1425 
1426   @Override
1427   public void cancelRequestedCompaction(CompactionContext compaction) {
1428     finishCompactionRequest(compaction.getRequest());
1429   }
1430 
1431   private void finishCompactionRequest(CompactionRequest cr) {
1432     this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
1433     if (cr.isOffPeak()) {
1434       offPeakCompactionTracker.set(false);
1435       cr.setOffPeak(false);
1436     }
1437     synchronized (filesCompacting) {
1438       filesCompacting.removeAll(cr.getFiles());
1439     }
1440   }
1441 
1442   /**
1443    * Validates a store file by opening and closing it. In HFileV2 this should
1444    * not be an expensive operation.
1445    *
1446    * @param path the path to the store file
1447    */
1448   private void validateStoreFile(Path path)
1449       throws IOException {
1450     StoreFile storeFile = null;
1451     try {
1452       storeFile = createStoreFileAndReader(path);
1453     } catch (IOException e) {
1454       LOG.error("Failed to open store file : " + path
1455           + ", keeping it in tmp location", e);
1456       throw e;
1457     } finally {
1458       if (storeFile != null) {
1459         storeFile.closeReader(false);
1460       }
1461     }
1462   }
1463 
1464   /*
1465    * <p>Completes a compaction by processing its results, which have already
1466    * been written to disk.
1467    *
1468    * <p>It is usually invoked at the end of a compaction, but might also be
1469    * invoked at HStore startup, if the prior execution died midway through.
1470    *
1471    * <p>Moving the compacted files into place means:
1472    * <pre>
1473    * 1) Unload all replaced StoreFiles, close them, and collect the list to delete.
1474    * 2) Compute the new store size.
1475    * </pre>
1476    *
1477    * @param compactedFiles list of files that were compacted
1478    */
1479   @VisibleForTesting
1480   protected void completeCompaction(final Collection<StoreFile> compactedFiles)
1481       throws IOException {
1482     try {
1483       // Do not delete old store files until we have sent out notification of
1484       // change in case old files are still being accessed by outstanding scanners.
1485       // Don't do this under writeLock; see HBASE-4485 for a possible deadlock
1486       // scenario that could have happened if we continued to hold the lock.
1487       notifyChangedReadersObservers();
1488       // At this point the store will use new files for all scanners.
1489 
1490       // let the archive util decide if we should archive or delete the files
1491       LOG.debug("Removing store files after compaction...");
1492       for (StoreFile compactedFile : compactedFiles) {
1493         compactedFile.closeReader(true);
1494       }
1495       this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles);
1496     } catch (IOException e) {
1497       e = RemoteExceptionHandler.checkIOException(e);
1498       LOG.error("Failed removing compacted files in " + this +
1499         ". Files we were trying to remove are " + compactedFiles.toString() +
1500         "; some of them may have been already removed", e);
1501     }
1502 
1503     // 2. Compute new store size
1504     this.storeSize = 0L;
1505     this.totalUncompressedBytes = 0L;
1506     for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1507       StoreFile.Reader r = hsf.getReader();
1508       if (r == null) {
1509         LOG.warn("StoreFile " + hsf + " has a null Reader");
1510         continue;
1511       }
1512       this.storeSize += r.length();
1513       this.totalUncompressedBytes += r.getTotalUncompressedBytes();
1514     }
1515   }
1516 
1517   /*
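        * Worked example: with a family maximum of 3 versions, asking for 5 yields 3,
        * while asking for 2 yields 2.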
1518    * @param wantedVersions How many versions were asked for.
1519    * @return wantedVersions, capped at this family's configured maximum number of versions.
1520    */
1521   int versionsToReturn(final int wantedVersions) {
1522     if (wantedVersions <= 0) {
1523       throw new IllegalArgumentException("Number of versions must be > 0");
1524     }
1525     // Make sure we do not return more than maximum versions for this store.
1526     int maxVersions = this.family.getMaxVersions();
1527     return wantedVersions > maxVersions ? maxVersions: wantedVersions;
1528   }
1529 
1530   static boolean isExpired(final KeyValue key, final long oldestTimestamp) {
1531     return key.getTimestamp() < oldestTimestamp;
1532   }
1533 
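       /**
        * A minimal caller's sketch (the row key is hypothetical): find the closest
        * row at or before the given row in this store.
        * <pre>{@code
        * KeyValue closest = store.getRowKeyAtOrBefore(Bytes.toBytes("row-0042"));
        * if (closest != null) {
        *   // closest.getRow() is the greatest row key <= "row-0042" in this store
        * }
        * }</pre>
        */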
1534   @Override
1535   public KeyValue getRowKeyAtOrBefore(final byte[] row) throws IOException {
1536     // If minVersions is set, we will not ignore expired KVs.
1537     // As we're only looking for the latest matches, that should be OK.
1538     // With minVersions > 0 we guarantee that any KV that has any version
1539     // at all (expired or not) has at least one version that will not expire.
1540     // Note that this method used to take a KeyValue as an argument. A KeyValue
1541     // can be back-dated; a row key cannot.
1542     long ttlToUse = scanInfo.getMinVersions() > 0 ? Long.MAX_VALUE : this.scanInfo.getTtl();
1543 
1544     KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
1545 
1546     GetClosestRowBeforeTracker state = new GetClosestRowBeforeTracker(
1547       this.comparator, kv, ttlToUse, this.getRegionInfo().isMetaRegion());
1548     this.lock.readLock().lock();
1549     try {
1550       // First go to the memstore.  Pick up deletes and candidates.
1551       this.memstore.getRowKeyAtOrBefore(state);
1552       // Check if match, if we got a candidate on the asked for 'kv' row.
1553       // Process each relevant store file. Run through from newest to oldest.
1554       Iterator<StoreFile> sfIterator = this.storeEngine.getStoreFileManager()
1555           .getCandidateFilesForRowKeyBefore(state.getTargetKey());
1556       while (sfIterator.hasNext()) {
1557         StoreFile sf = sfIterator.next();
1558         sfIterator.remove(); // Remove sf from iterator.
1559         boolean haveNewCandidate = rowAtOrBeforeFromStoreFile(sf, state);
1560         KeyValue keyv = state.getCandidate();
1561         // We have an optimization here that stops the search once we find an exact match.
1562         if (keyv != null && keyv.matchingRow(row)) return state.getCandidate();
1563         if (haveNewCandidate) {
1564           sfIterator = this.storeEngine.getStoreFileManager().updateCandidateFilesForRowKeyBefore(
1565               sfIterator, state.getTargetKey(), state.getCandidate());
1566         }
1567       }
1568       return state.getCandidate();
1569     } finally {
1570       this.lock.readLock().unlock();
1571     }
1572   }
1573 
1574   /*
1575    * Check an individual store file for the row at or before a given row.
1576    * @param f
1577    * @param state
1578    * @throws IOException
1579    * @return True iff the candidate has been updated in the state.
1580    */
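       // Worked example, assuming the file holds rows r1 < r2 < r3 and the target row
       // is r25 (so r2 < r25 < r3): the initial seek lands inside r2, the forward walk
       // overshoots to r3 and finds nothing, then seekBefore() backs up to the end of
       // r2 and the walk over r2 yields the candidate (unless all of r2's cells are
       // expired, in which case the loop backs up again to r1).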
1581   private boolean rowAtOrBeforeFromStoreFile(final StoreFile f,
1582                                           final GetClosestRowBeforeTracker state)
1583       throws IOException {
1584     StoreFile.Reader r = f.getReader();
1585     if (r == null) {
1586       LOG.warn("StoreFile " + f + " has a null Reader");
1587       return false;
1588     }
1589     if (r.getEntries() == 0) {
1590       LOG.warn("StoreFile " + f + " is an empty store file");
1591       return false;
1592     }
1593     // TODO: Cache these keys rather than make each time?
1594     byte [] fk = r.getFirstKey();
1595     if (fk == null) return false;
1596     KeyValue firstKV = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
1597     byte [] lk = r.getLastKey();
1598     KeyValue lastKV = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
1599     KeyValue firstOnRow = state.getTargetKey();
1600     if (this.comparator.compareRows(lastKV, firstOnRow) < 0) {
1601       // If last key in file is not of the target table, no candidates in this
1602       // file.  Return.
1603       if (!state.isTargetTable(lastKV)) return false;
1604       // If the row we're looking for is past the end of file, set search key to
1605       // last key. TODO: Cache last and first key rather than make each time.
1606       firstOnRow = new KeyValue(lastKV.getRow(), HConstants.LATEST_TIMESTAMP);
1607     }
1608     // Get a scanner that caches blocks and that uses pread.
1609     HFileScanner scanner = r.getScanner(true, true, false);
1610     // Seek scanner.  If can't seek it, return.
1611     if (!seekToScanner(scanner, firstOnRow, firstKV)) return false;
1612     // If we found a candidate at firstOnRow, just return it. In practice this
1613     // rarely happens: an exact hit on the synthetic first-on-row key is unlikely.
1614     if (walkForwardInSingleRow(scanner, firstOnRow, state)) return true;
1615     // If here, need to start backing up.
1616     while (scanner.seekBefore(firstOnRow.getBuffer(), firstOnRow.getKeyOffset(),
1617        firstOnRow.getKeyLength())) {
1618       KeyValue kv = scanner.getKeyValue();
1619       if (!state.isTargetTable(kv)) break;
1620       if (!state.isBetterCandidate(kv)) break;
1621       // Make new first on row.
1622       firstOnRow = new KeyValue(kv.getRow(), HConstants.LATEST_TIMESTAMP);
1623       // Seek scanner.  If can't seek it, break.
1624       if (!seekToScanner(scanner, firstOnRow, firstKV)) return false;
1625       // If we find something, break;
1626       if (walkForwardInSingleRow(scanner, firstOnRow, state)) return true;
1627     }
1628     return false;
1629   }
1630 
1631   /*
1632    * Seek the file scanner to firstOnRow or first entry in file.
1633    * @param scanner
1634    * @param firstOnRow
1635    * @param firstKV
1636    * @return True if we successfully seeked scanner.
1637    * @throws IOException
1638    */
1639   private boolean seekToScanner(final HFileScanner scanner,
1640                                 final KeyValue firstOnRow,
1641                                 final KeyValue firstKV)
1642       throws IOException {
1643     KeyValue kv = firstOnRow;
1644     // If firstOnRow is on the same row as firstKV, it sorts before it
         // (LATEST_TIMESTAMP sorts first within a row), so seek to firstKV instead.
1645     if (this.comparator.compareRows(firstKV, firstOnRow) == 0) kv = firstKV;
1646     int result = scanner.seekTo(kv.getBuffer(), kv.getKeyOffset(),
1647       kv.getKeyLength());
1648     return result != -1;
1649   }
1650 
1651   /*
1652    * When we come in here, we are probably at the kv just before we break into
1653    * the row that firstOnRow is on.  We usually need to advance once to get
1654    * on to the row we are interested in.
1655    * @param scanner
1656    * @param firstOnRow
1657    * @param state
1658    * @return True if we found a candidate.
1659    * @throws IOException
1660    */
1661   private boolean walkForwardInSingleRow(final HFileScanner scanner,
1662                                          final KeyValue firstOnRow,
1663                                          final GetClosestRowBeforeTracker state)
1664       throws IOException {
1665     boolean foundCandidate = false;
1666     do {
1667       KeyValue kv = scanner.getKeyValue();
1668       // If we are not in the row, skip.
1669       if (this.comparator.compareRows(kv, firstOnRow) < 0) continue;
1670       // Did we go beyond the target row? If so break.
1671       if (state.isTooFar(kv, firstOnRow)) break;
1672       if (state.isExpired(kv)) {
1673         continue;
1674       }
1675       // If we added something, this row is a contender. break.
1676       if (state.handle(kv)) {
1677         foundCandidate = true;
1678         break;
1679       }
1680     } while(scanner.next());
1681     return foundCandidate;
1682   }
1683 
1684   @Override
1685   public boolean canSplit() {
1686     this.lock.readLock().lock();
1687     try {
1688       // Not splittable if the store contains reference files (from a prior split).
1689       boolean result = !hasReferences();
1690       if (!result && LOG.isDebugEnabled()) {
1691         LOG.debug("Cannot split region because reference files are present");
1692       }
1693       return result;
1694     } finally {
1695       this.lock.readLock().unlock();
1696     }
1697   }
1698 
1699   @Override
1700   public byte[] getSplitPoint() {
1701     this.lock.readLock().lock();
1702     try {
1703       // Should already be enforced by the split policy!
1704       assert !this.getRegionInfo().isMetaRegion();
1705       // Not splittable if the store contains reference files (from a prior split).
1706       if (hasReferences()) {
1707         return null;
1708       }
1709       return this.storeEngine.getStoreFileManager().getSplitPoint();
1710     } catch(IOException e) {
1711       LOG.warn("Failed getting split point for " + this, e);
1712     } finally {
1713       this.lock.readLock().unlock();
1714     }
1715     return null;
1716   }
1717 
1718   @Override
1719   public long getLastCompactSize() {
1720     return this.lastCompactSize;
1721   }
1722 
1723   @Override
1724   public long getSize() {
1725     return storeSize;
1726   }
1727 
1728   @Override
1729   public void triggerMajorCompaction() {
1730     this.forceMajor = true;
1731   }
1732 
1733 
1734   //////////////////////////////////////////////////////////////////////////////
1735   // File administration
1736   //////////////////////////////////////////////////////////////////////////////
1737 
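       /**
        * A minimal usage sketch (the scan bounds, qualifier and read point are
        * hypothetical): open a scanner over one column of this store.
        * <pre>{@code
        * Scan scan = new Scan(Bytes.toBytes("startRow"));
        * NavigableSet<byte[]> cols = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
        * cols.add(Bytes.toBytes("qualifier"));
        * KeyValueScanner scanner = store.getScanner(scan, cols, readPt); // readPt from the region's MVCC
        * }</pre>
        */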
1738   @Override
1739   public KeyValueScanner getScanner(Scan scan,
1740       final NavigableSet<byte []> targetCols, long readPt) throws IOException {
1741     lock.readLock().lock();
1742     try {
1743       KeyValueScanner scanner = null;
1744       if (this.getCoprocessorHost() != null) {
1745         scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
1746       }
1747       if (scanner == null) {
1748         scanner = scan.isReversed() ? new ReversedStoreScanner(this,
1749             getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this,
1750             getScanInfo(), scan, targetCols, readPt);
1751       }
1752       return scanner;
1753     } finally {
1754       lock.readLock().unlock();
1755     }
1756   }
1757 
1758   @Override
1759   public String toString() {
1760     return this.getColumnFamilyName();
1761   }
1762 
1763   @Override
1764   // TODO: why is there this and also getNumberOfStorefiles?! Remove one.
1765   public int getStorefilesCount() {
1766     return this.storeEngine.getStoreFileManager().getStorefileCount();
1767   }
1768 
1769   @Override
1770   public long getStoreSizeUncompressed() {
1771     return this.totalUncompressedBytes;
1772   }
1773 
1774   @Override
1775   public long getStorefilesSize() {
1776     long size = 0;
1777     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
1778       StoreFile.Reader r = s.getReader();
1779       if (r == null) {
1780         LOG.warn("StoreFile " + s + " has a null Reader");
1781         continue;
1782       }
1783       size += r.length();
1784     }
1785     return size;
1786   }
1787 
1788   @Override
1789   public long getStorefilesIndexSize() {
1790     long size = 0;
1791     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
1792       StoreFile.Reader r = s.getReader();
1793       if (r == null) {
1794         LOG.warn("StoreFile " + s + " has a null Reader");
1795         continue;
1796       }
1797       size += r.indexSize();
1798     }
1799     return size;
1800   }
1801 
1802   @Override
1803   public long getTotalStaticIndexSize() {
1804     long size = 0;
1805     for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
1806       StoreFile.Reader r = s.getReader();
           if (r == null) {
             LOG.warn("StoreFile " + s + " has a null Reader");
             continue;
           }
           size += r.getUncompressedDataIndexSize();
1807     }
1808     return size;
1809   }
1810 
1811   @Override
1812   public long getTotalStaticBloomSize() {
1813     long size = 0;
1814     for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
1815       StoreFile.Reader r = s.getReader();
           if (r == null) {
             LOG.warn("StoreFile " + s + " has a null Reader");
             continue;
           }
1816       size += r.getTotalBloomSize();
1817     }
1818     return size;
1819   }
1820 
1821   @Override
1822   public long getMemStoreSize() {
1823     return this.memstore.size();
1824   }
1825 
1826   @Override
1827   public int getCompactPriority() {
1828     int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority();
1829     if (priority == PRIORITY_USER) {
1830       LOG.warn("Compaction priority is USER despite there being no user compaction");
1831     }
1832     return priority;
1833   }
1834 
1835   @Override
1836   public boolean throttleCompaction(long compactionSize) {
1837     return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize);
1838   }
1839 
1840   public HRegion getHRegion() {
1841     return this.region;
1842   }
1843 
1844   @Override
1845   public RegionCoprocessorHost getCoprocessorHost() {
1846     return this.region.getCoprocessorHost();
1847   }
1848 
1849   @Override
1850   public HRegionInfo getRegionInfo() {
1851     return this.fs.getRegionInfo();
1852   }
1853 
1854   @Override
1855   public boolean areWritesEnabled() {
1856     return this.region.areWritesEnabled();
1857   }
1858 
1859   @Override
1860   public long getSmallestReadPoint() {
1861     return this.region.getSmallestReadPoint();
1862   }
1863 
1864   /**
1865    * Used in tests. TODO: Remove
1866    *
1867    * Updates the value for the given row/family/qualifier. This function will always be seen as
1868    * atomic by other readers because it only puts a single KV to memstore. Thus no read/write
1869    * control is necessary.
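        * <p>A hypothetical test-only sketch:
        * <pre>{@code
        * long delta = store.updateColumnValue(Bytes.toBytes("row"),
        *     Bytes.toBytes("cf"), Bytes.toBytes("counter"), 42L);
        * // delta is the change in memstore heap size caused by the single put
        * }</pre>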
1870    * @param row row to update
1871    * @param f family to update
1872    * @param qualifier qualifier to update
1873    * @param newValue the new value to set into memstore
1874    * @return memstore size delta
1875    * @throws IOException
1876    */
1877   public long updateColumnValue(byte [] row, byte [] f,
1878                                 byte [] qualifier, long newValue)
1879       throws IOException {
1880 
1881     this.lock.readLock().lock();
1882     try {
1883       long now = EnvironmentEdgeManager.currentTimeMillis();
1884 
1885       return this.memstore.updateColumnValue(row,
1886           f,
1887           qualifier,
1888           newValue,
1889           now);
1890 
1891     } finally {
1892       this.lock.readLock().unlock();
1893     }
1894   }
1895 
1896   @Override
1897   public long upsert(Iterable<Cell> cells, long readpoint) throws IOException {
1898     this.lock.readLock().lock();
1899     try {
1900       return this.memstore.upsert(cells, readpoint);
1901     } finally {
1902       this.lock.readLock().unlock();
1903     }
1904   }
1905 
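       /**
        * The returned context is driven through a three-step lifecycle by the region.
        * A simplified sketch (error handling omitted; the {@code flushSeqId} and
        * {@code status} values are assumptions of the example):
        * <pre>{@code
        * StoreFlushContext ctx = store.createFlushContext(flushSeqId);
        * ctx.prepare();           // snapshot the memstore (caller holds the region lock)
        * ctx.flushCache(status);  // write the snapshot out to temporary files
        * ctx.commit(status);      // publish the new files and clear the snapshot
        * }</pre>
        */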
1906   @Override
1907   public StoreFlushContext createFlushContext(long cacheFlushId) {
1908     return new StoreFlusherImpl(cacheFlushId);
1909   }
1910 
1911   private class StoreFlusherImpl implements StoreFlushContext {
1912 
1913     private long cacheFlushSeqNum;
1914     private MemStoreSnapshot snapshot;
1915     private List<Path> tempFiles;
1916 
1917     private StoreFlusherImpl(long cacheFlushSeqNum) {
1918       this.cacheFlushSeqNum = cacheFlushSeqNum;
1919     }
1920 
1921     /**
1922      * This is not thread safe. The caller should have a lock on the region or the store.
1923      * If necessary, the lock can be added with the patch provided in HBASE-10087.
1924      */
1925     @Override
1926     public void prepare() {
1927       this.snapshot = memstore.snapshot();
1928     }
1929 
1930     @Override
1931     public void flushCache(MonitoredTask status) throws IOException {
1932       tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status);
1933     }
1934 
1935     @Override
1936     public boolean commit(MonitoredTask status) throws IOException {
1937       if (this.tempFiles == null || this.tempFiles.isEmpty()) {
1938         return false;
1939       }
1940       List<StoreFile> storeFiles = new ArrayList<StoreFile>(this.tempFiles.size());
1941       for (Path storeFilePath : tempFiles) {
1942         try {
1943           storeFiles.add(HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status));
1944         } catch (IOException ex) {
1945           LOG.error("Failed to commit store file " + storeFilePath, ex);
1946           // Roll back: try to delete the files we already committed.
1947           for (StoreFile sf : storeFiles) {
1948             Path pathToDelete = sf.getPath();
1949             try {
1950               sf.deleteReader();
1951             } catch (IOException deleteEx) {
1952               LOG.fatal("Failed to delete store file we committed, halting " + pathToDelete, deleteEx);
1953               Runtime.getRuntime().halt(1);
1954             }
1955           }
1956           throw new IOException("Failed to commit the flush", ex);
1957         }
1958       }
1959 
1960       if (HStore.this.getCoprocessorHost() != null) {
1961         for (StoreFile sf : storeFiles) {
1962           HStore.this.getCoprocessorHost().postFlush(HStore.this, sf);
1963         }
1964       }
1965       // Add new file to store files.  Clear snapshot too while we have the Store write lock.
1966       return HStore.this.updateStorefiles(storeFiles, snapshot.getId());
1967     }
1968   }
1969 
1970   @Override
1971   public boolean needsCompaction() {
1972     return this.storeEngine.needsCompaction(this.filesCompacting);
1973   }
1974 
1975   @Override
1976   public CacheConfig getCacheConfig() {
1977     return this.cacheConf;
1978   }
1979 
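       // Rough heap accounting for a bare HStore instance: the object header plus its
       // reference and primitive fields, aligned via ClassSize. The multipliers below
       // must be kept in sync with the fields declared by this class.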
1980   public static final long FIXED_OVERHEAD =
1981       ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG)
1982               + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
1983 
1984   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
1985       + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
1986       + ClassSize.CONCURRENT_SKIPLISTMAP
1987       + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT
1988       + ScanInfo.FIXED_OVERHEAD);
1989 
1990   @Override
1991   public long heapSize() {
1992     return DEEP_OVERHEAD + this.memstore.heapSize();
1993   }
1994 
1995   @Override
1996   public KeyValue.KVComparator getComparator() {
1997     return comparator;
1998   }
1999 
2000   @Override
2001   public ScanInfo getScanInfo() {
2002     return scanInfo;
2003   }
2004 
2005   /**
2006    * Sets the scan info; used by tests.
2007    * @param scanInfo new scan info to use for tests
2008    */
2009   void setScanInfo(ScanInfo scanInfo) {
2010     this.scanInfo = scanInfo;
2011   }
2012 
2013   @Override
2014   public boolean hasTooManyStoreFiles() {
2015     return getStorefilesCount() > this.blockingFileCount;
2016   }
2017 }