/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

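/**
 * Verifies that scans observe data from bulk-loaded HFiles: a plain scan after a bulk load, a
 * scan that is already open while the bulk load runs in parallel, and a bulk load of a native
 * HFile that carries its own sequence id metadata.
 */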
@Category({ RegionServerTests.class, MediumTests.class })
public class TestScannerWithBulkload {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestScannerWithBulkload.class);

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniCluster(1);
  }

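  /** Creates a table with a single family 'col' that keeps up to three versions. */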
  private static void createTable(Admin admin, TableName tableName) throws IOException {
    TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName);
    ColumnFamilyDescriptor columnFamilyDescriptor =
      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("col")).setMaxVersions(3).build();
    tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
    admin.createTable(tableDescriptorBuilder.build());
  }

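  /**
   * Bulk loads an HFile containing 'version2' for row1 and verifies a fresh scan sees it, then
   * writes 'version3' through the normal write path and verifies a scan sees that newer value.
   */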
  @Test
  public void testBulkLoad() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    long l = EnvironmentEdgeManager.currentTime();
    Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    final Table table = init(admin, l, scan, tableName);
    // Write an HFile carrying 'version2' for row1/col:q and bulk load it.
    final Path hfilePath =
      writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file", false);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    BulkLoadHFiles.create(conf).bulkLoad(tableName, hfilePath);
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    result = scanAfterBulkLoad(scanner, result, "version2");
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version3")));
    table.put(put0);
    admin.flush(tableName);
    scanner = table.getScanner(scan);
    result = scanner.next();
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()).equals("row1")) {
          System.out
            .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
            _c.getQualifierLength()));
          System.out
            .println(Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals("version3",
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    scanner.close();
    table.close();
  }

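  /**
   * Drains the remaining results from the scanner and asserts that every cell returned for
   * row1/col:q carries the expected value.
   */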
  private Result scanAfterBulkLoad(ResultScanner scanner, Result result, String expectedVal)
    throws IOException {
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()).equals("row1")) {
          System.out
            .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
            _c.getQualifierLength()));
          System.out
            .println(Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals(expectedVal,
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    return result;
  }

  // If nativeHFile is true, we will set cell seq id and MAX_SEQ_ID_KEY in the file.
  // Else, we will set BULKLOAD_TIME_KEY.
  private Path writeToHFile(long l, String hFilePath, String pathStr, boolean nativeHFile)
    throws IOException {
    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
    final Path hfilePath = new Path(hFilePath);
    fs.mkdirs(hfilePath);
    Path path = new Path(pathStr);
    HFile.WriterFactory wf = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
    Assert.assertNotNull(wf);
    HFileContext context = new HFileContextBuilder().build();
    HFile.Writer writer = wf.withPath(fs, path).withFileContext(context).create();
    KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version2"));

    // Set cell seq id to test bulk load native hfiles.
    if (nativeHFile) {
      // Set a big seq id. Scan should not look at this seq id in a bulk loaded file.
      // Scan should only look at the seq id appended at the bulk load time, and not skip
      // this kv.
      kv.setSequenceId(9999999);
    }

    writer.append(kv);

    if (nativeHFile) {
      // Set a big MAX_SEQ_ID_KEY. Scan should not look at this seq id in a bulk loaded file.
      // Scan should only look at the seq id appended at the bulk load time, and not skip its
      // kv.
      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(9999999L));
    } else {
      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
    }
    writer.close();
    return hfilePath;
  }

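  /**
   * Seeds the table with flushed puts at timestamp {@code l} ('version0' and 'version1' for row1,
   * 'version0' for row2), compacts, and asserts that a scan returns 'version1' for row1/col:q.
   */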
  private Table init(Admin admin, long l, Scan scan, TableName tableName) throws Exception {
    Table table = TEST_UTIL.getConnection().getTable(tableName);
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version0")));
    table.put(put0);
    admin.flush(tableName);
    Put put1 = new Put(Bytes.toBytes("row2"));
    put1.add(new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version0")));
    table.put(put1);
    admin.flush(tableName);
    put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version1")));
    table.put(put0);
    admin.flush(tableName);
    admin.compact(tableName);

    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
    Assert.assertEquals(1, cells.size());
    Cell _c = cells.get(0);
    Assert.assertEquals("version1",
      Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
    scanner.close();
    return table;
  }

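  /**
   * Opens a scanner, then bulk loads an HFile from another thread and verifies that the already
   * open scanner keeps returning the pre-load value 'version1' for row1/col:q.
   */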
  @Test
  public void testBulkLoadWithParallelScan() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    final long l = EnvironmentEdgeManager.currentTime();
    final Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    scan.setCaching(1);
    final Table table = init(admin, l, scan, tableName);
    // Write an HFile carrying 'version2' for row1/col:q; it is bulk loaded from a second thread.
    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadWithParallelScan/",
      "/temp/testBulkLoadWithParallelScan/col/file", false);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    final BulkLoadHFiles bulkload = BulkLoadHFiles.create(conf);
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    // Create the scanner first, then do the bulk load from another thread.
    final CountDownLatch latch = new CountDownLatch(1);
    new Thread() {
      @Override
      public void run() {
        try {
          Put put1 = new Put(Bytes.toBytes("row5"));
          put1.add(new KeyValue(Bytes.toBytes("row5"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
            Bytes.toBytes("version0")));
          table.put(put1);
          bulkload.bulkLoad(tableName, hfilePath);
          latch.countDown();
        } catch (TableNotFoundException e) {
        } catch (IOException e) {
        }
      }
    }.start();
    latch.await();
    // By the time we call next() again, the bulk loaded files have also been added to the
    // KeyValue scanner, but the scan that was already in progress should still see 'version1'.
    scanAfterBulkLoad(scanner, result, "version1");
    scanner.close();
    table.close();
  }

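  /**
   * Same as {@link #testBulkLoad()}, but the loaded file is a native HFile that already carries a
   * (large) cell sequence id and MAX_SEQ_ID_KEY instead of BULKLOAD_TIME_KEY.
   */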
  @Test
  public void testBulkLoadNativeHFile() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    long l = EnvironmentEdgeManager.currentTime();
    Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    final Table table = init(admin, l, scan, tableName);
    // Write a native HFile carrying 'version2' for row1/col:q and bulk load it.
    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadNativeHFile/",
      "/temp/testBulkLoadNativeHFile/col/file", true);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    BulkLoadHFiles.create(conf).bulkLoad(tableName, hfilePath);
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    // We had 'version0' and 'version1' for 'row1,col:q' in the table. The bulk load added
    // 'version2', so a fresh scanner should now see 'version2'.
    result = scanAfterBulkLoad(scanner, result, "version2");
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version3")));
    table.put(put0);
    admin.flush(tableName);
    scanner = table.getScanner(scan);
    result = scanner.next();
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()).equals("row1")) {
          System.out
            .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
            _c.getQualifierLength()));
          System.out
            .println(Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals("version3",
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    scanner.close();
    table.close();
  }

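  /** Returns a Scan that reads up to three versions of each column. */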
  private Scan createScan() {
    Scan scan = new Scan();
    scan.readVersions(3);
    return scan;
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
}