001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob;
019
020import java.io.IOException;
021import java.util.List;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.fs.FileStatus;
024import org.apache.hadoop.fs.FileSystem;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.Cell;
027import org.apache.hadoop.hbase.CellUtil;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestingUtil;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.client.Admin;
032import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
034import org.apache.hadoop.hbase.client.ConnectionConfiguration;
035import org.apache.hadoop.hbase.client.ConnectionFactory;
036import org.apache.hadoop.hbase.client.Get;
037import org.apache.hadoop.hbase.client.Put;
038import org.apache.hadoop.hbase.client.RegionInfo;
039import org.apache.hadoop.hbase.client.Result;
040import org.apache.hadoop.hbase.client.ResultScanner;
041import org.apache.hadoop.hbase.client.Scan;
042import org.apache.hadoop.hbase.client.Table;
043import org.apache.hadoop.hbase.client.TableDescriptor;
044import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
045import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
046import org.apache.hadoop.hbase.io.hfile.TestHFile;
047import org.apache.hadoop.hbase.regionserver.HRegion;
048import org.apache.hadoop.hbase.testclassification.MediumTests;
049import org.apache.hadoop.hbase.util.Bytes;
050import org.apache.hadoop.hbase.util.CommonFSUtils;
051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
052import org.apache.hadoop.hbase.util.HFileArchiveUtil;
053import org.junit.AfterClass;
054import org.junit.Assert;
055import org.junit.BeforeClass;
056import org.junit.ClassRule;
057import org.junit.Rule;
058import org.junit.Test;
059import org.junit.experimental.categories.Category;
060import org.junit.rules.TestName;
061
062@Category(MediumTests.class)
063public class TestMobStoreScanner {
064
065  @ClassRule
066  public static final HBaseClassTestRule CLASS_RULE =
067    HBaseClassTestRule.forClass(TestMobStoreScanner.class);
068
069  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
070  private final static byte[] row1 = Bytes.toBytes("row1");
071  private final static byte[] row2 = Bytes.toBytes("row2");
072  private final static byte[] family = Bytes.toBytes("family");
073  private final static byte[] qf1 = Bytes.toBytes("qualifier1");
074  private final static byte[] qf2 = Bytes.toBytes("qualifier2");
075  protected final byte[] qf3 = Bytes.toBytes("qualifier3");
076  private static Table table;
077  private static Admin admin;
078  private static ColumnFamilyDescriptor familyDescriptor;
079  private static TableDescriptor tableDescriptor;
080  private static long defaultThreshold = 10;
081  private FileSystem fs;
082  private Configuration conf;
083
084  @Rule
085  public TestName name = new TestName();
086
087  @BeforeClass
088  public static void setUpBeforeClass() throws Exception {
089    TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
090      100 * 1024 * 1024);
091    TEST_UTIL.getConfiguration().setInt(HRegion.HBASE_MAX_CELL_SIZE_KEY, 100 * 1024 * 1024);
092    TEST_UTIL.startMiniCluster(1);
093  }
094
095  @AfterClass
096  public static void tearDownAfterClass() throws Exception {
097    TEST_UTIL.shutdownMiniCluster();
098  }
099
100  public void setUp(long threshold, TableName tn) throws Exception {
101    conf = TEST_UTIL.getConfiguration();
102    fs = FileSystem.get(conf);
103    familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(family).setMobEnabled(true)
104      .setMobThreshold(threshold).setMaxVersions(4).build();
105    tableDescriptor =
106      TableDescriptorBuilder.newBuilder(tn).setColumnFamily(familyDescriptor).build();
107    admin = TEST_UTIL.getAdmin();
108    admin.createTable(tableDescriptor);
109    table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()).getTable(tn);
110  }
111
112  /**
113   * Generate the mob value.
114   * @param size the size of the value
115   * @return the mob value generated
116   */
117  private static byte[] generateMobValue(int size) {
118    byte[] mobVal = new byte[size];
119    Bytes.random(mobVal);
120    return mobVal;
121  }
122
123  /**
124   * Set the scan attribute
125   * @param reversed   if true, scan will be backward order
126   * @param mobScanRaw if true, scan will get the mob reference
127   */
128  public void setScan(Scan scan, boolean reversed, boolean mobScanRaw) {
129    scan.setReversed(reversed);
130    scan.readVersions(4);
131    if (mobScanRaw) {
132      scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
133    }
134  }
135
136  @Test
137  public void testMobStoreScanner() throws Exception {
138    testGetFromFiles(false);
139    testGetFromMemStore(false);
140    testGetReferences(false);
141    testMobThreshold(false);
142    testGetFromArchive(false);
143  }
144
145  @Test
146  public void testReversedMobStoreScanner() throws Exception {
147    testGetFromFiles(true);
148    testGetFromMemStore(true);
149    testGetReferences(true);
150    testMobThreshold(true);
151    testGetFromArchive(true);
152  }
153
154  @Test
155  public void testGetMassive() throws Exception {
156    setUp(defaultThreshold, TableName.valueOf(name.getMethodName()));
157
158    // Put some data 5 10, 15, 20 mb ok (this would be right below protobuf
159    // default max size of 64MB.
160    // 25, 30, 40 fail. these is above protobuf max size of 64MB
161    byte[] bigValue = new byte[25 * 1024 * 1024];
162
163    Put put = new Put(row1);
164    Bytes.random(bigValue);
165    put.addColumn(family, qf1, bigValue);
166    table.put(put);
167    put = new Put(row1);
168    Bytes.random(bigValue);
169    put.addColumn(family, qf2, bigValue);
170    table.put(put);
171    put = new Put(row1);
172    Bytes.random(bigValue);
173    put.addColumn(family, qf3, bigValue);
174    table.put(put);
175
176    Get g = new Get(row1);
177    table.get(g);
178    // should not have blown up.
179  }
180
181  @Test
182  public void testReadPt() throws Exception {
183    final TableName tableName = TableName.valueOf(name.getMethodName());
184    setUp(0L, tableName);
185    long ts = EnvironmentEdgeManager.currentTime();
186    byte[] value1 = Bytes.toBytes("value1");
187    Put put1 = new Put(row1);
188    put1.addColumn(family, qf1, ts, value1);
189    table.put(put1);
190    Put put2 = new Put(row2);
191    byte[] value2 = Bytes.toBytes("value2");
192    put2.addColumn(family, qf1, ts, value2);
193    table.put(put2);
194
195    Scan scan = new Scan();
196    scan.setCaching(1);
197    ResultScanner rs = table.getScanner(scan);
198    Result result = rs.next();
199    Put put3 = new Put(row1);
200    byte[] value3 = Bytes.toBytes("value3");
201    put3.addColumn(family, qf1, ts, value3);
202    table.put(put3);
203    Put put4 = new Put(row2);
204    byte[] value4 = Bytes.toBytes("value4");
205    put4.addColumn(family, qf1, ts, value4);
206    table.put(put4);
207
208    Cell cell = result.getColumnLatestCell(family, qf1);
209    Assert.assertArrayEquals(value1, CellUtil.cloneValue(cell));
210
211    admin.flush(tableName);
212    result = rs.next();
213    cell = result.getColumnLatestCell(family, qf1);
214    Assert.assertArrayEquals(value2, CellUtil.cloneValue(cell));
215  }
216
217  @Test
218  public void testReadFromCorruptMobFilesWithReadEmptyValueOnMobCellMiss() throws Exception {
219    final TableName tableName = TableName.valueOf(name.getMethodName());
220    setUp(0, tableName);
221    createRecordAndCorruptMobFile(tableName, row1, family, qf1, Bytes.toBytes("value1"));
222    Get get = new Get(row1);
223    get.setAttribute(MobConstants.EMPTY_VALUE_ON_MOBCELL_MISS, Bytes.toBytes(true));
224    Result result = table.get(get);
225    Cell cell = result.getColumnLatestCell(family, qf1);
226    Assert.assertEquals(0, cell.getValueLength());
227  }
228
229  @Test
230  public void testReadFromCorruptMobFiles() throws Exception {
231    final TableName tableName = TableName.valueOf(name.getMethodName());
232    setUp(0, tableName);
233    createRecordAndCorruptMobFile(tableName, row1, family, qf1, Bytes.toBytes("value1"));
234    Get get = new Get(row1);
235    IOException ioe = null;
236    try {
237      table.get(get);
238    } catch (IOException e) {
239      ioe = e;
240    }
241    Assert.assertNotNull(ioe);
242    Assert.assertEquals(CorruptHFileException.class.getName(), ioe.getClass().getName());
243  }
244
245  private void createRecordAndCorruptMobFile(TableName tn, byte[] row, byte[] family, byte[] qf,
246    byte[] value) throws IOException {
247    Put put1 = new Put(row);
248    put1.addColumn(family, qf, value);
249    table.put(put1);
250    admin.flush(tn);
251    Path mobFile = getFlushedMobFile(conf, fs, tn, Bytes.toString(family));
252    Assert.assertNotNull(mobFile);
253    // create new corrupt mob file.
254    Path corruptFile = new Path(mobFile.getParent(), "dummy");
255    TestHFile.truncateFile(fs, mobFile, corruptFile);
256    fs.delete(mobFile, true);
257    fs.rename(corruptFile, mobFile);
258  }
259
260  private Path getFlushedMobFile(Configuration conf, FileSystem fs, TableName table, String family)
261    throws IOException {
262    Path famDir = MobUtils.getMobFamilyPath(conf, table, family);
263    FileStatus[] hfFss = fs.listStatus(famDir);
264    for (FileStatus hfs : hfFss) {
265      if (!hfs.isDirectory()) {
266        return hfs.getPath();
267      }
268    }
269    return null;
270  }
271
272  private void testGetFromFiles(boolean reversed) throws Exception {
273    TableName tn = TableName.valueOf("testGetFromFiles" + reversed);
274    testGet(tn, reversed, true);
275  }
276
277  private void testGetFromMemStore(boolean reversed) throws Exception {
278    TableName tn = TableName.valueOf("testGetFromMemStore" + reversed);
279    testGet(tn, reversed, false);
280  }
281
282  private void testGet(TableName tableName, boolean reversed, boolean doFlush) throws Exception {
283    setUp(defaultThreshold, tableName);
284    long ts1 = EnvironmentEdgeManager.currentTime();
285    long ts2 = ts1 + 1;
286    long ts3 = ts1 + 2;
287    byte[] value = generateMobValue((int) defaultThreshold + 1);
288
289    Put put1 = new Put(row1);
290    put1.addColumn(family, qf1, ts3, value);
291    put1.addColumn(family, qf2, ts2, value);
292    put1.addColumn(family, qf3, ts1, value);
293    table.put(put1);
294
295    if (doFlush) {
296      admin.flush(tableName);
297    }
298
299    Scan scan = new Scan();
300    setScan(scan, reversed, false);
301    MobTestUtil.assertCellsValue(table, scan, value, 3);
302  }
303
304  private void testGetReferences(boolean reversed) throws Exception {
305    TableName tn = TableName.valueOf("testGetReferences" + reversed);
306    setUp(defaultThreshold, tn);
307    long ts1 = EnvironmentEdgeManager.currentTime();
308    long ts2 = ts1 + 1;
309    long ts3 = ts1 + 2;
310    byte[] value = generateMobValue((int) defaultThreshold + 1);
311
312    Put put1 = new Put(row1);
313    put1.addColumn(family, qf1, ts3, value);
314    put1.addColumn(family, qf2, ts2, value);
315    put1.addColumn(family, qf3, ts1, value);
316    table.put(put1);
317
318    admin.flush(tn);
319
320    Scan scan = new Scan();
321    setScan(scan, reversed, true);
322
323    ResultScanner results = table.getScanner(scan);
324    int count = 0;
325    for (Result res : results) {
326      List<Cell> cells = res.listCells();
327      for (Cell cell : cells) {
328        // Verify the value
329        assertIsMobReference(cell, row1, family, value, tn);
330        count++;
331      }
332    }
333    results.close();
334    Assert.assertEquals(3, count);
335  }
336
337  private void testMobThreshold(boolean reversed) throws Exception {
338    TableName tn = TableName.valueOf("testMobThreshold" + reversed);
339    setUp(defaultThreshold, tn);
340    byte[] valueLess = generateMobValue((int) defaultThreshold - 1);
341    byte[] valueEqual = generateMobValue((int) defaultThreshold);
342    byte[] valueGreater = generateMobValue((int) defaultThreshold + 1);
343    long ts1 = EnvironmentEdgeManager.currentTime();
344    long ts2 = ts1 + 1;
345    long ts3 = ts1 + 2;
346
347    Put put1 = new Put(row1);
348    put1.addColumn(family, qf1, ts3, valueLess);
349    put1.addColumn(family, qf2, ts2, valueEqual);
350    put1.addColumn(family, qf3, ts1, valueGreater);
351    table.put(put1);
352
353    admin.flush(tn);
354
355    Scan scan = new Scan();
356    setScan(scan, reversed, true);
357
358    Cell cellLess = null;
359    Cell cellEqual = null;
360    Cell cellGreater = null;
361    ResultScanner results = table.getScanner(scan);
362    int count = 0;
363    for (Result res : results) {
364      List<Cell> cells = res.listCells();
365      for (Cell cell : cells) {
366        // Verify the value
367        String qf = Bytes.toString(CellUtil.cloneQualifier(cell));
368        if (qf.equals(Bytes.toString(qf1))) {
369          cellLess = cell;
370        }
371        if (qf.equals(Bytes.toString(qf2))) {
372          cellEqual = cell;
373        }
374        if (qf.equals(Bytes.toString(qf3))) {
375          cellGreater = cell;
376        }
377        count++;
378      }
379    }
380    Assert.assertEquals(3, count);
381    assertNotMobReference(cellLess, row1, family, valueLess);
382    assertNotMobReference(cellEqual, row1, family, valueEqual);
383    assertIsMobReference(cellGreater, row1, family, valueGreater, tn);
384    results.close();
385  }
386
387  private void testGetFromArchive(boolean reversed) throws Exception {
388    TableName tn = TableName.valueOf("testGetFromArchive" + reversed);
389    setUp(defaultThreshold, tn);
390    long ts1 = EnvironmentEdgeManager.currentTime();
391    long ts2 = ts1 + 1;
392    long ts3 = ts1 + 2;
393    byte[] value = generateMobValue((int) defaultThreshold + 1);
394    // Put some data
395    Put put1 = new Put(row1);
396    put1.addColumn(family, qf1, ts3, value);
397    put1.addColumn(family, qf2, ts2, value);
398    put1.addColumn(family, qf3, ts1, value);
399    table.put(put1);
400
401    admin.flush(tn);
402
403    // Get the files in the mob path
404    Path mobFamilyPath;
405    mobFamilyPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn,
406      familyDescriptor.getNameAsString());
407    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
408    FileStatus[] files = fs.listStatus(mobFamilyPath);
409
410    // Get the archive path
411    Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
412    Path tableDir = CommonFSUtils.getTableDir(rootDir, tn);
413    RegionInfo regionInfo = MobUtils.getMobRegionInfo(tn);
414    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(),
415      regionInfo, tableDir, family);
416
417    // Move the files from mob path to archive path
418    fs.mkdirs(storeArchiveDir);
419    int fileCount = 0;
420    for (FileStatus file : files) {
421      fileCount++;
422      Path filePath = file.getPath();
423      Path src = new Path(mobFamilyPath, filePath.getName());
424      Path dst = new Path(storeArchiveDir, filePath.getName());
425      fs.rename(src, dst);
426    }
427
428    // Verify the moving success
429    FileStatus[] files1 = fs.listStatus(mobFamilyPath);
430    Assert.assertEquals(0, files1.length);
431    FileStatus[] files2 = fs.listStatus(storeArchiveDir);
432    Assert.assertEquals(fileCount, files2.length);
433
434    // Scan from archive
435    Scan scan = new Scan();
436    setScan(scan, reversed, false);
437    MobTestUtil.assertCellsValue(table, scan, value, 3);
438  }
439
440  /**
441   * Assert the value is not store in mob.
442   */
443  private static void assertNotMobReference(Cell cell, byte[] row, byte[] family, byte[] value)
444    throws IOException {
445    Assert.assertArrayEquals(row, CellUtil.cloneRow(cell));
446    Assert.assertArrayEquals(family, CellUtil.cloneFamily(cell));
447    Assert.assertArrayEquals(value, CellUtil.cloneValue(cell));
448  }
449
450  /**
451   * Assert the value is store in mob.
452   */
453  private static void assertIsMobReference(Cell cell, byte[] row, byte[] family, byte[] value,
454    TableName tn) throws IOException {
455    Assert.assertArrayEquals(row, CellUtil.cloneRow(cell));
456    Assert.assertArrayEquals(family, CellUtil.cloneFamily(cell));
457    Assert.assertFalse(Bytes.equals(value, CellUtil.cloneValue(cell)));
458    byte[] referenceValue = CellUtil.cloneValue(cell);
459    String fileName = MobUtils.getMobFileName(cell);
460    int valLen = Bytes.toInt(referenceValue, 0, Bytes.SIZEOF_INT);
461    Assert.assertEquals(value.length, valLen);
462    Path mobFamilyPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn,
463      familyDescriptor.getNameAsString());
464    Path targetPath = new Path(mobFamilyPath, fileName);
465    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
466    Assert.assertTrue(fs.exists(targetPath));
467  }
468}