/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mob;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Compact passed set of files in the mob-enabled column family.
 */
@InterfaceAudience.Private
public class DefaultMobStoreCompactor extends DefaultCompactor {

  private static final Logger LOG = LoggerFactory.getLogger(DefaultMobStoreCompactor.class);
  private long mobSizeThreshold;
  private HMobStore mobStore;

  private final InternalScannerFactory scannerFactory = new InternalScannerFactory() {

    @Override
    public ScanType getScanType(CompactionRequestImpl request) {
      // Retain the delete markers until they are expired.
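      // Delete markers need to survive this scan so that a major compaction can later copy them
      // into the separate _del files (see the javadoc on performCompaction below).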
      return ScanType.COMPACT_RETAIN_DELETES;
    }

    @Override
    public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
        ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
      return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint,
          fd.earliestPutTs);
    }
  };

  private final CellSinkFactory<StoreFileWriter> writerFactory =
      new CellSinkFactory<StoreFileWriter>() {
        @Override
        public StoreFileWriter createWriter(InternalScanner scanner,
            org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
            boolean shouldDropBehind) throws IOException {
          // Always create this writer with tags, because the compaction may produce new cells
          // that carry tags (e.g. mob reference tags).
          return store.createWriterInTmp(fd.maxKeyCount, compactionCompression, true, true, true,
              shouldDropBehind);
        }
      };

  public DefaultMobStoreCompactor(Configuration conf, HStore store) {
    super(conf, store);
    // The mob cells reside in the mob-enabled column family which is held by HMobStore.
    // During the compaction, the compactor reads the cells from the mob files and
    // possibly creates new mob files. All of these operations live in HMobStore,
    // so we need to cast the Store to HMobStore.
    if (!(store instanceof HMobStore)) {
      throw new IllegalArgumentException("The store " + store + " is not an HMobStore");
    }
    mobStore = (HMobStore) store;
    mobSizeThreshold = store.getColumnFamilyDescriptor().getMobThreshold();
  }

  @Override
  public List<Path> compact(CompactionRequestImpl request,
      ThroughputController throughputController, User user) throws IOException {
    return compact(request, scannerFactory, writerFactory, throughputController, user);
  }

  /**
   * Performs compaction on a column family with the mob flag enabled. This is used when the mob
   * threshold size has changed or when the mob column family mode has been toggled via an alter
   * table statement. Compacts the files by the following rules.
   * 1. If the Put cell has a mob reference tag, the cell's value is the path of the mob file.
   * <ol>
   * <li>
   * If the value size of a cell is larger than the threshold, this cell is regarded as a mob;
   * directly copy the cell (with its mob tag) into the new store file.
   * </li>
   * <li>
   * Otherwise, retrieve the mob cell from the mob file, and write a copy of the cell into
   * the new store file.
   * </li>
   * </ol>
   * 2. If the Put cell doesn't have a reference tag.
   * <ol>
   * <li>
   * If the value size of a cell is larger than the threshold, this cell is regarded as a mob;
   * write this cell to a mob file, and write the path of this mob file to the store file.
   * </li>
   * <li>
   * Otherwise, directly write this cell into the store file.
   * </li>
   * </ol>
   * 3. Decide how to write a Delete cell.
   * <ol>
   * <li>
   * If a Delete cell does not have a mob reference tag, which means this delete marker has not
   * been written to the mob del file yet, write this cell to the mob del file, and write this
   * cell with a ref tag to a store file.
   * </li>
   * <li>
   * Otherwise, directly write it to a store file.
   * </li>
   * </ol>
   * After a major compaction on the normal hfiles, we are guaranteed to have purged all deleted
   * or old-version mob refs, and the delete markers have been written to a del file with the
   * suffix _del.
   * Because of this, it is safe to use the del files in the mob compaction.
   * The mob compaction doesn't take place on the normal hfiles; it occurs directly on the
   * mob files. When the small mob files are merged into bigger ones, the del file is added to
   * the scanner to filter out the deleted cells.
   * @param fd File details
   * @param scanner Where to read from.
   * @param writer Where to write to.
   * @param smallestReadPoint Smallest read point.
   * @param cleanSeqId When true, remove the seqId (formerly mvcc) value if it is
   *          <= smallestReadPoint
   * @param throughputController The compaction throughput controller.
   * @param major Is a major compaction.
   * @param numofFilesToCompact the number of files to compact
   * @return Whether compaction ended; false if it was interrupted for any reason.
   */
  @Override
  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
      long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
      boolean major, int numofFilesToCompact) throws IOException {
    long bytesWrittenProgressForCloseCheck = 0;
    long bytesWrittenProgressForLog = 0;
    long bytesWrittenProgressForShippedCall = 0;
    // Since scanner.next() can return 'false' but still be delivering data,
    // we have to use a do/while loop.
    List<Cell> cells = new ArrayList<>();
    // Interval (in bytes written) at which we check whether the store still accepts writes.
    int closeCheckSizeLimit = HStore.getCloseCheckInterval();
    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = EnvironmentEdgeManager.currentTime();
    }
    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
    long now = 0;
    boolean hasMore;
    Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
    byte[] fileName = null;
    StoreFileWriter mobFileWriter = null, delFileWriter = null;
    long mobCells = 0, deleteMarkersCount = 0;
    long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0;
    long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
    boolean finished = false;
    // Limit each scanner.next() batch to "hbase.hstore.compaction.kv.max" (default 10) cells
    // to avoid OOME.
    ScannerContext scannerContext =
        ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
    throughputController.start(compactionName);
    KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
    long shippedCallSizeLimit =
        (long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize();
    try {
      try {
        // If the mob file writer cannot be created, the cells are written directly to the
        // store file instead.
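        // The mob writer is created in a temporary location; the resulting file is committed to
        // the mob family path only if it ends up holding at least one mob cell (see the end of
        // this method).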
        mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
          compactionCompression, store.getRegionInfo().getStartKey(), true);
        fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
      } catch (IOException e) {
        LOG.warn("Failed to create mob writer, "
            + "we will continue the compaction by writing MOB cells directly in store files", e);
      }
      if (major) {
        try {
          delFileWriter = mobStore.createDelFileWriterInTmp(new Date(fd.latestPutTs),
            fd.maxKeyCount, compactionCompression, store.getRegionInfo().getStartKey());
        } catch (IOException e) {
          LOG.warn("Failed to create del writer, "
              + "we will continue the compaction by writing delete markers directly in store files",
              e);
        }
      }
      do {
        hasMore = scanner.next(cells, scannerContext);
        if (LOG.isDebugEnabled()) {
          now = EnvironmentEdgeManager.currentTime();
        }
        for (Cell c : cells) {
          if (major && CellUtil.isDelete(c)) {
            if (MobUtils.isMobReferenceCell(c) || delFileWriter == null) {
              // Directly write it to a store file.
              writer.append(c);
            } else {
              // Add a ref tag to this cell and write it to a store file.
              writer.append(MobUtils.createMobRefDeleteMarker(c));
              // Write the cell to a del file.
              delFileWriter.append(c);
              deleteMarkersCount++;
            }
          } else if (mobFileWriter == null || c.getTypeByte() != KeyValue.Type.Put.getCode()) {
            // If the mob file writer is null or the kv type is not put, directly write the cell
            // to the store file.
            writer.append(c);
          } else if (MobUtils.isMobReferenceCell(c)) {
            if (MobUtils.hasValidMobRefCellValue(c)) {
              int size = MobUtils.getMobValueLength(c);
              if (size > mobSizeThreshold) {
                // If the value size is larger than the threshold, it's regarded as a mob. Since
                // its value is already in the mob file, directly write this cell to the store
                // file.
                writer.append(c);
              } else {
                // If the value is not larger than the threshold, it's no longer regarded as a
                // mob. Retrieve the mob cell from the mob file, and write it back to the store
                // file.
                Cell mobCell = mobStore.resolve(c, false);
                if (mobCell.getValueLength() != 0) {
                  // Put the mob data back into the store file.
                  PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
                  writer.append(mobCell);
                  cellsCountCompactedFromMob++;
                  cellsSizeCompactedFromMob += mobCell.getValueLength();
                } else {
                  // If the resolved value is empty, the mob cell may not have been retrieved
                  // correctly. Write the reference cell to the store file as-is and leave it to
                  // be handled by the next compaction.
                  writer.append(c);
                }
              }
            } else {
              LOG.warn("The value format of the KeyValue " + c
                  + " is wrong, its length is less than " + Bytes.SIZEOF_INT);
              writer.append(c);
            }
          } else if (c.getValueLength() <= mobSizeThreshold) {
            // If the value size of a cell is not larger than the threshold, directly write it
            // to the store file.
            writer.append(c);
          } else {
            // If the value size of a cell is larger than the threshold, it's regarded as a mob;
            // write this cell to a mob file, and write the path to the store file.
            mobCells++;
            // Append the original cell to the mob file.
            mobFileWriter.append(c);
            Cell reference = MobUtils.createMobRefCell(c, fileName,
                this.mobStore.getRefCellTags());
            // Write the reference cell, whose value points at the mob file, to the store file.
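            // The reference cell preserves the original key; its value carries the name of the
            // mob file, and it is tagged with the mob reference tags supplied by the store.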
            writer.append(reference);
            cellsCountCompactedToMob++;
            cellsSizeCompactedToMob += c.getValueLength();
          }
          int len = c.getSerializedSize();
          ++progress.currentCompactedKVs;
          progress.totalCompactedSize += len;
          bytesWrittenProgressForShippedCall += len;
          if (LOG.isDebugEnabled()) {
            bytesWrittenProgressForLog += len;
          }
          throughputController.control(compactionName, len);
          // Check periodically to see if a system stop is requested.
          if (closeCheckSizeLimit > 0) {
            bytesWrittenProgressForCloseCheck += len;
            if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) {
              bytesWrittenProgressForCloseCheck = 0;
              if (!store.areWritesEnabled()) {
                progress.cancel();
                return false;
              }
            }
          }
          if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
            ((ShipperListener) writer).beforeShipped();
            kvs.shipped();
            bytesWrittenProgressForShippedCall = 0;
          }
        }
        // Log the progress of long-running compactions every minute if logging at DEBUG level.
        if (LOG.isDebugEnabled()) {
          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
            String rate = String.format("%.2f",
                (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
                compactionName, progress, rate, throughputController);
            lastMillis = now;
            bytesWrittenProgressForLog = 0;
          }
        }
        cells.clear();
      } while (hasMore);
      finished = true;
    } catch (InterruptedException e) {
      progress.cancel();
      throw new InterruptedIOException(
          "Interrupted while controlling throughput of compacting " + compactionName);
    } finally {
      // Clone the last cell in the finally block because the writer will append the last cell
      // when committing. If we didn't clone it here, the memory of the last cell would be
      // released once the scanner gets closed. (HBASE-22582)
      ((ShipperListener) writer).beforeShipped();
      throughputController.finish(compactionName);
      if (!finished && mobFileWriter != null) {
        abortWriter(mobFileWriter);
      }
      if (!finished && delFileWriter != null) {
        abortWriter(delFileWriter);
      }
    }
    if (delFileWriter != null) {
      if (deleteMarkersCount > 0) {
        // If the del file is not empty, commit it.
        // If the commit fails, the compaction is performed again.
        delFileWriter.appendMetadata(fd.maxSeqId, major, deleteMarkersCount);
        delFileWriter.close();
        mobStore.commitFile(delFileWriter.getPath(), path);
      } else {
        // If the del file is empty, delete it instead of committing.
        abortWriter(delFileWriter);
      }
    }
    if (mobFileWriter != null) {
      if (mobCells > 0) {
        // If the mob file is not empty, commit it.
        mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells);
        mobFileWriter.close();
        mobStore.commitFile(mobFileWriter.getPath(), path);
      } else {
        // If the mob file is empty, delete it instead of committing.
        abortWriter(mobFileWriter);
      }
    }
    mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob);
    mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob);
    mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob);
    mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob);
    progress.complete();
    return true;
  }
}