
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import com.google.common.annotations.VisibleForTesting;
22  import com.google.common.base.Preconditions;
23  import com.google.common.collect.ImmutableCollection;
24  import com.google.common.collect.ImmutableList;
25  import com.google.common.collect.Lists;
26  import com.google.common.collect.Sets;
27  
28  import java.io.IOException;
29  import java.io.InterruptedIOException;
30  import java.net.InetSocketAddress;
31  import java.security.PrivilegedExceptionAction;
32  import java.util.ArrayList;
33  import java.util.Collection;
34  import java.util.Collections;
35  import java.util.HashMap;
36  import java.util.HashSet;
37  import java.util.Iterator;
38  import java.util.List;
39  import java.util.NavigableSet;
40  import java.util.Set;
41  import java.util.concurrent.Callable;
42  import java.util.concurrent.CompletionService;
43  import java.util.concurrent.ConcurrentHashMap;
44  import java.util.concurrent.ExecutionException;
45  import java.util.concurrent.ExecutorCompletionService;
46  import java.util.concurrent.Future;
47  import java.util.concurrent.ThreadPoolExecutor;
48  import java.util.concurrent.atomic.AtomicBoolean;
49  import java.util.concurrent.locks.ReentrantReadWriteLock;
50  
51  import org.apache.commons.logging.Log;
52  import org.apache.commons.logging.LogFactory;
53  import org.apache.hadoop.conf.Configuration;
54  import org.apache.hadoop.fs.FileSystem;
55  import org.apache.hadoop.fs.Path;
56  import org.apache.hadoop.hbase.Cell;
57  import org.apache.hadoop.hbase.CellComparator;
58  import org.apache.hadoop.hbase.CellUtil;
59  import org.apache.hadoop.hbase.CompoundConfiguration;
60  import org.apache.hadoop.hbase.HColumnDescriptor;
61  import org.apache.hadoop.hbase.HConstants;
62  import org.apache.hadoop.hbase.HRegionInfo;
63  import org.apache.hadoop.hbase.KeyValue;
64  import org.apache.hadoop.hbase.TableName;
65  import org.apache.hadoop.hbase.Tag;
66  import org.apache.hadoop.hbase.TagType;
67  import org.apache.hadoop.hbase.TagUtil;
68  import org.apache.hadoop.hbase.classification.InterfaceAudience;
69  import org.apache.hadoop.hbase.client.Scan;
70  import org.apache.hadoop.hbase.conf.ConfigurationManager;
71  import org.apache.hadoop.hbase.io.compress.Compression;
72  import org.apache.hadoop.hbase.io.crypto.Encryption;
73  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
74  import org.apache.hadoop.hbase.io.hfile.HFile;
75  import org.apache.hadoop.hbase.io.hfile.HFileContext;
76  import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
77  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
78  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
79  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
80  import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
81  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
82  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
83  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
84  import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
85  import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
86  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
87  import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
88  import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
89  import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
90  import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
91  import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
92  import org.apache.hadoop.hbase.security.EncryptionUtil;
93  import org.apache.hadoop.hbase.security.User;
94  import org.apache.hadoop.hbase.util.Bytes;
95  import org.apache.hadoop.hbase.util.ChecksumType;
96  import org.apache.hadoop.hbase.util.ClassSize;
97  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
98  import org.apache.hadoop.hbase.util.ReflectionUtils;
99  import org.apache.hadoop.util.StringUtils;
100 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
101
102 /**
103  * A Store holds a column family in a Region.  It's a memstore and a set of zero
104  * or more StoreFiles, which stretch backwards over time.
105  *
106  * <p>There's no reason to consider append-logging at this level; all logging
107  * and locking is handled at the HRegion level.  Store just provides
108  * services to manage sets of StoreFiles.  One of the most important of those
109  * services is compaction, where files are aggregated once they pass
110  * a configurable threshold.
111  *
112  * <p>Locking and transactions are handled at a higher level.  This API should
113  * not be called directly but by an HRegion manager.
114  */
115 @InterfaceAudience.Private
116 public class HStore implements Store {
117   private static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
118   public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
119       "hbase.server.compactchecker.interval.multiplier";
120   public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
121   public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
122   public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7;
123
124   private static final Log LOG = LogFactory.getLog(HStore.class);
125
126   protected final MemStore memstore;
127   // This store's directory in the filesystem.
128   protected final HRegion region;
129   private final HColumnDescriptor family;
130   private final HRegionFileSystem fs;
131   protected Configuration conf;
132   protected CacheConfig cacheConf;
133   private long lastCompactSize = 0;
134   volatile boolean forceMajor = false;
135   /* how many bytes to write between status checks */
136   static int closeCheckInterval = 0;
137   private volatile long storeSize = 0L;
138   private volatile long totalUncompressedBytes = 0L;
139
140   /**
141    * RWLock for store operations.
142    * Locked in shared mode when the list of component stores is looked at:
143    *   - all reads/writes to table data
144    *   - checking for split
145    * Locked in exclusive mode when the list of component stores is modified:
146    *   - closing
147    *   - completing a compaction
148    */
149   final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
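  // Locking discipline in miniature (an illustrative sketch that mirrors the
  // real usage in add() and close() below, not additional behavior):
  //
  //   lock.readLock().lock();                 // shared: read/write table data
  //   try { /* consult the store file list */ }
  //   finally { lock.readLock().unlock(); }
  //
  //   lock.writeLock().lock();                // exclusive: close, compaction
  //   try { /* swap the store file list */ }
  //   finally { lock.writeLock().unlock(); }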
150   private final boolean verifyBulkLoads;
151
152   private ScanInfo scanInfo;
153
154   // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it.
155   final List<StoreFile> filesCompacting = Lists.newArrayList();
156
157   // All access must be synchronized.
158   private final Set<ChangedReadersObserver> changedReaderObservers =
159     Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());
160
161   protected final int blocksize;
162   private HFileDataBlockEncoder dataBlockEncoder;
163
164   /** Checksum configuration */
165   protected ChecksumType checksumType;
166   protected int bytesPerChecksum;
167
168   // Comparing KeyValues
169   private final CellComparator comparator;
170
171   final StoreEngine<?, ?, ?, ?> storeEngine;
172
173   private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
174   private volatile OffPeakHours offPeakHours;
175
176   private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
177   private int flushRetriesNumber;
178   private int pauseTime;
179
180   private long blockingFileCount;
181   private int compactionCheckMultiplier;
182   protected Encryption.Context cryptoContext = Encryption.Context.NONE;
183
184   private volatile long flushedCellsCount = 0;
185   private volatile long compactedCellsCount = 0;
186   private volatile long majorCompactedCellsCount = 0;
187   private volatile long flushedCellsSize = 0;
188   private volatile long flushedOutputFileSize = 0;
189   private volatile long compactedCellsSize = 0;
190   private volatile long majorCompactedCellsSize = 0;
191
192   /**
193    * Constructor
194    * @param region the region this store belongs to
195    * @param family HColumnDescriptor for this column family
196    * @param confParam base configuration object; table and column family
197    *   overrides take precedence over it. Can be null.
198    * @throws IOException
199    */
200   protected HStore(final HRegion region, final HColumnDescriptor family,
201       final Configuration confParam) throws IOException {
202
203     this.fs = region.getRegionFileSystem();
204
205     // Assemble the store's home directory and ensure it exists.
206     fs.createStoreDir(family.getNameAsString());
207     this.region = region;
208     this.family = family;
209     // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
210     // CompoundConfiguration will look for keys in reverse order of addition, so we'd
211     // add global config first, then table and cf overrides, then cf metadata.
212     this.conf = new CompoundConfiguration()
213       .add(confParam)
214       .addStringMap(region.getTableDesc().getConfiguration())
215       .addStringMap(family.getConfiguration())
216       .addBytesMap(family.getValues());
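    // Illustrative sketch of the precedence above (hypothetical key/values):
    // because CompoundConfiguration consults sources in reverse order of
    // addition, a key set both globally and on the column family resolves to
    // the column family value.
    //
    //   // global config:    hbase.hstore.blockingStoreFiles = 7
    //   // family override:  hbase.hstore.blockingStoreFiles = 20
    //   // this.conf.getInt(BLOCKING_STOREFILES_KEY, 7)      -> 20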
217     this.blocksize = family.getBlocksize();
218
219     this.dataBlockEncoder =
220         new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());
221
222     this.comparator = region.getCellCompartor();
223     // used by ScanQueryMatcher
224     long timeToPurgeDeletes =
225         Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
226     LOG.trace("Time to purge deletes set to " + timeToPurgeDeletes +
227         "ms in store " + this);
228     // Get TTL
229     long ttl = determineTTLFromFamily(family);
230     // Why not just pass a HColumnDescriptor in here altogether?  Even if we
231     // have to clone it?
232     scanInfo = new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.comparator);
233     String className = conf.get(MEMSTORE_CLASS_NAME, DefaultMemStore.class.getName());
234     if (family.isInMemoryCompaction()) {
235       className = CompactingMemStore.class.getName();
236       this.memstore = new CompactingMemStore(conf, this.comparator, this,
237           this.getHRegion().getRegionServicesForStores());
238     } else {
239       this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
240           Configuration.class, CellComparator.class }, new Object[] { conf, this.comparator });
241     }
242     LOG.info("Memstore class name is " + className);
243     this.offPeakHours = OffPeakHours.getInstance(conf);
244
245     // Setting up cache configuration for this family
246     createCacheConf(family);
247
248     this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
249
250     this.blockingFileCount =
251         conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
252     this.compactionCheckMultiplier = conf.getInt(
253         COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
254     if (this.compactionCheckMultiplier <= 0) {
255       LOG.error("Compaction check period multiplier must be positive, setting default: "
256           + DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
257       this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
258     }
259
260     if (HStore.closeCheckInterval == 0) {
261       HStore.closeCheckInterval = conf.getInt(
262           "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
263     }
264
265     this.storeEngine = createStoreEngine(this, this.conf, this.comparator);
266     this.storeEngine.getStoreFileManager().loadFiles(loadStoreFiles());
267
268     // Initialize checksum type from name. The names are CRC32, CRC32C, etc.
269     this.checksumType = getChecksumType(conf);
270     // initialize bytes per checksum
271     this.bytesPerChecksum = getBytesPerChecksum(conf);
272     flushRetriesNumber = conf.getInt(
273         "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
274     pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
275     if (flushRetriesNumber <= 0) {
276       throw new IllegalArgumentException(
277           "hbase.hstore.flush.retries.number must be > 0, not "
278               + flushRetriesNumber);
279     }
280     cryptoContext = EncryptionUtil.createEncryptionContext(conf, family);
281   }
282
283   /**
284    * Creates the cache config.
285    * @param family The current column family.
286    */
287   protected void createCacheConf(final HColumnDescriptor family) {
288     this.cacheConf = new CacheConfig(conf, family);
289   }
290
291   /**
292    * Creates the store engine configured for the given Store.
293    * @param store The store. An unfortunate dependency needed due to it
294    *              being passed to coprocessors via the compactor.
295    * @param conf Store configuration.
296    * @param kvComparator CellComparator for storeFileManager.
297    * @return StoreEngine to use.
298    */
299   protected StoreEngine<?, ?, ?, ?> createStoreEngine(Store store, Configuration conf,
300       CellComparator kvComparator) throws IOException {
301     return StoreEngine.create(store, conf, kvComparator);
302   }
303
304   /**
305    * @param family the column family descriptor
306    * @return TTL in milliseconds of the specified family
307    */
308   public static long determineTTLFromFamily(final HColumnDescriptor family) {
309     // HCD.getTimeToLive returns ttl in seconds.  Convert to milliseconds.
310     long ttl = family.getTimeToLive();
311     if (ttl == HConstants.FOREVER) {
312       // Default is unlimited ttl.
313       ttl = Long.MAX_VALUE;
314     } else if (ttl == -1) {
315       ttl = Long.MAX_VALUE;
316     } else {
317       // Second -> ms adjust for user data
318       ttl *= 1000;
319     }
320     return ttl;
321   }
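  // Worked example (hypothetical values): a family configured with
  // TTL = 86400 seconds comes back as 86400L * 1000 = 86400000 ms, while
  // HConstants.FOREVER or -1 comes back as Long.MAX_VALUE (never expires).
  //
  //   family.setTimeToLive(86400);
  //   long ttlMs = determineTTLFromFamily(family);   // 86400000L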
322
323   @Override
324   public String getColumnFamilyName() {
325     return this.family.getNameAsString();
326   }
327
328   @Override
329   public TableName getTableName() {
330     return this.getRegionInfo().getTable();
331   }
332
333   @Override
334   public FileSystem getFileSystem() {
335     return this.fs.getFileSystem();
336   }
337
338   public HRegionFileSystem getRegionFileSystem() {
339     return this.fs;
340   }
341
342   /* Implementation of StoreConfigInformation */
343   @Override
344   public long getStoreFileTtl() {
345     // TTL only applies if there's no MIN_VERSIONs setting on the column.
346     return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
347   }
348
349   @Override
350   public long getMemstoreFlushSize() {
351     // TODO: Why is this in here?  The flushsize of the region rather than the store?  St.Ack
352     return this.region.memstoreFlushSize;
353   }
354
355   @Override
356   public long getFlushableSize() {
357     return this.memstore.getFlushableSize();
358   }
359
360   @Override
361   public long getSnapshotSize() {
362     return this.memstore.getSnapshotSize();
363   }
364
365   @Override
366   public long getCompactionCheckMultiplier() {
367     return this.compactionCheckMultiplier;
368   }
369
370   @Override
371   public long getBlockingFileCount() {
372     return blockingFileCount;
373   }
374   /* End implementation of StoreConfigInformation */
375
376   /**
377    * Returns the configured bytesPerChecksum value.
378    * @param conf The configuration
379    * @return The bytesPerChecksum that is set in the configuration
380    */
381   public static int getBytesPerChecksum(Configuration conf) {
382     return conf.getInt(HConstants.BYTES_PER_CHECKSUM,
383                        HFile.DEFAULT_BYTES_PER_CHECKSUM);
384   }
385
386   /**
387    * Returns the configured checksum algorithm.
388    * @param conf The configuration
389    * @return The checksum algorithm that is set in the configuration
390    */
391   public static ChecksumType getChecksumType(Configuration conf) {
392     String checksumName = conf.get(HConstants.CHECKSUM_TYPE_NAME);
393     if (checksumName == null) {
394       return ChecksumType.getDefaultChecksumType();
395     } else {
396       return ChecksumType.nameToType(checksumName);
397     }
398   }
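  // Usage sketch for the two checksum helpers above (the "CRC32C" value is an
  // assumption for illustration, not a default asserted here):
  //
  //   Configuration c = HBaseConfiguration.create();
  //   c.set(HConstants.CHECKSUM_TYPE_NAME, "CRC32C");
  //   ChecksumType type = HStore.getChecksumType(c);     // CRC32C
  //   int chunkBytes = HStore.getBytesPerChecksum(c);    // library default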
399
400   /**
401    * @return how many bytes to write between status checks
402    */
403   public static int getCloseCheckInterval() {
404     return closeCheckInterval;
405   }
406
407   @Override
408   public HColumnDescriptor getFamily() {
409     return this.family;
410   }
411
412   /**
413    * @return The maximum sequence id in all store files. Used for log replay.
414    */
415   @Override
416   public long getMaxSequenceId() {
417     return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
418   }
419
420   @Override
421   public long getMaxMemstoreTS() {
422     return StoreFile.getMaxMemstoreTSInList(this.getStorefiles());
423   }
424
425   /**
426    * @param tabledir {@link Path} to where the table is being stored
427    * @param hri {@link HRegionInfo} for the region.
428    * @param family {@link HColumnDescriptor} describing the column family
429    * @return Path to family/Store home directory.
430    */
431   @Deprecated
432   public static Path getStoreHomedir(final Path tabledir,
433       final HRegionInfo hri, final byte[] family) {
434     return getStoreHomedir(tabledir, hri.getEncodedName(), family);
435   }
436
437   /**
438    * @param tabledir {@link Path} to where the table is being stored
439    * @param encodedName Encoded region name.
440    * @param family {@link HColumnDescriptor} describing the column family
441    * @return Path to family/Store home directory.
442    */
443   @Deprecated
444   public static Path getStoreHomedir(final Path tabledir,
445       final String encodedName, final byte[] family) {
446     return new Path(tabledir, new Path(encodedName, Bytes.toString(family)));
447   }
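  // Path sketch (hypothetical names): with tabledir "/hbase/data/default/t1",
  // encoded region name "abc123" and family "cf", the result is
  // "/hbase/data/default/t1/abc123/cf".
  //
  //   Path p = getStoreHomedir(new Path("/hbase/data/default/t1"), "abc123",
  //       Bytes.toBytes("cf"));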
448
449   @Override
450   public HFileDataBlockEncoder getDataBlockEncoder() {
451     return dataBlockEncoder;
452   }
453
454   /**
455    * Should be used only in tests.
456    * @param blockEncoder the block delta encoder to use
457    */
458   void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
459     this.dataBlockEncoder = blockEncoder;
460   }
461
462   /**
463    * Creates an unsorted list of StoreFiles loaded in parallel
464    * from the given directory.
465    * @throws IOException
466    */
467   private List<StoreFile> loadStoreFiles() throws IOException {
468     Collection<StoreFileInfo> files = fs.getStoreFiles(getColumnFamilyName());
469     return openStoreFiles(files);
470   }
471
472   private List<StoreFile> openStoreFiles(Collection<StoreFileInfo> files) throws IOException {
473     if (files == null || files.size() == 0) {
474       return new ArrayList<StoreFile>();
475     }
476     // initialize the thread pool for opening store files in parallel.
477     ThreadPoolExecutor storeFileOpenerThreadPool =
478       this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
479           this.getColumnFamilyName());
480     CompletionService<StoreFile> completionService =
481       new ExecutorCompletionService<StoreFile>(storeFileOpenerThreadPool);
482
483     int totalValidStoreFile = 0;
484     for (final StoreFileInfo storeFileInfo: files) {
485       // open each store file in parallel
486       completionService.submit(new Callable<StoreFile>() {
487         @Override
488         public StoreFile call() throws IOException {
489           StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
490           return storeFile;
491         }
492       });
493       totalValidStoreFile++;
494     }
495
496     ArrayList<StoreFile> results = new ArrayList<StoreFile>(files.size());
497     IOException ioe = null;
498     try {
499       for (int i = 0; i < totalValidStoreFile; i++) {
500         try {
501           Future<StoreFile> future = completionService.take();
502           StoreFile storeFile = future.get();
503           if (storeFile != null) {
504             long length = storeFile.getReader().length();
505             this.storeSize += length;
506             this.totalUncompressedBytes += storeFile.getReader().getTotalUncompressedBytes();
507             if (LOG.isDebugEnabled()) {
508               LOG.debug("loaded " + storeFile.toStringDetailed());
509             }
510             results.add(storeFile);
511           }
512         } catch (InterruptedException e) {
513           if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
514         } catch (ExecutionException e) {
515           if (ioe == null) ioe = new IOException(e.getCause());
516         }
517       }
518     } finally {
519       storeFileOpenerThreadPool.shutdownNow();
520     }
521     if (ioe != null) {
522       // close StoreFile readers
523       boolean evictOnClose =
524           cacheConf != null? cacheConf.shouldEvictOnClose(): true;
525       for (StoreFile file : results) {
526         try {
527           if (file != null) file.closeReader(evictOnClose);
528         } catch (IOException e) {
529           LOG.warn(e.getMessage());
530         }
531       }
532       throw ioe;
533     }
534
535     return results;
536   }
537
538   /**
539    * Checks the underlying store files, and opens the files that have not
540    * been opened, and removes the store file readers for store files no longer
541    * available. Mainly used by secondary region replicas to keep up to date with
542    * the primary region files.
543    * @throws IOException
544    */
545   @Override
546   public void refreshStoreFiles() throws IOException {
547     Collection<StoreFileInfo> newFiles = fs.getStoreFiles(getColumnFamilyName());
548     refreshStoreFilesInternal(newFiles);
549   }
550
551   @Override
552   public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
553     List<StoreFileInfo> storeFiles = new ArrayList<StoreFileInfo>(newFiles.size());
554     for (String file : newFiles) {
555       storeFiles.add(fs.getStoreFileInfo(getColumnFamilyName(), file));
556     }
557     refreshStoreFilesInternal(storeFiles);
558   }
559
560   /**
561    * Checks the underlying store files, and opens the files that have not
562    * been opened, and removes the store file readers for store files no longer
563    * available. Mainly used by secondary region replicas to keep up to date with
564    * the primary region files.
565    * @throws IOException
566    */
567   private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
568     StoreFileManager sfm = storeEngine.getStoreFileManager();
569     Collection<StoreFile> currentFiles = sfm.getStorefiles();
570     if (currentFiles == null) currentFiles = new ArrayList<StoreFile>(0);
571
572     if (newFiles == null) newFiles = new ArrayList<StoreFileInfo>(0);
573
574     HashMap<StoreFileInfo, StoreFile> currentFilesSet =
575         new HashMap<StoreFileInfo, StoreFile>(currentFiles.size());
576     for (StoreFile sf : currentFiles) {
577       currentFilesSet.put(sf.getFileInfo(), sf);
578     }
579     HashSet<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);
580
581     Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
582     Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);
583
584     if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
585       return;
586     }
587
588     LOG.info("Refreshing store files for region " + this.getRegionInfo().getRegionNameAsString()
589       + " files to add: " + toBeAddedFiles + " files to remove: " + toBeRemovedFiles);
590
591     Set<StoreFile> toBeRemovedStoreFiles = new HashSet<StoreFile>(toBeRemovedFiles.size());
592     for (StoreFileInfo sfi : toBeRemovedFiles) {
593       toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
594     }
595
596     // try to open the files
597     List<StoreFile> openedFiles = openStoreFiles(toBeAddedFiles);
598
599     // propagate the file changes to the underlying store file manager
600     replaceStoreFiles(toBeRemovedStoreFiles, openedFiles); //won't throw an exception
601
602     // Advance the memstore read point to be at least the new store files seqIds so that
603     // readers might pick it up. This assumes that the store is not getting any writes (otherwise
604     // in-flight transactions might be made visible)
605     if (!toBeAddedFiles.isEmpty()) {
606       region.getMVCC().advanceTo(this.getMaxSequenceId());
607     }
608
609     completeCompaction(toBeRemovedStoreFiles);
610   }
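  // The set arithmetic above in miniature (hypothetical file names): with
  // current files {A, B} and refreshed files {B, C}, Guava's Sets.difference
  // yields {C} to open and {A} to drop, so only the deltas are touched.
  //
  //   Sets.difference(newFilesSet, currentFilesSet.keySet());   // to add
  //   Sets.difference(currentFilesSet.keySet(), newFilesSet);   // to remove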
611
612   private StoreFile createStoreFileAndReader(final Path p) throws IOException {
613     StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p);
614     return createStoreFileAndReader(info);
615   }
616
617   private StoreFile createStoreFileAndReader(final StoreFileInfo info)
618       throws IOException {
619     info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
620     StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
621       this.family.getBloomFilterType());
622     StoreFileReader r = storeFile.createReader();
623     r.setReplicaStoreFile(isPrimaryReplicaStore());
624     return storeFile;
625   }
626
627   @Override
628   public long add(final Cell cell) {
629     lock.readLock().lock();
630     try {
631       return this.memstore.add(cell);
632     } finally {
633       lock.readLock().unlock();
634     }
635   }
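  // Usage sketch: the returned delta is the change in memstore size caused by
  // the cell; callers fold it into region-level accounting (the accumulator
  // below is hypothetical).
  //
  //   long delta = store.add(cell);
  //   regionMemstoreSize.addAndGet(delta);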
636
637   @Override
638   public long timeOfOldestEdit() {
639     return memstore.timeOfOldestEdit();
640   }
641
642   /**
643    * Adds a delete marker to the memstore
644    *
645    * @param kv the KeyValue carrying the delete
646    * @return memstore size delta
647    */
648   protected long delete(final KeyValue kv) {
649     lock.readLock().lock();
650     try {
651       return this.memstore.delete(kv);
652     } finally {
653       lock.readLock().unlock();
654     }
655   }
656
657   /**
658    * @return All store files.
659    */
660   @Override
661   public Collection<StoreFile> getStorefiles() {
662     return this.storeEngine.getStoreFileManager().getStorefiles();
663   }
664
665   @Override
666   public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
667     HFile.Reader reader = null;
668     try {
669       LOG.info("Validating hfile at " + srcPath + " for inclusion in "
670           + "store " + this + " region " + this.getRegionInfo().getRegionNameAsString());
671       reader = HFile.createReader(srcPath.getFileSystem(conf),
672           srcPath, cacheConf, conf);
673       reader.loadFileInfo();
674
675       byte[] firstKey = reader.getFirstRowKey();
676       Preconditions.checkState(firstKey != null, "First key can not be null");
677       Cell lk = reader.getLastKey();
678       Preconditions.checkState(lk != null, "Last key can not be null");
679       byte[] lastKey =  CellUtil.cloneRow(lk);
680
681       LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey) +
682           " last=" + Bytes.toStringBinary(lastKey));
683       LOG.debug("Region bounds: first=" +
684           Bytes.toStringBinary(getRegionInfo().getStartKey()) +
685           " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
686
687       if (!this.getRegionInfo().containsRange(firstKey, lastKey)) {
688         throw new WrongRegionException(
689             "Bulk load file " + srcPath.toString() + " does not fit inside region "
690             + this.getRegionInfo().getRegionNameAsString());
691       }
692
693       if (reader.length() > conf.getLong(HConstants.HREGION_MAX_FILESIZE,
694           HConstants.DEFAULT_MAX_FILE_SIZE)) {
695         LOG.warn("Trying to bulk load hfile " + srcPath.toString() + " with size: " +
696             reader.length() + " bytes can be problematic as it may lead to oversplitting.");
697       }
698
699       if (verifyBulkLoads) {
700         long verificationStartTime = EnvironmentEdgeManager.currentTime();
701         LOG.info("Full verification started for bulk load hfile: " + srcPath.toString());
702         Cell prevCell = null;
703         HFileScanner scanner = reader.getScanner(false, false, false);
704         scanner.seekTo();
705         do {
706           Cell cell = scanner.getCell();
707           if (prevCell != null) {
708             if (comparator.compareRows(prevCell, cell) > 0) {
709               throw new InvalidHFileException("Previous row is greater than"
710                   + " current row: path=" + srcPath + " previous="
711                   + CellUtil.getCellKeyAsString(prevCell) + " current="
712                   + CellUtil.getCellKeyAsString(cell));
713             }
714             if (CellComparator.compareFamilies(prevCell, cell) != 0) {
715               throw new InvalidHFileException("Previous key had different"
716                   + " family compared to current key: path=" + srcPath
717                   + " previous="
718                   + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(),
719                       prevCell.getFamilyLength())
720                   + " current="
721                   + Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(),
722                       cell.getFamilyLength()));
723             }
724           }
725           prevCell = cell;
726         } while (scanner.next());
727       LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString()
728          + " took " + (EnvironmentEdgeManager.currentTime() - verificationStartTime)
729          + " ms");
730       }
731     } finally {
732       if (reader != null) reader.close();
733     }
734   }
735
736   @Override
737   public Path bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
738     Path srcPath = new Path(srcPathStr);
739     Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
740
741     LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as "
742         + dstPath + " - updating store file list.");
743
744     StoreFile sf = createStoreFileAndReader(dstPath);
745     bulkLoadHFile(sf);
746
747     LOG.info("Successfully loaded store file " + srcPath + " into store " + this
748         + " (new location: " + dstPath + ")");
749
750     return dstPath;
751   }
752
753   @Override
754   public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
755     StoreFile sf = createStoreFileAndReader(fileInfo);
756     bulkLoadHFile(sf);
757   }
758
759   private void bulkLoadHFile(StoreFile sf) throws IOException {
760     StoreFileReader r = sf.getReader();
761     this.storeSize += r.length();
762     this.totalUncompressedBytes += r.getTotalUncompressedBytes();
763
764     // Append the new storefile into the list
765     this.lock.writeLock().lock();
766     try {
767       this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
768     } finally {
769       // We need the lock, as long as we are updating the storeFiles
770       // or changing the memstore. Let us release it before calling
771       // notifyChangeReadersObservers. See HBASE-4485 for a possible
772       // deadlock scenario that could have happened if we continued to hold
773       // the lock.
774       this.lock.writeLock().unlock();
775     }
776     LOG.info("Loaded HFile " + sf.getFileInfo() + " into store '" + getColumnFamilyName());
777     if (LOG.isTraceEnabled()) {
778       String traceMessage = "BULK LOAD time,size,store size,store files ["
779           + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize
780           + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
781       LOG.trace(traceMessage);
782     }
783   }
784
785   @Override
786   public ImmutableCollection<StoreFile> close() throws IOException {
787     this.lock.writeLock().lock();
788     try {
789       // Clear so metrics doesn't find them.
790       ImmutableCollection<StoreFile> result = storeEngine.getStoreFileManager().clearFiles();
791       Collection<StoreFile> compactedfiles =
792           storeEngine.getStoreFileManager().clearCompactedFiles();
793       // clear the compacted files
794       if (compactedfiles != null && !compactedfiles.isEmpty()) {
795         removeCompactedfiles(compactedfiles);
796       }
797       if (!result.isEmpty()) {
798         // initialize the thread pool for closing store files in parallel.
799         ThreadPoolExecutor storeFileCloserThreadPool = this.region
800             .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
801                 + this.getColumnFamilyName());
802
803         // close each store file in parallel
804         CompletionService<Void> completionService =
805           new ExecutorCompletionService<Void>(storeFileCloserThreadPool);
806         for (final StoreFile f : result) {
807           completionService.submit(new Callable<Void>() {
808             @Override
809             public Void call() throws IOException {
810               boolean evictOnClose =
811                   cacheConf != null? cacheConf.shouldEvictOnClose(): true;
812               f.closeReader(evictOnClose);
813               return null;
814             }
815           });
816         }
817
818         IOException ioe = null;
819         try {
820           for (int i = 0; i < result.size(); i++) {
821             try {
822               Future<Void> future = completionService.take();
823               future.get();
824             } catch (InterruptedException e) {
825               if (ioe == null) {
826                 ioe = new InterruptedIOException();
827                 ioe.initCause(e);
828               }
829             } catch (ExecutionException e) {
830               if (ioe == null) ioe = new IOException(e.getCause());
831             }
832           }
833         } finally {
834           storeFileCloserThreadPool.shutdownNow();
835         }
836         if (ioe != null) throw ioe;
837       }
838       LOG.info("Closed " + this);
839       return result;
840     } finally {
841       this.lock.writeLock().unlock();
842     }
843   }
844
845   /**
846    * Snapshot this store's memstore. Call before running
847    * {@link #flushCache(long, MemStoreSnapshot, MonitoredTask, ThroughputController)}
848    *  so it has some work to do.
849    */
850   void snapshot() {
851     this.lock.writeLock().lock();
852     try {
853       this.memstore.snapshot();
854     } finally {
855       this.lock.writeLock().unlock();
856     }
857   }
858
859   /**
860    * Write out current snapshot. Presumes {@link #snapshot()} has been called previously.
861    * @param logCacheFlushId flush sequence number
862    * @param snapshot memstore snapshot to flush
863    * @param status task monitor for flush progress
864    * @param throughputController controls the flush write rate
865    * @return The path names of the tmp files to which the store was flushed
866    * @throws IOException if exception occurs during process
867    */
868   protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
869       MonitoredTask status, ThroughputController throughputController) throws IOException {
870     // If an exception happens flushing, we let it out without clearing
871     // the memstore snapshot.  The old snapshot will be returned when we say
872     // 'snapshot', the next time flush comes around.
873     // Retry after catching exception when flushing, otherwise server will abort
874     // itself
875     StoreFlusher flusher = storeEngine.getStoreFlusher();
876     IOException lastException = null;
877     for (int i = 0; i < flushRetriesNumber; i++) {
878       try {
879         List<Path> pathNames =
880             flusher.flushSnapshot(snapshot, logCacheFlushId, status, throughputController);
881         Path lastPathName = null;
882         try {
883           for (Path pathName : pathNames) {
884             lastPathName = pathName;
885             validateStoreFile(pathName);
886           }
887           return pathNames;
888         } catch (Exception e) {
889           LOG.warn("Failed validating store file " + lastPathName + ", retrying num=" + i, e);
890           if (e instanceof IOException) {
891             lastException = (IOException) e;
892           } else {
893             lastException = new IOException(e);
894           }
895         }
896       } catch (IOException e) {
897         LOG.warn("Failed flushing store file, retrying num=" + i, e);
898         lastException = e;
899       }
900       if (lastException != null && i < (flushRetriesNumber - 1)) {
901         try {
902           Thread.sleep(pauseTime);
903         } catch (InterruptedException e) {
904           IOException iie = new InterruptedIOException();
905           iie.initCause(e);
906           throw iie;
907         }
908       }
909     }
910     throw lastException;
911   }
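  // Retry arithmetic sketch (assumed defaults): with flushRetriesNumber = 10
  // and pauseTime = HConstants.DEFAULT_HBASE_SERVER_PAUSE (assumed 1000 ms), a
  // persistently failing flush makes 10 attempts and sleeps between attempts,
  // so roughly 9 * 1000 ms of pausing before the last IOException is rethrown.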
912
913   /*
914    * @param path The pathname of the tmp file into which the store was flushed
915    * @param logCacheFlushId
916    * @param status
917    * @return StoreFile created.
918    * @throws IOException
919    */
920   private StoreFile commitFile(final Path path, final long logCacheFlushId, MonitoredTask status)
921       throws IOException {
922     // Write-out finished successfully, move into the right spot
923     Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);
924
925     status.setStatus("Flushing " + this + ": reopening flushed file");
926     StoreFile sf = createStoreFileAndReader(dstPath);
927
928     StoreFileReader r = sf.getReader();
929     this.storeSize += r.length();
930     this.totalUncompressedBytes += r.getTotalUncompressedBytes();
931
932     if (LOG.isInfoEnabled()) {
933       LOG.info("Added " + sf + ", entries=" + r.getEntries() +
934         ", sequenceid=" + logCacheFlushId +
935         ", filesize=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1));
936     }
937     return sf;
938   }
939
940   @Override
941   public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
942                                             boolean isCompaction, boolean includeMVCCReadpoint,
943                                             boolean includesTag)
944       throws IOException {
945     return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
946         includesTag, false);
947   }
948
949   /*
950    * @param maxKeyCount
951    * @param compression Compression algorithm to use
952    * @param isCompaction whether we are creating a new file in a compaction
953    * @param includeMVCCReadpoint whether to include the MVCC read point or not
954    * @param includesTag whether to include tags or not
955    * @return Writer for a new StoreFile in the tmp dir.
956    */
957   @Override
958   public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
959       boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
960       boolean shouldDropBehind)
961   throws IOException {
962     return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
963         includesTag, shouldDropBehind, null);
964   }
965
966   /*
967    * @param maxKeyCount
968    * @param compression Compression algorithm to use
969    * @param isCompaction whether we are creating a new file in a compaction
970    * @param includeMVCCReadpoint whether to include the MVCC read point or not
971    * @param includesTag whether to include tags or not
972    * @return Writer for a new StoreFile in the tmp dir.
973    */
974   @Override
975   public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
976       boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
977       boolean shouldDropBehind, final TimeRangeTracker trt)
978   throws IOException {
979     final CacheConfig writerCacheConf;
980     if (isCompaction) {
981       // Don't cache data on write on compactions.
982       writerCacheConf = new CacheConfig(cacheConf);
983       writerCacheConf.setCacheDataOnWrite(false);
984     } else {
985       writerCacheConf = cacheConf;
986     }
987     InetSocketAddress[] favoredNodes = null;
988     if (region.getRegionServerServices() != null) {
989       favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
990           region.getRegionInfo().getEncodedName());
991     }
992     HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
993       cryptoContext);
994     StoreFileWriter.Builder builder = new StoreFileWriter.Builder(conf, writerCacheConf,
995         this.getFileSystem())
996             .withFilePath(fs.createTempName())
997             .withComparator(comparator)
998             .withBloomType(family.getBloomFilterType())
999             .withMaxKeyCount(maxKeyCount)
1000             .withFavoredNodes(favoredNodes)
1001             .withFileContext(hFileContext)
1002             .withShouldDropCacheBehind(shouldDropBehind);
1003     if (trt != null) {
1004       builder.withTimeRangeTracker(trt);
1005     }
1006     return builder.build();
1007   }
1008
1009   private HFileContext createFileContext(Compression.Algorithm compression,
1010       boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context cryptoContext) {
1011     if (compression == null) {
1012       compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
1013     }
1014     HFileContext hFileContext = new HFileContextBuilder()
1015                                 .withIncludesMvcc(includeMVCCReadpoint)
1016                                 .withIncludesTags(includesTag)
1017                                 .withCompression(compression)
1018                                 .withCompressTags(family.isCompressTags())
1019                                 .withChecksumType(checksumType)
1020                                 .withBytesPerCheckSum(bytesPerChecksum)
1021                                 .withBlockSize(blocksize)
1022                                 .withHBaseCheckSum(true)
1023                                 .withDataBlockEncoding(family.getDataBlockEncoding())
1024                                 .withEncryptionContext(cryptoContext)
1025                                 .withCreateTime(EnvironmentEdgeManager.currentTime())
1026                                 .build();
1027     return hFileContext;
1028   }
1029
1030
1031   /*
1032    * Update the store file list, adding into place the files produced by this flush.
1033    * @param sfs Store files
1034    * @param snapshotId id of the memstore snapshot to clear; ignored if not positive
1035    * @throws IOException
1036    * @return Whether compaction is required.
1037    */
1038   private boolean updateStorefiles(final List<StoreFile> sfs, final long snapshotId)
1039       throws IOException {
1040     this.lock.writeLock().lock();
1041     try {
1042       this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
1043       if (snapshotId > 0) {
1044         this.memstore.clearSnapshot(snapshotId);
1045       }
1046     } finally {
1047       // We need the lock, as long as we are updating the storeFiles
1048       // or changing the memstore. Let us release it before calling
1049       // notifyChangeReadersObservers. See HBASE-4485 for a possible
1050       // deadlock scenario that could have happened if we continued to hold
1051       // the lock.
1052       this.lock.writeLock().unlock();
1053     }
1054     // notify to be called here - only in case of flushes
1055     notifyChangedReadersObservers(sfs);
1056     if (LOG.isTraceEnabled()) {
1057       long totalSize = 0;
1058       for (StoreFile sf : sfs) {
1059         totalSize += sf.getReader().length();
1060       }
1061       String traceMessage = "FLUSH time,count,size,store size,store files ["
1062           + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize
1063           + "," + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
1064       LOG.trace(traceMessage);
1065     }
1066     return needsCompaction();
1067   }
1068
1069   /*
1070    * Notify all observers that set of Readers has changed.
1071    * @throws IOException
1072    */
1073   private void notifyChangedReadersObservers(List<StoreFile> sfs) throws IOException {
1074     for (ChangedReadersObserver o : this.changedReaderObservers) {
1075       o.updateReaders(sfs);
1076     }
1077   }
1078
1079   /**
1080    * Get all scanners with no filtering based on TTL (that happens further down
1081    * the line).
1082    * @return all scanners for this store
1083    */
1084   @Override
1085   public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet,
1086       boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
1087       byte[] stopRow, long readPt) throws IOException {
1088     Collection<StoreFile> storeFilesToScan;
1089     List<KeyValueScanner> memStoreScanners;
1090     this.lock.readLock().lock();
1091     try {
1092       storeFilesToScan =
1093           this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow);
1094       memStoreScanners = this.memstore.getScanners(readPt);
1095     } finally {
1096       this.lock.readLock().unlock();
1097     }
1098
1099     // First the store file scanners
1100
1101     // TODO this used to get the store files in descending order,
1102     // but now we get them in ascending order, which I think is
1103     // actually more correct, since the memstore scanners get put at the end.
1104     List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan,
1105         cacheBlocks, usePread, isCompaction, false, matcher, readPt, isPrimaryReplicaStore());
1106     List<KeyValueScanner> scanners =
1107       new ArrayList<KeyValueScanner>(sfScanners.size()+1);
1108     scanners.addAll(sfScanners);
1109     // Then the memstore scanners
1110     scanners.addAll(memStoreScanners);
1111     return scanners;
1112   }
1113
1114   @Override
1115   public List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks,
1116       boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
1117       byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException {
1118     List<KeyValueScanner> memStoreScanners = null;
1119     if (includeMemstoreScanner) {
1120       this.lock.readLock().lock();
1121       try {
1122         memStoreScanners = this.memstore.getScanners(readPt);
1123       } finally {
1124         this.lock.readLock().unlock();
1125       }
1126     }
1127     List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(files,
1128       cacheBlocks, usePread, isCompaction, false, matcher, readPt, isPrimaryReplicaStore());
1129     List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(sfScanners.size() + 1);
1130     scanners.addAll(sfScanners);
1131     // Then the memstore scanners
1132     if (memStoreScanners != null) {
1133       scanners.addAll(memStoreScanners);
1134     }
1135     return scanners;
1136   }
1137
1138   @Override
1139   public void addChangedReaderObserver(ChangedReadersObserver o) {
1140     this.changedReaderObservers.add(o);
1141   }
1142
1143   @Override
1144   public void deleteChangedReaderObserver(ChangedReadersObserver o) {
1145     // We don't check if observer present; it may not be (legitimately)
1146     this.changedReaderObservers.remove(o);
1147   }
1148
1149   //////////////////////////////////////////////////////////////////////////////
1150   // Compaction
1151   //////////////////////////////////////////////////////////////////////////////
1152
1153   /**
1154    * Compact the StoreFiles.  This method may take some time, so the calling
1155    * thread must be able to block for long periods.
1156    *
1157    * <p>During this time, the Store can work as usual, getting values from
1158    * StoreFiles and writing new StoreFiles from the memstore.
1159    *
1160    * Existing StoreFiles are not destroyed until the new compacted StoreFile is
1161    * completely written-out to disk.
1162    *
1163    * <p>The compactLock prevents multiple simultaneous compactions.
1164    * The structureLock prevents us from interfering with other write operations.
1165    *
1166    * <p>We don't want to hold the structureLock for the whole time, as a compact()
1167    * can be lengthy and we want to allow cache-flushes during this period.
1168    *
1169    * <p> Compaction event should be idempotent, since there is no IO Fencing for
1170    * the region directory in hdfs. A region server might still try to complete the
1171    * compaction after it lost the region. That is why the following events are carefully
1172    * ordered for a compaction:
1173    *  1. Compaction writes new files under region/.tmp directory (compaction output)
1174    *  2. Compaction atomically moves the temporary file under region directory
1175    *  3. Compaction appends a WAL edit containing the compaction input and output files.
1176    *  Forces sync on WAL.
1177    *  4. Compaction deletes the input files from the region directory.
1178    *
1179    * Failure conditions are handled like this:
1180    *  - If RS fails before 2, compaction won't complete. Even if RS lives on and finishes
1181    *  the compaction later, it will only write the new data file to the region directory.
1182    *  Since we already have this data, this will be idempotent but we will have a redundant
1183    *  copy of the data.
1184    *  - If RS fails between 2 and 3, the region will have a redundant copy of the data. The
1185    *  RS that failed won't be able to finish sync() for WAL because of lease recovery in WAL.
1186    *  - If RS fails after 3, the region server that opens the region will pick up
1187    *  the compaction marker from the WAL and replay it by removing the compaction input files.
1188    *  Failed RS can also attempt to delete those files, but the operation will be idempotent.
1189    *
1190    * See HBASE-2231 for details.
1191    *
1192    * @param compaction compaction details obtained from requestCompaction()
1193    * @throws IOException
1194    * @return Store files we compacted into, or null if we failed or opted out early.
1195    */
1196   @Override
1197   public List<StoreFile> compact(CompactionContext compaction,
1198       ThroughputController throughputController) throws IOException {
1199     return compact(compaction, throughputController, null);
1200   }
1201
1202   @Override
1203   public List<StoreFile> compact(CompactionContext compaction,
1204     ThroughputController throughputController, User user) throws IOException {
1205     assert compaction != null;
1206     List<StoreFile> sfs = null;
1207     CompactionRequest cr = compaction.getRequest();
1208     try {
1209       // Do all sanity checking in here if we have a valid CompactionRequest
1210       // because we need to clean up after it on the way out in a finally
1211       // block below
1212       long compactionStartTime = EnvironmentEdgeManager.currentTime();
1213       assert compaction.hasSelection();
1214       Collection<StoreFile> filesToCompact = cr.getFiles();
1215       assert !filesToCompact.isEmpty();
1216       synchronized (filesCompacting) {
1217         // sanity check: we're compacting files that this store knows about
1218         // TODO: change this to LOG.error() after more debugging
1219         Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
1220       }
1221
1222       // Ready to go. Have list of files to compact.
1223       LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
1224           + this + " of " + this.getRegionInfo().getRegionNameAsString()
1225           + " into tmpdir=" + fs.getTempDir() + ", totalSize="
1226           + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));
1227
1228       // Commence the compaction.
1229       List<Path> newFiles = compaction.compact(throughputController, user);
1230
1231       long outputBytes = 0L;
1232       // TODO: get rid of this!
1233       if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
1234         LOG.warn("hbase.hstore.compaction.complete is set to false");
1235         sfs = new ArrayList<StoreFile>(newFiles.size());
1236         final boolean evictOnClose =
1237             cacheConf != null? cacheConf.shouldEvictOnClose(): true;
1238         for (Path newFile : newFiles) {
1239           // Create storefile around what we wrote with a reader on it.
1240           StoreFile sf = createStoreFileAndReader(newFile);
1241           sf.closeReader(evictOnClose);
1242           sfs.add(sf);
1243         }
1244         return sfs;
1245       }
1246       // Do the steps necessary to complete the compaction.
1247       sfs = moveCompactedFilesIntoPlace(cr, newFiles, user);
1248       writeCompactionWalRecord(filesToCompact, sfs);
1249       replaceStoreFiles(filesToCompact, sfs);
1250       if (cr.isMajor()) {
1251         majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs;
1252         majorCompactedCellsSize += getCompactionProgress().totalCompactedSize;
1253       } else {
1254         compactedCellsCount += getCompactionProgress().totalCompactingKVs;
1255         compactedCellsSize += getCompactionProgress().totalCompactedSize;
1256       }
1257
1258       for (StoreFile sf : sfs) {
1259         outputBytes += sf.getReader().length();
1260       }
1261
1262       // At this point the store will use new files for all new scanners.
1263       completeCompaction(filesToCompact); // update store size.
1264
1265       long now = EnvironmentEdgeManager.currentTime();
1266       if (region.getRegionServerServices() != null
1267           && region.getRegionServerServices().getMetrics() != null) {
1268         region.getRegionServerServices().getMetrics().updateCompaction(cr.isMajor(),
1269           now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(),
1270           outputBytes);
1271       }
1272
1273       logCompactionEndMessage(cr, sfs, now, compactionStartTime);
1274       return sfs;
1275     } finally {
1276       finishCompactionRequest(cr);
1277     }
1278   }
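  // The crash-safe ordering documented above, traced through the calls inside
  // compact() (a condensed sketch of the happy path, not a reimplementation):
  //
  //   List<Path> newFiles = compaction.compact(tc, user);     // 1. tmp output
  //   sfs = moveCompactedFilesIntoPlace(cr, newFiles, user);  // 2. atomic move
  //   writeCompactionWalRecord(filesToCompact, sfs);          // 3. WAL + sync
  //   replaceStoreFiles(filesToCompact, sfs);                 // 4. swap lists;
  //                                                           //    inputs removed later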
1279
1280   private List<StoreFile> moveCompactedFilesIntoPlace(
1281       final CompactionRequest cr, List<Path> newFiles, User user) throws IOException {
1282     List<StoreFile> sfs = new ArrayList<StoreFile>(newFiles.size());
1283     for (Path newFile : newFiles) {
1284       assert newFile != null;
1285       final StoreFile sf = moveFileIntoPlace(newFile);
1286       if (this.getCoprocessorHost() != null) {
1287         final Store thisStore = this;
1288         getCoprocessorHost().postCompact(thisStore, sf, cr, user);
1289       }
1290       assert sf != null;
1291       sfs.add(sf);
1292     }
1293     return sfs;
1294   }
1295
1296   // Package-visible for tests
1297   StoreFile moveFileIntoPlace(final Path newFile) throws IOException {
1298     validateStoreFile(newFile);
1299     // Move the file into the right spot
1300     Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile);
1301     return createStoreFileAndReader(destPath);
1302   }
1303
1304   /**
1305    * Writes the compaction WAL record.
1306    * @param filesCompacted Files compacted (input).
1307    * @param newFiles Files from compaction.
1308    */
1309   private void writeCompactionWalRecord(Collection<StoreFile> filesCompacted,
1310       Collection<StoreFile> newFiles) throws IOException {
1311     if (region.getWAL() == null) return;
1312     List<Path> inputPaths = new ArrayList<Path>(filesCompacted.size());
1313     for (StoreFile f : filesCompacted) {
1314       inputPaths.add(f.getPath());
1315     }
1316     List<Path> outputPaths = new ArrayList<Path>(newFiles.size());
1317     for (StoreFile f : newFiles) {
1318       outputPaths.add(f.getPath());
1319     }
1320     HRegionInfo info = this.region.getRegionInfo();
1321     CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
1322         family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));
1323     // TODO: Fix reaching into Region to get the maxWaitForSeqId.
1324     // Should this method live in Region, given how many references it makes up there?
1325     // It could become Region#writeCompactionMarker(compactionDescriptor).
1326     WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(),
1327         this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC());
1328   }
1329
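       /**
        * Swaps the compaction input files for the output files in the StoreFileManager.
        * Done under the store write lock, so concurrent scanners see either the old or
        * the new file set, never a mix of the two.
        */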
1330   @VisibleForTesting
1331   void replaceStoreFiles(final Collection<StoreFile> compactedFiles,
1332       final Collection<StoreFile> result) throws IOException {
1333     this.lock.writeLock().lock();
1334     try {
1335       this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result);
1336       filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock();
1337     } finally {
1338       this.lock.writeLock().unlock();
1339     }
1340   }
1341
1342   /**
1343    * Log a very elaborate compaction completion message.
1344    * @param cr Request.
1345    * @param sfs Resulting files.
        * @param now Current time, taken as the compaction end time.
1346    * @param compactionStartTime Start time.
1347    */
1348   private void logCompactionEndMessage(
1349       CompactionRequest cr, List<StoreFile> sfs, long now, long compactionStartTime) {
1350     StringBuilder message = new StringBuilder(
1351       "Completed" + (cr.isMajor() ? " major" : "") + " compaction of "
1352       + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "") + " file(s) in "
1353       + this + " of " + this.getRegionInfo().getRegionNameAsString() + " into ");
1354     if (sfs.isEmpty()) {
1355       message.append("none, ");
1356     } else {
1357       for (StoreFile sf: sfs) {
1358         message.append(sf.getPath().getName());
1359         message.append("(size=");
1360         message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1));
1361         message.append("), ");
1362       }
1363     }
1364     message.append("total size for store is ")
1365       .append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize, "", 1))
1366       .append(". This selection was in queue for ")
1367       .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
1368       .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
1369       .append(" to execute.");
1370     LOG.info(message.toString());
1371     if (LOG.isTraceEnabled()) {
1372       int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
1373       long resultSize = 0;
1374       for (StoreFile sf : sfs) {
1375         resultSize += sf.getReader().length();
1376       }
1377       String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
1378         + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
1379           + cr.getFiles().size() + "," + sfs.size() + "," +  storeSize + "," + fileCount + "]";
1380       LOG.trace(traceMessage);
1381     }
1382   }
1383
1384   /**
1385    * Call to complete a compaction. It is for the case where we find in the WAL a compaction
1386    * that was not finished. We could find one when recovering a WAL after a regionserver crash.
1387    * See HBASE-2231.
1388    * @param compaction the compaction descriptor recovered from the WAL
1389    */
1390   @Override
1391   public void replayCompactionMarker(CompactionDescriptor compaction,
1392       boolean pickCompactionFiles, boolean removeFiles)
1393       throws IOException {
1394     LOG.debug("Completing compaction from the WAL marker");
1395     List<String> compactionInputs = compaction.getCompactionInputList();
1396     List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList());
1397
1398     // The Compaction Marker is written after the compaction is completed,
1399     // and the files moved into the region/family folder.
1400     //
1401     // If we crash after the entry is written, we may not have removed the
1402     // input files, but the output file is present.
1403     // (The unremoved input files will be removed by this function)
1404     //
1405     // If we scan the directory and the file is not present, it can mean that:
1406     //   - The file was manually removed by the user
1407     //   - The file was removed as consequence of subsequent compaction
1408     // so, we can't do anything with the "compaction output list" because those
1409     // files have already been loaded when opening the region (by virtue of
1410     // being in the store's folder) or they may be missing due to a compaction.
1411
1412     String familyName = this.getColumnFamilyName();
1413     List<String> inputFiles = new ArrayList<String>(compactionInputs.size());
1414     for (String compactionInput : compactionInputs) {
1415       Path inputPath = fs.getStoreFilePath(familyName, compactionInput);
1416       inputFiles.add(inputPath.getName());
1417     }
1418
1419     // Some of the input files might already be deleted.
1420     List<StoreFile> inputStoreFiles = new ArrayList<StoreFile>(compactionInputs.size());
1421     for (StoreFile sf : this.getStorefiles()) {
1422       if (inputFiles.contains(sf.getPath().getName())) {
1423         inputStoreFiles.add(sf);
1424       }
1425     }
1426
1427     // check whether we need to pick up the new files
1428     List<StoreFile> outputStoreFiles = new ArrayList<StoreFile>(compactionOutputs.size());
1429
1430     if (pickCompactionFiles) {
1431       for (StoreFile sf : this.getStorefiles()) {
1432         compactionOutputs.remove(sf.getPath().getName());
1433       }
1434       for (String compactionOutput : compactionOutputs) {
1435         StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), compactionOutput);
1436         StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
1437         outputStoreFiles.add(storeFile);
1438       }
1439     }
1440
1441     if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) {
1442       LOG.info("Replaying compaction marker, replacing input files: " +
1443           inputStoreFiles + " with output files: " + outputStoreFiles);
1444       this.replaceStoreFiles(inputStoreFiles, outputStoreFiles);
1445       this.completeCompaction(inputStoreFiles);
1446     }
1447   }
1448
1449   /**
1450    * This method tries to compact the N most recent files for testing.
1451    * Note that because compacting "recent" files only makes sense for some policies,
1452    * e.g. the default one, it assumes the default policy is used. It does not consult
1453    * the policy, but instead builds the compaction candidate list by itself.
1454    * @param N Number of files.
1455    */
1456   public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException {
1457     List<StoreFile> filesToCompact;
1458     boolean isMajor;
1459
1460     this.lock.readLock().lock();
1461     try {
1462       synchronized (filesCompacting) {
1463         filesToCompact = Lists.newArrayList(storeEngine.getStoreFileManager().getStorefiles());
1464         if (!filesCompacting.isEmpty()) {
1465           // exclude all files older than the newest file we're currently
1466           // compacting. this allows us to preserve contiguity (HBASE-2856)
1467           StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
1468           int idx = filesToCompact.indexOf(last);
1469           Preconditions.checkArgument(idx != -1);
1470           filesToCompact.subList(0, idx + 1).clear();
1471         }
1472         int count = filesToCompact.size();
1473         if (N > count) {
1474           throw new RuntimeException("Not enough files");
1475         }
1476
1477         filesToCompact = filesToCompact.subList(count - N, count);
1478         isMajor = (filesToCompact.size() == storeEngine.getStoreFileManager().getStorefileCount());
1479         filesCompacting.addAll(filesToCompact);
1480         Collections.sort(filesCompacting, storeEngine.getStoreFileManager()
1481             .getStoreFileComparator());
1482       }
1483     } finally {
1484       this.lock.readLock().unlock();
1485     }
1486
1487     try {
1488       // Ready to go. Have list of files to compact.
1489       List<Path> newFiles = ((DefaultCompactor)this.storeEngine.getCompactor())
1490           .compactForTesting(filesToCompact, isMajor);
1491       for (Path newFile: newFiles) {
1492         // Move the compaction into place.
1493         StoreFile sf = moveFileIntoPlace(newFile);
1494         if (this.getCoprocessorHost() != null) {
1495           this.getCoprocessorHost().postCompact(this, sf, null, null);
1496         }
1497         replaceStoreFiles(filesToCompact, Lists.newArrayList(sf));
1498         completeCompaction(filesToCompact);
1499       }
1500     } finally {
1501       synchronized (filesCompacting) {
1502         filesCompacting.removeAll(filesToCompact);
1503       }
1504     }
1505   }
1506
1507   @Override
1508   public boolean hasReferences() {
1509     return StoreUtils.hasReferences(this.storeEngine.getStoreFileManager().getStorefiles());
1510   }
1511
1512   @Override
1513   public CompactionProgress getCompactionProgress() {
1514     return this.storeEngine.getCompactor().getProgress();
1515   }
1516
1517   @Override
1518   public boolean isMajorCompaction() throws IOException {
1519     for (StoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1520       // TODO: what are these reader checks all over the place?
1521       if (sf.getReader() == null) {
1522         LOG.debug("StoreFile " + sf + " has null Reader");
1523         return false;
1524       }
1525     }
1526     return storeEngine.getCompactionPolicy().shouldPerformMajorCompaction(
1527         this.storeEngine.getStoreFileManager().getStorefiles());
1528   }
1529
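       /**
        * Requests a compaction selection. A minimal caller sketch; the exact signature of
        * {@code compact} is an assumption here, and CompactSplitThread is the usual driver:
        * <pre>
        * CompactionContext compaction = store.requestCompaction();
        * if (compaction != null) { // null if writes are disabled or nothing was selected
        *   store.compact(compaction, throughputController, user); // assumed signature
        * }
        * </pre>
        */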
1530   @Override
1531   public CompactionContext requestCompaction() throws IOException {
1532     return requestCompaction(Store.NO_PRIORITY, null);
1533   }
1534
1535   @Override
1536   public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
1537       throws IOException {
1538     return requestCompaction(priority, baseRequest, null);
1539   }

1540   @Override
1541   public CompactionContext requestCompaction(int priority, final CompactionRequest baseRequest,
1542       User user) throws IOException {
1543     // don't even select for compaction if writes are disabled
1544     if (!this.areWritesEnabled()) {
1545       return null;
1546     }
1547
1548     // Before we do compaction, try to get rid of unneeded files to simplify things.
1549     removeUnneededFiles();
1550
1551     final CompactionContext compaction = storeEngine.createCompaction();
1552     CompactionRequest request = null;
1553     this.lock.readLock().lock();
1554     try {
1555       synchronized (filesCompacting) {
1556         // First, see if coprocessor would want to override selection.
1557         if (this.getCoprocessorHost() != null) {
1558           final List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
1559           final boolean override =
1560               getCoprocessorHost().preCompactSelection(this, candidatesForCoproc,
1561                   baseRequest, user);
1562           if (override) {
1563             // Coprocessor is overriding normal file selection.
1564             compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
1565           }
1566         }
1567
1568         // Normal case - coprocessor is not overriding file selection.
1569         if (!compaction.hasSelection()) {
1570           boolean isUserCompaction = priority == Store.PRIORITY_USER;
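               // Only one off-peak compaction may run at a time: the compareAndSet below
               // admits the single thread that flips the tracker from false to true; the
               // flag is reset if selection fails or does not use the off-peak ratio.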
1571           boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
1572               offPeakCompactionTracker.compareAndSet(false, true);
1573           try {
1574             compaction.select(this.filesCompacting, isUserCompaction,
1575               mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
1576           } catch (IOException e) {
1577             if (mayUseOffPeak) {
1578               offPeakCompactionTracker.set(false);
1579             }
1580             throw e;
1581           }
1582           assert compaction.hasSelection();
1583           if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
1584             // Compaction policy doesn't want to take advantage of off-peak.
1585             offPeakCompactionTracker.set(false);
1586           }
1587         }
1588         if (this.getCoprocessorHost() != null) {
1589           this.getCoprocessorHost().postCompactSelection(
1590               this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest, user);
1591         }
1592
1593         // Selected files; see if we have a compaction with some custom base request.
1594         if (baseRequest != null) {
1595           // Update the request with what the system thinks the request should be;
1596           // it's up to the request if it wants to listen.
1597           compaction.forceSelect(
1598               baseRequest.combineWith(compaction.getRequest()));
1599         }
1600         // Finally, we have the resulting files list. Check if we have any files at all.
1601         request = compaction.getRequest();
1602         final Collection<StoreFile> selectedFiles = request.getFiles();
1603         if (selectedFiles.isEmpty()) {
1604           return null;
1605         }
1606
1607         addToCompactingFiles(selectedFiles);
1608
1609         // If we're enqueuing a major, clear the force flag.
1610         this.forceMajor = this.forceMajor && !request.isMajor();
1611
1612         // Set common request properties.
1613         // Set priority, either override value supplied by caller or from store.
1614         request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
1615         request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
1616       }
1617     } finally {
1618       this.lock.readLock().unlock();
1619     }
1620
1621     LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName()
1622         + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
1623         + (request.isAllFiles() ? " (all files)" : ""));
1624     this.region.reportCompactionRequestStart(request.isMajor());
1625     return compaction;
1626   }
1627
1628   /** Adds the files to the compacting set. The caller must synchronize on filesCompacting. */
1629   private void addToCompactingFiles(final Collection<StoreFile> filesToAdd) {
1630     if (filesToAdd == null) return;
1631     // Check that we do not try to compact the same StoreFile twice.
1632     if (!Collections.disjoint(filesCompacting, filesToAdd)) {
1633       Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);
1634     }
1635     filesCompacting.addAll(filesToAdd);
1636     Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator());
1637   }
1638
1639   private void removeUnneededFiles() throws IOException {
1640     if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) return;
1641     if (getFamily().getMinVersions() > 0) {
1642       LOG.debug("Skipping expired store file removal due to min version being " +
1643           getFamily().getMinVersions());
1644       return;
1645     }
1646     this.lock.readLock().lock();
1647     Collection<StoreFile> delSfs = null;
1648     try {
1649       synchronized (filesCompacting) {
1650         long cfTtl = getStoreFileTtl();
1651         if (cfTtl != Long.MAX_VALUE) {
1652           delSfs = storeEngine.getStoreFileManager().getUnneededFiles(
1653               EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting);
1654           addToCompactingFiles(delSfs);
1655         }
1656       }
1657     } finally {
1658       this.lock.readLock().unlock();
1659     }
1660     if (delSfs == null || delSfs.isEmpty()) return;
1661
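         // Removing expired files is recorded as a compaction with an empty output set, so
         // the WAL marker and file bookkeeping follow the same path as a real compaction.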
1662     Collection<StoreFile> newFiles = new ArrayList<StoreFile>(); // No new files.
1663     writeCompactionWalRecord(delSfs, newFiles);
1664     replaceStoreFiles(delSfs, newFiles);
1665     completeCompaction(delSfs);
1666     LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in "
1667         + this + " of " + this.getRegionInfo().getRegionNameAsString()
1668         + "; total size for store is " + TraditionalBinaryPrefix.long2String(storeSize, "", 1));
1669   }
1670
1671   @Override
1672   public void cancelRequestedCompaction(CompactionContext compaction) {
1673     finishCompactionRequest(compaction.getRequest());
1674   }
1675
1676   private void finishCompactionRequest(CompactionRequest cr) {
1677     this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
1678     if (cr.isOffPeak()) {
1679       offPeakCompactionTracker.set(false);
1680       cr.setOffPeak(false);
1681     }
1682     synchronized (filesCompacting) {
1683       filesCompacting.removeAll(cr.getFiles());
1684     }
1685   }
1686
1687   /**
1688    * Validates a store file by opening and closing it. In HFileV2 this should
1689    * not be an expensive operation.
1690    *
1691    * @param path the path to the store file
1692    */
1693   private void validateStoreFile(Path path)
1694       throws IOException {
1695     StoreFile storeFile = null;
1696     try {
1697       storeFile = createStoreFileAndReader(path);
1698     } catch (IOException e) {
1699       LOG.error("Failed to open store file : " + path
1700           + ", keeping it in tmp location", e);
1701       throw e;
1702     } finally {
1703       if (storeFile != null) {
1704         storeFile.closeReader(false);
1705       }
1706     }
1707   }
1708
1709   /**
1710    * <p>Completes a compaction whose output has already been written to disk.
1711    *
1712    * <p>It is usually invoked at the end of a compaction, but might also be
1713    * invoked at HStore startup, if the prior execution died midway through.
1714    *
1715    * <p>Moving the compacted files into place means:
1716    * <pre>
1717    * 1) Unload all replaced StoreFiles, close and collect the list to delete.
1718    * 2) Compute the new store size.
1719    * </pre>
1720    *
1721    * @param compactedFiles list of files that were compacted
1722    */
1723   @VisibleForTesting
1724   protected void completeCompaction(final Collection<StoreFile> compactedFiles)
1725     throws IOException {
1726     LOG.debug("Completing compaction...");
1727     this.storeSize = 0L;
1728     this.totalUncompressedBytes = 0L;
1729     for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1730       StoreFileReader r = hsf.getReader();
1731       if (r == null) {
1732         LOG.warn("StoreFile " + hsf + " has a null Reader");
1733         continue;
1734       }
1735       this.storeSize += r.length();
1736       this.totalUncompressedBytes += r.getTotalUncompressedBytes();
1737     }
1738   }
1739
1740   /**
1741    * @param wantedVersions How many versions were asked for.
1742    * @return wantedVersions or this family's maximum number of versions, whichever is smaller.
1743    */
1744   int versionsToReturn(final int wantedVersions) {
1745     if (wantedVersions <= 0) {
1746       throw new IllegalArgumentException("Number of versions must be > 0");
1747     }
1748     // Make sure we do not return more than maximum versions for this store.
1749     int maxVersions = this.family.getMaxVersions();
1750     return wantedVersions > maxVersions ? maxVersions : wantedVersions;
1751   }
1752
1753   @Override
1754   public boolean canSplit() {
1755     this.lock.readLock().lock();
1756     try {
1757       // Not splittable if we find a reference store file present in the store.
1758       boolean result = !hasReferences();
1759       if (!result) {
1760         if (LOG.isTraceEnabled()) {
1761           LOG.trace("Not splittable; has references: " + this);
1762         }
1763       }
1764       return result;
1765     } finally {
1766       this.lock.readLock().unlock();
1767     }
1768   }
1769
1770   @Override
1771   public byte[] getSplitPoint() {
1772     this.lock.readLock().lock();
1773     try {
1774       // Should already be enforced by the split policy!
1775       assert !this.getRegionInfo().isMetaRegion();
1776       // Not splittable if we find a reference store file present in the store.
1777       if (hasReferences()) {
1778         if (LOG.isTraceEnabled()) {
1779           LOG.trace("Not splittable; has references: " + this);
1780         }
1781         return null;
1782       }
1783       return this.storeEngine.getStoreFileManager().getSplitPoint();
1784     } catch (IOException e) {
1785       LOG.warn("Failed getting store size for " + this, e);
1786     } finally {
1787       this.lock.readLock().unlock();
1788     }
1789     return null;
1790   }
1791
1792   @Override
1793   public long getLastCompactSize() {
1794     return this.lastCompactSize;
1795   }
1796
1797   @Override
1798   public long getSize() {
1799     return storeSize;
1800   }
1801
1802   @Override
1803   public void triggerMajorCompaction() {
1804     this.forceMajor = true;
1805   }
1806
1807
1808   //////////////////////////////////////////////////////////////////////////////
1809   // File administration
1810   //////////////////////////////////////////////////////////////////////////////
1811
1812   @Override
1813   public KeyValueScanner getScanner(Scan scan,
1814       final NavigableSet<byte []> targetCols, long readPt) throws IOException {
1815     lock.readLock().lock();
1816     try {
1817       KeyValueScanner scanner = null;
1818       if (this.getCoprocessorHost() != null) {
1819         scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols, readPt);
1820       }
1821       scanner = createScanner(scan, targetCols, readPt, scanner);
1822       return scanner;
1823     } finally {
1824       lock.readLock().unlock();
1825     }
1826   }
1827
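       /**
        * Builds the store scanner when the coprocessor has not supplied one: a
        * ReversedStoreScanner for reversed scans, otherwise a regular StoreScanner.
        */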
1828   protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
1829       long readPt, KeyValueScanner scanner) throws IOException {
1830     if (scanner == null) {
1831       scanner = scan.isReversed() ? new ReversedStoreScanner(this,
1832           getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this,
1833           getScanInfo(), scan, targetCols, readPt);
1834     }
1835     return scanner;
1836   }
1837
1838   @Override
1839   public String toString() {
1840     return this.getColumnFamilyName();
1841   }
1842
1843   @Override
1844   public int getStorefilesCount() {
1845     return this.storeEngine.getStoreFileManager().getStorefileCount();
1846   }
1847
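       // The age metrics below are derived from HFile creation timestamps. Store files
       // with a null reader are skipped with a warning, as are entries that are not
       // plain HFiles.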
1848   @Override
1849   public long getMaxStoreFileAge() {
1850     long earliestTS = Long.MAX_VALUE;
1851     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
1852       StoreFileReader r = s.getReader();
1853       if (r == null) {
1854         LOG.warn("StoreFile " + s + " has a null Reader");
1855         continue;
1856       }
1857       if (!s.isHFile()) {
1858         continue;
1859       }
1860       long createdTS = s.getFileInfo().getCreatedTimestamp();
1861       earliestTS = (createdTS < earliestTS) ? createdTS : earliestTS;
1862     }
1863     long now = EnvironmentEdgeManager.currentTime();
1864     return now - earliestTS;
1865   }
1866
1867   @Override
1868   public long getMinStoreFileAge() {
1869     long latestTS = 0;
1870     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
1871       StoreFileReader r = s.getReader();
1872       if (r == null) {
1873         LOG.warn("StoreFile " + s + " has a null Reader");
1874         continue;
1875       }
1876       if (!s.isHFile()) {
1877         continue;
1878       }
1879       long createdTS = s.getFileInfo().getCreatedTimestamp();
1880       latestTS = (createdTS > latestTS) ? createdTS : latestTS;
1881     }
1882     long now = EnvironmentEdgeManager.currentTime();
1883     return now - latestTS;
1884   }
1885
1886   @Override
1887   public long getAvgStoreFileAge() {
1888     long sum = 0, count = 0;
1889     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
1890       StoreFileReader r = s.getReader();
1891       if (r == null) {
1892         LOG.warn("StoreFile " + s + " has a null Reader");
1893         continue;
1894       }
1895       if (!s.isHFile()) {
1896         continue;
1897       }
1898       sum += s.getFileInfo().getCreatedTimestamp();
1899       count++;
1900     }
1901     if (count == 0) {
1902       return 0;
1903     }
1904     long avgTS = sum / count;
1905     long now = EnvironmentEdgeManager.currentTime();
1906     return now - avgTS;
1907   }
1908
1909   @Override
1910   public long getNumReferenceFiles() {
1911     long numRefFiles = 0;
1912     for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
1913       if (s.isReference()) {
1914         numRefFiles++;
1915       }
1916     }
1917     return numRefFiles;
1918   }
1919
1920   @Override
1921   public long getNumHFiles() {
1922     long numHFiles = 0;
1923     for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
1924       if (s.isHFile()) {
1925         numHFiles++;
1926       }
1927     }
1928     return numHFiles;
1929   }
1930
1931   @Override
1932   public long getStoreSizeUncompressed() {
1933     return this.totalUncompressedBytes;
1934   }
1935
1936   @Override
1937   public long getStorefilesSize() {
1938     long size = 0;
1939     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
1940       StoreFileReader r = s.getReader();
1941       if (r == null) {
1942         LOG.warn("StoreFile " + s + " has a null Reader");
1943         continue;
1944       }
1945       size += r.length();
1946     }
1947     return size;
1948   }
1949
1950   @Override
1951   public long getStorefilesIndexSize() {
1952     long size = 0;
1953     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
1954       StoreFileReader r = s.getReader();
1955       if (r == null) {
1956         LOG.warn("StoreFile " + s + " has a null Reader");
1957         continue;
1958       }
1959       size += r.indexSize();
1960     }
1961     return size;
1962   }
1963
1964   @Override
1965   public long getTotalStaticIndexSize() {
1966     long size = 0;
1967     for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
1968       StoreFileReader r = s.getReader();
1969       if (r == null) {
1970         continue;
1971       }
1972       size += r.getUncompressedDataIndexSize();
1973     }
1974     return size;
1975   }
1976
1977   @Override
1978   public long getTotalStaticBloomSize() {
1979     long size = 0;
1980     for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
1981       StoreFileReader r = s.getReader();
1982       if (r == null) {
1983         continue;
1984       }
1985       size += r.getTotalBloomSize();
1986     }
1987     return size;
1988   }
1989
1990   @Override
1991   public long getMemStoreSize() {
1992     return this.memstore.size();
1993   }
1994
1995   @Override
1996   public int getCompactPriority() {
1997     int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority();
1998     if (priority == PRIORITY_USER) {
1999       LOG.warn("Compaction priority is USER despite there being no user compaction");
2000     }
2001     return priority;
2002   }
2003
2004   @Override
2005   public boolean throttleCompaction(long compactionSize) {
2006     return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize);
2007   }
2008
2009   public HRegion getHRegion() {
2010     return this.region;
2011   }
2012
2013   @Override
2014   public RegionCoprocessorHost getCoprocessorHost() {
2015     return this.region.getCoprocessorHost();
2016   }
2017
2018   @Override
2019   public HRegionInfo getRegionInfo() {
2020     return this.fs.getRegionInfo();
2021   }
2022
2023   @Override
2024   public boolean areWritesEnabled() {
2025     return this.region.areWritesEnabled();
2026   }
2027
2028   @Override
2029   public long getSmallestReadPoint() {
2030     return this.region.getSmallestReadPoint();
2031   }
2032
2033   /**
2034    * Updates the value for the given row/family/qualifier. This function will always be seen as
2035    * atomic by other readers because it only puts a single KV to memstore. Thus no read/write
2036    * control is necessary.
2037    * @param row row to update
2038    * @param f family to update
2039    * @param qualifier qualifier to update
2040    * @param newValue the new value to set into memstore
2041    * @return memstore size delta
2042    * @throws IOException
2043    */
2044   @VisibleForTesting
2045   public long updateColumnValue(byte [] row, byte [] f,
2046                                 byte [] qualifier, long newValue)
2047       throws IOException {
2048
2049     this.lock.readLock().lock();
2050     try {
2051       long now = EnvironmentEdgeManager.currentTime();
2052
2053       return this.memstore.updateColumnValue(row,
2054           f,
2055           qualifier,
2056           newValue,
2057           now);
2058
2059     } finally {
2060       this.lock.readLock().unlock();
2061     }
2062   }
2063
2064   @Override
2065   public long upsert(Iterable<Cell> cells, long readpoint) throws IOException {
2066     this.lock.readLock().lock();
2067     try {
2068       return this.memstore.upsert(cells, readpoint);
2069     } finally {
2070       this.lock.readLock().unlock();
2071     }
2072   }
2073
2074   @Override
2075   public StoreFlushContext createFlushContext(long cacheFlushId) {
2076     return new StoreFlusherImpl(cacheFlushId);
2077   }
2078
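       /**
        * Per-flush state carried across the flush protocol. A minimal usage sketch; the
        * {@code status} object is whatever MonitoredTask the caller already holds:
        * <pre>
        * StoreFlushContext ctx = store.createFlushContext(flushSeqId);
        * ctx.prepare();            // snapshot the memstore
        * ctx.flushCache(status);   // write the snapshot to temp files
        * ctx.commit(status);       // move the files into the store and drop the snapshot
        * </pre>
        */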
2079   private final class StoreFlusherImpl implements StoreFlushContext {
2080
2081     private long cacheFlushSeqNum;
2082     private MemStoreSnapshot snapshot;
2083     private List<Path> tempFiles;
2084     private List<Path> committedFiles;
2085     private long cacheFlushCount;
2086     private long cacheFlushSize;
2087     private long outputFileSize;
2088
2089     private StoreFlusherImpl(long cacheFlushSeqNum) {
2090       this.cacheFlushSeqNum = cacheFlushSeqNum;
2091     }
2092
2093     /**
2094      * This is not thread safe. The caller should have a lock on the region or the store.
2095      * If necessary, the lock can be added with the patch provided in HBASE-10087
2096      */
2097     @Override
2098     public void prepare() {
2099       // passing the current sequence number of the wal - to allow bookkeeping in the memstore
2100       this.snapshot = memstore.snapshot();
2101       this.cacheFlushCount = snapshot.getCellsCount();
2102       this.cacheFlushSize = snapshot.getSize();
2103       committedFiles = new ArrayList<Path>(1);
2104     }
2105
2106     @Override
2107     public void flushCache(MonitoredTask status) throws IOException {
2108       RegionServerServices rsService = region.getRegionServerServices();
2109       ThroughputController throughputController =
2110           rsService == null ? null : rsService.getFlushThroughputController();
2111       tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController);
2112     }
2113
2114     @Override
2115     public boolean commit(MonitoredTask status) throws IOException {
2116       if (this.tempFiles == null || this.tempFiles.isEmpty()) {
2117         return false;
2118       }
2119       List<StoreFile> storeFiles = new ArrayList<StoreFile>(this.tempFiles.size());
2120       for (Path storeFilePath : tempFiles) {
2121         try {
2122           StoreFile sf = HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status);
2123           outputFileSize += sf.getReader().length();
2124           storeFiles.add(sf);
2125         } catch (IOException ex) {
2126           LOG.error("Failed to commit store file " + storeFilePath, ex);
2127           // Try to delete the files we have committed before.
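               // If even cleanup fails, the on-disk file set and the flush accounting could
               // diverge, so the process is halted below rather than continue inconsistently.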
2128           for (StoreFile sf : storeFiles) {
2129             Path pathToDelete = sf.getPath();
2130             try {
2131               sf.deleteReader();
2132             } catch (IOException deleteEx) {
2133               LOG.fatal("Failed to delete store file we committed, halting: " + pathToDelete, deleteEx);
2134               Runtime.getRuntime().halt(1);
2135             }
2136           }
2137           throw new IOException("Failed to commit the flush", ex);
2138         }
2139       }
2140
2141       for (StoreFile sf : storeFiles) {
2142         if (HStore.this.getCoprocessorHost() != null) {
2143           HStore.this.getCoprocessorHost().postFlush(HStore.this, sf);
2144         }
2145         committedFiles.add(sf.getPath());
2146       }
2147
2148       HStore.this.flushedCellsCount += cacheFlushCount;
2149       HStore.this.flushedCellsSize += cacheFlushSize;
2150       HStore.this.flushedOutputFileSize += outputFileSize;
2151
2152       // Add new file to store files.  Clear snapshot too while we have the Store write lock.
2153       return HStore.this.updateStorefiles(storeFiles, snapshot.getId());
2154     }
2155
2156     @Override
2157     public long getOutputFileSize() {
2158       return outputFileSize;
2159     }
2160
2161     @Override
2162     public List<Path> getCommittedFiles() {
2163       return committedFiles;
2164     }
2165
2166     /**
2167      * Similar to commit, but called in secondary region replicas for replaying the
2168      * flush cache from primary region. Adds the new files to the store, and drops the
2169      * snapshot depending on dropMemstoreSnapshot argument.
2170      * @param fileNames names of the flushed files
2171      * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot
2172      * @throws IOException
2173      */
2174     @Override
2175     public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)
2176         throws IOException {
2177       List<StoreFile> storeFiles = new ArrayList<StoreFile>(fileNames.size());
2178       for (String file : fileNames) {
2179         // open the file as a store file (hfile link, etc)
2180         StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), file);
2181         StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
2182         storeFiles.add(storeFile);
2183         HStore.this.storeSize += storeFile.getReader().length();
2184         HStore.this.totalUncompressedBytes += storeFile.getReader().getTotalUncompressedBytes();
2185         if (LOG.isInfoEnabled()) {
2186           LOG.info("Region: " + HStore.this.getRegionInfo().getEncodedName() +
2187             " added " + storeFile + ", entries=" + storeFile.getReader().getEntries() +
2188             ", sequenceid=" + storeFile.getReader().getSequenceID() +
2189             ", filesize=" + StringUtils.humanReadableInt(storeFile.getReader().length()));
2190         }
2191       }
2192
2193       long snapshotId = -1; // -1 means do not drop
2194       if (dropMemstoreSnapshot && snapshot != null) {
2195         snapshotId = snapshot.getId();
2196       }
2197       HStore.this.updateStorefiles(storeFiles, snapshotId);
2198     }
2199
2200     /**
2201      * Abort the snapshot preparation. Drops the snapshot if any.
2202      * @throws IOException
2203      */
2204     @Override
2205     public void abort() throws IOException {
2206       if (snapshot == null) {
2207         return;
2208       }
2209       HStore.this.updateStorefiles(new ArrayList<StoreFile>(0), snapshot.getId());
2210     }
2211   }
2212
2213   @Override
2214   public boolean needsCompaction() {
2215     return this.storeEngine.needsCompaction(this.filesCompacting);
2216   }
2217
2218   @Override
2219   public CacheConfig getCacheConfig() {
2220     return this.cacheConf;
2221   }
2222
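       // Heap accounting: FIXED_OVERHEAD counts this object's header, reference fields and
       // primitives; DEEP_OVERHEAD adds the structures it directly owns. heapSize() reports
       // DEEP_OVERHEAD plus the variable-size memstore.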
2223   public static final long FIXED_OVERHEAD =
2224       ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (11 * Bytes.SIZEOF_LONG)
2225               + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
2226
2227   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
2228       + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
2229       + ClassSize.CONCURRENT_SKIPLISTMAP
2230       + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT
2231       + ScanInfo.FIXED_OVERHEAD);
2232
2233   @Override
2234   public long heapSize() {
2235     return DEEP_OVERHEAD + this.memstore.heapSize();
2236   }
2237
2238   @Override
2239   public CellComparator getComparator() {
2240     return comparator;
2241   }
2242
2243   @Override
2244   public ScanInfo getScanInfo() {
2245     return scanInfo;
2246   }
2247
2248   /**
2249    * Set scan info, used by test
2250    * @param scanInfo new scan info to use for test
2251    */
2252   void setScanInfo(ScanInfo scanInfo) {
2253     this.scanInfo = scanInfo;
2254   }
2255
2256   @Override
2257   public boolean hasTooManyStoreFiles() {
2258     return getStorefilesCount() > this.blockingFileCount;
2259   }
2260
2261   @Override
2262   public long getFlushedCellsCount() {
2263     return flushedCellsCount;
2264   }
2265
2266   @Override
2267   public long getFlushedCellsSize() {
2268     return flushedCellsSize;
2269   }
2270
2271   @Override
2272   public long getFlushedOutputFileSize() {
2273     return flushedOutputFileSize;
2274   }
2275
2276   @Override
2277   public long getCompactedCellsCount() {
2278     return compactedCellsCount;
2279   }
2280
2281   @Override
2282   public long getCompactedCellsSize() {
2283     return compactedCellsSize;
2284   }
2285
2286   @Override
2287   public long getMajorCompactedCellsCount() {
2288     return majorCompactedCellsCount;
2289   }
2290
2291   @Override
2292   public long getMajorCompactedCellsSize() {
2293     return majorCompactedCellsSize;
2294   }
2295
2296   /**
2297    * Returns the StoreEngine that is backing this concrete implementation of Store.
2298    * @return the {@link StoreEngine} object used internally inside this HStore object.
2299    */
2300   @VisibleForTesting
2301   public StoreEngine<?, ?, ?, ?> getStoreEngine() {
2302     return this.storeEngine;
2303   }
2304
2305   protected OffPeakHours getOffPeakHours() {
2306     return this.offPeakHours;
2307   }
2308
2309   /**
2310    * {@inheritDoc}
2311    */
2312   @Override
2313   public void onConfigurationChange(Configuration conf) {
2314     this.conf = new CompoundConfiguration()
2315             .add(conf)
2316             .addBytesMap(family.getValues());
2317     this.storeEngine.compactionPolicy.setConf(conf);
2318     this.offPeakHours = OffPeakHours.getInstance(conf);
2319   }
2320
2321   /**
2322    * {@inheritDoc}
2323    */
2324   @Override
2325   public void registerChildren(ConfigurationManager manager) {
2326     // No children to register
2327   }
2328
2329   /**
2330    * {@inheritDoc}
2331    */
2332   @Override
2333   public void deregisterChildren(ConfigurationManager manager) {
2334     // No children to deregister
2335   }
2336
2337   @Override
2338   public double getCompactionPressure() {
2339     return storeEngine.getStoreFileManager().getCompactionPressure();
2340   }
2341
2342   @Override
2343   public boolean isPrimaryReplicaStore() {
2344     return getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID;
2345   }
2346
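       /**
        * Closes compacted files that no longer serve reads and archives them. The candidate
        * set is copied under the read lock; the potentially slow archiving then happens
        * outside the lock in removeCompactedfiles.
        */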
2347   @Override
2348   public void closeAndArchiveCompactedFiles() throws IOException {
2349     lock.readLock().lock();
2350     Collection<StoreFile> copyCompactedfiles = null;
2351     try {
2352       Collection<StoreFile> compactedfiles =
2353           this.getStoreEngine().getStoreFileManager().getCompactedfiles();
2354       if (compactedfiles != null && !compactedfiles.isEmpty()) {
2355         // Do a copy under the read lock
2356         copyCompactedfiles = new ArrayList<StoreFile>(compactedfiles);
2357       } else {
2358         if (LOG.isTraceEnabled()) {
2359           LOG.trace("No compacted files to archive");
2360         }
2361         return;
2362       }
2363     } finally {
2364       lock.readLock().unlock();
2365     }
2366     if (copyCompactedfiles != null && !copyCompactedfiles.isEmpty()) {
2367       removeCompactedfiles(copyCompactedfiles);
2368     }
2369   }
2370
2371   /**
2372    * Archives and removes the compacted files
2373    * @param compactedfiles The compacted files in this store that are not active in reads
2374    * @throws IOException
2375    */
2376   private void removeCompactedfiles(Collection<StoreFile> compactedfiles)
2377       throws IOException {
2378     final List<StoreFile> filesToRemove = new ArrayList<StoreFile>(compactedfiles.size());
2379     for (final StoreFile file : compactedfiles) {
2380       synchronized (file) {
2381         try {
2382           StoreFileReader r = file.getReader();
2383           if (r == null) {
2384             if (LOG.isDebugEnabled()) {
2385               LOG.debug("The file " + file + " was closed but still not archived.");
2386             }
2387             filesToRemove.add(file);
2388           }
2389           if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) {
2390             // Even if deleting fails we need not bother as any new scanners won't be
2391             // able to use the compacted file as the status is already compactedAway
2392             if (LOG.isTraceEnabled()) {
2393               LOG.trace("Closing and archiving the file " + file.getPath());
2394             }
2395             r.close(true);
2396             // Just close and return
2397             filesToRemove.add(file);
2398           }
2399         } catch (Exception e) {
2400           LOG.error("Exception while trying to close the compacted store file "
2401               + file.getPath().getName(), e);
2402         }
2403       }
2404     }
2405     if (this.isPrimaryReplicaStore()) {
2406       // Only the primary region is allowed to move the file to archive.
2407       // The secondary region does not move the files to archive. Any active reads from
2408       // the secondary region will still work because the file as such has active readers on it.
2409       if (!filesToRemove.isEmpty()) {
2410         if (LOG.isDebugEnabled()) {
2411           LOG.debug("Moving the files " + filesToRemove + " to archive");
2412         }
2413         // Only if this is successful it has to be removed
2414         this.fs.removeStoreFiles(this.getFamily().getNameAsString(), filesToRemove);
2415       }
2416     }
2417     if (!filesToRemove.isEmpty()) {
2418       // Clear the compactedfiles from the store file manager
2419       clearCompactedfiles(filesToRemove);
2420     }
2421   }
2422
2423   @Override
       public void finalizeFlush() {
2424     memstore.finalizeFlush();
2425   }
2426
2427   @Override
       public MemStore getMemStore() {
2428     return memstore;
2429   }
2430
2431   private void clearCompactedfiles(final List<StoreFile> filesToRemove) throws IOException {
2432     if (LOG.isTraceEnabled()) {
2433       LOG.trace("Clearing the compacted file " + filesToRemove + " from this store");
2434     }
2435     lock.writeLock().lock();
2436     try {
2437       this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToRemove);
2438     } finally {
2439       lock.writeLock().unlock();
2440     }
2441   }
2442 }