/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

@Category({ RegionServerTests.class, MediumTests.class })
public class TestScannerWithBulkload {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestScannerWithBulkload.class);

  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniCluster(1);
  }

  private static void createTable(Admin admin, TableName tableName) throws IOException {
    HTableDescriptor desc = new HTableDescriptor(tableName);
    HColumnDescriptor hcd = new HColumnDescriptor("col");
    hcd.setMaxVersions(3);
    desc.addFamily(hcd);
    admin.createTable(desc);
  }

  @Test
  public void testBulkLoad() throws Exception {
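    // Scenario: init() seeds the table so 'row1,col:q' reads 'version1'; an HFile carrying
    // 'version2' is then bulk loaded and the scanner must see it. Finally a fresh put/flush
    // of 'version3' must also become visible to a new scan.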
    final TableName tableName = TableName.valueOf(name.getMethodName());
    long l = System.currentTimeMillis();
    Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    final Table table = init(admin, l, scan, tableName);
    // Bulk load an HFile carrying 'version2' for 'row1,col:q'.
    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file",
      false);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
      bulkload.doBulkLoad(hfilePath, admin, table, locator);
    }
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    result = scanAfterBulkLoad(scanner, result, "version2");
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version3")));
    table.put(put0);
    admin.flush(tableName);
    scanner = table.getScanner(scan);
    result = scanner.next();
    // Same verification as above: after the put and flush, every 'row1,col:q' cell the
    // scan returns must carry 'version3'.
    scanAfterBulkLoad(scanner, result, "version3");
    scanner.close();
    table.close();
  }

  // Drains the scanner, asserting that every 'row1,col:q' cell carries expectedVal.
  private Result scanAfterBulkLoad(ResultScanner scanner, Result result, String expectedVal)
      throws IOException {
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength())
            .equals("row1")) {
          System.out
              .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
            _c.getQualifierLength()));
          System.out.println(
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals(expectedVal,
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    return result;
  }

  // If nativeHFile is true, we will set cell seq id and MAX_SEQ_ID_KEY in the file.
  // Else, we will set BULKLOAD_TIME_KEY.
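  // (MAX_SEQ_ID_KEY normally appears on flush/compaction output, while BULKLOAD_TIME_KEY marks
  // a file prepared for LoadIncrementalHFiles. With
  // hbase.mapreduce.bulkload.assign.sequenceNumbers set, the loader assigns the file its own
  // sequence id at load time, which is the one the scans in these tests must honor.)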
  private Path writeToHFile(long l, String hFilePath, String pathStr, boolean nativeHFile)
      throws IOException {
    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
    final Path hfilePath = new Path(hFilePath);
    fs.mkdirs(hfilePath);
    Path path = new Path(pathStr);
    HFile.WriterFactory wf = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
    Assert.assertNotNull(wf);
    HFileContext context = new HFileContext();
    HFile.Writer writer = wf.withPath(fs, path).withFileContext(context).create();
    KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version2"));

    // Set cell seq id to test bulk load native hfiles.
    if (nativeHFile) {
      // Set a big seq id. Scan should not look at this seq id in a bulk loaded file.
      // Scan should only look at the seq id appended at the bulk load time, and not skip
      // this kv.
      kv.setSequenceId(9999999);
    }

    writer.append(kv);

    if (nativeHFile) {
      // Set a big MAX_SEQ_ID_KEY. Scan should not look at this seq id in a bulk loaded file.
      // Scan should only look at the seq id appended at the bulk load time, and not skip its
      // kv.
      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(9999999L));
    } else {
      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
    }
    writer.close();
    return hfilePath;
  }

  // Seeds 'row1' and 'row2' across three flushes plus a compaction. All puts share the
  // timestamp l, so only the latest write ('version1') remains visible for 'row1,col:q'.
  private Table init(Admin admin, long l, Scan scan, TableName tableName) throws Exception {
    Table table = TEST_UTIL.getConnection().getTable(tableName);
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version0")));
    table.put(put0);
    admin.flush(tableName);
    Put put1 = new Put(Bytes.toBytes("row2"));
    put1.add(new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version0")));
    table.put(put1);
    admin.flush(tableName);
    put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version1")));
    table.put(put0);
    admin.flush(tableName);
    admin.compact(tableName);

    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
    Assert.assertEquals(1, cells.size());
    Cell _c = cells.get(0);
    Assert.assertEquals("version1",
      Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
    scanner.close();
    return table;
  }

  @Test
  public void testBulkLoadWithParallelScan() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    final long l = System.currentTimeMillis();
    final Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    scan.setCaching(1);
    final Table table = init(admin, l, scan, tableName);
    // Bulk load an HFile carrying 'version2' for 'row1,col:q'.
    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadWithParallelScan/",
      "/temp/testBulkLoadWithParallelScan/col/file", false);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
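    // Open the scanner before the bulk load. With caching=1 the scan is still in progress
    // when the background thread below loads the HFile, so subsequent next() calls must
    // pick up the newly added store file.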
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    // The scanner is open; now do the put and bulk load from another thread.
    final CountDownLatch latch = new CountDownLatch(1);
    new Thread() {
      @Override
      public void run() {
        try {
          Put put1 = new Put(Bytes.toBytes("row5"));
          put1.add(new KeyValue(Bytes.toBytes("row5"), Bytes.toBytes("col"), Bytes.toBytes("q"),
            l, Bytes.toBytes("version0")));
          table.put(put1);
          try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
            bulkload.doBulkLoad(hfilePath, admin, table, locator);
          }
          latch.countDown();
        } catch (TableNotFoundException e) {
          // Ignored: if the bulk load fails, countDown() is never reached and the test
          // hangs at latch.await() below.
        } catch (IOException e) {
          // Ignored for the same reason.
        }
      }
    }.start();
    latch.await();
    // By the time we call next() again, the bulk loaded files have also been added to the
    // kv scanner.
    scanAfterBulkLoad(scanner, result, "version1");
    scanner.close();
    table.close();
  }

  @Test
  public void testBulkLoadNativeHFile() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    long l = System.currentTimeMillis();
    Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    final Table table = init(admin, l, scan, tableName);
    // Bulk load a "native" HFile, i.e. one carrying a cell seq id and MAX_SEQ_ID_KEY.
    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadNativeHFile/",
      "/temp/testBulkLoadNativeHFile/col/file", true);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
      bulkload.doBulkLoad(hfilePath, admin, table, locator);
    }
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    // We had 'version0', 'version1' for 'row1,col:q' in the table.
    // Bulk load added 'version2'; the scanner should be able to see 'version2'.
    result = scanAfterBulkLoad(scanner, result, "version2");
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version3")));
    table.put(put0);
    admin.flush(tableName);
    scanner = table.getScanner(scan);
    result = scanner.next();
    // Same verification as in testBulkLoad: every 'row1,col:q' cell must now be 'version3'.
    scanAfterBulkLoad(scanner, result, "version3");
    scanner.close();
    table.close();
  }

  private Scan createScan() {
    Scan scan = new Scan();
    scan.setMaxVersions(3);
    return scan;
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
}