1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.net.InetSocketAddress;
24  import java.security.Key;
25  import java.security.KeyException;
26  import java.security.PrivilegedExceptionAction;
27  import java.util.ArrayList;
28  import java.util.Collection;
29  import java.util.Collections;
30  import java.util.HashMap;
31  import java.util.HashSet;
32  import java.util.Iterator;
33  import java.util.List;
34  import java.util.NavigableSet;
35  import java.util.Set;
36  import java.util.concurrent.Callable;
37  import java.util.concurrent.CompletionService;
38  import java.util.concurrent.ConcurrentHashMap;
39  import java.util.concurrent.ExecutionException;
40  import java.util.concurrent.ExecutorCompletionService;
41  import java.util.concurrent.Future;
42  import java.util.concurrent.ThreadPoolExecutor;
43  import java.util.concurrent.atomic.AtomicBoolean;
44  import java.util.concurrent.locks.ReentrantReadWriteLock;
45  
46  import org.apache.commons.logging.Log;
47  import org.apache.commons.logging.LogFactory;
48  import org.apache.hadoop.conf.Configuration;
49  import org.apache.hadoop.fs.FileSystem;
50  import org.apache.hadoop.fs.Path;
51  import org.apache.hadoop.hbase.Cell;
52  import org.apache.hadoop.hbase.CellComparator;
53  import org.apache.hadoop.hbase.CellUtil;
54  import org.apache.hadoop.hbase.CompoundConfiguration;
55  import org.apache.hadoop.hbase.HColumnDescriptor;
56  import org.apache.hadoop.hbase.HConstants;
57  import org.apache.hadoop.hbase.HRegionInfo;
58  import org.apache.hadoop.hbase.KeyValue;
59  import org.apache.hadoop.hbase.RemoteExceptionHandler;
60  import org.apache.hadoop.hbase.TableName;
61  import org.apache.hadoop.hbase.Tag;
62  import org.apache.hadoop.hbase.TagType;
63  import org.apache.hadoop.hbase.classification.InterfaceAudience;
64  import org.apache.hadoop.hbase.client.Scan;
65  import org.apache.hadoop.hbase.conf.ConfigurationManager;
66  import org.apache.hadoop.hbase.io.compress.Compression;
67  import org.apache.hadoop.hbase.io.crypto.Cipher;
68  import org.apache.hadoop.hbase.io.crypto.Encryption;
69  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
70  import org.apache.hadoop.hbase.io.hfile.HFile;
71  import org.apache.hadoop.hbase.io.hfile.HFileContext;
72  import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
73  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
74  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
75  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
76  import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
77  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
78  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
79  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
80  import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
81  import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
82  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
83  import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
84  import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
85  import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
86  import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
87  import org.apache.hadoop.hbase.security.EncryptionUtil;
88  import org.apache.hadoop.hbase.security.User;
89  import org.apache.hadoop.hbase.util.Bytes;
90  import org.apache.hadoop.hbase.util.ChecksumType;
91  import org.apache.hadoop.hbase.util.ClassSize;
92  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
93  import org.apache.hadoop.hbase.util.Pair;
94  import org.apache.hadoop.hbase.util.ReflectionUtils;
95  import org.apache.hadoop.util.StringUtils;
96  import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
97  
98  import com.google.common.annotations.VisibleForTesting;
99  import com.google.common.base.Preconditions;
100 import com.google.common.collect.ImmutableCollection;
101 import com.google.common.collect.ImmutableList;
102 import com.google.common.collect.Lists;
103 import com.google.common.collect.Sets;
104 
105 
106 
107 
108 
109 
110 
111 
112 
113 
114 
115 
116 
117 
118 
119 
120 
121 
122 
123 
124 
125 
126 
127 
128 @InterfaceAudience.Private
129 public class HStore implements Store {
  /** Configuration key naming the MemStore class the constructor instantiates reflectively. */
  private static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
  /** Configuration key read by the constructor into {@link #compactionCheckMultiplier}. */
  public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
      "hbase.server.compactchecker.interval.multiplier";
  /** Configuration key read by the constructor into {@link #blockingFileCount}. */
  public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
  /** Used when the multiplier key is unset, or when the configured value is not positive. */
  public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
  /** Used when {@link #BLOCKING_STOREFILES_KEY} is unset. */
  public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7;

  static final Log LOG = LogFactory.getLog(HStore.class);

  /** In-memory store; class is chosen via {@link #MEMSTORE_CLASS_NAME} (DefaultMemStore if unset). */
  protected final MemStore memstore;

  // Owning region, column family descriptor and region filesystem, all supplied at construction.
  private final HRegion region;
  private final HColumnDescriptor family;
  private final HRegionFileSystem fs;
  // CompoundConfiguration layered in the constructor from: the passed-in configuration, the
  // table descriptor configuration, the family configuration and the family values.
  private Configuration conf;
  private final CacheConfig cacheConf;
  // NOTE(review): not written anywhere in this part of the file beyond the initializer.
  private long lastCompactSize = 0;
  volatile boolean forceMajor = false;

  // Shared by all stores: the first constructor to see 0 here loads
  // "hbase.hstore.close.check.interval" into it.
  static int closeCheckInterval = 0;
  // Running totals, incremented from the file reader whenever a store file is opened,
  // bulk loaded or committed. NOTE(review): "+=" on a volatile long is not atomic.
  private volatile long storeSize = 0L;
  private volatile long totalUncompressedBytes = 0L;

  /**
   * Store-wide lock. In this part of the file the read lock guards memstore
   * add/delete/rollback; the write lock guards snapshot(), close() and the insertion of a
   * bulk-loaded file into the store file manager.
   */
  final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  // From "hbase.hstore.bulkload.verify" (default false); when true assertBulkLoadHFileOk
  // scans every cell of the candidate file.
  private final boolean verifyBulkLoads;

  private ScanInfo scanInfo;

  // NOTE(review): presumably the files currently selected for compaction — confirm at use sites.
  final List<StoreFile> filesCompacting = Lists.newArrayList();

  // Set backed by a ConcurrentHashMap.
  private final Set<ChangedReadersObserver> changedReaderObservers =
    Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());

  private final int blocksize;
  private HFileDataBlockEncoder dataBlockEncoder;

  // Checksum settings resolved in the constructor via getChecksumType / getBytesPerChecksum.
  private ChecksumType checksumType;
  private int bytesPerChecksum;

  // Comparator obtained from the region info; passed to ScanInfo, the memstore and the engine.
  private final KeyValue.KVComparator comparator;

  final StoreEngine<?, ?, ?, ?> storeEngine;

  private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
  private volatile OffPeakHours offPeakHours;

  // flushCache() makes up to flushRetriesNumber attempts, sleeping pauseTime ms between them.
  private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
  private int flushRetriesNumber;
  private int pauseTime;

  private long blockingFileCount;
  private int compactionCheckMultiplier;

  // Stays NONE unless the family declares an encryption type (see constructor).
  private Encryption.Context cryptoContext = Encryption.Context.NONE;

  // Cell/size counters. NOTE(review): not updated in this part of the file.
  private volatile long flushedCellsCount = 0;
  private volatile long compactedCellsCount = 0;
  private volatile long majorCompactedCellsCount = 0;
  private volatile long flushedCellsSize = 0;
  private volatile long compactedCellsSize = 0;
  private volatile long majorCompactedCellsSize = 0;
204 
205   
206 
207 
208 
209 
210 
211 
212 
213   protected HStore(final HRegion region, final HColumnDescriptor family,
214       final Configuration confParam) throws IOException {
215 
216     HRegionInfo info = region.getRegionInfo();
217     this.fs = region.getRegionFileSystem();
218 
219     
220     fs.createStoreDir(family.getNameAsString());
221     this.region = region;
222     this.family = family;
223     
224     
225     
226     this.conf = new CompoundConfiguration()
227       .add(confParam)
228       .addStringMap(region.getTableDesc().getConfiguration())
229       .addStringMap(family.getConfiguration())
230       .addWritableMap(family.getValues());
231     this.blocksize = family.getBlocksize();
232 
233     this.dataBlockEncoder =
234         new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());
235 
236     this.comparator = info.getComparator();
237     
238     long timeToPurgeDeletes =
239         Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
240     LOG.trace("Time to purge deletes set to " + timeToPurgeDeletes +
241         "ms in store " + this);
242     
243     long ttl = determineTTLFromFamily(family);
244     
245     
246     scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
247     String className = conf.get(MEMSTORE_CLASS_NAME, DefaultMemStore.class.getName());
248     this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
249         Configuration.class, KeyValue.KVComparator.class }, new Object[] { conf, this.comparator });
250     this.offPeakHours = OffPeakHours.getInstance(conf);
251 
252     
253     this.cacheConf = new CacheConfig(conf, family);
254 
255     this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
256 
257     this.blockingFileCount =
258         conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
259     this.compactionCheckMultiplier = conf.getInt(
260         COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
261     if (this.compactionCheckMultiplier <= 0) {
262       LOG.error("Compaction check period multiplier must be positive, setting default: "
263           + DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
264       this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
265     }
266 
267     if (HStore.closeCheckInterval == 0) {
268       HStore.closeCheckInterval = conf.getInt(
269           "hbase.hstore.close.check.interval", 10*1000*1000 
270     }
271 
272     this.storeEngine = StoreEngine.create(this, this.conf, this.comparator);
273     this.storeEngine.getStoreFileManager().loadFiles(loadStoreFiles());
274 
275     
276     this.checksumType = getChecksumType(conf);
277     
278     this.bytesPerChecksum = getBytesPerChecksum(conf);
279     flushRetriesNumber = conf.getInt(
280         "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
281     pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
282     if (flushRetriesNumber <= 0) {
283       throw new IllegalArgumentException(
284           "hbase.hstore.flush.retries.number must be > 0, not "
285               + flushRetriesNumber);
286     }
287 
288     
289     String cipherName = family.getEncryptionType();
290     if (cipherName != null) {
291       Cipher cipher;
292       Key key;
293       byte[] keyBytes = family.getEncryptionKey();
294       if (keyBytes != null) {
295         
296         String masterKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
297           User.getCurrent().getShortName());
298         try {
299           
300           key = EncryptionUtil.unwrapKey(conf, masterKeyName, keyBytes);
301         } catch (KeyException e) {
302           
303           
304           if (LOG.isDebugEnabled()) {
305             LOG.debug("Unable to unwrap key with current master key '" + masterKeyName + "'");
306           }
307           String alternateKeyName =
308             conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY);
309           if (alternateKeyName != null) {
310             try {
311               key = EncryptionUtil.unwrapKey(conf, alternateKeyName, keyBytes);
312             } catch (KeyException ex) {
313               throw new IOException(ex);
314             }
315           } else {
316             throw new IOException(e);
317           }
318         }
319         
320         cipher = Encryption.getCipher(conf, key.getAlgorithm());
321         if (cipher == null) {
322           throw new RuntimeException("Cipher '" + key.getAlgorithm() + "' is not available");
323         }
324         
325         
326         
327         if (!cipher.getName().equalsIgnoreCase(cipherName)) {
328           throw new RuntimeException("Encryption for family '" + family.getNameAsString() +
329             "' configured with type '" + cipherName +
330             "' but key specifies algorithm '" + cipher.getName() + "'");
331         }
332       } else {
333         
334         cipher = Encryption.getCipher(conf, cipherName);
335         if (cipher == null) {
336           throw new RuntimeException("Cipher '" + cipherName + "' is not available");
337         }
338         key = cipher.getRandomKey();
339       }
340       cryptoContext = Encryption.newContext(conf);
341       cryptoContext.setCipher(cipher);
342       cryptoContext.setKey(key);
343     }
344   }
345 
346   
347 
348 
349 
350   private static long determineTTLFromFamily(final HColumnDescriptor family) {
351     
352     long ttl = family.getTimeToLive();
353     if (ttl == HConstants.FOREVER) {
354       
355       ttl = Long.MAX_VALUE;
356     } else if (ttl == -1) {
357       ttl = Long.MAX_VALUE;
358     } else {
359       
360       ttl *= 1000;
361     }
362     return ttl;
363   }
364 
365   @Override
366   public String getColumnFamilyName() {
367     return this.family.getNameAsString();
368   }
369 
370   @Override
371   public TableName getTableName() {
372     return this.getRegionInfo().getTable();
373   }
374 
375   @Override
376   public FileSystem getFileSystem() {
377     return this.fs.getFileSystem();
378   }
379 
380   public HRegionFileSystem getRegionFileSystem() {
381     return this.fs;
382   }
383 
384   
385   @Override
386   public long getStoreFileTtl() {
387     
388     return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
389   }
390 
391   @Override
392   public long getMemstoreFlushSize() {
393     
394     return this.region.memstoreFlushSize;
395   }
396 
397   @Override
398   public long getFlushableSize() {
399     return this.memstore.getFlushableSize();
400   }
401 
402   @Override
403   public long getSnapshotSize() {
404     return this.memstore.getSnapshotSize();
405   }
406 
407   @Override
408   public long getCompactionCheckMultiplier() {
409     return this.compactionCheckMultiplier;
410   }
411 
412   @Override
413   public long getBlockingFileCount() {
414     return blockingFileCount;
415   }
416   
417 
418   
419 
420 
421 
422 
423   public static int getBytesPerChecksum(Configuration conf) {
424     return conf.getInt(HConstants.BYTES_PER_CHECKSUM,
425                        HFile.DEFAULT_BYTES_PER_CHECKSUM);
426   }
427 
428   
429 
430 
431 
432 
433   public static ChecksumType getChecksumType(Configuration conf) {
434     String checksumName = conf.get(HConstants.CHECKSUM_TYPE_NAME);
435     if (checksumName == null) {
436       return HFile.DEFAULT_CHECKSUM_TYPE;
437     } else {
438       return ChecksumType.nameToType(checksumName);
439     }
440   }
441 
442   
443 
444 
445   public static int getCloseCheckInterval() {
446     return closeCheckInterval;
447   }
448 
449   @Override
450   public HColumnDescriptor getFamily() {
451     return this.family;
452   }
453 
454   
455 
456 
457   @Override
458   public long getMaxSequenceId() {
459     return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
460   }
461 
462   @Override
463   public long getMaxMemstoreTS() {
464     return StoreFile.getMaxMemstoreTSInList(this.getStorefiles());
465   }
466 
467   
468 
469 
470 
471 
472 
473   @Deprecated
474   public static Path getStoreHomedir(final Path tabledir,
475       final HRegionInfo hri, final byte[] family) {
476     return getStoreHomedir(tabledir, hri.getEncodedName(), family);
477   }
478 
479   
480 
481 
482 
483 
484 
485   @Deprecated
486   public static Path getStoreHomedir(final Path tabledir,
487       final String encodedName, final byte[] family) {
488     return new Path(tabledir, new Path(encodedName, Bytes.toString(family)));
489   }
490 
491   @Override
492   public HFileDataBlockEncoder getDataBlockEncoder() {
493     return dataBlockEncoder;
494   }
495 
496   
497 
498 
499 
500   void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
501     this.dataBlockEncoder = blockEncoder;
502   }
503 
504   
505 
506 
507 
508 
509   private List<StoreFile> loadStoreFiles() throws IOException {
510     Collection<StoreFileInfo> files = fs.getStoreFiles(getColumnFamilyName());
511     return openStoreFiles(files);
512   }
513 
514   private List<StoreFile> openStoreFiles(Collection<StoreFileInfo> files) throws IOException {
515     if (files == null || files.size() == 0) {
516       return new ArrayList<StoreFile>();
517     }
518     
519     ThreadPoolExecutor storeFileOpenerThreadPool =
520       this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
521           this.getColumnFamilyName());
522     CompletionService<StoreFile> completionService =
523       new ExecutorCompletionService<StoreFile>(storeFileOpenerThreadPool);
524 
525     int totalValidStoreFile = 0;
526     for (final StoreFileInfo storeFileInfo: files) {
527       
528       completionService.submit(new Callable<StoreFile>() {
529         @Override
530         public StoreFile call() throws IOException {
531           StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
532           return storeFile;
533         }
534       });
535       totalValidStoreFile++;
536     }
537 
538     ArrayList<StoreFile> results = new ArrayList<StoreFile>(files.size());
539     IOException ioe = null;
540     try {
541       for (int i = 0; i < totalValidStoreFile; i++) {
542         try {
543           Future<StoreFile> future = completionService.take();
544           StoreFile storeFile = future.get();
545           long length = storeFile.getReader().length();
546           this.storeSize += length;
547           this.totalUncompressedBytes +=
548               storeFile.getReader().getTotalUncompressedBytes();
549           if (LOG.isDebugEnabled()) {
550             LOG.debug("loaded " + storeFile.toStringDetailed());
551           }
552           results.add(storeFile);
553         } catch (InterruptedException e) {
554           if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
555         } catch (ExecutionException e) {
556           if (ioe == null) ioe = new IOException(e.getCause());
557         }
558       }
559     } finally {
560       storeFileOpenerThreadPool.shutdownNow();
561     }
562     if (ioe != null) {
563       
564       for (StoreFile file : results) {
565         try {
566           if (file != null) file.closeReader(true);
567         } catch (IOException e) {
568           LOG.warn(e.getMessage());
569         }
570       }
571       throw ioe;
572     }
573 
574     return results;
575   }
576 
577   
578 
579 
580 
581 
582 
583 
584   @Override
585   public void refreshStoreFiles() throws IOException {
586     Collection<StoreFileInfo> newFiles = fs.getStoreFiles(getColumnFamilyName());
587     refreshStoreFilesInternal(newFiles);
588   }
589 
590   @Override
591   public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
592     List<StoreFileInfo> storeFiles = new ArrayList<StoreFileInfo>(newFiles.size());
593     for (String file : newFiles) {
594       storeFiles.add(fs.getStoreFileInfo(getColumnFamilyName(), file));
595     }
596     refreshStoreFilesInternal(storeFiles);
597   }
598 
599   
600 
601 
602 
603 
604 
605 
606   private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
607     StoreFileManager sfm = storeEngine.getStoreFileManager();
608     Collection<StoreFile> currentFiles = sfm.getStorefiles();
609     if (currentFiles == null) currentFiles = new ArrayList<StoreFile>(0);
610 
611     if (newFiles == null) newFiles = new ArrayList<StoreFileInfo>(0);
612 
613     HashMap<StoreFileInfo, StoreFile> currentFilesSet = new HashMap<StoreFileInfo, StoreFile>(currentFiles.size());
614     for (StoreFile sf : currentFiles) {
615       currentFilesSet.put(sf.getFileInfo(), sf);
616     }
617     HashSet<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);
618 
619     Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
620     Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);
621 
622     if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
623       return;
624     }
625 
626     LOG.info("Refreshing store files for region " + this.getRegionInfo().getRegionNameAsString()
627       + " files to add: " + toBeAddedFiles + " files to remove: " + toBeRemovedFiles);
628 
629     Set<StoreFile> toBeRemovedStoreFiles = new HashSet<StoreFile>(toBeRemovedFiles.size());
630     for (StoreFileInfo sfi : toBeRemovedFiles) {
631       toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
632     }
633 
634     
635     List<StoreFile> openedFiles = openStoreFiles(toBeAddedFiles);
636 
637     
638     replaceStoreFiles(toBeRemovedStoreFiles, openedFiles); 
639 
640     
641     
642     
643     if (!toBeAddedFiles.isEmpty()) {
644       region.getMVCC().advanceMemstoreReadPointIfNeeded(this.getMaxSequenceId());
645     }
646 
647     
648     completeCompaction(toBeRemovedStoreFiles, false);
649   }
650 
651   private StoreFile createStoreFileAndReader(final Path p) throws IOException {
652     StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p);
653     return createStoreFileAndReader(info);
654   }
655 
656   private StoreFile createStoreFileAndReader(final StoreFileInfo info)
657       throws IOException {
658     info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
659     StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
660       this.family.getBloomFilterType());
661     storeFile.createReader();
662     return storeFile;
663   }
664 
665   @Override
666   public Pair<Long, Cell> add(final Cell cell) {
667     lock.readLock().lock();
668     try {
669        return this.memstore.add(cell);
670     } finally {
671       lock.readLock().unlock();
672     }
673   }
674 
675   @Override
676   public long timeOfOldestEdit() {
677     return memstore.timeOfOldestEdit();
678   }
679 
680   
681 
682 
683 
684 
685 
686   protected long delete(final KeyValue kv) {
687     lock.readLock().lock();
688     try {
689       return this.memstore.delete(kv);
690     } finally {
691       lock.readLock().unlock();
692     }
693   }
694 
695   @Override
696   public void rollback(final Cell cell) {
697     lock.readLock().lock();
698     try {
699       this.memstore.rollback(cell);
700     } finally {
701       lock.readLock().unlock();
702     }
703   }
704 
705   
706 
707 
708   @Override
709   public Collection<StoreFile> getStorefiles() {
710     return this.storeEngine.getStoreFileManager().getStorefiles();
711   }
712 
713   @Override
714   public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
715     HFile.Reader reader  = null;
716     try {
717       LOG.info("Validating hfile at " + srcPath + " for inclusion in "
718           + "store " + this + " region " + this.getRegionInfo().getRegionNameAsString());
719       reader = HFile.createReader(srcPath.getFileSystem(conf),
720           srcPath, cacheConf, conf);
721       reader.loadFileInfo();
722 
723       byte[] firstKey = reader.getFirstRowKey();
724       Preconditions.checkState(firstKey != null, "First key can not be null");
725       byte[] lk = reader.getLastKey();
726       Preconditions.checkState(lk != null, "Last key can not be null");
727       byte[] lastKey =  KeyValue.createKeyValueFromKey(lk).getRow();
728 
729       LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey) +
730           " last=" + Bytes.toStringBinary(lastKey));
731       LOG.debug("Region bounds: first=" +
732           Bytes.toStringBinary(getRegionInfo().getStartKey()) +
733           " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
734 
735       if (!this.getRegionInfo().containsRange(firstKey, lastKey)) {
736         throw new WrongRegionException(
737             "Bulk load file " + srcPath.toString() + " does not fit inside region "
738             + this.getRegionInfo().getRegionNameAsString());
739       }
740 
741       if(reader.length() > conf.getLong(HConstants.HREGION_MAX_FILESIZE,
742           HConstants.DEFAULT_MAX_FILE_SIZE)) {
743         LOG.warn("Trying to bulk load hfile " + srcPath.toString() + " with size: " +
744             reader.length() + " bytes can be problematic as it may lead to oversplitting.");
745       }
746 
747       if (verifyBulkLoads) {
748         long verificationStartTime = EnvironmentEdgeManager.currentTime();
749         LOG.info("Full verification started for bulk load hfile: " + srcPath.toString());
750         Cell prevCell = null;
751         HFileScanner scanner = reader.getScanner(false, false, false);
752         scanner.seekTo();
753         do {
754           Cell cell = scanner.getKeyValue();
755           if (prevCell != null) {
756             if (CellComparator.compareRows(prevCell, cell) > 0) {
757               throw new InvalidHFileException("Previous row is greater than"
758                   + " current row: path=" + srcPath + " previous="
759                   + CellUtil.getCellKeyAsString(prevCell) + " current="
760                   + CellUtil.getCellKeyAsString(cell));
761             }
762             if (CellComparator.compareFamilies(prevCell, cell) != 0) {
763               throw new InvalidHFileException("Previous key had different"
764                   + " family compared to current key: path=" + srcPath
765                   + " previous="
766                   + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(),
767                       prevCell.getFamilyLength())
768                   + " current="
769                   + Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(),
770                       cell.getFamilyLength()));
771             }
772           }
773           prevCell = cell;
774         } while (scanner.next());
775       LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString()
776          + " took " + (EnvironmentEdgeManager.currentTime() - verificationStartTime)
777          + " ms");
778       }
779     } finally {
780       if (reader != null) reader.close();
781     }
782   }
783 
784   @Override
785   public Path bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
786     Path srcPath = new Path(srcPathStr);
787     Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
788 
789     LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as "
790         + dstPath + " - updating store file list.");
791 
792     StoreFile sf = createStoreFileAndReader(dstPath);
793     bulkLoadHFile(sf);
794 
795     LOG.info("Successfully loaded store file " + srcPath + " into store " + this
796         + " (new location: " + dstPath + ")");
797 
798     return dstPath;
799   }
800 
801   @Override
802   public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
803     StoreFile sf = createStoreFileAndReader(fileInfo);
804     bulkLoadHFile(sf);
805   }
806 
807   private void bulkLoadHFile(StoreFile sf) throws IOException {
808     StoreFile.Reader r = sf.getReader();
809     this.storeSize += r.length();
810     this.totalUncompressedBytes += r.getTotalUncompressedBytes();
811 
812     
813     this.lock.writeLock().lock();
814     try {
815       this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
816     } finally {
817       
818       
819       
820       
821       
822       this.lock.writeLock().unlock();
823     }
824     notifyChangedReadersObservers();
825     LOG.info("Loaded HFile " + sf.getFileInfo() + " into store '" + getColumnFamilyName());
826     if (LOG.isTraceEnabled()) {
827       String traceMessage = "BULK LOAD time,size,store size,store files ["
828           + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize
829           + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
830       LOG.trace(traceMessage);
831     }
832   }
833 
834   @Override
835   public ImmutableCollection<StoreFile> close() throws IOException {
836     this.lock.writeLock().lock();
837     try {
838       
839       ImmutableCollection<StoreFile> result = storeEngine.getStoreFileManager().clearFiles();
840 
841       if (!result.isEmpty()) {
842         
843         ThreadPoolExecutor storeFileCloserThreadPool = this.region
844             .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
845                 + this.getColumnFamilyName());
846 
847         
848         CompletionService<Void> completionService =
849           new ExecutorCompletionService<Void>(storeFileCloserThreadPool);
850         for (final StoreFile f : result) {
851           completionService.submit(new Callable<Void>() {
852             @Override
853             public Void call() throws IOException {
854               boolean evictOnClose = 
855                   cacheConf != null? cacheConf.shouldEvictOnClose(): true; 
856               f.closeReader(evictOnClose);
857               return null;
858             }
859           });
860         }
861 
862         IOException ioe = null;
863         try {
864           for (int i = 0; i < result.size(); i++) {
865             try {
866               Future<Void> future = completionService.take();
867               future.get();
868             } catch (InterruptedException e) {
869               if (ioe == null) {
870                 ioe = new InterruptedIOException();
871                 ioe.initCause(e);
872               }
873             } catch (ExecutionException e) {
874               if (ioe == null) ioe = new IOException(e.getCause());
875             }
876           }
877         } finally {
878           storeFileCloserThreadPool.shutdownNow();
879         }
880         if (ioe != null) throw ioe;
881       }
882       LOG.info("Closed " + this);
883       return result;
884     } finally {
885       this.lock.writeLock().unlock();
886     }
887   }
888 
889   
890 
891 
892 
893 
894   void snapshot() {
895     this.lock.writeLock().lock();
896     try {
897       this.memstore.snapshot();
898     } finally {
899       this.lock.writeLock().unlock();
900     }
901   }
902 
903   
904 
905 
906 
907 
908 
909 
910 
911 
912   protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
913       MonitoredTask status) throws IOException {
914     
915     
916     
917     
918     
919     StoreFlusher flusher = storeEngine.getStoreFlusher();
920     IOException lastException = null;
921     for (int i = 0; i < flushRetriesNumber; i++) {
922       try {
923         List<Path> pathNames = flusher.flushSnapshot(snapshot, logCacheFlushId, status);
924         Path lastPathName = null;
925         try {
926           for (Path pathName : pathNames) {
927             lastPathName = pathName;
928             validateStoreFile(pathName);
929           }
930           return pathNames;
931         } catch (Exception e) {
932           LOG.warn("Failed validating store file " + lastPathName + ", retrying num=" + i, e);
933           if (e instanceof IOException) {
934             lastException = (IOException) e;
935           } else {
936             lastException = new IOException(e);
937           }
938         }
939       } catch (IOException e) {
940         LOG.warn("Failed flushing store file, retrying num=" + i, e);
941         lastException = e;
942       }
943       if (lastException != null && i < (flushRetriesNumber - 1)) {
944         try {
945           Thread.sleep(pauseTime);
946         } catch (InterruptedException e) {
947           IOException iie = new InterruptedIOException();
948           iie.initCause(e);
949           throw iie;
950         }
951       }
952     }
953     throw lastException;
954   }
955 
956   
957 
958 
959 
960 
961 
962 
963   private StoreFile commitFile(final Path path, final long logCacheFlushId, MonitoredTask status)
964       throws IOException {
965     
966     Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);
967 
968     status.setStatus("Flushing " + this + ": reopening flushed file");
969     StoreFile sf = createStoreFileAndReader(dstPath);
970 
971     StoreFile.Reader r = sf.getReader();
972     this.storeSize += r.length();
973     this.totalUncompressedBytes += r.getTotalUncompressedBytes();
974 
975     if (LOG.isInfoEnabled()) {
976       LOG.info("Added " + sf + ", entries=" + r.getEntries() +
977         ", sequenceid=" + logCacheFlushId +
978         ", filesize=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1));
979     }
980     return sf;
981   }
982 
983   
984 
985 
986 
987 
988 
989 
990 
991   @Override
992   public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
993       boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag)
994   throws IOException {
995     final CacheConfig writerCacheConf;
996     if (isCompaction) {
997       
998       writerCacheConf = new CacheConfig(cacheConf);
999       writerCacheConf.setCacheDataOnWrite(false);
1000     } else {
1001       writerCacheConf = cacheConf;
1002     }
1003     InetSocketAddress[] favoredNodes = null;
1004     if (region.getRegionServerServices() != null) {
1005       favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
1006           region.getRegionInfo().getEncodedName());
1007     }
1008     HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
1009       cryptoContext);
1010     StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
1011         this.getFileSystem())
1012             .withFilePath(fs.createTempName())
1013             .withComparator(comparator)
1014             .withBloomType(family.getBloomFilterType())
1015             .withMaxKeyCount(maxKeyCount)
1016             .withFavoredNodes(favoredNodes)
1017             .withFileContext(hFileContext)
1018             .build();
1019     return w;
1020   }
1021 
1022   private HFileContext createFileContext(Compression.Algorithm compression,
1023       boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context cryptoContext) {
1024     if (compression == null) {
1025       compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
1026     }
1027     HFileContext hFileContext = new HFileContextBuilder()
1028                                 .withIncludesMvcc(includeMVCCReadpoint)
1029                                 .withIncludesTags(includesTag)
1030                                 .withCompression(compression)
1031                                 .withCompressTags(family.isCompressTags())
1032                                 .withChecksumType(checksumType)
1033                                 .withBytesPerCheckSum(bytesPerChecksum)
1034                                 .withBlockSize(blocksize)
1035                                 .withHBaseCheckSum(true)
1036                                 .withDataBlockEncoding(family.getDataBlockEncoding())
1037                                 .withEncryptionContext(cryptoContext)
1038                                 .withCreateTime(EnvironmentEdgeManager.currentTime())
1039                                 .build();
1040     return hFileContext;
1041   }
1042 
1043 
1044   
1045 
1046 
1047 
1048 
1049 
1050 
1051   private boolean updateStorefiles(final List<StoreFile> sfs, final long snapshotId)
1052       throws IOException {
1053     this.lock.writeLock().lock();
1054     try {
1055       this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
1056       if (snapshotId > 0) {
1057         this.memstore.clearSnapshot(snapshotId);
1058       }
1059     } finally {
1060       
1061       
1062       
1063       
1064       
1065       this.lock.writeLock().unlock();
1066     }
1067 
1068     
1069     notifyChangedReadersObservers();
1070 
1071     if (LOG.isTraceEnabled()) {
1072       long totalSize = 0;
1073       for (StoreFile sf : sfs) {
1074         totalSize += sf.getReader().length();
1075       }
1076       String traceMessage = "FLUSH time,count,size,store size,store files ["
1077           + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize
1078           + "," + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
1079       LOG.trace(traceMessage);
1080     }
1081     return needsCompaction();
1082   }
1083 
1084   
1085 
1086 
1087 
1088   private void notifyChangedReadersObservers() throws IOException {
1089     for (ChangedReadersObserver o: this.changedReaderObservers) {
1090       o.updateReaders();
1091     }
1092   }
1093 
1094   
1095 
1096 
1097 
1098 
1099   @Override
1100   public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet,
1101       boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
1102       byte[] stopRow, long readPt) throws IOException {
1103     Collection<StoreFile> storeFilesToScan;
1104     List<KeyValueScanner> memStoreScanners;
1105     this.lock.readLock().lock();
1106     try {
1107       storeFilesToScan =
1108           this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow);
1109       memStoreScanners = this.memstore.getScanners(readPt);
1110     } finally {
1111       this.lock.readLock().unlock();
1112     }
1113 
1114     
1115 
1116     
1117     
1118     
1119     List<StoreFileScanner> sfScanners = StoreFileScanner
1120       .getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, matcher,
1121         readPt);
1122     List<KeyValueScanner> scanners =
1123       new ArrayList<KeyValueScanner>(sfScanners.size()+1);
1124     scanners.addAll(sfScanners);
1125     
1126     scanners.addAll(memStoreScanners);
1127     return scanners;
1128   }
1129 
1130   @Override
1131   public void addChangedReaderObserver(ChangedReadersObserver o) {
1132     this.changedReaderObservers.add(o);
1133   }
1134 
1135   @Override
1136   public void deleteChangedReaderObserver(ChangedReadersObserver o) {
1137     
1138     this.changedReaderObservers.remove(o);
1139   }
1140 
1141   
1142   
1143   
1144 
1145   
1146 
1147 
1148 
1149 
1150 
1151 
1152 
1153 
1154 
1155 
1156 
1157 
1158 
1159 
1160 
1161 
1162 
1163 
1164 
1165 
1166 
1167 
1168 
1169 
1170 
1171 
1172 
1173 
1174 
1175 
1176 
1177 
1178 
1179 
1180 
1181 
1182 
1183 
1184 
1185 
1186 
1187 
1188   @Override
1189   public List<StoreFile> compact(CompactionContext compaction,
1190       CompactionThroughputController throughputController) throws IOException {
1191     return compact(compaction, throughputController, null);
1192   }
1193 
1194   @Override
1195   public List<StoreFile> compact(CompactionContext compaction,
1196     CompactionThroughputController throughputController, User user) throws IOException {
1197     assert compaction != null;
1198     List<StoreFile> sfs = null;
1199     CompactionRequest cr = compaction.getRequest();;
1200     try {
1201       
1202       
1203       
1204       long compactionStartTime = EnvironmentEdgeManager.currentTime();
1205       assert compaction.hasSelection();
1206       Collection<StoreFile> filesToCompact = cr.getFiles();
1207       assert !filesToCompact.isEmpty();
1208       synchronized (filesCompacting) {
1209         
1210         
1211         Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
1212       }
1213 
1214       
1215       LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
1216           + this + " of " + this.getRegionInfo().getRegionNameAsString()
1217           + " into tmpdir=" + fs.getTempDir() + ", totalSize="
1218           + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));
1219 
1220       
1221       List<Path> newFiles = compaction.compact(throughputController, user);
1222 
1223       
1224       if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
1225         LOG.warn("hbase.hstore.compaction.complete is set to false");
1226         sfs = new ArrayList<StoreFile>(newFiles.size());
1227         for (Path newFile : newFiles) {
1228           
1229           StoreFile sf = createStoreFileAndReader(newFile);
1230           sf.closeReader(true);
1231           sfs.add(sf);
1232         }
1233         return sfs;
1234       }
1235       
1236       sfs = moveCompatedFilesIntoPlace(cr, newFiles, user);
1237       writeCompactionWalRecord(filesToCompact, sfs);
1238       replaceStoreFiles(filesToCompact, sfs);
1239       if (cr.isMajor()) {
1240         majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs;
1241         majorCompactedCellsSize += getCompactionProgress().totalCompactedSize;
1242       } else {
1243         compactedCellsCount += getCompactionProgress().totalCompactingKVs;
1244         compactedCellsSize += getCompactionProgress().totalCompactedSize;
1245       }
1246       
1247       completeCompaction(filesToCompact, true); 
1248 
1249       logCompactionEndMessage(cr, sfs, compactionStartTime);
1250       return sfs;
1251     } finally {
1252       finishCompactionRequest(cr);
1253     }
1254   }
1255 
1256   private List<StoreFile> moveCompatedFilesIntoPlace(
1257       final CompactionRequest cr, List<Path> newFiles, User user) throws IOException {
1258     List<StoreFile> sfs = new ArrayList<StoreFile>(newFiles.size());
1259     for (Path newFile : newFiles) {
1260       assert newFile != null;
1261       final StoreFile sf = moveFileIntoPlace(newFile);
1262       if (this.getCoprocessorHost() != null) {
1263         final Store thisStore = this;
1264         if (user == null) {
1265           getCoprocessorHost().postCompact(thisStore, sf, cr);
1266         } else {
1267           try {
1268             user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
1269               @Override
1270               public Void run() throws Exception {
1271                 getCoprocessorHost().postCompact(thisStore, sf, cr);
1272                 return null;
1273               }
1274             });
1275           } catch (InterruptedException ie) {
1276             InterruptedIOException iioe = new InterruptedIOException();
1277             iioe.initCause(ie);
1278             throw iioe;
1279           }
1280         }
1281       }
1282       assert sf != null;
1283       sfs.add(sf);
1284     }
1285     return sfs;
1286   }
1287 
1288   
1289   StoreFile moveFileIntoPlace(final Path newFile) throws IOException {
1290     validateStoreFile(newFile);
1291     
1292     Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile);
1293     return createStoreFileAndReader(destPath);
1294   }
1295 
1296   
1297 
1298 
1299 
1300 
1301   private void writeCompactionWalRecord(Collection<StoreFile> filesCompacted,
1302       Collection<StoreFile> newFiles) throws IOException {
1303     if (region.getWAL() == null) return;
1304     List<Path> inputPaths = new ArrayList<Path>(filesCompacted.size());
1305     for (StoreFile f : filesCompacted) {
1306       inputPaths.add(f.getPath());
1307     }
1308     List<Path> outputPaths = new ArrayList<Path>(newFiles.size());
1309     for (StoreFile f : newFiles) {
1310       outputPaths.add(f.getPath());
1311     }
1312     HRegionInfo info = this.region.getRegionInfo();
1313     CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
1314         family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));
1315     WALUtil.writeCompactionMarker(region.getWAL(), this.region.getTableDesc(),
1316         this.region.getRegionInfo(), compactionDescriptor, this.region.getSequenceId());
1317   }
1318 
1319   @VisibleForTesting
1320   void replaceStoreFiles(final Collection<StoreFile> compactedFiles,
1321       final Collection<StoreFile> result) throws IOException {
1322     this.lock.writeLock().lock();
1323     try {
1324       this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result);
1325       filesCompacting.removeAll(compactedFiles); 
1326     } finally {
1327       this.lock.writeLock().unlock();
1328     }
1329   }
1330 
1331   
1332 
1333 
1334 
1335 
1336 
1337   private void logCompactionEndMessage(
1338       CompactionRequest cr, List<StoreFile> sfs, long compactionStartTime) {
1339     long now = EnvironmentEdgeManager.currentTime();
1340     StringBuilder message = new StringBuilder(
1341       "Completed" + (cr.isMajor() ? " major" : "") + " compaction of "
1342       + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "") + " file(s) in "
1343       + this + " of " + this.getRegionInfo().getRegionNameAsString() + " into ");
1344     if (sfs.isEmpty()) {
1345       message.append("none, ");
1346     } else {
1347       for (StoreFile sf: sfs) {
1348         message.append(sf.getPath().getName());
1349         message.append("(size=");
1350         message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1));
1351         message.append("), ");
1352       }
1353     }
1354     message.append("total size for store is ")
1355       .append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize, "", 1))
1356       .append(". This selection was in queue for ")
1357       .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
1358       .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
1359       .append(" to execute.");
1360     LOG.info(message.toString());
1361     if (LOG.isTraceEnabled()) {
1362       int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
1363       long resultSize = 0;
1364       for (StoreFile sf : sfs) {
1365         resultSize += sf.getReader().length();
1366       }
1367       String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
1368         + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
1369           + cr.getFiles().size() + "," + sfs.size() + "," +  storeSize + "," + fileCount + "]";
1370       LOG.trace(traceMessage);
1371     }
1372   }
1373 
1374   
1375 
1376 
1377 
1378 
1379 
1380   @Override
1381   public void replayCompactionMarker(CompactionDescriptor compaction,
1382       boolean pickCompactionFiles, boolean removeFiles)
1383       throws IOException {
1384     LOG.debug("Completing compaction from the WAL marker");
1385     List<String> compactionInputs = compaction.getCompactionInputList();
1386     List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList());
1387 
1388     
1389     
1390     
1391     
1392     
1393     
1394     
1395     
1396     
1397     
1398     
1399     
1400     
1401 
1402     String familyName = this.getColumnFamilyName();
1403     List<String> inputFiles = new ArrayList<String>(compactionInputs.size());
1404     for (String compactionInput : compactionInputs) {
1405       Path inputPath = fs.getStoreFilePath(familyName, compactionInput);
1406       inputFiles.add(inputPath.getName());
1407     }
1408 
1409     
1410     List<StoreFile> inputStoreFiles = new ArrayList<StoreFile>(compactionInputs.size());
1411     for (StoreFile sf : this.getStorefiles()) {
1412       if (inputFiles.contains(sf.getPath().getName())) {
1413         inputStoreFiles.add(sf);
1414       }
1415     }
1416 
1417     
1418     List<StoreFile> outputStoreFiles = new ArrayList<StoreFile>(compactionOutputs.size());
1419 
1420     if (pickCompactionFiles) {
1421       for (StoreFile sf : this.getStorefiles()) {
1422         compactionOutputs.remove(sf.getPath().getName());
1423       }
1424       for (String compactionOutput : compactionOutputs) {
1425         StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), compactionOutput);
1426         StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
1427         outputStoreFiles.add(storeFile);
1428       }
1429     }
1430 
1431     if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) {
1432       LOG.info("Replaying compaction marker, replacing input files: " +
1433           inputStoreFiles + " with output files : " + outputStoreFiles);
1434       this.replaceStoreFiles(inputStoreFiles, outputStoreFiles);
1435       this.completeCompaction(inputStoreFiles, removeFiles);
1436     }
1437   }
1438 
1439   
1440 
1441 
1442 
1443 
1444 
1445 
1446   public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException {
1447     List<StoreFile> filesToCompact;
1448     boolean isMajor;
1449 
1450     this.lock.readLock().lock();
1451     try {
1452       synchronized (filesCompacting) {
1453         filesToCompact = Lists.newArrayList(storeEngine.getStoreFileManager().getStorefiles());
1454         if (!filesCompacting.isEmpty()) {
1455           
1456           
1457           StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
1458           int idx = filesToCompact.indexOf(last);
1459           Preconditions.checkArgument(idx != -1);
1460           filesToCompact.subList(0, idx + 1).clear();
1461         }
1462         int count = filesToCompact.size();
1463         if (N > count) {
1464           throw new RuntimeException("Not enough files");
1465         }
1466 
1467         filesToCompact = filesToCompact.subList(count - N, count);
1468         isMajor = (filesToCompact.size() == storeEngine.getStoreFileManager().getStorefileCount());
1469         filesCompacting.addAll(filesToCompact);
1470         Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
1471       }
1472     } finally {
1473       this.lock.readLock().unlock();
1474     }
1475 
1476     try {
1477       
1478       List<Path> newFiles = ((DefaultCompactor)this.storeEngine.getCompactor())
1479           .compactForTesting(filesToCompact, isMajor);
1480       for (Path newFile: newFiles) {
1481         
1482         StoreFile sf = moveFileIntoPlace(newFile);
1483         if (this.getCoprocessorHost() != null) {
1484           this.getCoprocessorHost().postCompact(this, sf, null);
1485         }
1486         replaceStoreFiles(filesToCompact, Lists.newArrayList(sf));
1487         completeCompaction(filesToCompact, true);
1488       }
1489     } finally {
1490       synchronized (filesCompacting) {
1491         filesCompacting.removeAll(filesToCompact);
1492       }
1493     }
1494   }
1495 
1496   @Override
1497   public boolean hasReferences() {
1498     return StoreUtils.hasReferences(this.storeEngine.getStoreFileManager().getStorefiles());
1499   }
1500 
1501   @Override
1502   public CompactionProgress getCompactionProgress() {
1503     return this.storeEngine.getCompactor().getProgress();
1504   }
1505 
1506   @Override
1507   public boolean isMajorCompaction() throws IOException {
1508     for (StoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1509       
1510       if (sf.getReader() == null) {
1511         LOG.debug("StoreFile " + sf + " has null Reader");
1512         return false;
1513       }
1514     }
1515     return storeEngine.getCompactionPolicy().isMajorCompaction(
1516         this.storeEngine.getStoreFileManager().getStorefiles());
1517   }
1518 
1519   @Override
1520   public CompactionContext requestCompaction() throws IOException {
1521     return requestCompaction(Store.NO_PRIORITY, null);
1522   }
1523 
1524   @Override
1525   public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
1526       throws IOException {
1527     return requestCompaction(priority, baseRequest, null);
1528   }
  /**
   * Selects files for a compaction and registers them in {@code filesCompacting}.
   *
   * <p>Selection runs under the store read lock and the {@code filesCompacting} monitor.
   * Coprocessors, when present, are consulted before and after selection; if a user is given
   * their hooks are invoked through {@code user.getUGI().doAs}.
   *
   * @param priority requested priority; {@code Store.NO_PRIORITY} makes the request use
   *          {@code getCompactPriority()} instead
   * @param baseRequest optional request to combine the selection with; may be null
   * @param user user to run the coprocessor hooks as; may be null
   * @return the compaction context holding the selection, or null if writes are disabled or
   *         no files were selected
   * @throws IOException if selection or a coprocessor hook fails; an interrupted
   *           {@code doAs} is reported as an {@link InterruptedIOException}
   */
  @Override
  public CompactionContext requestCompaction(int priority, final CompactionRequest baseRequest,
      User user) throws IOException {
    // Nothing to do when writes are disabled.
    if (!this.areWritesEnabled()) {
      return null;
    }

    // Get rid of expired store files first.
    removeUnneededFiles();

    final CompactionContext compaction = storeEngine.createCompaction();
    CompactionRequest request = null;
    this.lock.readLock().lock();
    try {
      synchronized (filesCompacting) {
        final Store thisStore = this;
        // First, let the coprocessor (if any) look at the candidates and possibly override.
        if (this.getCoprocessorHost() != null) {
          final List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
          boolean override = false;
          if (user == null) {
            override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc,
              baseRequest);
          } else {
            try {
              override = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
                @Override
                public Boolean run() throws Exception {
                  return getCoprocessorHost().preCompactSelection(thisStore, candidatesForCoproc,
                    baseRequest);
                }
              });
            } catch (InterruptedException ie) {
              InterruptedIOException iioe = new InterruptedIOException();
              iioe.initCause(ie);
              throw iioe;
            }
          }
          if (override) {
            // The coprocessor overrode selection: force the candidate list as the selection.
            compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
          }
        }

        // Normal case: no forced selection yet, so run the regular selection.
        if (!compaction.hasSelection()) {
          boolean isUserCompaction = priority == Store.PRIORITY_USER;
          // The tracker is claimed via compareAndSet, so only one caller at a time gets true.
          boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
              offPeakCompactionTracker.compareAndSet(false, true);
          try {
            compaction.select(this.filesCompacting, isUserCompaction,
              mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
          } catch (IOException e) {
            // Selection failed: give the off-peak claim back before rethrowing.
            if (mayUseOffPeak) {
              offPeakCompactionTracker.set(false);
            }
            throw e;
          }
          assert compaction.hasSelection();
          if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
            // The selected request is not off-peak after all; release the claim.
            offPeakCompactionTracker.set(false);
          }
        }
        // Tell the coprocessor (if any) what was selected.
        if (this.getCoprocessorHost() != null) {
          if (user == null) {
            this.getCoprocessorHost().postCompactSelection(
              this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
          } else {
            try {
              user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
                @Override
                public Void run() throws Exception {
                  getCoprocessorHost().postCompactSelection(
                    thisStore,ImmutableList.copyOf(compaction.getRequest().getFiles()),baseRequest);
                  return null;
                }
              });
            } catch (InterruptedException ie) {
              InterruptedIOException iioe = new InterruptedIOException();
              iioe.initCause(ie);
              throw iioe;
            }
          }
        }

        // Files are selected; fold the selection into the caller's base request, if any.
        if (baseRequest != null) {
          // The combined request replaces the current selection.
          compaction.forceSelect(
              baseRequest.combineWith(compaction.getRequest()));
        }
        // Finally, the selection is settled.
        request = compaction.getRequest();
        final Collection<StoreFile> selectedFiles = request.getFiles();
        if (selectedFiles.isEmpty()) {
          return null;
        }

        // Register the files as compacting; this fails if any of them already is.
        addToCompactingFiles(selectedFiles);

        // A major compaction request clears the force-major flag.
        this.forceMajor = this.forceMajor && !request.isMajor();

        // Use the caller's priority unless it is NO_PRIORITY, in which case the store's
        // own compaction priority applies.
        request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
        request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
      }
    } finally {
      this.lock.readLock().unlock();
    }

    LOG.debug(getRegionInfo().getEncodedName() + " - "  + getColumnFamilyName()
        + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
        + (request.isAllFiles() ? " (all files)" : ""));
    this.region.reportCompactionRequestStart(request.isMajor());
    return compaction;
  }
1650 
1651   
1652   private void addToCompactingFiles(final Collection<StoreFile> filesToAdd) {
1653     if (filesToAdd == null) return;
1654     
1655     if (!Collections.disjoint(filesCompacting, filesToAdd)) {
1656       Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);
1657     }
1658     filesCompacting.addAll(filesToAdd);
1659     Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
1660   }
1661 
1662   private void removeUnneededFiles() throws IOException {
1663     if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) return;
1664     if (getFamily().getMinVersions() > 0) {
1665       LOG.debug("Skipping expired store file removal due to min version being " +
1666           getFamily().getMinVersions());
1667       return;
1668     }
1669     this.lock.readLock().lock();
1670     Collection<StoreFile> delSfs = null;
1671     try {
1672       synchronized (filesCompacting) {
1673         long cfTtl = getStoreFileTtl();
1674         if (cfTtl != Long.MAX_VALUE) {
1675           delSfs = storeEngine.getStoreFileManager().getUnneededFiles(
1676               EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting);
1677           addToCompactingFiles(delSfs);
1678         }
1679       }
1680     } finally {
1681       this.lock.readLock().unlock();
1682     }
1683     if (delSfs == null || delSfs.isEmpty()) return;
1684 
1685     Collection<StoreFile> newFiles = new ArrayList<StoreFile>(); 
1686     writeCompactionWalRecord(delSfs, newFiles);
1687     replaceStoreFiles(delSfs, newFiles);
1688     completeCompaction(delSfs);
1689     LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in "
1690         + this + " of " + this.getRegionInfo().getRegionNameAsString()
1691         + "; total size for store is " + TraditionalBinaryPrefix.long2String(storeSize, "", 1));
1692   }
1693 
1694   @Override
1695   public void cancelRequestedCompaction(CompactionContext compaction) {
1696     finishCompactionRequest(compaction.getRequest());
1697   }
1698 
1699   private void finishCompactionRequest(CompactionRequest cr) {
1700     this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
1701     if (cr.isOffPeak()) {
1702       offPeakCompactionTracker.set(false);
1703       cr.setOffPeak(false);
1704     }
1705     synchronized (filesCompacting) {
1706       filesCompacting.removeAll(cr.getFiles());
1707     }
1708   }
1709 
1710   
1711 
1712 
1713 
1714 
1715 
1716   private void validateStoreFile(Path path)
1717       throws IOException {
1718     StoreFile storeFile = null;
1719     try {
1720       storeFile = createStoreFileAndReader(path);
1721     } catch (IOException e) {
1722       LOG.error("Failed to open store file : " + path
1723           + ", keeping it in tmp location", e);
1724       throw e;
1725     } finally {
1726       if (storeFile != null) {
1727         storeFile.closeReader(false);
1728       }
1729     }
1730   }
1731 
1732   
1733 
1734 
1735 
1736 
1737 
1738 
1739 
1740 
1741 
1742 
1743 
1744 
1745 
1746   @VisibleForTesting
1747   protected void completeCompaction(final Collection<StoreFile> compactedFiles)
1748     throws IOException {
1749     completeCompaction(compactedFiles, true);
1750   }
1751 
1752 
1753   
1754 
1755 
1756 
1757 
1758 
1759 
1760 
1761 
1762 
1763 
1764 
1765 
1766 
1767   @VisibleForTesting
1768   protected void completeCompaction(final Collection<StoreFile> compactedFiles, boolean removeFiles)
1769       throws IOException {
1770     try {
1771       
1772       
1773       
1774       
1775       notifyChangedReadersObservers();
1776       
1777 
1778       
1779       LOG.debug("Removing store files after compaction...");
1780       for (StoreFile compactedFile : compactedFiles) {
1781         compactedFile.closeReader(true);
1782       }
1783       if (removeFiles) {
1784         this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles);
1785       }
1786     } catch (IOException e) {
1787       e = RemoteExceptionHandler.checkIOException(e);
1788       LOG.error("Failed removing compacted files in " + this +
1789         ". Files we were trying to remove are " + compactedFiles.toString() +
1790         "; some of them may have been already removed", e);
1791     }
1792 
1793     
1794     this.storeSize = 0L;
1795     this.totalUncompressedBytes = 0L;
1796     for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1797       StoreFile.Reader r = hsf.getReader();
1798       if (r == null) {
1799         LOG.warn("StoreFile " + hsf + " has a null Reader");
1800         continue;
1801       }
1802       this.storeSize += r.length();
1803       this.totalUncompressedBytes += r.getTotalUncompressedBytes();
1804     }
1805   }
1806 
1807   
1808 
1809 
1810 
1811   int versionsToReturn(final int wantedVersions) {
1812     if (wantedVersions <= 0) {
1813       throw new IllegalArgumentException("Number of versions must be > 0");
1814     }
1815     
1816     int maxVersions = this.family.getMaxVersions();
1817     return wantedVersions > maxVersions ? maxVersions: wantedVersions;
1818   }
1819 
1820   
1821 
1822 
1823 
1824 
1825   static boolean isCellTTLExpired(final Cell cell, final long oldestTimestamp, final long now) {
1826     
1827     
1828     if (cell.getTagsLength() > 0) {
1829       
1830       
1831       
1832       Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
1833         cell.getTagsLength());
1834       while (i.hasNext()) {
1835         Tag t = i.next();
1836         if (TagType.TTL_TAG_TYPE == t.getType()) {
1837           
1838           
1839           long ts = cell.getTimestamp();
1840           assert t.getTagLength() == Bytes.SIZEOF_LONG;
1841           long ttl = Bytes.toLong(t.getBuffer(), t.getTagOffset(), t.getTagLength());
1842           if (ts + ttl < now) {
1843             return true;
1844           }
1845           
1846           
1847           break;
1848         }
1849       }
1850     }
1851     return false;
1852   }
1853 
1854   @Override
1855   public Cell getRowKeyAtOrBefore(final byte[] row) throws IOException {
1856     
1857     
1858     
1859     
1860     
1861     
1862     long ttlToUse = scanInfo.getMinVersions() > 0 ? Long.MAX_VALUE : this.scanInfo.getTtl();
1863 
1864     KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
1865 
1866     GetClosestRowBeforeTracker state = new GetClosestRowBeforeTracker(
1867       this.comparator, kv, ttlToUse, this.getRegionInfo().isMetaRegion());
1868     this.lock.readLock().lock();
1869     try {
1870       
1871       this.memstore.getRowKeyAtOrBefore(state);
1872       
1873       
1874       Iterator<StoreFile> sfIterator = this.storeEngine.getStoreFileManager()
1875           .getCandidateFilesForRowKeyBefore(state.getTargetKey());
1876       while (sfIterator.hasNext()) {
1877         StoreFile sf = sfIterator.next();
1878         sfIterator.remove(); 
1879         boolean haveNewCandidate = rowAtOrBeforeFromStoreFile(sf, state);
1880         Cell candidate = state.getCandidate();
1881         
1882         if (candidate != null && CellUtil.matchingRow(candidate, row)) {
1883           return candidate;
1884         }
1885         if (haveNewCandidate) {
1886           sfIterator = this.storeEngine.getStoreFileManager().updateCandidateFilesForRowKeyBefore(
1887               sfIterator, state.getTargetKey(), candidate);
1888         }
1889       }
1890       return state.getCandidate();
1891     } finally {
1892       this.lock.readLock().unlock();
1893     }
1894   }
1895 
1896   
1897 
1898 
1899 
1900 
1901 
1902 
1903   private boolean rowAtOrBeforeFromStoreFile(final StoreFile f,
1904                                           final GetClosestRowBeforeTracker state)
1905       throws IOException {
1906     StoreFile.Reader r = f.getReader();
1907     if (r == null) {
1908       LOG.warn("StoreFile " + f + " has a null Reader");
1909       return false;
1910     }
1911     if (r.getEntries() == 0) {
1912       LOG.warn("StoreFile " + f + " is a empty store file");
1913       return false;
1914     }
1915     
1916     byte [] fk = r.getFirstKey();
1917     if (fk == null) return false;
1918     KeyValue firstKV = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
1919     byte [] lk = r.getLastKey();
1920     KeyValue lastKV = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
1921     KeyValue firstOnRow = state.getTargetKey();
1922     if (this.comparator.compareRows(lastKV, firstOnRow) < 0) {
1923       
1924       
1925       if (!state.isTargetTable(lastKV)) return false;
1926       
1927       
1928       firstOnRow = new KeyValue(lastKV.getRow(), HConstants.LATEST_TIMESTAMP);
1929     }
1930     
1931     HFileScanner scanner = r.getScanner(true, true, false);
1932     
1933     if (!seekToScanner(scanner, firstOnRow, firstKV)) return false;
1934     
1935     
1936     if (walkForwardInSingleRow(scanner, firstOnRow, state)) return true;
1937     
1938     while (scanner.seekBefore(firstOnRow.getBuffer(), firstOnRow.getKeyOffset(),
1939        firstOnRow.getKeyLength())) {
1940       Cell kv = scanner.getKeyValue();
1941       if (!state.isTargetTable(kv)) break;
1942       if (!state.isBetterCandidate(kv)) break;
1943       
1944       firstOnRow = new KeyValue(kv.getRow(), HConstants.LATEST_TIMESTAMP);
1945       
1946       if (!seekToScanner(scanner, firstOnRow, firstKV)) return false;
1947       
1948       if (walkForwardInSingleRow(scanner, firstOnRow, state)) return true;
1949     }
1950     return false;
1951   }
1952 
1953   
1954 
1955 
1956 
1957 
1958 
1959 
1960 
1961   private boolean seekToScanner(final HFileScanner scanner,
1962                                 final KeyValue firstOnRow,
1963                                 final KeyValue firstKV)
1964       throws IOException {
1965     KeyValue kv = firstOnRow;
1966     
1967     if (this.comparator.compareRows(firstKV, firstOnRow) == 0) kv = firstKV;
1968     int result = scanner.seekTo(kv);
1969     return result != -1;
1970   }
1971 
1972   
1973 
1974 
1975 
1976 
1977 
1978 
1979 
1980 
1981 
1982   private boolean walkForwardInSingleRow(final HFileScanner scanner,
1983                                          final KeyValue firstOnRow,
1984                                          final GetClosestRowBeforeTracker state)
1985       throws IOException {
1986     boolean foundCandidate = false;
1987     do {
1988       Cell kv = scanner.getKeyValue();
1989       
1990       if (this.comparator.compareRows(kv, firstOnRow) < 0) continue;
1991       
1992       if (state.isTooFar(kv, firstOnRow)) break;
1993       if (state.isExpired(kv)) {
1994         continue;
1995       }
1996       
1997       if (state.handle(kv)) {
1998         foundCandidate = true;
1999         break;
2000       }
2001     } while(scanner.next());
2002     return foundCandidate;
2003   }
2004 
2005   @Override
2006   public boolean canSplit() {
2007     this.lock.readLock().lock();
2008     try {
2009       
2010       boolean result = !hasReferences();
2011       if (!result && LOG.isDebugEnabled()) {
2012         LOG.debug("Cannot split region due to reference files being there");
2013       }
2014       return result;
2015     } finally {
2016       this.lock.readLock().unlock();
2017     }
2018   }
2019 
2020   @Override
2021   public byte[] getSplitPoint() {
2022     this.lock.readLock().lock();
2023     try {
2024       
2025       assert !this.getRegionInfo().isMetaRegion();
2026       
2027       if (hasReferences()) {
2028         return null;
2029       }
2030       return this.storeEngine.getStoreFileManager().getSplitPoint();
2031     } catch(IOException e) {
2032       LOG.warn("Failed getting store size for " + this, e);
2033     } finally {
2034       this.lock.readLock().unlock();
2035     }
2036     return null;
2037   }
2038 
2039   @Override
2040   public long getLastCompactSize() {
2041     return this.lastCompactSize;
2042   }
2043 
2044   @Override
2045   public long getSize() {
2046     return storeSize;
2047   }
2048 
2049   @Override
2050   public void triggerMajorCompaction() {
2051     this.forceMajor = true;
2052   }
2053 
2054 
2055   
2056   
2057   
2058 
2059   @Override
2060   public KeyValueScanner getScanner(Scan scan,
2061       final NavigableSet<byte []> targetCols, long readPt) throws IOException {
2062     lock.readLock().lock();
2063     try {
2064       KeyValueScanner scanner = null;
2065       if (this.getCoprocessorHost() != null) {
2066         scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
2067       }
2068       if (scanner == null) {
2069         scanner = scan.isReversed() ? new ReversedStoreScanner(this,
2070             getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this,
2071             getScanInfo(), scan, targetCols, readPt);
2072       }
2073       return scanner;
2074     } finally {
2075       lock.readLock().unlock();
2076     }
2077   }
2078 
2079   @Override
2080   public String toString() {
2081     return this.getColumnFamilyName();
2082   }
2083 
2084   @Override
2085   public int getStorefilesCount() {
2086     return this.storeEngine.getStoreFileManager().getStorefileCount();
2087   }
2088 
2089   @Override
2090   public long getStoreSizeUncompressed() {
2091     return this.totalUncompressedBytes;
2092   }
2093 
2094   @Override
2095   public long getStorefilesSize() {
2096     long size = 0;
2097     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
2098       StoreFile.Reader r = s.getReader();
2099       if (r == null) {
2100         LOG.warn("StoreFile " + s + " has a null Reader");
2101         continue;
2102       }
2103       size += r.length();
2104     }
2105     return size;
2106   }
2107 
2108   @Override
2109   public long getStorefilesIndexSize() {
2110     long size = 0;
2111     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
2112       StoreFile.Reader r = s.getReader();
2113       if (r == null) {
2114         LOG.warn("StoreFile " + s + " has a null Reader");
2115         continue;
2116       }
2117       size += r.indexSize();
2118     }
2119     return size;
2120   }
2121 
2122   @Override
2123   public long getTotalStaticIndexSize() {
2124     long size = 0;
2125     for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
2126       StoreFile.Reader r = s.getReader();
2127       if (r == null) {
2128         continue;
2129       }
2130       size += r.getUncompressedDataIndexSize();
2131     }
2132     return size;
2133   }
2134 
2135   @Override
2136   public long getTotalStaticBloomSize() {
2137     long size = 0;
2138     for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
2139       StoreFile.Reader r = s.getReader();
2140       if (r == null) {
2141         continue;
2142       }
2143       size += r.getTotalBloomSize();
2144     }
2145     return size;
2146   }
2147 
2148   @Override
2149   public long getMemStoreSize() {
2150     return this.memstore.size();
2151   }
2152 
2153   @Override
2154   public int getCompactPriority() {
2155     int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority();
2156     if (priority == PRIORITY_USER) {
2157       LOG.warn("Compaction priority is USER despite there being no user compaction");
2158     }
2159     return priority;
2160   }
2161 
2162   @Override
2163   public boolean throttleCompaction(long compactionSize) {
2164     return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize);
2165   }
2166 
2167   public HRegion getHRegion() {
2168     return this.region;
2169   }
2170 
2171   @Override
2172   public RegionCoprocessorHost getCoprocessorHost() {
2173     return this.region.getCoprocessorHost();
2174   }
2175 
2176   @Override
2177   public HRegionInfo getRegionInfo() {
2178     return this.fs.getRegionInfo();
2179   }
2180 
2181   @Override
2182   public boolean areWritesEnabled() {
2183     return this.region.areWritesEnabled();
2184   }
2185 
2186   @Override
2187   public long getSmallestReadPoint() {
2188     return this.region.getSmallestReadPoint();
2189   }
2190 
2191   
2192 
2193 
2194 
2195 
2196 
2197 
2198 
2199 
2200 
2201 
2202 
2203 
2204   public long updateColumnValue(byte [] row, byte [] f,
2205                                 byte [] qualifier, long newValue)
2206       throws IOException {
2207 
2208     this.lock.readLock().lock();
2209     try {
2210       long now = EnvironmentEdgeManager.currentTime();
2211 
2212       return this.memstore.updateColumnValue(row,
2213           f,
2214           qualifier,
2215           newValue,
2216           now);
2217 
2218     } finally {
2219       this.lock.readLock().unlock();
2220     }
2221   }
2222 
2223   @Override
2224   public long upsert(Iterable<Cell> cells, long readpoint) throws IOException {
2225     this.lock.readLock().lock();
2226     try {
2227       return this.memstore.upsert(cells, readpoint);
2228     } finally {
2229       this.lock.readLock().unlock();
2230     }
2231   }
2232 
2233   @Override
2234   public StoreFlushContext createFlushContext(long cacheFlushId) {
2235     return new StoreFlusherImpl(cacheFlushId);
2236   }
2237 
2238   private class StoreFlusherImpl implements StoreFlushContext {
2239 
2240     private long cacheFlushSeqNum;
2241     private MemStoreSnapshot snapshot;
2242     private List<Path> tempFiles;
2243     private List<Path> committedFiles;
2244     private long cacheFlushCount;
2245     private long cacheFlushSize;
2246 
2247     private StoreFlusherImpl(long cacheFlushSeqNum) {
2248       this.cacheFlushSeqNum = cacheFlushSeqNum;
2249     }
2250 
2251     
2252 
2253 
2254 
2255     @Override
2256     public void prepare() {
2257       this.snapshot = memstore.snapshot();
2258       this.cacheFlushCount = snapshot.getCellsCount();
2259       this.cacheFlushSize = snapshot.getSize();
2260       committedFiles = new ArrayList<Path>(1);
2261     }
2262 
2263     @Override
2264     public void flushCache(MonitoredTask status) throws IOException {
2265       tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status);
2266     }
2267 
2268     @Override
2269     public boolean commit(MonitoredTask status) throws IOException {
2270       if (this.tempFiles == null || this.tempFiles.isEmpty()) {
2271         return false;
2272       }
2273       List<StoreFile> storeFiles = new ArrayList<StoreFile>(this.tempFiles.size());
2274       for (Path storeFilePath : tempFiles) {
2275         try {
2276           storeFiles.add(HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status));
2277         } catch (IOException ex) {
2278           LOG.error("Failed to commit store file " + storeFilePath, ex);
2279           
2280           for (StoreFile sf : storeFiles) {
2281             Path pathToDelete = sf.getPath();
2282             try {
2283               sf.deleteReader();
2284             } catch (IOException deleteEx) {
2285               LOG.fatal("Failed to delete store file we committed, halting " + pathToDelete, ex);
2286               Runtime.getRuntime().halt(1);
2287             }
2288           }
2289           throw new IOException("Failed to commit the flush", ex);
2290         }
2291       }
2292 
2293       for (StoreFile sf : storeFiles) {
2294         if (HStore.this.getCoprocessorHost() != null) {
2295           HStore.this.getCoprocessorHost().postFlush(HStore.this, sf);
2296         }
2297         committedFiles.add(sf.getPath());
2298       }
2299 
2300       HStore.this.flushedCellsCount += cacheFlushCount;
2301       HStore.this.flushedCellsSize += cacheFlushSize;
2302 
2303       
2304       return HStore.this.updateStorefiles(storeFiles, snapshot.getId());
2305     }
2306 
2307     @Override
2308     public List<Path> getCommittedFiles() {
2309       return committedFiles;
2310     }
2311 
2312     
2313 
2314 
2315 
2316 
2317 
2318 
2319 
2320     @Override
2321     public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)
2322         throws IOException {
2323       List<StoreFile> storeFiles = new ArrayList<StoreFile>(fileNames.size());
2324       for (String file : fileNames) {
2325         
2326         StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), file);
2327         StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
2328         storeFiles.add(storeFile);
2329         HStore.this.storeSize += storeFile.getReader().length();
2330         HStore.this.totalUncompressedBytes += storeFile.getReader().getTotalUncompressedBytes();
2331         if (LOG.isInfoEnabled()) {
2332           LOG.info("Region: " + HStore.this.getRegionInfo().getEncodedName() +
2333             " added " + storeFile + ", entries=" + storeFile.getReader().getEntries() +
2334             ", sequenceid=" +  + storeFile.getReader().getSequenceID() +
2335             ", filesize=" + StringUtils.humanReadableInt(storeFile.getReader().length()));
2336         }
2337       }
2338 
2339       long snapshotId = -1; 
2340       if (dropMemstoreSnapshot && snapshot != null) {
2341         snapshotId = snapshot.getId();
2342       }
2343       HStore.this.updateStorefiles(storeFiles, snapshotId);
2344     }
2345 
2346     
2347 
2348 
2349 
2350     @Override
2351     public void abort() throws IOException {
2352       if (snapshot == null) {
2353         return;
2354       }
2355       HStore.this.updateStorefiles(new ArrayList<StoreFile>(0), snapshot.getId());
2356     }
2357   }
2358 
2359   @Override
2360   public boolean needsCompaction() {
2361     return this.storeEngine.needsCompaction(this.filesCompacting);
2362   }
2363 
2364   @Override
2365   public CacheConfig getCacheConfig() {
2366     return this.cacheConf;
2367   }
2368 
2369   public static final long FIXED_OVERHEAD =
2370       ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG)
2371               + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
2372 
2373   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
2374       + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
2375       + ClassSize.CONCURRENT_SKIPLISTMAP
2376       + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT
2377       + ScanInfo.FIXED_OVERHEAD);
2378 
2379   @Override
2380   public long heapSize() {
2381     return DEEP_OVERHEAD + this.memstore.heapSize();
2382   }
2383 
2384   @Override
2385   public KeyValue.KVComparator getComparator() {
2386     return comparator;
2387   }
2388 
2389   @Override
2390   public ScanInfo getScanInfo() {
2391     return scanInfo;
2392   }
2393 
2394   
2395 
2396 
2397 
2398   void setScanInfo(ScanInfo scanInfo) {
2399     this.scanInfo = scanInfo;
2400   }
2401 
2402   @Override
2403   public boolean hasTooManyStoreFiles() {
2404     return getStorefilesCount() > this.blockingFileCount;
2405   }
2406 
2407   @Override
2408   public long getFlushedCellsCount() {
2409     return flushedCellsCount;
2410   }
2411 
2412   @Override
2413   public long getFlushedCellsSize() {
2414     return flushedCellsSize;
2415   }
2416 
2417   @Override
2418   public long getCompactedCellsCount() {
2419     return compactedCellsCount;
2420   }
2421 
2422   @Override
2423   public long getCompactedCellsSize() {
2424     return compactedCellsSize;
2425   }
2426 
2427   @Override
2428   public long getMajorCompactedCellsCount() {
2429     return majorCompactedCellsCount;
2430   }
2431 
2432   @Override
2433   public long getMajorCompactedCellsSize() {
2434     return majorCompactedCellsSize;
2435   }
2436 
2437   
2438 
2439 
2440 
2441   @VisibleForTesting
2442   public StoreEngine<?, ?, ?, ?> getStoreEngine() {
2443     return this.storeEngine;
2444   }
2445 
2446   protected OffPeakHours getOffPeakHours() {
2447     return this.offPeakHours;
2448   }
2449 
2450   
2451 
2452 
2453   @Override
2454   public void onConfigurationChange(Configuration conf) {
2455     this.conf = new CompoundConfiguration()
2456             .add(conf)
2457             .addWritableMap(family.getValues());
2458     this.storeEngine.compactionPolicy.setConf(conf);
2459     this.offPeakHours = OffPeakHours.getInstance(conf);
2460   }
2461 
2462   
2463 
2464 
2465   @Override
2466   public void registerChildren(ConfigurationManager manager) {
2467     
2468   }
2469 
2470   
2471 
2472 
2473   @Override
2474   public void deregisterChildren(ConfigurationManager manager) {
2475     
2476   }
2477 
2478   @Override
2479   public double getCompactionPressure() {
2480     return storeEngine.getStoreFileManager().getCompactionPressure();
2481   }
2482 }