
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import com.google.common.annotations.VisibleForTesting;
22  import com.google.common.base.Preconditions;
23  import com.google.common.collect.ImmutableCollection;
24  import com.google.common.collect.ImmutableList;
25  import com.google.common.collect.Lists;
26  import com.google.common.collect.Sets;
27  
28  import java.io.IOException;
29  import java.io.InterruptedIOException;
30  import java.net.InetSocketAddress;
31  import java.security.PrivilegedExceptionAction;
32  import java.util.ArrayList;
33  import java.util.Collection;
34  import java.util.Collections;
35  import java.util.HashMap;
36  import java.util.HashSet;
37  import java.util.Iterator;
38  import java.util.List;
39  import java.util.NavigableSet;
40  import java.util.Set;
41  import java.util.concurrent.Callable;
42  import java.util.concurrent.CompletionService;
43  import java.util.concurrent.ConcurrentHashMap;
44  import java.util.concurrent.ExecutionException;
45  import java.util.concurrent.ExecutorCompletionService;
46  import java.util.concurrent.Future;
47  import java.util.concurrent.ThreadPoolExecutor;
48  import java.util.concurrent.atomic.AtomicBoolean;
49  import java.util.concurrent.locks.ReentrantReadWriteLock;
50  
51  import org.apache.commons.logging.Log;
52  import org.apache.commons.logging.LogFactory;
53  import org.apache.hadoop.conf.Configuration;
54  import org.apache.hadoop.fs.FileSystem;
55  import org.apache.hadoop.fs.Path;
56  import org.apache.hadoop.hbase.Cell;
57  import org.apache.hadoop.hbase.CellComparator;
58  import org.apache.hadoop.hbase.CellUtil;
59  import org.apache.hadoop.hbase.CompoundConfiguration;
60  import org.apache.hadoop.hbase.HColumnDescriptor;
61  import org.apache.hadoop.hbase.HConstants;
62  import org.apache.hadoop.hbase.HRegionInfo;
63  import org.apache.hadoop.hbase.KeyValue;
64  import org.apache.hadoop.hbase.TableName;
65  import org.apache.hadoop.hbase.Tag;
66  import org.apache.hadoop.hbase.TagType;
67  import org.apache.hadoop.hbase.TagUtil;
68  import org.apache.hadoop.hbase.classification.InterfaceAudience;
69  import org.apache.hadoop.hbase.client.Scan;
70  import org.apache.hadoop.hbase.conf.ConfigurationManager;
71  import org.apache.hadoop.hbase.io.compress.Compression;
72  import org.apache.hadoop.hbase.io.crypto.Encryption;
73  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
74  import org.apache.hadoop.hbase.io.hfile.HFile;
75  import org.apache.hadoop.hbase.io.hfile.HFileContext;
76  import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
77  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
78  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
79  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
80  import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
81  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
82  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
83  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
84  import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
85  import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
86  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
87  import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
88  import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
89  import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
90  import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
91  import org.apache.hadoop.hbase.security.EncryptionUtil;
92  import org.apache.hadoop.hbase.security.User;
93  import org.apache.hadoop.hbase.util.Bytes;
94  import org.apache.hadoop.hbase.util.ChecksumType;
95  import org.apache.hadoop.hbase.util.ClassSize;
96  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
97  import org.apache.hadoop.hbase.util.ReflectionUtils;
98  import org.apache.hadoop.util.StringUtils;
99  import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
100 
101 /**
102  * A Store holds a column family in a Region.  It has a memstore and a set of zero
103  * or more StoreFiles, which stretch backwards over time.
104  *
105  * <p>There's no reason to consider append-logging at this level; all logging
106  * and locking is handled at the HRegion level.  Store just provides
107  * services to manage sets of StoreFiles.  One of the most important of those
108  * services is compaction, where files are aggregated once they pass
109  * a configurable threshold.
110  *
111  * <p>Locking and transactions are handled at a higher level.  This API should
112  * not be called directly but by an HRegion manager.
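 *
 * <p>A minimal usage sketch, for orientation only; real callers always go
 * through HRegion, and the exact call sequence below is an assumption:
 * <pre>{@code
 * Store store = region.getStore(familyName); // familyName: byte[] of the CF
 * store.add(cell);   // buffers the cell in this store's memstore
 * // a later region-level flush turns the memstore snapshot into a StoreFile
 * }</pre>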
113  */
114 @InterfaceAudience.Private
115 public class HStore implements Store {
116   private static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
117   public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
118       "hbase.server.compactchecker.interval.multiplier";
119   public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
120   public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
121   public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7;
122 
123   private static final Log LOG = LogFactory.getLog(HStore.class);
124 
125   protected final MemStore memstore;
126   // This store's directory in the filesystem.
127   protected final HRegion region;
128   private final HColumnDescriptor family;
129   private final HRegionFileSystem fs;
130   protected Configuration conf;
131   protected CacheConfig cacheConf;
132   private long lastCompactSize = 0;
133   volatile boolean forceMajor = false;
134   /* how many bytes to write between status checks */
135   static int closeCheckInterval = 0;
136   private volatile long storeSize = 0L;
137   private volatile long totalUncompressedBytes = 0L;
138 
139   /**
140    * RWLock for store operations.
141    * Locked in shared mode when the list of component stores is looked at:
142    *   - all reads/writes to table data
143    *   - checking for split
144    * Locked in exclusive mode when the list of component stores is modified:
145    *   - closing
146    *   - completing a compaction
147    */
148   final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
149   private final boolean verifyBulkLoads;
150 
151   private ScanInfo scanInfo;
152 
153   // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it.
154   final List<StoreFile> filesCompacting = Lists.newArrayList();
155 
156   // All access must be synchronized.
157   private final Set<ChangedReadersObserver> changedReaderObservers =
158     Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());
159 
160   protected final int blocksize;
161   private HFileDataBlockEncoder dataBlockEncoder;
162 
163   /** Checksum configuration */
164   protected ChecksumType checksumType;
165   protected int bytesPerChecksum;
166 
167   // Comparing KeyValues
168   private final CellComparator comparator;
169 
170   final StoreEngine<?, ?, ?, ?> storeEngine;
171 
172   private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
173   private volatile OffPeakHours offPeakHours;
174 
175   private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
176   private int flushRetriesNumber;
177   private int pauseTime;
178 
179   private long blockingFileCount;
180   private int compactionCheckMultiplier;
181   protected Encryption.Context cryptoContext = Encryption.Context.NONE;
182 
183   private volatile long flushedCellsCount = 0;
184   private volatile long compactedCellsCount = 0;
185   private volatile long majorCompactedCellsCount = 0;
186   private volatile long flushedCellsSize = 0;
187   private volatile long flushedOutputFileSize = 0;
188   private volatile long compactedCellsSize = 0;
189   private volatile long majorCompactedCellsSize = 0;
190 
191   /**
192    * Constructor
193    * @param region the HRegion to which this store belongs
194    * @param family HColumnDescriptor for this column family
195    * @param confParam configuration object
197    * @throws IOException
198    */
199   protected HStore(final HRegion region, final HColumnDescriptor family,
200       final Configuration confParam) throws IOException {
201 
202     this.fs = region.getRegionFileSystem();
203 
204     // Assemble the store's home directory and ensure it exists.
205     fs.createStoreDir(family.getNameAsString());
206     this.region = region;
207     this.family = family;
208     // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
209     // CompoundConfiguration will look for keys in reverse order of addition, so we'd
210     // add global config first, then table and cf overrides, then cf metadata.
211     this.conf = new CompoundConfiguration()
212       .add(confParam)
213       .addStringMap(region.getTableDesc().getConfiguration())
214       .addStringMap(family.getConfiguration())
215       .addBytesMap(family.getValues());
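    // Illustrative example (key chosen for illustration only): if
    // "hbase.hstore.blockingStoreFiles" is set both in the global config and in
    // the column family's configuration, the CF-level value wins here, because
    // later-added maps are consulted first.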
216     this.blocksize = family.getBlocksize();
217 
218     this.dataBlockEncoder =
219         new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());
220 
221     this.comparator = region.getCellCompartor();
222     // used by ScanQueryMatcher
223     long timeToPurgeDeletes =
224         Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
225     LOG.trace("Time to purge deletes set to " + timeToPurgeDeletes +
226         "ms in store " + this);
227     // Get TTL
228     long ttl = determineTTLFromFamily(family);
229     // Why not just pass a HColumnDescriptor in here altogether?  Even if we
230     // have to clone it?
231     scanInfo = new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.comparator);
232     String className = conf.get(MEMSTORE_CLASS_NAME, DefaultMemStore.class.getName());
233     if (family.isInMemoryCompaction()) {
234       className = CompactingMemStore.class.getName();
235       this.memstore = new CompactingMemStore(conf, this.comparator, this,
236           this.getHRegion().getRegionServicesForStores());
237     } else {
238       this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
239           Configuration.class, CellComparator.class }, new Object[] { conf, this.comparator });
240     }
241     LOG.info("Memstore class name is " + className);
242     this.offPeakHours = OffPeakHours.getInstance(conf);
243
244     // Setting up cache configuration for this family
245     createCacheConf(family);
246
247     this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
248
249     this.blockingFileCount =
250         conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
251     this.compactionCheckMultiplier = conf.getInt(
252         COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
253     if (this.compactionCheckMultiplier <= 0) {
254       LOG.error("Compaction check period multiplier must be positive, setting default: "
255           + DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
256       this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
257     }
258
259     if (HStore.closeCheckInterval == 0) {
260       HStore.closeCheckInterval = conf.getInt(
261           "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
262     }
263
264     this.storeEngine = createStoreEngine(this, this.conf, this.comparator);
265     this.storeEngine.getStoreFileManager().loadFiles(loadStoreFiles());
266
267     // Initialize checksum type from name. The names are CRC32, CRC32C, etc.
268     this.checksumType = getChecksumType(conf);
269     // initialize bytes per checksum
270     this.bytesPerChecksum = getBytesPerChecksum(conf);
271     flushRetriesNumber = conf.getInt(
272         "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
273     pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
274     if (flushRetriesNumber <= 0) {
275       throw new IllegalArgumentException(
276           "hbase.hstore.flush.retries.number must be > 0, not "
277               + flushRetriesNumber);
278     }
279     cryptoContext = EncryptionUtil.createEncryptionContext(conf, family);
280   }
281
282   /**
283    * Creates the cache config.
284    * @param family The current column family.
285    */
286   protected void createCacheConf(final HColumnDescriptor family) {
287     this.cacheConf = new CacheConfig(conf, family);
288   }
289
290   /**
291    * Creates the store engine configured for the given Store.
292    * @param store The store. An unfortunate dependency needed due to it
293    *              being passed to coprocessors via the compactor.
294    * @param conf Store configuration.
295    * @param kvComparator KVComparator for storeFileManager.
296    * @return StoreEngine to use.
297    */
298   protected StoreEngine<?, ?, ?, ?> createStoreEngine(Store store, Configuration conf,
299       CellComparator kvComparator) throws IOException {
300     return StoreEngine.create(store, conf, comparator);
301   }
302
303   /**
304    * @param family the column family descriptor
305    * @return TTL in milliseconds for the specified family
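   * <p>For example, a family with a TTL of 86400 seconds (one day) yields
   * 86400000 from this method; HConstants.FOREVER and -1 both map to
   * Long.MAX_VALUE (no expiry).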
306    */
307   public static long determineTTLFromFamily(final HColumnDescriptor family) {
308     // HCD.getTimeToLive returns ttl in seconds.  Convert to milliseconds.
309     long ttl = family.getTimeToLive();
310     if (ttl == HConstants.FOREVER) {
311       // Default is unlimited ttl.
312       ttl = Long.MAX_VALUE;
313     } else if (ttl == -1) {
314       ttl = Long.MAX_VALUE;
315     } else {
316       // Second -> ms adjust for user data
317       ttl *= 1000;
318     }
319     return ttl;
320   }
321
322   @Override
323   public String getColumnFamilyName() {
324     return this.family.getNameAsString();
325   }
326
327   @Override
328   public TableName getTableName() {
329     return this.getRegionInfo().getTable();
330   }
331
332   @Override
333   public FileSystem getFileSystem() {
334     return this.fs.getFileSystem();
335   }
336
337   public HRegionFileSystem getRegionFileSystem() {
338     return this.fs;
339   }
340 
341   /* Implementation of StoreConfigInformation */
342   @Override
343   public long getStoreFileTtl() {
344     // TTL only applies if there's no MIN_VERSIONs setting on the column.
345     return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
346   }
347
348   @Override
349   public long getMemstoreFlushSize() {
350     // TODO: Why is this in here?  The flushsize of the region rather than the store?  St.Ack
351     return this.region.memstoreFlushSize;
352   }
353
354   @Override
355   public long getFlushableSize() {
356     return this.memstore.getFlushableSize();
357   }
358
359   @Override
360   public long getSnapshotSize() {
361     return this.memstore.getSnapshotSize();
362   }
363
364   @Override
365   public long getCompactionCheckMultiplier() {
366     return this.compactionCheckMultiplier;
367   }
368
369   @Override
370   public long getBlockingFileCount() {
371     return blockingFileCount;
372   }
373   /* End implementation of StoreConfigInformation */
374
375   /**
376    * Returns the configured bytesPerChecksum value.
377    * @param conf The configuration
378    * @return The bytesPerChecksum that is set in the configuration
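   * <p>A hedged example; the value 16384 is purely illustrative:
   * <pre>{@code
   * conf.setInt(HConstants.BYTES_PER_CHECKSUM, 16 * 1024);
   * int bpc = HStore.getBytesPerChecksum(conf); // 16384
   * }</pre>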
379    */
380   public static int getBytesPerChecksum(Configuration conf) {
381     return conf.getInt(HConstants.BYTES_PER_CHECKSUM,
382                        HFile.DEFAULT_BYTES_PER_CHECKSUM);
383   }
384
385   /**
386    * Returns the configured checksum algorithm.
387    * @param conf The configuration
388    * @return The checksum algorithm that is set in the configuration
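   * <p>For example, using one of the names noted in the constructor comment
   * (CRC32, CRC32C, etc.):
   * <pre>{@code
   * conf.set(HConstants.CHECKSUM_TYPE_NAME, "CRC32C");
   * ChecksumType type = HStore.getChecksumType(conf);
   * }</pre>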
389    */
390   public static ChecksumType getChecksumType(Configuration conf) {
391     String checksumName = conf.get(HConstants.CHECKSUM_TYPE_NAME);
392     if (checksumName == null) {
393       return ChecksumType.getDefaultChecksumType();
394     } else {
395       return ChecksumType.nameToType(checksumName);
396     }
397   }
398 
399   /**
400    * @return how many bytes to write between status checks
401    */
402   public static int getCloseCheckInterval() {
403     return closeCheckInterval;
404   }
405
406   @Override
407   public HColumnDescriptor getFamily() {
408     return this.family;
409   }
410
411   /**
412    * @return The maximum sequence id in all store files. Used for log replay.
413    */
414   @Override
415   public long getMaxSequenceId() {
416     return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
417   }
418
419   @Override
420   public long getMaxMemstoreTS() {
421     return StoreFile.getMaxMemstoreTSInList(this.getStorefiles());
422   }
423
424   /**
425    * @param tabledir {@link Path} to where the table is being stored
426    * @param hri {@link HRegionInfo} for the region.
427    * @param family {@link HColumnDescriptor} describing the column family
428    * @return Path to family/Store home directory.
429    */
430   @Deprecated
431   public static Path getStoreHomedir(final Path tabledir,
432       final HRegionInfo hri, final byte[] family) {
433     return getStoreHomedir(tabledir, hri.getEncodedName(), family);
434   }
435
436   /**
437    * @param tabledir {@link Path} to where the table is being stored
438    * @param encodedName Encoded region name.
439    * @param family {@link HColumnDescriptor} describing the column family
440    * @return Path to family/Store home directory.
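   * <p>Illustrative only; the concrete path values below are assumptions:
   * <pre>{@code
   * // tabledir = /hbase/data/default/t1, encodedName = "abc123", family = "cf"
   * //   => /hbase/data/default/t1/abc123/cf
   * }</pre>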
441    */
442   @Deprecated
443   public static Path getStoreHomedir(final Path tabledir,
444       final String encodedName, final byte[] family) {
445     return new Path(tabledir, new Path(encodedName, Bytes.toString(family)));
446   }
447
448   @Override
449   public HFileDataBlockEncoder getDataBlockEncoder() {
450     return dataBlockEncoder;
451   }
452
453   /**
454    * Should be used only in tests.
455    * @param blockEncoder the block delta encoder to use
456    */
457   void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
458     this.dataBlockEncoder = blockEncoder;
459   }
460
461   /**
462    * Creates an unsorted list of StoreFiles loaded in parallel
463    * from the given directory.
464    * @throws IOException
465    */
466   private List<StoreFile> loadStoreFiles() throws IOException {
467     Collection<StoreFileInfo> files = fs.getStoreFiles(getColumnFamilyName());
468     return openStoreFiles(files);
469   }
470
471   private List<StoreFile> openStoreFiles(Collection<StoreFileInfo> files) throws IOException {
472     if (files == null || files.size() == 0) {
473       return new ArrayList<StoreFile>();
474     }
475     // initialize the thread pool for opening store files in parallel.
476     ThreadPoolExecutor storeFileOpenerThreadPool =
477       this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
478           this.getColumnFamilyName());
479     CompletionService<StoreFile> completionService =
480       new ExecutorCompletionService<StoreFile>(storeFileOpenerThreadPool);
481
482     int totalValidStoreFile = 0;
483     for (final StoreFileInfo storeFileInfo: files) {
484       // open each store file in parallel
485       completionService.submit(new Callable<StoreFile>() {
486         @Override
487         public StoreFile call() throws IOException {
488           StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
489           return storeFile;
490         }
491       });
492       totalValidStoreFile++;
493     }
494
495     ArrayList<StoreFile> results = new ArrayList<StoreFile>(files.size());
496     IOException ioe = null;
497     try {
498       for (int i = 0; i < totalValidStoreFile; i++) {
499         try {
500           Future<StoreFile> future = completionService.take();
501           StoreFile storeFile = future.get();
502           if (storeFile != null) {
503             long length = storeFile.getReader().length();
504             this.storeSize += length;
505             this.totalUncompressedBytes += storeFile.getReader().getTotalUncompressedBytes();
506             if (LOG.isDebugEnabled()) {
507               LOG.debug("loaded " + storeFile.toStringDetailed());
508             }
509             results.add(storeFile);
510           }
511         } catch (InterruptedException e) {
512           if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
513         } catch (ExecutionException e) {
514           if (ioe == null) ioe = new IOException(e.getCause());
515         }
516       }
517     } finally {
518       storeFileOpenerThreadPool.shutdownNow();
519     }
520     if (ioe != null) {
521       // close StoreFile readers
522       boolean evictOnClose =
523           cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
524       for (StoreFile file : results) {
525         try {
526           if (file != null) file.closeReader(evictOnClose);
527         } catch (IOException e) {
528           LOG.warn(e.getMessage());
529         }
530       }
531       throw ioe;
532     }
533
534     return results;
535   }
536
537   /**
538    * Checks the underlying store files, opens any that have not yet been
539    * opened, and removes the readers for store files that are no longer
540    * available. Mainly used by secondary region replicas to keep up to date with
541    * the primary region files.
542    * @throws IOException
543    */
544   @Override
545   public void refreshStoreFiles() throws IOException {
546     Collection<StoreFileInfo> newFiles = fs.getStoreFiles(getColumnFamilyName());
547     refreshStoreFilesInternal(newFiles);
548   }
549
550   @Override
551   public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
552     List<StoreFileInfo> storeFiles = new ArrayList<StoreFileInfo>(newFiles.size());
553     for (String file : newFiles) {
554       storeFiles.add(fs.getStoreFileInfo(getColumnFamilyName(), file));
555     }
556     refreshStoreFilesInternal(storeFiles);
557   }
558
559   /**
560    * Checks the underlying store files, opens any that have not yet been
561    * opened, and removes the readers for store files that are no longer
562    * available. Mainly used by secondary region replicas to keep up to date with
563    * the primary region files.
564    * @throws IOException
565    */
566   private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
567     StoreFileManager sfm = storeEngine.getStoreFileManager();
568     Collection<StoreFile> currentFiles = sfm.getStorefiles();
569     if (currentFiles == null) currentFiles = new ArrayList<StoreFile>(0);
570
571     if (newFiles == null) newFiles = new ArrayList<StoreFileInfo>(0);
572 
573     HashMap<StoreFileInfo, StoreFile> currentFilesSet =
574         new HashMap<StoreFileInfo, StoreFile>(currentFiles.size());
575     for (StoreFile sf : currentFiles) {
576       currentFilesSet.put(sf.getFileInfo(), sf);
577     }
578     HashSet<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);
579 
580     Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
581     Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);
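    // Illustrative example: current = {a, b, c}, new = {b, c, d}
    //   => toBeAddedFiles = {d}, toBeRemovedFiles = {a}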
582 
583     if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
584       return;
585     }
586
587     LOG.info("Refreshing store files for region " + this.getRegionInfo().getRegionNameAsString()
588       + " files to add: " + toBeAddedFiles + " files to remove: " + toBeRemovedFiles);
589
590     Set<StoreFile> toBeRemovedStoreFiles = new HashSet<StoreFile>(toBeRemovedFiles.size());
591     for (StoreFileInfo sfi : toBeRemovedFiles) {
592       toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
593     }
594
595     // try to open the files
596     List<StoreFile> openedFiles = openStoreFiles(toBeAddedFiles);
597
598     // propagate the file changes to the underlying store file manager
599     replaceStoreFiles(toBeRemovedStoreFiles, openedFiles); //won't throw an exception
600 
601     // Advance the memstore read point to be at least the new store files seqIds so that
602     // readers might pick it up. This assumes that the store is not getting any writes (otherwise
603     // in-flight transactions might be made visible)
604     if (!toBeAddedFiles.isEmpty()) {
605       region.getMVCC().advanceTo(this.getMaxSequenceId());
606     }
607
608     completeCompaction(toBeRemovedStoreFiles);
609   }
610
611   private StoreFile createStoreFileAndReader(final Path p) throws IOException {
612     StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p);
613     return createStoreFileAndReader(info);
614   }
615
616   private StoreFile createStoreFileAndReader(final StoreFileInfo info)
617       throws IOException {
618     info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
619     StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
620       this.family.getBloomFilterType());
621     StoreFileReader r = storeFile.createReader();
622     r.setReplicaStoreFile(isPrimaryReplicaStore());
623     return storeFile;
624   }
625
626   @Override
627   public long add(final Cell cell) {
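    // Shared (read) lock suffices here: per the lock contract documented on
    // the 'lock' field above, reads/writes of table data take the read lock;
    // the write lock is reserved for structural changes such as close or
    // completing a compaction.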
628     lock.readLock().lock();
629     try {
630        return this.memstore.add(cell);
631     } finally {
632       lock.readLock().unlock();
633     }
634   }
635
636   @Override
637   public long timeOfOldestEdit() {
638     return memstore.timeOfOldestEdit();
639   }
640
641   /**
642    * Adds the given delete marker to the memstore.
643    *
644    * @param kv the delete to add
645    * @return memstore size delta
646    */
647   protected long delete(final KeyValue kv) {
648     lock.readLock().lock();
649     try {
650       return this.memstore.delete(kv);
651     } finally {
652       lock.readLock().unlock();
653     }
654   }
655
656   /**
657    * @return All store files.
658    */
659   @Override
660   public Collection<StoreFile> getStorefiles() {
661     return this.storeEngine.getStoreFileManager().getStorefiles();
662   }
663
664   @Override
665   public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
666     HFile.Reader reader  = null;
667     try {
668       LOG.info("Validating hfile at " + srcPath + " for inclusion in "
669           + "store " + this + " region " + this.getRegionInfo().getRegionNameAsString());
670       reader = HFile.createReader(srcPath.getFileSystem(conf),
671           srcPath, cacheConf, conf);
672       reader.loadFileInfo();
673
674       byte[] firstKey = reader.getFirstRowKey();
675       Preconditions.checkState(firstKey != null, "First key can not be null");
676       Cell lk = reader.getLastKey();
677       Preconditions.checkState(lk != null, "Last key can not be null");
678       byte[] lastKey =  CellUtil.cloneRow(lk);
679
680       LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey) +
681           " last=" + Bytes.toStringBinary(lastKey));
682       LOG.debug("Region bounds: first=" +
683           Bytes.toStringBinary(getRegionInfo().getStartKey()) +
684           " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
685
686       if (!this.getRegionInfo().containsRange(firstKey, lastKey)) {
687         throw new WrongRegionException(
688             "Bulk load file " + srcPath.toString() + " does not fit inside region "
689             + this.getRegionInfo().getRegionNameAsString());
690       }
691
692      if (reader.length() > conf.getLong(HConstants.HREGION_MAX_FILESIZE,
693           HConstants.DEFAULT_MAX_FILE_SIZE)) {
694         LOG.warn("Trying to bulk load hfile " + srcPath.toString() + " with size: " +
695             reader.length() + " bytes can be problematic as it may lead to oversplitting.");
696       }
697
698       if (verifyBulkLoads) {
699         long verificationStartTime = EnvironmentEdgeManager.currentTime();
700         LOG.info("Full verification started for bulk load hfile: " + srcPath.toString());
701         Cell prevCell = null;
702         HFileScanner scanner = reader.getScanner(false, false, false);
703         scanner.seekTo();
704         do {
705           Cell cell = scanner.getCell();
706           if (prevCell != null) {
707             if (comparator.compareRows(prevCell, cell) > 0) {
708               throw new InvalidHFileException("Previous row is greater than"
709                   + " current row: path=" + srcPath + " previous="
710                   + CellUtil.getCellKeyAsString(prevCell) + " current="
711                   + CellUtil.getCellKeyAsString(cell));
712             }
713             if (CellComparator.compareFamilies(prevCell, cell) != 0) {
714               throw new InvalidHFileException("Previous key had different"
715                   + " family compared to current key: path=" + srcPath
716                   + " previous="
717                   + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(),
718                       prevCell.getFamilyLength())
719                   + " current="
720                   + Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(),
721                       cell.getFamilyLength()));
722             }
723           }
724           prevCell = cell;
725         } while (scanner.next());
726       LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString()
727          + " took " + (EnvironmentEdgeManager.currentTime() - verificationStartTime)
728          + " ms");
729       }
730     } finally {
731       if (reader != null) reader.close();
732     }
733   }
734
735   @Override
736   public Path bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
737     Path srcPath = new Path(srcPathStr);
738     Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
739
740     LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as "
741         + dstPath + " - updating store file list.");
742
743     StoreFile sf = createStoreFileAndReader(dstPath);
744     bulkLoadHFile(sf);
745
746     LOG.info("Successfully loaded store file " + srcPath + " into store " + this
747         + " (new location: " + dstPath + ")");
748
749     return dstPath;
750   }
751
752   @Override
753   public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
754     StoreFile sf = createStoreFileAndReader(fileInfo);
755     bulkLoadHFile(sf);
756   }
757
758   private void bulkLoadHFile(StoreFile sf) throws IOException {
759     StoreFileReader r = sf.getReader();
760     this.storeSize += r.length();
761     this.totalUncompressedBytes += r.getTotalUncompressedBytes();
762
763     // Append the new storefile into the list
764     this.lock.writeLock().lock();
765     try {
766       this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
767     } finally {
768       // We need the lock, as long as we are updating the storeFiles
769       // or changing the memstore. Let us release it before calling
770       // notifyChangeReadersObservers. See HBASE-4485 for a possible
771       // deadlock scenario that could have happened if continue to hold
772       // the lock.
773       this.lock.writeLock().unlock();
774     }
775     LOG.info("Loaded HFile " + sf.getFileInfo() + " into store '" + getColumnFamilyName());
776     if (LOG.isTraceEnabled()) {
777       String traceMessage = "BULK LOAD time,size,store size,store files ["
778           + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize
779           + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
780       LOG.trace(traceMessage);
781     }
782   }
783
784   @Override
785   public ImmutableCollection<StoreFile> close() throws IOException {
786     this.lock.writeLock().lock();
787     try {
788       // Clear so metrics doesn't find them.
789       ImmutableCollection<StoreFile> result = storeEngine.getStoreFileManager().clearFiles();
790       Collection<StoreFile> compactedfiles =
791           storeEngine.getStoreFileManager().clearCompactedFiles();
792       // clear the compacted files
793       if (compactedfiles != null && !compactedfiles.isEmpty()) {
794         removeCompactedfiles(compactedfiles);
795       }
796       if (!result.isEmpty()) {
797         // initialize the thread pool for closing store files in parallel.
798         ThreadPoolExecutor storeFileCloserThreadPool = this.region
799             .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
800                 + this.getColumnFamilyName());
801
802         // close each store file in parallel
803         CompletionService<Void> completionService =
804           new ExecutorCompletionService<Void>(storeFileCloserThreadPool);
805         for (final StoreFile f : result) {
806           completionService.submit(new Callable<Void>() {
807             @Override
808             public Void call() throws IOException {
809               boolean evictOnClose =
810                   cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
811               f.closeReader(evictOnClose);
812               return null;
813             }
814           });
815         }
816
817         IOException ioe = null;
818         try {
819           for (int i = 0; i < result.size(); i++) {
820             try {
821               Future<Void> future = completionService.take();
822               future.get();
823             } catch (InterruptedException e) {
824               if (ioe == null) {
825                 ioe = new InterruptedIOException();
826                 ioe.initCause(e);
827               }
828             } catch (ExecutionException e) {
829               if (ioe == null) ioe = new IOException(e.getCause());
830             }
831           }
832         } finally {
833           storeFileCloserThreadPool.shutdownNow();
834         }
835         if (ioe != null) throw ioe;
836       }
837       LOG.info("Closed " + this);
838       return result;
839     } finally {
840       this.lock.writeLock().unlock();
841     }
842   }
843
844   /**
845    * Snapshot this store's memstore. Call before running
846    * {@link #flushCache(long, MemStoreSnapshot, MonitoredTask, ThroughputController)}
847    *  so it has some work to do.
848    */
849   void snapshot() {
850     this.lock.writeLock().lock();
851     try {
852       this.memstore.snapshot();
853     } finally {
854       this.lock.writeLock().unlock();
855     }
856   }
857
858   /**
859    * Write out current snapshot. Presumes {@link #snapshot()} has been called previously.
860    * @param logCacheFlushId flush sequence number
861    * @param snapshot
862    * @param status
863    * @param throughputController
864    * @return The path name of the tmp file to which the store was flushed
865    * @throws IOException if exception occurs during process
866    */
867   protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
868       MonitoredTask status, ThroughputController throughputController) throws IOException {
869     // If an exception happens flushing, we let it out without clearing
870     // the memstore snapshot.  The old snapshot will be returned when we say
871     // 'snapshot', the next time flush comes around.
872     // Retry after catching an exception while flushing; otherwise the server
873     // will abort itself.
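    // Sketch of the retry behavior implemented below: up to flushRetriesNumber
    // attempts, sleeping pauseTime ms after each failed attempt; if every
    // attempt fails, the last IOException is rethrown.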
874     StoreFlusher flusher = storeEngine.getStoreFlusher();
875     IOException lastException = null;
876     for (int i = 0; i < flushRetriesNumber; i++) {
877       try {
878         List<Path> pathNames =
879             flusher.flushSnapshot(snapshot, logCacheFlushId, status, throughputController);
880         Path lastPathName = null;
881         try {
882           for (Path pathName : pathNames) {
883             lastPathName = pathName;
884             validateStoreFile(pathName);
885           }
886           return pathNames;
887         } catch (Exception e) {
888           LOG.warn("Failed validating store file " + lastPathName + ", retrying num=" + i, e);
889           if (e instanceof IOException) {
890             lastException = (IOException) e;
891           } else {
892             lastException = new IOException(e);
893           }
894         }
895       } catch (IOException e) {
896         LOG.warn("Failed flushing store file, retrying num=" + i, e);
897         lastException = e;
898       }
899       if (lastException != null && i < (flushRetriesNumber - 1)) {
900         try {
901           Thread.sleep(pauseTime);
902         } catch (InterruptedException e) {
903           IOException iie = new InterruptedIOException();
904           iie.initCause(e);
905           throw iie;
906         }
907       }
908     }
909     throw lastException;
910   }
911
912   /*
913    * @param path The pathname of the tmp file into which the store was flushed
914    * @param logCacheFlushId
915    * @param status
916    * @return StoreFile created.
917    * @throws IOException
918    */
919   private StoreFile commitFile(final Path path, final long logCacheFlushId, MonitoredTask status)
920       throws IOException {
921     // Write-out finished successfully, move into the right spot
922     Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);
923 
924     status.setStatus("Flushing " + this + ": reopening flushed file");
925     StoreFile sf = createStoreFileAndReader(dstPath);
926
927     StoreFileReader r = sf.getReader();
928     this.storeSize += r.length();
929     this.totalUncompressedBytes += r.getTotalUncompressedBytes();
930
931     if (LOG.isInfoEnabled()) {
932       LOG.info("Added " + sf + ", entries=" + r.getEntries() +
933         ", sequenceid=" + logCacheFlushId +
934         ", filesize=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1));
935     }
936     return sf;
937   }
938
939   @Override
940   public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
941                                             boolean isCompaction, boolean includeMVCCReadpoint,
942                                             boolean includesTag)
943       throws IOException {
944     return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
945         includesTag, false);
946   }
947
948   /*
949    * @param maxKeyCount
950    * @param compression Compression algorithm to use
951    * @param isCompaction whether we are creating a new file in a compaction
952    * @param includeMVCCReadpoint whether to include the MVCC read point or not
953    * @param includesTag whether to include tags or not
954    * @return Writer for a new StoreFile in the tmp dir.
955    */
956   @Override
957   public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
958       boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
959       boolean shouldDropBehind)
960   throws IOException {
961     final CacheConfig writerCacheConf;
962     if (isCompaction) {
963       // Don't cache data on write on compactions.
964       writerCacheConf = new CacheConfig(cacheConf);
965       writerCacheConf.setCacheDataOnWrite(false);
966     } else {
967       writerCacheConf = cacheConf;
968     }
969     InetSocketAddress[] favoredNodes = null;
970     if (region.getRegionServerServices() != null) {
971       favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
972           region.getRegionInfo().getEncodedName());
973     }
974     HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
975       cryptoContext);
976     StoreFileWriter w = new StoreFileWriter.Builder(conf, writerCacheConf,
977         this.getFileSystem())
978             .withFilePath(fs.createTempName())
979             .withComparator(comparator)
980             .withBloomType(family.getBloomFilterType())
981             .withMaxKeyCount(maxKeyCount)
982             .withFavoredNodes(favoredNodes)
983             .withFileContext(hFileContext)
984             .withShouldDropCacheBehind(shouldDropBehind)
985             .build();
986     return w;
987   }
988
989   private HFileContext createFileContext(Compression.Algorithm compression,
990       boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context cryptoContext) {
991     if (compression == null) {
992       compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
993     }
994     HFileContext hFileContext = new HFileContextBuilder()
995                                 .withIncludesMvcc(includeMVCCReadpoint)
996                                 .withIncludesTags(includesTag)
997                                 .withCompression(compression)
998                                 .withCompressTags(family.isCompressTags())
999                                 .withChecksumType(checksumType)
1000                                 .withBytesPerCheckSum(bytesPerChecksum)
1001                                 .withBlockSize(blocksize)
1002                                 .withHBaseCheckSum(true)
1003                                 .withDataBlockEncoding(family.getDataBlockEncoding())
1004                                 .withEncryptionContext(cryptoContext)
1005                                 .withCreateTime(EnvironmentEdgeManager.currentTime())
1006                                 .build();
1007     return hFileContext;
1008   }
1009
1010
1011   /*
1012    * Updates the store file list, adding the files produced by this new flush.
1013    * @param sfs Store files
1014    * @param snapshotId
1015    * @throws IOException
1016    * @return Whether compaction is required.
1017    */
1018   private boolean updateStorefiles(final List<StoreFile> sfs, final long snapshotId)
1019       throws IOException {
1020     this.lock.writeLock().lock();
1021     try {
1022       this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
1023       if (snapshotId > 0) {
1024         this.memstore.clearSnapshot(snapshotId);
1025       }
1026     } finally {
1027       // We need the lock, as long as we are updating the storeFiles
1028       // or changing the memstore. Let us release it before calling
1029       // notifyChangeReadersObservers. See HBASE-4485 for a possible
1030       // deadlock scenario that could have happened if continue to hold
1031       // the lock.
1032       this.lock.writeLock().unlock();
1033     }
1034     // notify to be called here - only in case of flushes
1035     notifyChangedReadersObservers(sfs);
1036     if (LOG.isTraceEnabled()) {
1037       long totalSize = 0;
1038       for (StoreFile sf : sfs) {
1039         totalSize += sf.getReader().length();
1040       }
1041       String traceMessage = "FLUSH time,count,size,store size,store files ["
1042           + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize
1043           + "," + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
1044       LOG.trace(traceMessage);
1045     }
1046     return needsCompaction();
1047   }
1048
1049   /*
1050    * Notify all observers that set of Readers has changed.
1051    * @throws IOException
1052    */
1053   private void notifyChangedReadersObservers(List<StoreFile> sfs) throws IOException {
1054     for (ChangedReadersObserver o : this.changedReaderObservers) {
1055       o.updateReaders(sfs);
1056     }
1057   }
1058
1059   /**
1060    * Get all scanners with no filtering based on TTL (that happens further down
1061    * the line).
1062    * @return all scanners for this store
1063    */
1064   @Override
1065   public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet,
1066       boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
1067       byte[] stopRow, long readPt) throws IOException {
1068     Collection<StoreFile> storeFilesToScan;
1069     List<KeyValueScanner> memStoreScanners;
1070     this.lock.readLock().lock();
1071     try {
1072       storeFilesToScan =
1073           this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow);
1074       memStoreScanners = this.memstore.getScanners(readPt);
1075     } finally {
1076       this.lock.readLock().unlock();
1077     }
1078
1079     // First the store file scanners
1080
1081     // TODO this used to get the store files in descending order,
1082     // but now we get them in ascending order, which I think is
1083   // actually more correct, since the memstore scanners get put at the end.
1084     List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan,
1085         cacheBlocks, usePread, isCompaction, false, matcher, readPt, isPrimaryReplicaStore());
1086     List<KeyValueScanner> scanners =
1087       new ArrayList<KeyValueScanner>(sfScanners.size()+1);
1088     scanners.addAll(sfScanners);
1089     // Then the memstore scanners
1090     scanners.addAll(memStoreScanners);
1091     return scanners;
1092   }
1093
1094   @Override
1095   public List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks,
1096       boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
1097       byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException {
1098     List<KeyValueScanner> memStoreScanners = null;
1099     if (includeMemstoreScanner) {
1100       this.lock.readLock().lock();
1101       try {
1102         memStoreScanners = this.memstore.getScanners(readPt);
1103       } finally {
1104         this.lock.readLock().unlock();
1105       }
1106     }
1107     List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(files,
1108       cacheBlocks, usePread, isCompaction, false, matcher, readPt, isPrimaryReplicaStore());
1109     List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(sfScanners.size() + 1);
1110     scanners.addAll(sfScanners);
1111     // Then the memstore scanners
1112     if (memStoreScanners != null) {
1113       scanners.addAll(memStoreScanners);
1114     }
1115     return scanners;
1116   }
1117
1118   @Override
1119   public void addChangedReaderObserver(ChangedReadersObserver o) {
1120     this.changedReaderObservers.add(o);
1121   }
1122
1123   @Override
1124   public void deleteChangedReaderObserver(ChangedReadersObserver o) {
1125   // We don't check if the observer is present; it may not be (legitimately).
1126     this.changedReaderObservers.remove(o);
1127   }
1128
1129   //////////////////////////////////////////////////////////////////////////////
1130   // Compaction
1131   //////////////////////////////////////////////////////////////////////////////
1132
1133   /**
1134    * Compact the StoreFiles.  This method may take some time, so the calling
1135    * thread must be able to block for long periods.
1136    *
1137    * <p>During this time, the Store can work as usual, getting values from
1138    * StoreFiles and writing new StoreFiles from the memstore.
1139    *
1140    * Existing StoreFiles are not destroyed until the new compacted StoreFile is
1141    * completely written-out to disk.
1142    *
1143    * <p>The compactLock prevents multiple simultaneous compactions.
1144    * The structureLock prevents us from interfering with other write operations.
1145    *
1146    * <p>We don't want to hold the structureLock for the whole time, as a compact()
1147    * can be lengthy and we want to allow cache-flushes during this period.
1148    *
1149  * <p>Compaction events should be idempotent, since there is no IO fencing for
1150    * the region directory in hdfs. A region server might still try to complete the
1151    * compaction after it lost the region. That is why the following events are carefully
1152    * ordered for a compaction:
1153    *  1. Compaction writes new files under region/.tmp directory (compaction output)
1154  *  2. Compaction atomically moves the temporary file under the region directory
1155    *  3. Compaction appends a WAL edit containing the compaction input and output files.
1156    *  Forces sync on WAL.
1157    *  4. Compaction deletes the input files from the region directory.
1158    *
1159    * Failure conditions are handled like this:
1160  *  - If RS fails before 2, the compaction won't complete. Even if RS lives on and finishes
1161    *  the compaction later, it will only write the new data file to the region directory.
1162    *  Since we already have this data, this will be idempotent but we will have a redundant
1163    *  copy of the data.
1164    *  - If RS fails between 2 and 3, the region will have a redundant copy of the data. The
1165  *  RS that failed won't be able to finish sync() for the WAL because of lease recovery on the WAL.
1166  *  - If RS fails after 3, the region server that opens the region will pick up
1167  *  the compaction marker from the WAL and replay it by removing the compaction input files.
1168  *  The failed RS can also attempt to delete those files, but the operation will be idempotent.
1169    *
1170    * See HBASE-2231 for details.
1171    *
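   * <p>A hedged sketch of the marker written in step 3; it mirrors
   * writeCompactionWalRecord(...) further down in this class:
   * <pre>{@code
   * CompactionDescriptor cd = ProtobufUtil.toCompactionDescriptor(regionInfo,
   *     family.getName(), inputPaths, outputPaths, storeDir);
   * WALUtil.writeCompactionMarker(wal, replicationScope, regionInfo, cd, mvcc);
   * }</pre>
   *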
1172    * @param compaction compaction details obtained from requestCompaction()
1173    * @throws IOException
1174    * @return Storefile we compacted into or null if we failed or opted out early.
1175    */
1176   @Override
1177   public List<StoreFile> compact(CompactionContext compaction,
1178       ThroughputController throughputController) throws IOException {
1179     return compact(compaction, throughputController, null);
1180   }
1181
1182   @Override
1183   public List<StoreFile> compact(CompactionContext compaction,
1184     ThroughputController throughputController, User user) throws IOException {
1185     assert compaction != null;
1186     List<StoreFile> sfs = null;
1187     CompactionRequest cr = compaction.getRequest();
1188     try {
1189       // Do all sanity checking in here if we have a valid CompactionRequest
1190       // because we need to clean up after it on the way out in a finally
1191       // block below
1192       long compactionStartTime = EnvironmentEdgeManager.currentTime();
1193       assert compaction.hasSelection();
1194       Collection<StoreFile> filesToCompact = cr.getFiles();
1195       assert !filesToCompact.isEmpty();
1196       synchronized (filesCompacting) {
1197         // sanity check: we're compacting files that this store knows about
1198         // TODO: change this to LOG.error() after more debugging
1199         Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
1200       }
1201
1202       // Ready to go. Have list of files to compact.
1203       LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
1204           + this + " of " + this.getRegionInfo().getRegionNameAsString()
1205           + " into tmpdir=" + fs.getTempDir() + ", totalSize="
1206           + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));
1207
1208       // Commence the compaction.
1209       List<Path> newFiles = compaction.compact(throughputController, user);
1210
1211       long outputBytes = 0L;
1212       // TODO: get rid of this!
1213       if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
1214         LOG.warn("hbase.hstore.compaction.complete is set to false");
1215         sfs = new ArrayList<StoreFile>(newFiles.size());
1216         final boolean evictOnClose =
1217             cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
1218         for (Path newFile : newFiles) {
1219           // Create storefile around what we wrote with a reader on it.
1220           StoreFile sf = createStoreFileAndReader(newFile);
1221           sf.closeReader(evictOnClose);
1222           sfs.add(sf);
1223         }
1224         return sfs;
1225       }
1226       // Do the steps necessary to complete the compaction.
1227       sfs = moveCompactedFilesIntoPlace(cr, newFiles, user);
1228       writeCompactionWalRecord(filesToCompact, sfs);
1229       replaceStoreFiles(filesToCompact, sfs);
1230       if (cr.isMajor()) {
1231         majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs;
1232         majorCompactedCellsSize += getCompactionProgress().totalCompactedSize;
1233       } else {
1234         compactedCellsCount += getCompactionProgress().totalCompactingKVs;
1235         compactedCellsSize += getCompactionProgress().totalCompactedSize;
1236       }
1237 
1238       for (StoreFile sf : sfs) {
1239         outputBytes += sf.getReader().length();
1240       }
1241
1242       // At this point the store will use new files for all new scanners.
1243       completeCompaction(filesToCompact); // update store size.
1244
1245       long now = EnvironmentEdgeManager.currentTime();
1246       if (region.getRegionServerServices() != null
1247           && region.getRegionServerServices().getMetrics() != null) {
1248         region.getRegionServerServices().getMetrics().updateCompaction(cr.isMajor(),
1249           now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(),
1250           outputBytes);
1251       }
1252 
1253       logCompactionEndMessage(cr, sfs, now, compactionStartTime);
1254       return sfs;
1255     } finally {
1256       finishCompactionRequest(cr);
1257     }
1258   }
1259
1260   private List<StoreFile> moveCompactedFilesIntoPlace(
1261       final CompactionRequest cr, List<Path> newFiles, User user) throws IOException {
1262     List<StoreFile> sfs = new ArrayList<StoreFile>(newFiles.size());
1263     for (Path newFile : newFiles) {
1264       assert newFile != null;
1265       final StoreFile sf = moveFileIntoPlace(newFile);
1266       if (this.getCoprocessorHost() != null) {
1267         final Store thisStore = this;
1268         if (user == null) {
1269           getCoprocessorHost().postCompact(thisStore, sf, cr);
1270         } else {
1271           try {
1272             user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
1273               @Override
1274               public Void run() throws Exception {
1275                 getCoprocessorHost().postCompact(thisStore, sf, cr);
1276                 return null;
1277               }
1278             });
1279           } catch (InterruptedException ie) {
1280             InterruptedIOException iioe = new InterruptedIOException();
1281             iioe.initCause(ie);
1282             throw iioe;
1283           }
1284         }
1285       }
1286       assert sf != null;
1287       sfs.add(sf);
1288     }
1289     return sfs;
1290   }
1291
1292   // Package-visible for tests
1293   StoreFile moveFileIntoPlace(final Path newFile) throws IOException {
1294     validateStoreFile(newFile);
1295     // Move the file into the right spot
1296     Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile);
1297     return createStoreFileAndReader(destPath);
1298   }
1299
1300   /**
1301    * Writes the compaction WAL record.
1302    * @param filesCompacted Files compacted (input).
1303    * @param newFiles Files from compaction.
1304    */
1305   private void writeCompactionWalRecord(Collection<StoreFile> filesCompacted,
1306       Collection<StoreFile> newFiles) throws IOException {
1307     if (region.getWAL() == null) return;
1308     List<Path> inputPaths = new ArrayList<Path>(filesCompacted.size());
1309     for (StoreFile f : filesCompacted) {
1310       inputPaths.add(f.getPath());
1311     }
1312     List<Path> outputPaths = new ArrayList<Path>(newFiles.size());
1313     for (StoreFile f : newFiles) {
1314       outputPaths.add(f.getPath());
1315     }
1316     HRegionInfo info = this.region.getRegionInfo();
1317     CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
1318         family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));
1319     // TODO: Fix reaching into Region to get the maxWaitForSeqId.
1320     // Does this method belong in Region altogether, given it makes so many references up there?
1321     // Could be Region#writeCompactionMarker(compactionDescriptor);
1322     WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(),
1323         this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC());
1324   }
1325
1326   @VisibleForTesting
1327   void replaceStoreFiles(final Collection<StoreFile> compactedFiles,
1328       final Collection<StoreFile> result) throws IOException {
1329     this.lock.writeLock().lock();
1330     try {
1331       this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result);
1332       filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock();
1333     } finally {
1334       this.lock.writeLock().unlock();
1335     }
1336   }
1337
1338   /**
1339    * Log a very elaborate compaction completion message.
1340    * @param cr Request.
1341    * @param sfs Resulting files.
1342    * @param compactionStartTime Start time.
1343    */
1344   private void logCompactionEndMessage(
1345       CompactionRequest cr, List<StoreFile> sfs, long now, long compactionStartTime) {
1346     StringBuilder message = new StringBuilder(
1347       "Completed" + (cr.isMajor() ? " major" : "") + " compaction of "
1348       + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "") + " file(s) in "
1349       + this + " of " + this.getRegionInfo().getRegionNameAsString() + " into ");
1350     if (sfs.isEmpty()) {
1351       message.append("none, ");
1352     } else {
1353       for (StoreFile sf: sfs) {
1354         message.append(sf.getPath().getName());
1355         message.append("(size=");
1356         message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1));
1357         message.append("), ");
1358       }
1359     }
1360     message.append("total size for store is ")
1361       .append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize, "", 1))
1362       .append(". This selection was in queue for ")
1363       .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
1364       .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
1365       .append(" to execute.");
1366     LOG.info(message.toString());
1367     if (LOG.isTraceEnabled()) {
1368       int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
1369       long resultSize = 0;
1370       for (StoreFile sf : sfs) {
1371         resultSize += sf.getReader().length();
1372       }
1373       String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
1374         + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
1375           + cr.getFiles().size() + "," + sfs.size() + "," +  storeSize + "," + fileCount + "]";
1376       LOG.trace(traceMessage);
1377     }
1378   }
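       // An illustrative rendering of the INFO line built above (all values made up):
       //   Completed major compaction of 3 (all) file(s) in f of
       //   testtable,,1479409682563.abcdef. into 7e3a1b(size=12.3 M), total size for
       //   store is 12.3 M. This selection was in queue for 0sec, and took 4sec to execute.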
1379
1380   /**
1381    * Call to complete a compaction. It's for the case where we find in the WAL a compaction
1382    * that was not finished. We could find one while recovering a WAL after a regionserver crash.
1383    * See HBASE-2231.
1384    * @param compaction the compaction descriptor recovered from the WAL
1385    */
1386   @Override
1387   public void replayCompactionMarker(CompactionDescriptor compaction,
1388       boolean pickCompactionFiles, boolean removeFiles)
1389       throws IOException {
1390     LOG.debug("Completing compaction from the WAL marker");
1391     List<String> compactionInputs = compaction.getCompactionInputList();
1392     List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList());
1393
1394     // The Compaction Marker is written after the compaction is completed,
1395     // and the files moved into the region/family folder.
1396     //
1397     // If we crash after the entry is written, we may not have removed the
1398     // input files, but the output file is present.
1399     // (The unremoved input files will be removed by this function)
1400     //
1401     // If we scan the directory and the file is not present, it can mean that:
1402     //   - The file was manually removed by the user
1403     //   - The file was removed as consequence of subsequent compaction
1404     // so, we can't do anything with the "compaction output list" because those
1405     // files have already been loaded when opening the region (by virtue of
1406     // being in the store's folder) or they may be missing due to a compaction.
1407 
1408     String familyName = this.getColumnFamilyName();
1409     List<String> inputFiles = new ArrayList<String>(compactionInputs.size());
1410     for (String compactionInput : compactionInputs) {
1411       Path inputPath = fs.getStoreFilePath(familyName, compactionInput);
1412       inputFiles.add(inputPath.getName());
1413     }
1414
1415     // Some of the input files might already be deleted.
1416     List<StoreFile> inputStoreFiles = new ArrayList<StoreFile>(compactionInputs.size());
1417     for (StoreFile sf : this.getStorefiles()) {
1418       if (inputFiles.contains(sf.getPath().getName())) {
1419         inputStoreFiles.add(sf);
1420       }
1421     }
1422
1423     // check whether we need to pick up the new files
1424     List<StoreFile> outputStoreFiles = new ArrayList<StoreFile>(compactionOutputs.size());
1425
1426     if (pickCompactionFiles) {
1427       for (StoreFile sf : this.getStorefiles()) {
1428         compactionOutputs.remove(sf.getPath().getName());
1429       }
1430       for (String compactionOutput : compactionOutputs) {
1431         StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), compactionOutput);
1432         StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
1433         outputStoreFiles.add(storeFile);
1434       }
1435     }
1436
1437     if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) {
1438       LOG.info("Replaying compaction marker, replacing input files: " +
1439           inputStoreFiles + " with output files: " + outputStoreFiles);
1440       this.replaceStoreFiles(inputStoreFiles, outputStoreFiles);
1441       this.completeCompaction(inputStoreFiles);
1442     }
1443   }
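       // Sketch of a replay for the method above (file names are hypothetical): the WAL
       // holds a marker with inputs [a, b] and output [c]. If the server died after the
       // marker was written but before a and b were archived, replay finds a and b still
       // among the store files and drops them via replaceStoreFiles; with
       // pickCompactionFiles set, c is loaded unless it was already picked up when the
       // region opened.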
1444
1445   /**
1446    * This method tries to compact N recent files for testing.
1447    * Note that compacting "recent" files only makes sense for some policies, e.g. the
1448    * default one, so this method assumes the default policy is in use. It does not
1449    * consult the policy; instead it builds the compaction candidate list itself.
1450    * @param N Number of files.
1451    */
1452   public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException {
1453     List<StoreFile> filesToCompact;
1454     boolean isMajor;
1455
1456     this.lock.readLock().lock();
1457     try {
1458       synchronized (filesCompacting) {
1459         filesToCompact = Lists.newArrayList(storeEngine.getStoreFileManager().getStorefiles());
1460         if (!filesCompacting.isEmpty()) {
1461           // exclude all files older than the newest file we're currently
1462           // compacting. this allows us to preserve contiguity (HBASE-2856)
1463           StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
1464           int idx = filesToCompact.indexOf(last);
1465           Preconditions.checkArgument(idx != -1);
1466           filesToCompact.subList(0, idx + 1).clear();
1467         }
1468         int count = filesToCompact.size();
1469         if (N > count) {
1470           throw new RuntimeException("Not enough files");
1471         }
1472
1473         filesToCompact = filesToCompact.subList(count - N, count);
1474         isMajor = (filesToCompact.size() == storeEngine.getStoreFileManager().getStorefileCount());
1475         filesCompacting.addAll(filesToCompact);
1476         Collections.sort(filesCompacting, storeEngine.getStoreFileManager()
1477             .getStoreFileComparator());
1478       }
1479     } finally {
1480       this.lock.readLock().unlock();
1481     }
1482
1483     try {
1484       // Ready to go. Have list of files to compact.
1485       List<Path> newFiles = ((DefaultCompactor)this.storeEngine.getCompactor())
1486           .compactForTesting(filesToCompact, isMajor);
1487       for (Path newFile: newFiles) {
1488         // Move the compaction into place.
1489         StoreFile sf = moveFileIntoPlace(newFile);
1490         if (this.getCoprocessorHost() != null) {
1491           this.getCoprocessorHost().postCompact(this, sf, null);
1492         }
1493         replaceStoreFiles(filesToCompact, Lists.newArrayList(sf));
1494         completeCompaction(filesToCompact);
1495       }
1496     } finally {
1497       synchronized (filesCompacting) {
1498         filesCompacting.removeAll(filesToCompact);
1499       }
1500     }
1501   }
1502
1503   @Override
1504   public boolean hasReferences() {
1505     return StoreUtils.hasReferences(this.storeEngine.getStoreFileManager().getStorefiles());
1506   }
1507
1508   @Override
1509   public CompactionProgress getCompactionProgress() {
1510     return this.storeEngine.getCompactor().getProgress();
1511   }
1512
1513   @Override
1514   public boolean isMajorCompaction() throws IOException {
1515     for (StoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1516       // TODO: what are these reader checks all over the place?
1517       if (sf.getReader() == null) {
1518         LOG.debug("StoreFile " + sf + " has a null Reader");
1519         return false;
1520       }
1521     }
1522     return storeEngine.getCompactionPolicy().shouldPerformMajorCompaction(
1523         this.storeEngine.getStoreFileManager().getStorefiles());
1524   }
1525
1526   @Override
1527   public CompactionContext requestCompaction() throws IOException {
1528     return requestCompaction(Store.NO_PRIORITY, null);
1529   }
1530
1531   @Override
1532   public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
1533       throws IOException {
1534     return requestCompaction(priority, baseRequest, null);
1535   }
1536   @Override
1537   public CompactionContext requestCompaction(int priority, final CompactionRequest baseRequest,
1538       User user) throws IOException {
1539     // don't even select for compaction if writes are disabled
1540     if (!this.areWritesEnabled()) {
1541       return null;
1542     }
1543
1544     // Before we do compaction, try to get rid of unneeded files to simplify things.
1545     removeUnneededFiles();
1546
1547     final CompactionContext compaction = storeEngine.createCompaction();
1548     CompactionRequest request = null;
1549     this.lock.readLock().lock();
1550     try {
1551       synchronized (filesCompacting) {
1552         final Store thisStore = this;
1553         // First, see if coprocessor would want to override selection.
1554         if (this.getCoprocessorHost() != null) {
1555           final List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
1556           boolean override = false;
1557           if (user == null) {
1558             override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc,
1559               baseRequest);
1560           } else {
1561             try {
1562               override = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
1563                 @Override
1564                 public Boolean run() throws Exception {
1565                   return getCoprocessorHost().preCompactSelection(thisStore, candidatesForCoproc,
1566                     baseRequest);
1567                 }
1568               });
1569             } catch (InterruptedException ie) {
1570               InterruptedIOException iioe = new InterruptedIOException();
1571               iioe.initCause(ie);
1572               throw iioe;
1573             }
1574           }
1575           if (override) {
1576             // Coprocessor is overriding normal file selection.
1577             compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
1578           }
1579         }
1580
1581         // Normal case - coprocessor is not overriding file selection.
1582         if (!compaction.hasSelection()) {
1583           boolean isUserCompaction = priority == Store.PRIORITY_USER;
1584           boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
1585               offPeakCompactionTracker.compareAndSet(false, true);
1586           try {
1587             compaction.select(this.filesCompacting, isUserCompaction,
1588               mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
1589           } catch (IOException e) {
1590             if (mayUseOffPeak) {
1591               offPeakCompactionTracker.set(false);
1592             }
1593             throw e;
1594           }
1595           assert compaction.hasSelection();
1596           if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
1597             // Compaction policy doesn't want to take advantage of off-peak.
1598             offPeakCompactionTracker.set(false);
1599           }
1600         }
1601         if (this.getCoprocessorHost() != null) {
1602           if (user == null) {
1603             this.getCoprocessorHost().postCompactSelection(
1604               this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
1605           } else {
1606             try {
1607               user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
1608                 @Override
1609                 public Void run() throws Exception {
1610                   getCoprocessorHost().postCompactSelection(
1611                     thisStore, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
1612                   return null;
1613                 }
1614               });
1615             } catch (InterruptedException ie) {
1616               InterruptedIOException iioe = new InterruptedIOException();
1617               iioe.initCause(ie);
1618               throw iioe;
1619             }
1620           }
1621         }
1622
1623         // Selected files; see if we have a compaction with some custom base request.
1624         if (baseRequest != null) {
1625           // Update the request with what the system thinks the request should be;
1626         // it's up to the request if it wants to listen.
1627           compaction.forceSelect(
1628               baseRequest.combineWith(compaction.getRequest()));
1629         }
1630         // Finally, we have the resulting files list. Check if we have any files at all.
1631         request = compaction.getRequest();
1632         final Collection<StoreFile> selectedFiles = request.getFiles();
1633         if (selectedFiles.isEmpty()) {
1634           return null;
1635         }
1636
1637         addToCompactingFiles(selectedFiles);
1638
1639         // If we're enqueuing a major, clear the force flag.
1640         this.forceMajor = this.forceMajor && !request.isMajor();
1641
1642         // Set common request properties.
1643         // Set priority, either override value supplied by caller or from store.
1644         request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
1645         request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
1646       }
1647     } finally {
1648       this.lock.readLock().unlock();
1649     }
1650 
1651     LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName()
1652         + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
1653         + (request.isAllFiles() ? " (all files)" : ""));
1654     this.region.reportCompactionRequestStart(request.isMajor());
1655     return compaction;
1656   }
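       // Summary of the selection flow above: (1) a coprocessor may override the
       // candidate list via preCompactSelection; (2) otherwise the store engine's policy
       // selects, using off-peak settings if we won the offPeakCompactionTracker race;
       // (3) postCompactSelection notifies coprocessors of the outcome; (4) a
       // caller-supplied baseRequest, if any, is combined with the system's request
       // before priority and description are set.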
1657
1658   /** Adds the files to compacting files. filesCompacting must be locked. */
1659   private void addToCompactingFiles(final Collection<StoreFile> filesToAdd) {
1660     if (filesToAdd == null) return;
1661     // Check that we do not try to compact the same StoreFile twice.
1662     if (!Collections.disjoint(filesCompacting, filesToAdd)) {
1663       Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);
1664     }
1665     filesCompacting.addAll(filesToAdd);
1666     Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator());
1667   }
1668
1669   private void removeUnneededFiles() throws IOException {
1670     if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) return;
1671     if (getFamily().getMinVersions() > 0) {
1672       LOG.debug("Skipping expired store file removal due to min version being " +
1673           getFamily().getMinVersions());
1674       return;
1675     }
1676     this.lock.readLock().lock();
1677     Collection<StoreFile> delSfs = null;
1678     try {
1679       synchronized (filesCompacting) {
1680         long cfTtl = getStoreFileTtl();
1681         if (cfTtl != Long.MAX_VALUE) {
1682           delSfs = storeEngine.getStoreFileManager().getUnneededFiles(
1683               EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting);
1684           addToCompactingFiles(delSfs);
1685         }
1686       }
1687     } finally {
1688       this.lock.readLock().unlock();
1689     }
1690     if (delSfs == null || delSfs.isEmpty()) return;
1691
1692     Collection<StoreFile> newFiles = new ArrayList<StoreFile>(); // No new files.
1693     writeCompactionWalRecord(delSfs, newFiles);
1694     replaceStoreFiles(delSfs, newFiles);
1695     completeCompaction(delSfs);
1696     LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in "
1697         + this + " of " + this.getRegionInfo().getRegionNameAsString()
1698         + "; total size for store is " + TraditionalBinaryPrefix.long2String(storeSize, "", 1));
1699   }
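       // Expired-file removal is configuration driven. A minimal hbase-site.xml sketch
       // (illustrative) that disables it, so TTL-expired files are only reclaimed through
       // normal compactions:
       //
       //   <property>
       //     <name>hbase.store.delete.expired.storefile</name>
       //     <value>false</value>
       //   </property>
       //
       // The default is true; removal is also skipped when the family keeps a minimum
       // number of versions, as checked at the top of removeUnneededFiles.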
1700
1701   @Override
1702   public void cancelRequestedCompaction(CompactionContext compaction) {
1703     finishCompactionRequest(compaction.getRequest());
1704   }
1705
1706   private void finishCompactionRequest(CompactionRequest cr) {
1707     this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
1708     if (cr.isOffPeak()) {
1709       offPeakCompactionTracker.set(false);
1710       cr.setOffPeak(false);
1711     }
1712     synchronized (filesCompacting) {
1713       filesCompacting.removeAll(cr.getFiles());
1714     }
1715   }
1716
1717   /**
1718    * Validates a store file by opening and closing it. In HFileV2 this should
1719    * not be an expensive operation.
1720    *
1721    * @param path the path to the store file
1722    */
1723   private void validateStoreFile(Path path)
1724       throws IOException {
1725     StoreFile storeFile = null;
1726     try {
1727       storeFile = createStoreFileAndReader(path);
1728     } catch (IOException e) {
1729       LOG.error("Failed to open store file : " + path
1730           + ", keeping it in tmp location", e);
1731       throw e;
1732     } finally {
1733       if (storeFile != null) {
1734         storeFile.closeReader(false);
1735       }
1736     }
1737   }
1738
1739   /**
1740    * <p>Refreshes store size accounting after a compaction has been written to disk.
1741    * It is usually invoked at the end of a compaction, but might also be
1742    * invoked at HStore startup, if the prior execution died midway through.
1743    *
1744    * <p>The swap of compacted files for their replacements happens in
1745    * {@link #replaceStoreFiles(Collection, Collection)}; this method only walks
1746    * the currently live store files and recomputes:
1747    * <pre>
1748    * 1) The total store size.
1749    * 2) The total uncompressed bytes.
1750    * </pre>
1751    * @param compactedFiles list of files that were compacted
1752    */
1753   @VisibleForTesting
1754   protected void completeCompaction(final Collection<StoreFile> compactedFiles)
1755     throws IOException {
1756     LOG.debug("Completing compaction...");
1757     this.storeSize = 0L;
1758     this.totalUncompressedBytes = 0L;
1759     for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1760       StoreFileReader r = hsf.getReader();
1761       if (r == null) {
1762         LOG.warn("StoreFile " + hsf + " has a null Reader");
1763         continue;
1764       }
1765       this.storeSize += r.length();
1766       this.totalUncompressedBytes += r.getTotalUncompressedBytes();
1767     }
1768   }
1769
1770   /*
1771    * @param wantedVersions How many versions were asked for.
1772    * @return wantedVersions, capped at this family's configured maximum versions.
1773    */
1774   int versionsToReturn(final int wantedVersions) {
1775     if (wantedVersions <= 0) {
1776       throw new IllegalArgumentException("Number of versions must be > 0");
1777     }
1778     // Make sure we do not return more than maximum versions for this store.
1779     int maxVersions = this.family.getMaxVersions();
1780     return wantedVersions > maxVersions ? maxVersions: wantedVersions;
1781   }
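       // Example: for a family configured with a maximum of 3 versions,
       // versionsToReturn(1) yields 1 and versionsToReturn(10) yields 3, while
       // versionsToReturn(0) throws IllegalArgumentException.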
1782
1783   /**
1784    * @param cell
1785    * @param oldestTimestamp
1786    * @return true if the cell is expired
1787    */
1788   static boolean isCellTTLExpired(final Cell cell, final long oldestTimestamp, final long now) {
1789     // Look for a TTL tag first. Use it instead of the family setting if
1790     // found. If a cell has multiple TTLs, resolve the conflict by using the
1791     // first tag encountered.
1792     Iterator<Tag> i = CellUtil.tagsIterator(cell);
1793     while (i.hasNext()) {
1794       Tag t = i.next();
1795       if (TagType.TTL_TAG_TYPE == t.getType()) {
1796         // Unlike the family TTL in the schema, cell TTLs are stored in
1797         // milliseconds, so there is no need to convert.
1798         long ts = cell.getTimestamp();
1799         assert t.getValueLength() == Bytes.SIZEOF_LONG;
1800         long ttl = TagUtil.getValueAsLong(t);
1801         if (ts + ttl < now) {
1802           return true;
1803         }
1804         // Per cell TTLs cannot extend lifetime beyond family settings, so
1805         // fall through to check that
1806         break;
1807       }
1808     }
1809     return false;
1810   }
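       // Worked example for the TTL check above (times are illustrative): a cell with
       // timestamp ts = 1000 carrying a TTL tag of 500 (milliseconds) is expired once
       // now exceeds 1500; at now = 1400 it is still live. A cell without a TTL tag
       // returns false here, leaving family-level TTL enforcement to the caller.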
1811
1812   @Override
1813   public boolean canSplit() {
1814     this.lock.readLock().lock();
1815     try {
1816       // Not splittable if we find a reference store file present in the store.
1817       boolean result = !hasReferences();
1818       if (!result) {
1819         if (LOG.isTraceEnabled()) {
1820           LOG.trace("Not splittable; has references: " + this);
1821         }
1822       }
1823       return result;
1824     } finally {
1825       this.lock.readLock().unlock();
1826     }
1827   }
1828
1829   @Override
1830   public byte[] getSplitPoint() {
1831     this.lock.readLock().lock();
1832     try {
1833       // Should already be enforced by the split policy!
1834       assert !this.getRegionInfo().isMetaRegion();
1835       // Not splittable if we find a reference store file present in the store.
1836       if (hasReferences()) {
1837         if (LOG.isTraceEnabled()) {
1838           LOG.trace("Not splittable; has references: " + this);
1839         }
1840         return null;
1841       }
1842       return this.storeEngine.getStoreFileManager().getSplitPoint();
1843     } catch(IOException e) {
1844       LOG.warn("Failed getting store size for " + this, e);
1845     } finally {
1846       this.lock.readLock().unlock();
1847     }
1848     return null;
1849   }
1850
1851   @Override
1852   public long getLastCompactSize() {
1853     return this.lastCompactSize;
1854   }
1855
1856   @Override
1857   public long getSize() {
1858     return storeSize;
1859   }
1860
1861   @Override
1862   public void triggerMajorCompaction() {
1863     this.forceMajor = true;
1864   }
1865
1866
1867   //////////////////////////////////////////////////////////////////////////////
1868   // File administration
1869   //////////////////////////////////////////////////////////////////////////////
1870
1871   @Override
1872   public KeyValueScanner getScanner(Scan scan,
1873       final NavigableSet<byte []> targetCols, long readPt) throws IOException {
1874     lock.readLock().lock();
1875     try {
1876       KeyValueScanner scanner = null;
1877       if (this.getCoprocessorHost() != null) {
1878         scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols, readPt);
1879       }
1880       scanner = createScanner(scan, targetCols, readPt, scanner);
1881       return scanner;
1882     } finally {
1883       lock.readLock().unlock();
1884     }
1885   }
1886
1887   protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
1888       long readPt, KeyValueScanner scanner) throws IOException {
1889     if (scanner == null) {
1890       scanner = scan.isReversed() ? new ReversedStoreScanner(this,
1891           getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this,
1892           getScanInfo(), scan, targetCols, readPt);
1893     }
1894     return scanner;
1895   }
1896
1897   @Override
1898   public String toString() {
1899     return this.getColumnFamilyName();
1900   }
1901
1902   @Override
1903   public int getStorefilesCount() {
1904     return this.storeEngine.getStoreFileManager().getStorefileCount();
1905   }
1906
1907   @Override
1908   public long getMaxStoreFileAge() {
1909     long earliestTS = Long.MAX_VALUE;
1910     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
1911       StoreFileReader r = s.getReader();
1912       if (r == null) {
1913         LOG.warn("StoreFile " + s + " has a null Reader");
1914         continue;
1915       }
1916       if (!s.isHFile()) {
1917         continue;
1918       }
1919       long createdTS = s.getFileInfo().getCreatedTimestamp();
1920       earliestTS = (createdTS < earliestTS) ? createdTS : earliestTS;
1921     }
1922     long now = EnvironmentEdgeManager.currentTime();
1923     return now - earliestTS;
1924   }
1925
1926   @Override
1927   public long getMinStoreFileAge() {
1928     long latestTS = 0;
1929     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
1930       StoreFileReader r = s.getReader();
1931       if (r == null) {
1932         LOG.warn("StoreFile " + s + " has a null Reader");
1933         continue;
1934       }
1935       if (!s.isHFile()) {
1936         continue;
1937       }
1938       long createdTS = s.getFileInfo().getCreatedTimestamp();
1939       latestTS = (createdTS > latestTS) ? createdTS : latestTS;
1940     }
1941     long now = EnvironmentEdgeManager.currentTime();
1942     return now - latestTS;
1943   }
1944
1945   @Override
1946   public long getAvgStoreFileAge() {
1947     long sum = 0, count = 0;
1948     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
1949       StoreFileReader r = s.getReader();
1950       if (r == null) {
1951         LOG.warn("StoreFile " + s + " has a null Reader");
1952         continue;
1953       }
1954       if (!s.isHFile()) {
1955         continue;
1956       }
1957       sum += s.getFileInfo().getCreatedTimestamp();
1958       count++;
1959     }
1960     if (count == 0) {
1961       return 0;
1962     }
1963     long avgTS = sum / count;
1964     long now = EnvironmentEdgeManager.currentTime();
1965     return now - avgTS;
1966   }
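       // Quick arithmetic check for the three age metrics above: with two HFiles created
       // at now - 200s and now - 100s, getMaxStoreFileAge() returns 200s,
       // getMinStoreFileAge() returns 100s, and getAvgStoreFileAge() returns
       // now - (sum of timestamps / 2) = 150s. Non-HFiles and files with null readers
       // are skipped by all three.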
1967
1968   @Override
1969   public long getNumReferenceFiles() {
1970     long numRefFiles = 0;
1971     for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
1972       if (s.isReference()) {
1973         numRefFiles++;
1974       }
1975     }
1976     return numRefFiles;
1977   }
1978
1979   @Override
1980   public long getNumHFiles() {
1981     long numHFiles = 0;
1982     for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
1983       if (s.isHFile()) {
1984         numHFiles++;
1985       }
1986     }
1987     return numHFiles;
1988   }
1989
1990   @Override
1991   public long getStoreSizeUncompressed() {
1992     return this.totalUncompressedBytes;
1993   }
1994
1995   @Override
1996   public long getStorefilesSize() {
1997     long size = 0;
1998     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
1999       StoreFileReader r = s.getReader();
2000       if (r == null) {
2001         LOG.warn("StoreFile " + s + " has a null Reader");
2002         continue;
2003       }
2004       size += r.length();
2005     }
2006     return size;
2007   }
2008
2009   @Override
2010   public long getStorefilesIndexSize() {
2011     long size = 0;
2012     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
2013       StoreFileReader r = s.getReader();
2014       if (r == null) {
2015         LOG.warn("StoreFile " + s + " has a null Reader");
2016         continue;
2017       }
2018       size += r.indexSize();
2019     }
2020     return size;
2021   }
2022
2023   @Override
2024   public long getTotalStaticIndexSize() {
2025     long size = 0;
2026     for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
2027       StoreFileReader r = s.getReader();
2028       if (r == null) {
2029         continue;
2030       }
2031       size += r.getUncompressedDataIndexSize();
2032     }
2033     return size;
2034   }
2035
2036   @Override
2037   public long getTotalStaticBloomSize() {
2038     long size = 0;
2039     for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
2040       StoreFileReader r = s.getReader();
2041       if (r == null) {
2042         continue;
2043       }
2044       size += r.getTotalBloomSize();
2045     }
2046     return size;
2047   }
2048
2049   @Override
2050   public long getMemStoreSize() {
2051     return this.memstore.size();
2052   }
2053
2054   @Override
2055   public int getCompactPriority() {
2056     int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority();
2057     if (priority == PRIORITY_USER) {
2058       LOG.warn("Compaction priority is USER despite there being no user compaction");
2059     }
2060     return priority;
2061   }
2062
2063   @Override
2064   public boolean throttleCompaction(long compactionSize) {
2065     return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize);
2066   }
2067
2068   public HRegion getHRegion() {
2069     return this.region;
2070   }
2071
2072   @Override
2073   public RegionCoprocessorHost getCoprocessorHost() {
2074     return this.region.getCoprocessorHost();
2075   }
2076
2077   @Override
2078   public HRegionInfo getRegionInfo() {
2079     return this.fs.getRegionInfo();
2080   }
2081
2082   @Override
2083   public boolean areWritesEnabled() {
2084     return this.region.areWritesEnabled();
2085   }
2086
2087   @Override
2088   public long getSmallestReadPoint() {
2089     return this.region.getSmallestReadPoint();
2090   }
2091
2092   /**
2093    * Updates the value for the given row/family/qualifier. This function will always be seen as
2094    * atomic by other readers because it only puts a single KV to memstore. Thus no read/write
2095    * control is necessary.
2096    * @param row row to update
2097    * @param f family to update
2098    * @param qualifier qualifier to update
2099    * @param newValue the new value to set into memstore
2100    * @return memstore size delta
2101    * @throws IOException
2102    */
2103   @VisibleForTesting
2104   public long updateColumnValue(byte [] row, byte [] f,
2105                                 byte [] qualifier, long newValue)
2106       throws IOException {
2107
2108     this.lock.readLock().lock();
2109     try {
2110       long now = EnvironmentEdgeManager.currentTime();
2111
2112       return this.memstore.updateColumnValue(row,
2113           f,
2114           qualifier,
2115           newValue,
2116           now);
2117
2118     } finally {
2119       this.lock.readLock().unlock();
2120     }
2121   }
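       // A minimal usage sketch for the test hook above (row, family and qualifier are
       // hypothetical):
       //
       //   long sizeDelta = store.updateColumnValue(Bytes.toBytes("row1"),
       //       Bytes.toBytes("cf"), Bytes.toBytes("counter"), 42L);
       //
       // The returned delta is the change in memstore size caused by the single KV put.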
2122
2123   @Override
2124   public long upsert(Iterable<Cell> cells, long readpoint) throws IOException {
2125     this.lock.readLock().lock();
2126     try {
2127       return this.memstore.upsert(cells, readpoint);
2128     } finally {
2129       this.lock.readLock().unlock();
2130     }
2131   }
2132 
2133   @Override
2134   public StoreFlushContext createFlushContext(long cacheFlushId) {
2135     return new StoreFlusherImpl(cacheFlushId);
2136   }
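       // Typical lifecycle of the context created above, as a sketch (the real driver
       // lives in HRegion; error handling omitted):
       //
       //   StoreFlushContext ctx = store.createFlushContext(flushSeqId);
       //   ctx.prepare();                       // snapshot the memstore; caller holds the lock
       //   ctx.flushCache(status);              // write the snapshot to temp file(s)
       //   boolean added = ctx.commit(status);  // move files into place and update the store
       //   // ...or ctx.abort() to drop the snapshot if the flush cannot proceed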
2137
2138   private final class StoreFlusherImpl implements StoreFlushContext {
2139
2140     private long cacheFlushSeqNum;
2141     private MemStoreSnapshot snapshot;
2142     private List<Path> tempFiles;
2143     private List<Path> committedFiles;
2144     private long cacheFlushCount;
2145     private long cacheFlushSize;
2146     private long outputFileSize;
2147
2148     private StoreFlusherImpl(long cacheFlushSeqNum) {
2149       this.cacheFlushSeqNum = cacheFlushSeqNum;
2150     }
2151
2152     /**
2153      * This is not thread safe. The caller should have a lock on the region or the store.
2154      * If necessary, the lock can be added with the patch provided in HBASE-10087
2155      */
2156     @Override
2157     public void prepare() {
2158       // Snapshot the memstore. The WAL sequence number for this flush was captured at construction.
2159       this.snapshot = memstore.snapshot();
2160       this.cacheFlushCount = snapshot.getCellsCount();
2161       this.cacheFlushSize = snapshot.getSize();
2162       committedFiles = new ArrayList<Path>(1);
2163     }
2164
2165     @Override
2166     public void flushCache(MonitoredTask status) throws IOException {
2167       RegionServerServices rsService = region.getRegionServerServices();
2168       ThroughputController throughputController =
2169           rsService == null ? null : rsService.getFlushThroughputController();
2170       tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController);
2171     }
2172
2173     @Override
2174     public boolean commit(MonitoredTask status) throws IOException {
2175       if (this.tempFiles == null || this.tempFiles.isEmpty()) {
2176         return false;
2177       }
2178       List<StoreFile> storeFiles = new ArrayList<StoreFile>(this.tempFiles.size());
2179       for (Path storeFilePath : tempFiles) {
2180         try {
2181           StoreFile sf = HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status);
2182           outputFileSize += sf.getReader().length();
2183           storeFiles.add(sf);
2184         } catch (IOException ex) {
2185           LOG.error("Failed to commit store file " + storeFilePath, ex);
2186           // Try to delete the files we have committed before.
2187           for (StoreFile sf : storeFiles) {
2188             Path pathToDelete = sf.getPath();
2189             try {
2190               sf.deleteReader();
2191             } catch (IOException deleteEx) {
2192               LOG.fatal("Failed to delete store file we committed, halting " + pathToDelete, deleteEx);
2193               Runtime.getRuntime().halt(1);
2194             }
2195           }
2196           throw new IOException("Failed to commit the flush", ex);
2197         }
2198       }
2199 
2200       for (StoreFile sf : storeFiles) {
2201         if (HStore.this.getCoprocessorHost() != null) {
2202           HStore.this.getCoprocessorHost().postFlush(HStore.this, sf);
2203         }
2204         committedFiles.add(sf.getPath());
2205       }
2206
2207       HStore.this.flushedCellsCount += cacheFlushCount;
2208       HStore.this.flushedCellsSize += cacheFlushSize;
2209       HStore.this.flushedOutputFileSize += outputFileSize;
2210
2211       // Add new files to the store files. Clear snapshot too while we have the Store write lock.
2212       return HStore.this.updateStorefiles(storeFiles, snapshot.getId());
2213     }
2214
2215     @Override
2216     public long getOutputFileSize() {
2217       return outputFileSize;
2218     }
2219
2220     @Override
2221     public List<Path> getCommittedFiles() {
2222       return committedFiles;
2223     }
2224
2225     /**
2226      * Similar to commit, but called in secondary region replicas for replaying the
2227      * flush cache from the primary region. Adds the new files to the store, and drops the
2228      * snapshot depending on dropMemstoreSnapshot argument.
2229      * @param fileNames names of the flushed files
2230      * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot
2231      * @throws IOException
2232      */
2233     @Override
2234     public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)
2235         throws IOException {
2236       List<StoreFile> storeFiles = new ArrayList<StoreFile>(fileNames.size());
2237       for (String file : fileNames) {
2238         // open the file as a store file (hfile link, etc)
2239         StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), file);
2240         StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
2241         storeFiles.add(storeFile);
2242         HStore.this.storeSize += storeFile.getReader().length();
2243         HStore.this.totalUncompressedBytes += storeFile.getReader().getTotalUncompressedBytes();
2244         if (LOG.isInfoEnabled()) {
2245           LOG.info("Region: " + HStore.this.getRegionInfo().getEncodedName() +
2246             " added " + storeFile + ", entries=" + storeFile.getReader().getEntries() +
2247             ", sequenceid=" +  + storeFile.getReader().getSequenceID() +
2248             ", filesize=" + StringUtils.humanReadableInt(storeFile.getReader().length()));
2249         }
2250       }
2251 
2252       long snapshotId = -1; // -1 means do not drop
2253       if (dropMemstoreSnapshot && snapshot != null) {
2254         snapshotId = snapshot.getId();
2255       }
2256       HStore.this.updateStorefiles(storeFiles, snapshotId);
2257     }
2258
2259     /**
2260      * Abort the snapshot preparation. Drops the snapshot if any.
2261      * @throws IOException
2262      */
2263     @Override
2264     public void abort() throws IOException {
2265       if (snapshot == null) {
2266         return;
2267       }
2268       HStore.this.updateStorefiles(new ArrayList<StoreFile>(0), snapshot.getId());
2269     }
2270   }
2271
2272   @Override
2273   public boolean needsCompaction() {
2274     return this.storeEngine.needsCompaction(this.filesCompacting);
2275   }
2276
2277   @Override
2278   public CacheConfig getCacheConfig() {
2279     return this.cacheConf;
2280   }
2281
2282   public static final long FIXED_OVERHEAD =
2283       ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (11 * Bytes.SIZEOF_LONG)
2284               + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
2285
2286   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
2287       + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
2288       + ClassSize.CONCURRENT_SKIPLISTMAP
2289       + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT
2290       + ScanInfo.FIXED_OVERHEAD);
2291
2292   @Override
2293   public long heapSize() {
2294     return DEEP_OVERHEAD + this.memstore.heapSize();
2295   }
2296
2297   @Override
2298   public CellComparator getComparator() {
2299     return comparator;
2300   }
2301
2302   @Override
2303   public ScanInfo getScanInfo() {
2304     return scanInfo;
2305   }
2306
2307   /**
2308    * Set scan info; used by tests.
2309    * @param scanInfo new scan info to use for tests
2310    */
2311   void setScanInfo(ScanInfo scanInfo) {
2312     this.scanInfo = scanInfo;
2313   }
2314
2315   @Override
2316   public boolean hasTooManyStoreFiles() {
2317     return getStorefilesCount() > this.blockingFileCount;
2318   }
2319
2320   @Override
2321   public long getFlushedCellsCount() {
2322     return flushedCellsCount;
2323   }
2324
2325   @Override
2326   public long getFlushedCellsSize() {
2327     return flushedCellsSize;
2328   }
2329
2330   @Override
2331   public long getFlushedOutputFileSize() {
2332     return flushedOutputFileSize;
2333   }
2334
2335   @Override
2336   public long getCompactedCellsCount() {
2337     return compactedCellsCount;
2338   }
2339
2340   @Override
2341   public long getCompactedCellsSize() {
2342     return compactedCellsSize;
2343   }
2344
2345   @Override
2346   public long getMajorCompactedCellsCount() {
2347     return majorCompactedCellsCount;
2348   }
2349
2350   @Override
2351   public long getMajorCompactedCellsSize() {
2352     return majorCompactedCellsSize;
2353   }
2354
2355   /**
2356    * Returns the StoreEngine that is backing this concrete implementation of Store.
2357    * @return Returns the {@link StoreEngine} object used internally inside this HStore object.
2358    */
2359   @VisibleForTesting
2360   public StoreEngine<?, ?, ?, ?> getStoreEngine() {
2361     return this.storeEngine;
2362   }
2363
2364   protected OffPeakHours getOffPeakHours() {
2365     return this.offPeakHours;
2366   }
2367
2368   /**
2369    * {@inheritDoc}
2370    */
2371   @Override
2372   public void onConfigurationChange(Configuration conf) {
2373     this.conf = new CompoundConfiguration()
2374             .add(conf)
2375             .addBytesMap(family.getValues());
2376     this.storeEngine.compactionPolicy.setConf(conf);
2377     this.offPeakHours = OffPeakHours.getInstance(conf);
2378   }
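       // Note for the handler above: it rebuilds the store configuration and refreshes
       // the compaction policy and off-peak hours; settings cached in other fields are
       // not re-read on a configuration change.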
2379
2380   /**
2381    * {@inheritDoc}
2382    */
2383   @Override
2384   public void registerChildren(ConfigurationManager manager) {
2385     // No children to register
2386   }
2387
2388   /**
2389    * {@inheritDoc}
2390    */
2391   @Override
2392   public void deregisterChildren(ConfigurationManager manager) {
2393     // No children to deregister
2394   }
2395
2396   @Override
2397   public double getCompactionPressure() {
2398     return storeEngine.getStoreFileManager().getCompactionPressure();
2399   }
2400
2401   @Override
2402   public boolean isPrimaryReplicaStore() {
2403     return getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID;
2404   }
2405
2406   @Override
2407   public void closeAndArchiveCompactedFiles() throws IOException {
2408     lock.readLock().lock();
2409     Collection<StoreFile> copyCompactedfiles = null;
2410     try {
2411       Collection<StoreFile> compactedfiles =
2412           this.getStoreEngine().getStoreFileManager().getCompactedfiles();
2413       if (compactedfiles != null && !compactedfiles.isEmpty()) {
2414         // Do a copy under read lock
2415         copyCompactedfiles = new ArrayList<StoreFile>(compactedfiles);
2416       } else {
2417         if (LOG.isTraceEnabled()) {
2418           LOG.trace("No compacted files to archive");
2419         }
2420         return;
2421       }
2422     } finally {
2423       lock.readLock().unlock();
2424     }
2425     if (copyCompactedfiles != null && !copyCompactedfiles.isEmpty()) {
2426       removeCompactedfiles(copyCompactedfiles);
2427     }
2428   }
2429
2430   /**
2431    * Archives and removes the compacted files
2432    * @param compactedfiles The compacted files in this store that are not active in reads
2433    * @throws IOException
2434    */
2435   private void removeCompactedfiles(Collection<StoreFile> compactedfiles)
2436       throws IOException {
2437     final List<StoreFile> filesToRemove = new ArrayList<StoreFile>(compactedfiles.size());
2438     for (final StoreFile file : compactedfiles) {
2439       synchronized (file) {
2440         try {
2441           StoreFileReader r = file.getReader();
2442           if (r == null) {
2443             if (LOG.isDebugEnabled()) {
2444               LOG.debug("The file " + file + " was closed but still not archived.");
2445             }
2446             filesToRemove.add(file);
2447           }
2448           if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) {
2449             // Even if deleting fails we need not bother as any new scanners won't be
2450             // able to use the compacted file as the status is already compactedAway
2451             if (LOG.isTraceEnabled()) {
2452               LOG.trace("Closing and archiving the file " + file.getPath());
2453             }
2454             r.close(true);
2455             // Just close and return
2456             filesToRemove.add(file);
2457           }
2458         } catch (Exception e) {
2459           LOG.error("Exception while trying to close the compacted store file "
2460             + file.getPath().getName(), e);
2461         }
2462       }
2463     }
2464     if (this.isPrimaryReplicaStore()) {
2465       // Only the primary region is allowed to move the file to archive.
2466       // The secondary region does not move the files to archive. Any active reads from
2467       // the secondary region will still work because the file as such has active readers on it.
2468       if (!filesToRemove.isEmpty()) {
2469         if (LOG.isDebugEnabled()) {
2470           LOG.debug("Moving the files " + filesToRemove + " to archive");
2471         }
2472         // Only if this is successful it has to be removed
2473         this.fs.removeStoreFiles(this.getFamily().getNameAsString(), filesToRemove);
2474       }
2475     }
2476     if (!filesToRemove.isEmpty()) {
2477       // Clear the compactedfiles from the store file manager
2478       clearCompactedfiles(filesToRemove);
2479     }
2480   }
2481
2482   @Override public void finalizeFlush() {
2483     memstore.finalizeFlush();
2484   }
2485
2486   @Override public MemStore getMemStore() {
2487     return memstore;
2488   }
2489
2490   private void clearCompactedfiles(final List<StoreFile> filesToRemove) throws IOException {
2491     if (LOG.isTraceEnabled()) {
2492       LOG.trace("Clearing the compacted file " + filesToRemove + " from this store");
2493     }
2494     try {
2495       lock.writeLock().lock();
2496       this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToRemove);
2497     } finally {
2498       lock.writeLock().unlock();
2499     }
2500   }
2501 }