001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.test; 019 020import java.io.IOException; 021import java.security.PrivilegedExceptionAction; 022import java.util.Arrays; 023import java.util.Iterator; 024import java.util.UUID; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.conf.Configured; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.CellUtil; 030import org.apache.hadoop.hbase.HBaseConfiguration; 031import org.apache.hadoop.hbase.HRegionLocation; 032import org.apache.hadoop.hbase.IntegrationTestingUtility; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory; 035import org.apache.hadoop.hbase.client.Admin; 036import org.apache.hadoop.hbase.client.BufferedMutator; 037import org.apache.hadoop.hbase.client.BufferedMutatorParams; 038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 039import org.apache.hadoop.hbase.client.Connection; 040import org.apache.hadoop.hbase.client.ConnectionFactory; 041import org.apache.hadoop.hbase.client.Delete; 042import org.apache.hadoop.hbase.client.Put; 043import org.apache.hadoop.hbase.client.Result; 044import org.apache.hadoop.hbase.client.Scan; 045import org.apache.hadoop.hbase.client.TableDescriptor; 046import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 047import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 048import org.apache.hadoop.hbase.log.HBaseMarkers; 049import org.apache.hadoop.hbase.mapreduce.Import; 050import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 051import org.apache.hadoop.hbase.security.User; 052import org.apache.hadoop.hbase.security.access.AccessControlClient; 053import org.apache.hadoop.hbase.security.access.Permission; 054import org.apache.hadoop.hbase.security.visibility.Authorizations; 055import org.apache.hadoop.hbase.security.visibility.CellVisibility; 056import org.apache.hadoop.hbase.security.visibility.VisibilityClient; 057import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil; 058import org.apache.hadoop.hbase.testclassification.IntegrationTests; 059import org.apache.hadoop.hbase.util.AbstractHBaseTool; 060import org.apache.hadoop.hbase.util.Bytes; 061import org.apache.hadoop.io.BytesWritable; 062import org.apache.hadoop.mapreduce.Counter; 063import org.apache.hadoop.mapreduce.CounterGroup; 064import org.apache.hadoop.mapreduce.Counters; 065import org.apache.hadoop.mapreduce.Job; 066import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 067import org.apache.hadoop.util.Tool; 068import org.apache.hadoop.util.ToolRunner; 069import org.junit.Test; 070import org.junit.experimental.categories.Category; 071import org.slf4j.Logger; 072import org.slf4j.LoggerFactory; 073 074import org.apache.hbase.thirdparty.com.google.common.base.Splitter; 075import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 076 077/** 078 * IT test used to verify the deletes with visibility labels. The test creates three tables 079 * tablename_0, tablename_1 and tablename_2 and each table is associated with a unique pair of 080 * labels. Another common table with the name 'commontable' is created and it has the data combined 081 * from all these 3 tables such that there are 3 versions of every row but the visibility label in 082 * every row corresponds to the table from which the row originated. Then deletes are issued to the 083 * common table by selecting the visibility label associated with each of the smaller tables. After 084 * the delete is issued with one set of visibility labels we try to scan the common table with each 085 * of the visibility pairs defined for the 3 tables. So after the first delete is issued, a scan 086 * with the first set of visibility labels would return zero result whereas the scan issued with the 087 * other two sets of visibility labels should return all the rows corresponding to that set of 088 * visibility labels. The above process of delete and scan is repeated until after the last set of 089 * visibility labels are used for the deletes the common table should not return any row. To use 090 * this ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedListWithVisibility Loop 1 1 091 * 20000 /tmp 1 10000 or ./hbase org.apache.hadoop.hbase.IntegrationTestsDriver -r 092 * .*IntegrationTestBigLinkedListWithVisibility.* 093 */ 094@Category(IntegrationTests.class) 095public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestBigLinkedList { 096 097 private static final String CONFIDENTIAL = "confidential"; 098 private static final String TOPSECRET = "topsecret"; 099 private static final String SECRET = "secret"; 100 private static final String PUBLIC = "public"; 101 private static final String PRIVATE = "private"; 102 private static final String EVERYONE = "everyone"; 103 private static final String RESTRICTED = "restricted"; 104 private static final String GROUP = "group"; 105 private static final String PREVILIGED = "previliged"; 106 private static final String OPEN = "open"; 107 public static String labels = CONFIDENTIAL + "," + TOPSECRET + "," + SECRET + "," + RESTRICTED 108 + "," + PRIVATE + "," + PREVILIGED + "," + GROUP + "," + OPEN + "," + PUBLIC + "," + EVERYONE; 109 private static final String COMMA = ","; 110 private static final String UNDER_SCORE = "_"; 111 public static int DEFAULT_TABLES_COUNT = 3; 112 public static String tableName = "tableName"; 113 public static final String COMMON_TABLE_NAME = "commontable"; 114 public static final String LABELS_KEY = "LABELS"; 115 public static final String INDEX_KEY = "INDEX"; 116 private static User USER; 117 private static final String OR = "|"; 118 private static String USER_OPT = "user"; 119 private static String userName = "user1"; 120 121 static class VisibilityGenerator extends Generator { 122 private static final Logger LOG = LoggerFactory.getLogger(VisibilityGenerator.class); 123 124 @Override 125 protected void createSchema() throws IOException { 126 LOG.info("Creating tables"); 127 // Create three tables 128 boolean acl = AccessControlClient 129 .isAccessControllerRunning(ConnectionFactory.createConnection(getConf())); 130 if (!acl) { 131 LOG.info("No ACL available."); 132 } 133 try (Connection conn = ConnectionFactory.createConnection(getConf()); 134 Admin admin = conn.getAdmin()) { 135 for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) { 136 TableName tableName = IntegrationTestBigLinkedListWithVisibility.getTableName(i); 137 createTable(admin, tableName, false, acl); 138 } 139 TableName tableName = TableName.valueOf(COMMON_TABLE_NAME); 140 createTable(admin, tableName, true, acl); 141 } 142 } 143 144 private void createTable(Admin admin, TableName tableName, boolean setVersion, boolean acl) 145 throws IOException { 146 if (!admin.tableExists(tableName)) { 147 ColumnFamilyDescriptorBuilder cfBuilder = 148 ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_NAME); 149 if (setVersion) { 150 cfBuilder.setMaxVersions(DEFAULT_TABLES_COUNT); 151 } 152 TableDescriptor tableDescriptor = 153 TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(cfBuilder.build()).build(); 154 admin.createTable(tableDescriptor); 155 if (acl) { 156 LOG.info("Granting permissions for user " + USER.getShortName()); 157 Permission.Action[] actions = { Permission.Action.READ }; 158 try { 159 AccessControlClient.grant(ConnectionFactory.createConnection(getConf()), tableName, 160 USER.getShortName(), null, null, actions); 161 } catch (Throwable e) { 162 LOG.error(HBaseMarkers.FATAL, 163 "Error in granting permission for the user " + USER.getShortName(), e); 164 throw new IOException(e); 165 } 166 } 167 } 168 } 169 170 @Override 171 protected void setMapperForGenerator(Job job) { 172 job.setMapperClass(VisibilityGeneratorMapper.class); 173 } 174 175 static class VisibilityGeneratorMapper extends GeneratorMapper { 176 BufferedMutator[] tables = new BufferedMutator[DEFAULT_TABLES_COUNT]; 177 178 @Override 179 protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context) 180 throws IOException, InterruptedException { 181 super.setup(context); 182 } 183 184 @Override 185 protected void instantiateHTable() throws IOException { 186 for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) { 187 BufferedMutatorParams params = new BufferedMutatorParams(getTableName(i)); 188 params.writeBufferSize(4 * 1024 * 1024); 189 BufferedMutator table = connection.getBufferedMutator(params); 190 this.tables[i] = table; 191 } 192 } 193 194 @Override 195 protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context) 196 throws IOException, InterruptedException { 197 for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) { 198 if (tables[i] != null) { 199 tables[i].close(); 200 } 201 } 202 } 203 204 @Override 205 protected void persist(org.apache.hadoop.mapreduce.Mapper.Context output, long count, 206 byte[][] prev, byte[][] current, byte[] id) throws IOException { 207 String visibilityExps = ""; 208 String[] split = Splitter.on(COMMA).splitToStream(labels).toArray(String[]::new); 209 for (int i = 0; i < current.length; i++) { 210 for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) { 211 Put put = new Put(current[i]); 212 byte[] value = prev == null ? NO_KEY : prev[i]; 213 put.addColumn(FAMILY_NAME, COLUMN_PREV, value); 214 215 if (count >= 0) { 216 put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i)); 217 } 218 if (id != null) { 219 put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id); 220 } 221 visibilityExps = split[j * 2] + OR + split[(j * 2) + 1]; 222 put.setCellVisibility(new CellVisibility(visibilityExps)); 223 tables[j].mutate(put); 224 try { 225 Thread.sleep(1); 226 } catch (InterruptedException e) { 227 throw new IOException(); 228 } 229 } 230 if (i % 1000 == 0) { 231 // Tickle progress every so often else maprunner will think us hung 232 output.progress(); 233 } 234 } 235 } 236 } 237 } 238 239 static class Copier extends Configured implements Tool { 240 private static final Logger LOG = LoggerFactory.getLogger(Copier.class); 241 private TableName tableName; 242 private int labelIndex; 243 private boolean delete; 244 245 public Copier(TableName tableName, int index, boolean delete) { 246 this.tableName = tableName; 247 this.labelIndex = index; 248 this.delete = delete; 249 } 250 251 public int runCopier(String outputDir) throws Exception { 252 Job job = new Job(getConf()); 253 job.setJobName("Data copier"); 254 job.getConfiguration().setInt("INDEX", labelIndex); 255 job.getConfiguration().set("LABELS", labels); 256 job.setJarByClass(getClass()); 257 Scan scan = new Scan(); 258 scan.setCacheBlocks(false); 259 scan.setRaw(true); 260 261 String[] split = Splitter.on(COMMA).splitToStream(labels).toArray(String[]::new); 262 scan.setAuthorizations( 263 new Authorizations(split[this.labelIndex * 2], split[(this.labelIndex * 2) + 1])); 264 if (delete) { 265 LOG.info("Running deletes"); 266 } else { 267 LOG.info("Running copiers"); 268 } 269 if (delete) { 270 TableMapReduceUtil.initTableMapperJob(tableName.getNameAsString(), scan, 271 VisibilityDeleteImport.class, null, null, job); 272 } else { 273 TableMapReduceUtil.initTableMapperJob(tableName.getNameAsString(), scan, 274 VisibilityImport.class, null, null, job); 275 } 276 job.getConfiguration().setBoolean("mapreduce.map.speculative", false); 277 job.getConfiguration().setBoolean("mapreduce.reduce.speculative", false); 278 TableMapReduceUtil.initTableReducerJob(COMMON_TABLE_NAME, null, job, null, null, null, null); 279 TableMapReduceUtil.addDependencyJars(job); 280 TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class); 281 TableMapReduceUtil.initCredentials(job); 282 job.setNumReduceTasks(0); 283 boolean success = job.waitForCompletion(true); 284 return success ? 0 : 1; 285 } 286 287 @Override 288 public int run(String[] arg0) throws Exception { 289 // TODO Auto-generated method stub 290 return 0; 291 } 292 } 293 294 static class VisibilityImport extends Import.Importer { 295 private int index; 296 private String labels; 297 private String[] split; 298 299 @Override 300 public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) { 301 index = context.getConfiguration().getInt(INDEX_KEY, -1); 302 labels = context.getConfiguration().get(LABELS_KEY); 303 split = labels.split(COMMA); 304 super.setup(context); 305 } 306 307 @Override 308 protected void addPutToKv(Put put, Cell kv) throws IOException { 309 String visibilityExps = split[index * 2] + OR + split[(index * 2) + 1]; 310 put.setCellVisibility(new CellVisibility(visibilityExps)); 311 super.addPutToKv(put, kv); 312 } 313 } 314 315 static class VisibilityDeleteImport extends Import.Importer { 316 private int index; 317 private String labels; 318 private String[] split; 319 320 @Override 321 public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) { 322 index = context.getConfiguration().getInt(INDEX_KEY, -1); 323 labels = context.getConfiguration().get(LABELS_KEY); 324 split = labels.split(COMMA); 325 super.setup(context); 326 } 327 328 // Creating delete here 329 @Override 330 protected void processKV(ImmutableBytesWritable key, Result result, 331 org.apache.hadoop.mapreduce.Mapper.Context context, Put put, 332 org.apache.hadoop.hbase.client.Delete delete) throws IOException, InterruptedException { 333 String visibilityExps = split[index * 2] + OR + split[(index * 2) + 1]; 334 for (Cell kv : result.rawCells()) { 335 // skip if we filter it out 336 if (kv == null) continue; 337 // Create deletes here 338 if (delete == null) { 339 delete = new Delete(key.get()); 340 } 341 delete.setCellVisibility(new CellVisibility(visibilityExps)); 342 delete.addFamily(CellUtil.cloneFamily(kv)); 343 } 344 if (delete != null) { 345 context.write(key, delete); 346 } 347 } 348 } 349 350 @Override 351 protected void addOptions() { 352 super.addOptions(); 353 addOptWithArg("u", USER_OPT, "User name"); 354 } 355 356 @Override 357 protected void processOptions(CommandLine cmd) { 358 super.processOptions(cmd); 359 if (cmd.hasOption(USER_OPT)) { 360 userName = cmd.getOptionValue(USER_OPT); 361 } 362 363 } 364 365 @Override 366 public void setUpCluster() throws Exception { 367 util = getTestingUtil(null); 368 Configuration conf = util.getConfiguration(); 369 VisibilityTestUtil.enableVisiblityLabels(conf); 370 conf.set("hbase.superuser", User.getCurrent().getName()); 371 conf.setBoolean("dfs.permissions", false); 372 USER = User.createUserForTesting(conf, userName, new String[] {}); 373 super.setUpCluster(); 374 addLabels(); 375 } 376 377 static TableName getTableName(int i) { 378 return TableName.valueOf(tableName + UNDER_SCORE + i); 379 } 380 381 private void addLabels() throws Exception { 382 try { 383 VisibilityClient.addLabels(util.getConnection(), labels.split(COMMA)); 384 VisibilityClient.setAuths(util.getConnection(), labels.split(COMMA), USER.getName()); 385 } catch (Throwable t) { 386 throw new IOException(t); 387 } 388 } 389 390 static class VisibilityVerify extends Verify { 391 private static final Logger LOG = LoggerFactory.getLogger(VisibilityVerify.class); 392 private TableName tableName; 393 private int labelIndex; 394 395 public VisibilityVerify(String tableName, int index) { 396 this.tableName = TableName.valueOf(tableName); 397 this.labelIndex = index; 398 } 399 400 @Override 401 public int run(final Path outputDir, final int numReducers) throws Exception { 402 LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers); 403 PrivilegedExceptionAction<Integer> scanAction = new PrivilegedExceptionAction<Integer>() { 404 @Override 405 public Integer run() throws Exception { 406 return doVerify(outputDir, numReducers); 407 } 408 }; 409 return USER.runAs(scanAction); 410 } 411 412 private int doVerify(Path outputDir, int numReducers) 413 throws IOException, InterruptedException, ClassNotFoundException { 414 job = new Job(getConf()); 415 416 job.setJobName("Link Verifier"); 417 job.setNumReduceTasks(numReducers); 418 job.setJarByClass(getClass()); 419 420 setJobScannerConf(job); 421 422 Scan scan = new Scan(); 423 scan.addColumn(FAMILY_NAME, COLUMN_PREV); 424 scan.setCaching(10000); 425 scan.setCacheBlocks(false); 426 String[] split = Splitter.on(COMMA).splitToStream(labels).toArray(String[]::new); 427 428 scan.setAuthorizations( 429 new Authorizations(split[this.labelIndex * 2], split[(this.labelIndex * 2) + 1])); 430 431 TableMapReduceUtil.initTableMapperJob(tableName.getName(), scan, VerifyMapper.class, 432 BytesWritable.class, BytesWritable.class, job); 433 TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class); 434 435 job.getConfiguration().setBoolean("mapreduce.map.speculative", false); 436 437 job.setReducerClass(VerifyReducer.class); 438 job.setOutputFormatClass(TextOutputFormat.class); 439 TextOutputFormat.setOutputPath(job, outputDir); 440 boolean success = job.waitForCompletion(true); 441 442 return success ? 0 : 1; 443 } 444 445 @Override 446 protected void handleFailure(Counters counters) throws IOException { 447 try (Connection conn = ConnectionFactory.createConnection(job.getConfiguration())) { 448 TableName tableName = TableName.valueOf(COMMON_TABLE_NAME); 449 CounterGroup g = counters.getGroup("undef"); 450 Iterator<Counter> it = g.iterator(); 451 while (it.hasNext()) { 452 String keyString = it.next().getName(); 453 byte[] key = Bytes.toBytes(keyString); 454 HRegionLocation loc = conn.getRegionLocator(tableName).getRegionLocation(key, true); 455 LOG.error("undefined row " + keyString + ", " + loc); 456 } 457 g = counters.getGroup("unref"); 458 it = g.iterator(); 459 while (it.hasNext()) { 460 String keyString = it.next().getName(); 461 byte[] key = Bytes.toBytes(keyString); 462 HRegionLocation loc = conn.getRegionLocator(tableName).getRegionLocation(key, true); 463 LOG.error("unreferred row " + keyString + ", " + loc); 464 } 465 } 466 } 467 } 468 469 static class VisibilityLoop extends Loop { 470 private static final int SLEEP_IN_MS = 5000; 471 private static final Logger LOG = LoggerFactory.getLogger(VisibilityLoop.class); 472 473 @Override 474 protected void runGenerator(int numMappers, long numNodes, String outputDir, Integer width, 475 Integer wrapMultiplier, Integer numWalkers) throws Exception { 476 Path outputPath = new Path(outputDir); 477 UUID uuid = UUID.randomUUID(); // create a random UUID. 478 Path generatorOutput = new Path(outputPath, uuid.toString()); 479 480 Generator generator = new VisibilityGenerator(); 481 generator.setConf(getConf()); 482 int retCode = 483 generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier, numWalkers); 484 if (retCode > 0) { 485 throw new RuntimeException("Generator failed with return code: " + retCode); 486 } 487 } 488 489 protected void runDelete(int numMappers, long numNodes, String outputDir, Integer width, 490 Integer wrapMultiplier, int tableIndex) throws Exception { 491 LOG.info("Running copier on table " 492 + IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex)); 493 Copier copier = new Copier( 494 IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex), tableIndex, true); 495 copier.setConf(getConf()); 496 copier.runCopier(outputDir); 497 Thread.sleep(SLEEP_IN_MS); 498 } 499 500 protected void runVerify(String outputDir, int numReducers, long expectedNumNodes, 501 boolean allTables) throws Exception { 502 Path outputPath = new Path(outputDir); 503 504 if (allTables) { 505 for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) { 506 LOG.info("Verifying table " + i); 507 sleep(SLEEP_IN_MS); 508 UUID uuid = UUID.randomUUID(); // create a random UUID. 509 Path iterationOutput = new Path(outputPath, uuid.toString()); 510 Verify verify = new VisibilityVerify(getTableName(i).getNameAsString(), i); 511 verify(numReducers, expectedNumNodes, iterationOutput, verify); 512 } 513 } 514 for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) { 515 runVerifyCommonTable(outputDir, numReducers, expectedNumNodes, i); 516 } 517 } 518 519 private void runVerify(String outputDir, int numReducers, long expectedNodes, int tableIndex) 520 throws Exception { 521 long temp = expectedNodes; 522 for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) { 523 if (i <= tableIndex) { 524 expectedNodes = 0; 525 } else { 526 expectedNodes = temp; 527 } 528 LOG.info("Verifying data in the table with index " + i + " and expected nodes is " 529 + expectedNodes); 530 runVerifyCommonTable(outputDir, numReducers, expectedNodes, i); 531 } 532 } 533 534 private void sleep(long ms) throws InterruptedException { 535 Thread.sleep(ms); 536 } 537 538 protected void runVerifyCommonTable(String outputDir, int numReducers, long expectedNumNodes, 539 int index) throws Exception { 540 LOG.info("Verifying common table with index " + index); 541 sleep(SLEEP_IN_MS); 542 Path outputPath = new Path(outputDir); 543 UUID uuid = UUID.randomUUID(); // create a random UUID. 544 Path iterationOutput = new Path(outputPath, uuid.toString()); 545 Verify verify = 546 new VisibilityVerify(TableName.valueOf(COMMON_TABLE_NAME).getNameAsString(), index); 547 verify(numReducers, expectedNumNodes, iterationOutput, verify); 548 } 549 550 protected void runCopier(String outputDir) throws Exception { 551 for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) { 552 LOG.info("Running copier " + IntegrationTestBigLinkedListWithVisibility.getTableName(i)); 553 sleep(SLEEP_IN_MS); 554 Copier copier = 555 new Copier(IntegrationTestBigLinkedListWithVisibility.getTableName(i), i, false); 556 copier.setConf(getConf()); 557 copier.runCopier(outputDir); 558 } 559 } 560 561 private void verify(int numReducers, long expectedNumNodes, Path iterationOutput, Verify verify) 562 throws Exception { 563 verify.setConf(getConf()); 564 int retCode = verify.run(iterationOutput, numReducers); 565 if (retCode > 0) { 566 throw new RuntimeException("Verify.run failed with return code: " + retCode); 567 } 568 569 if (!verify.verify(expectedNumNodes)) { 570 throw new RuntimeException("Verify.verify failed"); 571 } 572 573 LOG.info("Verify finished with succees. Total nodes=" + expectedNumNodes); 574 } 575 576 @Override 577 public int run(String[] args) throws Exception { 578 if (args.length < 5) { 579 System.err.println( 580 "Usage: Loop <num iterations> " + "<num mappers> <num nodes per mapper> <output dir> " 581 + "<num reducers> [<width> <wrap multiplier>]"); 582 return 1; 583 } 584 LOG.info("Running Loop with args:" + Arrays.deepToString(args)); 585 586 int numIterations = Integer.parseInt(args[0]); 587 int numMappers = Integer.parseInt(args[1]); 588 long numNodes = Long.parseLong(args[2]); 589 String outputDir = args[3]; 590 int numReducers = Integer.parseInt(args[4]); 591 Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]); 592 Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]); 593 long expectedNumNodes = 0; 594 595 if (numIterations < 0) { 596 numIterations = Integer.MAX_VALUE; // run indefinitely (kind of) 597 } 598 599 for (int i = 0; i < numIterations; i++) { 600 LOG.info("Starting iteration = " + i); 601 LOG.info("Generating data"); 602 // By default run no concurrent walkers for test with visibility 603 runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, 0); 604 expectedNumNodes += numMappers * numNodes; 605 // Copying wont work because expressions are not returned back to the 606 // client 607 LOG.info("Running copier"); 608 sleep(SLEEP_IN_MS); 609 runCopier(outputDir); 610 LOG.info("Verifying copied data"); 611 sleep(SLEEP_IN_MS); 612 runVerify(outputDir, numReducers, expectedNumNodes, true); 613 sleep(SLEEP_IN_MS); 614 for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) { 615 LOG.info("Deleting data on table with index: " + j); 616 runDelete(numMappers, numNodes, outputDir, width, wrapMultiplier, j); 617 sleep(SLEEP_IN_MS); 618 LOG.info("Verifying common table after deleting"); 619 runVerify(outputDir, numReducers, expectedNumNodes, j); 620 sleep(SLEEP_IN_MS); 621 } 622 } 623 return 0; 624 } 625 } 626 627 @Override 628 @Test 629 public void testContinuousIngest() throws IOException, Exception { 630 // Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> 631 // <num reducers> 632 int ret = ToolRunner.run(getTestingUtil(getConf()).getConfiguration(), new VisibilityLoop(), 633 new String[] { "1", "1", "20000", 634 util.getDataTestDirOnTestFS("IntegrationTestBigLinkedListWithVisibility").toString(), "1", 635 "10000" }); 636 org.junit.Assert.assertEquals(0, ret); 637 } 638 639 public static void main(String[] args) throws Exception { 640 Configuration conf = HBaseConfiguration.create(); 641 IntegrationTestingUtility.setUseDistributedCluster(conf); 642 int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedListWithVisibility(), args); 643 System.exit(ret); 644 } 645 646 @Override 647 protected MonkeyFactory getDefaultMonkeyFactory() { 648 return MonkeyFactory.getFactory(MonkeyFactory.CALM); 649 } 650 651 @Override 652 public int runTestFromCommandLine() throws Exception { 653 return ToolRunner.run(getConf(), new VisibilityLoop(), otherArgs); 654 } 655}