View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.Collection;
23  import java.util.Collections;
24  import java.util.List;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.fs.Path;
29  import org.apache.hadoop.hbase.KeyValue;
30  import org.apache.hadoop.hbase.KeyValue.KVComparator;
31  import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
32  import org.apache.hadoop.hbase.util.Bytes;
33  
34  /**
35   * Base class for cell sink that separates the provided cells into multiple files.
36   */
37  public abstract class StripeMultiFileWriter implements Compactor.CellSink {
38    private static final Log LOG = LogFactory.getLog(StripeMultiFileWriter.class);
39  
40    /** Factory that is used to produce single StoreFile.Writer-s */
41    protected WriterFactory writerFactory;
42    protected KVComparator comparator;
43  
44    protected List<StoreFile.Writer> existingWriters;
45    protected List<byte[]> boundaries;
46    /** Source scanner that is tracking KV count; may be null if source is not StoreScanner */
47    protected StoreScanner sourceScanner;
48  
49    /** Whether to write stripe metadata */
50    private boolean doWriteStripeMetadata = true;
51  
52    public interface WriterFactory {
53      public StoreFile.Writer createWriter() throws IOException;
54    }
55  
56    /**
57     * Initializes multi-writer before usage.
58     * @param sourceScanner Optional store scanner to obtain the information about read progress.
59     * @param factory Factory used to produce individual file writers.
60     * @param comparator Comparator used to compare rows.
61     */
62    public void init(StoreScanner sourceScanner, WriterFactory factory, KVComparator comparator)
63        throws IOException {
64      this.writerFactory = factory;
65      this.sourceScanner = sourceScanner;
66      this.comparator = comparator;
67    }
68  
69    public void setNoStripeMetadata() {
70      this.doWriteStripeMetadata = false;
71    }
72  
73    public List<Path> commitWriters(long maxSeqId, boolean isMajor) throws IOException {
74      assert this.existingWriters != null;
75      commitWritersInternal();
76      assert this.boundaries.size() == (this.existingWriters.size() + 1);
77      LOG.debug((this.doWriteStripeMetadata ? "W" : "Not w")
78        + "riting out metadata for " + this.existingWriters.size() + " writers");
79      List<Path> paths = new ArrayList<Path>();
80      for (int i = 0; i < this.existingWriters.size(); ++i) {
81        StoreFile.Writer writer = this.existingWriters.get(i);
82        if (writer == null) continue; // writer was skipped due to 0 KVs
83        if (doWriteStripeMetadata) {
84          writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, this.boundaries.get(i));
85          writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, this.boundaries.get(i + 1));
86        }
87        writer.appendMetadata(maxSeqId, isMajor);
88        paths.add(writer.getPath());
89        writer.close();
90      }
91      this.existingWriters = null;
92      return paths;
93    }
94  
95    public List<Path> abortWriters() {
96      assert this.existingWriters != null;
97      List<Path> paths = new ArrayList<Path>();
98      for (StoreFile.Writer writer : this.existingWriters) {
99        try {
100         paths.add(writer.getPath());
101         writer.close();
102       } catch (Exception ex) {
103         LOG.error("Failed to close the writer after an unfinished compaction.", ex);
104       }
105     }
106     this.existingWriters = null;
107     return paths;
108   }
109 
110   /**
111    * Subclasses can call this method to make sure the first KV is within multi-writer range.
112    * @param left The left boundary of the writer.
113    * @param row The row to check.
114    * @param rowOffset Offset for row.
115    * @param rowLength Length for row.
116    */
117   protected void sanityCheckLeft(
118       byte[] left, byte[] row, int rowOffset, int rowLength) throws IOException {
119     if (StripeStoreFileManager.OPEN_KEY != left &&
120         comparator.compareRows(row, rowOffset, rowLength, left, 0, left.length) < 0) {
121       String error = "The first row is lower than the left boundary of [" + Bytes.toString(left)
122         + "]: [" + Bytes.toString(row, rowOffset, rowLength) + "]";
123       LOG.error(error);
124       throw new IOException(error);
125     }
126   }
127 
128   /**
129    * Subclasses can call this method to make sure the last KV is within multi-writer range.
130    * @param right The right boundary of the writer.
131    * @param row The row to check.
132    * @param rowOffset Offset for row.
133    * @param rowLength Length for row.
134    */
135   protected void sanityCheckRight(
136       byte[] right, byte[] row, int rowOffset, int rowLength) throws IOException {
137     if (StripeStoreFileManager.OPEN_KEY != right &&
138         comparator.compareRows(row, rowOffset, rowLength, right, 0, right.length) >= 0) {
139       String error = "The last row is higher or equal than the right boundary of ["
140           + Bytes.toString(right) + "]: [" + Bytes.toString(row, rowOffset, rowLength) + "]";
141       LOG.error(error);
142       throw new IOException(error);
143     }
144   }
145 
146   /**
147    * Subclasses override this method to be called at the end of a successful sequence of
148    * append; all appends are processed before this method is called.
149    */
150   protected abstract void commitWritersInternal() throws IOException;
151 
152   /**
153    * MultiWriter that separates the cells based on fixed row-key boundaries.
154    * All the KVs between each pair of neighboring boundaries from the list supplied to ctor
155    * will end up in one file, and separate from all other such pairs.
156    */
157   public static class BoundaryMultiWriter extends StripeMultiFileWriter {
158     private StoreFile.Writer currentWriter;
159     private byte[] currentWriterEndKey;
160 
161     private KeyValue lastKv;
162     private long kvsInCurrentWriter = 0;
163     private int majorRangeFromIndex = -1, majorRangeToIndex = -1;
164     private boolean hasAnyWriter = false;
165 
166     /**
167      * @param targetBoundaries The boundaries on which writers/files are separated.
168      * @param majorRangeFrom Major range is the range for which at least one file should be
169      *                       written (because all files are included in compaction).
170      *                       majorRangeFrom is the left boundary.
171      * @param majorRangeTo The right boundary of majorRange (see majorRangeFrom).
172      */
173     public BoundaryMultiWriter(List<byte[]> targetBoundaries,
174         byte[] majorRangeFrom, byte[] majorRangeTo) throws IOException {
175       super();
176       this.boundaries = targetBoundaries;
177       this.existingWriters = new ArrayList<StoreFile.Writer>(this.boundaries.size() - 1);
178       // "major" range (range for which all files are included) boundaries, if any,
179       // must match some target boundaries, let's find them.
180       assert  (majorRangeFrom == null) == (majorRangeTo == null);
181       if (majorRangeFrom != null) {
182         majorRangeFromIndex = (majorRangeFrom == StripeStoreFileManager.OPEN_KEY) ? 0
183           : Collections.binarySearch(this.boundaries, majorRangeFrom, Bytes.BYTES_COMPARATOR);
184         majorRangeToIndex = (majorRangeTo == StripeStoreFileManager.OPEN_KEY) ? boundaries.size()
185           : Collections.binarySearch(this.boundaries, majorRangeTo, Bytes.BYTES_COMPARATOR);
186         if (this.majorRangeFromIndex < 0 || this.majorRangeToIndex < 0) {
187           throw new IOException("Major range does not match writer boundaries: [" +
188               Bytes.toString(majorRangeFrom) + "] [" + Bytes.toString(majorRangeTo) + "]; from "
189               + majorRangeFromIndex + " to " + majorRangeToIndex);
190         }
191       }
192     }
193 
194     @Override
195     public void append(KeyValue kv) throws IOException {
196       if (currentWriter == null && existingWriters.isEmpty()) {
197         // First append ever, do a sanity check.
198         sanityCheckLeft(this.boundaries.get(0),
199             kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
200       }
201       prepareWriterFor(kv);
202       currentWriter.append(kv);
203       lastKv = kv; // for the sanity check
204       ++kvsInCurrentWriter;
205     }
206 
207     private boolean isKvAfterCurrentWriter(KeyValue kv) {
208       return ((currentWriterEndKey != StripeStoreFileManager.OPEN_KEY) &&
209             (comparator.compareRows(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
210                 currentWriterEndKey, 0, currentWriterEndKey.length) >= 0));
211     }
212 
213     @Override
214     protected void commitWritersInternal() throws IOException {
215       stopUsingCurrentWriter();
216       while (existingWriters.size() < boundaries.size() - 1) {
217         createEmptyWriter();
218       }
219       if (lastKv != null) {
220         sanityCheckRight(boundaries.get(boundaries.size() - 1),
221             lastKv.getRowArray(), lastKv.getRowOffset(), lastKv.getRowLength());
222       }
223     }
224 
225     private void prepareWriterFor(KeyValue kv) throws IOException {
226       if (currentWriter != null && !isKvAfterCurrentWriter(kv)) return; // Use same writer.
227 
228       stopUsingCurrentWriter();
229       // See if KV will be past the writer we are about to create; need to add another one.
230       while (isKvAfterCurrentWriter(kv)) {
231         checkCanCreateWriter();
232         createEmptyWriter();
233       }
234       checkCanCreateWriter();
235       hasAnyWriter = true;
236       currentWriter = writerFactory.createWriter();
237       existingWriters.add(currentWriter);
238     }
239 
240     /**
241      * Called if there are no cells for some stripe.
242      * We need to have something in the writer list for this stripe, so that writer-boundary
243      * list indices correspond to each other. We can insert null in the writer list for that
244      * purpose, except in the following cases where we actually need a file:
245      * 1) If we are in range for which we are compacting all the files, we need to create an
246      * empty file to preserve stripe metadata.
247      * 2) If we have not produced any file at all for this compactions, and this is the
248      * last chance (the last stripe), we need to preserve last seqNum (see also HBASE-6059).
249      */
250     private void createEmptyWriter() throws IOException {
251       int index = existingWriters.size();
252       boolean isInMajorRange = (index >= majorRangeFromIndex) && (index < majorRangeToIndex);
253       // Stripe boundary count = stripe count + 1, so last stripe index is (#boundaries minus 2)
254       boolean isLastWriter = !hasAnyWriter && (index == (boundaries.size() - 2));
255       boolean needEmptyFile = isInMajorRange || isLastWriter;
256       existingWriters.add(needEmptyFile ? writerFactory.createWriter() : null);
257       hasAnyWriter |= needEmptyFile;
258       currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size())
259           ? null : boundaries.get(existingWriters.size() + 1);
260     }
261 
262     private void checkCanCreateWriter() throws IOException {
263       int maxWriterCount =  boundaries.size() - 1;
264       assert existingWriters.size() <= maxWriterCount;
265       if (existingWriters.size() >= maxWriterCount) {
266         throw new IOException("Cannot create any more writers (created " + existingWriters.size()
267             + " out of " + maxWriterCount + " - row might be out of range of all valid writers");
268       }
269     }
270 
271     private void stopUsingCurrentWriter() {
272       if (currentWriter != null) {
273         if (LOG.isDebugEnabled()) {
274           LOG.debug("Stopping to use a writer after [" + Bytes.toString(currentWriterEndKey)
275               + "] row; wrote out " + kvsInCurrentWriter + " kvs");
276         }
277         kvsInCurrentWriter = 0;
278       }
279       currentWriter = null;
280       currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size())
281           ? null : boundaries.get(existingWriters.size() + 1);
282     }
283   }
284 
285   /**
286    * MultiWriter that separates the cells based on target cell number per file and file count.
287    * New file is started every time the target number of KVs is reached, unless the fixed
288    * count of writers has already been created (in that case all the remaining KVs go into
289    * the last writer).
290    */
291   public static class SizeMultiWriter extends StripeMultiFileWriter {
292     private int targetCount;
293     private long targetKvs;
294     private byte[] left;
295     private byte[] right;
296 
297     private KeyValue lastKv;
298     private StoreFile.Writer currentWriter;
299     protected byte[] lastRowInCurrentWriter = null;
300     private long kvsInCurrentWriter = 0;
301     private long kvsSeen = 0;
302     private long kvsSeenInPrevious = 0;
303 
304     /**
305      * @param targetCount The maximum count of writers that can be created.
306      * @param targetKvs The number of KVs to read from source before starting each new writer.
307      * @param left The left boundary of the first writer.
308      * @param right The right boundary of the last writer.
309      */
310     public SizeMultiWriter(int targetCount, long targetKvs, byte[] left, byte[] right) {
311       super();
312       this.targetCount = targetCount;
313       this.targetKvs = targetKvs;
314       this.left = left;
315       this.right = right;
316       int preallocate = Math.min(this.targetCount, 64);
317       this.existingWriters = new ArrayList<StoreFile.Writer>(preallocate);
318       this.boundaries = new ArrayList<byte[]>(preallocate + 1);
319     }
320 
321     @Override
322     public void append(KeyValue kv) throws IOException {
323       // If we are waiting for opportunity to close and we started writing different row,
324       // discard the writer and stop waiting.
325       boolean doCreateWriter = false;
326       if (currentWriter == null) {
327         // First append ever, do a sanity check.
328         sanityCheckLeft(left, kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
329         doCreateWriter = true;
330       } else if (lastRowInCurrentWriter != null
331           && !comparator.matchingRows(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
332               lastRowInCurrentWriter, 0, lastRowInCurrentWriter.length)) {
333         if (LOG.isDebugEnabled()) {
334           LOG.debug("Stopping to use a writer after [" + Bytes.toString(lastRowInCurrentWriter)
335               + "] row; wrote out "  + kvsInCurrentWriter + " kvs");
336         }
337         lastRowInCurrentWriter = null;
338         kvsInCurrentWriter = 0;
339         kvsSeenInPrevious += kvsSeen;
340         doCreateWriter = true;
341       }
342       if (doCreateWriter) {
343         byte[] boundary = existingWriters.isEmpty() ? left : kv.getRow(); // make a copy
344         if (LOG.isDebugEnabled()) {
345           LOG.debug("Creating new writer starting at [" + Bytes.toString(boundary) + "]");
346         }
347         currentWriter = writerFactory.createWriter();
348         boundaries.add(boundary);
349         existingWriters.add(currentWriter);
350       }
351 
352       currentWriter.append(kv);
353       lastKv = kv; // for the sanity check
354       ++kvsInCurrentWriter;
355       kvsSeen = kvsInCurrentWriter;
356       if (this.sourceScanner != null) {
357         kvsSeen = Math.max(kvsSeen,
358             this.sourceScanner.getEstimatedNumberOfKvsScanned() - kvsSeenInPrevious);
359       }
360 
361       // If we are not already waiting for opportunity to close, start waiting if we can
362       // create any more writers and if the current one is too big.
363       if (lastRowInCurrentWriter == null
364           && existingWriters.size() < targetCount
365           && kvsSeen >= targetKvs) {
366         lastRowInCurrentWriter = kv.getRow(); // make a copy
367         if (LOG.isDebugEnabled()) {
368           LOG.debug("Preparing to start a new writer after [" + Bytes.toString(
369               lastRowInCurrentWriter) + "] row; observed " + kvsSeen + " kvs and wrote out "
370               + kvsInCurrentWriter + " kvs");
371         }
372       }
373     }
374 
375     @Override
376     protected void commitWritersInternal() throws IOException {
377       if (LOG.isDebugEnabled()) {
378         LOG.debug("Stopping with "  + kvsInCurrentWriter + " kvs in last writer" +
379             ((this.sourceScanner == null) ? "" : ("; observed estimated "
380                 + this.sourceScanner.getEstimatedNumberOfKvsScanned() + " KVs total")));
381       }
382       if (lastKv != null) {
383         sanityCheckRight(
384             right, lastKv.getRowArray(), lastKv.getRowOffset(), lastKv.getRowLength());
385       }
386       this.boundaries.add(right);
387     }
388   }
389 }