001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mob; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.util.ArrayList; 024import java.util.Date; 025import java.util.List; 026import java.util.Optional; 027import java.util.concurrent.ThreadLocalRandom; 028import java.util.concurrent.atomic.AtomicLong; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.Cell; 033import org.apache.hadoop.hbase.DoNotRetryIOException; 034import org.apache.hadoop.hbase.KeyValue; 035import org.apache.hadoop.hbase.PrivateCellUtil; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.io.hfile.CorruptHFileException; 038import org.apache.hadoop.hbase.regionserver.CellSink; 039import org.apache.hadoop.hbase.regionserver.HStore; 040import org.apache.hadoop.hbase.regionserver.InternalScanner; 041import org.apache.hadoop.hbase.regionserver.KeyValueScanner; 042import org.apache.hadoop.hbase.regionserver.ScannerContext; 043import org.apache.hadoop.hbase.regionserver.ShipperListener; 044import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 045import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker; 046import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; 047import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; 048import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; 049import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 050import org.apache.hadoop.hbase.util.Bytes; 051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 052import org.apache.yetus.audience.InterfaceAudience; 053import org.slf4j.Logger; 054import org.slf4j.LoggerFactory; 055 056/** 057 * This class is used for testing only. The main purpose is to emulate random failures during MOB 058 * compaction process. Example of usage: 059 * 060 * <pre> 061 * { 062 * @code 063 * public class SomeTest { 064 * 065 * public void initConfiguration(Configuration conf) { 066 * conf.set(MobStoreEngine.DEFAULT_MOB_COMPACTOR_CLASS_KEY, 067 * FaultyMobStoreCompactor.class.getName()); 068 * conf.setDouble("hbase.mob.compaction.fault.probability", 0.1); 069 * } 070 * } 071 * } 072 * </pre> 073 * 074 * @see org.apache.hadoop.hbase.mob.MobStressToolRunner on how to use and configure this class. 075 */ 076@InterfaceAudience.Private 077public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor { 078 079 private static final Logger LOG = LoggerFactory.getLogger(FaultyMobStoreCompactor.class); 080 081 public static AtomicLong mobCounter = new AtomicLong(); 082 public static AtomicLong totalFailures = new AtomicLong(); 083 public static AtomicLong totalCompactions = new AtomicLong(); 084 public static AtomicLong totalMajorCompactions = new AtomicLong(); 085 086 static double failureProb = 0.1d; 087 088 public FaultyMobStoreCompactor(Configuration conf, HStore store) { 089 super(conf, store); 090 failureProb = conf.getDouble("hbase.mob.compaction.fault.probability", 0.1); 091 } 092 093 @Override 094 protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, 095 long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, 096 CompactionRequestImpl request, CompactionProgress progress) throws IOException { 097 098 boolean major = request.isAllFiles(); 099 totalCompactions.incrementAndGet(); 100 if (major) { 101 totalMajorCompactions.incrementAndGet(); 102 } 103 long bytesWrittenProgressForLog = 0; 104 long bytesWrittenProgressForShippedCall = 0; 105 // Clear old mob references 106 mobRefSet.get().clear(); 107 boolean isUserRequest = userRequest.get(); 108 boolean compactMOBs = major && isUserRequest; 109 boolean discardMobMiss = conf.getBoolean(MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY, 110 MobConstants.DEFAULT_MOB_DISCARD_MISS); 111 112 boolean mustFail = false; 113 if (compactMOBs) { 114 mobCounter.incrementAndGet(); 115 double dv = ThreadLocalRandom.current().nextDouble(); 116 if (dv < failureProb) { 117 mustFail = true; 118 totalFailures.incrementAndGet(); 119 } 120 } 121 122 FileSystem fs = store.getFileSystem(); 123 124 // Since scanner.next() can return 'false' but still be delivering data, 125 // we have to use a do/while loop. 126 List<Cell> cells = new ArrayList<>(); 127 // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME 128 long currentTime = EnvironmentEdgeManager.currentTime(); 129 long lastMillis = 0; 130 if (LOG.isDebugEnabled()) { 131 lastMillis = currentTime; 132 } 133 CloseChecker closeChecker = new CloseChecker(conf, currentTime); 134 String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction"); 135 long now = 0; 136 boolean hasMore; 137 Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName()); 138 byte[] fileName = null; 139 StoreFileWriter mobFileWriter = null; 140 long mobCells = 0; 141 long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0; 142 long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0; 143 boolean finished = false; 144 145 ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax) 146 .setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE, 147 compactScannerSizeLimit) 148 .build(); 149 throughputController.start(compactionName); 150 KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null; 151 long shippedCallSizeLimit = 152 (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize(); 153 154 Cell mobCell = null; 155 156 long counter = 0; 157 long countFailAt = -1; 158 if (mustFail) { 159 countFailAt = ThreadLocalRandom.current().nextInt(100); // randomly fail fast 160 } 161 162 try { 163 try { 164 mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount, 165 major ? majorCompactionCompression : minorCompactionCompression, 166 store.getRegionInfo().getStartKey(), true); 167 fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); 168 } catch (IOException e) { 169 // Bailing out 170 LOG.error("Failed to create mob writer, ", e); 171 throw e; 172 } 173 if (compactMOBs) { 174 // Add the only reference we get for compact MOB case 175 // because new store file will have only one MOB reference 176 // in this case - of newly compacted MOB file 177 mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName()); 178 } 179 do { 180 hasMore = scanner.next(cells, scannerContext); 181 currentTime = EnvironmentEdgeManager.currentTime(); 182 if (LOG.isDebugEnabled()) { 183 now = currentTime; 184 } 185 if (closeChecker.isTimeLimit(store, currentTime)) { 186 progress.cancel(); 187 return false; 188 } 189 for (Cell c : cells) { 190 counter++; 191 if (compactMOBs) { 192 if (MobUtils.isMobReferenceCell(c)) { 193 if (counter == countFailAt) { 194 LOG.warn("INJECTED FAULT mobCounter={}", mobCounter.get()); 195 throw new CorruptHFileException("injected fault"); 196 } 197 String fName = MobUtils.getMobFileName(c); 198 // Added to support migration 199 try { 200 mobCell = mobStore.resolve(c, true, false).getCell(); 201 } catch (DoNotRetryIOException e) { 202 if ( 203 discardMobMiss && e.getCause() != null 204 && e.getCause() instanceof FileNotFoundException 205 ) { 206 LOG.error("Missing MOB cell: file={} not found cell={}", fName, c); 207 continue; 208 } else { 209 throw e; 210 } 211 } 212 213 if (discardMobMiss && mobCell.getValueLength() == 0) { 214 LOG.error("Missing MOB cell value: file={} cell={}", fName, mobCell); 215 continue; 216 } 217 218 if (mobCell.getValueLength() > mobSizeThreshold) { 219 // put the mob data back to the store file 220 PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId()); 221 mobFileWriter.append(mobCell); 222 writer.append( 223 MobUtils.createMobRefCell(mobCell, fileName, this.mobStore.getRefCellTags())); 224 mobCells++; 225 } else { 226 // If MOB value is less than threshold, append it directly to a store file 227 PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId()); 228 writer.append(mobCell); 229 cellsCountCompactedFromMob++; 230 cellsSizeCompactedFromMob += mobCell.getValueLength(); 231 } 232 } else { 233 // Not a MOB reference cell 234 int size = c.getValueLength(); 235 if (size > mobSizeThreshold) { 236 mobFileWriter.append(c); 237 writer 238 .append(MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags())); 239 mobCells++; 240 cellsCountCompactedToMob++; 241 cellsSizeCompactedToMob += c.getValueLength(); 242 } else { 243 writer.append(c); 244 } 245 } 246 } else if (c.getTypeByte() != KeyValue.Type.Put.getCode()) { 247 // Not a major compaction or major with MOB disabled 248 // If the kv type is not put, directly write the cell 249 // to the store file. 250 writer.append(c); 251 } else if (MobUtils.isMobReferenceCell(c)) { 252 // Not a major MOB compaction, Put MOB reference 253 if (MobUtils.hasValidMobRefCellValue(c)) { 254 int size = MobUtils.getMobValueLength(c); 255 if (size > mobSizeThreshold) { 256 // If the value size is larger than the threshold, it's regarded as a mob. Since 257 // its value is already in the mob file, directly write this cell to the store file 258 Optional<TableName> refTable = MobUtils.getTableName(c); 259 if (refTable.isPresent()) { 260 mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c)); 261 writer.append(c); 262 } else { 263 throw new IOException(String.format("MOB cell did not contain a tablename " 264 + "tag. should not be possible. see ref guide on mob troubleshooting. " 265 + "store=%s cell=%s", getStoreInfo(), c)); 266 } 267 } else { 268 // If the value is not larger than the threshold, it's not regarded a mob. Retrieve 269 // the mob cell from the mob file, and write it back to the store file. 270 mobCell = mobStore.resolve(c, true, false).getCell(); 271 if (mobCell.getValueLength() != 0) { 272 // put the mob data back to the store file 273 PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId()); 274 writer.append(mobCell); 275 cellsCountCompactedFromMob++; 276 cellsSizeCompactedFromMob += mobCell.getValueLength(); 277 } else { 278 // If the value of a file is empty, there might be issues when retrieving, 279 // directly write the cell to the store file, and leave it to be handled by the 280 // next compaction. 281 LOG.error("Empty value for: " + c); 282 Optional<TableName> refTable = MobUtils.getTableName(c); 283 if (refTable.isPresent()) { 284 mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c)); 285 writer.append(c); 286 } else { 287 throw new IOException(String.format("MOB cell did not contain a tablename " 288 + "tag. should not be possible. see ref guide on mob troubleshooting. " 289 + "store=%s cell=%s", getStoreInfo(), c)); 290 } 291 } 292 } 293 } else { 294 LOG.error("Corrupted MOB reference: {}", c); 295 writer.append(c); 296 } 297 } else if (c.getValueLength() <= mobSizeThreshold) { 298 // If the value size of a cell is not larger than the threshold, directly write it to 299 // the store file. 300 writer.append(c); 301 } else { 302 // If the value size of a cell is larger than the threshold, it's regarded as a mob, 303 // write this cell to a mob file, and write the path to the store file. 304 mobCells++; 305 // append the original keyValue in the mob file. 306 mobFileWriter.append(c); 307 Cell reference = MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags()); 308 // write the cell whose value is the path of a mob file to the store file. 309 writer.append(reference); 310 cellsCountCompactedToMob++; 311 cellsSizeCompactedToMob += c.getValueLength(); 312 // Add ref we get for compact MOB case 313 mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName()); 314 } 315 316 int len = c.getSerializedSize(); 317 ++progress.currentCompactedKVs; 318 progress.totalCompactedSize += len; 319 bytesWrittenProgressForShippedCall += len; 320 if (LOG.isDebugEnabled()) { 321 bytesWrittenProgressForLog += len; 322 } 323 throughputController.control(compactionName, len); 324 if (closeChecker.isSizeLimit(store, len)) { 325 progress.cancel(); 326 return false; 327 } 328 if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) { 329 ((ShipperListener) writer).beforeShipped(); 330 kvs.shipped(); 331 bytesWrittenProgressForShippedCall = 0; 332 } 333 } 334 // Log the progress of long running compactions every minute if 335 // logging at DEBUG level 336 if (LOG.isDebugEnabled()) { 337 if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) { 338 String rate = String.format("%.2f", 339 (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0)); 340 LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}", 341 compactionName, progress, rate, throughputController); 342 lastMillis = now; 343 bytesWrittenProgressForLog = 0; 344 } 345 } 346 cells.clear(); 347 } while (hasMore); 348 finished = true; 349 } catch (InterruptedException e) { 350 progress.cancel(); 351 throw new InterruptedIOException( 352 "Interrupted while control throughput of compacting " + compactionName); 353 } catch (FileNotFoundException e) { 354 LOG.error("MOB Stress Test FAILED, region: " + store.getRegionInfo().getEncodedName(), e); 355 System.exit(-1); 356 } catch (IOException t) { 357 LOG.error("Mob compaction failed for region: " + store.getRegionInfo().getEncodedName()); 358 throw t; 359 } finally { 360 // Clone last cell in the final because writer will append last cell when committing. If 361 // don't clone here and once the scanner get closed, then the memory of last cell will be 362 // released. (HBASE-22582) 363 ((ShipperListener) writer).beforeShipped(); 364 throughputController.finish(compactionName); 365 if (!finished && mobFileWriter != null) { 366 // Remove all MOB references because compaction failed 367 mobRefSet.get().clear(); 368 // Abort writer 369 abortWriter(mobFileWriter); 370 } 371 } 372 373 if (mobFileWriter != null) { 374 if (mobCells > 0) { 375 // If the mob file is not empty, commit it. 376 mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells); 377 mobFileWriter.close(); 378 mobStore.commitFile(mobFileWriter.getPath(), path); 379 } else { 380 // If the mob file is empty, delete it instead of committing. 381 abortWriter(mobFileWriter); 382 } 383 } 384 mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob); 385 mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob); 386 mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob); 387 mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob); 388 progress.complete(); 389 return true; 390 391 } 392 393}