001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob;
019
020import java.io.IOException;
021import java.util.List;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.fs.FileStatus;
024import org.apache.hadoop.fs.FileSystem;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.Cell;
027import org.apache.hadoop.hbase.CellUtil;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestingUtility;
030import org.apache.hadoop.hbase.HColumnDescriptor;
031import org.apache.hadoop.hbase.HTableDescriptor;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.Admin;
034import org.apache.hadoop.hbase.client.ConnectionConfiguration;
035import org.apache.hadoop.hbase.client.ConnectionFactory;
036import org.apache.hadoop.hbase.client.Get;
037import org.apache.hadoop.hbase.client.Put;
038import org.apache.hadoop.hbase.client.RegionInfo;
039import org.apache.hadoop.hbase.client.Result;
040import org.apache.hadoop.hbase.client.ResultScanner;
041import org.apache.hadoop.hbase.client.Scan;
042import org.apache.hadoop.hbase.client.Table;
043import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
044import org.apache.hadoop.hbase.io.hfile.TestHFile;
045import org.apache.hadoop.hbase.regionserver.HRegion;
046import org.apache.hadoop.hbase.testclassification.MediumTests;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.apache.hadoop.hbase.util.CommonFSUtils;
049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
050import org.apache.hadoop.hbase.util.HFileArchiveUtil;
051import org.junit.AfterClass;
052import org.junit.Assert;
053import org.junit.BeforeClass;
054import org.junit.ClassRule;
055import org.junit.Rule;
056import org.junit.Test;
057import org.junit.experimental.categories.Category;
058import org.junit.rules.TestName;
059
060@Category(MediumTests.class)
061public class TestMobStoreScanner {
062
  // Class-level test rule created for this test class.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestMobStoreScanner.class);

  // Shared testing utility; its mini cluster is started in setUpBeforeClass().
  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  // Row keys, column family and qualifiers written and read by the tests.
  private final static byte[] row1 = Bytes.toBytes("row1");
  private final static byte[] row2 = Bytes.toBytes("row2");
  private final static byte[] family = Bytes.toBytes("family");
  private final static byte[] qf1 = Bytes.toBytes("qualifier1");
  private final static byte[] qf2 = Bytes.toBytes("qualifier2");
  // Third qualifier; unlike qf1/qf2 this is a per-instance (non-static) field.
  protected final byte[] qf3 = Bytes.toBytes("qualifier3");
  // Table handle, admin and descriptors; all are assigned in setUp(long, TableName).
  private static Table table;
  private static Admin admin;
  private static HColumnDescriptor hcd;
  private static HTableDescriptor desc;
  // MOB threshold passed to setUp(long, TableName) by most tests.
  private static long defaultThreshold = 10;
  // File system and configuration, assigned in setUp(long, TableName).
  private FileSystem fs;
  private Configuration conf;

  // Provides the name of the running test method; several tests use it as the table name.
  @Rule
  public TestName name = new TestName();
084
085  @BeforeClass
086  public static void setUpBeforeClass() throws Exception {
087    TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
088      100 * 1024 * 1024);
089    TEST_UTIL.getConfiguration().setInt(HRegion.HBASE_MAX_CELL_SIZE_KEY, 100 * 1024 * 1024);
090    TEST_UTIL.startMiniCluster(1);
091  }
092
093  @AfterClass
094  public static void tearDownAfterClass() throws Exception {
095    TEST_UTIL.shutdownMiniCluster();
096  }
097
098  public void setUp(long threshold, TableName tn) throws Exception {
099    conf = TEST_UTIL.getConfiguration();
100    fs = FileSystem.get(conf);
101    desc = new HTableDescriptor(tn);
102    hcd = new HColumnDescriptor(family);
103    hcd.setMobEnabled(true);
104    hcd.setMobThreshold(threshold);
105    hcd.setMaxVersions(4);
106    desc.addFamily(hcd);
107    admin = TEST_UTIL.getAdmin();
108    admin.createTable(desc);
109    table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()).getTable(tn);
110  }
111
112  /**
113   * Generate the mob value.
114   * @param size the size of the value
115   * @return the mob value generated
116   */
117  private static byte[] generateMobValue(int size) {
118    byte[] mobVal = new byte[size];
119    Bytes.random(mobVal);
120    return mobVal;
121  }
122
123  /**
124   * Set the scan attribute
125   * @param reversed   if true, scan will be backward order
126   * @param mobScanRaw if true, scan will get the mob reference
127   */
128  public void setScan(Scan scan, boolean reversed, boolean mobScanRaw) {
129    scan.setReversed(reversed);
130    scan.setMaxVersions(4);
131    if (mobScanRaw) {
132      scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
133    }
134  }
135
136  @Test
137  public void testMobStoreScanner() throws Exception {
138    testGetFromFiles(false);
139    testGetFromMemStore(false);
140    testGetReferences(false);
141    testMobThreshold(false);
142    testGetFromArchive(false);
143  }
144
145  @Test
146  public void testReversedMobStoreScanner() throws Exception {
147    testGetFromFiles(true);
148    testGetFromMemStore(true);
149    testGetReferences(true);
150    testMobThreshold(true);
151    testGetFromArchive(true);
152  }
153
154  @Test
155  public void testGetMassive() throws Exception {
156    setUp(defaultThreshold, TableName.valueOf(name.getMethodName()));
157
158    // Put some data 5 10, 15, 20 mb ok (this would be right below protobuf
159    // default max size of 64MB.
160    // 25, 30, 40 fail. these is above protobuf max size of 64MB
161    byte[] bigValue = new byte[25 * 1024 * 1024];
162
163    Put put = new Put(row1);
164    Bytes.random(bigValue);
165    put.addColumn(family, qf1, bigValue);
166    table.put(put);
167    put = new Put(row1);
168    Bytes.random(bigValue);
169    put.addColumn(family, qf2, bigValue);
170    table.put(put);
171    put = new Put(row1);
172    Bytes.random(bigValue);
173    put.addColumn(family, qf3, bigValue);
174    table.put(put);
175
176    Get g = new Get(row1);
177    table.get(g);
178    // should not have blown up.
179  }
180
181  @Test
182  public void testReadPt() throws Exception {
183    final TableName tableName = TableName.valueOf(name.getMethodName());
184    setUp(0L, tableName);
185    long ts = EnvironmentEdgeManager.currentTime();
186    byte[] value1 = Bytes.toBytes("value1");
187    Put put1 = new Put(row1);
188    put1.addColumn(family, qf1, ts, value1);
189    table.put(put1);
190    Put put2 = new Put(row2);
191    byte[] value2 = Bytes.toBytes("value2");
192    put2.addColumn(family, qf1, ts, value2);
193    table.put(put2);
194
195    Scan scan = new Scan();
196    scan.setCaching(1);
197    ResultScanner rs = table.getScanner(scan);
198    Result result = rs.next();
199    Put put3 = new Put(row1);
200    byte[] value3 = Bytes.toBytes("value3");
201    put3.addColumn(family, qf1, ts, value3);
202    table.put(put3);
203    Put put4 = new Put(row2);
204    byte[] value4 = Bytes.toBytes("value4");
205    put4.addColumn(family, qf1, ts, value4);
206    table.put(put4);
207
208    Cell cell = result.getColumnLatestCell(family, qf1);
209    Assert.assertArrayEquals(value1, CellUtil.cloneValue(cell));
210
211    admin.flush(tableName);
212    result = rs.next();
213    cell = result.getColumnLatestCell(family, qf1);
214    Assert.assertArrayEquals(value2, CellUtil.cloneValue(cell));
215  }
216
217  @Test
218  public void testReadFromCorruptMobFilesWithReadEmptyValueOnMobCellMiss() throws Exception {
219    final TableName tableName = TableName.valueOf(name.getMethodName());
220    setUp(0, tableName);
221    createRecordAndCorruptMobFile(tableName, row1, family, qf1, Bytes.toBytes("value1"));
222    Get get = new Get(row1);
223    get.setAttribute(MobConstants.EMPTY_VALUE_ON_MOBCELL_MISS, Bytes.toBytes(true));
224    Result result = table.get(get);
225    Cell cell = result.getColumnLatestCell(family, qf1);
226    Assert.assertEquals(0, cell.getValueLength());
227  }
228
229  @Test
230  public void testReadFromCorruptMobFiles() throws Exception {
231    final TableName tableName = TableName.valueOf(name.getMethodName());
232    setUp(0, tableName);
233    createRecordAndCorruptMobFile(tableName, row1, family, qf1, Bytes.toBytes("value1"));
234    Get get = new Get(row1);
235    IOException ioe = null;
236    try {
237      table.get(get);
238    } catch (IOException e) {
239      ioe = e;
240    }
241    Assert.assertNotNull(ioe);
242    Assert.assertEquals(CorruptHFileException.class.getName(), ioe.getClass().getName());
243  }
244
245  private void createRecordAndCorruptMobFile(TableName tn, byte[] row, byte[] family, byte[] qf,
246    byte[] value) throws IOException {
247    Put put1 = new Put(row);
248    put1.addColumn(family, qf, value);
249    table.put(put1);
250    admin.flush(tn);
251    Path mobFile = getFlushedMobFile(conf, fs, tn, Bytes.toString(family));
252    Assert.assertNotNull(mobFile);
253    // create new corrupt mob file.
254    Path corruptFile = new Path(mobFile.getParent(), "dummy");
255    TestHFile.truncateFile(fs, mobFile, corruptFile);
256    fs.delete(mobFile, true);
257    fs.rename(corruptFile, mobFile);
258  }
259
260  private Path getFlushedMobFile(Configuration conf, FileSystem fs, TableName table, String family)
261    throws IOException {
262    Path famDir = MobUtils.getMobFamilyPath(conf, table, family);
263    FileStatus[] hfFss = fs.listStatus(famDir);
264    for (FileStatus hfs : hfFss) {
265      if (!hfs.isDirectory()) {
266        return hfs.getPath();
267      }
268    }
269    return null;
270  }
271
272  private void testGetFromFiles(boolean reversed) throws Exception {
273    TableName tn = TableName.valueOf("testGetFromFiles" + reversed);
274    testGet(tn, reversed, true);
275  }
276
277  private void testGetFromMemStore(boolean reversed) throws Exception {
278    TableName tn = TableName.valueOf("testGetFromMemStore" + reversed);
279    testGet(tn, reversed, false);
280  }
281
282  private void testGet(TableName tableName, boolean reversed, boolean doFlush) throws Exception {
283    setUp(defaultThreshold, tableName);
284    long ts1 = EnvironmentEdgeManager.currentTime();
285    long ts2 = ts1 + 1;
286    long ts3 = ts1 + 2;
287    byte[] value = generateMobValue((int) defaultThreshold + 1);
288
289    Put put1 = new Put(row1);
290    put1.addColumn(family, qf1, ts3, value);
291    put1.addColumn(family, qf2, ts2, value);
292    put1.addColumn(family, qf3, ts1, value);
293    table.put(put1);
294
295    if (doFlush) {
296      admin.flush(tableName);
297    }
298
299    Scan scan = new Scan();
300    setScan(scan, reversed, false);
301    MobTestUtil.assertCellsValue(table, scan, value, 3);
302  }
303
304  private void testGetReferences(boolean reversed) throws Exception {
305    TableName tn = TableName.valueOf("testGetReferences" + reversed);
306    setUp(defaultThreshold, tn);
307    long ts1 = EnvironmentEdgeManager.currentTime();
308    long ts2 = ts1 + 1;
309    long ts3 = ts1 + 2;
310    byte[] value = generateMobValue((int) defaultThreshold + 1);
311    ;
312
313    Put put1 = new Put(row1);
314    put1.addColumn(family, qf1, ts3, value);
315    put1.addColumn(family, qf2, ts2, value);
316    put1.addColumn(family, qf3, ts1, value);
317    table.put(put1);
318
319    admin.flush(tn);
320
321    Scan scan = new Scan();
322    setScan(scan, reversed, true);
323
324    ResultScanner results = table.getScanner(scan);
325    int count = 0;
326    for (Result res : results) {
327      List<Cell> cells = res.listCells();
328      for (Cell cell : cells) {
329        // Verify the value
330        assertIsMobReference(cell, row1, family, value, tn);
331        count++;
332      }
333    }
334    results.close();
335    Assert.assertEquals(3, count);
336  }
337
338  private void testMobThreshold(boolean reversed) throws Exception {
339    TableName tn = TableName.valueOf("testMobThreshold" + reversed);
340    setUp(defaultThreshold, tn);
341    byte[] valueLess = generateMobValue((int) defaultThreshold - 1);
342    byte[] valueEqual = generateMobValue((int) defaultThreshold);
343    byte[] valueGreater = generateMobValue((int) defaultThreshold + 1);
344    long ts1 = EnvironmentEdgeManager.currentTime();
345    long ts2 = ts1 + 1;
346    long ts3 = ts1 + 2;
347
348    Put put1 = new Put(row1);
349    put1.addColumn(family, qf1, ts3, valueLess);
350    put1.addColumn(family, qf2, ts2, valueEqual);
351    put1.addColumn(family, qf3, ts1, valueGreater);
352    table.put(put1);
353
354    admin.flush(tn);
355
356    Scan scan = new Scan();
357    setScan(scan, reversed, true);
358
359    Cell cellLess = null;
360    Cell cellEqual = null;
361    Cell cellGreater = null;
362    ResultScanner results = table.getScanner(scan);
363    int count = 0;
364    for (Result res : results) {
365      List<Cell> cells = res.listCells();
366      for (Cell cell : cells) {
367        // Verify the value
368        String qf = Bytes.toString(CellUtil.cloneQualifier(cell));
369        if (qf.equals(Bytes.toString(qf1))) {
370          cellLess = cell;
371        }
372        if (qf.equals(Bytes.toString(qf2))) {
373          cellEqual = cell;
374        }
375        if (qf.equals(Bytes.toString(qf3))) {
376          cellGreater = cell;
377        }
378        count++;
379      }
380    }
381    Assert.assertEquals(3, count);
382    assertNotMobReference(cellLess, row1, family, valueLess);
383    assertNotMobReference(cellEqual, row1, family, valueEqual);
384    assertIsMobReference(cellGreater, row1, family, valueGreater, tn);
385    results.close();
386  }
387
388  private void testGetFromArchive(boolean reversed) throws Exception {
389    TableName tn = TableName.valueOf("testGetFromArchive" + reversed);
390    setUp(defaultThreshold, tn);
391    long ts1 = EnvironmentEdgeManager.currentTime();
392    long ts2 = ts1 + 1;
393    long ts3 = ts1 + 2;
394    byte[] value = generateMobValue((int) defaultThreshold + 1);
395    ;
396    // Put some data
397    Put put1 = new Put(row1);
398    put1.addColumn(family, qf1, ts3, value);
399    put1.addColumn(family, qf2, ts2, value);
400    put1.addColumn(family, qf3, ts1, value);
401    table.put(put1);
402
403    admin.flush(tn);
404
405    // Get the files in the mob path
406    Path mobFamilyPath;
407    mobFamilyPath =
408      MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn, hcd.getNameAsString());
409    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
410    FileStatus[] files = fs.listStatus(mobFamilyPath);
411
412    // Get the archive path
413    Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
414    Path tableDir = CommonFSUtils.getTableDir(rootDir, tn);
415    RegionInfo regionInfo = MobUtils.getMobRegionInfo(tn);
416    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(),
417      regionInfo, tableDir, family);
418
419    // Move the files from mob path to archive path
420    fs.mkdirs(storeArchiveDir);
421    int fileCount = 0;
422    for (FileStatus file : files) {
423      fileCount++;
424      Path filePath = file.getPath();
425      Path src = new Path(mobFamilyPath, filePath.getName());
426      Path dst = new Path(storeArchiveDir, filePath.getName());
427      fs.rename(src, dst);
428    }
429
430    // Verify the moving success
431    FileStatus[] files1 = fs.listStatus(mobFamilyPath);
432    Assert.assertEquals(0, files1.length);
433    FileStatus[] files2 = fs.listStatus(storeArchiveDir);
434    Assert.assertEquals(fileCount, files2.length);
435
436    // Scan from archive
437    Scan scan = new Scan();
438    setScan(scan, reversed, false);
439    MobTestUtil.assertCellsValue(table, scan, value, 3);
440  }
441
442  /**
443   * Assert the value is not store in mob.
444   */
445  private static void assertNotMobReference(Cell cell, byte[] row, byte[] family, byte[] value)
446    throws IOException {
447    Assert.assertArrayEquals(row, CellUtil.cloneRow(cell));
448    Assert.assertArrayEquals(family, CellUtil.cloneFamily(cell));
449    Assert.assertArrayEquals(value, CellUtil.cloneValue(cell));
450  }
451
452  /**
453   * Assert the value is store in mob.
454   */
455  private static void assertIsMobReference(Cell cell, byte[] row, byte[] family, byte[] value,
456    TableName tn) throws IOException {
457    Assert.assertArrayEquals(row, CellUtil.cloneRow(cell));
458    Assert.assertArrayEquals(family, CellUtil.cloneFamily(cell));
459    Assert.assertFalse(Bytes.equals(value, CellUtil.cloneValue(cell)));
460    byte[] referenceValue = CellUtil.cloneValue(cell);
461    String fileName = MobUtils.getMobFileName(cell);
462    int valLen = Bytes.toInt(referenceValue, 0, Bytes.SIZEOF_INT);
463    Assert.assertEquals(value.length, valLen);
464    Path mobFamilyPath =
465      MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn, hcd.getNameAsString());
466    Path targetPath = new Path(mobFamilyPath, fileName);
467    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
468    Assert.assertTrue(fs.exists(targetPath));
469  }
470}