001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.util.List; 022import java.util.NavigableSet; 023import java.util.SortedSet; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.Cell; 026import org.apache.hadoop.hbase.CellComparator; 027import org.apache.hadoop.hbase.ExtendedCell; 028import org.apache.hadoop.hbase.exceptions.UnexpectedStateException; 029import org.apache.hadoop.hbase.util.Bytes; 030import org.apache.hadoop.hbase.util.ClassSize; 031import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 032import org.apache.yetus.audience.InterfaceAudience; 033import org.slf4j.Logger; 034 035/** 036 * An abstract class, which implements the behaviour shared by all concrete memstore instances. 037 */ 038@InterfaceAudience.Private 039public abstract class AbstractMemStore implements MemStore { 040 041 private static final long NO_SNAPSHOT_ID = -1; 042 043 private final Configuration conf; 044 private final CellComparator comparator; 045 046 // active segment absorbs write operations 047 private volatile MutableSegment active; 048 // Snapshot of memstore. Made for flusher. 049 protected volatile ImmutableSegment snapshot; 050 protected volatile long snapshotId; 051 // Used to track when to flush 052 private volatile long timeOfOldestEdit; 053 054 protected RegionServicesForStores regionServices; 055 056 // @formatter:off 057 public final static long FIXED_OVERHEAD = (long) ClassSize.OBJECT 058 + (5 * ClassSize.REFERENCE) 059 + (2 * Bytes.SIZEOF_LONG); // snapshotId, timeOfOldestEdit 060 // @formatter:on 061 062 public final static long DEEP_OVERHEAD = FIXED_OVERHEAD; 063 064 public static void addToScanners(List<? extends Segment> segments, long readPt, 065 List<KeyValueScanner> scanners) { 066 for (Segment item : segments) { 067 addToScanners(item, readPt, scanners); 068 } 069 } 070 071 protected static void addToScanners(Segment segment, long readPt, 072 List<KeyValueScanner> scanners) { 073 if (!segment.isEmpty()) { 074 scanners.add(segment.getScanner(readPt)); 075 } 076 } 077 078 protected AbstractMemStore(final Configuration conf, final CellComparator c, 079 final RegionServicesForStores regionServices) { 080 this.conf = conf; 081 this.comparator = c; 082 this.regionServices = regionServices; 083 resetActive(); 084 resetTimeOfOldestEdit(); 085 this.snapshot = SegmentFactory.instance().createImmutableSegment(c); 086 this.snapshotId = NO_SNAPSHOT_ID; 087 } 088 089 protected void resetActive() { 090 // Record the MutableSegment' heap overhead when initialing 091 MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing(); 092 // Reset heap to not include any keys 093 active = SegmentFactory.instance().createMutableSegment(conf, comparator, memstoreAccounting); 094 // regionServices can be null when testing 095 if (regionServices != null) { 096 regionServices.addMemStoreSize(memstoreAccounting.getDataSize(), 097 memstoreAccounting.getHeapSize(), memstoreAccounting.getOffHeapSize(), 098 memstoreAccounting.getCellsCount()); 099 } 100 } 101 102 protected void resetTimeOfOldestEdit() { 103 this.timeOfOldestEdit = Long.MAX_VALUE; 104 } 105 106 /** 107 * Updates the wal with the lowest sequence id (oldest entry) that is still in memory 108 * @param onlyIfMoreRecent a flag that marks whether to update the sequence id no matter what or 109 * only if it is greater than the previous sequence id 110 */ 111 public abstract void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent); 112 113 @Override 114 public void add(Iterable<Cell> cells, MemStoreSizing memstoreSizing) { 115 for (Cell cell : cells) { 116 add(cell, memstoreSizing); 117 } 118 } 119 120 @Override 121 public void add(Cell cell, MemStoreSizing memstoreSizing) { 122 doAddOrUpsert(cell, 0, memstoreSizing, true); 123 } 124 125 /* 126 * Inserts the specified Cell into MemStore and deletes any existing versions of the same 127 * row/family/qualifier as the specified Cell. <p> First, the specified Cell is inserted into the 128 * Memstore. <p> If there are any existing Cell in this MemStore with the same row, family, and 129 * qualifier, they are removed. <p> Callers must hold the read lock. 130 * @param cell the cell to be updated 131 * @param readpoint readpoint below which we can safely remove duplicate KVs 132 * @param memstoreSizing object to accumulate changed size 133 */ 134 private void upsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing) { 135 doAddOrUpsert(cell, readpoint, memstoreSizing, false); 136 } 137 138 private void doAddOrUpsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing, 139 boolean doAdd) { 140 MutableSegment currentActive; 141 boolean succ = false; 142 while (!succ) { 143 currentActive = getActive(); 144 succ = preUpdate(currentActive, cell, memstoreSizing); 145 if (succ) { 146 if (doAdd) { 147 doAdd(currentActive, cell, memstoreSizing); 148 } else { 149 doUpsert(currentActive, cell, readpoint, memstoreSizing); 150 } 151 postUpdate(currentActive); 152 } 153 } 154 } 155 156 protected void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) { 157 Cell toAdd = maybeCloneWithAllocator(currentActive, cell, false); 158 boolean mslabUsed = (toAdd != cell); 159 // This cell data is backed by the same byte[] where we read request in RPC(See 160 // HBASE-15180). By default MSLAB is ON and we might have copied cell to MSLAB area. If 161 // not we must do below deep copy. Or else we will keep referring to the bigger chunk of 162 // memory and prevent it from getting GCed. 163 // Copy to MSLAB would not have happened if 164 // 1. MSLAB is turned OFF. See "hbase.hregion.memstore.mslab.enabled" 165 // 2. When the size of the cell is bigger than the max size supported by MSLAB. See 166 // "hbase.hregion.memstore.mslab.max.allocation". This defaults to 256 KB 167 // 3. When cells are from Append/Increment operation. 168 if (!mslabUsed) { 169 toAdd = deepCopyIfNeeded(toAdd); 170 } 171 internalAdd(currentActive, toAdd, mslabUsed, memstoreSizing); 172 } 173 174 private void doUpsert(MutableSegment currentActive, Cell cell, long readpoint, 175 MemStoreSizing memstoreSizing) { 176 // Add the Cell to the MemStore 177 // Use the internalAdd method here since we (a) already have a lock 178 // and (b) cannot safely use the MSLAB here without potentially 179 // hitting OOME - see TestMemStore.testUpsertMSLAB for a 180 // test that triggers the pathological case if we don't avoid MSLAB 181 // here. 182 // This cell data is backed by the same byte[] where we read request in RPC(See 183 // HBASE-15180). We must do below deep copy. Or else we will keep referring to the bigger 184 // chunk of memory and prevent it from getting GCed. 185 cell = deepCopyIfNeeded(cell); 186 boolean sizeAddedPreOperation = sizeAddedPreOperation(); 187 currentActive.upsert(cell, readpoint, memstoreSizing, sizeAddedPreOperation); 188 setOldestEditTimeToNow(); 189 } 190 191 /** 192 * Issue any synchronization and test needed before applying the update 193 * @param currentActive the segment to be updated 194 * @param cell the cell to be added 195 * @param memstoreSizing object to accumulate region size changes 196 * @return true iff can proceed with applying the update 197 */ 198 protected abstract boolean preUpdate(MutableSegment currentActive, Cell cell, 199 MemStoreSizing memstoreSizing); 200 201 /** 202 * Issue any post update synchronization and tests 203 * @param currentActive updated segment 204 */ 205 protected abstract void postUpdate(MutableSegment currentActive); 206 207 private static Cell deepCopyIfNeeded(Cell cell) { 208 if (cell instanceof ExtendedCell) { 209 return ((ExtendedCell) cell).deepClone(); 210 } 211 return cell; 212 } 213 214 @Override 215 public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing) { 216 for (Cell cell : cells) { 217 upsert(cell, readpoint, memstoreSizing); 218 } 219 } 220 221 /** Returns Oldest timestamp of all the Cells in the MemStore */ 222 @Override 223 public long timeOfOldestEdit() { 224 return timeOfOldestEdit; 225 } 226 227 /** 228 * This method is protected under HStore write lock.<br/> 229 * The passed snapshot was successfully persisted; it can be let go. 230 * @param id Id of the snapshot to clean out. 231 * @see MemStore#snapshot() 232 */ 233 @Override 234 public void clearSnapshot(long id) throws UnexpectedStateException { 235 if (this.snapshotId == -1) return; // already cleared 236 if (this.snapshotId != id) { 237 throw new UnexpectedStateException( 238 "Current snapshot id is " + this.snapshotId + ",passed " + id); 239 } 240 // OK. Passed in snapshot is same as current snapshot. If not-empty, 241 // create a new snapshot and let the old one go. 242 doClearSnapShot(); 243 } 244 245 protected void doClearSnapShot() { 246 Segment oldSnapshot = this.snapshot; 247 if (!this.snapshot.isEmpty()) { 248 this.snapshot = SegmentFactory.instance().createImmutableSegment(this.comparator); 249 } 250 this.snapshotId = NO_SNAPSHOT_ID; 251 oldSnapshot.close(); 252 } 253 254 @Override 255 public MemStoreSize getSnapshotSize() { 256 return this.snapshot.getMemStoreSize(); 257 } 258 259 @Override 260 public String toString() { 261 StringBuilder buf = new StringBuilder(); 262 int i = 1; 263 try { 264 for (Segment segment : getSegments()) { 265 buf.append("Segment (").append(i).append(") ").append(segment.toString()).append("; "); 266 i++; 267 } 268 } catch (IOException e) { 269 return e.toString(); 270 } 271 return buf.toString(); 272 } 273 274 protected Configuration getConfiguration() { 275 return conf; 276 } 277 278 protected void dump(Logger log) { 279 getActive().dump(log); 280 snapshot.dump(log); 281 } 282 283 /* 284 * nn * @return Return lowest of a or b or null if both a and b are null 285 */ 286 protected Cell getLowest(final Cell a, final Cell b) { 287 if (a == null) { 288 return b; 289 } 290 if (b == null) { 291 return a; 292 } 293 return comparator.compareRows(a, b) <= 0 ? a : b; 294 } 295 296 /* 297 * @param key Find row that follows this one. If null, return first. 298 * @param set Set to look in for a row beyond <code>row</code>. 299 * @return Next row or null if none found. If one found, will be a new KeyValue -- can be 300 * destroyed by subsequent calls to this method. 301 */ 302 protected Cell getNextRow(final Cell key, final NavigableSet<Cell> set) { 303 Cell result = null; 304 SortedSet<Cell> tail = key == null ? set : set.tailSet(key); 305 // Iterate until we fall into the next row; i.e. move off current row 306 for (Cell cell : tail) { 307 if (comparator.compareRows(cell, key) <= 0) { 308 continue; 309 } 310 // Note: Not suppressing deletes or expired cells. Needs to be handled 311 // by higher up functions. 312 result = cell; 313 break; 314 } 315 return result; 316 } 317 318 /** 319 * If the segment has a memory allocator the cell is being cloned to this space, and returned; 320 * Otherwise the given cell is returned When a cell's size is too big (bigger than maxAlloc), it 321 * is not allocated on MSLAB. Since the process of flattening to CellChunkMap assumes that all 322 * cells are allocated on MSLAB, during this process, the input parameter forceCloneOfBigCell is 323 * set to 'true' and the cell is copied into MSLAB. 324 * @param cell the cell to clone 325 * @param forceCloneOfBigCell true only during the process of flattening to CellChunkMap. 326 * @return either the given cell or its clone 327 */ 328 private Cell maybeCloneWithAllocator(MutableSegment currentActive, Cell cell, 329 boolean forceCloneOfBigCell) { 330 return currentActive.maybeCloneWithAllocator(cell, forceCloneOfBigCell); 331 } 332 333 /* 334 * Internal version of add() that doesn't clone Cells with the allocator, and doesn't take the 335 * lock. Callers should ensure they already have the read lock taken 336 * @param toAdd the cell to add 337 * @param mslabUsed whether using MSLAB 338 * @param memstoreSizing object to accumulate changed size 339 */ 340 private void internalAdd(MutableSegment currentActive, final Cell toAdd, final boolean mslabUsed, 341 MemStoreSizing memstoreSizing) { 342 boolean sizeAddedPreOperation = sizeAddedPreOperation(); 343 currentActive.add(toAdd, mslabUsed, memstoreSizing, sizeAddedPreOperation); 344 setOldestEditTimeToNow(); 345 } 346 347 protected abstract boolean sizeAddedPreOperation(); 348 349 private void setOldestEditTimeToNow() { 350 if (timeOfOldestEdit == Long.MAX_VALUE) { 351 timeOfOldestEdit = EnvironmentEdgeManager.currentTime(); 352 } 353 } 354 355 /** 356 * Returns The total size of cells in this memstore. We will not consider cells in the snapshot 357 */ 358 protected abstract long keySize(); 359 360 /** 361 * @return The total heap size of cells in this memstore. We will not consider cells in the 362 * snapshot 363 */ 364 protected abstract long heapSize(); 365 366 protected CellComparator getComparator() { 367 return comparator; 368 } 369 370 MutableSegment getActive() { 371 return active; 372 } 373 374 ImmutableSegment getSnapshot() { 375 return snapshot; 376 } 377 378 /** Returns an ordered list of segments from most recent to oldest in memstore */ 379 protected abstract List<Segment> getSegments() throws IOException; 380 381}