001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.lang.management.ManagementFactory; 022import java.lang.management.RuntimeMXBean; 023import java.util.ArrayList; 024import java.util.List; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.Cell; 027import org.apache.hadoop.hbase.CellComparator; 028import org.apache.hadoop.hbase.HBaseConfiguration; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.KeyValue; 031import org.apache.hadoop.hbase.util.Bytes; 032import org.apache.hadoop.hbase.util.ClassSize; 033import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038/** 039 * The MemStore holds in-memory modifications to the Store. Modifications are {@link Cell}s. When 040 * asked to flush, current memstore is moved to snapshot and is cleared. We continue to serve edits 041 * out of new memstore and backing snapshot until flusher reports in that the flush succeeded. At 042 * this point we let the snapshot go. 043 * <p> 044 * The MemStore functions should not be called in parallel. Callers should hold write and read 045 * locks. This is done in {@link HStore}. 046 * </p> 047 * TODO: Adjust size of the memstore when we remove items because they have been deleted. TODO: With 048 * new KVSLS, need to make sure we update HeapSize with difference in KV size. 049 */ 050@InterfaceAudience.Private 051public class DefaultMemStore extends AbstractMemStore { 052 private static final Logger LOG = LoggerFactory.getLogger(DefaultMemStore.class); 053 054 public final static long DEEP_OVERHEAD = ClassSize.align(AbstractMemStore.DEEP_OVERHEAD); 055 public final static long FIXED_OVERHEAD = ClassSize.align(AbstractMemStore.FIXED_OVERHEAD); 056 057 /** 058 * Default constructor. Used for tests. 059 */ 060 public DefaultMemStore() { 061 this(HBaseConfiguration.create(), CellComparator.getInstance(), null); 062 } 063 064 /** 065 * Constructor. 066 * @param c Comparator 067 */ 068 public DefaultMemStore(final Configuration conf, final CellComparator c) { 069 super(conf, c, null); 070 } 071 072 /** 073 * Constructor. 074 * @param c Comparator 075 */ 076 public DefaultMemStore(final Configuration conf, final CellComparator c, 077 final RegionServicesForStores regionServices) { 078 super(conf, c, regionServices); 079 } 080 081 /** 082 * Creates a snapshot of the current memstore. Snapshot must be cleared by call to 083 * {@link #clearSnapshot(long)} 084 */ 085 @Override 086 public MemStoreSnapshot snapshot() { 087 // If snapshot currently has entries, then flusher failed or didn't call 088 // cleanup. Log a warning. 089 if (!this.snapshot.isEmpty()) { 090 LOG.warn("Snapshot called again without clearing previous. " 091 + "Doing nothing. Another ongoing flush or did we fail last attempt?"); 092 } else { 093 this.snapshotId = EnvironmentEdgeManager.currentTime(); 094 if (!getActive().isEmpty()) { 095 // Record the ImmutableSegment' heap overhead when initialing 096 MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing(); 097 ImmutableSegment immutableSegment = 098 SegmentFactory.instance().createImmutableSegment(getActive(), memstoreAccounting); 099 // regionServices can be null when testing 100 if (regionServices != null) { 101 regionServices.addMemStoreSize(memstoreAccounting.getDataSize(), 102 memstoreAccounting.getHeapSize(), memstoreAccounting.getOffHeapSize(), 103 memstoreAccounting.getCellsCount()); 104 } 105 this.snapshot = immutableSegment; 106 resetActive(); 107 resetTimeOfOldestEdit(); 108 } 109 } 110 return new MemStoreSnapshot(this.snapshotId, this.snapshot); 111 } 112 113 @Override 114 public MemStoreSize getFlushableSize() { 115 MemStoreSize mss = getSnapshotSize(); 116 return mss.getDataSize() > 0 ? mss : getActive().getMemStoreSize(); 117 } 118 119 @Override 120 protected long keySize() { 121 return getActive().getDataSize(); 122 } 123 124 @Override 125 protected long heapSize() { 126 return getActive().getHeapSize(); 127 } 128 129 @Override 130 /** 131 * This method is protected under {@link HStore#lock} read lock. <br/> 132 * Scanners are ordered from 0 (oldest) to newest in increasing order. 133 */ 134 public List<KeyValueScanner> getScanners(long readPt) throws IOException { 135 List<KeyValueScanner> list = new ArrayList<>(); 136 addToScanners(getActive(), readPt, list); 137 addToScanners(getSnapshotSegments(), readPt, list); 138 return list; 139 } 140 141 protected List<Segment> getSnapshotSegments() { 142 return snapshot.getAllSegments(); 143 } 144 145 @Override 146 protected List<Segment> getSegments() throws IOException { 147 List<Segment> list = new ArrayList<>(2); 148 list.add(getActive()); 149 list.add(snapshot); 150 return list; 151 } 152 153 /** 154 * @param cell Find the row that comes after this one. If null, we return the first. 155 * @return Next row or null if none found. 156 */ 157 Cell getNextRow(final Cell cell) { 158 return getLowest(getNextRow(cell, this.getActive().getCellSet()), 159 getNextRow(cell, this.snapshot.getCellSet())); 160 } 161 162 @Override 163 public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent) { 164 } 165 166 @Override 167 protected boolean preUpdate(MutableSegment currentActive, Cell cell, 168 MemStoreSizing memstoreSizing) { 169 return true; 170 } 171 172 @Override 173 protected void postUpdate(MutableSegment currentActive) { 174 return; 175 } 176 177 @Override 178 protected boolean sizeAddedPreOperation() { 179 return false; 180 } 181 182 @Override 183 public MemStoreSize size() { 184 return getActive().getMemStoreSize(); 185 } 186 187 @Override 188 public long preFlushSeqIDEstimation() { 189 return HConstants.NO_SEQNUM; 190 } 191 192 @Override 193 public boolean isSloppy() { 194 return false; 195 } 196 197 /** 198 * Code to help figure if our approximation of object heap sizes is close enough. See hbase-900. 199 * Fills memstores then waits so user can heap dump and bring up resultant hprof in something like 200 * jprofiler which allows you get 'deep size' on objects. 201 * @param args main args 202 */ 203 public static void main(String[] args) { 204 RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean(); 205 LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" + runtime.getVmVendor() 206 + ", vmVersion=" + runtime.getVmVersion()); 207 LOG.info("vmInputArguments=" + runtime.getInputArguments()); 208 DefaultMemStore memstore1 = new DefaultMemStore(); 209 // TODO: x32 vs x64 210 final int count = 10000; 211 byte[] fam = Bytes.toBytes("col"); 212 byte[] qf = Bytes.toBytes("umn"); 213 byte[] empty = new byte[0]; 214 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 215 for (int i = 0; i < count; i++) { 216 // Give each its own ts 217 memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memStoreSizing); 218 } 219 LOG.info("memstore1 estimated size={}", memStoreSizing.getMemStoreSize().getDataSize() 220 + memStoreSizing.getMemStoreSize().getHeapSize()); 221 for (int i = 0; i < count; i++) { 222 memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memStoreSizing); 223 } 224 LOG.info("memstore1 estimated size (2nd loading of same data)={}", 225 memStoreSizing.getMemStoreSize().getDataSize() 226 + memStoreSizing.getMemStoreSize().getHeapSize()); 227 // Make a variably sized memstore. 228 DefaultMemStore memstore2 = new DefaultMemStore(); 229 memStoreSizing = new NonThreadSafeMemStoreSizing(); 230 for (int i = 0; i < count; i++) { 231 memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]), memStoreSizing); 232 } 233 LOG.info("memstore2 estimated size={}", memStoreSizing.getMemStoreSize().getDataSize() 234 + memStoreSizing.getMemStoreSize().getHeapSize()); 235 final int seconds = 30; 236 LOG.info("Waiting " + seconds + " seconds while heap dump is taken"); 237 LOG.info("Exiting."); 238 } 239}