001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 022import java.io.IOException; 023import java.util.List; 024import java.util.NavigableSet; 025import java.util.SortedSet; 026 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.CellComparator; 030import org.apache.hadoop.hbase.ExtendedCell; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.slf4j.Logger; 033import org.apache.hadoop.hbase.exceptions.UnexpectedStateException; 034import org.apache.hadoop.hbase.util.Bytes; 035import org.apache.hadoop.hbase.util.ClassSize; 036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 037 038/** 039 * An abstract class, which implements the behaviour shared by all concrete memstore instances. 040 */ 041@InterfaceAudience.Private 042public abstract class AbstractMemStore implements MemStore { 043 044 private static final long NO_SNAPSHOT_ID = -1; 045 046 private final Configuration conf; 047 private final CellComparator comparator; 048 049 // active segment absorbs write operations 050 protected volatile MutableSegment active; 051 // Snapshot of memstore. Made for flusher. 052 protected volatile ImmutableSegment snapshot; 053 protected volatile long snapshotId; 054 // Used to track when to flush 055 private volatile long timeOfOldestEdit; 056 057 protected RegionServicesForStores regionServices; 058 059 public final static long FIXED_OVERHEAD = (long) ClassSize.OBJECT 060 + (5 * ClassSize.REFERENCE) 061 + (2 * Bytes.SIZEOF_LONG); // snapshotId, timeOfOldestEdit 062 063 public final static long DEEP_OVERHEAD = FIXED_OVERHEAD; 064 065 public static void addToScanners(List<? extends Segment> segments, long readPt, 066 List<KeyValueScanner> scanners) { 067 for (Segment item : segments) { 068 addToScanners(item, readPt, scanners); 069 } 070 } 071 072 protected static void addToScanners(Segment segment, long readPt, 073 List<KeyValueScanner> scanners) { 074 scanners.add(segment.getScanner(readPt)); 075 } 076 077 protected AbstractMemStore(final Configuration conf, final CellComparator c, 078 final RegionServicesForStores regionServices) { 079 this.conf = conf; 080 this.comparator = c; 081 this.regionServices = regionServices; 082 resetActive(); 083 this.snapshot = SegmentFactory.instance().createImmutableSegment(c); 084 this.snapshotId = NO_SNAPSHOT_ID; 085 } 086 087 protected void resetActive() { 088 // Record the MutableSegment' heap overhead when initialing 089 MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing(); 090 // Reset heap to not include any keys 091 this.active = SegmentFactory.instance() 092 .createMutableSegment(conf, comparator, memstoreAccounting); 093 // regionServices can be null when testing 094 if (regionServices != null) { 095 regionServices.addMemStoreSize(memstoreAccounting.getDataSize(), 096 memstoreAccounting.getHeapSize(), memstoreAccounting.getOffHeapSize(), 097 memstoreAccounting.getCellsCount()); 098 } 099 this.timeOfOldestEdit = Long.MAX_VALUE; 100 } 101 102 /** 103 * Updates the wal with the lowest sequence id (oldest entry) that is still in memory 104 * @param onlyIfMoreRecent a flag that marks whether to update the sequence id no matter what or 105 * only if it is greater than the previous sequence id 106 */ 107 public abstract void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent); 108 109 @Override 110 public void add(Iterable<Cell> cells, MemStoreSizing memstoreSizing) { 111 for (Cell cell : cells) { 112 add(cell, memstoreSizing); 113 } 114 } 115 116 @Override 117 public void add(Cell cell, MemStoreSizing memstoreSizing) { 118 Cell toAdd = maybeCloneWithAllocator(cell, false); 119 boolean mslabUsed = (toAdd != cell); 120 // This cell data is backed by the same byte[] where we read request in RPC(See HBASE-15180). By 121 // default MSLAB is ON and we might have copied cell to MSLAB area. If not we must do below deep 122 // copy. Or else we will keep referring to the bigger chunk of memory and prevent it from 123 // getting GCed. 124 // Copy to MSLAB would not have happened if 125 // 1. MSLAB is turned OFF. See "hbase.hregion.memstore.mslab.enabled" 126 // 2. When the size of the cell is bigger than the max size supported by MSLAB. See 127 // "hbase.hregion.memstore.mslab.max.allocation". This defaults to 256 KB 128 // 3. When cells are from Append/Increment operation. 129 if (!mslabUsed) { 130 toAdd = deepCopyIfNeeded(toAdd); 131 } 132 internalAdd(toAdd, mslabUsed, memstoreSizing); 133 } 134 135 private static Cell deepCopyIfNeeded(Cell cell) { 136 if (cell instanceof ExtendedCell) { 137 return ((ExtendedCell) cell).deepClone(); 138 } 139 return cell; 140 } 141 142 @Override 143 public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing) { 144 for (Cell cell : cells) { 145 upsert(cell, readpoint, memstoreSizing); 146 } 147 } 148 149 /** 150 * @return Oldest timestamp of all the Cells in the MemStore 151 */ 152 @Override 153 public long timeOfOldestEdit() { 154 return timeOfOldestEdit; 155 } 156 157 /** 158 * The passed snapshot was successfully persisted; it can be let go. 159 * @param id Id of the snapshot to clean out. 160 * @see MemStore#snapshot() 161 */ 162 @Override 163 public void clearSnapshot(long id) throws UnexpectedStateException { 164 if (this.snapshotId == -1) return; // already cleared 165 if (this.snapshotId != id) { 166 throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed " 167 + id); 168 } 169 // OK. Passed in snapshot is same as current snapshot. If not-empty, 170 // create a new snapshot and let the old one go. 171 Segment oldSnapshot = this.snapshot; 172 if (!this.snapshot.isEmpty()) { 173 this.snapshot = SegmentFactory.instance().createImmutableSegment(this.comparator); 174 } 175 this.snapshotId = NO_SNAPSHOT_ID; 176 oldSnapshot.close(); 177 } 178 179 @Override 180 public MemStoreSize getSnapshotSize() { 181 return this.snapshot.getMemStoreSize(); 182 } 183 184 @Override 185 public String toString() { 186 StringBuilder buf = new StringBuilder(); 187 int i = 1; 188 try { 189 for (Segment segment : getSegments()) { 190 buf.append("Segment (").append(i).append(") ").append(segment.toString()).append("; "); 191 i++; 192 } 193 } catch (IOException e){ 194 return e.toString(); 195 } 196 return buf.toString(); 197 } 198 199 protected Configuration getConfiguration() { 200 return conf; 201 } 202 203 protected void dump(Logger log) { 204 active.dump(log); 205 snapshot.dump(log); 206 } 207 208 209 /* 210 * Inserts the specified Cell into MemStore and deletes any existing 211 * versions of the same row/family/qualifier as the specified Cell. 212 * <p> 213 * First, the specified Cell is inserted into the Memstore. 214 * <p> 215 * If there are any existing Cell in this MemStore with the same row, 216 * family, and qualifier, they are removed. 217 * <p> 218 * Callers must hold the read lock. 219 * 220 * @param cell the cell to be updated 221 * @param readpoint readpoint below which we can safely remove duplicate KVs 222 * @param memstoreSize 223 */ 224 private void upsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing) { 225 // Add the Cell to the MemStore 226 // Use the internalAdd method here since we (a) already have a lock 227 // and (b) cannot safely use the MSLAB here without potentially 228 // hitting OOME - see TestMemStore.testUpsertMSLAB for a 229 // test that triggers the pathological case if we don't avoid MSLAB 230 // here. 231 // This cell data is backed by the same byte[] where we read request in RPC(See HBASE-15180). We 232 // must do below deep copy. Or else we will keep referring to the bigger chunk of memory and 233 // prevent it from getting GCed. 234 cell = deepCopyIfNeeded(cell); 235 this.active.upsert(cell, readpoint, memstoreSizing); 236 setOldestEditTimeToNow(); 237 checkActiveSize(); 238 } 239 240 /* 241 * @param a 242 * @param b 243 * @return Return lowest of a or b or null if both a and b are null 244 */ 245 protected Cell getLowest(final Cell a, final Cell b) { 246 if (a == null) { 247 return b; 248 } 249 if (b == null) { 250 return a; 251 } 252 return comparator.compareRows(a, b) <= 0? a: b; 253 } 254 255 /* 256 * @param key Find row that follows this one. If null, return first. 257 * @param set Set to look in for a row beyond <code>row</code>. 258 * @return Next row or null if none found. If one found, will be a new 259 * KeyValue -- can be destroyed by subsequent calls to this method. 260 */ 261 protected Cell getNextRow(final Cell key, 262 final NavigableSet<Cell> set) { 263 Cell result = null; 264 SortedSet<Cell> tail = key == null? set: set.tailSet(key); 265 // Iterate until we fall into the next row; i.e. move off current row 266 for (Cell cell: tail) { 267 if (comparator.compareRows(cell, key) <= 0) { 268 continue; 269 } 270 // Note: Not suppressing deletes or expired cells. Needs to be handled 271 // by higher up functions. 272 result = cell; 273 break; 274 } 275 return result; 276 } 277 278 /** 279 * If the segment has a memory allocator the cell is being cloned to this space, and returned; 280 * Otherwise the given cell is returned 281 * 282 * When a cell's size is too big (bigger than maxAlloc), it is not allocated on MSLAB. 283 * Since the process of flattening to CellChunkMap assumes that all cells are allocated on MSLAB, 284 * during this process, the input parameter forceCloneOfBigCell is set to 'true' 285 * and the cell is copied into MSLAB. 286 * 287 * @param cell the cell to clone 288 * @param forceCloneOfBigCell true only during the process of flattening to CellChunkMap. 289 * @return either the given cell or its clone 290 */ 291 private Cell maybeCloneWithAllocator(Cell cell, boolean forceCloneOfBigCell) { 292 return active.maybeCloneWithAllocator(cell, forceCloneOfBigCell); 293 } 294 295 /* 296 * Internal version of add() that doesn't clone Cells with the 297 * allocator, and doesn't take the lock. 298 * 299 * Callers should ensure they already have the read lock taken 300 * @param toAdd the cell to add 301 * @param mslabUsed whether using MSLAB 302 * @param memstoreSize 303 */ 304 private void internalAdd(final Cell toAdd, final boolean mslabUsed, MemStoreSizing memstoreSizing) { 305 active.add(toAdd, mslabUsed, memstoreSizing); 306 setOldestEditTimeToNow(); 307 checkActiveSize(); 308 } 309 310 private void setOldestEditTimeToNow() { 311 if (timeOfOldestEdit == Long.MAX_VALUE) { 312 timeOfOldestEdit = EnvironmentEdgeManager.currentTime(); 313 } 314 } 315 316 /** 317 * @return The total size of cells in this memstore. We will not consider cells in the snapshot 318 */ 319 protected abstract long keySize(); 320 321 /** 322 * @return The total heap size of cells in this memstore. We will not consider cells in the 323 * snapshot 324 */ 325 protected abstract long heapSize(); 326 327 protected CellComparator getComparator() { 328 return comparator; 329 } 330 331 @VisibleForTesting 332 MutableSegment getActive() { 333 return active; 334 } 335 336 @VisibleForTesting 337 ImmutableSegment getSnapshot() { 338 return snapshot; 339 } 340 341 /** 342 * Check whether anything need to be done based on the current active set size 343 */ 344 protected abstract void checkActiveSize(); 345 346 /** 347 * @return an ordered list of segments from most recent to oldest in memstore 348 */ 349 protected abstract List<Segment> getSegments() throws IOException; 350 351}