001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Arrays; 023import java.util.Collection; 024import java.util.Collections; 025import java.util.List; 026import org.apache.hadoop.hbase.Cell; 027import org.apache.hadoop.hbase.CellComparator; 028import org.apache.hadoop.hbase.CellUtil; 029import org.apache.hadoop.hbase.ExtendedCell; 030import org.apache.hadoop.hbase.PrivateCellUtil; 031import org.apache.hadoop.hbase.util.Bytes; 032import org.apache.yetus.audience.InterfaceAudience; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036/** 037 * Base class for cell sink that separates the provided cells into multiple files for stripe 038 * compaction. 039 */ 040@InterfaceAudience.Private 041public abstract class StripeMultiFileWriter extends AbstractMultiFileWriter { 042 043 private static final Logger LOG = LoggerFactory.getLogger(StripeMultiFileWriter.class); 044 045 protected final CellComparator comparator; 046 protected List<StoreFileWriter> existingWriters; 047 protected List<byte[]> boundaries; 048 049 /** Whether to write stripe metadata */ 050 private boolean doWriteStripeMetadata = true; 051 052 public StripeMultiFileWriter(CellComparator comparator) { 053 this.comparator = comparator; 054 } 055 056 public void setNoStripeMetadata() { 057 this.doWriteStripeMetadata = false; 058 } 059 060 @Override 061 protected Collection<StoreFileWriter> writers() { 062 return existingWriters; 063 } 064 065 protected abstract void preCommitWritersInternal() throws IOException; 066 067 @Override 068 protected final void preCommitWriters() throws IOException { 069 // do some sanity check here. 070 assert this.existingWriters != null; 071 preCommitWritersInternal(); 072 assert this.boundaries.size() == (this.existingWriters.size() + 1); 073 } 074 075 @Override 076 protected void preCloseWriter(StoreFileWriter writer) throws IOException { 077 if (doWriteStripeMetadata) { 078 if (LOG.isDebugEnabled()) { 079 LOG.debug("Write stripe metadata for " + writer.getPath().toString()); 080 } 081 int index = existingWriters.indexOf(writer); 082 writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, boundaries.get(index)); 083 writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, boundaries.get(index + 1)); 084 } else { 085 if (LOG.isDebugEnabled()) { 086 LOG.debug("Skip writing stripe metadata for " + writer.getPath().toString()); 087 } 088 } 089 } 090 091 /** 092 * Subclasses can call this method to make sure the first KV is within multi-writer range. 093 * @param left The left boundary of the writer. 094 * @param cell The cell whose row has to be checked. 095 */ 096 protected void sanityCheckLeft(byte[] left, Cell cell) throws IOException { 097 if ( 098 !Arrays.equals(StripeStoreFileManager.OPEN_KEY, left) 099 && comparator.compareRows(cell, left, 0, left.length) < 0 100 ) { 101 String error = 102 "The first row is lower than the left boundary of [" + Bytes.toString(left) + "]: [" 103 + Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) + "]"; 104 LOG.error(error); 105 throw new IOException(error); 106 } 107 } 108 109 /** 110 * Subclasses can call this method to make sure the last KV is within multi-writer range. 111 * @param right The right boundary of the writer. 112 */ 113 protected void sanityCheckRight(byte[] right, Cell cell) throws IOException { 114 if ( 115 !Arrays.equals(StripeStoreFileManager.OPEN_KEY, right) 116 && comparator.compareRows(cell, right, 0, right.length) >= 0 117 ) { 118 String error = "The last row is higher or equal than the right boundary of [" 119 + Bytes.toString(right) + "]: [" 120 + Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) + "]"; 121 LOG.error(error); 122 throw new IOException(error); 123 } 124 } 125 126 /** 127 * MultiWriter that separates the cells based on fixed row-key boundaries. All the KVs between 128 * each pair of neighboring boundaries from the list supplied to ctor will end up in one file, and 129 * separate from all other such pairs. 130 */ 131 public static class BoundaryMultiWriter extends StripeMultiFileWriter { 132 private StoreFileWriter currentWriter; 133 private byte[] currentWriterEndKey; 134 135 private Cell lastCell; 136 private long cellsInCurrentWriter = 0; 137 private int majorRangeFromIndex = -1, majorRangeToIndex = -1; 138 private boolean hasAnyWriter = false; 139 140 /** 141 * @param targetBoundaries The boundaries on which writers/files are separated. 142 * @param majorRangeFrom Major range is the range for which at least one file should be 143 * written (because all files are included in compaction). 144 * majorRangeFrom is the left boundary. 145 * @param majorRangeTo The right boundary of majorRange (see majorRangeFrom). 146 */ 147 public BoundaryMultiWriter(CellComparator comparator, List<byte[]> targetBoundaries, 148 byte[] majorRangeFrom, byte[] majorRangeTo) throws IOException { 149 super(comparator); 150 this.boundaries = targetBoundaries; 151 this.existingWriters = new ArrayList<>(this.boundaries.size() - 1); 152 // "major" range (range for which all files are included) boundaries, if any, 153 // must match some target boundaries, let's find them. 154 assert (majorRangeFrom == null) == (majorRangeTo == null); 155 if (majorRangeFrom != null) { 156 majorRangeFromIndex = Arrays.equals(majorRangeFrom, StripeStoreFileManager.OPEN_KEY) 157 ? 0 158 : Collections.binarySearch(boundaries, majorRangeFrom, Bytes.BYTES_COMPARATOR); 159 majorRangeToIndex = Arrays.equals(majorRangeTo, StripeStoreFileManager.OPEN_KEY) 160 ? boundaries.size() 161 : Collections.binarySearch(boundaries, majorRangeTo, Bytes.BYTES_COMPARATOR); 162 if (this.majorRangeFromIndex < 0 || this.majorRangeToIndex < 0) { 163 throw new IOException("Major range does not match writer boundaries: [" 164 + Bytes.toString(majorRangeFrom) + "] [" + Bytes.toString(majorRangeTo) + "]; from " 165 + majorRangeFromIndex + " to " + majorRangeToIndex); 166 } 167 } 168 } 169 170 @Override 171 public void append(ExtendedCell cell) throws IOException { 172 if (currentWriter == null && existingWriters.isEmpty()) { 173 // First append ever, do a sanity check. 174 sanityCheckLeft(this.boundaries.get(0), cell); 175 } 176 prepareWriterFor(cell); 177 currentWriter.append(cell); 178 lastCell = cell; // for the sanity check 179 ++cellsInCurrentWriter; 180 } 181 182 private boolean isCellAfterCurrentWriter(Cell cell) { 183 return !Arrays.equals(currentWriterEndKey, StripeStoreFileManager.OPEN_KEY) 184 && (comparator.compareRows(cell, currentWriterEndKey, 0, currentWriterEndKey.length) >= 0); 185 } 186 187 @Override 188 protected void preCommitWritersInternal() throws IOException { 189 stopUsingCurrentWriter(); 190 while (existingWriters.size() < boundaries.size() - 1) { 191 createEmptyWriter(); 192 } 193 if (lastCell != null) { 194 sanityCheckRight(boundaries.get(boundaries.size() - 1), lastCell); 195 } 196 } 197 198 private void prepareWriterFor(Cell cell) throws IOException { 199 if (currentWriter != null && !isCellAfterCurrentWriter(cell)) return; // Use same writer. 200 201 stopUsingCurrentWriter(); 202 // See if KV will be past the writer we are about to create; need to add another one. 203 while (isCellAfterCurrentWriter(cell)) { 204 checkCanCreateWriter(); 205 createEmptyWriter(); 206 } 207 checkCanCreateWriter(); 208 hasAnyWriter = true; 209 currentWriter = writerFactory.createWriter(); 210 existingWriters.add(currentWriter); 211 } 212 213 /** 214 * Called if there are no cells for some stripe. We need to have something in the writer list 215 * for this stripe, so that writer-boundary list indices correspond to each other. We can insert 216 * null in the writer list for that purpose, except in the following cases where we actually 217 * need a file: 1) If we are in range for which we are compacting all the files, we need to 218 * create an empty file to preserve stripe metadata. 2) If we have not produced any file at all 219 * for this compactions, and this is the last chance (the last stripe), we need to preserve last 220 * seqNum (see also HBASE-6059). 221 */ 222 private void createEmptyWriter() throws IOException { 223 int index = existingWriters.size(); 224 boolean isInMajorRange = (index >= majorRangeFromIndex) && (index < majorRangeToIndex); 225 // Stripe boundary count = stripe count + 1, so last stripe index is (#boundaries minus 2) 226 boolean isLastWriter = !hasAnyWriter && (index == (boundaries.size() - 2)); 227 boolean needEmptyFile = isInMajorRange || isLastWriter; 228 existingWriters.add(needEmptyFile ? writerFactory.createWriter() : null); 229 hasAnyWriter |= needEmptyFile; 230 currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size()) 231 ? null 232 : boundaries.get(existingWriters.size() + 1); 233 } 234 235 private void checkCanCreateWriter() throws IOException { 236 int maxWriterCount = boundaries.size() - 1; 237 assert existingWriters.size() <= maxWriterCount; 238 if (existingWriters.size() >= maxWriterCount) { 239 throw new IOException("Cannot create any more writers (created " + existingWriters.size() 240 + " out of " + maxWriterCount + " - row might be out of range of all valid writers"); 241 } 242 } 243 244 private void stopUsingCurrentWriter() { 245 if (currentWriter != null) { 246 if (LOG.isDebugEnabled()) { 247 LOG.debug("Stopping to use a writer after [" + Bytes.toString(currentWriterEndKey) 248 + "] row; wrote out " + cellsInCurrentWriter + " kvs"); 249 } 250 cellsInCurrentWriter = 0; 251 } 252 currentWriter = null; 253 currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size()) 254 ? null 255 : boundaries.get(existingWriters.size() + 1); 256 } 257 } 258 259 /** 260 * MultiWriter that separates the cells based on target cell number per file and file count. New 261 * file is started every time the target number of KVs is reached, unless the fixed count of 262 * writers has already been created (in that case all the remaining KVs go into the last writer). 263 */ 264 public static class SizeMultiWriter extends StripeMultiFileWriter { 265 private int targetCount; 266 private long targetCells; 267 private byte[] left; 268 private byte[] right; 269 270 private Cell lastCell; 271 private StoreFileWriter currentWriter; 272 protected byte[] lastRowInCurrentWriter = null; 273 private long cellsInCurrentWriter = 0; 274 private long cellsSeen = 0; 275 private long cellsSeenInPrevious = 0; 276 277 /** 278 * @param targetCount The maximum count of writers that can be created. 279 * @param targetKvs The number of KVs to read from source before starting each new writer. 280 * @param left The left boundary of the first writer. 281 * @param right The right boundary of the last writer. 282 */ 283 public SizeMultiWriter(CellComparator comparator, int targetCount, long targetKvs, byte[] left, 284 byte[] right) { 285 super(comparator); 286 this.targetCount = targetCount; 287 this.targetCells = targetKvs; 288 this.left = left; 289 this.right = right; 290 int preallocate = Math.min(this.targetCount, 64); 291 this.existingWriters = new ArrayList<>(preallocate); 292 this.boundaries = new ArrayList<>(preallocate + 1); 293 } 294 295 @Override 296 public void append(ExtendedCell cell) throws IOException { 297 // If we are waiting for opportunity to close and we started writing different row, 298 // discard the writer and stop waiting. 299 boolean doCreateWriter = false; 300 if (currentWriter == null) { 301 // First append ever, do a sanity check. 302 sanityCheckLeft(left, cell); 303 doCreateWriter = true; 304 } else if ( 305 lastRowInCurrentWriter != null && !PrivateCellUtil.matchingRows(cell, 306 lastRowInCurrentWriter, 0, lastRowInCurrentWriter.length) 307 ) { 308 if (LOG.isDebugEnabled()) { 309 LOG.debug("Stopping to use a writer after [" + Bytes.toString(lastRowInCurrentWriter) 310 + "] row; wrote out " + cellsInCurrentWriter + " kvs"); 311 } 312 lastRowInCurrentWriter = null; 313 cellsInCurrentWriter = 0; 314 cellsSeenInPrevious += cellsSeen; 315 doCreateWriter = true; 316 } 317 if (doCreateWriter) { 318 // make a copy 319 byte[] boundary = existingWriters.isEmpty() ? left : CellUtil.cloneRow(cell); 320 if (LOG.isDebugEnabled()) { 321 LOG.debug("Creating new writer starting at [" + Bytes.toString(boundary) + "]"); 322 } 323 currentWriter = writerFactory.createWriter(); 324 boundaries.add(boundary); 325 existingWriters.add(currentWriter); 326 } 327 328 currentWriter.append(cell); 329 lastCell = cell; // for the sanity check 330 ++cellsInCurrentWriter; 331 cellsSeen = cellsInCurrentWriter; 332 if (this.sourceScanner != null) { 333 cellsSeen = Math.max(cellsSeen, 334 this.sourceScanner.getEstimatedNumberOfKvsScanned() - cellsSeenInPrevious); 335 } 336 337 // If we are not already waiting for opportunity to close, start waiting if we can 338 // create any more writers and if the current one is too big. 339 if ( 340 lastRowInCurrentWriter == null && existingWriters.size() < targetCount 341 && cellsSeen >= targetCells 342 ) { 343 lastRowInCurrentWriter = CellUtil.cloneRow(cell); // make a copy 344 if (LOG.isDebugEnabled()) { 345 LOG.debug("Preparing to start a new writer after [" 346 + Bytes.toString(lastRowInCurrentWriter) + "] row; observed " + cellsSeen 347 + " kvs and wrote out " + cellsInCurrentWriter + " kvs"); 348 } 349 } 350 } 351 352 @Override 353 protected void preCommitWritersInternal() throws IOException { 354 if (LOG.isDebugEnabled()) { 355 LOG.debug("Stopping with " + cellsInCurrentWriter + " kvs in last writer" 356 + ((this.sourceScanner == null) 357 ? "" 358 : ("; observed estimated " + this.sourceScanner.getEstimatedNumberOfKvsScanned() 359 + " KVs total"))); 360 } 361 if (lastCell != null) { 362 sanityCheckRight(right, lastCell); 363 } 364 365 // When expired stripes were going to be merged into one, and if no writer was created during 366 // the compaction, we need to create an empty file to preserve metadata. 367 if (existingWriters.isEmpty() && 1 == targetCount) { 368 if (LOG.isDebugEnabled()) { 369 LOG.debug("Merge expired stripes into one, create an empty file to preserve metadata."); 370 } 371 boundaries.add(left); 372 existingWriters.add(writerFactory.createWriter()); 373 } 374 375 this.boundaries.add(right); 376 } 377 } 378}