001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.test; 019 020import java.io.IOException; 021import java.security.PrivilegedExceptionAction; 022import java.util.Arrays; 023import java.util.Iterator; 024import java.util.UUID; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.conf.Configured; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.CellUtil; 030import org.apache.hadoop.hbase.HBaseConfiguration; 031import org.apache.hadoop.hbase.HColumnDescriptor; 032import org.apache.hadoop.hbase.HRegionLocation; 033import org.apache.hadoop.hbase.HTableDescriptor; 034import org.apache.hadoop.hbase.IntegrationTestingUtility; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory; 037import org.apache.hadoop.hbase.client.Admin; 038import org.apache.hadoop.hbase.client.BufferedMutator; 039import org.apache.hadoop.hbase.client.BufferedMutatorParams; 040import org.apache.hadoop.hbase.client.Connection; 041import org.apache.hadoop.hbase.client.ConnectionFactory; 042import org.apache.hadoop.hbase.client.Delete; 043import org.apache.hadoop.hbase.client.Put; 044import org.apache.hadoop.hbase.client.Result; 045import org.apache.hadoop.hbase.client.Scan; 046import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 047import org.apache.hadoop.hbase.log.HBaseMarkers; 048import org.apache.hadoop.hbase.mapreduce.Import; 049import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 050import org.apache.hadoop.hbase.security.User; 051import org.apache.hadoop.hbase.security.access.AccessControlClient; 052import org.apache.hadoop.hbase.security.access.Permission; 053import org.apache.hadoop.hbase.security.visibility.Authorizations; 054import org.apache.hadoop.hbase.security.visibility.CellVisibility; 055import org.apache.hadoop.hbase.security.visibility.VisibilityClient; 056import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil; 057import org.apache.hadoop.hbase.testclassification.IntegrationTests; 058import org.apache.hadoop.hbase.util.AbstractHBaseTool; 059import org.apache.hadoop.hbase.util.Bytes; 060import org.apache.hadoop.io.BytesWritable; 061import org.apache.hadoop.mapreduce.Counter; 062import org.apache.hadoop.mapreduce.CounterGroup; 063import org.apache.hadoop.mapreduce.Counters; 064import org.apache.hadoop.mapreduce.Job; 065import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 066import org.apache.hadoop.util.Tool; 067import org.apache.hadoop.util.ToolRunner; 068import org.junit.Test; 069import org.junit.experimental.categories.Category; 070import org.slf4j.Logger; 071import org.slf4j.LoggerFactory; 072 073import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 074 075/** 076 * IT test used to verify the deletes with visibility labels. 077 * The test creates three tables tablename_0, tablename_1 and tablename_2 and each table 078 * is associated with a unique pair of labels. 079 * Another common table with the name 'commontable' is created and it has the data combined 080 * from all these 3 tables such that there are 3 versions of every row but the visibility label 081 * in every row corresponds to the table from which the row originated. 082 * Then deletes are issued to the common table by selecting the visibility label 083 * associated with each of the smaller tables. 084 * After the delete is issued with one set of visibility labels we try to scan the common table 085 * with each of the visibility pairs defined for the 3 tables. 086 * So after the first delete is issued, a scan with the first set of visibility labels would 087 * return zero result whereas the scan issued with the other two sets of visibility labels 088 * should return all the rows corresponding to that set of visibility labels. The above 089 * process of delete and scan is repeated until after the last set of visibility labels are 090 * used for the deletes the common table should not return any row. 091 * 092 * To use this 093 * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedListWithVisibility Loop 1 1 20000 /tmp 1 10000 094 * or 095 * ./hbase org.apache.hadoop.hbase.IntegrationTestsDriver -r .*IntegrationTestBigLinkedListWithVisibility.* 096 */ 097@Category(IntegrationTests.class) 098public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestBigLinkedList { 099 100 private static final String CONFIDENTIAL = "confidential"; 101 private static final String TOPSECRET = "topsecret"; 102 private static final String SECRET = "secret"; 103 private static final String PUBLIC = "public"; 104 private static final String PRIVATE = "private"; 105 private static final String EVERYONE = "everyone"; 106 private static final String RESTRICTED = "restricted"; 107 private static final String GROUP = "group"; 108 private static final String PREVILIGED = "previliged"; 109 private static final String OPEN = "open"; 110 public static String labels = CONFIDENTIAL + "," + TOPSECRET + "," + SECRET + "," + RESTRICTED 111 + "," + PRIVATE + "," + PREVILIGED + "," + GROUP + "," + OPEN + "," + PUBLIC + "," + EVERYONE; 112 private static final String COMMA = ","; 113 private static final String UNDER_SCORE = "_"; 114 public static int DEFAULT_TABLES_COUNT = 3; 115 public static String tableName = "tableName"; 116 public static final String COMMON_TABLE_NAME = "commontable"; 117 public static final String LABELS_KEY = "LABELS"; 118 public static final String INDEX_KEY = "INDEX"; 119 private static User USER; 120 private static final String OR = "|"; 121 private static String USER_OPT = "user"; 122 private static String userName = "user1"; 123 124 static class VisibilityGenerator extends Generator { 125 private static final Logger LOG = LoggerFactory.getLogger(VisibilityGenerator.class); 126 127 @Override 128 protected void createSchema() throws IOException { 129 LOG.info("Creating tables"); 130 // Create three tables 131 boolean acl = AccessControlClient.isAccessControllerRunning(ConnectionFactory 132 .createConnection(getConf())); 133 if(!acl) { 134 LOG.info("No ACL available."); 135 } 136 try (Connection conn = ConnectionFactory.createConnection(getConf()); 137 Admin admin = conn.getAdmin()) { 138 for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) { 139 TableName tableName = IntegrationTestBigLinkedListWithVisibility.getTableName(i); 140 createTable(admin, tableName, false, acl); 141 } 142 TableName tableName = TableName.valueOf(COMMON_TABLE_NAME); 143 createTable(admin, tableName, true, acl); 144 } 145 } 146 147 private void createTable(Admin admin, TableName tableName, boolean setVersion, 148 boolean acl) throws IOException { 149 if (!admin.tableExists(tableName)) { 150 HTableDescriptor htd = new HTableDescriptor(tableName); 151 HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME); 152 if (setVersion) { 153 family.setMaxVersions(DEFAULT_TABLES_COUNT); 154 } 155 htd.addFamily(family); 156 admin.createTable(htd); 157 if (acl) { 158 LOG.info("Granting permissions for user " + USER.getShortName()); 159 Permission.Action[] actions = { Permission.Action.READ }; 160 try { 161 AccessControlClient.grant(ConnectionFactory.createConnection(getConf()), tableName, 162 USER.getShortName(), null, null, actions); 163 } catch (Throwable e) { 164 LOG.error(HBaseMarkers.FATAL, "Error in granting permission for the user " + 165 USER.getShortName(), e); 166 throw new IOException(e); 167 } 168 } 169 } 170 } 171 172 @Override 173 protected void setMapperForGenerator(Job job) { 174 job.setMapperClass(VisibilityGeneratorMapper.class); 175 } 176 177 static class VisibilityGeneratorMapper extends GeneratorMapper { 178 BufferedMutator[] tables = new BufferedMutator[DEFAULT_TABLES_COUNT]; 179 180 @Override 181 protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException, 182 InterruptedException { 183 super.setup(context); 184 } 185 186 @Override 187 protected void instantiateHTable() throws IOException { 188 for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) { 189 BufferedMutatorParams params = new BufferedMutatorParams(getTableName(i)); 190 params.writeBufferSize(4 * 1024 * 1024); 191 BufferedMutator table = connection.getBufferedMutator(params); 192 this.tables[i] = table; 193 } 194 } 195 196 @Override 197 protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context) 198 throws IOException, InterruptedException { 199 for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) { 200 if (tables[i] != null) { 201 tables[i].close(); 202 } 203 } 204 } 205 206 @Override 207 protected void persist(org.apache.hadoop.mapreduce.Mapper.Context output, long count, 208 byte[][] prev, byte[][] current, byte[] id) throws IOException { 209 String visibilityExps = ""; 210 String[] split = labels.split(COMMA); 211 for (int i = 0; i < current.length; i++) { 212 for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) { 213 Put put = new Put(current[i]); 214 byte[] value = prev == null ? NO_KEY : prev[i]; 215 put.addColumn(FAMILY_NAME, COLUMN_PREV, value); 216 217 if (count >= 0) { 218 put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i)); 219 } 220 if (id != null) { 221 put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id); 222 } 223 visibilityExps = split[j * 2] + OR + split[(j * 2) + 1]; 224 put.setCellVisibility(new CellVisibility(visibilityExps)); 225 tables[j].mutate(put); 226 try { 227 Thread.sleep(1); 228 } catch (InterruptedException e) { 229 throw new IOException(); 230 } 231 } 232 if (i % 1000 == 0) { 233 // Tickle progress every so often else maprunner will think us hung 234 output.progress(); 235 } 236 } 237 } 238 } 239 } 240 241 static class Copier extends Configured implements Tool { 242 private static final Logger LOG = LoggerFactory.getLogger(Copier.class); 243 private TableName tableName; 244 private int labelIndex; 245 private boolean delete; 246 247 public Copier(TableName tableName, int index, boolean delete) { 248 this.tableName = tableName; 249 this.labelIndex = index; 250 this.delete = delete; 251 } 252 253 public int runCopier(String outputDir) throws Exception { 254 Job job = null; 255 Scan scan = null; 256 job = new Job(getConf()); 257 job.setJobName("Data copier"); 258 job.getConfiguration().setInt("INDEX", labelIndex); 259 job.getConfiguration().set("LABELS", labels); 260 job.setJarByClass(getClass()); 261 scan = new Scan(); 262 scan.setCacheBlocks(false); 263 scan.setRaw(true); 264 265 String[] split = labels.split(COMMA); 266 scan.setAuthorizations(new Authorizations(split[this.labelIndex * 2], 267 split[(this.labelIndex * 2) + 1])); 268 if (delete) { 269 LOG.info("Running deletes"); 270 } else { 271 LOG.info("Running copiers"); 272 } 273 if (delete) { 274 TableMapReduceUtil.initTableMapperJob(tableName.getNameAsString(), scan, 275 VisibilityDeleteImport.class, null, null, job); 276 } else { 277 TableMapReduceUtil.initTableMapperJob(tableName.getNameAsString(), scan, 278 VisibilityImport.class, null, null, job); 279 } 280 job.getConfiguration().setBoolean("mapreduce.map.speculative", false); 281 job.getConfiguration().setBoolean("mapreduce.reduce.speculative", false); 282 TableMapReduceUtil.initTableReducerJob(COMMON_TABLE_NAME, null, job, null, null, null, null); 283 TableMapReduceUtil.addDependencyJars(job); 284 TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class); 285 TableMapReduceUtil.initCredentials(job); 286 job.setNumReduceTasks(0); 287 boolean success = job.waitForCompletion(true); 288 return success ? 0 : 1; 289 } 290 291 @Override 292 public int run(String[] arg0) throws Exception { 293 // TODO Auto-generated method stub 294 return 0; 295 } 296 } 297 298 static class VisibilityImport extends Import.Importer { 299 private int index; 300 private String labels; 301 private String[] split; 302 303 @Override 304 public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) { 305 index = context.getConfiguration().getInt(INDEX_KEY, -1); 306 labels = context.getConfiguration().get(LABELS_KEY); 307 split = labels.split(COMMA); 308 super.setup(context); 309 } 310 311 @Override 312 protected void addPutToKv(Put put, Cell kv) throws IOException { 313 String visibilityExps = split[index * 2] + OR + split[(index * 2) + 1]; 314 put.setCellVisibility(new CellVisibility(visibilityExps)); 315 super.addPutToKv(put, kv); 316 } 317 } 318 319 static class VisibilityDeleteImport extends Import.Importer { 320 private int index; 321 private String labels; 322 private String[] split; 323 324 @Override 325 public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) { 326 index = context.getConfiguration().getInt(INDEX_KEY, -1); 327 labels = context.getConfiguration().get(LABELS_KEY); 328 split = labels.split(COMMA); 329 super.setup(context); 330 } 331 332 // Creating delete here 333 @Override 334 protected void processKV(ImmutableBytesWritable key, Result result, 335 org.apache.hadoop.mapreduce.Mapper.Context context, Put put, 336 org.apache.hadoop.hbase.client.Delete delete) throws 337 IOException, InterruptedException { 338 String visibilityExps = split[index * 2] + OR + split[(index * 2) + 1]; 339 for (Cell kv : result.rawCells()) { 340 // skip if we filter it out 341 if (kv == null) 342 continue; 343 // Create deletes here 344 if (delete == null) { 345 delete = new Delete(key.get()); 346 } 347 delete.setCellVisibility(new CellVisibility(visibilityExps)); 348 delete.addFamily(CellUtil.cloneFamily(kv)); 349 } 350 if (delete != null) { 351 context.write(key, delete); 352 } 353 } 354 } 355 356 @Override 357 protected void addOptions() { 358 super.addOptions(); 359 addOptWithArg("u", USER_OPT, "User name"); 360 } 361 362 @Override 363 protected void processOptions(CommandLine cmd) { 364 super.processOptions(cmd); 365 if (cmd.hasOption(USER_OPT)) { 366 userName = cmd.getOptionValue(USER_OPT); 367 } 368 369 } 370 @Override 371 public void setUpCluster() throws Exception { 372 util = getTestingUtil(null); 373 Configuration conf = util.getConfiguration(); 374 VisibilityTestUtil.enableVisiblityLabels(conf); 375 conf.set("hbase.superuser", User.getCurrent().getName()); 376 conf.setBoolean("dfs.permissions", false); 377 USER = User.createUserForTesting(conf, userName, new String[] {}); 378 super.setUpCluster(); 379 addLabels(); 380 } 381 382 static TableName getTableName(int i) { 383 return TableName.valueOf(tableName + UNDER_SCORE + i); 384 } 385 386 private void addLabels() throws Exception { 387 try { 388 VisibilityClient.addLabels(util.getConnection(), labels.split(COMMA)); 389 VisibilityClient.setAuths(util.getConnection(), labels.split(COMMA), USER.getName()); 390 } catch (Throwable t) { 391 throw new IOException(t); 392 } 393 } 394 395 static class VisibilityVerify extends Verify { 396 private static final Logger LOG = LoggerFactory.getLogger(VisibilityVerify.class); 397 private TableName tableName; 398 private int labelIndex; 399 400 public VisibilityVerify(String tableName, int index) { 401 this.tableName = TableName.valueOf(tableName); 402 this.labelIndex = index; 403 } 404 405 @Override 406 public int run(final Path outputDir, final int numReducers) throws Exception { 407 LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers); 408 PrivilegedExceptionAction<Integer> scanAction = new PrivilegedExceptionAction<Integer>() { 409 @Override 410 public Integer run() throws Exception { 411 return doVerify(outputDir, numReducers); 412 } 413 }; 414 return USER.runAs(scanAction); 415 } 416 417 private int doVerify(Path outputDir, int numReducers) throws IOException, InterruptedException, 418 ClassNotFoundException { 419 job = new Job(getConf()); 420 421 job.setJobName("Link Verifier"); 422 job.setNumReduceTasks(numReducers); 423 job.setJarByClass(getClass()); 424 425 setJobScannerConf(job); 426 427 Scan scan = new Scan(); 428 scan.addColumn(FAMILY_NAME, COLUMN_PREV); 429 scan.setCaching(10000); 430 scan.setCacheBlocks(false); 431 String[] split = labels.split(COMMA); 432 433 scan.setAuthorizations(new Authorizations(split[this.labelIndex * 2], 434 split[(this.labelIndex * 2) + 1])); 435 436 TableMapReduceUtil.initTableMapperJob(tableName.getName(), scan, VerifyMapper.class, 437 BytesWritable.class, BytesWritable.class, job); 438 TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class); 439 440 job.getConfiguration().setBoolean("mapreduce.map.speculative", false); 441 442 job.setReducerClass(VerifyReducer.class); 443 job.setOutputFormatClass(TextOutputFormat.class); 444 TextOutputFormat.setOutputPath(job, outputDir); 445 boolean success = job.waitForCompletion(true); 446 447 return success ? 0 : 1; 448 } 449 450 @Override 451 protected void handleFailure(Counters counters) throws IOException { 452 try (Connection conn = ConnectionFactory.createConnection(job.getConfiguration())) { 453 TableName tableName = TableName.valueOf(COMMON_TABLE_NAME); 454 CounterGroup g = counters.getGroup("undef"); 455 Iterator<Counter> it = g.iterator(); 456 while (it.hasNext()) { 457 String keyString = it.next().getName(); 458 byte[] key = Bytes.toBytes(keyString); 459 HRegionLocation loc = conn.getRegionLocator(tableName).getRegionLocation(key, true); 460 LOG.error("undefined row " + keyString + ", " + loc); 461 } 462 g = counters.getGroup("unref"); 463 it = g.iterator(); 464 while (it.hasNext()) { 465 String keyString = it.next().getName(); 466 byte[] key = Bytes.toBytes(keyString); 467 HRegionLocation loc = conn.getRegionLocator(tableName).getRegionLocation(key, true); 468 LOG.error("unreferred row " + keyString + ", " + loc); 469 } 470 } 471 } 472 } 473 474 static class VisibilityLoop extends Loop { 475 private static final int SLEEP_IN_MS = 5000; 476 private static final Logger LOG = LoggerFactory.getLogger(VisibilityLoop.class); 477 IntegrationTestBigLinkedListWithVisibility it; 478 479 @Override 480 protected void runGenerator(int numMappers, long numNodes, String outputDir, Integer width, 481 Integer wrapMultiplier, Integer numWalkers) throws Exception { 482 Path outputPath = new Path(outputDir); 483 UUID uuid = UUID.randomUUID(); // create a random UUID. 484 Path generatorOutput = new Path(outputPath, uuid.toString()); 485 486 Generator generator = new VisibilityGenerator(); 487 generator.setConf(getConf()); 488 int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier, 489 numWalkers); 490 if (retCode > 0) { 491 throw new RuntimeException("Generator failed with return code: " + retCode); 492 } 493 } 494 495 protected void runDelete(int numMappers, long numNodes, String outputDir, Integer width, 496 Integer wrapMultiplier, int tableIndex) throws Exception { 497 LOG.info("Running copier on table "+IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex)); 498 Copier copier = new Copier( 499 IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex), tableIndex, true); 500 copier.setConf(getConf()); 501 copier.runCopier(outputDir); 502 Thread.sleep(SLEEP_IN_MS); 503 } 504 505 protected void runVerify(String outputDir, int numReducers, long expectedNumNodes, 506 boolean allTables) throws Exception { 507 Path outputPath = new Path(outputDir); 508 509 if (allTables) { 510 for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) { 511 LOG.info("Verifying table " + i); 512 sleep(SLEEP_IN_MS); 513 UUID uuid = UUID.randomUUID(); // create a random UUID. 514 Path iterationOutput = new Path(outputPath, uuid.toString()); 515 Verify verify = new VisibilityVerify(getTableName(i).getNameAsString(), i); 516 verify(numReducers, expectedNumNodes, iterationOutput, verify); 517 } 518 } 519 for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) { 520 runVerifyCommonTable(outputDir, numReducers, expectedNumNodes, i); 521 } 522 } 523 524 private void runVerify(String outputDir, int numReducers, long expectedNodes, int tableIndex) 525 throws Exception { 526 long temp = expectedNodes; 527 for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) { 528 if (i <= tableIndex) { 529 expectedNodes = 0; 530 } else { 531 expectedNodes = temp; 532 } 533 LOG.info("Verifying data in the table with index "+i+ " and expected nodes is "+expectedNodes); 534 runVerifyCommonTable(outputDir, numReducers, expectedNodes, i); 535 } 536 } 537 538 private void sleep(long ms) throws InterruptedException { 539 Thread.sleep(ms); 540 } 541 542 protected void runVerifyCommonTable(String outputDir, int numReducers, long expectedNumNodes, 543 int index) throws Exception { 544 LOG.info("Verifying common table with index " + index); 545 sleep(SLEEP_IN_MS); 546 Path outputPath = new Path(outputDir); 547 UUID uuid = UUID.randomUUID(); // create a random UUID. 548 Path iterationOutput = new Path(outputPath, uuid.toString()); 549 Verify verify = new VisibilityVerify(TableName.valueOf(COMMON_TABLE_NAME).getNameAsString(), 550 index); 551 verify(numReducers, expectedNumNodes, iterationOutput, verify); 552 } 553 554 protected void runCopier(String outputDir) throws Exception { 555 for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) { 556 LOG.info("Running copier " + IntegrationTestBigLinkedListWithVisibility.getTableName(i)); 557 sleep(SLEEP_IN_MS); 558 Copier copier = new Copier(IntegrationTestBigLinkedListWithVisibility.getTableName(i), i, 559 false); 560 copier.setConf(getConf()); 561 copier.runCopier(outputDir); 562 } 563 } 564 565 private void verify(int numReducers, long expectedNumNodes, 566 Path iterationOutput, Verify verify) throws Exception { 567 verify.setConf(getConf()); 568 int retCode = verify.run(iterationOutput, numReducers); 569 if (retCode > 0) { 570 throw new RuntimeException("Verify.run failed with return code: " + retCode); 571 } 572 573 if (!verify.verify(expectedNumNodes)) { 574 throw new RuntimeException("Verify.verify failed"); 575 } 576 577 LOG.info("Verify finished with succees. Total nodes=" + expectedNumNodes); 578 } 579 580 @Override 581 public int run(String[] args) throws Exception { 582 if (args.length < 5) { 583 System.err 584 .println("Usage: Loop <num iterations> " + 585 "<num mappers> <num nodes per mapper> <output dir> " + 586 "<num reducers> [<width> <wrap multiplier>]"); 587 return 1; 588 } 589 LOG.info("Running Loop with args:" + Arrays.deepToString(args)); 590 591 int numIterations = Integer.parseInt(args[0]); 592 int numMappers = Integer.parseInt(args[1]); 593 long numNodes = Long.parseLong(args[2]); 594 String outputDir = args[3]; 595 int numReducers = Integer.parseInt(args[4]); 596 Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]); 597 Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]); 598 long expectedNumNodes = 0; 599 600 if (numIterations < 0) { 601 numIterations = Integer.MAX_VALUE; // run indefinitely (kind of) 602 } 603 604 for (int i = 0; i < numIterations; i++) { 605 LOG.info("Starting iteration = " + i); 606 LOG.info("Generating data"); 607 // By default run no concurrent walkers for test with visibility 608 runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, 0); 609 expectedNumNodes += numMappers * numNodes; 610 // Copying wont work because expressions are not returned back to the 611 // client 612 LOG.info("Running copier"); 613 sleep(SLEEP_IN_MS); 614 runCopier(outputDir); 615 LOG.info("Verifying copied data"); 616 sleep(SLEEP_IN_MS); 617 runVerify(outputDir, numReducers, expectedNumNodes, true); 618 sleep(SLEEP_IN_MS); 619 for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) { 620 LOG.info("Deleting data on table with index: "+j); 621 runDelete(numMappers, numNodes, outputDir, width, wrapMultiplier, j); 622 sleep(SLEEP_IN_MS); 623 LOG.info("Verifying common table after deleting"); 624 runVerify(outputDir, numReducers, expectedNumNodes, j); 625 sleep(SLEEP_IN_MS); 626 } 627 } 628 return 0; 629 } 630 } 631 632 @Override 633 @Test 634 public void testContinuousIngest() throws IOException, Exception { 635 // Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> 636 // <num reducers> 637 int ret = ToolRunner.run( 638 getTestingUtil(getConf()).getConfiguration(), 639 new VisibilityLoop(), 640 new String[] { "1", "1", "20000", 641 util.getDataTestDirOnTestFS("IntegrationTestBigLinkedListWithVisibility").toString(), 642 "1", "10000" }); 643 org.junit.Assert.assertEquals(0, ret); 644 } 645 646 public static void main(String[] args) throws Exception { 647 Configuration conf = HBaseConfiguration.create(); 648 IntegrationTestingUtility.setUseDistributedCluster(conf); 649 int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedListWithVisibility(), args); 650 System.exit(ret); 651 } 652 653 @Override 654 protected MonkeyFactory getDefaultMonkeyFactory() { 655 return MonkeyFactory.getFactory(MonkeyFactory.CALM); 656 } 657 658 @Override 659 public int runTestFromCommandLine() throws Exception { 660 Tool tool = null; 661 Loop loop = new VisibilityLoop(); 662 loop.it = this; 663 tool = loop; 664 return ToolRunner.run(getConf(), tool, otherArgs); 665 } 666}