001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.IOException;
022import java.lang.management.ManagementFactory;
023import java.lang.management.RuntimeMXBean;
024import java.util.ArrayList;
025import java.util.List;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellComparator;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.KeyValue;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.ClassSize;
038import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
039
040/**
041 * The MemStore holds in-memory modifications to the Store.  Modifications
042 * are {@link Cell}s.  When asked to flush, current memstore is moved
043 * to snapshot and is cleared.  We continue to serve edits out of new memstore
044 * and backing snapshot until flusher reports in that the flush succeeded. At
045 * this point we let the snapshot go.
046 *  <p>
047 * The MemStore functions should not be called in parallel. Callers should hold
048 *  write and read locks. This is done in {@link HStore}.
049 *  </p>
050 *
051 * TODO: Adjust size of the memstore when we remove items because they have
052 * been deleted.
053 * TODO: With new KVSLS, need to make sure we update HeapSize with difference
054 * in KV size.
055 */
056@InterfaceAudience.Private
057public class DefaultMemStore extends AbstractMemStore {
058  private static final Logger LOG = LoggerFactory.getLogger(DefaultMemStore.class);
059
060  public final static long DEEP_OVERHEAD = ClassSize.align(AbstractMemStore.DEEP_OVERHEAD);
061  public final static long FIXED_OVERHEAD = ClassSize.align(AbstractMemStore.FIXED_OVERHEAD);
062  /**
063   * Default constructor. Used for tests.
064   */
065  public DefaultMemStore() {
066    this(HBaseConfiguration.create(), CellComparator.getInstance(), null);
067  }
068
069  /**
070   * Constructor.
071   * @param c Comparator
072   */
073  public DefaultMemStore(final Configuration conf, final CellComparator c) {
074    super(conf, c, null);
075  }
076
077  /**
078   * Constructor.
079   * @param c Comparator
080   */
081  public DefaultMemStore(final Configuration conf, final CellComparator c,
082      final RegionServicesForStores regionServices) {
083    super(conf, c, regionServices);
084  }
085
086  /**
087   * Creates a snapshot of the current memstore.
088   * Snapshot must be cleared by call to {@link #clearSnapshot(long)}
089   */
090  @Override
091  public MemStoreSnapshot snapshot() {
092    // If snapshot currently has entries, then flusher failed or didn't call
093    // cleanup.  Log a warning.
094    if (!this.snapshot.isEmpty()) {
095      LOG.warn("Snapshot called again without clearing previous. " +
096          "Doing nothing. Another ongoing flush or did we fail last attempt?");
097    } else {
098      this.snapshotId = EnvironmentEdgeManager.currentTime();
099      if (!getActive().isEmpty()) {
100        // Record the ImmutableSegment' heap overhead when initialing
101        MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
102        ImmutableSegment immutableSegment = SegmentFactory.instance().
103            createImmutableSegment(getActive(), memstoreAccounting);
104        // regionServices can be null when testing
105        if (regionServices != null) {
106          regionServices.addMemStoreSize(memstoreAccounting.getDataSize(),
107            memstoreAccounting.getHeapSize(), memstoreAccounting.getOffHeapSize(),
108            memstoreAccounting.getCellsCount());
109        }
110        this.snapshot = immutableSegment;
111        resetActive();
112        resetTimeOfOldestEdit();
113      }
114    }
115    return new MemStoreSnapshot(this.snapshotId, this.snapshot);
116  }
117
118  @Override
119  public MemStoreSize getFlushableSize() {
120    MemStoreSize mss = getSnapshotSize();
121    return mss.getDataSize() > 0? mss: getActive().getMemStoreSize();
122  }
123
124  @Override
125  protected long keySize() {
126    return getActive().getDataSize();
127  }
128
129  @Override
130  protected long heapSize() {
131    return getActive().getHeapSize();
132  }
133
134  @Override
135  /*
136   * Scanners are ordered from 0 (oldest) to newest in increasing order.
137   */
138  public List<KeyValueScanner> getScanners(long readPt) throws IOException {
139    List<KeyValueScanner> list = new ArrayList<>();
140    addToScanners(getActive(), readPt, list);
141    addToScanners(snapshot.getAllSegments(), readPt, list);
142    return list;
143  }
144
145  @Override
146  protected List<Segment> getSegments() throws IOException {
147    List<Segment> list = new ArrayList<>(2);
148    list.add(getActive());
149    list.add(snapshot);
150    return list;
151  }
152
153  /**
154   * @param cell Find the row that comes after this one.  If null, we return the
155   * first.
156   * @return Next row or null if none found.
157   */
158  Cell getNextRow(final Cell cell) {
159    return getLowest(
160        getNextRow(cell, this.getActive().getCellSet()),
161        getNextRow(cell, this.snapshot.getCellSet()));
162  }
163
164  @Override
165  public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent) {
166  }
167
168  @Override
169  protected boolean preUpdate(MutableSegment currentActive, Cell cell,
170      MemStoreSizing memstoreSizing) {
171    return true;
172  }
173
174  @Override
175  protected void postUpdate(MutableSegment currentActive) {
176    return;
177  }
178
179  @Override
180  protected boolean sizeAddedPreOperation() {
181    return false;
182  }
183
184  @Override
185  public MemStoreSize size() {
186    return getActive().getMemStoreSize();
187  }
188
189  @Override
190  public long preFlushSeqIDEstimation() {
191    return HConstants.NO_SEQNUM;
192  }
193
194  @Override
195  public boolean isSloppy() {
196    return false;
197  }
198
199  /**
200   * Code to help figure if our approximation of object heap sizes is close
201   * enough.  See hbase-900.  Fills memstores then waits so user can heap
202   * dump and bring up resultant hprof in something like jprofiler which
203   * allows you get 'deep size' on objects.
204   * @param args main args
205   */
206  public static void main(String [] args) {
207    RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
208    LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" +
209      runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion());
210    LOG.info("vmInputArguments=" + runtime.getInputArguments());
211    DefaultMemStore memstore1 = new DefaultMemStore();
212    // TODO: x32 vs x64
213    final int count = 10000;
214    byte [] fam = Bytes.toBytes("col");
215    byte [] qf = Bytes.toBytes("umn");
216    byte [] empty = new byte[0];
217    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
218    for (int i = 0; i < count; i++) {
219      // Give each its own ts
220      memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memStoreSizing);
221    }
222    LOG.info("memstore1 estimated size={}", memStoreSizing.getMemStoreSize().getDataSize() +
223        memStoreSizing.getMemStoreSize().getHeapSize());
224    for (int i = 0; i < count; i++) {
225      memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memStoreSizing);
226    }
227    LOG.info("memstore1 estimated size (2nd loading of same data)={}",
228        memStoreSizing.getMemStoreSize().getDataSize() +
229            memStoreSizing.getMemStoreSize().getHeapSize());
230    // Make a variably sized memstore.
231    DefaultMemStore memstore2 = new DefaultMemStore();
232    memStoreSizing = new NonThreadSafeMemStoreSizing();
233    for (int i = 0; i < count; i++) {
234      memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]), memStoreSizing);
235    }
236    LOG.info("memstore2 estimated size={}", memStoreSizing.getMemStoreSize().getDataSize() +
237        memStoreSizing.getMemStoreSize().getHeapSize());
238    final int seconds = 30;
239    LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
240    LOG.info("Exiting.");
241  }
242}