View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.Collections;
23  import java.util.List;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.fs.Path;
29  import org.apache.hadoop.hbase.Cell;
30  import org.apache.hadoop.hbase.KeyValue.KVComparator;
31  import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
32  import org.apache.hadoop.hbase.util.Bytes;
33  
34  /**
35   * Base class for cell sink that separates the provided cells into multiple files.
36   */
37  @InterfaceAudience.Private
38  public abstract class StripeMultiFileWriter implements Compactor.CellSink {
  private static final Log LOG = LogFactory.getLog(StripeMultiFileWriter.class);

  /** Factory that is used to produce single StoreFile.Writer-s */
  protected WriterFactory writerFactory;
  /** Comparator used to compare rows of appended cells against boundaries and other rows. */
  protected KVComparator comparator;

  /**
   * Writers created so far, in the same order as the boundaries. May contain null entries
   * (commitWriters skips those as writers that were skipped due to 0 KVs). Reset to null by
   * commitWriters and abortWriters once they have processed the list.
   */
  protected List<StoreFile.Writer> existingWriters;
  /**
   * Boundary keys of the output files. At commit time this list is asserted to have exactly
   * one more element than existingWriters; elements i and i + 1 are written out as the stripe
   * start and end key of writer i.
   */
  protected List<byte[]> boundaries;
  /** Source scanner that is tracking KV count; may be null if source is not StoreScanner */
  protected StoreScanner sourceScanner;

  /** Whether to write stripe metadata */
  private boolean doWriteStripeMetadata = true;
52  
53    public interface WriterFactory {
54      public StoreFile.Writer createWriter() throws IOException;
55    }
56  
57    /**
58     * Initializes multi-writer before usage.
59     * @param sourceScanner Optional store scanner to obtain the information about read progress.
60     * @param factory Factory used to produce individual file writers.
61     * @param comparator Comparator used to compare rows.
62     */
63    public void init(StoreScanner sourceScanner, WriterFactory factory, KVComparator comparator)
64        throws IOException {
65      this.writerFactory = factory;
66      this.sourceScanner = sourceScanner;
67      this.comparator = comparator;
68    }
69  
70    public void setNoStripeMetadata() {
71      this.doWriteStripeMetadata = false;
72    }
73  
74    public List<Path> commitWriters(long maxSeqId, boolean isMajor) throws IOException {
75      assert this.existingWriters != null;
76      commitWritersInternal();
77      assert this.boundaries.size() == (this.existingWriters.size() + 1);
78      LOG.debug((this.doWriteStripeMetadata ? "W" : "Not w")
79        + "riting out metadata for " + this.existingWriters.size() + " writers");
80      List<Path> paths = new ArrayList<Path>();
81      for (int i = 0; i < this.existingWriters.size(); ++i) {
82        StoreFile.Writer writer = this.existingWriters.get(i);
83        if (writer == null) continue; // writer was skipped due to 0 KVs
84        if (doWriteStripeMetadata) {
85          writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, this.boundaries.get(i));
86          writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, this.boundaries.get(i + 1));
87        }
88        writer.appendMetadata(maxSeqId, isMajor);
89        paths.add(writer.getPath());
90        writer.close();
91      }
92      this.existingWriters = null;
93      return paths;
94    }
95  
96    public List<Path> abortWriters() {
97      assert this.existingWriters != null;
98      List<Path> paths = new ArrayList<Path>();
99      for (StoreFile.Writer writer : this.existingWriters) {
100       try {
101         paths.add(writer.getPath());
102         writer.close();
103       } catch (Exception ex) {
104         LOG.error("Failed to close the writer after an unfinished compaction.", ex);
105       }
106     }
107     this.existingWriters = null;
108     return paths;
109   }
110 
111   /**
112    * Subclasses can call this method to make sure the first KV is within multi-writer range.
113    * @param left The left boundary of the writer.
114    * @param row The row to check.
115    * @param rowOffset Offset for row.
116    * @param rowLength Length for row.
117    */
118   protected void sanityCheckLeft(
119       byte[] left, byte[] row, int rowOffset, int rowLength) throws IOException {
120     if (StripeStoreFileManager.OPEN_KEY != left &&
121         comparator.compareRows(row, rowOffset, rowLength, left, 0, left.length) < 0) {
122       String error = "The first row is lower than the left boundary of [" + Bytes.toString(left)
123         + "]: [" + Bytes.toString(row, rowOffset, rowLength) + "]";
124       LOG.error(error);
125       throw new IOException(error);
126     }
127   }
128 
129   /**
130    * Subclasses can call this method to make sure the last KV is within multi-writer range.
131    * @param right The right boundary of the writer.
132    * @param row The row to check.
133    * @param rowOffset Offset for row.
134    * @param rowLength Length for row.
135    */
136   protected void sanityCheckRight(
137       byte[] right, byte[] row, int rowOffset, int rowLength) throws IOException {
138     if (StripeStoreFileManager.OPEN_KEY != right &&
139         comparator.compareRows(row, rowOffset, rowLength, right, 0, right.length) >= 0) {
140       String error = "The last row is higher or equal than the right boundary of ["
141           + Bytes.toString(right) + "]: [" + Bytes.toString(row, rowOffset, rowLength) + "]";
142       LOG.error(error);
143       throw new IOException(error);
144     }
145   }
146 
  /**
   * Subclasses override this method to be called at the end of a successful sequence of
   * append; all appends are processed before this method is called.
   * <p>
   * commitWriters calls it before any writer gets its metadata or is closed. After it
   * returns, commitWriters asserts that the boundary list has exactly one more element than
   * the writer list, so implementations must leave the two lists in that shape.
   * @throws IOException If the subclass fails to finalize its writers.
   */
  protected abstract void commitWritersInternal() throws IOException;
152 
153   /**
154    * MultiWriter that separates the cells based on fixed row-key boundaries.
155    * All the KVs between each pair of neighboring boundaries from the list supplied to ctor
156    * will end up in one file, and separate from all other such pairs.
157    */
158   public static class BoundaryMultiWriter extends StripeMultiFileWriter {
159     private StoreFile.Writer currentWriter;
160     private byte[] currentWriterEndKey;
161 
162     private Cell lastCell;
163     private long cellsInCurrentWriter = 0;
164     private int majorRangeFromIndex = -1, majorRangeToIndex = -1;
165     private boolean hasAnyWriter = false;
166 
167     /**
168      * @param targetBoundaries The boundaries on which writers/files are separated.
169      * @param majorRangeFrom Major range is the range for which at least one file should be
170      *                       written (because all files are included in compaction).
171      *                       majorRangeFrom is the left boundary.
172      * @param majorRangeTo The right boundary of majorRange (see majorRangeFrom).
173      */
174     public BoundaryMultiWriter(List<byte[]> targetBoundaries,
175         byte[] majorRangeFrom, byte[] majorRangeTo) throws IOException {
176       super();
177       this.boundaries = targetBoundaries;
178       this.existingWriters = new ArrayList<StoreFile.Writer>(this.boundaries.size() - 1);
179       // "major" range (range for which all files are included) boundaries, if any,
180       // must match some target boundaries, let's find them.
181       assert  (majorRangeFrom == null) == (majorRangeTo == null);
182       if (majorRangeFrom != null) {
183         majorRangeFromIndex = (majorRangeFrom == StripeStoreFileManager.OPEN_KEY) ? 0
184           : Collections.binarySearch(this.boundaries, majorRangeFrom, Bytes.BYTES_COMPARATOR);
185         majorRangeToIndex = (majorRangeTo == StripeStoreFileManager.OPEN_KEY) ? boundaries.size()
186           : Collections.binarySearch(this.boundaries, majorRangeTo, Bytes.BYTES_COMPARATOR);
187         if (this.majorRangeFromIndex < 0 || this.majorRangeToIndex < 0) {
188           throw new IOException("Major range does not match writer boundaries: [" +
189               Bytes.toString(majorRangeFrom) + "] [" + Bytes.toString(majorRangeTo) + "]; from "
190               + majorRangeFromIndex + " to " + majorRangeToIndex);
191         }
192       }
193     }
194 
195     @Override
196     public void append(Cell cell) throws IOException {
197       if (currentWriter == null && existingWriters.isEmpty()) {
198         // First append ever, do a sanity check.
199         sanityCheckLeft(this.boundaries.get(0),
200             cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
201       }
202       prepareWriterFor(cell);
203       currentWriter.append(cell);
204       lastCell = cell; // for the sanity check
205       ++cellsInCurrentWriter;
206     }
207 
208     private boolean isCellAfterCurrentWriter(Cell cell) {
209       return ((currentWriterEndKey != StripeStoreFileManager.OPEN_KEY) &&
210             (comparator.compareRows(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
211                 currentWriterEndKey, 0, currentWriterEndKey.length) >= 0));
212     }
213 
214     @Override
215     protected void commitWritersInternal() throws IOException {
216       stopUsingCurrentWriter();
217       while (existingWriters.size() < boundaries.size() - 1) {
218         createEmptyWriter();
219       }
220       if (lastCell != null) {
221         sanityCheckRight(boundaries.get(boundaries.size() - 1),
222             lastCell.getRowArray(), lastCell.getRowOffset(), lastCell.getRowLength());
223       }
224     }
225 
226     private void prepareWriterFor(Cell cell) throws IOException {
227       if (currentWriter != null && !isCellAfterCurrentWriter(cell)) return; // Use same writer.
228 
229       stopUsingCurrentWriter();
230       // See if KV will be past the writer we are about to create; need to add another one.
231       while (isCellAfterCurrentWriter(cell)) {
232         checkCanCreateWriter();
233         createEmptyWriter();
234       }
235       checkCanCreateWriter();
236       hasAnyWriter = true;
237       currentWriter = writerFactory.createWriter();
238       existingWriters.add(currentWriter);
239     }
240 
241     /**
242      * Called if there are no cells for some stripe.
243      * We need to have something in the writer list for this stripe, so that writer-boundary
244      * list indices correspond to each other. We can insert null in the writer list for that
245      * purpose, except in the following cases where we actually need a file:
246      * 1) If we are in range for which we are compacting all the files, we need to create an
247      * empty file to preserve stripe metadata.
248      * 2) If we have not produced any file at all for this compactions, and this is the
249      * last chance (the last stripe), we need to preserve last seqNum (see also HBASE-6059).
250      */
251     private void createEmptyWriter() throws IOException {
252       int index = existingWriters.size();
253       boolean isInMajorRange = (index >= majorRangeFromIndex) && (index < majorRangeToIndex);
254       // Stripe boundary count = stripe count + 1, so last stripe index is (#boundaries minus 2)
255       boolean isLastWriter = !hasAnyWriter && (index == (boundaries.size() - 2));
256       boolean needEmptyFile = isInMajorRange || isLastWriter;
257       existingWriters.add(needEmptyFile ? writerFactory.createWriter() : null);
258       hasAnyWriter |= needEmptyFile;
259       currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size())
260           ? null : boundaries.get(existingWriters.size() + 1);
261     }
262 
263     private void checkCanCreateWriter() throws IOException {
264       int maxWriterCount =  boundaries.size() - 1;
265       assert existingWriters.size() <= maxWriterCount;
266       if (existingWriters.size() >= maxWriterCount) {
267         throw new IOException("Cannot create any more writers (created " + existingWriters.size()
268             + " out of " + maxWriterCount + " - row might be out of range of all valid writers");
269       }
270     }
271 
272     private void stopUsingCurrentWriter() {
273       if (currentWriter != null) {
274         if (LOG.isDebugEnabled()) {
275           LOG.debug("Stopping to use a writer after [" + Bytes.toString(currentWriterEndKey)
276               + "] row; wrote out " + cellsInCurrentWriter + " kvs");
277         }
278         cellsInCurrentWriter = 0;
279       }
280       currentWriter = null;
281       currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size())
282           ? null : boundaries.get(existingWriters.size() + 1);
283     }
284   }
285 
286   /**
287    * MultiWriter that separates the cells based on target cell number per file and file count.
288    * New file is started every time the target number of KVs is reached, unless the fixed
289    * count of writers has already been created (in that case all the remaining KVs go into
290    * the last writer).
291    */
292   public static class SizeMultiWriter extends StripeMultiFileWriter {
293     private int targetCount;
294     private long targetCells;
295     private byte[] left;
296     private byte[] right;
297 
298     private Cell lastCell;
299     private StoreFile.Writer currentWriter;
300     protected byte[] lastRowInCurrentWriter = null;
301     private long cellsInCurrentWriter = 0;
302     private long cellsSeen = 0;
303     private long cellsSeenInPrevious = 0;
304 
305     /**
306      * @param targetCount The maximum count of writers that can be created.
307      * @param targetKvs The number of KVs to read from source before starting each new writer.
308      * @param left The left boundary of the first writer.
309      * @param right The right boundary of the last writer.
310      */
311     public SizeMultiWriter(int targetCount, long targetKvs, byte[] left, byte[] right) {
312       super();
313       this.targetCount = targetCount;
314       this.targetCells = targetKvs;
315       this.left = left;
316       this.right = right;
317       int preallocate = Math.min(this.targetCount, 64);
318       this.existingWriters = new ArrayList<StoreFile.Writer>(preallocate);
319       this.boundaries = new ArrayList<byte[]>(preallocate + 1);
320     }
321 
322     @Override
323     public void append(Cell cell) throws IOException {
324       // If we are waiting for opportunity to close and we started writing different row,
325       // discard the writer and stop waiting.
326       boolean doCreateWriter = false;
327       if (currentWriter == null) {
328         // First append ever, do a sanity check.
329         sanityCheckLeft(left, cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
330         doCreateWriter = true;
331       } else if (lastRowInCurrentWriter != null
332           && !comparator.matchingRows(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
333               lastRowInCurrentWriter, 0, lastRowInCurrentWriter.length)) {
334         if (LOG.isDebugEnabled()) {
335           LOG.debug("Stopping to use a writer after [" + Bytes.toString(lastRowInCurrentWriter)
336               + "] row; wrote out "  + cellsInCurrentWriter + " kvs");
337         }
338         lastRowInCurrentWriter = null;
339         cellsInCurrentWriter = 0;
340         cellsSeenInPrevious += cellsSeen;
341         doCreateWriter = true;
342       }
343       if (doCreateWriter) {
344         byte[] boundary = existingWriters.isEmpty() ? left : cell.getRow(); // make a copy
345         if (LOG.isDebugEnabled()) {
346           LOG.debug("Creating new writer starting at [" + Bytes.toString(boundary) + "]");
347         }
348         currentWriter = writerFactory.createWriter();
349         boundaries.add(boundary);
350         existingWriters.add(currentWriter);
351       }
352 
353       currentWriter.append(cell);
354       lastCell = cell; // for the sanity check
355       ++cellsInCurrentWriter;
356       cellsSeen = cellsInCurrentWriter;
357       if (this.sourceScanner != null) {
358         cellsSeen = Math.max(cellsSeen,
359             this.sourceScanner.getEstimatedNumberOfKvsScanned() - cellsSeenInPrevious);
360       }
361 
362       // If we are not already waiting for opportunity to close, start waiting if we can
363       // create any more writers and if the current one is too big.
364       if (lastRowInCurrentWriter == null
365           && existingWriters.size() < targetCount
366           && cellsSeen >= targetCells) {
367         lastRowInCurrentWriter = cell.getRow(); // make a copy
368         if (LOG.isDebugEnabled()) {
369           LOG.debug("Preparing to start a new writer after [" + Bytes.toString(
370               lastRowInCurrentWriter) + "] row; observed " + cellsSeen + " kvs and wrote out "
371               + cellsInCurrentWriter + " kvs");
372         }
373       }
374     }
375 
376     @Override
377     protected void commitWritersInternal() throws IOException {
378       if (LOG.isDebugEnabled()) {
379         LOG.debug("Stopping with "  + cellsInCurrentWriter + " kvs in last writer" +
380             ((this.sourceScanner == null) ? "" : ("; observed estimated "
381                 + this.sourceScanner.getEstimatedNumberOfKvsScanned() + " KVs total")));
382       }
383       if (lastCell != null) {
384         sanityCheckRight(
385             right, lastCell.getRowArray(), lastCell.getRowOffset(), lastCell.getRowLength());
386       }
387 
388       // When expired stripes were going to be merged into one, and if no writer was created during
389       // the compaction, we need to create an empty file to preserve metadata.
390       if (existingWriters.isEmpty() && 1 == targetCount) {
391         if (LOG.isDebugEnabled()) {
392           LOG.debug("Merge expired stripes into one, create an empty file to preserve metadata.");
393         }
394         boundaries.add(left);
395         existingWriters.add(writerFactory.createWriter());
396       }
397 
398       this.boundaries.add(right);
399     }
400   }
401 }