/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.visibility.Authorizations;
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.security.visibility.ScanLabelGenerator;
import org.apache.hadoop.hbase.security.visibility.SimpleScanLabelGenerator;
import org.apache.hadoop.hbase.security.visibility.VisibilityClient;
import org.apache.hadoop.hbase.security.visibility.VisibilityConstants;
import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
import org.apache.hadoop.hbase.security.visibility.VisibilityUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsResponse;

/**
 * End-to-end tests for {@link ImportTsv} with cell visibility labels enabled. Runs
 * ImportTsv MR jobs against a mini cluster, both writing directly to a table and
 * producing bulk-load HFiles, and verifies the loaded data is only visible under the
 * matching {@link Authorizations}.
 */
@Tag(MapReduceTests.TAG)
@Tag(LargeTests.TAG)
public class TestImportTSVWithVisibilityLabels implements Configurable {

  private static final Logger LOG =
    LoggerFactory.getLogger(TestImportTSVWithVisibilityLabels.class);
  protected static final String NAME = TestImportTsv.class.getSimpleName();
  protected static HBaseTestingUtil util = new HBaseTestingUtil();

  /**
   * Delete the tmp directory after running doMROnTableTest. Boolean. Default is false.
   */
  protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";

  /**
   * Force use of combiner in doMROnTableTest. Boolean. Default is true.
   */
  protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";

  private final String FAMILY = "FAM";
  private final static String TOPSECRET = "topsecret";
  private final static String PUBLIC = "public";
  private final static String PRIVATE = "private";
  private final static String CONFIDENTIAL = "confidential";
  private final static String SECRET = "secret";
  private static User SUPERUSER;
  private static Configuration conf;

  @Override
  public Configuration getConf() {
    return util.getConfiguration();
  }

  @Override
  public void setConf(Configuration conf) {
    throw new IllegalArgumentException("setConf not supported");
  }

  /**
   * Starts the mini cluster with visibility labels enabled, waits for the labels table
   * and registers the label vocabulary used by the tests.
   */
  @BeforeAll
  public static void provisionCluster() throws Exception {
    conf = util.getConfiguration();
    SUPERUSER = User.createUserForTesting(conf, "admin", new String[] { "supergroup" });
    conf.set("hbase.superuser", "admin," + User.getCurrent().getName());
    VisibilityTestUtil.enableVisiblityLabels(conf);
    conf.setClass(VisibilityUtils.VISIBILITY_LABEL_GENERATOR_CLASS, SimpleScanLabelGenerator.class,
      ScanLabelGenerator.class);
    util.startMiniCluster();
    // Wait for the labels table to become available
    util.waitTableEnabled(VisibilityConstants.LABELS_TABLE_NAME.getName(), 50000);
    createLabels();
  }

  /** Registers all test labels as the superuser (label admin privileges are required). */
  private static void createLabels() throws IOException, InterruptedException {
    PrivilegedExceptionAction<VisibilityLabelsResponse> action = () -> {
      String[] labels = { SECRET, TOPSECRET, CONFIDENTIAL, PUBLIC, PRIVATE };
      try (Connection conn = ConnectionFactory.createConnection(conf)) {
        VisibilityClient.addLabels(conn, labels);
        LOG.info("Added labels ");
      } catch (Throwable t) {
        LOG.error("Error in adding labels", t);
        throw new IOException(t);
      }
      return null;
    };
    SUPERUSER.runAs(action);
  }

  @AfterAll
  public static void releaseCluster() throws Exception {
    util.shutdownMiniCluster();
  }

  @Test
  public void testMROnTable(TestInfo testInfo) throws Exception {
    final TableName tableName =
      TableName.valueOf(testInfo.getTestMethod().get().getName() + util.getRandomUUID());

    // Prepare the arguments required for the test.
    String[] args = new String[] {
      "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
    String data = "KEY\u001bVALUE1\u001bVALUE2\u001bsecret&private\n";
    util.createTable(tableName, FAMILY);
    doMROnTableTest(util, FAMILY, data, args, 1);
    util.deleteTable(tableName);
  }

  @Test
  public void testMROnTableWithDeletes(TestInfo testInfo) throws Exception {
    final TableName tableName =
      TableName.valueOf(testInfo.getTestMethod().get().getName() + util.getRandomUUID());

    // Prepare the arguments required for the test.
    String[] args = new String[] {
      "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
    String data = "KEY\u001bVALUE1\u001bVALUE2\u001bsecret&private\n";
    util.createTable(tableName, FAMILY);
    doMROnTableTest(util, FAMILY, data, args, 1);
    issueDeleteAndVerifyData(tableName);
    util.deleteTable(tableName);
  }

  /**
   * Issues a visibility-scoped family delete for row "KEY" and verifies an authorized
   * scan subsequently sees no rows. Retries with a pause to absorb delayed updates,
   * mirroring the retry loop in validateTable.
   */
  private void issueDeleteAndVerifyData(TableName tableName) throws IOException {
    LOG.debug("Validating table after delete.");
    boolean verified = false;
    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
    int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
    // try-with-resources so the table is released even when an assertion fails mid-loop
    try (Table table = util.getConnection().getTable(tableName)) {
      for (int i = 0; i < numRetries; i++) {
        try {
          Delete d = new Delete(Bytes.toBytes("KEY"));
          d.addFamily(Bytes.toBytes(FAMILY));
          d.setCellVisibility(new CellVisibility("private&secret"));
          table.delete(d);

          Scan scan = new Scan();
          // Scan entire family.
          scan.addFamily(Bytes.toBytes(FAMILY));
          scan.setAuthorizations(new Authorizations("secret", "private"));
          try (ResultScanner resScanner = table.getScanner(scan)) {
            Result[] next = resScanner.next(5);
            assertEquals(0, next.length);
          }
          verified = true;
          break;
        } catch (NullPointerException e) {
          // If here, a cell was empty. Presume its because updates came in
          // after the scanner had been opened. Wait a while and retry.
        }
        try {
          Thread.sleep(pause);
        } catch (InterruptedException e) {
          // Restore the interrupt flag so callers can observe it; keep retrying.
          Thread.currentThread().interrupt();
        }
      }
    }
    assertTrue(verified);
  }

  @Test
  public void testMROnTableWithBulkload(TestInfo testInfo) throws Exception {
    final TableName tableName =
      TableName.valueOf(testInfo.getTestMethod().get().getName() + util.getRandomUUID());
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles");
    // Prepare the arguments required for the test.
    String[] args = new String[] { "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
    String data = "KEY\u001bVALUE1\u001bVALUE2\u001bsecret&private\n";
    util.createTable(tableName, FAMILY);
    doMROnTableTest(util, FAMILY, data, args, 1);
    util.deleteTable(tableName);
  }

  @Test
  public void testBulkOutputWithTsvImporterTextMapper(TestInfo testInfo) throws Exception {
    final TableName table =
      TableName.valueOf(testInfo.getTestMethod().get().getName() + util.getRandomUUID());
    String FAMILY = "FAM";
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "hfiles");
    // Prepare the arguments required for the test.
    String[] args = new String[] {
      "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
      "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(),
      table.getNameAsString() };
    String data = "KEY\u001bVALUE4\u001bVALUE8\u001bsecret&private\n";
    doMROnTableTest(util, FAMILY, data, args, 4);
    util.deleteTable(table);
  }

  @Test
  public void testMRWithOutputFormat(TestInfo testInfo) throws Exception {
    final TableName tableName =
      TableName.valueOf(testInfo.getTestMethod().get().getName() + util.getRandomUUID());
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles");
    // Prepare the arguments required for the test.
    String[] args = new String[] {
      "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
      "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
    String data = "KEY\u001bVALUE4\u001bVALUE8\u001bsecret&private\n";
    util.createTable(tableName, FAMILY);
    doMROnTableTest(util, FAMILY, data, args, 1);
    util.deleteTable(tableName);
  }

  @Test
  public void testBulkOutputWithInvalidLabels(TestInfo testInfo) throws Exception {
    final TableName tableName =
      TableName.valueOf(testInfo.getTestMethod().get().getName() + util.getRandomUUID());
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles");
    // Prepare the arguments required for the test.
    String[] args = new String[] { "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };

    // 2 Data rows, one with valid label and one with invalid label
    String data =
      "KEY\u001bVALUE1\u001bVALUE2\u001bprivate\nKEY1\u001bVALUE1\u001bVALUE2\u001binvalid\n";
    util.createTable(tableName, FAMILY);
    // Only the row carrying a known label ends up in the HFiles: expectedKVCount == 2.
    doMROnTableTest(util, FAMILY, data, args, 1, 2);
    util.deleteTable(tableName);
  }

  @Test
  public void testBulkOutputWithTsvImporterTextMapperWithInvalidLabels(TestInfo testInfo)
    throws Exception {
    final TableName tableName =
      TableName.valueOf(testInfo.getTestMethod().get().getName() + util.getRandomUUID());
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles");
    // Prepare the arguments required for the test.
    String[] args = new String[] {
      "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
      "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };

    // 2 Data rows, one with valid label and one with invalid label
    String data =
      "KEY\u001bVALUE1\u001bVALUE2\u001bprivate\nKEY1\u001bVALUE1\u001bVALUE2\u001binvalid\n";
    util.createTable(tableName, FAMILY);
    doMROnTableTest(util, FAMILY, data, args, 1, 2);
    util.deleteTable(tableName);
  }

  protected static Tool doMROnTableTest(HBaseTestingUtil util, String family, String data,
    String[] args, int valueMultiplier) throws Exception {
    return doMROnTableTest(util, family, data, args, valueMultiplier, -1);
  }

  /**
   * Run an ImportTsv job and perform basic validation on the results. Returns the ImportTsv
   * <code>Tool</code> instance so that other tests can inspect it for further validation as
   * necessary. This method is static to insure non-reliance on instance's util/conf facilities. Any
   * arguments to pass BEFORE inputFile path is appended.
   * @param expectedKVCount Expected KV count. pass -1 to skip the kvcount check
   * @return The Tool instance used to run the test.
   */
  protected static Tool doMROnTableTest(HBaseTestingUtil util, String family, String data,
    String[] args, int valueMultiplier, int expectedKVCount) throws Exception {
    TableName table = TableName.valueOf(args[args.length - 1]);
    Configuration conf = new Configuration(util.getConfiguration());

    // populate input file
    FileSystem fs = FileSystem.get(conf);
    Path inputPath =
      fs.makeQualified(new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
    if (data == null) {
      data = "KEY\u001bVALUE1\u001bVALUE2\n";
    }
    // try-with-resources so the stream is closed even if the write fails
    try (FSDataOutputStream op = fs.create(inputPath, true)) {
      op.write(Bytes.toBytes(data));
    }
    LOG.debug(String.format("Wrote test data to file: %s", inputPath));

    if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
      LOG.debug("Forcing combiner.");
      conf.setInt("mapreduce.map.combine.minspills", 1);
    }

    // run the import
    List<String> argv = new ArrayList<>(Arrays.asList(args));
    argv.add(inputPath.toString());
    Tool tool = new ImportTsv();
    LOG.debug("Running ImportTsv with arguments: " + argv);
    assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(new String[0])));

    // Perform basic validation. If the input args did not include
    // ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table.
    // Otherwise, validate presence of hfiles.
    boolean createdHFiles = false;
    String outputPath = null;
    for (String arg : argv) {
      if (arg.contains(ImportTsv.BULK_OUTPUT_CONF_KEY)) {
        createdHFiles = true;
        // split '-Dfoo=bar' on '=' and keep 'bar'
        outputPath = arg.split("=")[1];
        break;
      }
    }
    LOG.debug("validating the table " + createdHFiles);
    if (createdHFiles) {
      validateHFiles(fs, outputPath, family, expectedKVCount);
    } else {
      validateTable(conf, table, family, valueMultiplier);
    }

    if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
      LOG.debug("Deleting test subdirectory");
      util.cleanupDataTestDirOnTestFS(table.getNameAsString());
    }
    return tool;
  }

  /**
   * Confirm ImportTsv via HFiles on fs.
   */
  private static void validateHFiles(FileSystem fs, String outputPath, String family,
    int expectedKVCount) throws IOException {

    // validate number and content of output columns
    LOG.debug("Validating HFiles.");
    Set<String> configFamilies = new HashSet<>();
    configFamilies.add(family);
    Set<String> foundFamilies = new HashSet<>();
    int actualKVCount = 0;
    for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) {
      LOG.debug("The output path has files");
      // The last path element is the column family directory name.
      String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR);
      String cf = elements[elements.length - 1];
      foundFamilies.add(cf);
      assertTrue(configFamilies.contains(cf),
        String.format(
          "HFile ouput contains a column family (%s) not present in input families (%s)", cf,
          configFamilies));
      for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) {
        assertTrue(hfile.getLen() > 0,
          String.format("HFile %s appears to contain no data.", hfile.getPath()));
        if (expectedKVCount > -1) {
          actualKVCount += getKVCountFromHfile(fs, hfile.getPath());
        }
      }
    }
    if (expectedKVCount > -1) {
      assertEquals(expectedKVCount, actualKVCount,
        String.format("KV count in output hfile=<%d> doesn't match with expected KV count=<%d>",
          actualKVCount, expectedKVCount));
    }
  }

  /**
   * Confirm ImportTsv via data in online table.
   */
  private static void validateTable(Configuration conf, TableName tableName, String family,
    int valueMultiplier) throws IOException {

    LOG.debug("Validating table.");
    boolean verified = false;
    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
    int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
    // try-with-resources so the table is released even when an assertion fails mid-loop
    try (Table table = util.getConnection().getTable(tableName)) {
      for (int i = 0; i < numRetries; i++) {
        try {
          Scan scan = new Scan();
          // Scan entire family.
          scan.addFamily(Bytes.toBytes(family));
          scan.setAuthorizations(new Authorizations("secret", "private"));
          try (ResultScanner resScanner = table.getScanner(scan)) {
            Result[] next = resScanner.next(5);
            assertEquals(1, next.length);
            // Iterate over the fetched batch. (Previously this iterated the scanner,
            // which next(5) had already drained, so these checks never executed.)
            for (Result res : next) {
              LOG.debug("Getting results " + res.size());
              assertEquals(2, res.size());
              List<Cell> kvs = res.listCells();
              assertTrue(CellUtil.matchingRows(kvs.get(0), Bytes.toBytes("KEY")));
              assertTrue(CellUtil.matchingRows(kvs.get(1), Bytes.toBytes("KEY")));
              assertTrue(
                CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier)));
              assertTrue(
                CellUtil.matchingValue(kvs.get(1), Bytes.toBytes("VALUE" + 2 * valueMultiplier)));
              // Only one result set is expected, so let it loop.
            }
          }
          verified = true;
          break;
        } catch (NullPointerException e) {
          // If here, a cell was empty. Presume its because updates came in
          // after the scanner had been opened. Wait a while and retry.
        }
        try {
          Thread.sleep(pause);
        } catch (InterruptedException e) {
          // Restore the interrupt flag so callers can observe it; keep retrying.
          Thread.currentThread().interrupt();
        }
      }
    }
    assertTrue(verified);
  }

  /**
   * Method returns the total KVs in given hfile
   * @param fs File System
   * @param p HFile path
   * @return KV count in the given hfile
   */
  private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException {
    Configuration conf = util.getConfiguration();
    // try-with-resources so the reader is closed even if scanning throws
    try (HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf)) {
      HFileScanner scanner = reader.getScanner(conf, false, false);
      if (!scanner.seekTo()) {
        // Empty hfile. (The previous do/while loop ignored seekTo()'s result and would
        // have miscounted an empty file as containing one KV.)
        return 0;
      }
      int count = 0;
      do {
        count++;
      } while (scanner.next());
      return count;
    }
  }

}