001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.mob; 020 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.util.ArrayList; 024import java.util.Date; 025import java.util.List; 026 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.Cell; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.KeyValue; 032import org.apache.hadoop.hbase.KeyValueUtil; 033import org.apache.hadoop.hbase.monitoring.MonitoredTask; 034import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher; 035import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; 036import org.apache.hadoop.hbase.regionserver.HMobStore; 037import org.apache.hadoop.hbase.regionserver.HStore; 038import org.apache.hadoop.hbase.regionserver.InternalScanner; 039import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot; 040import org.apache.hadoop.hbase.regionserver.ScannerContext; 041import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 042import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; 043import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 044import org.apache.hadoop.hbase.util.Bytes; 045import org.apache.hadoop.util.StringUtils; 046import org.apache.yetus.audience.InterfaceAudience; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050/** 051 * An implementation of the StoreFlusher. It extends the DefaultStoreFlusher. 052 * If the store is not a mob store, the flusher flushes the MemStore the same with 053 * DefaultStoreFlusher, 054 * If the store is a mob store, the flusher flushes the MemStore into two places. 055 * One is the store files of HBase, the other is the mob files. 056 * <ol> 057 * <li>Cells that are not PUT type or have the delete mark will be directly flushed to HBase.</li> 058 * <li>If the size of a cell value is larger than a threshold, it'll be flushed 059 * to a mob file, another cell with the path of this file will be flushed to HBase.</li> 060 * <li>If the size of a cell value is smaller than or equal with a threshold, it'll be flushed to 061 * HBase directly.</li> 062 * </ol> 063 * 064 */ 065@InterfaceAudience.Private 066public class DefaultMobStoreFlusher extends DefaultStoreFlusher { 067 068 private static final Logger LOG = LoggerFactory.getLogger(DefaultMobStoreFlusher.class); 069 private final Object flushLock = new Object(); 070 private long mobCellValueSizeThreshold = 0; 071 private Path targetPath; 072 private HMobStore mobStore; 073 074 public DefaultMobStoreFlusher(Configuration conf, HStore store) throws IOException { 075 super(conf, store); 076 if (!(store instanceof HMobStore)) { 077 throw new IllegalArgumentException("The store " + store + " is not a HMobStore"); 078 } 079 mobCellValueSizeThreshold = store.getColumnFamilyDescriptor().getMobThreshold(); 080 this.targetPath = MobUtils.getMobFamilyPath(conf, store.getTableName(), 081 store.getColumnFamilyName()); 082 if (!this.store.getFileSystem().exists(targetPath)) { 083 this.store.getFileSystem().mkdirs(targetPath); 084 } 085 this.mobStore = (HMobStore) store; 086 } 087 088 /** 089 * Flushes the snapshot of the MemStore. 090 * If this store is not a mob store, flush the cells in the snapshot to store files of HBase. 091 * If the store is a mob one, the flusher flushes the MemStore into two places. 092 * One is the store files of HBase, the other is the mob files. 093 * <ol> 094 * <li>Cells that are not PUT type or have the delete mark will be directly flushed to 095 * HBase.</li> 096 * <li>If the size of a cell value is larger than a threshold, it'll be 097 * flushed to a mob file, another cell with the path of this file will be flushed to HBase.</li> 098 * <li>If the size of a cell value is smaller than or equal with a threshold, it'll be flushed to 099 * HBase directly.</li> 100 * </ol> 101 */ 102 @Override 103 public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, 104 MonitoredTask status, ThroughputController throughputController, 105 FlushLifeCycleTracker tracker) throws IOException { 106 ArrayList<Path> result = new ArrayList<>(); 107 long cellsCount = snapshot.getCellsCount(); 108 if (cellsCount == 0) return result; // don't flush if there are no entries 109 110 // Use a store scanner to find which rows to flush. 111 long smallestReadPoint = store.getSmallestReadPoint(); 112 InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker); 113 StoreFileWriter writer; 114 try { 115 // TODO: We can fail in the below block before we complete adding this flush to 116 // list of store files. Add cleanup of anything put on filesystem if we fail. 117 synchronized (flushLock) { 118 status.setStatus("Flushing " + store + ": creating writer"); 119 // Write the map out to the disk 120 writer = store.createWriterInTmp(cellsCount, store.getColumnFamilyDescriptor().getCompressionType(), 121 false, true, true, false); 122 IOException e = null; 123 try { 124 // It's a mob store, flush the cells in a mob way. This is the difference of flushing 125 // between a normal and a mob store. 126 performMobFlush(snapshot, cacheFlushId, scanner, writer, status, throughputController); 127 } catch (IOException ioe) { 128 e = ioe; 129 // throw the exception out 130 throw ioe; 131 } finally { 132 if (e != null) { 133 writer.close(); 134 } else { 135 finalizeWriter(writer, cacheFlushId, status); 136 } 137 } 138 } 139 } finally { 140 scanner.close(); 141 } 142 LOG.info("Mob store is flushed, sequenceid=" + cacheFlushId + ", memsize=" 143 + StringUtils.TraditionalBinaryPrefix.long2String(snapshot.getDataSize(), "", 1) + 144 ", hasBloomFilter=" + writer.hasGeneralBloom() + 145 ", into tmp file " + writer.getPath()); 146 result.add(writer.getPath()); 147 return result; 148 } 149 150 /** 151 * Flushes the cells in the mob store. 152 * <ol>In the mob store, the cells with PUT type might have or have no mob tags. 153 * <li>If a cell does not have a mob tag, flushing the cell to different files depends 154 * on the value length. If the length is larger than a threshold, it's flushed to a 155 * mob file and the mob file is flushed to a store file in HBase. Otherwise, directly 156 * flush the cell to a store file in HBase.</li> 157 * <li>If a cell have a mob tag, its value is a mob file name, directly flush it 158 * to a store file in HBase.</li> 159 * </ol> 160 * @param snapshot Memstore snapshot. 161 * @param cacheFlushId Log cache flush sequence number. 162 * @param scanner The scanner of memstore snapshot. 163 * @param writer The store file writer. 164 * @param status Task that represents the flush operation and may be updated with status. 165 * @param throughputController A controller to avoid flush too fast. 166 * @throws IOException 167 */ 168 protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId, 169 InternalScanner scanner, StoreFileWriter writer, MonitoredTask status, 170 ThroughputController throughputController) throws IOException { 171 StoreFileWriter mobFileWriter = null; 172 int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 173 HConstants.COMPACTION_KV_MAX_DEFAULT); 174 long mobCount = 0; 175 long mobSize = 0; 176 long time = snapshot.getTimeRangeTracker().getMax(); 177 mobFileWriter = mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(), 178 store.getColumnFamilyDescriptor().getCompressionType(), store.getRegionInfo().getStartKey(), false); 179 // the target path is {tableName}/.mob/{cfName}/mobFiles 180 // the relative path is mobFiles 181 byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); 182 ScannerContext scannerContext = 183 ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); 184 List<Cell> cells = new ArrayList<>(); 185 boolean hasMore; 186 String flushName = ThroughputControlUtil.getNameForThrottling(store, "flush"); 187 boolean control = throughputController != null && !store.getRegionInfo().getTable().isSystemTable(); 188 if (control) { 189 throughputController.start(flushName); 190 } 191 IOException ioe = null; 192 try { 193 do { 194 hasMore = scanner.next(cells, scannerContext); 195 if (!cells.isEmpty()) { 196 for (Cell c : cells) { 197 // If we know that this KV is going to be included always, then let us 198 // set its memstoreTS to 0. This will help us save space when writing to 199 // disk. 200 if (c.getValueLength() <= mobCellValueSizeThreshold || MobUtils.isMobReferenceCell(c) 201 || c.getTypeByte() != KeyValue.Type.Put.getCode()) { 202 writer.append(c); 203 } else { 204 // append the original keyValue in the mob file. 205 mobFileWriter.append(c); 206 mobSize += c.getValueLength(); 207 mobCount++; 208 209 // append the tags to the KeyValue. 210 // The key is same, the value is the filename of the mob file 211 Cell reference = MobUtils.createMobRefCell(c, fileName, 212 this.mobStore.getRefCellTags()); 213 writer.append(reference); 214 } 215 int len = KeyValueUtil.length(c); 216 if (control) { 217 throughputController.control(flushName, len); 218 } 219 } 220 cells.clear(); 221 } 222 } while (hasMore); 223 } catch (InterruptedException e) { 224 ioe = new InterruptedIOException( 225 "Interrupted while control throughput of flushing " + flushName); 226 throw ioe; 227 } catch (IOException e) { 228 ioe = e; 229 throw e; 230 } finally { 231 if (control) { 232 throughputController.finish(flushName); 233 } 234 if (ioe != null) { 235 mobFileWriter.close(); 236 } 237 } 238 239 if (mobCount > 0) { 240 // commit the mob file from temp folder to target folder. 241 // If the mob file is committed successfully but the store file is not, 242 // the committed mob file will be handled by the sweep tool as an unused 243 // file. 244 status.setStatus("Flushing mob file " + store + ": appending metadata"); 245 mobFileWriter.appendMetadata(cacheFlushId, false, mobCount); 246 status.setStatus("Flushing mob file " + store + ": closing flushed file"); 247 mobFileWriter.close(); 248 mobStore.commitFile(mobFileWriter.getPath(), targetPath); 249 mobStore.updateMobFlushCount(); 250 mobStore.updateMobFlushedCellsCount(mobCount); 251 mobStore.updateMobFlushedCellsSize(mobSize); 252 } else { 253 try { 254 status.setStatus("Flushing mob file " + store + ": no mob cells, closing flushed file"); 255 mobFileWriter.close(); 256 // If the mob file is empty, delete it instead of committing. 257 store.getFileSystem().delete(mobFileWriter.getPath(), true); 258 } catch (IOException e) { 259 LOG.error("Failed to delete the temp mob file", e); 260 } 261 } 262 } 263}