001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mob; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.ArrayList; 023import java.util.Date; 024import java.util.List; 025 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.CellUtil; 030import org.apache.hadoop.hbase.PrivateCellUtil; 031import org.apache.hadoop.hbase.KeyValue; 032import org.apache.hadoop.hbase.KeyValueUtil; 033import org.apache.hadoop.hbase.regionserver.CellSink; 034import org.apache.hadoop.hbase.regionserver.HMobStore; 035import org.apache.hadoop.hbase.regionserver.HStore; 036import org.apache.hadoop.hbase.regionserver.InternalScanner; 037import org.apache.hadoop.hbase.regionserver.KeyValueScanner; 038import org.apache.hadoop.hbase.regionserver.ScanInfo; 039import org.apache.hadoop.hbase.regionserver.ScanType; 040import org.apache.hadoop.hbase.regionserver.ScannerContext; 041import org.apache.hadoop.hbase.regionserver.ShipperListener; 042import org.apache.hadoop.hbase.regionserver.StoreFileScanner; 043import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 044import org.apache.hadoop.hbase.regionserver.StoreScanner; 045import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; 046import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; 047import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; 048import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 049import org.apache.hadoop.hbase.security.User; 050import org.apache.hadoop.hbase.util.Bytes; 051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 052import org.apache.yetus.audience.InterfaceAudience; 053import org.slf4j.Logger; 054import org.slf4j.LoggerFactory; 055 056/** 057 * Compact passed set of files in the mob-enabled column family. 058 */ 059@InterfaceAudience.Private 060public class DefaultMobStoreCompactor extends DefaultCompactor { 061 062 private static final Logger LOG = LoggerFactory.getLogger(DefaultMobStoreCompactor.class); 063 private long mobSizeThreshold; 064 private HMobStore mobStore; 065 066 private final InternalScannerFactory scannerFactory = new InternalScannerFactory() { 067 068 @Override 069 public ScanType getScanType(CompactionRequestImpl request) { 070 // retain the delete markers until they are expired. 071 return ScanType.COMPACT_RETAIN_DELETES; 072 } 073 074 @Override 075 public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners, 076 ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException { 077 return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint, 078 fd.earliestPutTs); 079 } 080 }; 081 082 private final CellSinkFactory<StoreFileWriter> writerFactory = 083 new CellSinkFactory<StoreFileWriter>() { 084 @Override 085 public StoreFileWriter createWriter(InternalScanner scanner, 086 org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd, 087 boolean shouldDropBehind) throws IOException { 088 // make this writer with tags always because of possible new cells with tags. 089 return store.createWriterInTmp(fd.maxKeyCount, compactionCompression, true, true, true, 090 shouldDropBehind); 091 } 092 }; 093 094 public DefaultMobStoreCompactor(Configuration conf, HStore store) { 095 super(conf, store); 096 // The mob cells reside in the mob-enabled column family which is held by HMobStore. 097 // During the compaction, the compactor reads the cells from the mob files and 098 // probably creates new mob files. All of these operations are included in HMobStore, 099 // so we need to cast the Store to HMobStore. 100 if (!(store instanceof HMobStore)) { 101 throw new IllegalArgumentException("The store " + store + " is not a HMobStore"); 102 } 103 mobStore = (HMobStore) store; 104 mobSizeThreshold = store.getColumnFamilyDescriptor().getMobThreshold(); 105 } 106 107 @Override 108 public List<Path> compact(CompactionRequestImpl request, ThroughputController throughputController, 109 User user) throws IOException { 110 return compact(request, scannerFactory, writerFactory, throughputController, user); 111 } 112 113 /** 114 * Performs compaction on a column family with the mob flag enabled. 115 * This is for when the mob threshold size has changed or if the mob 116 * column family mode has been toggled via an alter table statement. 117 * Compacts the files by the following rules. 118 * 1. If the Put cell has a mob reference tag, the cell's value is the path of the mob file. 119 * <ol> 120 * <li> 121 * If the value size of a cell is larger than the threshold, this cell is regarded as a mob, 122 * directly copy the (with mob tag) cell into the new store file. 123 * </li> 124 * <li> 125 * Otherwise, retrieve the mob cell from the mob file, and writes a copy of the cell into 126 * the new store file. 127 * </li> 128 * </ol> 129 * 2. If the Put cell doesn't have a reference tag. 130 * <ol> 131 * <li> 132 * If the value size of a cell is larger than the threshold, this cell is regarded as a mob, 133 * write this cell to a mob file, and write the path of this mob file to the store file. 134 * </li> 135 * <li> 136 * Otherwise, directly write this cell into the store file. 137 * </li> 138 * </ol> 139 * 3. Decide how to write a Delete cell. 140 * <ol> 141 * <li> 142 * If a Delete cell does not have a mob reference tag which means this delete marker have not 143 * been written to the mob del file, write this cell to the mob del file, and write this cell 144 * with a ref tag to a store file. 145 * </li> 146 * <li> 147 * Otherwise, directly write it to a store file. 148 * </li> 149 * </ol> 150 * After the major compaction on the normal hfiles, we have a guarantee that we have purged all 151 * deleted or old version mob refs, and the delete markers are written to a del file with the 152 * suffix _del. Because of this, it is safe to use the del file in the mob compaction. 153 * The mob compaction doesn't take place in the normal hfiles, it occurs directly in the 154 * mob files. When the small mob files are merged into bigger ones, the del file is added into 155 * the scanner to filter the deleted cells. 156 * @param fd File details 157 * @param scanner Where to read from. 158 * @param writer Where to write to. 159 * @param smallestReadPoint Smallest read point. 160 * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint 161 * @param throughputController The compaction throughput controller. 162 * @param major Is a major compaction. 163 * @param numofFilesToCompact the number of files to compact 164 * @return Whether compaction ended; false if it was interrupted for any reason. 165 */ 166 @Override 167 protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, 168 long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, 169 boolean major, int numofFilesToCompact) throws IOException { 170 long bytesWrittenProgressForCloseCheck = 0; 171 long bytesWrittenProgressForLog = 0; 172 long bytesWrittenProgressForShippedCall = 0; 173 // Since scanner.next() can return 'false' but still be delivering data, 174 // we have to use a do/while loop. 175 List<Cell> cells = new ArrayList<>(); 176 // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME 177 int closeCheckSizeLimit = HStore.getCloseCheckInterval(); 178 long lastMillis = 0; 179 if (LOG.isDebugEnabled()) { 180 lastMillis = EnvironmentEdgeManager.currentTime(); 181 } 182 String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction"); 183 long now = 0; 184 boolean hasMore; 185 Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName()); 186 byte[] fileName = null; 187 StoreFileWriter mobFileWriter = null, delFileWriter = null; 188 long mobCells = 0, deleteMarkersCount = 0; 189 long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0; 190 long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0; 191 boolean finished = false; 192 ScannerContext scannerContext = 193 ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); 194 throughputController.start(compactionName); 195 KeyValueScanner kvs = (scanner instanceof KeyValueScanner)? (KeyValueScanner)scanner : null; 196 long shippedCallSizeLimit = (long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize(); 197 try { 198 try { 199 // If the mob file writer could not be created, directly write the cell to the store file. 200 mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount, 201 compactionCompression, store.getRegionInfo().getStartKey(), true); 202 fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); 203 } catch (IOException e) { 204 LOG.warn("Failed to create mob writer, " 205 + "we will continue the compaction by writing MOB cells directly in store files", e); 206 } 207 if (major) { 208 try { 209 delFileWriter = mobStore.createDelFileWriterInTmp(new Date(fd.latestPutTs), 210 fd.maxKeyCount, compactionCompression, store.getRegionInfo().getStartKey()); 211 } catch (IOException e) { 212 LOG.warn( 213 "Failed to create del writer, " 214 + "we will continue the compaction by writing delete markers directly in store files", 215 e); 216 } 217 } 218 do { 219 hasMore = scanner.next(cells, scannerContext); 220 if (LOG.isDebugEnabled()) { 221 now = EnvironmentEdgeManager.currentTime(); 222 } 223 for (Cell c : cells) { 224 if (major && CellUtil.isDelete(c)) { 225 if (MobUtils.isMobReferenceCell(c) || delFileWriter == null) { 226 // Directly write it to a store file 227 writer.append(c); 228 } else { 229 // Add a ref tag to this cell and write it to a store file. 230 writer.append(MobUtils.createMobRefDeleteMarker(c)); 231 // Write the cell to a del file 232 delFileWriter.append(c); 233 deleteMarkersCount++; 234 } 235 } else if (mobFileWriter == null || c.getTypeByte() != KeyValue.Type.Put.getCode()) { 236 // If the mob file writer is null or the kv type is not put, directly write the cell 237 // to the store file. 238 writer.append(c); 239 } else if (MobUtils.isMobReferenceCell(c)) { 240 if (MobUtils.hasValidMobRefCellValue(c)) { 241 int size = MobUtils.getMobValueLength(c); 242 if (size > mobSizeThreshold) { 243 // If the value size is larger than the threshold, it's regarded as a mob. Since 244 // its value is already in the mob file, directly write this cell to the store file 245 writer.append(c); 246 } else { 247 // If the value is not larger than the threshold, it's not regarded a mob. Retrieve 248 // the mob cell from the mob file, and write it back to the store file. 249 Cell mobCell = mobStore.resolve(c, false); 250 if (mobCell.getValueLength() != 0) { 251 // put the mob data back to the store file 252 PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId()); 253 writer.append(mobCell); 254 cellsCountCompactedFromMob++; 255 cellsSizeCompactedFromMob += mobCell.getValueLength(); 256 } else { 257 // If the value of a file is empty, there might be issues when retrieving, 258 // directly write the cell to the store file, and leave it to be handled by the 259 // next compaction. 260 writer.append(c); 261 } 262 } 263 } else { 264 LOG.warn("The value format of the KeyValue " + c 265 + " is wrong, its length is less than " + Bytes.SIZEOF_INT); 266 writer.append(c); 267 } 268 } else if (c.getValueLength() <= mobSizeThreshold) { 269 //If value size of a cell is not larger than the threshold, directly write to store file 270 writer.append(c); 271 } else { 272 // If the value size of a cell is larger than the threshold, it's regarded as a mob, 273 // write this cell to a mob file, and write the path to the store file. 274 mobCells++; 275 // append the original keyValue in the mob file. 276 mobFileWriter.append(c); 277 Cell reference = MobUtils.createMobRefCell(c, fileName, 278 this.mobStore.getRefCellTags()); 279 // write the cell whose value is the path of a mob file to the store file. 280 writer.append(reference); 281 cellsCountCompactedToMob++; 282 cellsSizeCompactedToMob += c.getValueLength(); 283 } 284 int len = KeyValueUtil.length(c); 285 ++progress.currentCompactedKVs; 286 progress.totalCompactedSize += len; 287 bytesWrittenProgressForShippedCall += len; 288 if (LOG.isDebugEnabled()) { 289 bytesWrittenProgressForLog += len; 290 } 291 throughputController.control(compactionName, len); 292 // check periodically to see if a system stop is requested 293 if (closeCheckSizeLimit > 0) { 294 bytesWrittenProgressForCloseCheck += len; 295 if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) { 296 bytesWrittenProgressForCloseCheck = 0; 297 if (!store.areWritesEnabled()) { 298 progress.cancel(); 299 return false; 300 } 301 } 302 } 303 if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) { 304 ((ShipperListener)writer).beforeShipped(); 305 kvs.shipped(); 306 bytesWrittenProgressForShippedCall = 0; 307 } 308 } 309 // Log the progress of long running compactions every minute if 310 // logging at DEBUG level 311 if (LOG.isDebugEnabled()) { 312 if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) { 313 String rate = String.format("%.2f", 314 (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0)); 315 LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}", 316 compactionName, progress, rate, throughputController); 317 lastMillis = now; 318 bytesWrittenProgressForLog = 0; 319 } 320 } 321 cells.clear(); 322 } while (hasMore); 323 finished = true; 324 } catch (InterruptedException e) { 325 progress.cancel(); 326 throw new InterruptedIOException( 327 "Interrupted while control throughput of compacting " + compactionName); 328 } finally { 329 // Clone last cell in the final because writer will append last cell when committing. If 330 // don't clone here and once the scanner get closed, then the memory of last cell will be 331 // released. (HBASE-22582) 332 ((ShipperListener) writer).beforeShipped(); 333 throughputController.finish(compactionName); 334 if (!finished && mobFileWriter != null) { 335 abortWriter(mobFileWriter); 336 } 337 if (!finished && delFileWriter != null) { 338 abortWriter(delFileWriter); 339 } 340 } 341 if (delFileWriter != null) { 342 if (deleteMarkersCount > 0) { 343 // If the del file is not empty, commit it. 344 // If the commit fails, the compaction is re-performed again. 345 delFileWriter.appendMetadata(fd.maxSeqId, major, deleteMarkersCount); 346 delFileWriter.close(); 347 mobStore.commitFile(delFileWriter.getPath(), path); 348 } else { 349 // If the del file is empty, delete it instead of committing. 350 abortWriter(delFileWriter); 351 } 352 } 353 if (mobFileWriter != null) { 354 if (mobCells > 0) { 355 // If the mob file is not empty, commit it. 356 mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells); 357 mobFileWriter.close(); 358 mobStore.commitFile(mobFileWriter.getPath(), path); 359 } else { 360 // If the mob file is empty, delete it instead of committing. 361 abortWriter(mobFileWriter); 362 } 363 } 364 mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob); 365 mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob); 366 mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob); 367 mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob); 368 progress.complete(); 369 return true; 370 } 371}