1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.List;
25
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.fs.Path;
28 import org.apache.hadoop.hbase.Cell;
29 import org.apache.hadoop.hbase.HConstants;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.hbase.client.Scan;
32 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
33 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
34
35
36
37
38
39 @InterfaceAudience.Private
40 abstract class StoreFlusher {
41 protected Configuration conf;
42 protected Store store;
43
44 public StoreFlusher(Configuration conf, Store store) {
45 this.conf = conf;
46 this.store = store;
47 }
48
49
50
51
52
53
54
55
56 public abstract List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
57 MonitoredTask status) throws IOException;
58
59 protected void finalizeWriter(StoreFile.Writer writer, long cacheFlushSeqNum,
60 MonitoredTask status) throws IOException {
61
62
63
64 status.setStatus("Flushing " + store + ": appending metadata");
65 writer.appendMetadata(cacheFlushSeqNum, false);
66 status.setStatus("Flushing " + store + ": closing flushed file");
67 writer.close();
68 }
69
70
71
72
73
74
75
76
77 protected InternalScanner createScanner(KeyValueScanner snapshotScanner,
78 long smallestReadPoint) throws IOException {
79 InternalScanner scanner = null;
80 if (store.getCoprocessorHost() != null) {
81 scanner = store.getCoprocessorHost().preFlushScannerOpen(store, snapshotScanner);
82 }
83 if (scanner == null) {
84 Scan scan = new Scan();
85 scan.setMaxVersions(store.getScanInfo().getMaxVersions());
86 scanner = new StoreScanner(store, store.getScanInfo(), scan,
87 Collections.singletonList(snapshotScanner), ScanType.COMPACT_RETAIN_DELETES,
88 smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
89 }
90 assert scanner != null;
91 if (store.getCoprocessorHost() != null) {
92 try {
93 return store.getCoprocessorHost().preFlush(store, scanner);
94 } catch (IOException ioe) {
95 scanner.close();
96 throw ioe;
97 }
98 }
99 return scanner;
100 }
101
102
103
104
105
106
107
108 protected void performFlush(InternalScanner scanner,
109 Compactor.CellSink sink, long smallestReadPoint) throws IOException {
110 int compactionKVMax =
111 conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
112
113 ScannerContext scannerContext =
114 ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
115
116 List<Cell> kvs = new ArrayList<Cell>();
117 boolean hasMore;
118 do {
119 hasMore = scanner.next(kvs, scannerContext);
120 if (!kvs.isEmpty()) {
121 for (Cell c : kvs) {
122
123
124
125 sink.append(c);
126 }
127 kvs.clear();
128 }
129 } while (hasMore);
130 }
131 }