001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.util.List; 022import java.util.NavigableSet; 023import java.util.SortedSet; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.CellComparator; 026import org.apache.hadoop.hbase.ExtendedCell; 027import org.apache.hadoop.hbase.exceptions.UnexpectedStateException; 028import org.apache.hadoop.hbase.util.Bytes; 029import org.apache.hadoop.hbase.util.ClassSize; 030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.slf4j.Logger; 033 034/** 035 * An abstract class, which implements the behaviour shared by all concrete memstore instances. 036 */ 037@InterfaceAudience.Private 038public abstract class AbstractMemStore implements MemStore { 039 040 private static final long NO_SNAPSHOT_ID = -1; 041 042 private final Configuration conf; 043 private final CellComparator comparator; 044 045 // active segment absorbs write operations 046 private volatile MutableSegment active; 047 // Snapshot of memstore. Made for flusher. 048 protected volatile ImmutableSegment snapshot; 049 protected volatile long snapshotId; 050 // Used to track when to flush 051 private volatile long timeOfOldestEdit; 052 053 protected RegionServicesForStores regionServices; 054 055 // @formatter:off 056 public final static long FIXED_OVERHEAD = (long) ClassSize.OBJECT 057 + (5 * ClassSize.REFERENCE) 058 + (2 * Bytes.SIZEOF_LONG); // snapshotId, timeOfOldestEdit 059 // @formatter:on 060 061 public final static long DEEP_OVERHEAD = FIXED_OVERHEAD; 062 063 public static void addToScanners(List<? extends Segment> segments, long readPt, 064 List<KeyValueScanner> scanners) { 065 for (Segment item : segments) { 066 addToScanners(item, readPt, scanners); 067 } 068 } 069 070 protected static void addToScanners(Segment segment, long readPt, 071 List<KeyValueScanner> scanners) { 072 if (!segment.isEmpty()) { 073 scanners.add(segment.getScanner(readPt)); 074 } 075 } 076 077 protected AbstractMemStore(final Configuration conf, final CellComparator c, 078 final RegionServicesForStores regionServices) { 079 this.conf = conf; 080 this.comparator = c; 081 this.regionServices = regionServices; 082 resetActive(); 083 resetTimeOfOldestEdit(); 084 this.snapshot = SegmentFactory.instance().createImmutableSegment(c); 085 this.snapshotId = NO_SNAPSHOT_ID; 086 } 087 088 protected void resetActive() { 089 // Record the MutableSegment' heap overhead when initialing 090 MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing(); 091 // Reset heap to not include any keys 092 active = SegmentFactory.instance().createMutableSegment(conf, comparator, memstoreAccounting); 093 // regionServices can be null when testing 094 if (regionServices != null) { 095 regionServices.addMemStoreSize(memstoreAccounting.getDataSize(), 096 memstoreAccounting.getHeapSize(), memstoreAccounting.getOffHeapSize(), 097 memstoreAccounting.getCellsCount()); 098 } 099 } 100 101 protected void resetTimeOfOldestEdit() { 102 this.timeOfOldestEdit = Long.MAX_VALUE; 103 } 104 105 /** 106 * Updates the wal with the lowest sequence id (oldest entry) that is still in memory 107 * @param onlyIfMoreRecent a flag that marks whether to update the sequence id no matter what or 108 * only if it is greater than the previous sequence id 109 */ 110 public abstract void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent); 111 112 @Override 113 public void add(Iterable<ExtendedCell> cells, MemStoreSizing memstoreSizing) { 114 for (ExtendedCell cell : cells) { 115 add(cell, memstoreSizing); 116 } 117 } 118 119 @Override 120 public void add(ExtendedCell cell, MemStoreSizing memstoreSizing) { 121 doAddOrUpsert(cell, 0, memstoreSizing, true); 122 } 123 124 /* 125 * Inserts the specified Cell into MemStore and deletes any existing versions of the same 126 * row/family/qualifier as the specified Cell. <p> First, the specified Cell is inserted into the 127 * Memstore. <p> If there are any existing Cell in this MemStore with the same row, family, and 128 * qualifier, they are removed. <p> Callers must hold the read lock. 129 * @param cell the cell to be updated 130 * @param readpoint readpoint below which we can safely remove duplicate KVs 131 * @param memstoreSizing object to accumulate changed size 132 */ 133 private void upsert(ExtendedCell cell, long readpoint, MemStoreSizing memstoreSizing) { 134 doAddOrUpsert(cell, readpoint, memstoreSizing, false); 135 } 136 137 private void doAddOrUpsert(ExtendedCell cell, long readpoint, MemStoreSizing memstoreSizing, 138 boolean doAdd) { 139 MutableSegment currentActive; 140 boolean succ = false; 141 while (!succ) { 142 currentActive = getActive(); 143 succ = preUpdate(currentActive, cell, memstoreSizing); 144 if (succ) { 145 if (doAdd) { 146 doAdd(currentActive, cell, memstoreSizing); 147 } else { 148 doUpsert(currentActive, cell, readpoint, memstoreSizing); 149 } 150 postUpdate(currentActive); 151 } 152 } 153 } 154 155 protected void doAdd(MutableSegment currentActive, ExtendedCell cell, 156 MemStoreSizing memstoreSizing) { 157 ExtendedCell toAdd = maybeCloneWithAllocator(currentActive, cell, false); 158 boolean mslabUsed = (toAdd != cell); 159 // This cell data is backed by the same byte[] where we read request in RPC(See 160 // HBASE-15180). By default, MSLAB is ON and we might have copied cell to MSLAB area. If 161 // not we must do below deep copy. Or else we will keep referring to the bigger chunk of 162 // memory and prevent it from getting GCed. 163 // Copy to MSLAB would not have happened if 164 // 1. MSLAB is turned OFF. See "hbase.hregion.memstore.mslab.enabled" 165 // 2. When the size of the cell is bigger than the max size supported by MSLAB. See 166 // "hbase.hregion.memstore.mslab.max.allocation". This defaults to 256 KB 167 // 3. When cells are from Append/Increment operation. 168 if (!mslabUsed) { 169 toAdd = deepCopyIfNeeded(toAdd); 170 } 171 internalAdd(currentActive, toAdd, mslabUsed, memstoreSizing); 172 } 173 174 private void doUpsert(MutableSegment currentActive, ExtendedCell cell, long readpoint, 175 MemStoreSizing memstoreSizing) { 176 // Add the Cell to the MemStore 177 // Use the internalAdd method here since we 178 // (a) already have a lock and 179 // (b) cannot safely use the MSLAB here without potentially hitting OOME 180 // - see TestMemStore.testUpsertMSLAB for a test that triggers the pathological case if we don't 181 // avoid MSLAB here. 182 // This cell data is backed by the same byte[] where we read request in RPC(See 183 // HBASE-15180). We must do below deep copy. Or else we will keep referring to the bigger 184 // chunk of memory and prevent it from getting GCed. 185 cell = deepCopyIfNeeded(cell); 186 boolean sizeAddedPreOperation = sizeAddedPreOperation(); 187 currentActive.upsert(cell, readpoint, memstoreSizing, sizeAddedPreOperation); 188 setOldestEditTimeToNow(); 189 } 190 191 /** 192 * Issue any synchronization and test needed before applying the update 193 * @param currentActive the segment to be updated 194 * @param cell the cell to be added 195 * @param memstoreSizing object to accumulate region size changes 196 * @return true iff can proceed with applying the update 197 */ 198 protected abstract boolean preUpdate(MutableSegment currentActive, ExtendedCell cell, 199 MemStoreSizing memstoreSizing); 200 201 /** 202 * Issue any post update synchronization and tests 203 * @param currentActive updated segment 204 */ 205 protected abstract void postUpdate(MutableSegment currentActive); 206 207 private static ExtendedCell deepCopyIfNeeded(ExtendedCell cell) { 208 return cell.deepClone(); 209 } 210 211 @Override 212 public void upsert(Iterable<ExtendedCell> cells, long readpoint, MemStoreSizing memstoreSizing) { 213 for (ExtendedCell cell : cells) { 214 upsert(cell, readpoint, memstoreSizing); 215 } 216 } 217 218 /** Returns Oldest timestamp of all the Cells in the MemStore */ 219 @Override 220 public long timeOfOldestEdit() { 221 return timeOfOldestEdit; 222 } 223 224 /** 225 * This method is protected under {@link HStore#lock} write lock,<br/> 226 * and this method is used by {@link HStore#updateStorefiles} after flushing is completed.<br/> 227 * The passed snapshot was successfully persisted; it can be let go. 228 * @param id Id of the snapshot to clean out. 229 * @see MemStore#snapshot() 230 */ 231 @Override 232 public void clearSnapshot(long id) throws UnexpectedStateException { 233 if (this.snapshotId == -1) return; // already cleared 234 if (this.snapshotId != id) { 235 throw new UnexpectedStateException( 236 "Current snapshot id is " + this.snapshotId + ",passed " + id); 237 } 238 // OK. Passed in snapshot is same as current snapshot. If not-empty, 239 // create a new snapshot and let the old one go. 240 doClearSnapShot(); 241 } 242 243 protected void doClearSnapShot() { 244 Segment oldSnapshot = this.snapshot; 245 if (!this.snapshot.isEmpty()) { 246 this.snapshot = SegmentFactory.instance().createImmutableSegment(this.comparator); 247 } 248 this.snapshotId = NO_SNAPSHOT_ID; 249 oldSnapshot.close(); 250 } 251 252 @Override 253 public MemStoreSize getSnapshotSize() { 254 return this.snapshot.getMemStoreSize(); 255 } 256 257 @Override 258 public String toString() { 259 StringBuilder buf = new StringBuilder(); 260 int i = 1; 261 try { 262 for (Segment segment : getSegments()) { 263 buf.append("Segment (").append(i).append(") ").append(segment.toString()).append("; "); 264 i++; 265 } 266 } catch (IOException e) { 267 return e.toString(); 268 } 269 return buf.toString(); 270 } 271 272 protected Configuration getConfiguration() { 273 return conf; 274 } 275 276 protected void dump(Logger log) { 277 getActive().dump(log); 278 snapshot.dump(log); 279 } 280 281 /** Returns Return lowest of a or b or null if both a and b are null */ 282 protected ExtendedCell getLowest(final ExtendedCell a, final ExtendedCell b) { 283 if (a == null) { 284 return b; 285 } 286 if (b == null) { 287 return a; 288 } 289 return comparator.compareRows(a, b) <= 0 ? a : b; 290 } 291 292 /** 293 * @param key Find row that follows this one. If null, return first. 294 * @param set Set to look in for a row beyond <code>row</code>. 295 * @return Next row or null if none found. If one found, will be a new KeyValue -- can be 296 * destroyed by subsequent calls to this method. 297 */ 298 protected ExtendedCell getNextRow(final ExtendedCell key, final NavigableSet<ExtendedCell> set) { 299 ExtendedCell result = null; 300 SortedSet<ExtendedCell> tail = key == null ? set : set.tailSet(key); 301 // Iterate until we fall into the next row; i.e. move off current row 302 for (ExtendedCell cell : tail) { 303 if (comparator.compareRows(cell, key) <= 0) { 304 continue; 305 } 306 // Note: Not suppressing deletes or expired cells. Needs to be handled 307 // by higher up functions. 308 result = cell; 309 break; 310 } 311 return result; 312 } 313 314 /** 315 * If the segment has a memory allocator the cell is being cloned to this space, and returned; 316 * Otherwise the given cell is returned When a cell's size is too big (bigger than maxAlloc), it 317 * is not allocated on MSLAB. Since the process of flattening to CellChunkMap assumes that all 318 * cells are allocated on MSLAB, during this process, the input parameter forceCloneOfBigCell is 319 * set to 'true' and the cell is copied into MSLAB. 320 * @param cell the cell to clone 321 * @param forceCloneOfBigCell true only during the process of flattening to CellChunkMap. 322 * @return either the given cell or its clone 323 */ 324 private ExtendedCell maybeCloneWithAllocator(MutableSegment currentActive, ExtendedCell cell, 325 boolean forceCloneOfBigCell) { 326 return currentActive.maybeCloneWithAllocator(cell, forceCloneOfBigCell); 327 } 328 329 /** 330 * Internal version of add() that doesn't clone Cells with the allocator, and doesn't take the 331 * lock. Callers should ensure they already have the read lock taken 332 * @param toAdd the cell to add 333 * @param mslabUsed whether using MSLAB 334 * @param memstoreSizing object to accumulate changed size 335 */ 336 private void internalAdd(MutableSegment currentActive, final ExtendedCell toAdd, 337 final boolean mslabUsed, MemStoreSizing memstoreSizing) { 338 boolean sizeAddedPreOperation = sizeAddedPreOperation(); 339 currentActive.add(toAdd, mslabUsed, memstoreSizing, sizeAddedPreOperation); 340 setOldestEditTimeToNow(); 341 } 342 343 protected abstract boolean sizeAddedPreOperation(); 344 345 private void setOldestEditTimeToNow() { 346 if (timeOfOldestEdit == Long.MAX_VALUE) { 347 timeOfOldestEdit = EnvironmentEdgeManager.currentTime(); 348 } 349 } 350 351 /** 352 * Returns The total size of cells in this memstore. We will not consider cells in the snapshot 353 */ 354 protected abstract long keySize(); 355 356 /** 357 * @return The total heap size of cells in this memstore. We will not consider cells in the 358 * snapshot 359 */ 360 protected abstract long heapSize(); 361 362 protected CellComparator getComparator() { 363 return comparator; 364 } 365 366 MutableSegment getActive() { 367 return active; 368 } 369 370 ImmutableSegment getSnapshot() { 371 return snapshot; 372 } 373 374 @Override 375 public void close() { 376 // active should never be null 377 active.close(); 378 // for snapshot, either it is empty, where we do not reference any real segment which contains a 379 // memstore lab, or it is during snapshot, where we will clear it when calling clearSnapshot, so 380 // we do not need to close it here 381 } 382 383 /** Returns an ordered list of segments from most recent to oldest in memstore */ 384 protected abstract List<Segment> getSegments() throws IOException; 385 386}