001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob;
019
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionConfiguration;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
import org.apache.hadoop.hbase.io.hfile.TestHFile;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
064
065@Tag(MediumTests.TAG)
066public class TestMobStoreScanner {
067
068  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
069  private final static byte[] row1 = Bytes.toBytes("row1");
070  private final static byte[] row2 = Bytes.toBytes("row2");
071  private final static byte[] family = Bytes.toBytes("family");
072  private final static byte[] qf1 = Bytes.toBytes("qualifier1");
073  private final static byte[] qf2 = Bytes.toBytes("qualifier2");
074  protected final byte[] qf3 = Bytes.toBytes("qualifier3");
075  private static Table table;
076  private static Admin admin;
077  private static ColumnFamilyDescriptor familyDescriptor;
078  private static TableDescriptor tableDescriptor;
079  private static long defaultThreshold = 10;
080  private FileSystem fs;
081  private Configuration conf;
082  private String testMethodName;
083
084  @BeforeAll
085  public static void setUpBeforeClass() throws Exception {
086    TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
087      100 * 1024 * 1024);
088    TEST_UTIL.getConfiguration().setInt(HRegion.HBASE_MAX_CELL_SIZE_KEY, 100 * 1024 * 1024);
089    TEST_UTIL.startMiniCluster(1);
090  }
091
092  @AfterAll
093  public static void tearDownAfterClass() throws Exception {
094    TEST_UTIL.shutdownMiniCluster();
095  }
096
097  @BeforeEach
098  public void setUpTest(TestInfo testInfo) {
099    testMethodName = testInfo.getTestMethod().get().getName();
100  }
101
102  public void setUp(long threshold, TableName tn) throws Exception {
103    conf = TEST_UTIL.getConfiguration();
104    fs = FileSystem.get(conf);
105    familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(family).setMobEnabled(true)
106      .setMobThreshold(threshold).setMaxVersions(4).build();
107    tableDescriptor =
108      TableDescriptorBuilder.newBuilder(tn).setColumnFamily(familyDescriptor).build();
109    admin = TEST_UTIL.getAdmin();
110    admin.createTable(tableDescriptor);
111    table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()).getTable(tn);
112  }
113
114  /**
115   * Generate the mob value.
116   * @param size the size of the value
117   * @return the mob value generated
118   */
119  private static byte[] generateMobValue(int size) {
120    byte[] mobVal = new byte[size];
121    Bytes.random(mobVal);
122    return mobVal;
123  }
124
125  /**
126   * Set the scan attribute
127   * @param reversed   if true, scan will be backward order
128   * @param mobScanRaw if true, scan will get the mob reference
129   */
130  public void setScan(Scan scan, boolean reversed, boolean mobScanRaw) {
131    scan.setReversed(reversed);
132    scan.readVersions(4);
133    if (mobScanRaw) {
134      scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
135    }
136  }
137
138  @Test
139  public void testMobStoreScanner() throws Exception {
140    testGetFromFiles(false);
141    testGetFromMemStore(false);
142    testGetReferences(false);
143    testMobThreshold(false);
144    testGetFromArchive(false);
145  }
146
147  @Test
148  public void testReversedMobStoreScanner() throws Exception {
149    testGetFromFiles(true);
150    testGetFromMemStore(true);
151    testGetReferences(true);
152    testMobThreshold(true);
153    testGetFromArchive(true);
154  }
155
156  @Test
157  public void testGetMassive() throws Exception {
158    setUp(defaultThreshold, TableName.valueOf(testMethodName));
159
160    // Put some data 5 10, 15, 20 mb ok (this would be right below protobuf
161    // default max size of 64MB.
162    // 25, 30, 40 fail. these is above protobuf max size of 64MB
163    byte[] bigValue = new byte[25 * 1024 * 1024];
164
165    Put put = new Put(row1);
166    Bytes.random(bigValue);
167    put.addColumn(family, qf1, bigValue);
168    table.put(put);
169    put = new Put(row1);
170    Bytes.random(bigValue);
171    put.addColumn(family, qf2, bigValue);
172    table.put(put);
173    put = new Put(row1);
174    Bytes.random(bigValue);
175    put.addColumn(family, qf3, bigValue);
176    table.put(put);
177
178    Get g = new Get(row1);
179    table.get(g);
180    // should not have blown up.
181  }
182
183  @Test
184  public void testReadPt() throws Exception {
185    final TableName tableName = TableName.valueOf(testMethodName);
186    setUp(0L, tableName);
187    long ts = EnvironmentEdgeManager.currentTime();
188    byte[] value1 = Bytes.toBytes("value1");
189    Put put1 = new Put(row1);
190    put1.addColumn(family, qf1, ts, value1);
191    table.put(put1);
192    Put put2 = new Put(row2);
193    byte[] value2 = Bytes.toBytes("value2");
194    put2.addColumn(family, qf1, ts, value2);
195    table.put(put2);
196
197    Scan scan = new Scan();
198    scan.setCaching(1);
199    ResultScanner rs = table.getScanner(scan);
200    Result result = rs.next();
201    Put put3 = new Put(row1);
202    byte[] value3 = Bytes.toBytes("value3");
203    put3.addColumn(family, qf1, ts, value3);
204    table.put(put3);
205    Put put4 = new Put(row2);
206    byte[] value4 = Bytes.toBytes("value4");
207    put4.addColumn(family, qf1, ts, value4);
208    table.put(put4);
209
210    Cell cell = result.getColumnLatestCell(family, qf1);
211    assertArrayEquals(value1, CellUtil.cloneValue(cell));
212
213    admin.flush(tableName);
214    result = rs.next();
215    cell = result.getColumnLatestCell(family, qf1);
216    assertArrayEquals(value2, CellUtil.cloneValue(cell));
217  }
218
219  @Test
220  public void testReadFromCorruptMobFilesWithReadEmptyValueOnMobCellMiss() throws Exception {
221    final TableName tableName = TableName.valueOf(testMethodName);
222    setUp(0, tableName);
223    createRecordAndCorruptMobFile(tableName, row1, family, qf1, Bytes.toBytes("value1"));
224    Get get = new Get(row1);
225    get.setAttribute(MobConstants.EMPTY_VALUE_ON_MOBCELL_MISS, Bytes.toBytes(true));
226    Result result = table.get(get);
227    Cell cell = result.getColumnLatestCell(family, qf1);
228    assertEquals(0, cell.getValueLength());
229  }
230
231  @Test
232  public void testReadFromCorruptMobFiles() throws Exception {
233    final TableName tableName = TableName.valueOf(testMethodName);
234    setUp(0, tableName);
235    createRecordAndCorruptMobFile(tableName, row1, family, qf1, Bytes.toBytes("value1"));
236    Get get = new Get(row1);
237    IOException ioe = null;
238    try {
239      table.get(get);
240    } catch (IOException e) {
241      ioe = e;
242    }
243    assertNotNull(ioe);
244    assertEquals(CorruptHFileException.class.getName(), ioe.getClass().getName());
245  }
246
247  private void createRecordAndCorruptMobFile(TableName tn, byte[] row, byte[] family, byte[] qf,
248    byte[] value) throws IOException {
249    Put put1 = new Put(row);
250    put1.addColumn(family, qf, value);
251    table.put(put1);
252    admin.flush(tn);
253    Path mobFile = getFlushedMobFile(conf, fs, tn, Bytes.toString(family));
254    assertNotNull(mobFile);
255    // create new corrupt mob file.
256    Path corruptFile = new Path(mobFile.getParent(), "dummy");
257    TestHFile.truncateFile(fs, mobFile, corruptFile);
258    fs.delete(mobFile, true);
259    fs.rename(corruptFile, mobFile);
260  }
261
262  private Path getFlushedMobFile(Configuration conf, FileSystem fs, TableName table, String family)
263    throws IOException {
264    Path famDir = MobUtils.getMobFamilyPath(conf, table, family);
265    FileStatus[] hfFss = fs.listStatus(famDir);
266    for (FileStatus hfs : hfFss) {
267      if (!hfs.isDirectory()) {
268        return hfs.getPath();
269      }
270    }
271    return null;
272  }
273
274  private void testGetFromFiles(boolean reversed) throws Exception {
275    TableName tn = TableName.valueOf("testGetFromFiles" + reversed);
276    testGet(tn, reversed, true);
277  }
278
279  private void testGetFromMemStore(boolean reversed) throws Exception {
280    TableName tn = TableName.valueOf("testGetFromMemStore" + reversed);
281    testGet(tn, reversed, false);
282  }
283
284  private void testGet(TableName tableName, boolean reversed, boolean doFlush) throws Exception {
285    setUp(defaultThreshold, tableName);
286    long ts1 = EnvironmentEdgeManager.currentTime();
287    long ts2 = ts1 + 1;
288    long ts3 = ts1 + 2;
289    byte[] value = generateMobValue((int) defaultThreshold + 1);
290
291    Put put1 = new Put(row1);
292    put1.addColumn(family, qf1, ts3, value);
293    put1.addColumn(family, qf2, ts2, value);
294    put1.addColumn(family, qf3, ts1, value);
295    table.put(put1);
296
297    if (doFlush) {
298      admin.flush(tableName);
299    }
300
301    Scan scan = new Scan();
302    setScan(scan, reversed, false);
303    MobTestUtil.assertCellsValue(table, scan, value, 3);
304  }
305
306  private void testGetReferences(boolean reversed) throws Exception {
307    TableName tn = TableName.valueOf("testGetReferences" + reversed);
308    setUp(defaultThreshold, tn);
309    long ts1 = EnvironmentEdgeManager.currentTime();
310    long ts2 = ts1 + 1;
311    long ts3 = ts1 + 2;
312    byte[] value = generateMobValue((int) defaultThreshold + 1);
313
314    Put put1 = new Put(row1);
315    put1.addColumn(family, qf1, ts3, value);
316    put1.addColumn(family, qf2, ts2, value);
317    put1.addColumn(family, qf3, ts1, value);
318    table.put(put1);
319
320    admin.flush(tn);
321
322    Scan scan = new Scan();
323    setScan(scan, reversed, true);
324
325    ResultScanner results = table.getScanner(scan);
326    int count = 0;
327    for (Result res : results) {
328      List<Cell> cells = res.listCells();
329      for (Cell cell : cells) {
330        // Verify the value
331        assertIsMobReference(cell, row1, family, value, tn);
332        count++;
333      }
334    }
335    results.close();
336    assertEquals(3, count);
337  }
338
339  private void testMobThreshold(boolean reversed) throws Exception {
340    TableName tn = TableName.valueOf("testMobThreshold" + reversed);
341    setUp(defaultThreshold, tn);
342    byte[] valueLess = generateMobValue((int) defaultThreshold - 1);
343    byte[] valueEqual = generateMobValue((int) defaultThreshold);
344    byte[] valueGreater = generateMobValue((int) defaultThreshold + 1);
345    long ts1 = EnvironmentEdgeManager.currentTime();
346    long ts2 = ts1 + 1;
347    long ts3 = ts1 + 2;
348
349    Put put1 = new Put(row1);
350    put1.addColumn(family, qf1, ts3, valueLess);
351    put1.addColumn(family, qf2, ts2, valueEqual);
352    put1.addColumn(family, qf3, ts1, valueGreater);
353    table.put(put1);
354
355    admin.flush(tn);
356
357    Scan scan = new Scan();
358    setScan(scan, reversed, true);
359
360    Cell cellLess = null;
361    Cell cellEqual = null;
362    Cell cellGreater = null;
363    ResultScanner results = table.getScanner(scan);
364    int count = 0;
365    for (Result res : results) {
366      List<Cell> cells = res.listCells();
367      for (Cell cell : cells) {
368        // Verify the value
369        String qf = Bytes.toString(CellUtil.cloneQualifier(cell));
370        if (qf.equals(Bytes.toString(qf1))) {
371          cellLess = cell;
372        }
373        if (qf.equals(Bytes.toString(qf2))) {
374          cellEqual = cell;
375        }
376        if (qf.equals(Bytes.toString(qf3))) {
377          cellGreater = cell;
378        }
379        count++;
380      }
381    }
382    assertEquals(3, count);
383    assertNotMobReference(cellLess, row1, family, valueLess);
384    assertNotMobReference(cellEqual, row1, family, valueEqual);
385    assertIsMobReference(cellGreater, row1, family, valueGreater, tn);
386    results.close();
387  }
388
389  private void testGetFromArchive(boolean reversed) throws Exception {
390    TableName tn = TableName.valueOf("testGetFromArchive" + reversed);
391    setUp(defaultThreshold, tn);
392    long ts1 = EnvironmentEdgeManager.currentTime();
393    long ts2 = ts1 + 1;
394    long ts3 = ts1 + 2;
395    byte[] value = generateMobValue((int) defaultThreshold + 1);
396    // Put some data
397    Put put1 = new Put(row1);
398    put1.addColumn(family, qf1, ts3, value);
399    put1.addColumn(family, qf2, ts2, value);
400    put1.addColumn(family, qf3, ts1, value);
401    table.put(put1);
402
403    admin.flush(tn);
404
405    // Get the files in the mob path
406    Path mobFamilyPath;
407    mobFamilyPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn,
408      familyDescriptor.getNameAsString());
409    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
410    FileStatus[] files = fs.listStatus(mobFamilyPath);
411
412    // Get the archive path
413    Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
414    Path tableDir = CommonFSUtils.getTableDir(rootDir, tn);
415    RegionInfo regionInfo = MobUtils.getMobRegionInfo(tn);
416    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(),
417      regionInfo, tableDir, family);
418
419    // Move the files from mob path to archive path
420    fs.mkdirs(storeArchiveDir);
421    int fileCount = 0;
422    for (FileStatus file : files) {
423      fileCount++;
424      Path filePath = file.getPath();
425      Path src = new Path(mobFamilyPath, filePath.getName());
426      Path dst = new Path(storeArchiveDir, filePath.getName());
427      fs.rename(src, dst);
428    }
429
430    // Verify the moving success
431    FileStatus[] files1 = fs.listStatus(mobFamilyPath);
432    assertEquals(0, files1.length);
433    FileStatus[] files2 = fs.listStatus(storeArchiveDir);
434    assertEquals(fileCount, files2.length);
435
436    // Scan from archive
437    Scan scan = new Scan();
438    setScan(scan, reversed, false);
439    MobTestUtil.assertCellsValue(table, scan, value, 3);
440  }
441
442  /**
443   * Assert the value is not store in mob.
444   */
445  private static void assertNotMobReference(Cell cell, byte[] row, byte[] family, byte[] value)
446    throws IOException {
447    assertArrayEquals(row, CellUtil.cloneRow(cell));
448    assertArrayEquals(family, CellUtil.cloneFamily(cell));
449    assertArrayEquals(value, CellUtil.cloneValue(cell));
450  }
451
452  /**
453   * Assert the value is store in mob.
454   */
455  private static void assertIsMobReference(Cell cell, byte[] row, byte[] family, byte[] value,
456    TableName tn) throws IOException {
457    assertArrayEquals(row, CellUtil.cloneRow(cell));
458    assertArrayEquals(family, CellUtil.cloneFamily(cell));
459    assertFalse(Bytes.equals(value, CellUtil.cloneValue(cell)));
460    byte[] referenceValue = CellUtil.cloneValue(cell);
461    String fileName = MobUtils.getMobFileName(cell);
462    int valLen = Bytes.toInt(referenceValue, 0, Bytes.SIZEOF_INT);
463    assertEquals(value.length, valLen);
464    Path mobFamilyPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn,
465      familyDescriptor.getNameAsString());
466    Path targetPath = new Path(mobFamilyPath, fileName);
467    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
468    assertTrue(fs.exists(targetPath));
469  }
470}