001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 022import java.io.IOException; 023import java.util.List; 024import java.util.NavigableSet; 025import java.util.SortedSet; 026 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.CellComparator; 030import org.apache.hadoop.hbase.ExtendedCell; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.slf4j.Logger; 033import org.apache.hadoop.hbase.exceptions.UnexpectedStateException; 034import org.apache.hadoop.hbase.util.Bytes; 035import org.apache.hadoop.hbase.util.ClassSize; 036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 037 038/** 039 * An abstract class, which implements the behaviour shared by all concrete memstore instances. 040 */ 041@InterfaceAudience.Private 042public abstract class AbstractMemStore implements MemStore { 043 044 private static final long NO_SNAPSHOT_ID = -1; 045 046 private final Configuration conf; 047 private final CellComparator comparator; 048 049 // active segment absorbs write operations 050 protected volatile MutableSegment active; 051 // Snapshot of memstore. Made for flusher. 052 protected volatile ImmutableSegment snapshot; 053 protected volatile long snapshotId; 054 // Used to track when to flush 055 private volatile long timeOfOldestEdit; 056 057 protected RegionServicesForStores regionServices; 058 059 public final static long FIXED_OVERHEAD = (long) ClassSize.OBJECT 060 + (5 * ClassSize.REFERENCE) 061 + (2 * Bytes.SIZEOF_LONG); // snapshotId, timeOfOldestEdit 062 063 public final static long DEEP_OVERHEAD = FIXED_OVERHEAD; 064 065 public static long addToScanners(List<? extends Segment> segments, long readPt, long order, 066 List<KeyValueScanner> scanners) { 067 for (Segment item : segments) { 068 order = addToScanners(item, readPt, order, scanners); 069 } 070 return order; 071 } 072 073 protected static long addToScanners(Segment segment, long readPt, long order, 074 List<KeyValueScanner> scanners) { 075 scanners.add(segment.getScanner(readPt, order)); 076 return order - 1; 077 } 078 079 protected AbstractMemStore(final Configuration conf, final CellComparator c, 080 final RegionServicesForStores regionServices) { 081 this.conf = conf; 082 this.comparator = c; 083 this.regionServices = regionServices; 084 resetActive(); 085 this.snapshot = SegmentFactory.instance().createImmutableSegment(c); 086 this.snapshotId = NO_SNAPSHOT_ID; 087 } 088 089 protected void resetActive() { 090 // Record the MutableSegment' heap overhead when initialing 091 MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing(); 092 // Reset heap to not include any keys 093 this.active = SegmentFactory.instance() 094 .createMutableSegment(conf, comparator, memstoreAccounting); 095 // regionServices can be null when testing 096 if (regionServices != null) { 097 regionServices.addMemStoreSize(memstoreAccounting.getDataSize(), 098 memstoreAccounting.getHeapSize(), memstoreAccounting.getOffHeapSize(), 099 memstoreAccounting.getCellsCount()); 100 } 101 this.timeOfOldestEdit = Long.MAX_VALUE; 102 } 103 104 /** 105 * Updates the wal with the lowest sequence id (oldest entry) that is still in memory 106 * @param onlyIfMoreRecent a flag that marks whether to update the sequence id no matter what or 107 * only if it is greater than the previous sequence id 108 */ 109 public abstract void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent); 110 111 @Override 112 public void add(Iterable<Cell> cells, MemStoreSizing memstoreSizing) { 113 for (Cell cell : cells) { 114 add(cell, memstoreSizing); 115 } 116 } 117 118 @Override 119 public void add(Cell cell, MemStoreSizing memstoreSizing) { 120 Cell toAdd = maybeCloneWithAllocator(cell, false); 121 boolean mslabUsed = (toAdd != cell); 122 // This cell data is backed by the same byte[] where we read request in RPC(See HBASE-15180). By 123 // default MSLAB is ON and we might have copied cell to MSLAB area. If not we must do below deep 124 // copy. Or else we will keep referring to the bigger chunk of memory and prevent it from 125 // getting GCed. 126 // Copy to MSLAB would not have happened if 127 // 1. MSLAB is turned OFF. See "hbase.hregion.memstore.mslab.enabled" 128 // 2. When the size of the cell is bigger than the max size supported by MSLAB. See 129 // "hbase.hregion.memstore.mslab.max.allocation". This defaults to 256 KB 130 // 3. When cells are from Append/Increment operation. 131 if (!mslabUsed) { 132 toAdd = deepCopyIfNeeded(toAdd); 133 } 134 internalAdd(toAdd, mslabUsed, memstoreSizing); 135 } 136 137 private static Cell deepCopyIfNeeded(Cell cell) { 138 if (cell instanceof ExtendedCell) { 139 return ((ExtendedCell) cell).deepClone(); 140 } 141 return cell; 142 } 143 144 @Override 145 public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing) { 146 for (Cell cell : cells) { 147 upsert(cell, readpoint, memstoreSizing); 148 } 149 } 150 151 /** 152 * @return Oldest timestamp of all the Cells in the MemStore 153 */ 154 @Override 155 public long timeOfOldestEdit() { 156 return timeOfOldestEdit; 157 } 158 159 /** 160 * The passed snapshot was successfully persisted; it can be let go. 161 * @param id Id of the snapshot to clean out. 162 * @see MemStore#snapshot() 163 */ 164 @Override 165 public void clearSnapshot(long id) throws UnexpectedStateException { 166 if (this.snapshotId == -1) return; // already cleared 167 if (this.snapshotId != id) { 168 throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed " 169 + id); 170 } 171 // OK. Passed in snapshot is same as current snapshot. If not-empty, 172 // create a new snapshot and let the old one go. 173 Segment oldSnapshot = this.snapshot; 174 if (!this.snapshot.isEmpty()) { 175 this.snapshot = SegmentFactory.instance().createImmutableSegment(this.comparator); 176 } 177 this.snapshotId = NO_SNAPSHOT_ID; 178 oldSnapshot.close(); 179 } 180 181 @Override 182 public MemStoreSize getSnapshotSize() { 183 return this.snapshot.getMemStoreSize(); 184 } 185 186 @Override 187 public String toString() { 188 StringBuilder buf = new StringBuilder(); 189 int i = 1; 190 try { 191 for (Segment segment : getSegments()) { 192 buf.append("Segment (" + i + ") " + segment.toString() + "; "); 193 i++; 194 } 195 } catch (IOException e){ 196 return e.toString(); 197 } 198 return buf.toString(); 199 } 200 201 protected Configuration getConfiguration() { 202 return conf; 203 } 204 205 protected void dump(Logger log) { 206 active.dump(log); 207 snapshot.dump(log); 208 } 209 210 211 /* 212 * Inserts the specified Cell into MemStore and deletes any existing 213 * versions of the same row/family/qualifier as the specified Cell. 214 * <p> 215 * First, the specified Cell is inserted into the Memstore. 216 * <p> 217 * If there are any existing Cell in this MemStore with the same row, 218 * family, and qualifier, they are removed. 219 * <p> 220 * Callers must hold the read lock. 221 * 222 * @param cell the cell to be updated 223 * @param readpoint readpoint below which we can safely remove duplicate KVs 224 * @param memstoreSize 225 */ 226 private void upsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing) { 227 // Add the Cell to the MemStore 228 // Use the internalAdd method here since we (a) already have a lock 229 // and (b) cannot safely use the MSLAB here without potentially 230 // hitting OOME - see TestMemStore.testUpsertMSLAB for a 231 // test that triggers the pathological case if we don't avoid MSLAB 232 // here. 233 // This cell data is backed by the same byte[] where we read request in RPC(See HBASE-15180). We 234 // must do below deep copy. Or else we will keep referring to the bigger chunk of memory and 235 // prevent it from getting GCed. 236 cell = deepCopyIfNeeded(cell); 237 this.active.upsert(cell, readpoint, memstoreSizing); 238 setOldestEditTimeToNow(); 239 checkActiveSize(); 240 } 241 242 /* 243 * @param a 244 * @param b 245 * @return Return lowest of a or b or null if both a and b are null 246 */ 247 protected Cell getLowest(final Cell a, final Cell b) { 248 if (a == null) { 249 return b; 250 } 251 if (b == null) { 252 return a; 253 } 254 return comparator.compareRows(a, b) <= 0? a: b; 255 } 256 257 /* 258 * @param key Find row that follows this one. If null, return first. 259 * @param set Set to look in for a row beyond <code>row</code>. 260 * @return Next row or null if none found. If one found, will be a new 261 * KeyValue -- can be destroyed by subsequent calls to this method. 262 */ 263 protected Cell getNextRow(final Cell key, 264 final NavigableSet<Cell> set) { 265 Cell result = null; 266 SortedSet<Cell> tail = key == null? set: set.tailSet(key); 267 // Iterate until we fall into the next row; i.e. move off current row 268 for (Cell cell: tail) { 269 if (comparator.compareRows(cell, key) <= 0) { 270 continue; 271 } 272 // Note: Not suppressing deletes or expired cells. Needs to be handled 273 // by higher up functions. 274 result = cell; 275 break; 276 } 277 return result; 278 } 279 280 /** 281 * If the segment has a memory allocator the cell is being cloned to this space, and returned; 282 * Otherwise the given cell is returned 283 * 284 * When a cell's size is too big (bigger than maxAlloc), it is not allocated on MSLAB. 285 * Since the process of flattening to CellChunkMap assumes that all cells are allocated on MSLAB, 286 * during this process, the input parameter forceCloneOfBigCell is set to 'true' 287 * and the cell is copied into MSLAB. 288 * 289 * @param cell the cell to clone 290 * @param forceCloneOfBigCell true only during the process of flattening to CellChunkMap. 291 * @return either the given cell or its clone 292 */ 293 private Cell maybeCloneWithAllocator(Cell cell, boolean forceCloneOfBigCell) { 294 return active.maybeCloneWithAllocator(cell, forceCloneOfBigCell); 295 } 296 297 /* 298 * Internal version of add() that doesn't clone Cells with the 299 * allocator, and doesn't take the lock. 300 * 301 * Callers should ensure they already have the read lock taken 302 * @param toAdd the cell to add 303 * @param mslabUsed whether using MSLAB 304 * @param memstoreSize 305 */ 306 private void internalAdd(final Cell toAdd, final boolean mslabUsed, MemStoreSizing memstoreSizing) { 307 active.add(toAdd, mslabUsed, memstoreSizing); 308 setOldestEditTimeToNow(); 309 checkActiveSize(); 310 } 311 312 private void setOldestEditTimeToNow() { 313 if (timeOfOldestEdit == Long.MAX_VALUE) { 314 timeOfOldestEdit = EnvironmentEdgeManager.currentTime(); 315 } 316 } 317 318 /** 319 * @return The total size of cells in this memstore. We will not consider cells in the snapshot 320 */ 321 protected abstract long keySize(); 322 323 /** 324 * @return The total heap size of cells in this memstore. We will not consider cells in the 325 * snapshot 326 */ 327 protected abstract long heapSize(); 328 329 protected CellComparator getComparator() { 330 return comparator; 331 } 332 333 @VisibleForTesting 334 MutableSegment getActive() { 335 return active; 336 } 337 338 @VisibleForTesting 339 ImmutableSegment getSnapshot() { 340 return snapshot; 341 } 342 343 /** 344 * Check whether anything need to be done based on the current active set size 345 */ 346 protected abstract void checkActiveSize(); 347 348 /** 349 * @return an ordered list of segments from most recent to oldest in memstore 350 */ 351 protected abstract List<Segment> getSegments() throws IOException; 352 353}