/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;

/**
 * Store flusher interface. Turns a snapshot of the memstore into a set of store files
 * (usually one). A custom implementation can be provided.
 */
@InterfaceAudience.Private
abstract class StoreFlusher {
  protected Configuration conf;
  protected Store store;

  public StoreFlusher(Configuration conf, Store store) {
    this.conf = conf;
    this.store = store;
  }

  /**
   * Turns a snapshot of the memstore into a set of store files.
   * @param snapshot Memstore snapshot.
   * @param cacheFlushSeqNum Log cache flush sequence number.
   * @param status Task that represents the flush operation and may be updated with status.
   * @return List of files written. Can be empty; must not be null.
   */
  public abstract List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
      MonitoredTask status) throws IOException;
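
  // Illustrative sketch only, not part of this class: a concrete subclass typically wires the
  // protected helpers below together roughly as follows (loosely modeled on the default flusher;
  // the writer-creation call and its arguments are assumptions and may differ between versions):
  //
  //   public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
  //       MonitoredTask status) throws IOException {
  //     List<Path> result = new ArrayList<Path>();
  //     if (snapshot.getCellsCount() == 0) return result;   // nothing to flush
  //     long smallestReadPoint = store.getSmallestReadPoint();
  //     InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
  //     if (scanner == null) return result;                 // coprocessor canceled the flush
  //     StoreFile.Writer writer = store.createWriterInTmp(snapshot.getCellsCount(),
  //         store.getFamily().getCompression(), false, true, true);
  //     try {
  //       try {
  //         performFlush(scanner, writer, smallestReadPoint);
  //       } finally {
  //         finalizeWriter(writer, cacheFlushSeqNum, status);
  //       }
  //     } finally {
  //       scanner.close();
  //     }
  //     result.add(writer.getPath());
  //     return result;
  //   }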

  protected void finalizeWriter(StoreFile.Writer writer, long cacheFlushSeqNum,
      MonitoredTask status) throws IOException {
    // Write out the log sequence number that corresponds to this output
    // hfile. The hfile is current up to and including cacheFlushSeqNum.
    status.setStatus("Flushing " + store + ": appending metadata");
    writer.appendMetadata(cacheFlushSeqNum, false);
    status.setStatus("Flushing " + store + ": closing flushed file");
    writer.close();
  }

  /**
   * Creates the scanner used to flush the snapshot. Also calls the flush-related coprocessor
   * hooks.
   * @param snapshotScanner Scanner over the memstore snapshot.
   * @param smallestReadPoint Smallest read point used for the flush.
   * @return The scanner; null if a coprocessor is canceling the flush.
   */
  protected InternalScanner createScanner(KeyValueScanner snapshotScanner,
      long smallestReadPoint) throws IOException {
    InternalScanner scanner = null;
    if (store.getCoprocessorHost() != null) {
      scanner = store.getCoprocessorHost().preFlushScannerOpen(store, snapshotScanner);
    }
    if (scanner == null) {
      Scan scan = new Scan();
      scan.setMaxVersions(store.getScanInfo().getMaxVersions());
      scanner = new StoreScanner(store, store.getScanInfo(), scan,
          Collections.singletonList(snapshotScanner), ScanType.COMPACT_RETAIN_DELETES,
          smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
    }
    assert scanner != null;
    if (store.getCoprocessorHost() != null) {
      try {
        return store.getCoprocessorHost().preFlush(store, scanner);
      } catch (IOException ioe) {
        scanner.close();
        throw ioe;
      }
    }
    return scanner;
  }
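
  // Illustrative sketch only, not part of this class: a coprocessor can cancel the flush by
  // returning null from its flush hooks, which this method then propagates to the caller.
  // A RegionObserver could, for example, implement (signature assumed from the coprocessor API):
  //
  //   public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c,
  //       Store store, InternalScanner scanner) throws IOException {
  //     return scanner; // returning null here would cancel the normal flush output
  //   }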

  /**
   * Performs memstore flush, writing data from scanner into sink.
   * @param scanner Scanner to get data from.
   * @param sink Sink to write data to. Could be StoreFile.Writer.
   * @param smallestReadPoint Smallest read point used for the flush.
   */
  protected void performFlush(InternalScanner scanner,
      Compactor.CellSink sink, long smallestReadPoint) throws IOException {
    int compactionKVMax =
      conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
    List<Cell> kvs = new ArrayList<Cell>();
    boolean hasMore;
    do {
      hasMore = scanner.next(kvs, compactionKVMax);
      if (!kvs.isEmpty()) {
        for (Cell c : kvs) {
          // The sink expects KeyValues, so convert the Cell if it is not one already
          // before appending it to the output store file.
          KeyValue kv = KeyValueUtil.ensureKeyValue(c);
          sink.append(kv);
        }
        kvs.clear();
      }
    } while (hasMore);
  }
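
  // Illustrative note, not part of the original class: the batch size used by the loop above
  // comes from Configuration and could be tuned programmatically, e.g. (values are examples only):
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt(HConstants.COMPACTION_KV_MAX, 100); // cells fetched per scanner.next() call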
}