View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
22  
23  import java.io.IOException;
24  import java.util.List;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.fs.Path;
30  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
31  import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
32  import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
33  import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
34  
35  import com.google.common.annotations.VisibleForTesting;
36  
37  /**
38   * Stripe implementation of StoreFlusher. Flushes files either into L0 file w/o metadata, or
39   * into separate striped files, avoiding L0.
40   */
41  public class StripeStoreFlusher extends StoreFlusher {
42    private static final Log LOG = LogFactory.getLog(StripeStoreFlusher.class);
43    private final Object flushLock = new Object();
44    private final StripeCompactionPolicy policy;
45    private final StripeCompactionPolicy.StripeInformationProvider stripes;
46  
47    public StripeStoreFlusher(Configuration conf, Store store,
48        StripeCompactionPolicy policy, StripeStoreFileManager stripes) {
49      super(conf, store);
50      this.policy = policy;
51      this.stripes = stripes;
52    }
53  
54    @Override
55    public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
56        MonitoredTask status) throws IOException {
57      List<Path> result = null;
58      int cellsCount = snapshot.getCellsCount();
59      if (cellsCount == 0) return result; // don't flush if there are no entries
60  
61      long smallestReadPoint = store.getSmallestReadPoint();
62      InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
63      if (scanner == null) {
64        return result; // NULL scanner returned from coprocessor hooks means skip normal processing
65      }
66  
67      // Let policy select flush method.
68      StripeFlushRequest req = this.policy.selectFlush(this.stripes, cellsCount);
69  
70      boolean success = false;
71      StripeMultiFileWriter mw = null;
72      try {
73        mw = req.createWriter(); // Writer according to the policy.
74        StripeMultiFileWriter.WriterFactory factory = createWriterFactory(
75            snapshot.getTimeRangeTracker(), cellsCount);
76        StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
77        mw.init(storeScanner, factory, store.getComparator());
78  
79        synchronized (flushLock) {
80          performFlush(scanner, mw, smallestReadPoint);
81          result = mw.commitWriters(cacheFlushSeqNum, false);
82          success = true;
83        }
84      } finally {
85        if (!success && (mw != null)) {
86          if (result != null) {
87            result.clear();
88          }
89          for (Path leftoverFile : mw.abortWriters()) {
90            try {
91              store.getFileSystem().delete(leftoverFile, false);
92            } catch (Exception e) {
93              LOG.error("Failed to delete a file after failed flush: " + e);
94            }
95          }
96        }
97        try {
98          scanner.close();
99        } catch (IOException ex) {
100         LOG.warn("Failed to close flush scanner, ignoring", ex);
101       }
102     }
103     return result;
104   }
105 
106   private StripeMultiFileWriter.WriterFactory createWriterFactory(
107       final TimeRangeTracker tracker, final long kvCount) {
108     return new StripeMultiFileWriter.WriterFactory() {
109       @Override
110       public Writer createWriter() throws IOException {
111         StoreFile.Writer writer = store.createWriterInTmp(
112             kvCount, store.getFamily().getCompression(), false, true, true);
113         writer.setTimeRangeTracker(tracker);
114         return writer;
115       }
116     };
117   }
118 
119   /** Stripe flush request wrapper that writes a non-striped file. */
120   public static class StripeFlushRequest {
121     @VisibleForTesting
122     public StripeMultiFileWriter createWriter() throws IOException {
123       StripeMultiFileWriter writer =
124           new StripeMultiFileWriter.SizeMultiWriter(1, Long.MAX_VALUE, OPEN_KEY, OPEN_KEY);
125       writer.setNoStripeMetadata();
126       return writer;
127     }
128   }
129 
130   /** Stripe flush request wrapper based on boundaries. */
131   public static class BoundaryStripeFlushRequest extends StripeFlushRequest {
132     private final List<byte[]> targetBoundaries;
133 
134     /** @param targetBoundaries New files should be written with these boundaries. */
135     public BoundaryStripeFlushRequest(List<byte[]> targetBoundaries) {
136       this.targetBoundaries = targetBoundaries;
137     }
138 
139     @Override
140     public StripeMultiFileWriter createWriter() throws IOException {
141       return new StripeMultiFileWriter.BoundaryMultiWriter(targetBoundaries, null, null);
142     }
143   }
144 
145   /** Stripe flush request wrapper based on size. */
146   public static class SizeStripeFlushRequest extends StripeFlushRequest {
147     private final int targetCount;
148     private final long targetKvs;
149 
150     /**
151      * @param targetCount The maximum number of stripes to flush into.
152      * @param targetKvs The KV count of each segment. If targetKvs*targetCount is less than
153      *                  total number of kvs, all the overflow data goes into the last stripe.
154      */
155     public SizeStripeFlushRequest(int targetCount, long targetKvs) {
156       this.targetCount = targetCount;
157       this.targetKvs = targetKvs;
158     }
159 
160     @Override
161     public StripeMultiFileWriter createWriter() throws IOException {
162       return new StripeMultiFileWriter.SizeMultiWriter(
163           this.targetCount, this.targetKvs, OPEN_KEY, OPEN_KEY);
164     }
165   }
166 }