001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.regionserver; 020 021import java.io.IOException; 022import java.lang.management.ManagementFactory; 023import java.lang.management.RuntimeMXBean; 024import java.util.ArrayList; 025import java.util.List; 026 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.CellComparator; 030import org.apache.hadoop.hbase.HBaseConfiguration; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.KeyValue; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036import org.apache.hadoop.hbase.util.Bytes; 037import org.apache.hadoop.hbase.util.ClassSize; 038import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 039 040/** 041 * The MemStore holds in-memory modifications to the Store. Modifications 042 * are {@link Cell}s. When asked to flush, current memstore is moved 043 * to snapshot and is cleared. We continue to serve edits out of new memstore 044 * and backing snapshot until flusher reports in that the flush succeeded. At 045 * this point we let the snapshot go. 046 * <p> 047 * The MemStore functions should not be called in parallel. Callers should hold 048 * write and read locks. This is done in {@link HStore}. 049 * </p> 050 * 051 * TODO: Adjust size of the memstore when we remove items because they have 052 * been deleted. 053 * TODO: With new KVSLS, need to make sure we update HeapSize with difference 054 * in KV size. 055 */ 056@InterfaceAudience.Private 057public class DefaultMemStore extends AbstractMemStore { 058 private static final Logger LOG = LoggerFactory.getLogger(DefaultMemStore.class); 059 060 public final static long DEEP_OVERHEAD = ClassSize.align(AbstractMemStore.DEEP_OVERHEAD); 061 public final static long FIXED_OVERHEAD = ClassSize.align(AbstractMemStore.FIXED_OVERHEAD); 062 /** 063 * Default constructor. Used for tests. 064 */ 065 public DefaultMemStore() { 066 this(HBaseConfiguration.create(), CellComparator.getInstance(), null); 067 } 068 069 /** 070 * Constructor. 071 * @param c Comparator 072 */ 073 public DefaultMemStore(final Configuration conf, final CellComparator c) { 074 super(conf, c, null); 075 } 076 077 /** 078 * Constructor. 079 * @param c Comparator 080 */ 081 public DefaultMemStore(final Configuration conf, final CellComparator c, 082 final RegionServicesForStores regionServices) { 083 super(conf, c, regionServices); 084 } 085 086 /** 087 * Creates a snapshot of the current memstore. 088 * Snapshot must be cleared by call to {@link #clearSnapshot(long)} 089 */ 090 @Override 091 public MemStoreSnapshot snapshot() { 092 // If snapshot currently has entries, then flusher failed or didn't call 093 // cleanup. Log a warning. 094 if (!this.snapshot.isEmpty()) { 095 LOG.warn("Snapshot called again without clearing previous. " + 096 "Doing nothing. Another ongoing flush or did we fail last attempt?"); 097 } else { 098 this.snapshotId = EnvironmentEdgeManager.currentTime(); 099 if (!getActive().isEmpty()) { 100 // Record the ImmutableSegment' heap overhead when initialing 101 MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing(); 102 ImmutableSegment immutableSegment = SegmentFactory.instance(). 103 createImmutableSegment(getActive(), memstoreAccounting); 104 // regionServices can be null when testing 105 if (regionServices != null) { 106 regionServices.addMemStoreSize(memstoreAccounting.getDataSize(), 107 memstoreAccounting.getHeapSize(), memstoreAccounting.getOffHeapSize(), 108 memstoreAccounting.getCellsCount()); 109 } 110 this.snapshot = immutableSegment; 111 resetActive(); 112 } 113 } 114 return new MemStoreSnapshot(this.snapshotId, this.snapshot); 115 } 116 117 @Override 118 public MemStoreSize getFlushableSize() { 119 MemStoreSize mss = getSnapshotSize(); 120 return mss.getDataSize() > 0? mss: getActive().getMemStoreSize(); 121 } 122 123 @Override 124 protected long keySize() { 125 return getActive().getDataSize(); 126 } 127 128 @Override 129 protected long heapSize() { 130 return getActive().getHeapSize(); 131 } 132 133 @Override 134 /* 135 * Scanners are ordered from 0 (oldest) to newest in increasing order. 136 */ 137 public List<KeyValueScanner> getScanners(long readPt) throws IOException { 138 List<KeyValueScanner> list = new ArrayList<>(); 139 addToScanners(getActive(), readPt, list); 140 addToScanners(snapshot.getAllSegments(), readPt, list); 141 return list; 142 } 143 144 @Override 145 protected List<Segment> getSegments() throws IOException { 146 List<Segment> list = new ArrayList<>(2); 147 list.add(getActive()); 148 list.add(snapshot); 149 return list; 150 } 151 152 /** 153 * @param cell Find the row that comes after this one. If null, we return the 154 * first. 155 * @return Next row or null if none found. 156 */ 157 Cell getNextRow(final Cell cell) { 158 return getLowest( 159 getNextRow(cell, this.getActive().getCellSet()), 160 getNextRow(cell, this.snapshot.getCellSet())); 161 } 162 163 @Override 164 public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent) { 165 } 166 167 @Override 168 protected boolean preUpdate(MutableSegment currentActive, Cell cell, 169 MemStoreSizing memstoreSizing) { 170 return true; 171 } 172 173 @Override 174 protected void postUpdate(MutableSegment currentActive) { 175 return; 176 } 177 178 @Override 179 protected boolean sizeAddedPreOperation() { 180 return false; 181 } 182 183 @Override 184 public MemStoreSize size() { 185 return getActive().getMemStoreSize(); 186 } 187 188 @Override 189 public long preFlushSeqIDEstimation() { 190 return HConstants.NO_SEQNUM; 191 } 192 193 @Override 194 public boolean isSloppy() { 195 return false; 196 } 197 198 /** 199 * Code to help figure if our approximation of object heap sizes is close 200 * enough. See hbase-900. Fills memstores then waits so user can heap 201 * dump and bring up resultant hprof in something like jprofiler which 202 * allows you get 'deep size' on objects. 203 * @param args main args 204 */ 205 public static void main(String [] args) { 206 RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean(); 207 LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" + 208 runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion()); 209 LOG.info("vmInputArguments=" + runtime.getInputArguments()); 210 DefaultMemStore memstore1 = new DefaultMemStore(); 211 // TODO: x32 vs x64 212 final int count = 10000; 213 byte [] fam = Bytes.toBytes("col"); 214 byte [] qf = Bytes.toBytes("umn"); 215 byte [] empty = new byte[0]; 216 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 217 for (int i = 0; i < count; i++) { 218 // Give each its own ts 219 memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memStoreSizing); 220 } 221 LOG.info("memstore1 estimated size={}", memStoreSizing.getMemStoreSize().getDataSize() + 222 memStoreSizing.getMemStoreSize().getHeapSize()); 223 for (int i = 0; i < count; i++) { 224 memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memStoreSizing); 225 } 226 LOG.info("memstore1 estimated size (2nd loading of same data)={}", 227 memStoreSizing.getMemStoreSize().getDataSize() + 228 memStoreSizing.getMemStoreSize().getHeapSize()); 229 // Make a variably sized memstore. 230 DefaultMemStore memstore2 = new DefaultMemStore(); 231 memStoreSizing = new NonThreadSafeMemStoreSizing(); 232 for (int i = 0; i < count; i++) { 233 memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]), memStoreSizing); 234 } 235 LOG.info("memstore2 estimated size={}", memStoreSizing.getMemStoreSize().getDataSize() + 236 memStoreSizing.getMemStoreSize().getHeapSize()); 237 final int seconds = 30; 238 LOG.info("Waiting " + seconds + " seconds while heap dump is taken"); 239 LOG.info("Exiting."); 240 } 241}