View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.List;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.fs.Path;
32  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
33  import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
34  import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
35  import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
36  
37  import com.google.common.annotations.VisibleForTesting;
38  
39  /**
40   * Stripe implementation of StoreFlusher. Flushes files either into L0 file w/o metadata, or
41   * into separate striped files, avoiding L0.
42   */
43  @InterfaceAudience.Private
44  public class StripeStoreFlusher extends StoreFlusher {
45    private static final Log LOG = LogFactory.getLog(StripeStoreFlusher.class);
46    private final Object flushLock = new Object();
47    private final StripeCompactionPolicy policy;
48    private final StripeCompactionPolicy.StripeInformationProvider stripes;
49  
50    public StripeStoreFlusher(Configuration conf, Store store,
51        StripeCompactionPolicy policy, StripeStoreFileManager stripes) {
52      super(conf, store);
53      this.policy = policy;
54      this.stripes = stripes;
55    }
56  
57    @Override
58    public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
59        MonitoredTask status) throws IOException {
60      List<Path> result = new ArrayList<Path>();
61      int cellsCount = snapshot.getCellsCount();
62      if (cellsCount == 0) return result; // don't flush if there are no entries
63  
64      long smallestReadPoint = store.getSmallestReadPoint();
65      InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
66      if (scanner == null) {
67        return result; // NULL scanner returned from coprocessor hooks means skip normal processing
68      }
69  
70      // Let policy select flush method.
71      StripeFlushRequest req = this.policy.selectFlush(this.stripes, cellsCount);
72  
73      boolean success = false;
74      StripeMultiFileWriter mw = null;
75      try {
76        mw = req.createWriter(); // Writer according to the policy.
77        StripeMultiFileWriter.WriterFactory factory = createWriterFactory(
78            snapshot.getTimeRangeTracker(), cellsCount);
79        StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
80        mw.init(storeScanner, factory, store.getComparator());
81  
82        synchronized (flushLock) {
83          performFlush(scanner, mw, smallestReadPoint);
84          result = mw.commitWriters(cacheFlushSeqNum, false);
85          success = true;
86        }
87      } finally {
88        if (!success && (mw != null)) {
89          for (Path leftoverFile : mw.abortWriters()) {
90            try {
91              store.getFileSystem().delete(leftoverFile, false);
92            } catch (Exception e) {
93              LOG.error("Failed to delete a file after failed flush: " + e);
94            }
95          }
96        }
97        try {
98          scanner.close();
99        } catch (IOException ex) {
100         LOG.warn("Failed to close flush scanner, ignoring", ex);
101       }
102     }
103     return result;
104   }
105 
106   private StripeMultiFileWriter.WriterFactory createWriterFactory(
107       final TimeRangeTracker tracker, final long kvCount) {
108     return new StripeMultiFileWriter.WriterFactory() {
109       @Override
110       public Writer createWriter() throws IOException {
111         StoreFile.Writer writer = store.createWriterInTmp(
112             kvCount, store.getFamily().getCompression(),
113             /* isCompaction = */ false,
114             /* includeMVCCReadpoint = */ true,
115             /* includesTags = */ true,
116             /* shouldDropBehind = */ false);
117         writer.setTimeRangeTracker(tracker);
118         return writer;
119       }
120     };
121   }
122 
123   /** Stripe flush request wrapper that writes a non-striped file. */
124   public static class StripeFlushRequest {
125     @VisibleForTesting
126     public StripeMultiFileWriter createWriter() throws IOException {
127       StripeMultiFileWriter writer =
128           new StripeMultiFileWriter.SizeMultiWriter(1, Long.MAX_VALUE, OPEN_KEY, OPEN_KEY);
129       writer.setNoStripeMetadata();
130       return writer;
131     }
132   }
133 
134   /** Stripe flush request wrapper based on boundaries. */
135   public static class BoundaryStripeFlushRequest extends StripeFlushRequest {
136     private final List<byte[]> targetBoundaries;
137 
138     /** @param targetBoundaries New files should be written with these boundaries. */
139     public BoundaryStripeFlushRequest(List<byte[]> targetBoundaries) {
140       this.targetBoundaries = targetBoundaries;
141     }
142 
143     @Override
144     public StripeMultiFileWriter createWriter() throws IOException {
145       return new StripeMultiFileWriter.BoundaryMultiWriter(targetBoundaries, null, null);
146     }
147   }
148 
149   /** Stripe flush request wrapper based on size. */
150   public static class SizeStripeFlushRequest extends StripeFlushRequest {
151     private final int targetCount;
152     private final long targetKvs;
153 
154     /**
155      * @param targetCount The maximum number of stripes to flush into.
156      * @param targetKvs The KV count of each segment. If targetKvs*targetCount is less than
157      *                  total number of kvs, all the overflow data goes into the last stripe.
158      */
159     public SizeStripeFlushRequest(int targetCount, long targetKvs) {
160       this.targetCount = targetCount;
161       this.targetKvs = targetKvs;
162     }
163 
164     @Override
165     public StripeMultiFileWriter createWriter() throws IOException {
166       return new StripeMultiFileWriter.SizeMultiWriter(
167           this.targetCount, this.targetKvs, OPEN_KEY, OPEN_KEY);
168     }
169   }
170 }