1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.List;
25  
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.fs.Path;
28  import org.apache.hadoop.hbase.Cell;
29  import org.apache.hadoop.hbase.HConstants;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.hbase.client.Scan;
32  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
33  import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
34  
35  
36  
37  
38  
39  @InterfaceAudience.Private
40  abstract class StoreFlusher {
41    protected Configuration conf;
42    protected Store store;
43  
44    public StoreFlusher(Configuration conf, Store store) {
45      this.conf = conf;
46      this.store = store;
47    }
48  
49    
50  
51  
52  
53  
54  
55  
56    public abstract List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
57        MonitoredTask status) throws IOException;
58  
59    protected void finalizeWriter(StoreFile.Writer writer, long cacheFlushSeqNum,
60        MonitoredTask status) throws IOException {
61      
62      
63      
64      status.setStatus("Flushing " + store + ": appending metadata");
65      writer.appendMetadata(cacheFlushSeqNum, false);
66      status.setStatus("Flushing " + store + ": closing flushed file");
67      writer.close();
68    }
69  
70  
71    
72  
73  
74  
75  
76  
77    protected InternalScanner createScanner(KeyValueScanner snapshotScanner,
78        long smallestReadPoint) throws IOException {
79      InternalScanner scanner = null;
80      if (store.getCoprocessorHost() != null) {
81        scanner = store.getCoprocessorHost().preFlushScannerOpen(store, snapshotScanner);
82      }
83      if (scanner == null) {
84        Scan scan = new Scan();
85        scan.setMaxVersions(store.getScanInfo().getMaxVersions());
86        scanner = new StoreScanner(store, store.getScanInfo(), scan,
87            Collections.singletonList(snapshotScanner), ScanType.COMPACT_RETAIN_DELETES,
88            smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
89      }
90      assert scanner != null;
91      if (store.getCoprocessorHost() != null) {
92        try {
93          return store.getCoprocessorHost().preFlush(store, scanner);
94        } catch (IOException ioe) {
95          scanner.close();
96          throw ioe;
97        }
98      }
99      return scanner;
100   }
101 
102   
103 
104 
105 
106 
107 
108   protected void performFlush(InternalScanner scanner,
109       Compactor.CellSink sink, long smallestReadPoint) throws IOException {
110     int compactionKVMax =
111       conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
112 
113     ScannerContext scannerContext =
114         ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
115 
116     List<Cell> kvs = new ArrayList<Cell>();
117     boolean hasMore;
118     do {
119       hasMore = scanner.next(kvs, scannerContext);
120       if (!kvs.isEmpty()) {
121         for (Cell c : kvs) {
122           
123           
124           
125           sink.append(c);
126         }
127         kvs.clear();
128       }
129     } while (hasMore);
130   }
131 }