001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.regionserver; 020 021import java.io.IOException; 022import java.lang.management.ManagementFactory; 023import java.lang.management.RuntimeMXBean; 024import java.util.ArrayList; 025import java.util.List; 026 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.CellComparator; 030import org.apache.hadoop.hbase.HBaseConfiguration; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.KeyValue; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036import org.apache.hadoop.hbase.util.Bytes; 037import org.apache.hadoop.hbase.util.ClassSize; 038import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 039 040/** 041 * The MemStore holds in-memory modifications to the Store. Modifications 042 * are {@link Cell}s. When asked to flush, current memstore is moved 043 * to snapshot and is cleared. We continue to serve edits out of new memstore 044 * and backing snapshot until flusher reports in that the flush succeeded. At 045 * this point we let the snapshot go. 046 * <p> 047 * The MemStore functions should not be called in parallel. Callers should hold 048 * write and read locks. This is done in {@link HStore}. 049 * </p> 050 * 051 * TODO: Adjust size of the memstore when we remove items because they have 052 * been deleted. 053 * TODO: With new KVSLS, need to make sure we update HeapSize with difference 054 * in KV size. 055 */ 056@InterfaceAudience.Private 057public class DefaultMemStore extends AbstractMemStore { 058 private static final Logger LOG = LoggerFactory.getLogger(DefaultMemStore.class); 059 060 public final static long DEEP_OVERHEAD = ClassSize.align(AbstractMemStore.DEEP_OVERHEAD); 061 public final static long FIXED_OVERHEAD = ClassSize.align(AbstractMemStore.FIXED_OVERHEAD); 062 /** 063 * Default constructor. Used for tests. 064 */ 065 public DefaultMemStore() { 066 this(HBaseConfiguration.create(), CellComparator.getInstance(), null); 067 } 068 069 /** 070 * Constructor. 071 * @param c Comparator 072 */ 073 public DefaultMemStore(final Configuration conf, final CellComparator c) { 074 super(conf, c, null); 075 } 076 077 /** 078 * Constructor. 079 * @param c Comparator 080 */ 081 public DefaultMemStore(final Configuration conf, final CellComparator c, 082 final RegionServicesForStores regionServices) { 083 super(conf, c, regionServices); 084 } 085 086 /** 087 * Creates a snapshot of the current memstore. 088 * Snapshot must be cleared by call to {@link #clearSnapshot(long)} 089 */ 090 @Override 091 public MemStoreSnapshot snapshot() { 092 // If snapshot currently has entries, then flusher failed or didn't call 093 // cleanup. Log a warning. 094 if (!this.snapshot.isEmpty()) { 095 LOG.warn("Snapshot called again without clearing previous. " + 096 "Doing nothing. Another ongoing flush or did we fail last attempt?"); 097 } else { 098 this.snapshotId = EnvironmentEdgeManager.currentTime(); 099 if (!this.active.isEmpty()) { 100 // Record the ImmutableSegment' heap overhead when initialing 101 MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing(); 102 ImmutableSegment immutableSegment = SegmentFactory.instance(). 103 createImmutableSegment(this.active, memstoreAccounting); 104 // regionServices can be null when testing 105 if (regionServices != null) { 106 regionServices.addMemStoreSize(memstoreAccounting.getDataSize(), 107 memstoreAccounting.getHeapSize(), memstoreAccounting.getOffHeapSize(), 108 memstoreAccounting.getCellsCount()); 109 } 110 this.snapshot = immutableSegment; 111 resetActive(); 112 } 113 } 114 return new MemStoreSnapshot(this.snapshotId, this.snapshot); 115 } 116 117 @Override 118 public MemStoreSize getFlushableSize() { 119 MemStoreSize mss = getSnapshotSize(); 120 return mss.getDataSize() > 0? mss: this.active.getMemStoreSize(); 121 } 122 123 @Override 124 protected long keySize() { 125 return this.active.getDataSize(); 126 } 127 128 @Override 129 protected long heapSize() { 130 return this.active.getHeapSize(); 131 } 132 133 @Override 134 /* 135 * Scanners are ordered from 0 (oldest) to newest in increasing order. 136 */ 137 public List<KeyValueScanner> getScanners(long readPt) throws IOException { 138 List<KeyValueScanner> list = new ArrayList<>(); 139 long order = snapshot.getNumOfSegments(); 140 order = addToScanners(active, readPt, order, list); 141 addToScanners(snapshot.getAllSegments(), readPt, order, list); 142 return list; 143 } 144 145 @Override 146 protected List<Segment> getSegments() throws IOException { 147 List<Segment> list = new ArrayList<>(2); 148 list.add(this.active); 149 list.add(this.snapshot); 150 return list; 151 } 152 153 /** 154 * @param cell Find the row that comes after this one. If null, we return the 155 * first. 156 * @return Next row or null if none found. 157 */ 158 Cell getNextRow(final Cell cell) { 159 return getLowest( 160 getNextRow(cell, this.active.getCellSet()), 161 getNextRow(cell, this.snapshot.getCellSet())); 162 } 163 164 @Override public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent) { 165 } 166 167 @Override 168 public MemStoreSize size() { 169 return active.getMemStoreSize(); 170 } 171 172 /** 173 * Check whether anything need to be done based on the current active set size 174 * Nothing need to be done for the DefaultMemStore 175 */ 176 @Override 177 protected void checkActiveSize() { 178 return; 179 } 180 181 @Override 182 public long preFlushSeqIDEstimation() { 183 return HConstants.NO_SEQNUM; 184 } 185 186 @Override public boolean isSloppy() { 187 return false; 188 } 189 190 /** 191 * Code to help figure if our approximation of object heap sizes is close 192 * enough. See hbase-900. Fills memstores then waits so user can heap 193 * dump and bring up resultant hprof in something like jprofiler which 194 * allows you get 'deep size' on objects. 195 * @param args main args 196 */ 197 public static void main(String [] args) { 198 RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean(); 199 LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" + 200 runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion()); 201 LOG.info("vmInputArguments=" + runtime.getInputArguments()); 202 DefaultMemStore memstore1 = new DefaultMemStore(); 203 // TODO: x32 vs x64 204 final int count = 10000; 205 byte [] fam = Bytes.toBytes("col"); 206 byte [] qf = Bytes.toBytes("umn"); 207 byte [] empty = new byte[0]; 208 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 209 for (int i = 0; i < count; i++) { 210 // Give each its own ts 211 memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memStoreSizing); 212 } 213 LOG.info("memstore1 estimated size={}", memStoreSizing.getMemStoreSize().getDataSize() + 214 memStoreSizing.getMemStoreSize().getHeapSize()); 215 for (int i = 0; i < count; i++) { 216 memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memStoreSizing); 217 } 218 LOG.info("memstore1 estimated size (2nd loading of same data)={}", 219 memStoreSizing.getMemStoreSize().getDataSize() + 220 memStoreSizing.getMemStoreSize().getHeapSize()); 221 // Make a variably sized memstore. 222 DefaultMemStore memstore2 = new DefaultMemStore(); 223 memStoreSizing = new NonThreadSafeMemStoreSizing(); 224 for (int i = 0; i < count; i++) { 225 memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]), memStoreSizing); 226 } 227 LOG.info("memstore2 estimated size={}", memStoreSizing.getMemStoreSize().getDataSize() + 228 memStoreSizing.getMemStoreSize().getHeapSize()); 229 final int seconds = 30; 230 LOG.info("Waiting " + seconds + " seconds while heap dump is taken"); 231 LOG.info("Exiting."); 232 } 233}