001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mob; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.ArrayList; 023import java.util.Date; 024import java.util.List; 025 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.CellUtil; 030import org.apache.hadoop.hbase.PrivateCellUtil; 031import org.apache.hadoop.hbase.KeyValue; 032import org.apache.hadoop.hbase.regionserver.CellSink; 033import org.apache.hadoop.hbase.regionserver.HMobStore; 034import org.apache.hadoop.hbase.regionserver.HStore; 035import org.apache.hadoop.hbase.regionserver.InternalScanner; 036import org.apache.hadoop.hbase.regionserver.KeyValueScanner; 037import org.apache.hadoop.hbase.regionserver.ScanInfo; 038import org.apache.hadoop.hbase.regionserver.ScanType; 039import org.apache.hadoop.hbase.regionserver.ScannerContext; 040import org.apache.hadoop.hbase.regionserver.ShipperListener; 041import org.apache.hadoop.hbase.regionserver.StoreFileScanner; 042import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 043import org.apache.hadoop.hbase.regionserver.StoreScanner; 044import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; 045import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; 046import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; 047import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 048import org.apache.hadoop.hbase.security.User; 049import org.apache.hadoop.hbase.util.Bytes; 050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 051import org.apache.yetus.audience.InterfaceAudience; 052import org.slf4j.Logger; 053import org.slf4j.LoggerFactory; 054 055/** 056 * Compact passed set of files in the mob-enabled column family. 057 */ 058@InterfaceAudience.Private 059public class DefaultMobStoreCompactor extends DefaultCompactor { 060 061 private static final Logger LOG = LoggerFactory.getLogger(DefaultMobStoreCompactor.class); 062 private long mobSizeThreshold; 063 private HMobStore mobStore; 064 065 private final InternalScannerFactory scannerFactory = new InternalScannerFactory() { 066 067 @Override 068 public ScanType getScanType(CompactionRequestImpl request) { 069 // retain the delete markers until they are expired. 070 return ScanType.COMPACT_RETAIN_DELETES; 071 } 072 073 @Override 074 public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners, 075 ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException { 076 return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint, 077 fd.earliestPutTs); 078 } 079 }; 080 081 private final CellSinkFactory<StoreFileWriter> writerFactory = 082 new CellSinkFactory<StoreFileWriter>() { 083 @Override 084 public StoreFileWriter createWriter(InternalScanner scanner, 085 org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd, 086 boolean shouldDropBehind) throws IOException { 087 // make this writer with tags always because of possible new cells with tags. 088 return store.createWriterInTmp(fd.maxKeyCount, compactionCompression, true, true, true, 089 shouldDropBehind); 090 } 091 }; 092 093 public DefaultMobStoreCompactor(Configuration conf, HStore store) { 094 super(conf, store); 095 // The mob cells reside in the mob-enabled column family which is held by HMobStore. 096 // During the compaction, the compactor reads the cells from the mob files and 097 // probably creates new mob files. All of these operations are included in HMobStore, 098 // so we need to cast the Store to HMobStore. 099 if (!(store instanceof HMobStore)) { 100 throw new IllegalArgumentException("The store " + store + " is not a HMobStore"); 101 } 102 mobStore = (HMobStore) store; 103 mobSizeThreshold = store.getColumnFamilyDescriptor().getMobThreshold(); 104 } 105 106 @Override 107 public List<Path> compact(CompactionRequestImpl request, ThroughputController throughputController, 108 User user) throws IOException { 109 return compact(request, scannerFactory, writerFactory, throughputController, user); 110 } 111 112 /** 113 * Performs compaction on a column family with the mob flag enabled. 114 * This is for when the mob threshold size has changed or if the mob 115 * column family mode has been toggled via an alter table statement. 116 * Compacts the files by the following rules. 117 * 1. If the Put cell has a mob reference tag, the cell's value is the path of the mob file. 118 * <ol> 119 * <li> 120 * If the value size of a cell is larger than the threshold, this cell is regarded as a mob, 121 * directly copy the (with mob tag) cell into the new store file. 122 * </li> 123 * <li> 124 * Otherwise, retrieve the mob cell from the mob file, and writes a copy of the cell into 125 * the new store file. 126 * </li> 127 * </ol> 128 * 2. If the Put cell doesn't have a reference tag. 129 * <ol> 130 * <li> 131 * If the value size of a cell is larger than the threshold, this cell is regarded as a mob, 132 * write this cell to a mob file, and write the path of this mob file to the store file. 133 * </li> 134 * <li> 135 * Otherwise, directly write this cell into the store file. 136 * </li> 137 * </ol> 138 * 3. Decide how to write a Delete cell. 139 * <ol> 140 * <li> 141 * If a Delete cell does not have a mob reference tag which means this delete marker have not 142 * been written to the mob del file, write this cell to the mob del file, and write this cell 143 * with a ref tag to a store file. 144 * </li> 145 * <li> 146 * Otherwise, directly write it to a store file. 147 * </li> 148 * </ol> 149 * After the major compaction on the normal hfiles, we have a guarantee that we have purged all 150 * deleted or old version mob refs, and the delete markers are written to a del file with the 151 * suffix _del. Because of this, it is safe to use the del file in the mob compaction. 152 * The mob compaction doesn't take place in the normal hfiles, it occurs directly in the 153 * mob files. When the small mob files are merged into bigger ones, the del file is added into 154 * the scanner to filter the deleted cells. 155 * @param fd File details 156 * @param scanner Where to read from. 157 * @param writer Where to write to. 158 * @param smallestReadPoint Smallest read point. 159 * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint 160 * @param throughputController The compaction throughput controller. 161 * @param major Is a major compaction. 162 * @param numofFilesToCompact the number of files to compact 163 * @return Whether compaction ended; false if it was interrupted for any reason. 164 */ 165 @Override 166 protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, 167 long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, 168 boolean major, int numofFilesToCompact) throws IOException { 169 long bytesWrittenProgressForCloseCheck = 0; 170 long bytesWrittenProgressForLog = 0; 171 long bytesWrittenProgressForShippedCall = 0; 172 // Since scanner.next() can return 'false' but still be delivering data, 173 // we have to use a do/while loop. 174 List<Cell> cells = new ArrayList<>(); 175 // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME 176 int closeCheckSizeLimit = HStore.getCloseCheckInterval(); 177 long lastMillis = 0; 178 if (LOG.isDebugEnabled()) { 179 lastMillis = EnvironmentEdgeManager.currentTime(); 180 } 181 String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction"); 182 long now = 0; 183 boolean hasMore; 184 Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName()); 185 byte[] fileName = null; 186 StoreFileWriter mobFileWriter = null, delFileWriter = null; 187 long mobCells = 0, deleteMarkersCount = 0; 188 long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0; 189 long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0; 190 boolean finished = false; 191 ScannerContext scannerContext = 192 ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); 193 throughputController.start(compactionName); 194 KeyValueScanner kvs = (scanner instanceof KeyValueScanner)? (KeyValueScanner)scanner : null; 195 long shippedCallSizeLimit = (long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize(); 196 try { 197 try { 198 // If the mob file writer could not be created, directly write the cell to the store file. 199 mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount, 200 compactionCompression, store.getRegionInfo().getStartKey(), true); 201 fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); 202 } catch (IOException e) { 203 LOG.warn("Failed to create mob writer, " 204 + "we will continue the compaction by writing MOB cells directly in store files", e); 205 } 206 if (major) { 207 try { 208 delFileWriter = mobStore.createDelFileWriterInTmp(new Date(fd.latestPutTs), 209 fd.maxKeyCount, compactionCompression, store.getRegionInfo().getStartKey()); 210 } catch (IOException e) { 211 LOG.warn( 212 "Failed to create del writer, " 213 + "we will continue the compaction by writing delete markers directly in store files", 214 e); 215 } 216 } 217 do { 218 hasMore = scanner.next(cells, scannerContext); 219 if (LOG.isDebugEnabled()) { 220 now = EnvironmentEdgeManager.currentTime(); 221 } 222 for (Cell c : cells) { 223 if (major && CellUtil.isDelete(c)) { 224 if (MobUtils.isMobReferenceCell(c) || delFileWriter == null) { 225 // Directly write it to a store file 226 writer.append(c); 227 } else { 228 // Add a ref tag to this cell and write it to a store file. 229 writer.append(MobUtils.createMobRefDeleteMarker(c)); 230 // Write the cell to a del file 231 delFileWriter.append(c); 232 deleteMarkersCount++; 233 } 234 } else if (mobFileWriter == null || c.getTypeByte() != KeyValue.Type.Put.getCode()) { 235 // If the mob file writer is null or the kv type is not put, directly write the cell 236 // to the store file. 237 writer.append(c); 238 } else if (MobUtils.isMobReferenceCell(c)) { 239 if (MobUtils.hasValidMobRefCellValue(c)) { 240 int size = MobUtils.getMobValueLength(c); 241 if (size > mobSizeThreshold) { 242 // If the value size is larger than the threshold, it's regarded as a mob. Since 243 // its value is already in the mob file, directly write this cell to the store file 244 writer.append(c); 245 } else { 246 // If the value is not larger than the threshold, it's not regarded a mob. Retrieve 247 // the mob cell from the mob file, and write it back to the store file. Must 248 // close the mob scanner once the life cycle finished. 249 try (MobCell mobCell = mobStore.resolve(c, false)) { 250 if (mobCell.getCell().getValueLength() != 0) { 251 // put the mob data back to the store file 252 PrivateCellUtil.setSequenceId(mobCell.getCell(), c.getSequenceId()); 253 writer.append(mobCell.getCell()); 254 cellsCountCompactedFromMob++; 255 cellsSizeCompactedFromMob += mobCell.getCell().getValueLength(); 256 } else { 257 // If the value of a file is empty, there might be issues when retrieving, 258 // directly write the cell to the store file, and leave it to be handled by the 259 // next compaction. 260 writer.append(c); 261 } 262 } 263 } 264 } else { 265 LOG.warn("The value format of the KeyValue " + c 266 + " is wrong, its length is less than " + Bytes.SIZEOF_INT); 267 writer.append(c); 268 } 269 } else if (c.getValueLength() <= mobSizeThreshold) { 270 //If value size of a cell is not larger than the threshold, directly write to store file 271 writer.append(c); 272 } else { 273 // If the value size of a cell is larger than the threshold, it's regarded as a mob, 274 // write this cell to a mob file, and write the path to the store file. 275 mobCells++; 276 // append the original keyValue in the mob file. 277 mobFileWriter.append(c); 278 Cell reference = MobUtils.createMobRefCell(c, fileName, 279 this.mobStore.getRefCellTags()); 280 // write the cell whose value is the path of a mob file to the store file. 281 writer.append(reference); 282 cellsCountCompactedToMob++; 283 cellsSizeCompactedToMob += c.getValueLength(); 284 } 285 int len = c.getSerializedSize(); 286 ++progress.currentCompactedKVs; 287 progress.totalCompactedSize += len; 288 bytesWrittenProgressForShippedCall += len; 289 if (LOG.isDebugEnabled()) { 290 bytesWrittenProgressForLog += len; 291 } 292 throughputController.control(compactionName, len); 293 // check periodically to see if a system stop is requested 294 if (closeCheckSizeLimit > 0) { 295 bytesWrittenProgressForCloseCheck += len; 296 if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) { 297 bytesWrittenProgressForCloseCheck = 0; 298 if (!store.areWritesEnabled()) { 299 progress.cancel(); 300 return false; 301 } 302 } 303 } 304 if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) { 305 ((ShipperListener)writer).beforeShipped(); 306 kvs.shipped(); 307 bytesWrittenProgressForShippedCall = 0; 308 } 309 } 310 // Log the progress of long running compactions every minute if 311 // logging at DEBUG level 312 if (LOG.isDebugEnabled()) { 313 if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) { 314 String rate = String.format("%.2f", 315 (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0)); 316 LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}", 317 compactionName, progress, rate, throughputController); 318 lastMillis = now; 319 bytesWrittenProgressForLog = 0; 320 } 321 } 322 cells.clear(); 323 } while (hasMore); 324 finished = true; 325 } catch (InterruptedException e) { 326 progress.cancel(); 327 throw new InterruptedIOException( 328 "Interrupted while control throughput of compacting " + compactionName); 329 } finally { 330 // Clone last cell in the final because writer will append last cell when committing. If 331 // don't clone here and once the scanner get closed, then the memory of last cell will be 332 // released. (HBASE-22582) 333 ((ShipperListener) writer).beforeShipped(); 334 throughputController.finish(compactionName); 335 if (!finished && mobFileWriter != null) { 336 abortWriter(mobFileWriter); 337 } 338 if (!finished && delFileWriter != null) { 339 abortWriter(delFileWriter); 340 } 341 } 342 if (delFileWriter != null) { 343 if (deleteMarkersCount > 0) { 344 // If the del file is not empty, commit it. 345 // If the commit fails, the compaction is re-performed again. 346 delFileWriter.appendMetadata(fd.maxSeqId, major, deleteMarkersCount); 347 delFileWriter.close(); 348 mobStore.commitFile(delFileWriter.getPath(), path); 349 } else { 350 // If the del file is empty, delete it instead of committing. 351 abortWriter(delFileWriter); 352 } 353 } 354 if (mobFileWriter != null) { 355 if (mobCells > 0) { 356 // If the mob file is not empty, commit it. 357 mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells); 358 mobFileWriter.close(); 359 mobStore.commitFile(mobFileWriter.getPath(), path); 360 } else { 361 // If the mob file is empty, delete it instead of committing. 362 abortWriter(mobFileWriter); 363 } 364 } 365 mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob); 366 mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob); 367 mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob); 368 mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob); 369 progress.complete(); 370 return true; 371 } 372}