001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Arrays; 023import java.util.Collection; 024import java.util.Collections; 025import java.util.List; 026 027import org.apache.hadoop.hbase.Cell; 028import org.apache.hadoop.hbase.CellComparator; 029import org.apache.hadoop.hbase.CellUtil; 030import org.apache.hadoop.hbase.PrivateCellUtil; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034import org.apache.hadoop.hbase.util.Bytes; 035 036/** 037 * Base class for cell sink that separates the provided cells into multiple files for stripe 038 * compaction. 039 */ 040@InterfaceAudience.Private 041public abstract class StripeMultiFileWriter extends AbstractMultiFileWriter { 042 043 private static final Logger LOG = LoggerFactory.getLogger(StripeMultiFileWriter.class); 044 045 protected final CellComparator comparator; 046 protected List<StoreFileWriter> existingWriters; 047 protected List<byte[]> boundaries; 048 049 /** Whether to write stripe metadata */ 050 private boolean doWriteStripeMetadata = true; 051 052 public StripeMultiFileWriter(CellComparator comparator) { 053 this.comparator = comparator; 054 } 055 056 public void setNoStripeMetadata() { 057 this.doWriteStripeMetadata = false; 058 } 059 060 @Override 061 protected Collection<StoreFileWriter> writers() { 062 return existingWriters; 063 } 064 065 protected abstract void preCommitWritersInternal() throws IOException; 066 067 @Override 068 protected final void preCommitWriters() throws IOException { 069 // do some sanity check here. 070 assert this.existingWriters != null; 071 preCommitWritersInternal(); 072 assert this.boundaries.size() == (this.existingWriters.size() + 1); 073 } 074 075 @Override 076 protected void preCloseWriter(StoreFileWriter writer) throws IOException { 077 if (doWriteStripeMetadata) { 078 if (LOG.isDebugEnabled()) { 079 LOG.debug("Write stripe metadata for " + writer.getPath().toString()); 080 } 081 int index = existingWriters.indexOf(writer); 082 writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, boundaries.get(index)); 083 writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, boundaries.get(index + 1)); 084 } else { 085 if (LOG.isDebugEnabled()) { 086 LOG.debug("Skip writing stripe metadata for " + writer.getPath().toString()); 087 } 088 } 089 } 090 091 /** 092 * Subclasses can call this method to make sure the first KV is within multi-writer range. 093 * @param left The left boundary of the writer. 094 * @param cell The cell whose row has to be checked. 095 */ 096 protected void sanityCheckLeft(byte[] left, Cell cell) throws IOException { 097 if (!Arrays.equals(StripeStoreFileManager.OPEN_KEY, left) 098 && comparator.compareRows(cell, left, 0, left.length) < 0) { 099 String error = 100 "The first row is lower than the left boundary of [" + Bytes.toString(left) + "]: [" 101 + Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) + "]"; 102 LOG.error(error); 103 throw new IOException(error); 104 } 105 } 106 107 /** 108 * Subclasses can call this method to make sure the last KV is within multi-writer range. 109 * @param right The right boundary of the writer. 110 */ 111 protected void sanityCheckRight(byte[] right, Cell cell) throws IOException { 112 if (!Arrays.equals(StripeStoreFileManager.OPEN_KEY, right) 113 && comparator.compareRows(cell, right, 0, right.length) >= 0) { 114 String error = 115 "The last row is higher or equal than the right boundary of [" + Bytes.toString(right) 116 + "]: [" 117 + Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) + "]"; 118 LOG.error(error); 119 throw new IOException(error); 120 } 121 } 122 123 /** 124 * MultiWriter that separates the cells based on fixed row-key boundaries. All the KVs between 125 * each pair of neighboring boundaries from the list supplied to ctor will end up in one file, and 126 * separate from all other such pairs. 127 */ 128 public static class BoundaryMultiWriter extends StripeMultiFileWriter { 129 private StoreFileWriter currentWriter; 130 private byte[] currentWriterEndKey; 131 132 private Cell lastCell; 133 private long cellsInCurrentWriter = 0; 134 private int majorRangeFromIndex = -1, majorRangeToIndex = -1; 135 private boolean hasAnyWriter = false; 136 137 /** 138 * @param targetBoundaries The boundaries on which writers/files are separated. 139 * @param majorRangeFrom Major range is the range for which at least one file should be written 140 * (because all files are included in compaction). majorRangeFrom is the left boundary. 141 * @param majorRangeTo The right boundary of majorRange (see majorRangeFrom). 142 */ 143 public BoundaryMultiWriter(CellComparator comparator, List<byte[]> targetBoundaries, 144 byte[] majorRangeFrom, byte[] majorRangeTo) throws IOException { 145 super(comparator); 146 this.boundaries = targetBoundaries; 147 this.existingWriters = new ArrayList<>(this.boundaries.size() - 1); 148 // "major" range (range for which all files are included) boundaries, if any, 149 // must match some target boundaries, let's find them. 150 assert (majorRangeFrom == null) == (majorRangeTo == null); 151 if (majorRangeFrom != null) { 152 majorRangeFromIndex = 153 Arrays.equals(majorRangeFrom, StripeStoreFileManager.OPEN_KEY) ? 0 : Collections 154 .binarySearch(boundaries, majorRangeFrom, Bytes.BYTES_COMPARATOR); 155 majorRangeToIndex = 156 Arrays.equals(majorRangeTo, StripeStoreFileManager.OPEN_KEY) ? boundaries.size() 157 : Collections.binarySearch(boundaries, majorRangeTo, Bytes.BYTES_COMPARATOR); 158 if (this.majorRangeFromIndex < 0 || this.majorRangeToIndex < 0) { 159 throw new IOException("Major range does not match writer boundaries: [" 160 + Bytes.toString(majorRangeFrom) + "] [" + Bytes.toString(majorRangeTo) + "]; from " 161 + majorRangeFromIndex + " to " + majorRangeToIndex); 162 } 163 } 164 } 165 166 @Override 167 public void append(Cell cell) throws IOException { 168 if (currentWriter == null && existingWriters.isEmpty()) { 169 // First append ever, do a sanity check. 170 sanityCheckLeft(this.boundaries.get(0), cell); 171 } 172 prepareWriterFor(cell); 173 currentWriter.append(cell); 174 lastCell = cell; // for the sanity check 175 ++cellsInCurrentWriter; 176 } 177 178 private boolean isCellAfterCurrentWriter(Cell cell) { 179 return !Arrays.equals(currentWriterEndKey, StripeStoreFileManager.OPEN_KEY) 180 && (comparator.compareRows(cell, currentWriterEndKey, 0, currentWriterEndKey.length) >= 0); 181 } 182 183 @Override 184 protected void preCommitWritersInternal() throws IOException { 185 stopUsingCurrentWriter(); 186 while (existingWriters.size() < boundaries.size() - 1) { 187 createEmptyWriter(); 188 } 189 if (lastCell != null) { 190 sanityCheckRight(boundaries.get(boundaries.size() - 1), lastCell); 191 } 192 } 193 194 private void prepareWriterFor(Cell cell) throws IOException { 195 if (currentWriter != null && !isCellAfterCurrentWriter(cell)) return; // Use same writer. 196 197 stopUsingCurrentWriter(); 198 // See if KV will be past the writer we are about to create; need to add another one. 199 while (isCellAfterCurrentWriter(cell)) { 200 checkCanCreateWriter(); 201 createEmptyWriter(); 202 } 203 checkCanCreateWriter(); 204 hasAnyWriter = true; 205 currentWriter = writerFactory.createWriter(); 206 existingWriters.add(currentWriter); 207 } 208 209 /** 210 * Called if there are no cells for some stripe. We need to have something in the writer list 211 * for this stripe, so that writer-boundary list indices correspond to each other. We can insert 212 * null in the writer list for that purpose, except in the following cases where we actually 213 * need a file: 1) If we are in range for which we are compacting all the files, we need to 214 * create an empty file to preserve stripe metadata. 2) If we have not produced any file at all 215 * for this compactions, and this is the last chance (the last stripe), we need to preserve last 216 * seqNum (see also HBASE-6059). 217 */ 218 private void createEmptyWriter() throws IOException { 219 int index = existingWriters.size(); 220 boolean isInMajorRange = (index >= majorRangeFromIndex) && (index < majorRangeToIndex); 221 // Stripe boundary count = stripe count + 1, so last stripe index is (#boundaries minus 2) 222 boolean isLastWriter = !hasAnyWriter && (index == (boundaries.size() - 2)); 223 boolean needEmptyFile = isInMajorRange || isLastWriter; 224 existingWriters.add(needEmptyFile ? writerFactory.createWriter() : null); 225 hasAnyWriter |= needEmptyFile; 226 currentWriterEndKey = 227 (existingWriters.size() + 1 == boundaries.size()) ? null : boundaries.get(existingWriters 228 .size() + 1); 229 } 230 231 private void checkCanCreateWriter() throws IOException { 232 int maxWriterCount = boundaries.size() - 1; 233 assert existingWriters.size() <= maxWriterCount; 234 if (existingWriters.size() >= maxWriterCount) { 235 throw new IOException("Cannot create any more writers (created " + existingWriters.size() 236 + " out of " + maxWriterCount + " - row might be out of range of all valid writers"); 237 } 238 } 239 240 private void stopUsingCurrentWriter() { 241 if (currentWriter != null) { 242 if (LOG.isDebugEnabled()) { 243 LOG.debug("Stopping to use a writer after [" + Bytes.toString(currentWriterEndKey) 244 + "] row; wrote out " + cellsInCurrentWriter + " kvs"); 245 } 246 cellsInCurrentWriter = 0; 247 } 248 currentWriter = null; 249 currentWriterEndKey = 250 (existingWriters.size() + 1 == boundaries.size()) ? null : boundaries.get(existingWriters 251 .size() + 1); 252 } 253 } 254 255 /** 256 * MultiWriter that separates the cells based on target cell number per file and file count. New 257 * file is started every time the target number of KVs is reached, unless the fixed count of 258 * writers has already been created (in that case all the remaining KVs go into the last writer). 259 */ 260 public static class SizeMultiWriter extends StripeMultiFileWriter { 261 private int targetCount; 262 private long targetCells; 263 private byte[] left; 264 private byte[] right; 265 266 private Cell lastCell; 267 private StoreFileWriter currentWriter; 268 protected byte[] lastRowInCurrentWriter = null; 269 private long cellsInCurrentWriter = 0; 270 private long cellsSeen = 0; 271 private long cellsSeenInPrevious = 0; 272 273 /** 274 * @param targetCount The maximum count of writers that can be created. 275 * @param targetKvs The number of KVs to read from source before starting each new writer. 276 * @param left The left boundary of the first writer. 277 * @param right The right boundary of the last writer. 278 */ 279 public SizeMultiWriter(CellComparator comparator, int targetCount, long targetKvs, byte[] left, 280 byte[] right) { 281 super(comparator); 282 this.targetCount = targetCount; 283 this.targetCells = targetKvs; 284 this.left = left; 285 this.right = right; 286 int preallocate = Math.min(this.targetCount, 64); 287 this.existingWriters = new ArrayList<>(preallocate); 288 this.boundaries = new ArrayList<>(preallocate + 1); 289 } 290 291 @Override 292 public void append(Cell cell) throws IOException { 293 // If we are waiting for opportunity to close and we started writing different row, 294 // discard the writer and stop waiting. 295 boolean doCreateWriter = false; 296 if (currentWriter == null) { 297 // First append ever, do a sanity check. 298 sanityCheckLeft(left, cell); 299 doCreateWriter = true; 300 } else if (lastRowInCurrentWriter != null 301 && !PrivateCellUtil.matchingRows(cell, lastRowInCurrentWriter, 0, 302 lastRowInCurrentWriter.length)) { 303 if (LOG.isDebugEnabled()) { 304 LOG.debug("Stopping to use a writer after [" + Bytes.toString(lastRowInCurrentWriter) 305 + "] row; wrote out " + cellsInCurrentWriter + " kvs"); 306 } 307 lastRowInCurrentWriter = null; 308 cellsInCurrentWriter = 0; 309 cellsSeenInPrevious += cellsSeen; 310 doCreateWriter = true; 311 } 312 if (doCreateWriter) { 313 // make a copy 314 byte[] boundary = existingWriters.isEmpty() ? left : CellUtil.cloneRow(cell); 315 if (LOG.isDebugEnabled()) { 316 LOG.debug("Creating new writer starting at [" + Bytes.toString(boundary) + "]"); 317 } 318 currentWriter = writerFactory.createWriter(); 319 boundaries.add(boundary); 320 existingWriters.add(currentWriter); 321 } 322 323 currentWriter.append(cell); 324 lastCell = cell; // for the sanity check 325 ++cellsInCurrentWriter; 326 cellsSeen = cellsInCurrentWriter; 327 if (this.sourceScanner != null) { 328 cellsSeen = 329 Math.max(cellsSeen, this.sourceScanner.getEstimatedNumberOfKvsScanned() 330 - cellsSeenInPrevious); 331 } 332 333 // If we are not already waiting for opportunity to close, start waiting if we can 334 // create any more writers and if the current one is too big. 335 if (lastRowInCurrentWriter == null && existingWriters.size() < targetCount 336 && cellsSeen >= targetCells) { 337 lastRowInCurrentWriter = CellUtil.cloneRow(cell); // make a copy 338 if (LOG.isDebugEnabled()) { 339 LOG.debug("Preparing to start a new writer after [" 340 + Bytes.toString(lastRowInCurrentWriter) + "] row; observed " + cellsSeen 341 + " kvs and wrote out " + cellsInCurrentWriter + " kvs"); 342 } 343 } 344 } 345 346 @Override 347 protected void preCommitWritersInternal() throws IOException { 348 if (LOG.isDebugEnabled()) { 349 LOG.debug("Stopping with " 350 + cellsInCurrentWriter 351 + " kvs in last writer" 352 + ((this.sourceScanner == null) ? "" : ("; observed estimated " 353 + this.sourceScanner.getEstimatedNumberOfKvsScanned() + " KVs total"))); 354 } 355 if (lastCell != null) { 356 sanityCheckRight(right, lastCell); 357 } 358 359 // When expired stripes were going to be merged into one, and if no writer was created during 360 // the compaction, we need to create an empty file to preserve metadata. 361 if (existingWriters.isEmpty() && 1 == targetCount) { 362 if (LOG.isDebugEnabled()) { 363 LOG.debug("Merge expired stripes into one, create an empty file to preserve metadata."); 364 } 365 boundaries.add(left); 366 existingWriters.add(writerFactory.createWriter()); 367 } 368 369 this.boundaries.add(right); 370 } 371 } 372}