/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

@Category({ RegionServerTests.class, MediumTests.class })
public class TestScannerWithBulkload {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestScannerWithBulkload.class);

  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniCluster(1);
  }

  private static void createTable(Admin admin, TableName tableName) throws IOException {
    HTableDescriptor desc = new HTableDescriptor(tableName);
    HColumnDescriptor hcd = new HColumnDescriptor("col");
    hcd.setMaxVersions(3);
    desc.addFamily(hcd);
    admin.createTable(desc);
  }

  @Test
  public void testBulkLoad() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    long l = EnvironmentEdgeManager.currentTime();
    Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    final Table table = init(admin, l, scan, tableName);
    // use bulkload
    final Path hfilePath =
      writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file", false);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
      bulkload.doBulkLoad(hfilePath, admin, table, locator);
    }
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    result = scanAfterBulkLoad(scanner, result, "version2");
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version3")));
    table.put(put0);
    admin.flush(tableName);
    scanner = table.getScanner(scan);
    result = scanner.next();
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()).equals("row1")) {
          System.out
            .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
            _c.getQualifierLength()));
          System.out
            .println(Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals("version3",
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    scanner.close();
    table.close();
  }

  private Result scanAfterBulkLoad(ResultScanner scanner, Result result, String expectedVal)
    throws IOException {
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()).equals("row1")) {
          System.out
            .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
            _c.getQualifierLength()));
          System.out
            .println(Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals(expectedVal,
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    return result;
  }

  // If nativeHFile is true, we will set cell seq id and MAX_SEQ_ID_KEY in the file.
  // Else, we will set BULKLOAD_TIME_KEY.
  private Path writeToHFile(long l, String hFilePath, String pathStr, boolean nativeHFile)
    throws IOException {
    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
    final Path hfilePath = new Path(hFilePath);
    fs.mkdirs(hfilePath);
    Path path = new Path(pathStr);
    HFile.WriterFactory wf = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
    Assert.assertNotNull(wf);
    HFileContext context = new HFileContextBuilder().build();
    HFile.Writer writer = wf.withPath(fs, path).withFileContext(context).create();
    KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version2"));

    // Set cell seq id to test bulk load native hfiles.
    if (nativeHFile) {
      // Set a big seq id. Scan should not look at this seq id in a bulk loaded file.
      // Scan should only look at the seq id appended at the bulk load time, and not skip
      // this kv.
      kv.setSequenceId(9999999);
    }

    writer.append(kv);

    if (nativeHFile) {
      // Set a big MAX_SEQ_ID_KEY. Scan should not look at this seq id in a bulk loaded file.
      // Scan should only look at the seq id appended at the bulk load time, and not skip its
      // kv.
      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(9999999L));
    } else {
      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
    }
    writer.close();
    return hfilePath;
  }

  private Table init(Admin admin, long l, Scan scan, TableName tableName) throws Exception {
    Table table = TEST_UTIL.getConnection().getTable(tableName);
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version0")));
    table.put(put0);
    admin.flush(tableName);
    Put put1 = new Put(Bytes.toBytes("row2"));
    put1.add(new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version0")));
    table.put(put1);
    admin.flush(tableName);
    put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version1")));
    table.put(put0);
    admin.flush(tableName);
    admin.compact(tableName);

    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
    Assert.assertEquals(1, cells.size());
    Cell _c = cells.get(0);
    Assert.assertEquals("version1",
      Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
    scanner.close();
    return table;
  }

  @Test
  public void testBulkLoadWithParallelScan() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    final long l = EnvironmentEdgeManager.currentTime();
    final Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    scan.setCaching(1);
    final Table table = init(admin, l, scan, tableName);
    // use bulkload
    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadWithParallelScan/",
      "/temp/testBulkLoadWithParallelScan/col/file", false);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    // Create a scanner and then do bulk load
    final CountDownLatch latch = new CountDownLatch(1);
    new Thread() {
      @Override
      public void run() {
        try {
          Put put1 = new Put(Bytes.toBytes("row5"));
          put1.add(new KeyValue(Bytes.toBytes("row5"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
            Bytes.toBytes("version0")));
          table.put(put1);
          try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
            bulkload.doBulkLoad(hfilePath, admin, table, locator);
          }
          latch.countDown();
        } catch (TableNotFoundException e) {
          // Ignored; the latch is only counted down on a successful bulk load.
        } catch (IOException e) {
          // Ignored; the latch is only counted down on a successful bulk load.
        }
      }
    }.start();
    latch.await();
    // By the time we do next() the bulk loaded files are also added to the kv
    // scanner
    scanAfterBulkLoad(scanner, result, "version1");
    scanner.close();
    table.close();
  }

  @Test
  public void testBulkLoadNativeHFile() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    long l = EnvironmentEdgeManager.currentTime();
    Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    final Table table = init(admin, l, scan, tableName);
    // use bulkload
    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadNativeHFile/",
      "/temp/testBulkLoadNativeHFile/col/file", true);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
      bulkload.doBulkLoad(hfilePath, admin, table, locator);
    }
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    // We had 'version0', 'version1' for 'row1,col:q' in the table.
    // Bulk load added 'version2'; the scanner should be able to see 'version2'.
    result = scanAfterBulkLoad(scanner, result, "version2");
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version3")));
    table.put(put0);
    admin.flush(tableName);
    scanner = table.getScanner(scan);
    result = scanner.next();
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()).equals("row1")) {
          System.out
            .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
            _c.getQualifierLength()));
          System.out
            .println(Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals("version3",
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    scanner.close();
    table.close();
  }

  private Scan createScan() {
    Scan scan = new Scan();
    scan.setMaxVersions(3);
    return scan;
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
}