/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.List;
import java.util.NavigableSet;
import java.util.SortedSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * An abstract class, which implements the behaviour shared by all concrete memstore instances.
 */
@InterfaceAudience.Private
public abstract class AbstractMemStore implements MemStore {

  private static final long NO_SNAPSHOT_ID = -1;

  private final Configuration conf;
  private final CellComparator comparator;

  // active segment absorbs write operations
  private volatile MutableSegment active;
  // Snapshot of memstore. Made for flusher.
  protected volatile ImmutableSegment snapshot;
  protected volatile long snapshotId;
  // Used to track when to flush
  private volatile long timeOfOldestEdit;

  protected RegionServicesForStores regionServices;

  public final static long FIXED_OVERHEAD = (long) ClassSize.OBJECT
      + (5 * ClassSize.REFERENCE)
      + (2 * Bytes.SIZEOF_LONG); // snapshotId, timeOfOldestEdit

  public final static long DEEP_OVERHEAD = FIXED_OVERHEAD;

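  /**
   * Creates a scanner over each of the given segments at the given read point and adds it to the
   * supplied list of scanners.
   * @param segments the segments to create scanners for
   * @param readPt the read point up to which cells are visible to the scanners
   * @param scanners the list the newly created scanners are added to
   */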
  public static void addToScanners(List<? extends Segment> segments, long readPt,
      List<KeyValueScanner> scanners) {
    for (Segment item : segments) {
      addToScanners(item, readPt, scanners);
    }
  }

  protected static void addToScanners(Segment segment, long readPt,
      List<KeyValueScanner> scanners) {
    scanners.add(segment.getScanner(readPt));
  }

  protected AbstractMemStore(final Configuration conf, final CellComparator c,
      final RegionServicesForStores regionServices) {
    this.conf = conf;
    this.comparator = c;
    this.regionServices = regionServices;
    resetActive();
    this.snapshot = SegmentFactory.instance().createImmutableSegment(c);
    this.snapshotId = NO_SNAPSHOT_ID;
  }

  protected void resetActive() {
    // Record the MutableSegment's heap overhead when initializing
    MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
    // Reset heap to not include any keys
    active = SegmentFactory.instance().createMutableSegment(conf, comparator, memstoreAccounting);
    // regionServices can be null when testing
    if (regionServices != null) {
      regionServices.addMemStoreSize(memstoreAccounting.getDataSize(),
          memstoreAccounting.getHeapSize(), memstoreAccounting.getOffHeapSize(),
          memstoreAccounting.getCellsCount());
    }
    timeOfOldestEdit = Long.MAX_VALUE;
  }

  /**
   * Updates the WAL with the lowest sequence id (oldest entry) that is still in memory
   * @param onlyIfMoreRecent a flag that marks whether to update the sequence id unconditionally or
   *          only if it is greater than the previously recorded one
   */
  public abstract void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent);

  @Override
  public void add(Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
    for (Cell cell : cells) {
      add(cell, memstoreSizing);
    }
  }

  @Override
  public void add(Cell cell, MemStoreSizing memstoreSizing) {
    doAddOrUpsert(cell, 0, memstoreSizing, true);
  }

  /*
   * Inserts the specified Cell into MemStore and deletes any existing
   * versions of the same row/family/qualifier as the specified Cell.
   * <p>
   * First, the specified Cell is inserted into the Memstore.
   * <p>
   * If there are any existing Cells in this MemStore with the same row,
   * family, and qualifier, they are removed.
   * <p>
   * Callers must hold the read lock.
   *
   * @param cell the cell to be updated
   * @param readpoint readpoint below which we can safely remove duplicate KVs
   * @param memstoreSizing object to accumulate changed size
   */
  private void upsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing) {
    doAddOrUpsert(cell, readpoint, memstoreSizing, false);
  }

  private void doAddOrUpsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing,
      boolean doAdd) {
    MutableSegment currentActive;
    boolean succ = false;
    while (!succ) {
      currentActive = getActive();
      succ = preUpdate(currentActive, cell, memstoreSizing);
      if (succ) {
        if (doAdd) {
          doAdd(currentActive, cell, memstoreSizing);
        } else {
          doUpsert(currentActive, cell, readpoint, memstoreSizing);
        }
        postUpdate(currentActive);
      }
    }
  }

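  /**
   * Clones the cell into the active segment's MSLAB when possible; if the cell could not be copied
   * into the MSLAB (see the comment below for when that happens), deep-copies it so it no longer
   * references the RPC buffer, and then adds it to the active segment.
   */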
  private void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
    Cell toAdd = maybeCloneWithAllocator(currentActive, cell, false);
    boolean mslabUsed = (toAdd != cell);
    // This cell's data is backed by the same byte[] in which the RPC request was read (see
    // HBASE-15180). By default MSLAB is ON and we might have copied the cell to the MSLAB area.
    // If not, we must do the deep copy below. Otherwise we would keep referring to the bigger
    // chunk of memory and prevent it from getting GCed.
    // Copy to MSLAB would not have happened if
    // 1. MSLAB is turned OFF. See "hbase.hregion.memstore.mslab.enabled"
    // 2. When the size of the cell is bigger than the max size supported by MSLAB. See
    //    "hbase.hregion.memstore.mslab.max.allocation". This defaults to 256 KB.
    // 3. When cells are from Append/Increment operation.
    if (!mslabUsed) {
      toAdd = deepCopyIfNeeded(toAdd);
    }
    internalAdd(currentActive, toAdd, mslabUsed, memstoreSizing);
  }

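  /**
   * Deep-copies the cell (so it no longer references the RPC buffer) and upserts it into the
   * active segment, removing any existing versions of the same row/family/qualifier that can be
   * safely dropped below the given read point. MSLAB is deliberately not used here; see the
   * comment below.
   */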
  private void doUpsert(MutableSegment currentActive, Cell cell, long readpoint,
      MemStoreSizing memstoreSizing) {
    // Add the Cell to the MemStore
    // Use the internalAdd method here since we (a) already have a lock
    // and (b) cannot safely use the MSLAB here without potentially
    // hitting OOME - see TestMemStore.testUpsertMSLAB for a
    // test that triggers the pathological case if we don't avoid MSLAB
    // here.
    // This cell's data is backed by the same byte[] in which the RPC request was read (see
    // HBASE-15180). We must do the deep copy below. Otherwise we would keep referring to the
    // bigger chunk of memory and prevent it from getting GCed.
    cell = deepCopyIfNeeded(cell);
    boolean sizeAddedPreOperation = sizeAddedPreOperation();
    currentActive.upsert(cell, readpoint, memstoreSizing, sizeAddedPreOperation);
    setOldestEditTimeToNow();
  }

  /**
   * Issue any synchronization and test needed before applying the update
   * @param currentActive the segment to be updated
   * @param cell the cell to be added
   * @param memstoreSizing object to accumulate region size changes
   * @return true iff it is safe to proceed with applying the update
   */
  protected abstract boolean preUpdate(MutableSegment currentActive, Cell cell,
      MemStoreSizing memstoreSizing);

  /**
   * Issue any post update synchronization and tests
   * @param currentActive updated segment
   */
  protected abstract void postUpdate(MutableSegment currentActive);

  private static Cell deepCopyIfNeeded(Cell cell) {
    if (cell instanceof ExtendedCell) {
      return ((ExtendedCell) cell).deepClone();
    }
    return cell;
  }

  @Override
  public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing) {
    for (Cell cell : cells) {
      upsert(cell, readpoint, memstoreSizing);
    }
  }

  /**
   * @return Oldest timestamp of all the Cells in the MemStore
   */
  @Override
  public long timeOfOldestEdit() {
    return timeOfOldestEdit;
  }

  /**
   * The passed snapshot was successfully persisted; it can be let go.
   * @param id Id of the snapshot to clean out.
   * @see MemStore#snapshot()
   */
  @Override
  public void clearSnapshot(long id) throws UnexpectedStateException {
    if (this.snapshotId == NO_SNAPSHOT_ID) return; // already cleared
    if (this.snapshotId != id) {
      throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ", passed "
          + id);
    }
    // OK. Passed in snapshot is same as current snapshot. If not-empty,
    // create a new snapshot and let the old one go.
    Segment oldSnapshot = this.snapshot;
    if (!this.snapshot.isEmpty()) {
      this.snapshot = SegmentFactory.instance().createImmutableSegment(this.comparator);
    }
    this.snapshotId = NO_SNAPSHOT_ID;
    oldSnapshot.close();
  }

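  /**
   * @return the {@link MemStoreSize} of the current snapshot segment
   */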
  @Override
  public MemStoreSize getSnapshotSize() {
    return this.snapshot.getMemStoreSize();
  }

  @Override
  public String toString() {
    StringBuilder buf = new StringBuilder();
    int i = 1;
    try {
      for (Segment segment : getSegments()) {
        buf.append("Segment (").append(i).append(") ").append(segment.toString()).append("; ");
        i++;
      }
    } catch (IOException e) {
      return e.toString();
    }
    return buf.toString();
  }

  protected Configuration getConfiguration() {
    return conf;
  }

  protected void dump(Logger log) {
    getActive().dump(log);
    snapshot.dump(log);
  }

  /*
   * @param a
   * @param b
   * @return Return lowest of a or b or null if both a and b are null
   */
  protected Cell getLowest(final Cell a, final Cell b) {
    if (a == null) {
      return b;
    }
    if (b == null) {
      return a;
    }
    return comparator.compareRows(a, b) <= 0 ? a : b;
  }

  /*
   * @param key Find row that follows this one. If null, return first.
   * @param set Set to look in for a row beyond <code>row</code>.
   * @return Next row or null if none found. If one found, will be a new
   *         KeyValue -- can be destroyed by subsequent calls to this method.
   */
  protected Cell getNextRow(final Cell key,
      final NavigableSet<Cell> set) {
    Cell result = null;
    SortedSet<Cell> tail = key == null ? set : set.tailSet(key);
    // Iterate until we fall into the next row; i.e. move off current row
    for (Cell cell : tail) {
      if (comparator.compareRows(cell, key) <= 0) {
        continue;
      }
      // Note: Not suppressing deletes or expired cells. Needs to be handled
      // by higher up functions.
      result = cell;
      break;
    }
    return result;
  }

  /**
   * If the segment has a memory allocator, the cell is cloned into that space and the clone is
   * returned; otherwise the given cell is returned.
   *
   * When a cell's size is too big (bigger than maxAlloc), it is not allocated on MSLAB.
   * Since the process of flattening to CellChunkMap assumes that all cells are allocated on MSLAB,
   * during this process the input parameter forceCloneOfBigCell is set to 'true'
   * and the cell is copied into MSLAB.
   *
   * @param cell the cell to clone
   * @param forceCloneOfBigCell true only during the process of flattening to CellChunkMap.
   * @return either the given cell or its clone
   */
  private Cell maybeCloneWithAllocator(MutableSegment currentActive, Cell cell,
      boolean forceCloneOfBigCell) {
    return currentActive.maybeCloneWithAllocator(cell, forceCloneOfBigCell);
  }

  /*
   * Internal version of add() that doesn't clone Cells with the
   * allocator, and doesn't take the lock.
   *
   * Callers should ensure they already have the read lock taken
   * @param toAdd the cell to add
   * @param mslabUsed whether using MSLAB
   * @param memstoreSizing object to accumulate changed size
   */
  private void internalAdd(MutableSegment currentActive, final Cell toAdd, final boolean mslabUsed,
      MemStoreSizing memstoreSizing) {
    boolean sizeAddedPreOperation = sizeAddedPreOperation();
    currentActive.add(toAdd, mslabUsed, memstoreSizing, sizeAddedPreOperation);
    setOldestEditTimeToNow();
  }

  protected abstract boolean sizeAddedPreOperation();

  private void setOldestEditTimeToNow() {
    if (timeOfOldestEdit == Long.MAX_VALUE) {
      timeOfOldestEdit = EnvironmentEdgeManager.currentTime();
    }
  }

  /**
   * @return The total size of cells in this memstore. We will not consider cells in the snapshot
   */
  protected abstract long keySize();

  /**
   * @return The total heap size of cells in this memstore. We will not consider cells in the
   *         snapshot
   */
  protected abstract long heapSize();

  protected CellComparator getComparator() {
    return comparator;
  }

  @VisibleForTesting
  MutableSegment getActive() {
    return active;
  }

  @VisibleForTesting
  ImmutableSegment getSnapshot() {
    return snapshot;
  }

  /**
   * @return an ordered list of segments from most recent to oldest in memstore
   */
  protected abstract List<Segment> getSegments() throws IOException;

}