/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

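/**
 * Verifies that scanners see correct and consistent results when HFiles are bulk loaded into a
 * table, including "native" HFiles that already carry sequence ids, and bulk loads that happen
 * while a scan is in progress.
 */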
@Category({ RegionServerTests.class, MediumTests.class })
public class TestScannerWithBulkload {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestScannerWithBulkload.class);

  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniCluster(1);
  }

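  // Creates a table with a single family 'col' that keeps up to three versions per cell.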
  private static void createTable(Admin admin, TableName tableName) throws IOException {
    HTableDescriptor desc = new HTableDescriptor(tableName);
    HColumnDescriptor hcd = new HColumnDescriptor("col");
    hcd.setMaxVersions(3);
    desc.addFamily(hcd);
    admin.createTable(desc);
  }

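  // Bulk loads an HFile carrying 'version2' for row1 and verifies the scanner sees it, then puts
  // and flushes 'version3' and verifies that value is returned instead of the bulk loaded one.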
  @Test
  public void testBulkLoad() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    long l = EnvironmentEdgeManager.currentTime();
    Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    final Table table = init(admin, l, scan, tableName);
    // Bulk load an HFile carrying 'version2' for row1.
    final Path hfilePath =
      writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file", false);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
      bulkload.doBulkLoad(hfilePath, admin, table, locator);
    }
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    result = scanAfterBulkLoad(scanner, result, "version2");
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version3")));
    table.put(put0);
    admin.flush(tableName);
    scanner = table.getScanner(scan);
    result = scanner.next();
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()).equals("row1")) {
          System.out
            .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
            _c.getQualifierLength()));
          System.out
            .println(Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals("version3",
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    scanner.close();
    table.close();
  }

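  // Drains the scanner, asserting that every 'col:q' cell of row1 carries the expected value.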
  private Result scanAfterBulkLoad(ResultScanner scanner, Result result, String expectedVal)
    throws IOException {
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()).equals("row1")) {
          System.out
            .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
            _c.getQualifierLength()));
          System.out
            .println(Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals(expectedVal,
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    return result;
  }

  // If nativeHFile is true, we set the cell seq id and MAX_SEQ_ID_KEY in the file.
  // Otherwise, we set BULKLOAD_TIME_KEY.
  private Path writeToHFile(long l, String hFilePath, String pathStr, boolean nativeHFile)
    throws IOException {
    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
    final Path hfilePath = new Path(hFilePath);
    fs.mkdirs(hfilePath);
    Path path = new Path(pathStr);
    HFile.WriterFactory wf = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
    Assert.assertNotNull(wf);
    HFileContext context = new HFileContextBuilder().build();
    HFile.Writer writer = wf.withPath(fs, path).withFileContext(context).create();
    KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version2"));

    // Set the cell seq id to test bulk loading native hfiles.
    if (nativeHFile) {
      // Set a big seq id. The scan should not look at this seq id in a bulk loaded file.
      // It should only look at the seq id appended at bulk load time, and not skip
      // this kv.
      kv.setSequenceId(9999999);
    }

    writer.append(kv);

    if (nativeHFile) {
      // Set a big MAX_SEQ_ID_KEY. The scan should not look at this seq id in a bulk loaded file.
      // It should only look at the seq id appended at bulk load time, and not skip
      // this kv.
      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(9999999L));
    } else {
      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
    }
    writer.close();
    return hfilePath;
  }

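  // Seeds the table: puts 'version0' for row1 and row2, then 'version1' for row1, flushing after
  // each put and requesting a compaction, and verifies the scan sees 'version1' for row1.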
  private Table init(Admin admin, long l, Scan scan, TableName tableName) throws Exception {
    Table table = TEST_UTIL.getConnection().getTable(tableName);
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version0")));
    table.put(put0);
    admin.flush(tableName);
    Put put1 = new Put(Bytes.toBytes("row2"));
    put1.add(new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version0")));
    table.put(put1);
    admin.flush(tableName);
    put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version1")));
    table.put(put0);
    admin.flush(tableName);
    admin.compact(tableName);

    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
    Assert.assertEquals(1, cells.size());
    Cell _c = cells.get(0);
    Assert.assertEquals("version1",
      Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
    scanner.close();
    return table;
  }

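  // Opens a scanner, performs the bulk load (plus an extra put) from a separate thread, and
  // verifies the already-open scanner still returns consistent results.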
  @Test
  public void testBulkLoadWithParallelScan() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    final long l = EnvironmentEdgeManager.currentTime();
    final Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    scan.setCaching(1);
    final Table table = init(admin, l, scan, tableName);
    // Bulk load an HFile carrying 'version2' for row1.
    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadWithParallelScan/",
      "/temp/testBulkLoadWithParallelScan/col/file", false);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    // The scanner is already open; now do the bulk load from a separate thread.
    final CountDownLatch latch = new CountDownLatch(1);
    new Thread() {
      @Override
      public void run() {
        try {
          Put put1 = new Put(Bytes.toBytes("row5"));
          put1.add(new KeyValue(Bytes.toBytes("row5"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
            Bytes.toBytes("version0")));
          table.put(put1);
          try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
            bulkload.doBulkLoad(hfilePath, admin, table, locator);
          }
          latch.countDown();
        } catch (TableNotFoundException e) {
          // Ignored; the table was created above, so this is not expected.
        } catch (IOException e) {
          // Ignored; if the bulk load fails, the latch is never counted down and the test
          // times out rather than failing with the cause.
        }
      }
    }.start();
    latch.await();
    // By the time we call next() again, the bulk loaded files have also been added to the kv
    // scanner.
    scanAfterBulkLoad(scanner, result, "version1");
    scanner.close();
    table.close();
  }

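  // Bulk loads a "native" HFile (one that already carries a cell seq id and MAX_SEQ_ID_KEY) and
  // verifies the scanner sees its 'version2' value, and then the later put of 'version3'.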
  @Test
  public void testBulkLoadNativeHFile() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    long l = EnvironmentEdgeManager.currentTime();
    Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    final Table table = init(admin, l, scan, tableName);
    // Bulk load a native HFile carrying 'version2' for row1.
    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadNativeHFile/",
      "/temp/testBulkLoadNativeHFile/col/file", true);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
      bulkload.doBulkLoad(hfilePath, admin, table, locator);
    }
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    // We had 'version0' and 'version1' for 'row1,col:q' in the table. The bulk load added
    // 'version2', and the scanner should be able to see it.
    result = scanAfterBulkLoad(scanner, result, "version2");
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version3")));
    table.put(put0);
    admin.flush(tableName);
    scanner = table.getScanner(scan);
    result = scanner.next();
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()).equals("row1")) {
          System.out
            .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
            _c.getQualifierLength()));
          System.out
            .println(Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals("version3",
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    scanner.close();
    table.close();
  }

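  // Builds a scan that reads up to three versions per cell.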
  private Scan createScan() {
    Scan scan = new Scan();
    scan.setMaxVersions(3);
    return scan;
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
}