001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.security.PrivilegedExceptionAction; 025import java.util.ArrayList; 026import java.util.Arrays; 027import java.util.HashSet; 028import java.util.List; 029import java.util.Set; 030import org.apache.hadoop.conf.Configurable; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FSDataOutputStream; 033import org.apache.hadoop.fs.FileStatus; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.hbase.Cell; 037import org.apache.hadoop.hbase.CellUtil; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.HBaseTestingUtility; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.client.Connection; 043import org.apache.hadoop.hbase.client.ConnectionFactory; 044import org.apache.hadoop.hbase.client.Delete; 045import org.apache.hadoop.hbase.client.Result; 046import org.apache.hadoop.hbase.client.ResultScanner; 047import org.apache.hadoop.hbase.client.Scan; 048import org.apache.hadoop.hbase.client.Table; 049import org.apache.hadoop.hbase.io.hfile.CacheConfig; 050import org.apache.hadoop.hbase.io.hfile.HFile; 051import org.apache.hadoop.hbase.io.hfile.HFileScanner; 052import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsResponse; 053import org.apache.hadoop.hbase.security.User; 054import org.apache.hadoop.hbase.security.visibility.Authorizations; 055import org.apache.hadoop.hbase.security.visibility.CellVisibility; 056import org.apache.hadoop.hbase.security.visibility.ScanLabelGenerator; 057import org.apache.hadoop.hbase.security.visibility.SimpleScanLabelGenerator; 058import org.apache.hadoop.hbase.security.visibility.VisibilityClient; 059import org.apache.hadoop.hbase.security.visibility.VisibilityConstants; 060import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil; 061import org.apache.hadoop.hbase.security.visibility.VisibilityUtils; 062import org.apache.hadoop.hbase.testclassification.LargeTests; 063import org.apache.hadoop.hbase.testclassification.MapReduceTests; 064import org.apache.hadoop.hbase.util.Bytes; 065import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter; 066import org.apache.hadoop.util.Tool; 067import org.apache.hadoop.util.ToolRunner; 068import org.junit.AfterClass; 069import org.junit.BeforeClass; 070import org.junit.ClassRule; 071import org.junit.Rule; 072import org.junit.Test; 073import org.junit.experimental.categories.Category; 074import org.junit.rules.TestName; 075import org.slf4j.Logger; 076import org.slf4j.LoggerFactory; 077 078@Category({ MapReduceTests.class, LargeTests.class }) 079public class TestImportTSVWithVisibilityLabels implements Configurable { 080 081 @ClassRule 082 public static final HBaseClassTestRule CLASS_RULE = 083 HBaseClassTestRule.forClass(TestImportTSVWithVisibilityLabels.class); 084 085 private static final Logger LOG = 086 LoggerFactory.getLogger(TestImportTSVWithVisibilityLabels.class); 087 protected static final String NAME = TestImportTsv.class.getSimpleName(); 088 protected static HBaseTestingUtility util = new HBaseTestingUtility(); 089 090 /** 091 * Delete the tmp directory after running doMROnTableTest. Boolean. Default is false. 092 */ 093 protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad"; 094 095 /** 096 * Force use of combiner in doMROnTableTest. Boolean. Default is true. 097 */ 098 protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner"; 099 100 private final String FAMILY = "FAM"; 101 private final static String TOPSECRET = "topsecret"; 102 private final static String PUBLIC = "public"; 103 private final static String PRIVATE = "private"; 104 private final static String CONFIDENTIAL = "confidential"; 105 private final static String SECRET = "secret"; 106 private static User SUPERUSER; 107 private static Configuration conf; 108 109 @Rule 110 public TestName name = new TestName(); 111 112 @Override 113 public Configuration getConf() { 114 return util.getConfiguration(); 115 } 116 117 @Override 118 public void setConf(Configuration conf) { 119 throw new IllegalArgumentException("setConf not supported"); 120 } 121 122 @BeforeClass 123 public static void provisionCluster() throws Exception { 124 conf = util.getConfiguration(); 125 SUPERUSER = User.createUserForTesting(conf, "admin", new String[] { "supergroup" }); 126 conf.set("hbase.superuser", "admin," + User.getCurrent().getName()); 127 VisibilityTestUtil.enableVisiblityLabels(conf); 128 conf.setClass(VisibilityUtils.VISIBILITY_LABEL_GENERATOR_CLASS, SimpleScanLabelGenerator.class, 129 ScanLabelGenerator.class); 130 util.startMiniCluster(); 131 // Wait for the labels table to become available 132 util.waitTableEnabled(VisibilityConstants.LABELS_TABLE_NAME.getName(), 50000); 133 createLabels(); 134 } 135 136 private static void createLabels() throws IOException, InterruptedException { 137 PrivilegedExceptionAction<VisibilityLabelsResponse> action = 138 new PrivilegedExceptionAction<VisibilityLabelsResponse>() { 139 @Override 140 public VisibilityLabelsResponse run() throws Exception { 141 String[] labels = { SECRET, TOPSECRET, CONFIDENTIAL, PUBLIC, PRIVATE }; 142 try (Connection conn = ConnectionFactory.createConnection(conf)) { 143 VisibilityClient.addLabels(conn, labels); 144 LOG.info("Added labels "); 145 } catch (Throwable t) { 146 LOG.error("Error in adding labels", t); 147 throw new IOException(t); 148 } 149 return null; 150 } 151 }; 152 SUPERUSER.runAs(action); 153 } 154 155 @AfterClass 156 public static void releaseCluster() throws Exception { 157 util.shutdownMiniCluster(); 158 } 159 160 @Test 161 public void testMROnTable() throws Exception { 162 final TableName tableName = TableName.valueOf(name.getMethodName() + util.getRandomUUID()); 163 164 // Prepare the arguments required for the test. 165 String[] args = new String[] { 166 "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper", 167 "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", 168 "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; 169 String data = "KEY\u001bVALUE1\u001bVALUE2\u001bsecret&private\n"; 170 util.createTable(tableName, FAMILY); 171 doMROnTableTest(util, FAMILY, data, args, 1); 172 util.deleteTable(tableName); 173 } 174 175 @Test 176 public void testMROnTableWithDeletes() throws Exception { 177 final TableName tableName = TableName.valueOf(name.getMethodName() + util.getRandomUUID()); 178 179 // Prepare the arguments required for the test. 180 String[] args = new String[] { 181 "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper", 182 "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", 183 "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; 184 String data = "KEY\u001bVALUE1\u001bVALUE2\u001bsecret&private\n"; 185 util.createTable(tableName, FAMILY); 186 doMROnTableTest(util, FAMILY, data, args, 1); 187 issueDeleteAndVerifyData(tableName); 188 util.deleteTable(tableName); 189 } 190 191 private void issueDeleteAndVerifyData(TableName tableName) throws IOException { 192 LOG.debug("Validating table after delete."); 193 Table table = util.getConnection().getTable(tableName); 194 boolean verified = false; 195 long pause = conf.getLong("hbase.client.pause", 5 * 1000); 196 int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); 197 for (int i = 0; i < numRetries; i++) { 198 try { 199 Delete d = new Delete(Bytes.toBytes("KEY")); 200 d.addFamily(Bytes.toBytes(FAMILY)); 201 d.setCellVisibility(new CellVisibility("private&secret")); 202 table.delete(d); 203 204 Scan scan = new Scan(); 205 // Scan entire family. 206 scan.addFamily(Bytes.toBytes(FAMILY)); 207 scan.setAuthorizations(new Authorizations("secret", "private")); 208 ResultScanner resScanner = table.getScanner(scan); 209 Result[] next = resScanner.next(5); 210 assertEquals(0, next.length); 211 verified = true; 212 break; 213 } catch (NullPointerException e) { 214 // If here, a cell was empty. Presume its because updates came in 215 // after the scanner had been opened. Wait a while and retry. 216 } 217 try { 218 Thread.sleep(pause); 219 } catch (InterruptedException e) { 220 // continue 221 } 222 } 223 table.close(); 224 assertTrue(verified); 225 } 226 227 @Test 228 public void testMROnTableWithBulkload() throws Exception { 229 final TableName tableName = TableName.valueOf(name.getMethodName() + util.getRandomUUID()); 230 Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles"); 231 // Prepare the arguments required for the test. 232 String[] args = new String[] { "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(), 233 "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", 234 "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; 235 String data = "KEY\u001bVALUE1\u001bVALUE2\u001bsecret&private\n"; 236 util.createTable(tableName, FAMILY); 237 doMROnTableTest(util, FAMILY, data, args, 1); 238 util.deleteTable(tableName); 239 } 240 241 @Test 242 public void testBulkOutputWithTsvImporterTextMapper() throws Exception { 243 final TableName table = TableName.valueOf(name.getMethodName() + util.getRandomUUID()); 244 String FAMILY = "FAM"; 245 Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "hfiles"); 246 // Prepare the arguments required for the test. 247 String[] args = new String[] { 248 "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper", 249 "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", 250 "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", 251 "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), 252 table.getNameAsString() }; 253 String data = "KEY\u001bVALUE4\u001bVALUE8\u001bsecret&private\n"; 254 doMROnTableTest(util, FAMILY, data, args, 4); 255 util.deleteTable(table); 256 } 257 258 @Test 259 public void testMRWithOutputFormat() throws Exception { 260 final TableName tableName = TableName.valueOf(name.getMethodName() + util.getRandomUUID()); 261 Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles"); 262 // Prepare the arguments required for the test. 263 String[] args = new String[] { 264 "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper", 265 "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(), 266 "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", 267 "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; 268 String data = "KEY\u001bVALUE4\u001bVALUE8\u001bsecret&private\n"; 269 util.createTable(tableName, FAMILY); 270 doMROnTableTest(util, FAMILY, data, args, 1); 271 util.deleteTable(tableName); 272 } 273 274 @Test 275 public void testBulkOutputWithInvalidLabels() throws Exception { 276 final TableName tableName = TableName.valueOf(name.getMethodName() + util.getRandomUUID()); 277 Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles"); 278 // Prepare the arguments required for the test. 279 String[] args = new String[] { "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(), 280 "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", 281 "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; 282 283 // 2 Data rows, one with valid label and one with invalid label 284 String data = 285 "KEY\u001bVALUE1\u001bVALUE2\u001bprivate\nKEY1\u001bVALUE1\u001bVALUE2\u001binvalid\n"; 286 util.createTable(tableName, FAMILY); 287 doMROnTableTest(util, FAMILY, data, args, 1, 2); 288 util.deleteTable(tableName); 289 } 290 291 @Test 292 public void testBulkOutputWithTsvImporterTextMapperWithInvalidLabels() throws Exception { 293 final TableName tableName = TableName.valueOf(name.getMethodName() + util.getRandomUUID()); 294 Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles"); 295 // Prepare the arguments required for the test. 296 String[] args = new String[] { 297 "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper", 298 "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(), 299 "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", 300 "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; 301 302 // 2 Data rows, one with valid label and one with invalid label 303 String data = 304 "KEY\u001bVALUE1\u001bVALUE2\u001bprivate\nKEY1\u001bVALUE1\u001bVALUE2\u001binvalid\n"; 305 util.createTable(tableName, FAMILY); 306 doMROnTableTest(util, FAMILY, data, args, 1, 2); 307 util.deleteTable(tableName); 308 } 309 310 protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data, 311 String[] args, int valueMultiplier) throws Exception { 312 return doMROnTableTest(util, family, data, args, valueMultiplier, -1); 313 } 314 315 /** 316 * Run an ImportTsv job and perform basic validation on the results. Returns the ImportTsv 317 * <code>Tool</code> instance so that other tests can inspect it for further validation as 318 * necessary. This method is static to insure non-reliance on instance's util/conf facilities. n * 319 * Any arguments to pass BEFORE inputFile path is appended. 320 * @param expectedKVCount Expected KV count. pass -1 to skip the kvcount check 321 * @return The Tool instance used to run the test. 322 */ 323 protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data, 324 String[] args, int valueMultiplier, int expectedKVCount) throws Exception { 325 TableName table = TableName.valueOf(args[args.length - 1]); 326 Configuration conf = new Configuration(util.getConfiguration()); 327 328 // populate input file 329 FileSystem fs = FileSystem.get(conf); 330 Path inputPath = 331 fs.makeQualified(new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "input.dat")); 332 FSDataOutputStream op = fs.create(inputPath, true); 333 if (data == null) { 334 data = "KEY\u001bVALUE1\u001bVALUE2\n"; 335 } 336 op.write(Bytes.toBytes(data)); 337 op.close(); 338 LOG.debug(String.format("Wrote test data to file: %s", inputPath)); 339 340 if (conf.getBoolean(FORCE_COMBINER_CONF, true)) { 341 LOG.debug("Forcing combiner."); 342 conf.setInt("mapreduce.map.combine.minspills", 1); 343 } 344 345 // run the import 346 List<String> argv = new ArrayList<>(Arrays.asList(args)); 347 argv.add(inputPath.toString()); 348 Tool tool = new ImportTsv(); 349 LOG.debug("Running ImportTsv with arguments: " + argv); 350 assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args))); 351 352 // Perform basic validation. If the input args did not include 353 // ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table. 354 // Otherwise, validate presence of hfiles. 355 boolean createdHFiles = false; 356 String outputPath = null; 357 for (String arg : argv) { 358 if (arg.contains(ImportTsv.BULK_OUTPUT_CONF_KEY)) { 359 createdHFiles = true; 360 // split '-Dfoo=bar' on '=' and keep 'bar' 361 outputPath = arg.split("=")[1]; 362 break; 363 } 364 } 365 LOG.debug("validating the table " + createdHFiles); 366 if (createdHFiles) validateHFiles(fs, outputPath, family, expectedKVCount); 367 else validateTable(conf, table, family, valueMultiplier); 368 369 if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) { 370 LOG.debug("Deleting test subdirectory"); 371 util.cleanupDataTestDirOnTestFS(table.getNameAsString()); 372 } 373 return tool; 374 } 375 376 /** 377 * Confirm ImportTsv via HFiles on fs. 378 */ 379 private static void validateHFiles(FileSystem fs, String outputPath, String family, 380 int expectedKVCount) throws IOException { 381 382 // validate number and content of output columns 383 LOG.debug("Validating HFiles."); 384 Set<String> configFamilies = new HashSet<>(); 385 configFamilies.add(family); 386 Set<String> foundFamilies = new HashSet<>(); 387 int actualKVCount = 0; 388 for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) { 389 LOG.debug("The output path has files"); 390 String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR); 391 String cf = elements[elements.length - 1]; 392 foundFamilies.add(cf); 393 assertTrue(String.format( 394 "HFile ouput contains a column family (%s) not present in input families (%s)", cf, 395 configFamilies), configFamilies.contains(cf)); 396 for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) { 397 assertTrue(String.format("HFile %s appears to contain no data.", hfile.getPath()), 398 hfile.getLen() > 0); 399 if (expectedKVCount > -1) { 400 actualKVCount += getKVCountFromHfile(fs, hfile.getPath()); 401 } 402 } 403 } 404 if (expectedKVCount > -1) { 405 assertTrue( 406 String.format("KV count in output hfile=<%d> doesn't match with expected KV count=<%d>", 407 actualKVCount, expectedKVCount), 408 actualKVCount == expectedKVCount); 409 } 410 } 411 412 /** 413 * Confirm ImportTsv via data in online table. 414 */ 415 private static void validateTable(Configuration conf, TableName tableName, String family, 416 int valueMultiplier) throws IOException { 417 418 LOG.debug("Validating table."); 419 Table table = util.getConnection().getTable(tableName); 420 boolean verified = false; 421 long pause = conf.getLong("hbase.client.pause", 5 * 1000); 422 int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); 423 for (int i = 0; i < numRetries; i++) { 424 try { 425 Scan scan = new Scan(); 426 // Scan entire family. 427 scan.addFamily(Bytes.toBytes(family)); 428 scan.setAuthorizations(new Authorizations("secret", "private")); 429 ResultScanner resScanner = table.getScanner(scan); 430 Result[] next = resScanner.next(5); 431 assertEquals(1, next.length); 432 for (Result res : resScanner) { 433 LOG.debug("Getting results " + res.size()); 434 assertTrue(res.size() == 2); 435 List<Cell> kvs = res.listCells(); 436 assertTrue(CellUtil.matchingRows(kvs.get(0), Bytes.toBytes("KEY"))); 437 assertTrue(CellUtil.matchingRows(kvs.get(1), Bytes.toBytes("KEY"))); 438 assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier))); 439 assertTrue( 440 CellUtil.matchingValue(kvs.get(1), Bytes.toBytes("VALUE" + 2 * valueMultiplier))); 441 // Only one result set is expected, so let it loop. 442 } 443 verified = true; 444 break; 445 } catch (NullPointerException e) { 446 // If here, a cell was empty. Presume its because updates came in 447 // after the scanner had been opened. Wait a while and retry. 448 } 449 try { 450 Thread.sleep(pause); 451 } catch (InterruptedException e) { 452 // continue 453 } 454 } 455 table.close(); 456 assertTrue(verified); 457 } 458 459 /** 460 * Method returns the total KVs in given hfile 461 * @param fs File System 462 * @param p HFile path 463 * @return KV count in the given hfile n 464 */ 465 private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException { 466 Configuration conf = util.getConfiguration(); 467 HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf); 468 HFileScanner scanner = reader.getScanner(conf, false, false); 469 scanner.seekTo(); 470 int count = 0; 471 do { 472 count++; 473 } while (scanner.next()); 474 reader.close(); 475 return count; 476 } 477 478}