/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.test;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Iterator;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.mapreduce.Import;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessControlClient;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.security.visibility.Authorizations;
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.security.visibility.VisibilityClient;
import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * IT test used to verify deletes with visibility labels. The test creates three tables,
 * tablename_0, tablename_1 and tablename_2, and each table is associated with a unique pair of
 * labels. Another common table named 'commontable' is created; it holds the data combined from all
 * three tables, so that there are three versions of every row and the visibility label on each
 * version corresponds to the table from which the row originated. Deletes are then issued against
 * the common table using the visibility label pair associated with each of the smaller tables in
 * turn. After a delete is issued with one label pair, the common table is scanned with each of the
 * pairs defined for the three tables: the scan using the deleted pair should return no rows, while
 * scans using the other two pairs should still return all the rows visible under those labels.
 * This delete-and-scan cycle is repeated for every label pair; once the last pair has been used
 * for deletes, the common table should not return any rows at all. To run:
 * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedListWithVisibility Loop 1 1
 * 20000 /tmp 1 10000 or ./hbase org.apache.hadoop.hbase.IntegrationTestsDriver -r
 * .*IntegrationTestBigLinkedListWithVisibility.*
 */
@Category(IntegrationTests.class)
public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestBigLinkedList {

  private static final String CONFIDENTIAL = "confidential";
  private static final String TOPSECRET = "topsecret";
  private static final String SECRET = "secret";
  private static final String PUBLIC = "public";
  private static final String PRIVATE = "private";
  private static final String EVERYONE = "everyone";
  private static final String RESTRICTED = "restricted";
  private static final String GROUP = "group";
  private static final String PREVILIGED = "previliged";
  private static final String OPEN = "open";
  public static String labels = CONFIDENTIAL + "," + TOPSECRET + "," + SECRET + "," + RESTRICTED
    + "," + PRIVATE + "," + PREVILIGED + "," + GROUP + "," + OPEN + "," + PUBLIC + "," + EVERYONE;
  private static final String COMMA = ",";
  private static final String UNDER_SCORE = "_";
  public static int DEFAULT_TABLES_COUNT = 3;
  public static String tableName = "tableName";
  public static final String COMMON_TABLE_NAME = "commontable";
  public static final String LABELS_KEY = "LABELS";
  public static final String INDEX_KEY = "INDEX";
  private static User USER;
  private static final String OR = "|";
  private static String USER_OPT = "user";
  private static String userName = "user1";
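
  /*
   * For orientation, a minimal sketch of the cell visibility API this test exercises; the row and
   * value variables and the chosen label pair are illustrative only, not values used by the test:
   *
   *   Put put = new Put(row);
   *   put.addColumn(FAMILY_NAME, COLUMN_PREV, value);
   *   put.setCellVisibility(new CellVisibility(CONFIDENTIAL + OR + TOPSECRET));
   *
   *   Scan scan = new Scan();
   *   scan.setAuthorizations(new Authorizations(CONFIDENTIAL, TOPSECRET));
   *
   *   Delete delete = new Delete(row);
   *   delete.setCellVisibility(new CellVisibility(CONFIDENTIAL + OR + TOPSECRET));
   *
   * A scan returns a cell only when the scanner's authorizations satisfy the cell's visibility
   * expression, and a delete tagged with a visibility expression masks only the matching cells.
   */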

  static class VisibilityGenerator extends Generator {
    private static final Logger LOG = LoggerFactory.getLogger(VisibilityGenerator.class);

    @Override
    protected void createSchema() throws IOException {
      LOG.info("Creating tables");
      // Create three tables
      boolean acl = AccessControlClient
        .isAccessControllerRunning(ConnectionFactory.createConnection(getConf()));
      if (!acl) {
        LOG.info("No ACL available.");
      }
      try (Connection conn = ConnectionFactory.createConnection(getConf());
        Admin admin = conn.getAdmin()) {
        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
          TableName tableName = IntegrationTestBigLinkedListWithVisibility.getTableName(i);
          createTable(admin, tableName, false, acl);
        }
        TableName tableName = TableName.valueOf(COMMON_TABLE_NAME);
        createTable(admin, tableName, true, acl);
      }
    }

    private void createTable(Admin admin, TableName tableName, boolean setVersion, boolean acl)
      throws IOException {
      if (!admin.tableExists(tableName)) {
        HTableDescriptor htd = new HTableDescriptor(tableName);
        HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME);
        if (setVersion) {
          family.setMaxVersions(DEFAULT_TABLES_COUNT);
        }
        htd.addFamily(family);
        admin.createTable(htd);
        if (acl) {
          LOG.info("Granting permissions for user " + USER.getShortName());
          Permission.Action[] actions = { Permission.Action.READ };
          try {
            AccessControlClient.grant(ConnectionFactory.createConnection(getConf()), tableName,
              USER.getShortName(), null, null, actions);
          } catch (Throwable e) {
            LOG.error(HBaseMarkers.FATAL,
              "Error in granting permission for the user " + USER.getShortName(), e);
            throw new IOException(e);
          }
        }
      }
    }

    @Override
    protected void setMapperForGenerator(Job job) {
      job.setMapperClass(VisibilityGeneratorMapper.class);
    }

    static class VisibilityGeneratorMapper extends GeneratorMapper {
      BufferedMutator[] tables = new BufferedMutator[DEFAULT_TABLES_COUNT];

      @Override
      protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
        throws IOException, InterruptedException {
        super.setup(context);
      }

      @Override
      protected void instantiateHTable() throws IOException {
        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
          BufferedMutatorParams params = new BufferedMutatorParams(getTableName(i));
          params.writeBufferSize(4 * 1024 * 1024);
          BufferedMutator table = connection.getBufferedMutator(params);
          this.tables[i] = table;
        }
      }

      @Override
      protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context)
        throws IOException, InterruptedException {
        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
          if (tables[i] != null) {
            tables[i].close();
          }
        }
      }
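
      // Every generated node is written to all DEFAULT_TABLES_COUNT tables; the Put sent to table
      // j is tagged with that table's label pair (split[2 * j] | split[2 * j + 1]), so the row is
      // only visible to scans carrying at least one of those two authorizations.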
      @Override
      protected void persist(org.apache.hadoop.mapreduce.Mapper.Context output, long count,
        byte[][] prev, byte[][] current, byte[] id) throws IOException {
        String visibilityExps = "";
        String[] split = Splitter.on(COMMA).splitToStream(labels).toArray(String[]::new);
        for (int i = 0; i < current.length; i++) {
          for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
            Put put = new Put(current[i]);
            byte[] value = prev == null ? NO_KEY : prev[i];
            put.addColumn(FAMILY_NAME, COLUMN_PREV, value);

            if (count >= 0) {
              put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
            }
            if (id != null) {
              put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id);
            }
            visibilityExps = split[j * 2] + OR + split[(j * 2) + 1];
            put.setCellVisibility(new CellVisibility(visibilityExps));
            tables[j].mutate(put);
            try {
              Thread.sleep(1);
            } catch (InterruptedException e) {
              throw new IOException(e);
            }
          }
          if (i % 1000 == 0) {
            // Tickle progress every so often else maprunner will think us hung
            output.progress();
          }
        }
      }
    }
  }

  static class Copier extends Configured implements Tool {
    private static final Logger LOG = LoggerFactory.getLogger(Copier.class);
    private TableName tableName;
    private int labelIndex;
    private boolean delete;

    public Copier(TableName tableName, int index, boolean delete) {
      this.tableName = tableName;
      this.labelIndex = index;
      this.delete = delete;
    }

    public int runCopier(String outputDir) throws Exception {
      Job job = new Job(getConf());
      job.setJobName("Data copier");
      job.getConfiguration().setInt("INDEX", labelIndex);
      job.getConfiguration().set("LABELS", labels);
      job.setJarByClass(getClass());
      Scan scan = new Scan();
      scan.setCacheBlocks(false);
      scan.setRaw(true);

      String[] split = Splitter.on(COMMA).splitToStream(labels).toArray(String[]::new);
      scan.setAuthorizations(
        new Authorizations(split[this.labelIndex * 2], split[(this.labelIndex * 2) + 1]));
      if (delete) {
        LOG.info("Running deletes");
      } else {
        LOG.info("Running copiers");
      }
      if (delete) {
        TableMapReduceUtil.initTableMapperJob(tableName.getNameAsString(), scan,
          VisibilityDeleteImport.class, null, null, job);
      } else {
        TableMapReduceUtil.initTableMapperJob(tableName.getNameAsString(), scan,
          VisibilityImport.class, null, null, job);
      }
      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
      job.getConfiguration().setBoolean("mapreduce.reduce.speculative", false);
      TableMapReduceUtil.initTableReducerJob(COMMON_TABLE_NAME, null, job, null, null, null, null);
      TableMapReduceUtil.addDependencyJars(job);
      TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
      TableMapReduceUtil.initCredentials(job);
      job.setNumReduceTasks(0);
      boolean success = job.waitForCompletion(true);
      return success ? 0 : 1;
    }

    @Override
    public int run(String[] arg0) throws Exception {
      // Not used; this tool is always driven through runCopier(String).
      return 0;
    }
  }
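
  // The copy and delete jobs scan the per-label tables with setRaw(true), but raw scans do not
  // return visibility expressions to the client, so the Importer subclasses below re-apply the
  // source table's label pair to every Put (copy) or Delete (delete) they emit.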
  static class VisibilityImport extends Import.Importer {
    private int index;
    private String labels;
    private String[] split;

    @Override
    public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) {
      index = context.getConfiguration().getInt(INDEX_KEY, -1);
      labels = context.getConfiguration().get(LABELS_KEY);
      split = labels.split(COMMA);
      super.setup(context);
    }

    @Override
    protected void addPutToKv(Put put, Cell kv) throws IOException {
      String visibilityExps = split[index * 2] + OR + split[(index * 2) + 1];
      put.setCellVisibility(new CellVisibility(visibilityExps));
      super.addPutToKv(put, kv);
    }
  }

  static class VisibilityDeleteImport extends Import.Importer {
    private int index;
    private String labels;
    private String[] split;

    @Override
    public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) {
      index = context.getConfiguration().getInt(INDEX_KEY, -1);
      labels = context.getConfiguration().get(LABELS_KEY);
      split = labels.split(COMMA);
      super.setup(context);
    }

    // Creating delete here
    @Override
    protected void processKV(ImmutableBytesWritable key, Result result,
      org.apache.hadoop.mapreduce.Mapper.Context context, Put put,
      org.apache.hadoop.hbase.client.Delete delete) throws IOException, InterruptedException {
      String visibilityExps = split[index * 2] + OR + split[(index * 2) + 1];
      for (Cell kv : result.rawCells()) {
        // skip if we filter it out
        if (kv == null) continue;
        // Create deletes here
        if (delete == null) {
          delete = new Delete(key.get());
        }
        delete.setCellVisibility(new CellVisibility(visibilityExps));
        delete.addFamily(CellUtil.cloneFamily(kv));
      }
      if (delete != null) {
        context.write(key, delete);
      }
    }
  }

  @Override
  protected void addOptions() {
    super.addOptions();
    addOptWithArg("u", USER_OPT, "User name");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);
    if (cmd.hasOption(USER_OPT)) {
      userName = cmd.getOptionValue(USER_OPT);
    }
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(null);
    Configuration conf = util.getConfiguration();
    VisibilityTestUtil.enableVisiblityLabels(conf);
    conf.set("hbase.superuser", User.getCurrent().getName());
    conf.setBoolean("dfs.permissions", false);
    USER = User.createUserForTesting(conf, userName, new String[] {});
    super.setUpCluster();
    addLabels();
  }

  static TableName getTableName(int i) {
    return TableName.valueOf(tableName + UNDER_SCORE + i);
  }

  private void addLabels() throws Exception {
    try {
      VisibilityClient.addLabels(util.getConnection(), labels.split(COMMA));
      VisibilityClient.setAuths(util.getConnection(), labels.split(COMMA), USER.getName());
    } catch (Throwable t) {
      throw new IOException(t);
    }
  }
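
  // Verification runs the standard Verify MR job as the restricted test user, with the scan
  // restricted to a single label pair, so only the nodes visible under that pair are counted.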
  static class VisibilityVerify extends Verify {
    private static final Logger LOG = LoggerFactory.getLogger(VisibilityVerify.class);
    private TableName tableName;
    private int labelIndex;

    public VisibilityVerify(String tableName, int index) {
      this.tableName = TableName.valueOf(tableName);
      this.labelIndex = index;
    }

    @Override
    public int run(final Path outputDir, final int numReducers) throws Exception {
      LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers);
      PrivilegedExceptionAction<Integer> scanAction = new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws Exception {
          return doVerify(outputDir, numReducers);
        }
      };
      return USER.runAs(scanAction);
    }

    private int doVerify(Path outputDir, int numReducers)
      throws IOException, InterruptedException, ClassNotFoundException {
      job = new Job(getConf());

      job.setJobName("Link Verifier");
      job.setNumReduceTasks(numReducers);
      job.setJarByClass(getClass());

      setJobScannerConf(job);

      Scan scan = new Scan();
      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
      scan.setCaching(10000);
      scan.setCacheBlocks(false);
      String[] split = Splitter.on(COMMA).splitToStream(labels).toArray(String[]::new);

      scan.setAuthorizations(
        new Authorizations(split[this.labelIndex * 2], split[(this.labelIndex * 2) + 1]));

      TableMapReduceUtil.initTableMapperJob(tableName.getName(), scan, VerifyMapper.class,
        BytesWritable.class, BytesWritable.class, job);
      TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);

      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);

      job.setReducerClass(VerifyReducer.class);
      job.setOutputFormatClass(TextOutputFormat.class);
      TextOutputFormat.setOutputPath(job, outputDir);
      boolean success = job.waitForCompletion(true);

      return success ? 0 : 1;
    }

    @Override
    protected void handleFailure(Counters counters) throws IOException {
      try (Connection conn = ConnectionFactory.createConnection(job.getConfiguration())) {
        TableName tableName = TableName.valueOf(COMMON_TABLE_NAME);
        CounterGroup g = counters.getGroup("undef");
        Iterator<Counter> it = g.iterator();
        while (it.hasNext()) {
          String keyString = it.next().getName();
          byte[] key = Bytes.toBytes(keyString);
          HRegionLocation loc = conn.getRegionLocator(tableName).getRegionLocation(key, true);
          LOG.error("undefined row " + keyString + ", " + loc);
        }
        g = counters.getGroup("unref");
        it = g.iterator();
        while (it.hasNext()) {
          String keyString = it.next().getName();
          byte[] key = Bytes.toBytes(keyString);
          HRegionLocation loc = conn.getRegionLocator(tableName).getRegionLocation(key, true);
          LOG.error("unreferenced row " + keyString + ", " + loc);
        }
      }
    }
  }

  static class VisibilityLoop extends Loop {
    private static final int SLEEP_IN_MS = 5000;
    private static final Logger LOG = LoggerFactory.getLogger(VisibilityLoop.class);

    @Override
    protected void runGenerator(int numMappers, long numNodes, String outputDir, Integer width,
      Integer wrapMultiplier, Integer numWalkers) throws Exception {
      Path outputPath = new Path(outputDir);
      UUID uuid = UUID.randomUUID(); // create a random UUID.
      Path generatorOutput = new Path(outputPath, uuid.toString());

      Generator generator = new VisibilityGenerator();
      generator.setConf(getConf());
      int retCode =
        generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier, numWalkers);
      if (retCode > 0) {
        throw new RuntimeException("Generator failed with return code: " + retCode);
      }
    }
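
    // Reuses the Copier in delete mode: rows scanned from the per-label table are converted into
    // Deletes carrying that table's label pair and applied to the common table.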
    protected void runDelete(int numMappers, long numNodes, String outputDir, Integer width,
      Integer wrapMultiplier, int tableIndex) throws Exception {
      LOG.info("Running delete copier on table "
        + IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex));
      Copier copier = new Copier(
        IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex), tableIndex, true);
      copier.setConf(getConf());
      copier.runCopier(outputDir);
      Thread.sleep(SLEEP_IN_MS);
    }

    protected void runVerify(String outputDir, int numReducers, long expectedNumNodes,
      boolean allTables) throws Exception {
      Path outputPath = new Path(outputDir);

      if (allTables) {
        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
          LOG.info("Verifying table " + i);
          sleep(SLEEP_IN_MS);
          UUID uuid = UUID.randomUUID(); // create a random UUID.
          Path iterationOutput = new Path(outputPath, uuid.toString());
          Verify verify = new VisibilityVerify(getTableName(i).getNameAsString(), i);
          verify(numReducers, expectedNumNodes, iterationOutput, verify);
        }
      }
      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
        runVerifyCommonTable(outputDir, numReducers, expectedNumNodes, i);
      }
    }
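
    // After the label pair of table <tableIndex> has been used for deletes, the common table is
    // expected to show zero nodes for every pair up to and including that index, and the full
    // node count for the remaining pairs.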
    private void runVerify(String outputDir, int numReducers, long expectedNodes, int tableIndex)
      throws Exception {
      long temp = expectedNodes;
      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
        if (i <= tableIndex) {
          expectedNodes = 0;
        } else {
          expectedNodes = temp;
        }
        LOG.info("Verifying data in the table with index " + i + " and expected nodes is "
          + expectedNodes);
        runVerifyCommonTable(outputDir, numReducers, expectedNodes, i);
      }
    }

    private void sleep(long ms) throws InterruptedException {
      Thread.sleep(ms);
    }

    protected void runVerifyCommonTable(String outputDir, int numReducers, long expectedNumNodes,
      int index) throws Exception {
      LOG.info("Verifying common table with index " + index);
      sleep(SLEEP_IN_MS);
      Path outputPath = new Path(outputDir);
      UUID uuid = UUID.randomUUID(); // create a random UUID.
      Path iterationOutput = new Path(outputPath, uuid.toString());
      Verify verify =
        new VisibilityVerify(TableName.valueOf(COMMON_TABLE_NAME).getNameAsString(), index);
      verify(numReducers, expectedNumNodes, iterationOutput, verify);
    }

    protected void runCopier(String outputDir) throws Exception {
      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
        LOG.info("Running copier " + IntegrationTestBigLinkedListWithVisibility.getTableName(i));
        sleep(SLEEP_IN_MS);
        Copier copier =
          new Copier(IntegrationTestBigLinkedListWithVisibility.getTableName(i), i, false);
        copier.setConf(getConf());
        copier.runCopier(outputDir);
      }
    }

    private void verify(int numReducers, long expectedNumNodes, Path iterationOutput, Verify verify)
      throws Exception {
      verify.setConf(getConf());
      int retCode = verify.run(iterationOutput, numReducers);
      if (retCode > 0) {
        throw new RuntimeException("Verify.run failed with return code: " + retCode);
      }

      if (!verify.verify(expectedNumNodes)) {
        throw new RuntimeException("Verify.verify failed");
      }

      LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes);
    }

    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 5) {
        System.err.println(
          "Usage: Loop <num iterations> " + "<num mappers> <num nodes per mapper> <output dir> "
            + "<num reducers> [<width> <wrap multiplier>]");
        return 1;
      }
      LOG.info("Running Loop with args: " + Arrays.deepToString(args));

      int numIterations = Integer.parseInt(args[0]);
      int numMappers = Integer.parseInt(args[1]);
      long numNodes = Long.parseLong(args[2]);
      String outputDir = args[3];
      int numReducers = Integer.parseInt(args[4]);
      Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
      Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
      long expectedNumNodes = 0;

      if (numIterations < 0) {
        numIterations = Integer.MAX_VALUE; // run indefinitely (kind of)
      }

      for (int i = 0; i < numIterations; i++) {
        LOG.info("Starting iteration = " + i);
        LOG.info("Generating data");
        // By default run no concurrent walkers for test with visibility
        runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, 0);
        expectedNumNodes += numMappers * numNodes;
        // Copying alone won't work because visibility expressions are not returned back to the
        // client
        LOG.info("Running copier");
        sleep(SLEEP_IN_MS);
        runCopier(outputDir);
        LOG.info("Verifying copied data");
        sleep(SLEEP_IN_MS);
        runVerify(outputDir, numReducers, expectedNumNodes, true);
        sleep(SLEEP_IN_MS);
        for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
          LOG.info("Deleting data on table with index: " + j);
          runDelete(numMappers, numNodes, outputDir, width, wrapMultiplier, j);
          sleep(SLEEP_IN_MS);
          LOG.info("Verifying common table after deleting");
          runVerify(outputDir, numReducers, expectedNumNodes, j);
          sleep(SLEEP_IN_MS);
        }
      }
      return 0;
    }
  }

  @Override
  @Test
  public void testContinuousIngest() throws IOException, Exception {
    // Loop <num iterations> <num mappers> <num nodes per mapper> <output dir>
    // <num reducers>
    int ret = ToolRunner.run(getTestingUtil(getConf()).getConfiguration(), new VisibilityLoop(),
      new String[] { "1", "1", "20000",
        util.getDataTestDirOnTestFS("IntegrationTestBigLinkedListWithVisibility").toString(), "1",
        "10000" });
    org.junit.Assert.assertEquals(0, ret);
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedListWithVisibility(), args);
    System.exit(ret);
  }

  @Override
  protected MonkeyFactory getDefaultMonkeyFactory() {
    return MonkeyFactory.getFactory(MonkeyFactory.CALM);
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    return ToolRunner.run(getConf(), new VisibilityLoop(), otherArgs);
  }
}