/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.List;
import java.util.NavigableSet;
import java.util.SortedSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

/**
 * An abstract class, which implements the behaviour shared by all concrete memstore instances.
 */
@InterfaceAudience.Private
public abstract class AbstractMemStore implements MemStore {

  /** Value of {@link #snapshotId} while no snapshot is outstanding. */
  private static final long NO_SNAPSHOT_ID = -1;

  private final Configuration conf;
  private final CellComparator comparator;

  // active segment absorbs write operations
  private volatile MutableSegment active;
  // Snapshot of memstore. Made for flusher.
  protected volatile ImmutableSegment snapshot;
  protected volatile long snapshotId;
  // Used to track when to flush
  private volatile long timeOfOldestEdit;

  protected RegionServicesForStores regionServices;

  // Five references: conf, comparator, active, snapshot, regionServices.
  public static final long FIXED_OVERHEAD = (long) ClassSize.OBJECT
      + (5 * ClassSize.REFERENCE)
      + (2 * Bytes.SIZEOF_LONG); // snapshotId, timeOfOldestEdit

  public static final long DEEP_OVERHEAD = FIXED_OVERHEAD;

  /**
   * Adds one scanner per given segment to {@code scanners}, in iteration order.
   * @param segments the segments to open scanners on
   * @param readPt the read point passed to each segment's scanner
   * @param scanners the list the new scanners are appended to
   */
  public static void addToScanners(List<? extends Segment> segments, long readPt,
      List<KeyValueScanner> scanners) {
    for (Segment item : segments) {
      addToScanners(item, readPt, scanners);
    }
  }

  /**
   * Appends the scanner of a single segment to {@code scanners}.
   * @param segment the segment to open a scanner on
   * @param readPt the read point passed to the segment's scanner
   * @param scanners the list the new scanner is appended to
   */
  protected static void addToScanners(Segment segment, long readPt,
      List<KeyValueScanner> scanners) {
    scanners.add(segment.getScanner(readPt));
  }

  /**
   * Initializes the shared state: a fresh active segment, an empty snapshot, no snapshot id and
   * an unset oldest-edit time.
   * @param conf the configuration used when creating mutable segments
   * @param c the comparator used for all segments of this memstore
   * @param regionServices region-level services; may be null when testing
   */
  protected AbstractMemStore(final Configuration conf, final CellComparator c,
      final RegionServicesForStores regionServices) {
    this.conf = conf;
    this.comparator = c;
    this.regionServices = regionServices;
    resetActive();
    resetTimeOfOldestEdit();
    this.snapshot = SegmentFactory.instance().createImmutableSegment(c);
    this.snapshotId = NO_SNAPSHOT_ID;
  }

  /**
   * Replaces the active segment with a newly created, empty one and reports the new segment's
   * initial size to the region services, if any.
   */
  protected void resetActive() {
    // Record the MutableSegment's heap overhead when initializing
    MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
    // Reset heap to not include any keys
    active = SegmentFactory.instance().createMutableSegment(conf, comparator, memstoreAccounting);
    // regionServices can be null when testing
    if (regionServices != null) {
      regionServices.addMemStoreSize(memstoreAccounting.getDataSize(),
        memstoreAccounting.getHeapSize(), memstoreAccounting.getOffHeapSize(),
        memstoreAccounting.getCellsCount());
    }
  }

  /**
   * Marks the memstore as having no recorded edit; the next add or upsert records its time.
   */
  protected void resetTimeOfOldestEdit() {
    this.timeOfOldestEdit = Long.MAX_VALUE;
  }

  /**
   * Updates the wal with the lowest sequence id (oldest entry) that is still in memory
   * @param onlyIfMoreRecent a flag that marks whether to update the sequence id no matter what or
   *                      only if it is greater than the previous sequence id
   */
  public abstract void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent);

  @Override
  public void add(Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
    for (Cell cell : cells) {
      add(cell, memstoreSizing);
    }
  }

  @Override
  public void add(Cell cell, MemStoreSizing memstoreSizing) {
    // The readpoint is not used on the add path, so 0 is passed as a placeholder.
    doAddOrUpsert(cell, 0, memstoreSizing, true);
  }

  /**
   * Inserts the specified Cell into MemStore and deletes any existing
   * versions of the same row/family/qualifier as the specified Cell.
   * <p>
   * First, the specified Cell is inserted into the Memstore.
   * <p>
   * If there are any existing Cell in this MemStore with the same row,
   * family, and qualifier, they are removed.
   * <p>
   * Callers must hold the read lock.
   *
   * @param cell the cell to be updated
   * @param readpoint readpoint below which we can safely remove duplicate KVs
   * @param memstoreSizing object to accumulate changed size
   */
  private void upsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing) {
    doAddOrUpsert(cell, readpoint, memstoreSizing, false);
  }

  /**
   * Applies an add or an upsert to the current active segment, retrying with the then-current
   * active segment for as long as {@link #preUpdate} refuses the update.
   * @param cell the cell to add or upsert
   * @param readpoint readpoint forwarded to the upsert; ignored when {@code doAdd} is true
   * @param memstoreSizing object to accumulate changed size
   * @param doAdd true to add the cell, false to upsert it
   */
  private void doAddOrUpsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing,
      boolean doAdd) {
    MutableSegment currentActive;
    boolean succ = false;
    while (!succ) {
      // Re-read the active segment on every attempt; it may have been replaced meanwhile.
      currentActive = getActive();
      succ = preUpdate(currentActive, cell, memstoreSizing);
      if (succ) {
        if (doAdd) {
          doAdd(currentActive, cell, memstoreSizing);
        } else {
          doUpsert(currentActive, cell, readpoint, memstoreSizing);
        }
        postUpdate(currentActive);
      }
    }
  }

  /**
   * Adds the cell to the given segment, cloning it through the segment's allocator when possible
   * and deep-copying it otherwise.
   * @param currentActive the segment to add to
   * @param cell the cell to add
   * @param memstoreSizing object to accumulate changed size
   */
  private void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
    Cell toAdd = maybeCloneWithAllocator(currentActive, cell, false);
    // A different instance coming back means the allocator made a copy.
    boolean mslabUsed = (toAdd != cell);
    // This cell data is backed by the same byte[] where we read request in RPC(See
    // HBASE-15180). By default MSLAB is ON and we might have copied cell to MSLAB area. If
    // not we must do below deep copy. Or else we will keep referring to the bigger chunk of
    // memory and prevent it from getting GCed.
    // Copy to MSLAB would not have happened if
    // 1. MSLAB is turned OFF. See "hbase.hregion.memstore.mslab.enabled"
    // 2. When the size of the cell is bigger than the max size supported by MSLAB. See
    // "hbase.hregion.memstore.mslab.max.allocation". This defaults to 256 KB
    // 3. When cells are from Append/Increment operation.
    if (!mslabUsed) {
      toAdd = deepCopyIfNeeded(toAdd);
    }
    internalAdd(currentActive, toAdd, mslabUsed, memstoreSizing);
  }

  /**
   * Upserts the cell into the given segment after deep-copying it.
   * @param currentActive the segment to upsert into
   * @param cell the cell to upsert
   * @param readpoint readpoint below which we can safely remove duplicate KVs
   * @param memstoreSizing object to accumulate changed size
   */
  private void doUpsert(MutableSegment currentActive, Cell cell, long readpoint,
      MemStoreSizing memstoreSizing) {
    // Add the Cell to the MemStore
    // Upsert straight into the segment (bypassing the allocator clone done on the add path)
    // since we (a) already have a lock
    // and (b) cannot safely use the MSLAB here without potentially
    // hitting OOME - see TestMemStore.testUpsertMSLAB for a
    // test that triggers the pathological case if we don't avoid MSLAB
    // here.
    // This cell data is backed by the same byte[] where we read request in RPC(See
    // HBASE-15180). We must do below deep copy. Or else we will keep referring to the bigger
    // chunk of memory and prevent it from getting GCed.
    cell = deepCopyIfNeeded(cell);
    boolean sizeAddedPreOperation = sizeAddedPreOperation();
    currentActive.upsert(cell, readpoint, memstoreSizing, sizeAddedPreOperation);
    setOldestEditTimeToNow();
  }

  /**
   * Issue any synchronization and test needed before applying the update
   * @param currentActive the segment to be updated
   * @param cell the cell to be added
   * @param memstoreSizing object to accumulate region size changes
   * @return true iff can proceed with applying the update
   */
  protected abstract boolean preUpdate(MutableSegment currentActive, Cell cell,
      MemStoreSizing memstoreSizing);

  /**
   * Issue any post update synchronization and tests
   * @param currentActive updated segment
   */
  protected abstract void postUpdate(MutableSegment currentActive);

  /**
   * Returns a deep clone of the cell when it is an {@link ExtendedCell}; otherwise returns the
   * cell itself.
   * @param cell the cell to copy
   * @return the deep clone, or {@code cell} when it cannot be deep-cloned
   */
  private static Cell deepCopyIfNeeded(Cell cell) {
    if (cell instanceof ExtendedCell) {
      return ((ExtendedCell) cell).deepClone();
    }
    return cell;
  }

  @Override
  public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing) {
    for (Cell cell : cells) {
      upsert(cell, readpoint, memstoreSizing);
    }
  }

  /**
   * @return Oldest timestamp of all the Cells in the MemStore
   */
  @Override
  public long timeOfOldestEdit() {
    return timeOfOldestEdit;
  }

  /**
   * The passed snapshot was successfully persisted; it can be let go.
   * @param id Id of the snapshot to clean out.
   * @throws UnexpectedStateException if a snapshot is outstanding and its id differs from
   *           {@code id}
   * @see MemStore#snapshot()
   */
  @Override
  public void clearSnapshot(long id) throws UnexpectedStateException {
    if (this.snapshotId == NO_SNAPSHOT_ID) {
      return; // already cleared
    }
    if (this.snapshotId != id) {
      throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed "
          + id);
    }
    // OK. Passed in snapshot is same as current snapshot. If not-empty,
    // create a new snapshot and let the old one go.
    Segment oldSnapshot = this.snapshot;
    if (!this.snapshot.isEmpty()) {
      this.snapshot = SegmentFactory.instance().createImmutableSegment(this.comparator);
    }
    this.snapshotId = NO_SNAPSHOT_ID;
    oldSnapshot.close();
  }

  @Override
  public MemStoreSize getSnapshotSize() {
    return this.snapshot.getMemStoreSize();
  }

  /**
   * @return a numbered listing of all segments, or the exception text if the segments could not
   *         be obtained
   */
  @Override
  public String toString() {
    StringBuilder buf = new StringBuilder();
    int i = 1;
    try {
      for (Segment segment : getSegments()) {
        buf.append("Segment (").append(i).append(") ").append(segment.toString()).append("; ");
        i++;
      }
    } catch (IOException e) {
      return e.toString();
    }
    return buf.toString();
  }

  protected Configuration getConfiguration() {
    return conf;
  }

  /**
   * Dumps the active segment and then the snapshot to the given logger.
   * @param log the logger to dump to
   */
  protected void dump(Logger log) {
    getActive().dump(log);
    snapshot.dump(log);
  }

  /**
   * Picks the cell whose row sorts first; ties go to {@code a}.
   * @param a first candidate, may be null
   * @param b second candidate, may be null
   * @return Return lowest of a or b or null if both a and b are null
   */
  protected Cell getLowest(final Cell a, final Cell b) {
    if (a == null) {
      return b;
    }
    if (b == null) {
      return a;
    }
    return comparator.compareRows(a, b) <= 0 ? a : b;
  }

  /**
   * Finds the first cell in {@code set} that belongs to a row after the row of {@code key}.
   * @param key Find row that follows this one. If null, return first.
   * @param set Set to look in for a row beyond <code>key</code>.
   * @return Next row or null if none found. If one found, will be a new
   *         KeyValue -- can be destroyed by subsequent calls to this method.
   */
  protected Cell getNextRow(final Cell key,
      final NavigableSet<Cell> set) {
    Cell result = null;
    SortedSet<Cell> tail = key == null ? set : set.tailSet(key);
    // Iterate until we fall into the next row; i.e. move off current row
    for (Cell cell : tail) {
      // With a null key there is no current row to move off, so the first cell is the answer;
      // this also avoids handing a null operand to the comparator.
      if (key != null && comparator.compareRows(cell, key) <= 0) {
        continue;
      }
      // Note: Not suppressing deletes or expired cells. Needs to be handled
      // by higher up functions.
      result = cell;
      break;
    }
    return result;
  }

  /**
   * If the segment has a memory allocator the cell is being cloned to this space, and returned;
   * Otherwise the given cell is returned
   *
   * When a cell's size is too big (bigger than maxAlloc), it is not allocated on MSLAB.
   * Since the process of flattening to CellChunkMap assumes that all cells are allocated on MSLAB,
   * during this process, the input parameter forceCloneOfBigCell is set to 'true'
   * and the cell is copied into MSLAB.
   *
   * @param currentActive the segment whose allocator is used
   * @param cell the cell to clone
   * @param forceCloneOfBigCell true only during the process of flattening to CellChunkMap.
   * @return either the given cell or its clone
   */
  private Cell maybeCloneWithAllocator(MutableSegment currentActive, Cell cell,
      boolean forceCloneOfBigCell) {
    return currentActive.maybeCloneWithAllocator(cell, forceCloneOfBigCell);
  }

  /**
   * Internal version of add() that doesn't clone Cells with the
   * allocator, and doesn't take the lock.
   *
   * Callers should ensure they already have the read lock taken
   * @param currentActive the segment to add to
   * @param toAdd the cell to add
   * @param mslabUsed whether using MSLAB
   * @param memstoreSizing object to accumulate changed size
   */
  private void internalAdd(MutableSegment currentActive, final Cell toAdd,
      final boolean mslabUsed, MemStoreSizing memstoreSizing) {
    boolean sizeAddedPreOperation = sizeAddedPreOperation();
    currentActive.add(toAdd, mslabUsed, memstoreSizing, sizeAddedPreOperation);
    setOldestEditTimeToNow();
  }

  /**
   * @return the flag forwarded to the active segment's add and upsert calls; presumably tells
   *         the segment whether the size was already accounted for before the operation --
   *         confirm against the concrete memstore implementations
   */
  protected abstract boolean sizeAddedPreOperation();

  /**
   * Records the current time as the oldest edit time, unless one is already recorded.
   */
  private void setOldestEditTimeToNow() {
    if (timeOfOldestEdit == Long.MAX_VALUE) {
      timeOfOldestEdit = EnvironmentEdgeManager.currentTime();
    }
  }

  /**
   * @return The total size of cells in this memstore. We will not consider cells in the snapshot
   */
  protected abstract long keySize();

  /**
   * @return The total heap size of cells in this memstore. We will not consider cells in the
   *         snapshot
   */
  protected abstract long heapSize();

  protected CellComparator getComparator() {
    return comparator;
  }

  MutableSegment getActive() {
    return active;
  }

  ImmutableSegment getSnapshot() {
    return snapshot;
  }

  /**
   * @return an ordered list of segments from most recent to oldest in memstore
   */
  protected abstract List<Segment> getSegments() throws IOException;

}