001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.ByteBufferKeyValue;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HBaseTestingUtility;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.HRegionInfo;
035import org.apache.hadoop.hbase.KeyValue;
036import org.apache.hadoop.hbase.PrivateCellUtil;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.Durability;
039import org.apache.hadoop.hbase.client.Put;
040import org.apache.hadoop.hbase.client.Scan;
041import org.apache.hadoop.hbase.regionserver.ChunkCreator;
042import org.apache.hadoop.hbase.regionserver.HRegion;
043import org.apache.hadoop.hbase.regionserver.HStore;
044import org.apache.hadoop.hbase.regionserver.InternalScanner;
045import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
046import org.apache.hadoop.hbase.testclassification.MediumTests;
047import org.apache.hadoop.hbase.testclassification.RegionServerTests;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
050import org.apache.hadoop.hbase.wal.WAL;
051import org.junit.After;
052import org.junit.ClassRule;
053import org.junit.Rule;
054import org.junit.Test;
055import org.junit.experimental.categories.Category;
056import org.junit.rules.TestName;
057import org.slf4j.Logger;
058import org.slf4j.LoggerFactory;
059
060@Category({ RegionServerTests.class, MediumTests.class })
061public class TestScannerFromBucketCache {
062
063  @ClassRule
064  public static final HBaseClassTestRule CLASS_RULE =
065      HBaseClassTestRule.forClass(TestScannerFromBucketCache.class);
066
067  private static final Logger LOG = LoggerFactory.getLogger(TestScannerFromBucketCache.class);
068  @Rule
069  public TestName name = new TestName();
070
071  HRegion region = null;
072  private HBaseTestingUtility test_util;
073  public Configuration conf;
074  private final int MAX_VERSIONS = 2;
075  byte[] val = new byte[512 * 1024];
076
077  // Test names
078  private TableName tableName;
079
080  private void setUp(boolean useBucketCache) throws IOException {
081    test_util = HBaseTestingUtility.createLocalHTU();
082    conf = test_util.getConfiguration();
083    if (useBucketCache) {
084      conf.setInt("hbase.bucketcache.size", 400);
085      conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
086      conf.setInt("hbase.bucketcache.writer.threads", 10);
087      conf.setFloat("hfile.block.cache.size", 0.2f);
088      conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
089    }
090    tableName = TableName.valueOf(name.getMethodName());
091    CacheConfig.instantiateBlockCache(conf);
092  }
093
094  @After
095  public void tearDown() throws Exception {
096    EnvironmentEdgeManagerTestHelper.reset();
097    LOG.info("Cleaning test directory: " + test_util.getDataTestDir());
098    test_util.cleanupTestDir();
099    CacheConfig.clearGlobalInstances();
100  }
101
102  String getName() {
103    return name.getMethodName();
104  }
105
106  @Test
107  public void testBasicScanWithLRUCache() throws IOException {
108    setUp(false);
109    byte[] row1 = Bytes.toBytes("row1");
110    byte[] qf1 = Bytes.toBytes("qualifier1");
111    byte[] qf2 = Bytes.toBytes("qualifier2");
112    byte[] fam1 = Bytes.toBytes("lrucache");
113
114    long ts1 = 1; // System.currentTimeMillis();
115    long ts2 = ts1 + 1;
116    long ts3 = ts1 + 2;
117
118    // Setting up region
119    String method = this.getName();
120    this.region = initHRegion(tableName, method, conf, test_util, fam1);
121    try {
122      List<Cell> expected = insertData(row1, qf1, qf2, fam1, ts1, ts2, ts3, false);
123
124      List<Cell> actual = performScan(row1, fam1);
125      // Verify result
126      for (int i = 0; i < expected.size(); i++) {
127        assertFalse(actual.get(i) instanceof ByteBufferKeyValue);
128        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
129      }
130      // do the scan again and verify. This time it should be from the lru cache
131      actual = performScan(row1, fam1);
132      // Verify result
133      for (int i = 0; i < expected.size(); i++) {
134        assertFalse(actual.get(i) instanceof ByteBufferKeyValue);
135        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
136      }
137
138    } finally {
139      HBaseTestingUtility.closeRegionAndWAL(this.region);
140      this.region = null;
141    }
142  }
143
144  @Test
145  public void testBasicScanWithOffheapBucketCache() throws IOException {
146    setUp(true);
147    byte[] row1 = Bytes.toBytes("row1offheap");
148    byte[] qf1 = Bytes.toBytes("qualifier1");
149    byte[] qf2 = Bytes.toBytes("qualifier2");
150    byte[] fam1 = Bytes.toBytes("famoffheap");
151
152    long ts1 = 1; // System.currentTimeMillis();
153    long ts2 = ts1 + 1;
154    long ts3 = ts1 + 2;
155
156    // Setting up region
157    String method = this.getName();
158    this.region = initHRegion(tableName, method, conf, test_util, fam1);
159    try {
160      List<Cell> expected = insertData(row1, qf1, qf2, fam1, ts1, ts2, ts3, false);
161
162      List<Cell> actual = performScan(row1, fam1);
163      // Verify result
164      for (int i = 0; i < expected.size(); i++) {
165        assertFalse(actual.get(i) instanceof ByteBufferKeyValue);
166        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
167      }
168      // Wait for the bucket cache threads to move the data to offheap
169      Thread.sleep(500);
170      // do the scan again and verify. This time it should be from the bucket cache in offheap mode
171      actual = performScan(row1, fam1);
172      // Verify result
173      for (int i = 0; i < expected.size(); i++) {
174        assertTrue(actual.get(i) instanceof ByteBufferKeyValue);
175        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
176      }
177
178    } catch (InterruptedException e) {
179    } finally {
180      HBaseTestingUtility.closeRegionAndWAL(this.region);
181      this.region = null;
182    }
183  }
184
185  @Test
186  public void testBasicScanWithOffheapBucketCacheWithMBB() throws IOException {
187    setUp(true);
188    byte[] row1 = Bytes.toBytes("row1offheap");
189    byte[] qf1 = Bytes.toBytes("qualifier1");
190    byte[] qf2 = Bytes.toBytes("qualifier2");
191    byte[] fam1 = Bytes.toBytes("famoffheap");
192
193    long ts1 = 1; // System.currentTimeMillis();
194    long ts2 = ts1 + 1;
195    long ts3 = ts1 + 2;
196
197    // Setting up region
198    String method = this.getName();
199    this.region = initHRegion(tableName, method, conf, test_util, fam1);
200    try {
201      List<Cell> expected = insertData(row1, qf1, qf2, fam1, ts1, ts2, ts3, true);
202
203      List<Cell> actual = performScan(row1, fam1);
204      // Verify result
205      for (int i = 0; i < expected.size(); i++) {
206        assertFalse(actual.get(i) instanceof ByteBufferKeyValue);
207        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
208      }
209      // Wait for the bucket cache threads to move the data to offheap
210      Thread.sleep(500);
211      // do the scan again and verify. This time it should be from the bucket cache in offheap mode
212      // but one of the cell will be copied due to the asSubByteBuff call
213      Scan scan = new Scan(row1);
214      scan.addFamily(fam1);
215      scan.setMaxVersions(10);
216      actual = new ArrayList<>();
217      InternalScanner scanner = region.getScanner(scan);
218
219      boolean hasNext = scanner.next(actual);
220      assertEquals(false, hasNext);
221      // Verify result
222      for (int i = 0; i < expected.size(); i++) {
223        if (i != 5) {
224          // the last cell fetched will be of type shareable but not offheap because
225          // the MBB is copied to form a single cell
226          assertTrue(actual.get(i) instanceof ByteBufferKeyValue);
227        }
228      }
229
230    } catch (InterruptedException e) {
231    } finally {
232      HBaseTestingUtility.closeRegionAndWAL(this.region);
233      this.region = null;
234    }
235  }
236
237  private List<Cell> insertData(byte[] row1, byte[] qf1, byte[] qf2, byte[] fam1, long ts1,
238      long ts2, long ts3, boolean withVal) throws IOException {
239    // Putting data in Region
240    Put put = null;
241    KeyValue kv13 = null;
242    KeyValue kv12 = null;
243    KeyValue kv11 = null;
244
245    KeyValue kv23 = null;
246    KeyValue kv22 = null;
247    KeyValue kv21 = null;
248    if (!withVal) {
249      kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
250      kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
251      kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
252
253      kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
254      kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
255      kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
256    } else {
257      kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, val);
258      kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, val);
259      kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, val);
260
261      kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, val);
262      kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, val);
263      kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, val);
264    }
265
266    put = new Put(row1);
267    put.add(kv13);
268    put.add(kv12);
269    put.add(kv11);
270    put.add(kv23);
271    put.add(kv22);
272    put.add(kv21);
273    region.put(put);
274    region.flush(true);
275    HStore store = region.getStore(fam1);
276    while (store.getStorefilesCount() <= 0) {
277      try {
278        Thread.sleep(20);
279      } catch (InterruptedException e) {
280      }
281    }
282
283    // Expected
284    List<Cell> expected = new ArrayList<>();
285    expected.add(kv13);
286    expected.add(kv12);
287    expected.add(kv23);
288    expected.add(kv22);
289    return expected;
290  }
291
292  private List<Cell> performScan(byte[] row1, byte[] fam1) throws IOException {
293    Scan scan = new Scan(row1);
294    scan.addFamily(fam1);
295    scan.setMaxVersions(MAX_VERSIONS);
296    List<Cell> actual = new ArrayList<>();
297    InternalScanner scanner = region.getScanner(scan);
298
299    boolean hasNext = scanner.next(actual);
300    assertEquals(false, hasNext);
301    return actual;
302  }
303
304  private static HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
305      HBaseTestingUtility test_util, byte[]... families) throws IOException {
306    return initHRegion(tableName, null, null, callingMethod, conf, test_util, false, families);
307  }
308
309  private static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
310      String callingMethod, Configuration conf, HBaseTestingUtility test_util, boolean isReadOnly,
311      byte[]... families) throws IOException {
312    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
313    Path logDir = test_util.getDataTestDirOnTestFS(callingMethod + ".log");
314    HRegionInfo hri = new HRegionInfo(tableName, startKey, stopKey);
315    final WAL wal = HBaseTestingUtility.createWal(conf, logDir, hri);
316    return initHRegion(tableName, startKey, stopKey, callingMethod, conf, test_util, isReadOnly,
317      Durability.SYNC_WAL, wal, families);
318  }
319
320  /**
321   * @param tableName
322   * @param startKey
323   * @param stopKey
324   * @param callingMethod
325   * @param conf
326   * @param isReadOnly
327   * @param families
328   * @throws IOException
329   * @return A region on which you must call {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)}
330   *         when done.
331   */
332  private static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
333      String callingMethod, Configuration conf, HBaseTestingUtility test_util, boolean isReadOnly,
334      Durability durability, WAL wal, byte[]... families) throws IOException {
335    return test_util.createLocalHRegion(tableName, startKey, stopKey, isReadOnly, durability, wal,
336      families);
337  }
338}