1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.List;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.fs.Path;
30 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
31 import org.apache.hadoop.util.StringUtils;
32
33
34
35
36 @InterfaceAudience.Private
37 public class DefaultStoreFlusher extends StoreFlusher {
38 private static final Log LOG = LogFactory.getLog(DefaultStoreFlusher.class);
39 private final Object flushLock = new Object();
40
41 public DefaultStoreFlusher(Configuration conf, Store store) {
42 super(conf, store);
43 }
44
45 @Override
46 public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
47 MonitoredTask status) throws IOException {
48 ArrayList<Path> result = new ArrayList<Path>();
49 int cellsCount = snapshot.getCellsCount();
50 if (cellsCount == 0) return result;
51
52
53 long smallestReadPoint = store.getSmallestReadPoint();
54 InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
55 if (scanner == null) {
56 return result;
57 }
58
59 StoreFile.Writer writer;
60 try {
61
62
63 synchronized (flushLock) {
64 status.setStatus("Flushing " + store + ": creating writer");
65
66 writer = store.createWriterInTmp(cellsCount, store.getFamily().getCompression(),
67
68
69
70
71 writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
72 IOException e = null;
73 try {
74 performFlush(scanner, writer, smallestReadPoint);
75 } catch (IOException ioe) {
76 e = ioe;
77
78 throw ioe;
79 } finally {
80 if (e != null) {
81 writer.close();
82 } else {
83 finalizeWriter(writer, cacheFlushId, status);
84 }
85 }
86 }
87 } finally {
88 scanner.close();
89 }
90 LOG.info("Flushed, sequenceid=" + cacheFlushId +", memsize="
91 + StringUtils.humanReadableInt(snapshot.getSize()) +
92 ", hasBloomFilter=" + writer.hasGeneralBloom() +
93 ", into tmp file " + writer.getPath());
94 result.add(writer.getPath());
95 return result;
96 }
97 }