001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Arrays; 023import java.util.Collection; 024import java.util.Collections; 025import java.util.List; 026import org.apache.hadoop.hbase.Cell; 027import org.apache.hadoop.hbase.CellComparator; 028import org.apache.hadoop.hbase.CellUtil; 029import org.apache.hadoop.hbase.PrivateCellUtil; 030import org.apache.hadoop.hbase.util.Bytes; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035/** 036 * Base class for cell sink that separates the provided cells into multiple files for stripe 037 * compaction. 038 */ 039@InterfaceAudience.Private 040public abstract class StripeMultiFileWriter extends AbstractMultiFileWriter { 041 042 private static final Logger LOG = LoggerFactory.getLogger(StripeMultiFileWriter.class); 043 044 protected final CellComparator comparator; 045 protected List<StoreFileWriter> existingWriters; 046 protected List<byte[]> boundaries; 047 048 /** Whether to write stripe metadata */ 049 private boolean doWriteStripeMetadata = true; 050 051 public StripeMultiFileWriter(CellComparator comparator) { 052 this.comparator = comparator; 053 } 054 055 public void setNoStripeMetadata() { 056 this.doWriteStripeMetadata = false; 057 } 058 059 @Override 060 protected Collection<StoreFileWriter> writers() { 061 return existingWriters; 062 } 063 064 protected abstract void preCommitWritersInternal() throws IOException; 065 066 @Override 067 protected final void preCommitWriters() throws IOException { 068 // do some sanity check here. 069 assert this.existingWriters != null; 070 preCommitWritersInternal(); 071 assert this.boundaries.size() == (this.existingWriters.size() + 1); 072 } 073 074 @Override 075 protected void preCloseWriter(StoreFileWriter writer) throws IOException { 076 if (doWriteStripeMetadata) { 077 if (LOG.isDebugEnabled()) { 078 LOG.debug("Write stripe metadata for " + writer.getPath().toString()); 079 } 080 int index = existingWriters.indexOf(writer); 081 writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, boundaries.get(index)); 082 writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, boundaries.get(index + 1)); 083 } else { 084 if (LOG.isDebugEnabled()) { 085 LOG.debug("Skip writing stripe metadata for " + writer.getPath().toString()); 086 } 087 } 088 } 089 090 /** 091 * Subclasses can call this method to make sure the first KV is within multi-writer range. 092 * @param left The left boundary of the writer. 093 * @param cell The cell whose row has to be checked. 094 */ 095 protected void sanityCheckLeft(byte[] left, Cell cell) throws IOException { 096 if ( 097 !Arrays.equals(StripeStoreFileManager.OPEN_KEY, left) 098 && comparator.compareRows(cell, left, 0, left.length) < 0 099 ) { 100 String error = 101 "The first row is lower than the left boundary of [" + Bytes.toString(left) + "]: [" 102 + Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) + "]"; 103 LOG.error(error); 104 throw new IOException(error); 105 } 106 } 107 108 /** 109 * Subclasses can call this method to make sure the last KV is within multi-writer range. 110 * @param right The right boundary of the writer. 111 */ 112 protected void sanityCheckRight(byte[] right, Cell cell) throws IOException { 113 if ( 114 !Arrays.equals(StripeStoreFileManager.OPEN_KEY, right) 115 && comparator.compareRows(cell, right, 0, right.length) >= 0 116 ) { 117 String error = "The last row is higher or equal than the right boundary of [" 118 + Bytes.toString(right) + "]: [" 119 + Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) + "]"; 120 LOG.error(error); 121 throw new IOException(error); 122 } 123 } 124 125 /** 126 * MultiWriter that separates the cells based on fixed row-key boundaries. All the KVs between 127 * each pair of neighboring boundaries from the list supplied to ctor will end up in one file, and 128 * separate from all other such pairs. 129 */ 130 public static class BoundaryMultiWriter extends StripeMultiFileWriter { 131 private StoreFileWriter currentWriter; 132 private byte[] currentWriterEndKey; 133 134 private Cell lastCell; 135 private long cellsInCurrentWriter = 0; 136 private int majorRangeFromIndex = -1, majorRangeToIndex = -1; 137 private boolean hasAnyWriter = false; 138 139 /** 140 * @param targetBoundaries The boundaries on which writers/files are separated. 141 * @param majorRangeFrom Major range is the range for which at least one file should be 142 * written (because all files are included in compaction). 143 * majorRangeFrom is the left boundary. 144 * @param majorRangeTo The right boundary of majorRange (see majorRangeFrom). 145 */ 146 public BoundaryMultiWriter(CellComparator comparator, List<byte[]> targetBoundaries, 147 byte[] majorRangeFrom, byte[] majorRangeTo) throws IOException { 148 super(comparator); 149 this.boundaries = targetBoundaries; 150 this.existingWriters = new ArrayList<>(this.boundaries.size() - 1); 151 // "major" range (range for which all files are included) boundaries, if any, 152 // must match some target boundaries, let's find them. 153 assert (majorRangeFrom == null) == (majorRangeTo == null); 154 if (majorRangeFrom != null) { 155 majorRangeFromIndex = Arrays.equals(majorRangeFrom, StripeStoreFileManager.OPEN_KEY) 156 ? 0 157 : Collections.binarySearch(boundaries, majorRangeFrom, Bytes.BYTES_COMPARATOR); 158 majorRangeToIndex = Arrays.equals(majorRangeTo, StripeStoreFileManager.OPEN_KEY) 159 ? boundaries.size() 160 : Collections.binarySearch(boundaries, majorRangeTo, Bytes.BYTES_COMPARATOR); 161 if (this.majorRangeFromIndex < 0 || this.majorRangeToIndex < 0) { 162 throw new IOException("Major range does not match writer boundaries: [" 163 + Bytes.toString(majorRangeFrom) + "] [" + Bytes.toString(majorRangeTo) + "]; from " 164 + majorRangeFromIndex + " to " + majorRangeToIndex); 165 } 166 } 167 } 168 169 @Override 170 public void append(Cell cell) throws IOException { 171 if (currentWriter == null && existingWriters.isEmpty()) { 172 // First append ever, do a sanity check. 173 sanityCheckLeft(this.boundaries.get(0), cell); 174 } 175 prepareWriterFor(cell); 176 currentWriter.append(cell); 177 lastCell = cell; // for the sanity check 178 ++cellsInCurrentWriter; 179 } 180 181 private boolean isCellAfterCurrentWriter(Cell cell) { 182 return !Arrays.equals(currentWriterEndKey, StripeStoreFileManager.OPEN_KEY) 183 && (comparator.compareRows(cell, currentWriterEndKey, 0, currentWriterEndKey.length) >= 0); 184 } 185 186 @Override 187 protected void preCommitWritersInternal() throws IOException { 188 stopUsingCurrentWriter(); 189 while (existingWriters.size() < boundaries.size() - 1) { 190 createEmptyWriter(); 191 } 192 if (lastCell != null) { 193 sanityCheckRight(boundaries.get(boundaries.size() - 1), lastCell); 194 } 195 } 196 197 private void prepareWriterFor(Cell cell) throws IOException { 198 if (currentWriter != null && !isCellAfterCurrentWriter(cell)) return; // Use same writer. 199 200 stopUsingCurrentWriter(); 201 // See if KV will be past the writer we are about to create; need to add another one. 202 while (isCellAfterCurrentWriter(cell)) { 203 checkCanCreateWriter(); 204 createEmptyWriter(); 205 } 206 checkCanCreateWriter(); 207 hasAnyWriter = true; 208 currentWriter = writerFactory.createWriter(); 209 existingWriters.add(currentWriter); 210 } 211 212 /** 213 * Called if there are no cells for some stripe. We need to have something in the writer list 214 * for this stripe, so that writer-boundary list indices correspond to each other. We can insert 215 * null in the writer list for that purpose, except in the following cases where we actually 216 * need a file: 1) If we are in range for which we are compacting all the files, we need to 217 * create an empty file to preserve stripe metadata. 2) If we have not produced any file at all 218 * for this compactions, and this is the last chance (the last stripe), we need to preserve last 219 * seqNum (see also HBASE-6059). 220 */ 221 private void createEmptyWriter() throws IOException { 222 int index = existingWriters.size(); 223 boolean isInMajorRange = (index >= majorRangeFromIndex) && (index < majorRangeToIndex); 224 // Stripe boundary count = stripe count + 1, so last stripe index is (#boundaries minus 2) 225 boolean isLastWriter = !hasAnyWriter && (index == (boundaries.size() - 2)); 226 boolean needEmptyFile = isInMajorRange || isLastWriter; 227 existingWriters.add(needEmptyFile ? writerFactory.createWriter() : null); 228 hasAnyWriter |= needEmptyFile; 229 currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size()) 230 ? null 231 : boundaries.get(existingWriters.size() + 1); 232 } 233 234 private void checkCanCreateWriter() throws IOException { 235 int maxWriterCount = boundaries.size() - 1; 236 assert existingWriters.size() <= maxWriterCount; 237 if (existingWriters.size() >= maxWriterCount) { 238 throw new IOException("Cannot create any more writers (created " + existingWriters.size() 239 + " out of " + maxWriterCount + " - row might be out of range of all valid writers"); 240 } 241 } 242 243 private void stopUsingCurrentWriter() { 244 if (currentWriter != null) { 245 if (LOG.isDebugEnabled()) { 246 LOG.debug("Stopping to use a writer after [" + Bytes.toString(currentWriterEndKey) 247 + "] row; wrote out " + cellsInCurrentWriter + " kvs"); 248 } 249 cellsInCurrentWriter = 0; 250 } 251 currentWriter = null; 252 currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size()) 253 ? null 254 : boundaries.get(existingWriters.size() + 1); 255 } 256 } 257 258 /** 259 * MultiWriter that separates the cells based on target cell number per file and file count. New 260 * file is started every time the target number of KVs is reached, unless the fixed count of 261 * writers has already been created (in that case all the remaining KVs go into the last writer). 262 */ 263 public static class SizeMultiWriter extends StripeMultiFileWriter { 264 private int targetCount; 265 private long targetCells; 266 private byte[] left; 267 private byte[] right; 268 269 private Cell lastCell; 270 private StoreFileWriter currentWriter; 271 protected byte[] lastRowInCurrentWriter = null; 272 private long cellsInCurrentWriter = 0; 273 private long cellsSeen = 0; 274 private long cellsSeenInPrevious = 0; 275 276 /** 277 * @param targetCount The maximum count of writers that can be created. 278 * @param targetKvs The number of KVs to read from source before starting each new writer. 279 * @param left The left boundary of the first writer. 280 * @param right The right boundary of the last writer. 281 */ 282 public SizeMultiWriter(CellComparator comparator, int targetCount, long targetKvs, byte[] left, 283 byte[] right) { 284 super(comparator); 285 this.targetCount = targetCount; 286 this.targetCells = targetKvs; 287 this.left = left; 288 this.right = right; 289 int preallocate = Math.min(this.targetCount, 64); 290 this.existingWriters = new ArrayList<>(preallocate); 291 this.boundaries = new ArrayList<>(preallocate + 1); 292 } 293 294 @Override 295 public void append(Cell cell) throws IOException { 296 // If we are waiting for opportunity to close and we started writing different row, 297 // discard the writer and stop waiting. 298 boolean doCreateWriter = false; 299 if (currentWriter == null) { 300 // First append ever, do a sanity check. 301 sanityCheckLeft(left, cell); 302 doCreateWriter = true; 303 } else if ( 304 lastRowInCurrentWriter != null && !PrivateCellUtil.matchingRows(cell, 305 lastRowInCurrentWriter, 0, lastRowInCurrentWriter.length) 306 ) { 307 if (LOG.isDebugEnabled()) { 308 LOG.debug("Stopping to use a writer after [" + Bytes.toString(lastRowInCurrentWriter) 309 + "] row; wrote out " + cellsInCurrentWriter + " kvs"); 310 } 311 lastRowInCurrentWriter = null; 312 cellsInCurrentWriter = 0; 313 cellsSeenInPrevious += cellsSeen; 314 doCreateWriter = true; 315 } 316 if (doCreateWriter) { 317 // make a copy 318 byte[] boundary = existingWriters.isEmpty() ? left : CellUtil.cloneRow(cell); 319 if (LOG.isDebugEnabled()) { 320 LOG.debug("Creating new writer starting at [" + Bytes.toString(boundary) + "]"); 321 } 322 currentWriter = writerFactory.createWriter(); 323 boundaries.add(boundary); 324 existingWriters.add(currentWriter); 325 } 326 327 currentWriter.append(cell); 328 lastCell = cell; // for the sanity check 329 ++cellsInCurrentWriter; 330 cellsSeen = cellsInCurrentWriter; 331 if (this.sourceScanner != null) { 332 cellsSeen = Math.max(cellsSeen, 333 this.sourceScanner.getEstimatedNumberOfKvsScanned() - cellsSeenInPrevious); 334 } 335 336 // If we are not already waiting for opportunity to close, start waiting if we can 337 // create any more writers and if the current one is too big. 338 if ( 339 lastRowInCurrentWriter == null && existingWriters.size() < targetCount 340 && cellsSeen >= targetCells 341 ) { 342 lastRowInCurrentWriter = CellUtil.cloneRow(cell); // make a copy 343 if (LOG.isDebugEnabled()) { 344 LOG.debug("Preparing to start a new writer after [" 345 + Bytes.toString(lastRowInCurrentWriter) + "] row; observed " + cellsSeen 346 + " kvs and wrote out " + cellsInCurrentWriter + " kvs"); 347 } 348 } 349 } 350 351 @Override 352 protected void preCommitWritersInternal() throws IOException { 353 if (LOG.isDebugEnabled()) { 354 LOG.debug("Stopping with " + cellsInCurrentWriter + " kvs in last writer" 355 + ((this.sourceScanner == null) 356 ? "" 357 : ("; observed estimated " + this.sourceScanner.getEstimatedNumberOfKvsScanned() 358 + " KVs total"))); 359 } 360 if (lastCell != null) { 361 sanityCheckRight(right, lastCell); 362 } 363 364 // When expired stripes were going to be merged into one, and if no writer was created during 365 // the compaction, we need to create an empty file to preserve metadata. 366 if (existingWriters.isEmpty() && 1 == targetCount) { 367 if (LOG.isDebugEnabled()) { 368 LOG.debug("Merge expired stripes into one, create an empty file to preserve metadata."); 369 } 370 boundaries.add(left); 371 existingWriters.add(writerFactory.createWriter()); 372 } 373 374 this.boundaries.add(right); 375 } 376 } 377}