/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

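/**
 * Verifies that scanners see HFiles added via bulk load, both for files written without
 * sequence ids (BULKLOAD_TIME_KEY) and for "native" HFiles carrying MAX_SEQ_ID_KEY, including
 * the case where a bulk load races with an already-open scanner.
 */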
@Category({RegionServerTests.class, MediumTests.class})
public class TestScannerWithBulkload {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestScannerWithBulkload.class);

  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniCluster(1);
  }

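  /**
   * Creates a table with a single column family "col" that retains up to three versions.
   */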
  private static void createTable(Admin admin, TableName tableName) throws IOException {
    HTableDescriptor desc = new HTableDescriptor(tableName);
    HColumnDescriptor hcd = new HColumnDescriptor("col");
    hcd.setMaxVersions(3);
    desc.addFamily(hcd);
    admin.createTable(desc);
  }

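  /**
   * Bulk loads an HFile carrying "version2" for row1 on top of the flushed
   * "version0"/"version1" cells, then verifies the scanner sees the bulk loaded value and,
   * after a fresh put and flush, the newer "version3".
   */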
  @Test
  public void testBulkLoad() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    long l = System.currentTimeMillis();
    Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    final Table table = init(admin, l, scan, tableName);
    // Bulk load an HFile that adds 'version2' for row1.
    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file",
      false);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
      bulkload.doBulkLoad(hfilePath, admin, table, locator);
    }
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    result = scanAfterBulkLoad(scanner, result, "version2");
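    // Put a newer 'version3' on top of the bulk loaded value and flush; a fresh scan
    // should now return it for row1.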
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
        .toBytes("version3")));
    table.put(put0);
    admin.flush(tableName);
    scanner = table.getScanner(scan);
    result = scanner.next();
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength())
            .equals("row1")) {
          System.out
              .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
            _c.getQualifierLength()));
          System.out.println(
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals("version3",
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    scanner.close();
    table.close();
  }

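  /**
   * Drains the scanner, asserting that every cell of row1 in family "col", qualifier "q"
   * carries the expected value. Returns null once the scanner is exhausted.
   */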
  private Result scanAfterBulkLoad(ResultScanner scanner, Result result, String expectedVal)
      throws IOException {
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength())
            .equals("row1")) {
          System.out
              .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
            _c.getQualifierLength()));
          System.out.println(
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals(expectedVal,
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    return result;
  }

  /**
   * Writes a single-cell HFile for row1/col:q with the value "version2". If nativeHFile is
   * true, the cell sequence id and MAX_SEQ_ID_KEY are set in the file; otherwise
   * BULKLOAD_TIME_KEY is written. Returns the directory that holds the file.
   */
  private Path writeToHFile(long l, String hFilePath, String pathStr, boolean nativeHFile)
      throws IOException {
    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
    final Path hfilePath = new Path(hFilePath);
    fs.mkdirs(hfilePath);
    Path path = new Path(pathStr);
    HFile.WriterFactory wf = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
    Assert.assertNotNull(wf);
    HFileContext context = new HFileContextBuilder().build();
    HFile.Writer writer = wf.withPath(fs, path).withFileContext(context).create();
    KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
        Bytes.toBytes("version2"));

    // Set cell seq id to test bulk load native hfiles.
    if (nativeHFile) {
      // Set a big seq id. Scan should not look at this seq id in a bulk loaded file.
      // Scan should only look at the seq id appended at the bulk load time, and not skip
      // this kv.
      kv.setSequenceId(9999999);
    }

    writer.append(kv);

    if (nativeHFile) {
      // Set a big MAX_SEQ_ID_KEY. Scan should not look at this seq id in a bulk loaded file.
      // Scan should only look at the seq id appended at the bulk load time, and not skip its
      // kv.
      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(9999999L));
    } else {
      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
    }
    writer.close();
    return hfilePath;
  }

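  /**
   * Seeds the table with "version0" and "version1" for row1 (and "version0" for row2),
   * flushing after each put and compacting, then verifies a scan returns "version1" for row1.
   */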
  private Table init(Admin admin, long l, Scan scan, TableName tableName) throws Exception {
    Table table = TEST_UTIL.getConnection().getTable(tableName);
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
        Bytes.toBytes("version0")));
    table.put(put0);
    admin.flush(tableName);
    Put put1 = new Put(Bytes.toBytes("row2"));
    put1.add(new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
        .toBytes("version0")));
    table.put(put1);
    admin.flush(tableName);
    put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
        .toBytes("version1")));
    table.put(put0);
    admin.flush(tableName);
    admin.compact(tableName);

    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
    Assert.assertEquals(1, cells.size());
    Cell _c = cells.get(0);
    Assert.assertEquals("version1",
      Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
    scanner.close();
    return table;
  }

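  /**
   * Opens a scanner first, then performs a put and the bulk load from a separate thread,
   * verifying that the already-open scanner still returns the pre-bulk-load value "version1".
   */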
  @Test
  public void testBulkLoadWithParallelScan() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    final long l = System.currentTimeMillis();
    final Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    scan.setCaching(1);
    final Table table = init(admin, l, scan, tableName);
    // Bulk load an HFile that adds 'version2' for row1.
    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadWithParallelScan/",
        "/temp/testBulkLoadWithParallelScan/col/file", false);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    // Create a scanner and then do bulk load
    final CountDownLatch latch = new CountDownLatch(1);
    new Thread() {
      @Override
      public void run() {
        try {
          Put put1 = new Put(Bytes.toBytes("row5"));
          put1.add(new KeyValue(Bytes.toBytes("row5"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
              Bytes.toBytes("version0")));
          table.put(put1);
          try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
            bulkload.doBulkLoad(hfilePath, admin, table, locator);
          }
        } catch (TableNotFoundException e) {
          // Not expected; the table is created by this test.
        } catch (IOException e) {
          // Ignored; the assertion below only depends on data written before the bulk load.
        } finally {
          // Always release the latch so the main thread cannot block forever.
          latch.countDown();
        }
      }
    }.start();
    latch.await();
    // By the time we do next(), the bulk loaded files are also added to the kv scanner.
    scanAfterBulkLoad(scanner, result, "version1");
    scanner.close();
    table.close();
  }

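  /**
   * Same as {@link #testBulkLoad()}, but the bulk loaded HFile is a "native" one carrying a
   * cell sequence id and MAX_SEQ_ID_KEY; the scanner should still see its "version2" value.
   */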
  @Test
  public void testBulkLoadNativeHFile() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    long l = System.currentTimeMillis();
    Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    final Table table = init(admin, l, scan, tableName);
    // Bulk load a native HFile that adds 'version2' for row1.
    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadNativeHFile/",
      "/temp/testBulkLoadNativeHFile/col/file", true);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
      bulkload.doBulkLoad(hfilePath, admin, table, locator);
    }
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    // The table already had 'version0' and 'version1' for 'row1,col:q'; the bulk load added
    // 'version2', which the scanner should now see.
    result = scanAfterBulkLoad(scanner, result, "version2");
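    // Put a newer 'version3' on top of the bulk loaded value and flush; a fresh scan
    // should now return it for row1.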
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
        .toBytes("version3")));
    table.put(put0);
    admin.flush(tableName);
    scanner = table.getScanner(scan);
    result = scanner.next();
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength())
            .equals("row1")) {
          System.out
              .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
            _c.getQualifierLength()));
          System.out.println(
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals("version3",
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    scanner.close();
    table.close();
  }

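  /**
   * Returns a Scan configured to read up to three versions of each column.
   */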
  private Scan createScan() {
    Scan scan = new Scan();
    scan.setMaxVersions(3);
    return scan;
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
}