001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.IOException;
022import java.lang.management.ManagementFactory;
023import java.lang.management.RuntimeMXBean;
024import java.util.ArrayList;
025import java.util.List;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellComparator;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.KeyValue;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.ClassSize;
038import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
039
040/**
041 * The MemStore holds in-memory modifications to the Store.  Modifications
042 * are {@link Cell}s.  When asked to flush, current memstore is moved
043 * to snapshot and is cleared.  We continue to serve edits out of new memstore
044 * and backing snapshot until flusher reports in that the flush succeeded. At
045 * this point we let the snapshot go.
046 *  <p>
047 * The MemStore functions should not be called in parallel. Callers should hold
048 *  write and read locks. This is done in {@link HStore}.
049 *  </p>
050 *
051 * TODO: Adjust size of the memstore when we remove items because they have
052 * been deleted.
053 * TODO: With new KVSLS, need to make sure we update HeapSize with difference
054 * in KV size.
055 */
056@InterfaceAudience.Private
057public class DefaultMemStore extends AbstractMemStore {
058  private static final Logger LOG = LoggerFactory.getLogger(DefaultMemStore.class);
059
060  public final static long DEEP_OVERHEAD = ClassSize.align(AbstractMemStore.DEEP_OVERHEAD);
061  public final static long FIXED_OVERHEAD = ClassSize.align(AbstractMemStore.FIXED_OVERHEAD);
062  /**
063   * Default constructor. Used for tests.
064   */
065  public DefaultMemStore() {
066    this(HBaseConfiguration.create(), CellComparator.getInstance(), null);
067  }
068
069  /**
070   * Constructor.
071   * @param c Comparator
072   */
073  public DefaultMemStore(final Configuration conf, final CellComparator c) {
074    super(conf, c, null);
075  }
076
077  /**
078   * Constructor.
079   * @param c Comparator
080   */
081  public DefaultMemStore(final Configuration conf, final CellComparator c,
082      final RegionServicesForStores regionServices) {
083    super(conf, c, regionServices);
084  }
085
086  /**
087   * Creates a snapshot of the current memstore.
088   * Snapshot must be cleared by call to {@link #clearSnapshot(long)}
089   */
090  @Override
091  public MemStoreSnapshot snapshot() {
092    // If snapshot currently has entries, then flusher failed or didn't call
093    // cleanup.  Log a warning.
094    if (!this.snapshot.isEmpty()) {
095      LOG.warn("Snapshot called again without clearing previous. " +
096          "Doing nothing. Another ongoing flush or did we fail last attempt?");
097    } else {
098      this.snapshotId = EnvironmentEdgeManager.currentTime();
099      if (!getActive().isEmpty()) {
100        // Record the ImmutableSegment' heap overhead when initialing
101        MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
102        ImmutableSegment immutableSegment = SegmentFactory.instance().
103            createImmutableSegment(getActive(), memstoreAccounting);
104        // regionServices can be null when testing
105        if (regionServices != null) {
106          regionServices.addMemStoreSize(memstoreAccounting.getDataSize(),
107            memstoreAccounting.getHeapSize(), memstoreAccounting.getOffHeapSize(),
108            memstoreAccounting.getCellsCount());
109        }
110        this.snapshot = immutableSegment;
111        resetActive();
112      }
113    }
114    return new MemStoreSnapshot(this.snapshotId, this.snapshot);
115  }
116
117  @Override
118  public MemStoreSize getFlushableSize() {
119    MemStoreSize mss = getSnapshotSize();
120    return mss.getDataSize() > 0? mss: getActive().getMemStoreSize();
121  }
122
123  @Override
124  protected long keySize() {
125    return getActive().getDataSize();
126  }
127
128  @Override
129  protected long heapSize() {
130    return getActive().getHeapSize();
131  }
132
133  @Override
134  /*
135   * Scanners are ordered from 0 (oldest) to newest in increasing order.
136   */
137  public List<KeyValueScanner> getScanners(long readPt) throws IOException {
138    List<KeyValueScanner> list = new ArrayList<>();
139    addToScanners(getActive(), readPt, list);
140    addToScanners(snapshot.getAllSegments(), readPt, list);
141    return list;
142  }
143
144  @Override
145  protected List<Segment> getSegments() throws IOException {
146    List<Segment> list = new ArrayList<>(2);
147    list.add(getActive());
148    list.add(snapshot);
149    return list;
150  }
151
152  /**
153   * @param cell Find the row that comes after this one.  If null, we return the
154   * first.
155   * @return Next row or null if none found.
156   */
157  Cell getNextRow(final Cell cell) {
158    return getLowest(
159        getNextRow(cell, this.getActive().getCellSet()),
160        getNextRow(cell, this.snapshot.getCellSet()));
161  }
162
163  @Override
164  public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent) {
165  }
166
167  @Override
168  protected boolean preUpdate(MutableSegment currentActive, Cell cell,
169      MemStoreSizing memstoreSizing) {
170    return true;
171  }
172
173  @Override
174  protected void postUpdate(MutableSegment currentActive) {
175    return;
176  }
177
178  @Override
179  protected boolean sizeAddedPreOperation() {
180    return false;
181  }
182
183  @Override
184  public MemStoreSize size() {
185    return getActive().getMemStoreSize();
186  }
187
188  @Override
189  public long preFlushSeqIDEstimation() {
190    return HConstants.NO_SEQNUM;
191  }
192
193  @Override
194  public boolean isSloppy() {
195    return false;
196  }
197
198  /**
199   * Code to help figure if our approximation of object heap sizes is close
200   * enough.  See hbase-900.  Fills memstores then waits so user can heap
201   * dump and bring up resultant hprof in something like jprofiler which
202   * allows you get 'deep size' on objects.
203   * @param args main args
204   */
205  public static void main(String [] args) {
206    RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
207    LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" +
208      runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion());
209    LOG.info("vmInputArguments=" + runtime.getInputArguments());
210    DefaultMemStore memstore1 = new DefaultMemStore();
211    // TODO: x32 vs x64
212    final int count = 10000;
213    byte [] fam = Bytes.toBytes("col");
214    byte [] qf = Bytes.toBytes("umn");
215    byte [] empty = new byte[0];
216    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
217    for (int i = 0; i < count; i++) {
218      // Give each its own ts
219      memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memStoreSizing);
220    }
221    LOG.info("memstore1 estimated size={}", memStoreSizing.getMemStoreSize().getDataSize() +
222        memStoreSizing.getMemStoreSize().getHeapSize());
223    for (int i = 0; i < count; i++) {
224      memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memStoreSizing);
225    }
226    LOG.info("memstore1 estimated size (2nd loading of same data)={}",
227        memStoreSizing.getMemStoreSize().getDataSize() +
228            memStoreSizing.getMemStoreSize().getHeapSize());
229    // Make a variably sized memstore.
230    DefaultMemStore memstore2 = new DefaultMemStore();
231    memStoreSizing = new NonThreadSafeMemStoreSizing();
232    for (int i = 0; i < count; i++) {
233      memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]), memStoreSizing);
234    }
235    LOG.info("memstore2 estimated size={}", memStoreSizing.getMemStoreSize().getDataSize() +
236        memStoreSizing.getMemStoreSize().getHeapSize());
237    final int seconds = 30;
238    LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
239    LOG.info("Exiting.");
240  }
241}