/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsResponse;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.visibility.Authorizations;
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.security.visibility.ScanLabelGenerator;
import org.apache.hadoop.hbase.security.visibility.SimpleScanLabelGenerator;
import org.apache.hadoop.hbase.security.visibility.VisibilityClient;
import org.apache.hadoop.hbase.security.visibility.VisibilityConstants;
import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
import org.apache.hadoop.hbase.security.visibility.VisibilityUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs {@link ImportTsv} jobs whose input carries an {@code HBASE_CELL_VISIBILITY} column
 * against a mini cluster that has visibility labels enabled, and validates either the
 * resulting table contents or the generated HFiles.
 */
@Category({MapReduceTests.class, LargeTests.class})
public class TestImportTSVWithVisibilityLabels implements Configurable {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestImportTSVWithVisibilityLabels.class);

  private static final Logger LOG =
      LoggerFactory.getLogger(TestImportTSVWithVisibilityLabels.class);
  // The configuration key prefix is derived from TestImportTsv's simple name, not this class's.
  protected static final String NAME = TestImportTsv.class.getSimpleName();
  protected static HBaseTestingUtility util = new HBaseTestingUtility();

  /**
   * Delete the tmp directory after running doMROnTableTest. Boolean. Default is
   * true.
   */
  protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";

  /**
   * Force use of combiner in doMROnTableTest. Boolean. Default is true.
   */
  protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";

  private static final String FAMILY = "FAM";
  private static final String TOPSECRET = "topsecret";
  private static final String PUBLIC = "public";
  private static final String PRIVATE = "private";
  private static final String CONFIDENTIAL = "confidential";
  private static final String SECRET = "secret";
  private static User SUPERUSER;
  private static Configuration conf;

  @Rule
  public TestName name = new TestName();

  /**
   * @return the configuration of the shared testing utility
   */
  @Override
  public Configuration getConf() {
    return util.getConfiguration();
  }

  /**
   * Not supported; the configuration always comes from the testing utility.
   *
   * @throws IllegalArgumentException always
   */
  @Override
  public void setConf(Configuration conf) {
    throw new IllegalArgumentException("setConf not supported");
  }

  /**
   * Enables visibility labels, starts the mini cluster, waits for the labels table and
   * registers the labels used by the tests.
   */
  @BeforeClass
  public static void provisionCluster() throws Exception {
    conf = util.getConfiguration();
    SUPERUSER = User.createUserForTesting(conf, "admin", new String[] { "supergroup" });
    conf.set("hbase.superuser", "admin," + User.getCurrent().getName());
    VisibilityTestUtil.enableVisiblityLabels(conf);
    conf.setClass(VisibilityUtils.VISIBILITY_LABEL_GENERATOR_CLASS, SimpleScanLabelGenerator.class,
      ScanLabelGenerator.class);
    util.startMiniCluster();
    // Wait for the labels table to become available
    util.waitTableEnabled(VisibilityConstants.LABELS_TABLE_NAME.getName(), 50000);
    createLabels();
  }

  /**
   * Adds the test labels while running as {@link #SUPERUSER}. Any failure while adding the
   * labels is logged and rethrown wrapped in an {@link IOException}.
   */
  private static void createLabels() throws IOException, InterruptedException {
    PrivilegedExceptionAction<VisibilityLabelsResponse> action =
        new PrivilegedExceptionAction<VisibilityLabelsResponse>() {
          @Override
          public VisibilityLabelsResponse run() throws Exception {
            String[] labels = { SECRET, TOPSECRET, CONFIDENTIAL, PUBLIC, PRIVATE };
            try (Connection conn = ConnectionFactory.createConnection(conf)) {
              VisibilityClient.addLabels(conn, labels);
              LOG.info("Added labels ");
            } catch (Throwable t) {
              LOG.error("Error in adding labels", t);
              throw new IOException(t);
            }
            return null;
          }
        };
    SUPERUSER.runAs(action);
  }

  /** Shuts the mini cluster down once all tests have run. */
  @AfterClass
  public static void releaseCluster() throws Exception {
    util.shutdownMiniCluster();
  }

  /** Imports one labelled row directly into a table and validates the table contents. */
  @Test
  public void testMROnTable() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName() + util.getRandomUUID());

    // Prepare the arguments required for the test.
    String[] args = new String[] {
      "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
    String data = "KEY\u001bVALUE1\u001bVALUE2\u001bsecret&private\n";
    util.createTable(tableName, FAMILY);
    doMROnTableTest(util, FAMILY, data, args, 1);
    util.deleteTable(tableName);
  }

  /**
   * Imports one labelled row into a table, then deletes it with a matching cell visibility
   * and verifies that nothing is returned by a subsequent scan.
   */
  @Test
  public void testMROnTableWithDeletes() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName() + util.getRandomUUID());

    // Prepare the arguments required for the test.
    String[] args = new String[] {
      "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
    String data = "KEY\u001bVALUE1\u001bVALUE2\u001bsecret&private\n";
    util.createTable(tableName, FAMILY);
    doMROnTableTest(util, FAMILY, data, args, 1);
    issueDeleteAndVerifyData(tableName);
    util.deleteTable(tableName);
  }

  /**
   * Deletes row {@code KEY} in {@link #FAMILY} using the visibility expression
   * {@code private&secret} and asserts that a scan authorized for both labels returns no
   * rows. The delete-and-scan attempt is retried when a {@link NullPointerException} occurs.
   *
   * @param tableName table to delete from and scan
   * @throws IOException if the table cannot be accessed
   */
  private void issueDeleteAndVerifyData(TableName tableName) throws IOException {
    LOG.debug("Validating table after delete.");
    boolean verified = false;
    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
    int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
    // try-with-resources so the table is released even when an assertion fails.
    try (Table table = util.getConnection().getTable(tableName)) {
      for (int i = 0; i < numRetries; i++) {
        try {
          Delete d = new Delete(Bytes.toBytes("KEY"));
          d.addFamily(Bytes.toBytes(FAMILY));
          d.setCellVisibility(new CellVisibility("private&secret"));
          table.delete(d);

          Scan scan = new Scan();
          // Scan entire family.
          scan.addFamily(Bytes.toBytes(FAMILY));
          scan.setAuthorizations(new Authorizations("secret", "private"));
          try (ResultScanner resScanner = table.getScanner(scan)) {
            Result[] next = resScanner.next(5);
            assertEquals(0, next.length);
          }
          verified = true;
          break;
        } catch (NullPointerException e) {
          // If here, a cell was empty. Presume its because updates came in
          // after the scanner had been opened. Wait a while and retry.
        }
        if (!pauseBeforeRetry(pause)) {
          break;
        }
      }
    }
    assertTrue(verified);
  }

  /** Runs the import with bulk output configured and validates the generated HFiles. */
  @Test
  public void testMROnTableWithBulkload() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName() + util.getRandomUUID());
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles");
    // Prepare the arguments required for the test.
    String[] args = new String[] {
      "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
    String data = "KEY\u001bVALUE1\u001bVALUE2\u001bsecret&private\n";
    util.createTable(tableName, FAMILY);
    doMROnTableTest(util, FAMILY, data, args, 1);
    util.deleteTable(tableName);
  }

  /**
   * Runs the import with {@code TsvImporterTextMapper} and bulk output. The table is not
   * created by this test before the job runs.
   */
  @Test
  public void testBulkOutputWithTsvImporterTextMapper() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName() + util.getRandomUUID());
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "hfiles");
    // Prepare the arguments required for the test.
    String[] args = new String[] {
      "-D" + ImportTsv.MAPPER_CONF_KEY
          + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
      "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(),
      table.getNameAsString() };
    String data = "KEY\u001bVALUE4\u001bVALUE8\u001bsecret&private\n";
    doMROnTableTest(util, FAMILY, data, args, 4);
    util.deleteTable(table);
  }

  /** Runs the import with {@code TsvImporterMapper} and bulk output, validating the HFiles. */
  @Test
  public void testMRWithOutputFormat() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName() + util.getRandomUUID());
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles");
    // Prepare the arguments required for the test.
    String[] args = new String[] {
      "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
      "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
    String data = "KEY\u001bVALUE4\u001bVALUE8\u001bsecret&private\n";
    util.createTable(tableName, FAMILY);
    doMROnTableTest(util, FAMILY, data, args, 1);
    util.deleteTable(tableName);
  }

  /**
   * Bulk-output import of two rows, one carrying a label that was never registered; expects
   * exactly two KVs in the generated HFiles.
   */
  @Test
  public void testBulkOutputWithInvalidLabels() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName() + util.getRandomUUID());
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles");
    // Prepare the arguments required for the test.
    String[] args = new String[] {
      "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };

    // 2 Data rows, one with valid label and one with invalid label
    String data =
        "KEY\u001bVALUE1\u001bVALUE2\u001bprivate\nKEY1\u001bVALUE1\u001bVALUE2\u001binvalid\n";
    util.createTable(tableName, FAMILY);
    doMROnTableTest(util, FAMILY, data, args, 1, 2);
    util.deleteTable(tableName);
  }

  /**
   * Same as {@link #testBulkOutputWithInvalidLabels()} but using
   * {@code TsvImporterTextMapper}.
   */
  @Test
  public void testBulkOutputWithTsvImporterTextMapperWithInvalidLabels() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName() + util.getRandomUUID());
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles");
    // Prepare the arguments required for the test.
    String[] args = new String[] {
      "-D" + ImportTsv.MAPPER_CONF_KEY
          + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
      "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };

    // 2 Data rows, one with valid label and one with invalid label
    String data =
        "KEY\u001bVALUE1\u001bVALUE2\u001bprivate\nKEY1\u001bVALUE1\u001bVALUE2\u001binvalid\n";
    util.createTable(tableName, FAMILY);
    doMROnTableTest(util, FAMILY, data, args, 1, 2);
    util.deleteTable(tableName);
  }

  /**
   * Convenience overload of
   * {@link #doMROnTableTest(HBaseTestingUtility, String, String, String[], int, int)} that
   * skips the KV count check.
   */
  protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
      String[] args, int valueMultiplier) throws Exception {
    return doMROnTableTest(util, family, data, args, valueMultiplier, -1);
  }

  /**
   * Run an ImportTsv job and perform basic validation on the results. Returns
   * the ImportTsv <code>Tool</code> instance so that other tests can inspect it
   * for further validation as necessary. This method is static to insure
   * non-reliance on instance's util/conf facilities.
   *
   * @param util testing utility supplying the configuration and test directories
   * @param family column family expected in the output
   * @param data input file content; a default single row is used when {@code null}
   * @param args
   *          Any arguments to pass BEFORE inputFile path is appended. The last element is
   *          read as the table name.
   * @param valueMultiplier multiplier applied to the expected value suffixes when the table
   *          contents are validated
   * @param expectedKVCount Expected KV count. pass -1 to skip the kvcount check
   *
   * @return The Tool instance used to run the test.
   */
  protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
      String[] args, int valueMultiplier, int expectedKVCount) throws Exception {
    TableName table = TableName.valueOf(args[args.length - 1]);
    Configuration conf = new Configuration(util.getConfiguration());

    // populate input file
    FileSystem fs = FileSystem.get(conf);
    Path inputPath = fs.makeQualified(
      new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
    if (data == null) {
      data = "KEY\u001bVALUE1\u001bVALUE2\n";
    }
    // try-with-resources so the stream is closed even if the write fails.
    try (FSDataOutputStream op = fs.create(inputPath, true)) {
      op.write(Bytes.toBytes(data));
    }
    LOG.debug("Wrote test data to file: {}", inputPath);

    if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
      LOG.debug("Forcing combiner.");
      conf.setInt("mapreduce.map.combine.minspills", 1);
    }

    // run the import
    List<String> argv = new ArrayList<>(Arrays.asList(args));
    argv.add(inputPath.toString());
    Tool tool = new ImportTsv();
    LOG.debug("Running ImportTsv with arguments: {}", argv);
    assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(new String[0])));

    // Perform basic validation. If the input args did not include
    // ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table.
    // Otherwise, validate presence of hfiles.
    boolean createdHFiles = false;
    String outputPath = null;
    for (String arg : argv) {
      if (arg.contains(ImportTsv.BULK_OUTPUT_CONF_KEY)) {
        createdHFiles = true;
        // split '-Dfoo=bar' on the first '=' only and keep 'bar', so that a path which
        // itself contains '=' is not truncated
        outputPath = arg.split("=", 2)[1];
        break;
      }
    }
    LOG.debug("validating the table " + createdHFiles);
    if (createdHFiles) {
      validateHFiles(fs, outputPath, family, expectedKVCount);
    } else {
      validateTable(conf, table, family, valueMultiplier);
    }

    if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
      LOG.debug("Deleting test subdirectory");
      util.cleanupDataTestDirOnTestFS(table.getNameAsString());
    }
    return tool;
  }

  /**
   * Confirm ImportTsv via HFiles on fs.
   *
   * @param fs file system holding the bulk output
   * @param outputPath bulk output directory
   * @param family the only column family directory allowed in the output
   * @param expectedKVCount expected total KV count across all HFiles, or -1 to skip the check
   */
  private static void validateHFiles(FileSystem fs, String outputPath, String family,
      int expectedKVCount) throws IOException {

    // validate number and content of output columns
    LOG.debug("Validating HFiles.");
    Set<String> configFamilies = new HashSet<>();
    configFamilies.add(family);
    int actualKVCount = 0;
    for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) {
      LOG.debug("The output path has files");
      // The last path component of each output directory is the column family name.
      String cf = cfStatus.getPath().getName();
      assertTrue(String.format(
        "HFile ouput contains a column family (%s) not present in input families (%s)", cf,
        configFamilies), configFamilies.contains(cf));
      for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) {
        assertTrue(String.format("HFile %s appears to contain no data.", hfile.getPath()),
          hfile.getLen() > 0);
        if (expectedKVCount > -1) {
          actualKVCount += getKVCountFromHfile(fs, hfile.getPath());
        }
      }
    }
    if (expectedKVCount > -1) {
      assertEquals(String.format(
        "KV count in output hfile=<%d> doesn't match with expected KV count=<%d>",
        actualKVCount, expectedKVCount), expectedKVCount, actualKVCount);
    }
  }

  /**
   * Confirm ImportTsv via data in online table. Expects exactly one row with key
   * {@code KEY} holding two cells whose values are {@code VALUE<valueMultiplier>} and
   * {@code VALUE<2 * valueMultiplier>}. The scan is retried when a
   * {@link NullPointerException} occurs.
   *
   * @param conf configuration supplying the retry count and pause
   * @param tableName table to scan
   * @param family column family to scan
   * @param valueMultiplier multiplier applied to the expected value suffixes
   */
  private static void validateTable(Configuration conf, TableName tableName, String family,
      int valueMultiplier) throws IOException {

    LOG.debug("Validating table.");
    boolean verified = false;
    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
    int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
    // try-with-resources so the table is released even when an assertion fails.
    try (Table table = util.getConnection().getTable(tableName)) {
      for (int i = 0; i < numRetries; i++) {
        try {
          Scan scan = new Scan();
          // Scan entire family.
          scan.addFamily(Bytes.toBytes(family));
          scan.setAuthorizations(new Authorizations("secret", "private"));
          try (ResultScanner resScanner = table.getScanner(scan)) {
            Result[] next = resScanner.next(5);
            assertEquals(1, next.length);
            // Validate the rows that were fetched above. Iterating the scanner again here
            // would only see rows beyond those already consumed by next(5), so the
            // per-cell checks would never execute.
            for (Result res : next) {
              LOG.debug("Getting results " + res.size());
              assertTrue(res.size() == 2);
              List<Cell> kvs = res.listCells();
              assertTrue(CellUtil.matchingRows(kvs.get(0), Bytes.toBytes("KEY")));
              assertTrue(CellUtil.matchingRows(kvs.get(1), Bytes.toBytes("KEY")));
              assertTrue(
                CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier)));
              assertTrue(CellUtil.matchingValue(kvs.get(1),
                Bytes.toBytes("VALUE" + 2 * valueMultiplier)));
            }
          }
          verified = true;
          break;
        } catch (NullPointerException e) {
          // If here, a cell was empty. Presume its because updates came in
          // after the scanner had been opened. Wait a while and retry.
        }
        if (!pauseBeforeRetry(pause)) {
          break;
        }
      }
    }
    assertTrue(verified);
  }

  /**
   * Sleeps between validation attempts.
   *
   * @param pauseMs time to sleep, in milliseconds
   * @return {@code true} if the sleep completed; {@code false} if the thread was interrupted,
   *         in which case the interrupt flag is restored and the caller should stop retrying
   */
  private static boolean pauseBeforeRetry(long pauseMs) {
    try {
      Thread.sleep(pauseMs);
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  /**
   * Method returns the total KVs in given hfile
   * @param fs File System
   * @param p HFile path
   * @return KV count in the given hfile; 0 when the initial seek reports no entries
   * @throws IOException if the HFile cannot be opened or read
   */
  private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException {
    Configuration conf = util.getConfiguration();
    HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
    try {
      HFileScanner scanner = reader.getScanner(false, false);
      if (!scanner.seekTo()) {
        // Nothing to position on: do not count a non-existent first entry.
        return 0;
      }
      int count = 0;
      do {
        count++;
      } while (scanner.next());
      return count;
    } finally {
      reader.close();
    }
  }

}