001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.IOException;
022import java.lang.management.ManagementFactory;
023import java.lang.management.RuntimeMXBean;
024import java.util.ArrayList;
025import java.util.List;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellComparator;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.KeyValue;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.ClassSize;
038import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
039
040/**
041 * The MemStore holds in-memory modifications to the Store.  Modifications
042 * are {@link Cell}s.  When asked to flush, current memstore is moved
043 * to snapshot and is cleared.  We continue to serve edits out of new memstore
044 * and backing snapshot until flusher reports in that the flush succeeded. At
045 * this point we let the snapshot go.
046 *  <p>
047 * The MemStore functions should not be called in parallel. Callers should hold
048 *  write and read locks. This is done in {@link HStore}.
049 *  </p>
050 *
051 * TODO: Adjust size of the memstore when we remove items because they have
052 * been deleted.
053 * TODO: With new KVSLS, need to make sure we update HeapSize with difference
054 * in KV size.
055 */
056@InterfaceAudience.Private
057public class DefaultMemStore extends AbstractMemStore {
058  private static final Logger LOG = LoggerFactory.getLogger(DefaultMemStore.class);
059
060  public final static long DEEP_OVERHEAD = ClassSize.align(AbstractMemStore.DEEP_OVERHEAD);
061  public final static long FIXED_OVERHEAD = ClassSize.align(AbstractMemStore.FIXED_OVERHEAD);
062  /**
063   * Default constructor. Used for tests.
064   */
065  public DefaultMemStore() {
066    this(HBaseConfiguration.create(), CellComparator.getInstance(), null);
067  }
068
069  /**
070   * Constructor.
071   * @param c Comparator
072   */
073  public DefaultMemStore(final Configuration conf, final CellComparator c) {
074    super(conf, c, null);
075  }
076
077  /**
078   * Constructor.
079   * @param c Comparator
080   */
081  public DefaultMemStore(final Configuration conf, final CellComparator c,
082      final RegionServicesForStores regionServices) {
083    super(conf, c, regionServices);
084  }
085
086  /**
087   * Creates a snapshot of the current memstore.
088   * Snapshot must be cleared by call to {@link #clearSnapshot(long)}
089   */
090  @Override
091  public MemStoreSnapshot snapshot() {
092    // If snapshot currently has entries, then flusher failed or didn't call
093    // cleanup.  Log a warning.
094    if (!this.snapshot.isEmpty()) {
095      LOG.warn("Snapshot called again without clearing previous. " +
096          "Doing nothing. Another ongoing flush or did we fail last attempt?");
097    } else {
098      this.snapshotId = EnvironmentEdgeManager.currentTime();
099      if (!this.active.isEmpty()) {
100        // Record the ImmutableSegment' heap overhead when initialing
101        MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
102        ImmutableSegment immutableSegment = SegmentFactory.instance().
103            createImmutableSegment(this.active, memstoreAccounting);
104        // regionServices can be null when testing
105        if (regionServices != null) {
106          regionServices.addMemStoreSize(memstoreAccounting.getDataSize(),
107            memstoreAccounting.getHeapSize(), memstoreAccounting.getOffHeapSize(),
108            memstoreAccounting.getCellsCount());
109        }
110        this.snapshot = immutableSegment;
111        resetActive();
112      }
113    }
114    return new MemStoreSnapshot(this.snapshotId, this.snapshot);
115  }
116
117  @Override
118  public MemStoreSize getFlushableSize() {
119    MemStoreSize mss = getSnapshotSize();
120    return mss.getDataSize() > 0? mss: this.active.getMemStoreSize();
121  }
122
123  @Override
124  protected long keySize() {
125    return this.active.getDataSize();
126  }
127
128  @Override
129  protected long heapSize() {
130    return this.active.getHeapSize();
131  }
132
133  @Override
134  /*
135   * Scanners are ordered from 0 (oldest) to newest in increasing order.
136   */
137  public List<KeyValueScanner> getScanners(long readPt) throws IOException {
138    List<KeyValueScanner> list = new ArrayList<>();
139    long order = snapshot.getNumOfSegments();
140    order = addToScanners(active, readPt, order, list);
141    addToScanners(snapshot.getAllSegments(), readPt, order, list);
142    return list;
143  }
144
145  @Override
146  protected List<Segment> getSegments() throws IOException {
147    List<Segment> list = new ArrayList<>(2);
148    list.add(this.active);
149    list.add(this.snapshot);
150    return list;
151  }
152
153  /**
154   * @param cell Find the row that comes after this one.  If null, we return the
155   * first.
156   * @return Next row or null if none found.
157   */
158  Cell getNextRow(final Cell cell) {
159    return getLowest(
160        getNextRow(cell, this.active.getCellSet()),
161        getNextRow(cell, this.snapshot.getCellSet()));
162  }
163
164  @Override public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent) {
165  }
166
167  @Override
168  public MemStoreSize size() {
169    return active.getMemStoreSize();
170  }
171
172  /**
173   * Check whether anything need to be done based on the current active set size
174   * Nothing need to be done for the DefaultMemStore
175   */
176  @Override
177  protected void checkActiveSize() {
178    return;
179  }
180
181  @Override
182  public long preFlushSeqIDEstimation() {
183    return HConstants.NO_SEQNUM;
184  }
185
186  @Override public boolean isSloppy() {
187    return false;
188  }
189
190  /**
191   * Code to help figure if our approximation of object heap sizes is close
192   * enough.  See hbase-900.  Fills memstores then waits so user can heap
193   * dump and bring up resultant hprof in something like jprofiler which
194   * allows you get 'deep size' on objects.
195   * @param args main args
196   */
197  public static void main(String [] args) {
198    RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
199    LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" +
200      runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion());
201    LOG.info("vmInputArguments=" + runtime.getInputArguments());
202    DefaultMemStore memstore1 = new DefaultMemStore();
203    // TODO: x32 vs x64
204    final int count = 10000;
205    byte [] fam = Bytes.toBytes("col");
206    byte [] qf = Bytes.toBytes("umn");
207    byte [] empty = new byte[0];
208    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
209    for (int i = 0; i < count; i++) {
210      // Give each its own ts
211      memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memStoreSizing);
212    }
213    LOG.info("memstore1 estimated size={}", memStoreSizing.getMemStoreSize().getDataSize() +
214        memStoreSizing.getMemStoreSize().getHeapSize());
215    for (int i = 0; i < count; i++) {
216      memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memStoreSizing);
217    }
218    LOG.info("memstore1 estimated size (2nd loading of same data)={}",
219        memStoreSizing.getMemStoreSize().getDataSize() +
220            memStoreSizing.getMemStoreSize().getHeapSize());
221    // Make a variably sized memstore.
222    DefaultMemStore memstore2 = new DefaultMemStore();
223    memStoreSizing = new NonThreadSafeMemStoreSizing();
224    for (int i = 0; i < count; i++) {
225      memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]), memStoreSizing);
226    }
227    LOG.info("memstore2 estimated size={}", memStoreSizing.getMemStoreSize().getDataSize() +
228        memStoreSizing.getMemStoreSize().getHeapSize());
229    final int seconds = 30;
230    LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
231    LOG.info("Exiting.");
232  }
233}