001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.IOException;
022import java.lang.management.ManagementFactory;
023import java.lang.management.RuntimeMXBean;
024import java.util.ArrayList;
025import java.util.List;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellComparator;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.KeyValue;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.ClassSize;
038import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
039
040/**
041 * The MemStore holds in-memory modifications to the Store.  Modifications
042 * are {@link Cell}s.  When asked to flush, current memstore is moved
043 * to snapshot and is cleared.  We continue to serve edits out of new memstore
044 * and backing snapshot until flusher reports in that the flush succeeded. At
045 * this point we let the snapshot go.
046 *  <p>
047 * The MemStore functions should not be called in parallel. Callers should hold
048 *  write and read locks. This is done in {@link HStore}.
049 *  </p>
050 *
051 * TODO: Adjust size of the memstore when we remove items because they have
052 * been deleted.
053 * TODO: With new KVSLS, need to make sure we update HeapSize with difference
054 * in KV size.
055 */
056@InterfaceAudience.Private
057public class DefaultMemStore extends AbstractMemStore {
058  private static final Logger LOG = LoggerFactory.getLogger(DefaultMemStore.class);
059
060  public final static long DEEP_OVERHEAD = ClassSize.align(AbstractMemStore.DEEP_OVERHEAD);
061  public final static long FIXED_OVERHEAD = ClassSize.align(AbstractMemStore.FIXED_OVERHEAD);
062  /**
063   * Default constructor. Used for tests.
064   */
065  public DefaultMemStore() {
066    this(HBaseConfiguration.create(), CellComparator.getInstance(), null);
067  }
068
069  /**
070   * Constructor.
071   * @param c Comparator
072   */
073  public DefaultMemStore(final Configuration conf, final CellComparator c) {
074    super(conf, c, null);
075  }
076
077  /**
078   * Constructor.
079   * @param c Comparator
080   */
081  public DefaultMemStore(final Configuration conf, final CellComparator c,
082      final RegionServicesForStores regionServices) {
083    super(conf, c, regionServices);
084  }
085
086  /**
087   * Creates a snapshot of the current memstore.
088   * Snapshot must be cleared by call to {@link #clearSnapshot(long)}
089   */
090  @Override
091  public MemStoreSnapshot snapshot() {
092    // If snapshot currently has entries, then flusher failed or didn't call
093    // cleanup.  Log a warning.
094    if (!this.snapshot.isEmpty()) {
095      LOG.warn("Snapshot called again without clearing previous. " +
096          "Doing nothing. Another ongoing flush or did we fail last attempt?");
097    } else {
098      this.snapshotId = EnvironmentEdgeManager.currentTime();
099      if (!this.active.isEmpty()) {
100        // Record the ImmutableSegment' heap overhead when initialing
101        MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
102        ImmutableSegment immutableSegment = SegmentFactory.instance().
103            createImmutableSegment(this.active, memstoreAccounting);
104        // regionServices can be null when testing
105        if (regionServices != null) {
106          regionServices.addMemStoreSize(memstoreAccounting.getDataSize(),
107            memstoreAccounting.getHeapSize(), memstoreAccounting.getOffHeapSize(),
108            memstoreAccounting.getCellsCount());
109        }
110        this.snapshot = immutableSegment;
111        resetActive();
112      }
113    }
114    return new MemStoreSnapshot(this.snapshotId, this.snapshot);
115  }
116
117  @Override
118  public MemStoreSize getFlushableSize() {
119    MemStoreSize mss = getSnapshotSize();
120    return mss.getDataSize() > 0? mss: this.active.getMemStoreSize();
121  }
122
123  @Override
124  protected long keySize() {
125    return this.active.getDataSize();
126  }
127
128  @Override
129  protected long heapSize() {
130    return this.active.getHeapSize();
131  }
132
133  @Override
134  /*
135   * Scanners are ordered from 0 (oldest) to newest in increasing order.
136   */
137  public List<KeyValueScanner> getScanners(long readPt) throws IOException {
138    List<KeyValueScanner> list = new ArrayList<>();
139    addToScanners(active, readPt, list);
140    addToScanners(snapshot.getAllSegments(), readPt, list);
141    return list;
142  }
143
144  @Override
145  protected List<Segment> getSegments() throws IOException {
146    List<Segment> list = new ArrayList<>(2);
147    list.add(this.active);
148    list.add(this.snapshot);
149    return list;
150  }
151
152  /**
153   * @param cell Find the row that comes after this one.  If null, we return the
154   * first.
155   * @return Next row or null if none found.
156   */
157  Cell getNextRow(final Cell cell) {
158    return getLowest(
159        getNextRow(cell, this.active.getCellSet()),
160        getNextRow(cell, this.snapshot.getCellSet()));
161  }
162
163  @Override public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent) {
164  }
165
166  @Override
167  public MemStoreSize size() {
168    return active.getMemStoreSize();
169  }
170
171  /**
172   * Check whether anything need to be done based on the current active set size
173   * Nothing need to be done for the DefaultMemStore
174   */
175  @Override
176  protected void checkActiveSize() {
177    return;
178  }
179
180  @Override
181  public long preFlushSeqIDEstimation() {
182    return HConstants.NO_SEQNUM;
183  }
184
185  @Override public boolean isSloppy() {
186    return false;
187  }
188
189  /**
190   * Code to help figure if our approximation of object heap sizes is close
191   * enough.  See hbase-900.  Fills memstores then waits so user can heap
192   * dump and bring up resultant hprof in something like jprofiler which
193   * allows you get 'deep size' on objects.
194   * @param args main args
195   */
196  public static void main(String [] args) {
197    RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
198    LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" +
199      runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion());
200    LOG.info("vmInputArguments=" + runtime.getInputArguments());
201    DefaultMemStore memstore1 = new DefaultMemStore();
202    // TODO: x32 vs x64
203    final int count = 10000;
204    byte [] fam = Bytes.toBytes("col");
205    byte [] qf = Bytes.toBytes("umn");
206    byte [] empty = new byte[0];
207    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
208    for (int i = 0; i < count; i++) {
209      // Give each its own ts
210      memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memStoreSizing);
211    }
212    LOG.info("memstore1 estimated size={}", memStoreSizing.getMemStoreSize().getDataSize() +
213        memStoreSizing.getMemStoreSize().getHeapSize());
214    for (int i = 0; i < count; i++) {
215      memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memStoreSizing);
216    }
217    LOG.info("memstore1 estimated size (2nd loading of same data)={}",
218        memStoreSizing.getMemStoreSize().getDataSize() +
219            memStoreSizing.getMemStoreSize().getHeapSize());
220    // Make a variably sized memstore.
221    DefaultMemStore memstore2 = new DefaultMemStore();
222    memStoreSizing = new NonThreadSafeMemStoreSizing();
223    for (int i = 0; i < count; i++) {
224      memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]), memStoreSizing);
225    }
226    LOG.info("memstore2 estimated size={}", memStoreSizing.getMemStoreSize().getDataSize() +
227        memStoreSizing.getMemStoreSize().getHeapSize());
228    final int seconds = 30;
229    LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
230    LOG.info("Exiting.");
231  }
232}