001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.security.PrivilegedExceptionAction; 025import java.util.ArrayList; 026import java.util.Arrays; 027import java.util.HashSet; 028import java.util.List; 029import java.util.Set; 030import java.util.UUID; 031import org.apache.hadoop.conf.Configurable; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FSDataOutputStream; 034import org.apache.hadoop.fs.FileStatus; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.fs.Path; 037import org.apache.hadoop.hbase.Cell; 038import org.apache.hadoop.hbase.CellUtil; 039import org.apache.hadoop.hbase.HBaseClassTestRule; 040import org.apache.hadoop.hbase.HBaseTestingUtility; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.client.Connection; 044import org.apache.hadoop.hbase.client.ConnectionFactory; 045import org.apache.hadoop.hbase.client.Delete; 046import org.apache.hadoop.hbase.client.Result; 047import org.apache.hadoop.hbase.client.ResultScanner; 048import org.apache.hadoop.hbase.client.Scan; 049import org.apache.hadoop.hbase.client.Table; 050import org.apache.hadoop.hbase.io.hfile.CacheConfig; 051import org.apache.hadoop.hbase.io.hfile.HFile; 052import org.apache.hadoop.hbase.io.hfile.HFileScanner; 053import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsResponse; 054import org.apache.hadoop.hbase.security.User; 055import org.apache.hadoop.hbase.security.visibility.Authorizations; 056import org.apache.hadoop.hbase.security.visibility.CellVisibility; 057import org.apache.hadoop.hbase.security.visibility.ScanLabelGenerator; 058import org.apache.hadoop.hbase.security.visibility.SimpleScanLabelGenerator; 059import org.apache.hadoop.hbase.security.visibility.VisibilityClient; 060import org.apache.hadoop.hbase.security.visibility.VisibilityConstants; 061import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil; 062import org.apache.hadoop.hbase.security.visibility.VisibilityUtils; 063import org.apache.hadoop.hbase.testclassification.LargeTests; 064import org.apache.hadoop.hbase.testclassification.MapReduceTests; 065import org.apache.hadoop.hbase.util.Bytes; 066import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter; 067import org.apache.hadoop.util.Tool; 068import org.apache.hadoop.util.ToolRunner; 069import org.junit.AfterClass; 070import org.junit.BeforeClass; 071import org.junit.ClassRule; 072import org.junit.Rule; 073import org.junit.Test; 074import org.junit.experimental.categories.Category; 075import org.junit.rules.TestName; 076import org.slf4j.Logger; 077import org.slf4j.LoggerFactory; 078 079@Category({MapReduceTests.class, LargeTests.class}) 080public class TestImportTSVWithVisibilityLabels implements Configurable { 081 082 @ClassRule 083 public static final HBaseClassTestRule CLASS_RULE = 084 HBaseClassTestRule.forClass(TestImportTSVWithVisibilityLabels.class); 085 086 private static final Logger LOG = 087 LoggerFactory.getLogger(TestImportTSVWithVisibilityLabels.class); 088 protected static final String NAME = TestImportTsv.class.getSimpleName(); 089 protected static HBaseTestingUtility util = new HBaseTestingUtility(); 090 091 /** 092 * Delete the tmp directory after running doMROnTableTest. Boolean. Default is 093 * false. 094 */ 095 protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad"; 096 097 /** 098 * Force use of combiner in doMROnTableTest. Boolean. Default is true. 099 */ 100 protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner"; 101 102 private final String FAMILY = "FAM"; 103 private final static String TOPSECRET = "topsecret"; 104 private final static String PUBLIC = "public"; 105 private final static String PRIVATE = "private"; 106 private final static String CONFIDENTIAL = "confidential"; 107 private final static String SECRET = "secret"; 108 private static User SUPERUSER; 109 private static Configuration conf; 110 111 @Rule 112 public TestName name = new TestName(); 113 114 @Override 115 public Configuration getConf() { 116 return util.getConfiguration(); 117 } 118 119 @Override 120 public void setConf(Configuration conf) { 121 throw new IllegalArgumentException("setConf not supported"); 122 } 123 124 @BeforeClass 125 public static void provisionCluster() throws Exception { 126 conf = util.getConfiguration(); 127 SUPERUSER = User.createUserForTesting(conf, "admin", new String[] { "supergroup" }); 128 conf.set("hbase.superuser", "admin,"+User.getCurrent().getName()); 129 VisibilityTestUtil.enableVisiblityLabels(conf); 130 conf.setClass(VisibilityUtils.VISIBILITY_LABEL_GENERATOR_CLASS, SimpleScanLabelGenerator.class, 131 ScanLabelGenerator.class); 132 util.startMiniCluster(); 133 // Wait for the labels table to become available 134 util.waitTableEnabled(VisibilityConstants.LABELS_TABLE_NAME.getName(), 50000); 135 createLabels(); 136 } 137 138 private static void createLabels() throws IOException, InterruptedException { 139 PrivilegedExceptionAction<VisibilityLabelsResponse> action = 140 new PrivilegedExceptionAction<VisibilityLabelsResponse>() { 141 @Override 142 public VisibilityLabelsResponse run() throws Exception { 143 String[] labels = { SECRET, TOPSECRET, CONFIDENTIAL, PUBLIC, PRIVATE }; 144 try (Connection conn = ConnectionFactory.createConnection(conf)) { 145 VisibilityClient.addLabels(conn, labels); 146 LOG.info("Added labels "); 147 } catch (Throwable t) { 148 LOG.error("Error in adding labels" , t); 149 throw new IOException(t); 150 } 151 return null; 152 } 153 }; 154 SUPERUSER.runAs(action); 155 } 156 157 @AfterClass 158 public static void releaseCluster() throws Exception { 159 util.shutdownMiniCluster(); 160 } 161 162 @Test 163 public void testMROnTable() throws Exception { 164 final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID()); 165 166 // Prepare the arguments required for the test. 167 String[] args = new String[] { 168 "-D" + ImportTsv.MAPPER_CONF_KEY 169 + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper", 170 "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", 171 "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; 172 String data = "KEY\u001bVALUE1\u001bVALUE2\u001bsecret&private\n"; 173 util.createTable(tableName, FAMILY); 174 doMROnTableTest(util, FAMILY, data, args, 1); 175 util.deleteTable(tableName); 176 } 177 178 @Test 179 public void testMROnTableWithDeletes() throws Exception { 180 final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID()); 181 182 // Prepare the arguments required for the test. 183 String[] args = new String[] { 184 "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper", 185 "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", 186 "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; 187 String data = "KEY\u001bVALUE1\u001bVALUE2\u001bsecret&private\n"; 188 util.createTable(tableName, FAMILY); 189 doMROnTableTest(util, FAMILY, data, args, 1); 190 issueDeleteAndVerifyData(tableName); 191 util.deleteTable(tableName); 192 } 193 194 private void issueDeleteAndVerifyData(TableName tableName) throws IOException { 195 LOG.debug("Validating table after delete."); 196 Table table = util.getConnection().getTable(tableName); 197 boolean verified = false; 198 long pause = conf.getLong("hbase.client.pause", 5 * 1000); 199 int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); 200 for (int i = 0; i < numRetries; i++) { 201 try { 202 Delete d = new Delete(Bytes.toBytes("KEY")); 203 d.addFamily(Bytes.toBytes(FAMILY)); 204 d.setCellVisibility(new CellVisibility("private&secret")); 205 table.delete(d); 206 207 Scan scan = new Scan(); 208 // Scan entire family. 209 scan.addFamily(Bytes.toBytes(FAMILY)); 210 scan.setAuthorizations(new Authorizations("secret", "private")); 211 ResultScanner resScanner = table.getScanner(scan); 212 Result[] next = resScanner.next(5); 213 assertEquals(0, next.length); 214 verified = true; 215 break; 216 } catch (NullPointerException e) { 217 // If here, a cell was empty. Presume its because updates came in 218 // after the scanner had been opened. Wait a while and retry. 219 } 220 try { 221 Thread.sleep(pause); 222 } catch (InterruptedException e) { 223 // continue 224 } 225 } 226 table.close(); 227 assertTrue(verified); 228 } 229 230 @Test 231 public void testMROnTableWithBulkload() throws Exception { 232 final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID()); 233 Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles"); 234 // Prepare the arguments required for the test. 235 String[] args = new String[] { 236 "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(), 237 "-D" + ImportTsv.COLUMNS_CONF_KEY 238 + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", 239 "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; 240 String data = "KEY\u001bVALUE1\u001bVALUE2\u001bsecret&private\n"; 241 util.createTable(tableName, FAMILY); 242 doMROnTableTest(util, FAMILY, data, args, 1); 243 util.deleteTable(tableName); 244 } 245 246 @Test 247 public void testBulkOutputWithTsvImporterTextMapper() throws Exception { 248 final TableName table = TableName.valueOf(name.getMethodName() + UUID.randomUUID()); 249 String FAMILY = "FAM"; 250 Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(table.getNameAsString()),"hfiles"); 251 // Prepare the arguments required for the test. 252 String[] args = 253 new String[] { 254 "-D" + ImportTsv.MAPPER_CONF_KEY 255 + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper", 256 "-D" + ImportTsv.COLUMNS_CONF_KEY 257 + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", 258 "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", 259 "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), 260 table.getNameAsString() 261 }; 262 String data = "KEY\u001bVALUE4\u001bVALUE8\u001bsecret&private\n"; 263 doMROnTableTest(util, FAMILY, data, args, 4); 264 util.deleteTable(table); 265 } 266 267 @Test 268 public void testMRWithOutputFormat() throws Exception { 269 final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID()); 270 Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles"); 271 // Prepare the arguments required for the test. 272 String[] args = new String[] { 273 "-D" + ImportTsv.MAPPER_CONF_KEY 274 + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper", 275 "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(), 276 "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", 277 "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; 278 String data = "KEY\u001bVALUE4\u001bVALUE8\u001bsecret&private\n"; 279 util.createTable(tableName, FAMILY); 280 doMROnTableTest(util, FAMILY, data, args, 1); 281 util.deleteTable(tableName); 282 } 283 284 @Test 285 public void testBulkOutputWithInvalidLabels() throws Exception { 286 final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID()); 287 Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles"); 288 // Prepare the arguments required for the test. 289 String[] args = 290 new String[] { "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(), 291 "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", 292 "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; 293 294 // 2 Data rows, one with valid label and one with invalid label 295 String data = 296 "KEY\u001bVALUE1\u001bVALUE2\u001bprivate\nKEY1\u001bVALUE1\u001bVALUE2\u001binvalid\n"; 297 util.createTable(tableName, FAMILY); 298 doMROnTableTest(util, FAMILY, data, args, 1, 2); 299 util.deleteTable(tableName); 300 } 301 302 @Test 303 public void testBulkOutputWithTsvImporterTextMapperWithInvalidLabels() throws Exception { 304 final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID()); 305 Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles"); 306 // Prepare the arguments required for the test. 307 String[] args = 308 new String[] { 309 "-D" + ImportTsv.MAPPER_CONF_KEY 310 + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper", 311 "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(), 312 "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", 313 "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; 314 315 // 2 Data rows, one with valid label and one with invalid label 316 String data = 317 "KEY\u001bVALUE1\u001bVALUE2\u001bprivate\nKEY1\u001bVALUE1\u001bVALUE2\u001binvalid\n"; 318 util.createTable(tableName, FAMILY); 319 doMROnTableTest(util, FAMILY, data, args, 1, 2); 320 util.deleteTable(tableName); 321 } 322 323 protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data, 324 String[] args, int valueMultiplier) throws Exception { 325 return doMROnTableTest(util, family, data, args, valueMultiplier, -1); 326 } 327 328 /** 329 * Run an ImportTsv job and perform basic validation on the results. Returns 330 * the ImportTsv <code>Tool</code> instance so that other tests can inspect it 331 * for further validation as necessary. This method is static to insure 332 * non-reliance on instance's util/conf facilities. 333 * 334 * @param args 335 * Any arguments to pass BEFORE inputFile path is appended. 336 * 337 * @param expectedKVCount Expected KV count. pass -1 to skip the kvcount check 338 * 339 * @return The Tool instance used to run the test. 340 */ 341 protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data, 342 String[] args, int valueMultiplier,int expectedKVCount) throws Exception { 343 TableName table = TableName.valueOf(args[args.length - 1]); 344 Configuration conf = new Configuration(util.getConfiguration()); 345 346 // populate input file 347 FileSystem fs = FileSystem.get(conf); 348 Path inputPath = fs.makeQualified(new Path(util 349 .getDataTestDirOnTestFS(table.getNameAsString()), "input.dat")); 350 FSDataOutputStream op = fs.create(inputPath, true); 351 if (data == null) { 352 data = "KEY\u001bVALUE1\u001bVALUE2\n"; 353 } 354 op.write(Bytes.toBytes(data)); 355 op.close(); 356 LOG.debug(String.format("Wrote test data to file: %s", inputPath)); 357 358 if (conf.getBoolean(FORCE_COMBINER_CONF, true)) { 359 LOG.debug("Forcing combiner."); 360 conf.setInt("mapreduce.map.combine.minspills", 1); 361 } 362 363 // run the import 364 List<String> argv = new ArrayList<>(Arrays.asList(args)); 365 argv.add(inputPath.toString()); 366 Tool tool = new ImportTsv(); 367 LOG.debug("Running ImportTsv with arguments: " + argv); 368 assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args))); 369 370 // Perform basic validation. If the input args did not include 371 // ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table. 372 // Otherwise, validate presence of hfiles. 373 boolean createdHFiles = false; 374 String outputPath = null; 375 for (String arg : argv) { 376 if (arg.contains(ImportTsv.BULK_OUTPUT_CONF_KEY)) { 377 createdHFiles = true; 378 // split '-Dfoo=bar' on '=' and keep 'bar' 379 outputPath = arg.split("=")[1]; 380 break; 381 } 382 } 383 LOG.debug("validating the table " + createdHFiles); 384 if (createdHFiles) 385 validateHFiles(fs, outputPath, family,expectedKVCount); 386 else 387 validateTable(conf, table, family, valueMultiplier); 388 389 if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) { 390 LOG.debug("Deleting test subdirectory"); 391 util.cleanupDataTestDirOnTestFS(table.getNameAsString()); 392 } 393 return tool; 394 } 395 396 /** 397 * Confirm ImportTsv via HFiles on fs. 398 */ 399 private static void validateHFiles(FileSystem fs, String outputPath, String family, 400 int expectedKVCount) throws IOException { 401 402 // validate number and content of output columns 403 LOG.debug("Validating HFiles."); 404 Set<String> configFamilies = new HashSet<>(); 405 configFamilies.add(family); 406 Set<String> foundFamilies = new HashSet<>(); 407 int actualKVCount = 0; 408 for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) { 409 LOG.debug("The output path has files"); 410 String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR); 411 String cf = elements[elements.length - 1]; 412 foundFamilies.add(cf); 413 assertTrue(String.format( 414 "HFile ouput contains a column family (%s) not present in input families (%s)", cf, 415 configFamilies), configFamilies.contains(cf)); 416 for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) { 417 assertTrue(String.format("HFile %s appears to contain no data.", hfile.getPath()), 418 hfile.getLen() > 0); 419 if (expectedKVCount > -1) { 420 actualKVCount += getKVCountFromHfile(fs, hfile.getPath()); 421 } 422 } 423 } 424 if (expectedKVCount > -1) { 425 assertTrue(String.format( 426 "KV count in output hfile=<%d> doesn't match with expected KV count=<%d>", actualKVCount, 427 expectedKVCount), actualKVCount == expectedKVCount); 428 } 429 } 430 431 /** 432 * Confirm ImportTsv via data in online table. 433 */ 434 private static void validateTable(Configuration conf, TableName tableName, String family, 435 int valueMultiplier) throws IOException { 436 437 LOG.debug("Validating table."); 438 Table table = util.getConnection().getTable(tableName); 439 boolean verified = false; 440 long pause = conf.getLong("hbase.client.pause", 5 * 1000); 441 int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); 442 for (int i = 0; i < numRetries; i++) { 443 try { 444 Scan scan = new Scan(); 445 // Scan entire family. 446 scan.addFamily(Bytes.toBytes(family)); 447 scan.setAuthorizations(new Authorizations("secret","private")); 448 ResultScanner resScanner = table.getScanner(scan); 449 Result[] next = resScanner.next(5); 450 assertEquals(1, next.length); 451 for (Result res : resScanner) { 452 LOG.debug("Getting results " + res.size()); 453 assertTrue(res.size() == 2); 454 List<Cell> kvs = res.listCells(); 455 assertTrue(CellUtil.matchingRows(kvs.get(0), Bytes.toBytes("KEY"))); 456 assertTrue(CellUtil.matchingRows(kvs.get(1), Bytes.toBytes("KEY"))); 457 assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier))); 458 assertTrue(CellUtil.matchingValue(kvs.get(1), 459 Bytes.toBytes("VALUE" + 2 * valueMultiplier))); 460 // Only one result set is expected, so let it loop. 461 } 462 verified = true; 463 break; 464 } catch (NullPointerException e) { 465 // If here, a cell was empty. Presume its because updates came in 466 // after the scanner had been opened. Wait a while and retry. 467 } 468 try { 469 Thread.sleep(pause); 470 } catch (InterruptedException e) { 471 // continue 472 } 473 } 474 table.close(); 475 assertTrue(verified); 476 } 477 478 /** 479 * Method returns the total KVs in given hfile 480 * @param fs File System 481 * @param p HFile path 482 * @return KV count in the given hfile 483 * @throws IOException 484 */ 485 private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException { 486 Configuration conf = util.getConfiguration(); 487 HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf); 488 reader.loadFileInfo(); 489 HFileScanner scanner = reader.getScanner(false, false); 490 scanner.seekTo(); 491 int count = 0; 492 do { 493 count++; 494 } while (scanner.next()); 495 reader.close(); 496 return count; 497 } 498 499}