001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
021import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;
022
023import java.io.IOException;
024import java.util.List;
025import java.util.concurrent.CountDownLatch;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseTestingUtility;
032import org.apache.hadoop.hbase.HColumnDescriptor;
033import org.apache.hadoop.hbase.HTableDescriptor;
034import org.apache.hadoop.hbase.KeyValue;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.TableNotFoundException;
037import org.apache.hadoop.hbase.client.Admin;
038import org.apache.hadoop.hbase.client.Put;
039import org.apache.hadoop.hbase.client.RegionLocator;
040import org.apache.hadoop.hbase.client.Result;
041import org.apache.hadoop.hbase.client.ResultScanner;
042import org.apache.hadoop.hbase.client.Scan;
043import org.apache.hadoop.hbase.client.Table;
044import org.apache.hadoop.hbase.io.hfile.HFile;
045import org.apache.hadoop.hbase.io.hfile.HFileContext;
046import org.apache.hadoop.hbase.testclassification.MediumTests;
047import org.apache.hadoop.hbase.testclassification.RegionServerTests;
048import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
049import org.apache.hadoop.hbase.util.Bytes;
050import org.junit.AfterClass;
051import org.junit.Assert;
052import org.junit.BeforeClass;
053import org.junit.ClassRule;
054import org.junit.Rule;
055import org.junit.Test;
056import org.junit.experimental.categories.Category;
057import org.junit.rules.TestName;
058
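/**
 * Verifies that scanners see data from bulk loaded HFiles: files written without sequence ids
 * (carrying BULKLOAD_TIME_KEY), native HFiles carrying MAX_SEQ_ID_KEY, and a bulk load that
 * lands while a scan is already in progress.
 */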
059@Category({RegionServerTests.class, MediumTests.class})
060public class TestScannerWithBulkload {
061
062  @ClassRule
063  public static final HBaseClassTestRule CLASS_RULE =
064      HBaseClassTestRule.forClass(TestScannerWithBulkload.class);
065
066  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
067
068  @Rule
069  public TestName name = new TestName();
070
071  @BeforeClass
072  public static void setUpBeforeClass() throws Exception {
073    TEST_UTIL.startMiniCluster(1);
074  }
075
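  // Creates a table with a single family 'col' that keeps up to three cell versions.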
076  private static void createTable(Admin admin, TableName tableName) throws IOException {
077    HTableDescriptor desc = new HTableDescriptor(tableName);
078    HColumnDescriptor hcd = new HColumnDescriptor("col");
079    hcd.setMaxVersions(3);
080    desc.addFamily(hcd);
081    admin.createTable(desc);
082  }
083
084  @Test
085  public void testBulkLoad() throws Exception {
086    final TableName tableName = TableName.valueOf(name.getMethodName());
087    long l = System.currentTimeMillis();
088    Admin admin = TEST_UTIL.getAdmin();
089    createTable(admin, tableName);
090    Scan scan = createScan();
091    final Table table = init(admin, l, scan, tableName);
    // Bulk load an HFile containing 'version2' for row1.
093    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file",
094      false);
095    Configuration conf = TEST_UTIL.getConfiguration();
096    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
097    final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
098    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
099      bulkload.doBulkLoad(hfilePath, admin, table, locator);
100    }
101    ResultScanner scanner = table.getScanner(scan);
102    Result result = scanner.next();
103    result = scanAfterBulkLoad(scanner, result, "version2");
104    Put put0 = new Put(Bytes.toBytes("row1"));
105    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
106        .toBytes("version3")));
107    table.put(put0);
108    admin.flush(tableName);
109    scanner = table.getScanner(scan);
110    result = scanner.next();
111    while (result != null) {
112      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
113      for (Cell _c : cells) {
114        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength())
115            .equals("row1")) {
116          System.out
117              .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
118          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
119            _c.getQualifierLength()));
120          System.out.println(
121            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
122          Assert.assertEquals("version3",
123            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
124        }
125      }
126      result = scanner.next();
127    }
128    scanner.close();
129    table.close();
130  }
131
  private Result scanAfterBulkLoad(ResultScanner scanner, Result result, String expectedVal)
      throws IOException {
134    while (result != null) {
135      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
136      for (Cell _c : cells) {
137        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength())
138            .equals("row1")) {
139          System.out
140              .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
141          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
142            _c.getQualifierLength()));
143          System.out.println(
144            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals(expectedVal,
146            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
147        }
148      }
149      result = scanner.next();
150    }
151    return result;
152  }
153
  // If nativeHFile is true, we set the cell seq id and MAX_SEQ_ID_KEY in the file.
  // Otherwise, we set BULKLOAD_TIME_KEY.
156  private Path writeToHFile(long l, String hFilePath, String pathStr, boolean nativeHFile)
157      throws IOException {
158    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
159    final Path hfilePath = new Path(hFilePath);
160    fs.mkdirs(hfilePath);
161    Path path = new Path(pathStr);
162    HFile.WriterFactory wf = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
163    Assert.assertNotNull(wf);
164    HFileContext context = new HFileContext();
165    HFile.Writer writer = wf.withPath(fs, path).withFileContext(context).create();
166    KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
167        Bytes.toBytes("version2"));
168
169    // Set cell seq id to test bulk load native hfiles.
170    if (nativeHFile) {
171      // Set a big seq id. Scan should not look at this seq id in a bulk loaded file.
172      // Scan should only look at the seq id appended at the bulk load time, and not skip
173      // this kv.
174      kv.setSequenceId(9999999);
175    }
176
177    writer.append(kv);
178
    if (nativeHFile) {
      // Set a big MAX_SEQ_ID_KEY. Scan should not look at this seq id in a bulk loaded file.
      // Scan should only look at the seq id appended at the bulk load time, and not skip its
      // kv.
      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(9999999L));
    } else {
      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
    }
188    writer.close();
189    return hfilePath;
190  }
191
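  // Seeds the table: row1 gets 'version0' then 'version1' and row2 gets 'version0', flushing
  // after each put and compacting at the end, then asserts the latest value for row1 is
  // 'version1'.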
192  private Table init(Admin admin, long l, Scan scan, TableName tableName) throws Exception {
193    Table table = TEST_UTIL.getConnection().getTable(tableName);
194    Put put0 = new Put(Bytes.toBytes("row1"));
195    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
196        Bytes.toBytes("version0")));
197    table.put(put0);
198    admin.flush(tableName);
199    Put put1 = new Put(Bytes.toBytes("row2"));
200    put1.add(new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
201        .toBytes("version0")));
202    table.put(put1);
203    admin.flush(tableName);
204    put0 = new Put(Bytes.toBytes("row1"));
205    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
206        .toBytes("version1")));
207    table.put(put0);
208    admin.flush(tableName);
209    admin.compact(tableName);
210
211    ResultScanner scanner = table.getScanner(scan);
212    Result result = scanner.next();
213    List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
214    Assert.assertEquals(1, cells.size());
215    Cell _c = cells.get(0);
216    Assert.assertEquals("version1",
217      Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
218    scanner.close();
219    return table;
220  }
221
222  @Test
223  public void testBulkLoadWithParallelScan() throws Exception {
224    final TableName tableName = TableName.valueOf(name.getMethodName());
    final long l = System.currentTimeMillis();
226    final Admin admin = TEST_UTIL.getAdmin();
227    createTable(admin, tableName);
228    Scan scan = createScan();
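    // Fetch rows one at a time so the scanner returns to the server between next() calls
    // while the bulk load happens in parallel.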
229    scan.setCaching(1);
230    final Table table = init(admin, l, scan, tableName);
    // Prepare an HFile containing 'version2' for row1; the thread below bulk loads it mid-scan.
232    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadWithParallelScan/",
233        "/temp/testBulkLoadWithParallelScan/col/file", false);
234    Configuration conf = TEST_UTIL.getConfiguration();
235    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
236    final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
237    ResultScanner scanner = table.getScanner(scan);
238    Result result = scanner.next();
239    // Create a scanner and then do bulk load
240    final CountDownLatch latch = new CountDownLatch(1);
241    new Thread() {
242      @Override
243      public void run() {
244        try {
245          Put put1 = new Put(Bytes.toBytes("row5"));
246          put1.add(new KeyValue(Bytes.toBytes("row5"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
247              Bytes.toBytes("version0")));
248          table.put(put1);
249          try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
250            bulkload.doBulkLoad(hfilePath, admin, table, locator);
251          }
252          latch.countDown();
        } catch (IOException e) {
          // If the bulk load fails, the latch never counts down and the await below blocks
          // until the test times out, surfacing the failure.
        }
256      }
257    }.start();
258    latch.await();
259    // By the time we do next() the bulk loaded files are also added to the kv
260    // scanner
261    scanAfterBulkLoad(scanner, result, "version1");
262    scanner.close();
263    table.close();
264  }
265
266  @Test
267  public void testBulkLoadNativeHFile() throws Exception {
268    final TableName tableName = TableName.valueOf(name.getMethodName());
269    long l = System.currentTimeMillis();
270    Admin admin = TEST_UTIL.getAdmin();
271    createTable(admin, tableName);
272    Scan scan = createScan();
273    final Table table = init(admin, l, scan, tableName);
    // Bulk load a native HFile (with a cell seq id and MAX_SEQ_ID_KEY set) containing
    // 'version2' for row1.
275    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadNativeHFile/",
276      "/temp/testBulkLoadNativeHFile/col/file", true);
277    Configuration conf = TEST_UTIL.getConfiguration();
278    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
279    final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
280    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
281      bulkload.doBulkLoad(hfilePath, admin, table, locator);
282    }
283    ResultScanner scanner = table.getScanner(scan);
284    Result result = scanner.next();
    // We had 'version0' and 'version1' for 'row1,col:q' in the table.
    // Bulk load added 'version2'; the scanner should be able to see 'version2'.
287    result = scanAfterBulkLoad(scanner, result, "version2");
288    Put put0 = new Put(Bytes.toBytes("row1"));
289    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
290        .toBytes("version3")));
291    table.put(put0);
292    admin.flush(tableName);
293    scanner = table.getScanner(scan);
294    result = scanner.next();
295    while (result != null) {
296      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
297      for (Cell _c : cells) {
298        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength())
299            .equals("row1")) {
300          System.out
301              .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
302          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
303            _c.getQualifierLength()));
304          System.out.println(
305            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
306          Assert.assertEquals("version3",
307            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
308        }
309      }
310      result = scanner.next();
311    }
312    scanner.close();
313    table.close();
314  }
315
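  // Scan that returns up to three versions per cell.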
316  private Scan createScan() {
317    Scan scan = new Scan();
318    scan.setMaxVersions(3);
319    return scan;
320  }
321
322  @AfterClass
323  public static void tearDownAfterClass() throws Exception {
324    TEST_UTIL.shutdownMiniCluster();
325  }
326}