001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mob; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.util.ArrayList; 024import java.util.Date; 025import java.util.List; 026import java.util.Optional; 027import java.util.concurrent.ThreadLocalRandom; 028import java.util.concurrent.atomic.AtomicLong; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.Cell; 033import org.apache.hadoop.hbase.DoNotRetryIOException; 034import org.apache.hadoop.hbase.KeyValue; 035import org.apache.hadoop.hbase.PrivateCellUtil; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.io.hfile.CorruptHFileException; 038import org.apache.hadoop.hbase.regionserver.CellSink; 039import org.apache.hadoop.hbase.regionserver.HStore; 040import org.apache.hadoop.hbase.regionserver.InternalScanner; 041import org.apache.hadoop.hbase.regionserver.KeyValueScanner; 042import org.apache.hadoop.hbase.regionserver.ScannerContext; 043import org.apache.hadoop.hbase.regionserver.ShipperListener; 044import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 045import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker; 046import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; 047import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; 048import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; 049import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 050import org.apache.hadoop.hbase.util.Bytes; 051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 052import org.apache.yetus.audience.InterfaceAudience; 053import org.slf4j.Logger; 054import org.slf4j.LoggerFactory; 055 056/** 057 * This class is used for testing only. The main purpose is to emulate random failures during MOB 058 * compaction process. Example of usage: 059 * 060 * <pre> 061 * { 062 * @code 063 * public class SomeTest { 064 * 065 * public void initConfiguration(Configuration conf) { 066 * conf.set(MobStoreEngine.DEFAULT_MOB_COMPACTOR_CLASS_KEY, 067 * FaultyMobStoreCompactor.class.getName()); 068 * conf.setDouble("hbase.mob.compaction.fault.probability", 0.1); 069 * } 070 * } 071 * } 072 * </pre> 073 * 074 * @see org.apache.hadoop.hbase.mob.MobStressToolRunner on how to use and configure this class. 075 */ 076@InterfaceAudience.Private 077public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor { 078 079 private static final Logger LOG = LoggerFactory.getLogger(FaultyMobStoreCompactor.class); 080 081 public static AtomicLong mobCounter = new AtomicLong(); 082 public static AtomicLong totalFailures = new AtomicLong(); 083 public static AtomicLong totalCompactions = new AtomicLong(); 084 public static AtomicLong totalMajorCompactions = new AtomicLong(); 085 086 static double failureProb = 0.1d; 087 088 public FaultyMobStoreCompactor(Configuration conf, HStore store) { 089 super(conf, store); 090 failureProb = conf.getDouble("hbase.mob.compaction.fault.probability", 0.1); 091 } 092 093 @Override 094 protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, 095 long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, 096 CompactionRequestImpl request, CompactionProgress progress) throws IOException { 097 098 boolean major = request.isAllFiles(); 099 totalCompactions.incrementAndGet(); 100 if (major) { 101 totalMajorCompactions.incrementAndGet(); 102 } 103 long bytesWrittenProgressForLog = 0; 104 long bytesWrittenProgressForShippedCall = 0; 105 // Clear old mob references 106 mobRefSet.get().clear(); 107 boolean isUserRequest = userRequest.get(); 108 boolean compactMOBs = major && isUserRequest; 109 boolean discardMobMiss = conf.getBoolean(MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY, 110 MobConstants.DEFAULT_MOB_DISCARD_MISS); 111 112 boolean mustFail = false; 113 if (compactMOBs) { 114 mobCounter.incrementAndGet(); 115 double dv = ThreadLocalRandom.current().nextDouble(); 116 if (dv < failureProb) { 117 mustFail = true; 118 totalFailures.incrementAndGet(); 119 } 120 } 121 122 FileSystem fs = store.getFileSystem(); 123 124 // Since scanner.next() can return 'false' but still be delivering data, 125 // we have to use a do/while loop. 126 List<Cell> cells = new ArrayList<>(); 127 // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME 128 long currentTime = EnvironmentEdgeManager.currentTime(); 129 long lastMillis = 0; 130 if (LOG.isDebugEnabled()) { 131 lastMillis = currentTime; 132 } 133 CloseChecker closeChecker = new CloseChecker(conf, currentTime); 134 String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction"); 135 long now = 0; 136 boolean hasMore; 137 Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName()); 138 byte[] fileName = null; 139 StoreFileWriter mobFileWriter = null; 140 long mobCells = 0; 141 long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0; 142 long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0; 143 boolean finished = false; 144 145 ScannerContext scannerContext = 146 ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); 147 throughputController.start(compactionName); 148 KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null; 149 long shippedCallSizeLimit = 150 (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize(); 151 152 Cell mobCell = null; 153 154 long counter = 0; 155 long countFailAt = -1; 156 if (mustFail) { 157 countFailAt = ThreadLocalRandom.current().nextInt(100); // randomly fail fast 158 } 159 160 try { 161 try { 162 mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount, 163 major ? majorCompactionCompression : minorCompactionCompression, 164 store.getRegionInfo().getStartKey(), true); 165 fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); 166 } catch (IOException e) { 167 // Bailing out 168 LOG.error("Failed to create mob writer, ", e); 169 throw e; 170 } 171 if (compactMOBs) { 172 // Add the only reference we get for compact MOB case 173 // because new store file will have only one MOB reference 174 // in this case - of newly compacted MOB file 175 mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName()); 176 } 177 do { 178 hasMore = scanner.next(cells, scannerContext); 179 currentTime = EnvironmentEdgeManager.currentTime(); 180 if (LOG.isDebugEnabled()) { 181 now = currentTime; 182 } 183 if (closeChecker.isTimeLimit(store, currentTime)) { 184 progress.cancel(); 185 return false; 186 } 187 for (Cell c : cells) { 188 counter++; 189 if (compactMOBs) { 190 if (MobUtils.isMobReferenceCell(c)) { 191 if (counter == countFailAt) { 192 LOG.warn("INJECTED FAULT mobCounter={}", mobCounter.get()); 193 throw new CorruptHFileException("injected fault"); 194 } 195 String fName = MobUtils.getMobFileName(c); 196 // Added to support migration 197 try { 198 mobCell = mobStore.resolve(c, true, false).getCell(); 199 } catch (DoNotRetryIOException e) { 200 if ( 201 discardMobMiss && e.getCause() != null 202 && e.getCause() instanceof FileNotFoundException 203 ) { 204 LOG.error("Missing MOB cell: file={} not found cell={}", fName, c); 205 continue; 206 } else { 207 throw e; 208 } 209 } 210 211 if (discardMobMiss && mobCell.getValueLength() == 0) { 212 LOG.error("Missing MOB cell value: file={} cell={}", fName, mobCell); 213 continue; 214 } 215 216 if (mobCell.getValueLength() > mobSizeThreshold) { 217 // put the mob data back to the store file 218 PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId()); 219 mobFileWriter.append(mobCell); 220 writer.append( 221 MobUtils.createMobRefCell(mobCell, fileName, this.mobStore.getRefCellTags())); 222 mobCells++; 223 } else { 224 // If MOB value is less than threshold, append it directly to a store file 225 PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId()); 226 writer.append(mobCell); 227 cellsCountCompactedFromMob++; 228 cellsSizeCompactedFromMob += mobCell.getValueLength(); 229 } 230 } else { 231 // Not a MOB reference cell 232 int size = c.getValueLength(); 233 if (size > mobSizeThreshold) { 234 mobFileWriter.append(c); 235 writer 236 .append(MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags())); 237 mobCells++; 238 cellsCountCompactedToMob++; 239 cellsSizeCompactedToMob += c.getValueLength(); 240 } else { 241 writer.append(c); 242 } 243 } 244 } else if (c.getTypeByte() != KeyValue.Type.Put.getCode()) { 245 // Not a major compaction or major with MOB disabled 246 // If the kv type is not put, directly write the cell 247 // to the store file. 248 writer.append(c); 249 } else if (MobUtils.isMobReferenceCell(c)) { 250 // Not a major MOB compaction, Put MOB reference 251 if (MobUtils.hasValidMobRefCellValue(c)) { 252 int size = MobUtils.getMobValueLength(c); 253 if (size > mobSizeThreshold) { 254 // If the value size is larger than the threshold, it's regarded as a mob. Since 255 // its value is already in the mob file, directly write this cell to the store file 256 Optional<TableName> refTable = MobUtils.getTableName(c); 257 if (refTable.isPresent()) { 258 mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c)); 259 writer.append(c); 260 } else { 261 throw new IOException(String.format("MOB cell did not contain a tablename " 262 + "tag. should not be possible. see ref guide on mob troubleshooting. " 263 + "store=%s cell=%s", getStoreInfo(), c)); 264 } 265 } else { 266 // If the value is not larger than the threshold, it's not regarded a mob. Retrieve 267 // the mob cell from the mob file, and write it back to the store file. 268 mobCell = mobStore.resolve(c, true, false).getCell(); 269 if (mobCell.getValueLength() != 0) { 270 // put the mob data back to the store file 271 PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId()); 272 writer.append(mobCell); 273 cellsCountCompactedFromMob++; 274 cellsSizeCompactedFromMob += mobCell.getValueLength(); 275 } else { 276 // If the value of a file is empty, there might be issues when retrieving, 277 // directly write the cell to the store file, and leave it to be handled by the 278 // next compaction. 279 LOG.error("Empty value for: " + c); 280 Optional<TableName> refTable = MobUtils.getTableName(c); 281 if (refTable.isPresent()) { 282 mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c)); 283 writer.append(c); 284 } else { 285 throw new IOException(String.format("MOB cell did not contain a tablename " 286 + "tag. should not be possible. see ref guide on mob troubleshooting. " 287 + "store=%s cell=%s", getStoreInfo(), c)); 288 } 289 } 290 } 291 } else { 292 LOG.error("Corrupted MOB reference: {}", c); 293 writer.append(c); 294 } 295 } else if (c.getValueLength() <= mobSizeThreshold) { 296 // If the value size of a cell is not larger than the threshold, directly write it to 297 // the store file. 298 writer.append(c); 299 } else { 300 // If the value size of a cell is larger than the threshold, it's regarded as a mob, 301 // write this cell to a mob file, and write the path to the store file. 302 mobCells++; 303 // append the original keyValue in the mob file. 304 mobFileWriter.append(c); 305 Cell reference = MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags()); 306 // write the cell whose value is the path of a mob file to the store file. 307 writer.append(reference); 308 cellsCountCompactedToMob++; 309 cellsSizeCompactedToMob += c.getValueLength(); 310 // Add ref we get for compact MOB case 311 mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName()); 312 } 313 314 int len = c.getSerializedSize(); 315 ++progress.currentCompactedKVs; 316 progress.totalCompactedSize += len; 317 bytesWrittenProgressForShippedCall += len; 318 if (LOG.isDebugEnabled()) { 319 bytesWrittenProgressForLog += len; 320 } 321 throughputController.control(compactionName, len); 322 if (closeChecker.isSizeLimit(store, len)) { 323 progress.cancel(); 324 return false; 325 } 326 if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) { 327 ((ShipperListener) writer).beforeShipped(); 328 kvs.shipped(); 329 bytesWrittenProgressForShippedCall = 0; 330 } 331 } 332 // Log the progress of long running compactions every minute if 333 // logging at DEBUG level 334 if (LOG.isDebugEnabled()) { 335 if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) { 336 String rate = String.format("%.2f", 337 (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0)); 338 LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}", 339 compactionName, progress, rate, throughputController); 340 lastMillis = now; 341 bytesWrittenProgressForLog = 0; 342 } 343 } 344 cells.clear(); 345 } while (hasMore); 346 finished = true; 347 } catch (InterruptedException e) { 348 progress.cancel(); 349 throw new InterruptedIOException( 350 "Interrupted while control throughput of compacting " + compactionName); 351 } catch (FileNotFoundException e) { 352 LOG.error("MOB Stress Test FAILED, region: " + store.getRegionInfo().getEncodedName(), e); 353 System.exit(-1); 354 } catch (IOException t) { 355 LOG.error("Mob compaction failed for region: " + store.getRegionInfo().getEncodedName()); 356 throw t; 357 } finally { 358 // Clone last cell in the final because writer will append last cell when committing. If 359 // don't clone here and once the scanner get closed, then the memory of last cell will be 360 // released. (HBASE-22582) 361 ((ShipperListener) writer).beforeShipped(); 362 throughputController.finish(compactionName); 363 if (!finished && mobFileWriter != null) { 364 // Remove all MOB references because compaction failed 365 mobRefSet.get().clear(); 366 // Abort writer 367 abortWriter(mobFileWriter); 368 } 369 } 370 371 if (mobFileWriter != null) { 372 if (mobCells > 0) { 373 // If the mob file is not empty, commit it. 374 mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells); 375 mobFileWriter.close(); 376 mobStore.commitFile(mobFileWriter.getPath(), path); 377 } else { 378 // If the mob file is empty, delete it instead of committing. 379 abortWriter(mobFileWriter); 380 } 381 } 382 mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob); 383 mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob); 384 mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob); 385 mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob); 386 progress.complete(); 387 return true; 388 389 } 390 391}