/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

/**
 * Verifies that scanners opened before and after a bulk load observe the expected versions of
 * bulk-loaded and flushed cells.
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestScannerWithBulkload {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestScannerWithBulkload.class);

  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniCluster(1);
  }

  private static void createTable(Admin admin, TableName tableName) throws IOException {
    HTableDescriptor desc = new HTableDescriptor(tableName);
    HColumnDescriptor hcd = new HColumnDescriptor("col");
    hcd.setMaxVersions(3);
    desc.addFamily(hcd);
    admin.createTable(desc);
  }

  @Test
  public void testBulkLoad() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    long l = System.currentTimeMillis();
    Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    final Table table = init(admin, l, scan, tableName);
    // use bulkload
    final Path hfilePath =
        writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file", false);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
      bulkload.doBulkLoad(hfilePath, admin, table, locator);
    }
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    result = scanAfterBulkLoad(scanner, result, "version2");
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
        Bytes.toBytes("version3")));
    table.put(put0);
    admin.flush(tableName);
    scanner = table.getScanner(scan);
    result = scanner.next();
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength())
            .equals("row1")) {
          System.out
              .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
              _c.getQualifierLength()));
          System.out.println(
              Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals("version3",
              Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    scanner.close();
    table.close();
  }

  private Result scanAfterBulkLoad(ResultScanner scanner, Result result, String expectedVal)
      throws IOException {
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength())
            .equals("row1")) {
          System.out
              .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
              _c.getQualifierLength()));
          System.out.println(
              Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals(expectedVal,
              Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    return result;
  }

  // If nativeHFile is true, we will set the cell seq id and MAX_SEQ_ID_KEY in the file.
  // Otherwise, we will set BULKLOAD_TIME_KEY.
  private Path writeToHFile(long l, String hFilePath, String pathStr, boolean nativeHFile)
      throws IOException {
    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
    final Path hfilePath = new Path(hFilePath);
    fs.mkdirs(hfilePath);
    Path path = new Path(pathStr);
    HFile.WriterFactory wf = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
    Assert.assertNotNull(wf);
    HFileContext context = new HFileContextBuilder().build();
    HFile.Writer writer = wf.withPath(fs, path).withFileContext(context).create();
    KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
        Bytes.toBytes("version2"));

    // Set cell seq id to test bulk load native hfiles.
    if (nativeHFile) {
      // Set a big seq id. Scan should not look at this seq id in a bulk loaded file.
      // Scan should only look at the seq id appended at the bulk load time, and not skip
      // this kv.
      kv.setSequenceId(9999999);
    }

    writer.append(kv);

    if (nativeHFile) {
      // Set a big MAX_SEQ_ID_KEY. Scan should not look at this seq id in a bulk loaded file.
      // Scan should only look at the seq id appended at the bulk load time, and not skip its
      // kv.
      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(9999999L));
    } else {
      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
    }
    writer.close();
    return hfilePath;
  }

  private Table init(Admin admin, long l, Scan scan, TableName tableName) throws Exception {
    Table table = TEST_UTIL.getConnection().getTable(tableName);
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
        Bytes.toBytes("version0")));
    table.put(put0);
    admin.flush(tableName);
    Put put1 = new Put(Bytes.toBytes("row2"));
    put1.add(new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
        Bytes.toBytes("version0")));
    table.put(put1);
    admin.flush(tableName);
    put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
        Bytes.toBytes("version1")));
    table.put(put0);
    admin.flush(tableName);
    admin.compact(tableName);

    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
    Assert.assertEquals(1, cells.size());
    Cell _c = cells.get(0);
    Assert.assertEquals("version1",
        Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
    scanner.close();
    return table;
  }

  @Test
  public void testBulkLoadWithParallelScan() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    final long l = System.currentTimeMillis();
    final Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    scan.setCaching(1);
    final Table table = init(admin, l, scan, tableName);
    // use bulkload
    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadWithParallelScan/",
        "/temp/testBulkLoadWithParallelScan/col/file", false);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    // The scanner is already open; do the put and bulk load from a separate thread.
    final CountDownLatch latch = new CountDownLatch(1);
    new Thread() {
      @Override
      public void run() {
        try {
          Put put1 = new Put(Bytes.toBytes("row5"));
          put1.add(new KeyValue(Bytes.toBytes("row5"), Bytes.toBytes("col"), Bytes.toBytes("q"),
              l, Bytes.toBytes("version0")));
          table.put(put1);
          try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
            bulkload.doBulkLoad(hfilePath, admin, table, locator);
          }
          latch.countDown();
        } catch (TableNotFoundException e) {
          // Swallowed; a failure here leaves the latch un-counted and the test times out.
        } catch (IOException e) {
          // Swallowed; a failure here leaves the latch un-counted and the test times out.
        }
      }
    }.start();
    latch.await();
    // By the time we call next(), the bulk-loaded files have also been added to the kv scanner.
    scanAfterBulkLoad(scanner, result, "version1");
    scanner.close();
    table.close();
  }

  @Test
  public void testBulkLoadNativeHFile() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    long l = System.currentTimeMillis();
    Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    final Table table = init(admin, l, scan, tableName);
    // use bulkload
    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadNativeHFile/",
        "/temp/testBulkLoadNativeHFile/col/file", true);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
      bulkload.doBulkLoad(hfilePath, admin, table, locator);
    }
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    // We had 'version0', 'version1' for 'row1,col:q' in the table.
    // Bulk load added 'version2'; the scanner should be able to see 'version2'.
    result = scanAfterBulkLoad(scanner, result, "version2");
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
        Bytes.toBytes("version3")));
    table.put(put0);
    admin.flush(tableName);
    scanner = table.getScanner(scan);
    result = scanner.next();
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength())
            .equals("row1")) {
          System.out
              .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
              _c.getQualifierLength()));
          System.out.println(
              Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals("version3",
              Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    scanner.close();
    table.close();
  }

  private Scan createScan() {
    Scan scan = new Scan();
    scan.setMaxVersions(3);
    return scan;
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
}