001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mob; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.util.ArrayList; 024import java.util.Date; 025import java.util.List; 026import java.util.Optional; 027import java.util.Random; 028import java.util.concurrent.atomic.AtomicLong; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.KeyValue; 033import org.apache.hadoop.hbase.PrivateCellUtil; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.io.hfile.CorruptHFileException; 036import org.apache.hadoop.hbase.regionserver.CellSink; 037import org.apache.hadoop.hbase.regionserver.HStore; 038import org.apache.hadoop.hbase.regionserver.InternalScanner; 039import org.apache.hadoop.hbase.regionserver.KeyValueScanner; 040import org.apache.hadoop.hbase.regionserver.ScannerContext; 041import org.apache.hadoop.hbase.regionserver.ShipperListener; 042import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 043import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker; 044import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; 045import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; 046import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; 047import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 048import org.apache.hadoop.hbase.util.Bytes; 049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 050import org.apache.yetus.audience.InterfaceAudience; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054/** 055 * This class is used for testing only. The main purpose is to emulate random failures during MOB 056 * compaction process. Example of usage: 057 * 058 * <pre> 059 * { 060 * @code 061 * public class SomeTest { 062 * 063 * public void initConfiguration(Configuration conf) { 064 * conf.set(MobStoreEngine.DEFAULT_MOB_COMPACTOR_CLASS_KEY, 065 * FaultyMobStoreCompactor.class.getName()); 066 * conf.setDouble("hbase.mob.compaction.fault.probability", 0.1); 067 * } 068 * } 069 * } 070 * </pre> 071 * 072 * @see org.apache.hadoop.hbase.mob.MobStressToolRunner on how to use and configure this class. 073 */ 074@InterfaceAudience.Private 075public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor { 076 077 private static final Logger LOG = LoggerFactory.getLogger(FaultyMobStoreCompactor.class); 078 079 public static AtomicLong mobCounter = new AtomicLong(); 080 public static AtomicLong totalFailures = new AtomicLong(); 081 public static AtomicLong totalCompactions = new AtomicLong(); 082 public static AtomicLong totalMajorCompactions = new AtomicLong(); 083 084 static double failureProb = 0.1d; 085 static Random rnd = new Random(); 086 087 public FaultyMobStoreCompactor(Configuration conf, HStore store) { 088 super(conf, store); 089 failureProb = conf.getDouble("hbase.mob.compaction.fault.probability", 0.1); 090 } 091 092 @Override 093 protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, 094 long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, 095 CompactionRequestImpl request, CompactionProgress progress) throws IOException { 096 097 boolean major = request.isAllFiles(); 098 totalCompactions.incrementAndGet(); 099 if (major) { 100 totalMajorCompactions.incrementAndGet(); 101 } 102 long bytesWrittenProgressForCloseCheck = 0; 103 long bytesWrittenProgressForLog = 0; 104 long bytesWrittenProgressForShippedCall = 0; 105 // Clear old mob references 106 mobRefSet.get().clear(); 107 boolean isUserRequest = userRequest.get(); 108 boolean compactMOBs = major && isUserRequest; 109 boolean discardMobMiss = conf.getBoolean(MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY, 110 MobConstants.DEFAULT_MOB_DISCARD_MISS); 111 112 boolean mustFail = false; 113 if (compactMOBs) { 114 mobCounter.incrementAndGet(); 115 double dv = rnd.nextDouble(); 116 if (dv < failureProb) { 117 mustFail = true; 118 totalFailures.incrementAndGet(); 119 } 120 } 121 122 // Since scanner.next() can return 'false' but still be delivering data, 123 // we have to use a do/while loop. 124 List<Cell> cells = new ArrayList<>(); 125 // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME 126 int closeCheckSizeLimit = 127 conf.getInt(CloseChecker.SIZE_LIMIT_KEY, 10 * 1000 * 1000 /* 10 MB */); 128 long lastMillis = 0; 129 if (LOG.isDebugEnabled()) { 130 lastMillis = EnvironmentEdgeManager.currentTime(); 131 } 132 String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction"); 133 long now = 0; 134 boolean hasMore; 135 Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName()); 136 byte[] fileName = null; 137 StoreFileWriter mobFileWriter = null; 138 long mobCells = 0; 139 long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0; 140 long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0; 141 boolean finished = false; 142 143 ScannerContext scannerContext = 144 ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); 145 throughputController.start(compactionName); 146 KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null; 147 long shippedCallSizeLimit = 148 (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize(); 149 150 Cell mobCell = null; 151 152 long counter = 0; 153 long countFailAt = -1; 154 if (mustFail) { 155 countFailAt = rnd.nextInt(100); // randomly fail fast 156 } 157 158 try { 159 try { 160 mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount, 161 major ? majorCompactionCompression : minorCompactionCompression, 162 store.getRegionInfo().getStartKey(), true); 163 fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); 164 } catch (IOException e) { 165 // Bailing out 166 LOG.error("Failed to create mob writer, ", e); 167 throw e; 168 } 169 if (compactMOBs) { 170 // Add the only reference we get for compact MOB case 171 // because new store file will have only one MOB reference 172 // in this case - of newly compacted MOB file 173 mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName()); 174 } 175 do { 176 hasMore = scanner.next(cells, scannerContext); 177 if (LOG.isDebugEnabled()) { 178 now = EnvironmentEdgeManager.currentTime(); 179 } 180 for (Cell c : cells) { 181 counter++; 182 if (compactMOBs) { 183 if (MobUtils.isMobReferenceCell(c)) { 184 if (counter == countFailAt) { 185 LOG.warn("INJECTED FAULT mobCounter={}", mobCounter.get()); 186 throw new CorruptHFileException("injected fault"); 187 } 188 String fName = MobUtils.getMobFileName(c); 189 // Added to support migration 190 try { 191 mobCell = mobStore.resolve(c, true, false).getCell(); 192 } catch (FileNotFoundException fnfe) { 193 if (discardMobMiss) { 194 LOG.error("Missing MOB cell: file={} not found", fName); 195 continue; 196 } else { 197 throw fnfe; 198 } 199 } 200 201 if (discardMobMiss && mobCell.getValueLength() == 0) { 202 LOG.error("Missing MOB cell value: file={} cell={}", fName, mobCell); 203 continue; 204 } 205 206 if (mobCell.getValueLength() > mobSizeThreshold) { 207 // put the mob data back to the store file 208 PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId()); 209 mobFileWriter.append(mobCell); 210 writer.append( 211 MobUtils.createMobRefCell(mobCell, fileName, this.mobStore.getRefCellTags())); 212 mobCells++; 213 } else { 214 // If MOB value is less than threshold, append it directly to a store file 215 PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId()); 216 writer.append(mobCell); 217 cellsCountCompactedFromMob++; 218 cellsSizeCompactedFromMob += mobCell.getValueLength(); 219 } 220 } else { 221 // Not a MOB reference cell 222 int size = c.getValueLength(); 223 if (size > mobSizeThreshold) { 224 mobFileWriter.append(c); 225 writer 226 .append(MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags())); 227 mobCells++; 228 cellsCountCompactedToMob++; 229 cellsSizeCompactedToMob += c.getValueLength(); 230 } else { 231 writer.append(c); 232 } 233 } 234 } else if (c.getTypeByte() != KeyValue.Type.Put.getCode()) { 235 // Not a major compaction or major with MOB disabled 236 // If the kv type is not put, directly write the cell 237 // to the store file. 238 writer.append(c); 239 } else if (MobUtils.isMobReferenceCell(c)) { 240 // Not a major MOB compaction, Put MOB reference 241 if (MobUtils.hasValidMobRefCellValue(c)) { 242 int size = MobUtils.getMobValueLength(c); 243 if (size > mobSizeThreshold) { 244 // If the value size is larger than the threshold, it's regarded as a mob. Since 245 // its value is already in the mob file, directly write this cell to the store file 246 Optional<TableName> refTable = MobUtils.getTableName(c); 247 if (refTable.isPresent()) { 248 mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c)); 249 writer.append(c); 250 } else { 251 throw new IOException(String.format("MOB cell did not contain a tablename " 252 + "tag. should not be possible. see ref guide on mob troubleshooting. " 253 + "store=%s cell=%s", getStoreInfo(), c)); 254 } 255 } else { 256 // If the value is not larger than the threshold, it's not regarded a mob. Retrieve 257 // the mob cell from the mob file, and write it back to the store file. 258 mobCell = mobStore.resolve(c, true, false).getCell(); 259 if (mobCell.getValueLength() != 0) { 260 // put the mob data back to the store file 261 PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId()); 262 writer.append(mobCell); 263 cellsCountCompactedFromMob++; 264 cellsSizeCompactedFromMob += mobCell.getValueLength(); 265 } else { 266 // If the value of a file is empty, there might be issues when retrieving, 267 // directly write the cell to the store file, and leave it to be handled by the 268 // next compaction. 269 LOG.error("Empty value for: " + c); 270 Optional<TableName> refTable = MobUtils.getTableName(c); 271 if (refTable.isPresent()) { 272 mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c)); 273 writer.append(c); 274 } else { 275 throw new IOException(String.format("MOB cell did not contain a tablename " 276 + "tag. should not be possible. see ref guide on mob troubleshooting. " 277 + "store=%s cell=%s", getStoreInfo(), c)); 278 } 279 } 280 } 281 } else { 282 LOG.error("Corrupted MOB reference: {}", c); 283 writer.append(c); 284 } 285 } else if (c.getValueLength() <= mobSizeThreshold) { 286 // If the value size of a cell is not larger than the threshold, directly write it to 287 // the store file. 288 writer.append(c); 289 } else { 290 // If the value size of a cell is larger than the threshold, it's regarded as a mob, 291 // write this cell to a mob file, and write the path to the store file. 292 mobCells++; 293 // append the original keyValue in the mob file. 294 mobFileWriter.append(c); 295 Cell reference = MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags()); 296 // write the cell whose value is the path of a mob file to the store file. 297 writer.append(reference); 298 cellsCountCompactedToMob++; 299 cellsSizeCompactedToMob += c.getValueLength(); 300 // Add ref we get for compact MOB case 301 mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName()); 302 } 303 304 int len = c.getSerializedSize(); 305 ++progress.currentCompactedKVs; 306 progress.totalCompactedSize += len; 307 bytesWrittenProgressForShippedCall += len; 308 if (LOG.isDebugEnabled()) { 309 bytesWrittenProgressForLog += len; 310 } 311 throughputController.control(compactionName, len); 312 // check periodically to see if a system stop is requested 313 if (closeCheckSizeLimit > 0) { 314 bytesWrittenProgressForCloseCheck += len; 315 if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) { 316 bytesWrittenProgressForCloseCheck = 0; 317 if (!store.areWritesEnabled()) { 318 progress.cancel(); 319 return false; 320 } 321 } 322 } 323 if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) { 324 ((ShipperListener) writer).beforeShipped(); 325 kvs.shipped(); 326 bytesWrittenProgressForShippedCall = 0; 327 } 328 } 329 // Log the progress of long running compactions every minute if 330 // logging at DEBUG level 331 if (LOG.isDebugEnabled()) { 332 if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) { 333 String rate = String.format("%.2f", 334 (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0)); 335 LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}", 336 compactionName, progress, rate, throughputController); 337 lastMillis = now; 338 bytesWrittenProgressForLog = 0; 339 } 340 } 341 cells.clear(); 342 } while (hasMore); 343 finished = true; 344 } catch (InterruptedException e) { 345 progress.cancel(); 346 throw new InterruptedIOException( 347 "Interrupted while control throughput of compacting " + compactionName); 348 } catch (FileNotFoundException e) { 349 LOG.error("MOB Stress Test FAILED, region: " + store.getRegionInfo().getEncodedName(), e); 350 System.exit(-1); 351 } catch (IOException t) { 352 LOG.error("Mob compaction failed for region: " + store.getRegionInfo().getEncodedName()); 353 throw t; 354 } finally { 355 // Clone last cell in the final because writer will append last cell when committing. If 356 // don't clone here and once the scanner get closed, then the memory of last cell will be 357 // released. (HBASE-22582) 358 ((ShipperListener) writer).beforeShipped(); 359 throughputController.finish(compactionName); 360 if (!finished && mobFileWriter != null) { 361 // Remove all MOB references because compaction failed 362 mobRefSet.get().clear(); 363 // Abort writer 364 abortWriter(mobFileWriter); 365 } 366 } 367 368 if (mobFileWriter != null) { 369 if (mobCells > 0) { 370 // If the mob file is not empty, commit it. 371 mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells); 372 mobFileWriter.close(); 373 mobStore.commitFile(mobFileWriter.getPath(), path); 374 } else { 375 // If the mob file is empty, delete it instead of committing. 376 abortWriter(mobFileWriter); 377 } 378 } 379 mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob); 380 mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob); 381 mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob); 382 mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob); 383 progress.complete(); 384 return true; 385 386 } 387 388}