001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.test; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021 022import java.io.IOException; 023import java.security.PrivilegedExceptionAction; 024import java.util.Arrays; 025import java.util.Iterator; 026import java.util.UUID; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.conf.Configured; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.Cell; 031import org.apache.hadoop.hbase.CellUtil; 032import org.apache.hadoop.hbase.HBaseConfiguration; 033import org.apache.hadoop.hbase.HRegionLocation; 034import org.apache.hadoop.hbase.IntegrationTestingUtility; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory; 037import org.apache.hadoop.hbase.client.Admin; 038import org.apache.hadoop.hbase.client.BufferedMutator; 039import org.apache.hadoop.hbase.client.BufferedMutatorParams; 040import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 041import org.apache.hadoop.hbase.client.Connection; 042import org.apache.hadoop.hbase.client.ConnectionFactory; 043import org.apache.hadoop.hbase.client.Delete; 044import org.apache.hadoop.hbase.client.Put; 045import org.apache.hadoop.hbase.client.Result; 046import org.apache.hadoop.hbase.client.Scan; 047import org.apache.hadoop.hbase.client.TableDescriptor; 048import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 049import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 050import org.apache.hadoop.hbase.log.HBaseMarkers; 051import org.apache.hadoop.hbase.mapreduce.Import; 052import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 053import org.apache.hadoop.hbase.security.User; 054import org.apache.hadoop.hbase.security.access.AccessControlClient; 055import org.apache.hadoop.hbase.security.access.Permission; 056import org.apache.hadoop.hbase.security.visibility.Authorizations; 057import org.apache.hadoop.hbase.security.visibility.CellVisibility; 058import org.apache.hadoop.hbase.security.visibility.VisibilityClient; 059import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil; 060import org.apache.hadoop.hbase.testclassification.IntegrationTests; 061import org.apache.hadoop.hbase.util.AbstractHBaseTool; 062import org.apache.hadoop.hbase.util.Bytes; 063import org.apache.hadoop.io.BytesWritable; 064import org.apache.hadoop.mapreduce.Counter; 065import org.apache.hadoop.mapreduce.CounterGroup; 066import org.apache.hadoop.mapreduce.Counters; 067import org.apache.hadoop.mapreduce.Job; 068import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 069import org.apache.hadoop.util.Tool; 070import org.apache.hadoop.util.ToolRunner; 071import org.junit.jupiter.api.Tag; 072import org.junit.jupiter.api.Test; 073import org.slf4j.Logger; 074import org.slf4j.LoggerFactory; 075 076import org.apache.hbase.thirdparty.com.google.common.base.Splitter; 077import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 078 079/** 080 * IT test used to verify the deletes with visibility labels. The test creates three tables 081 * tablename_0, tablename_1 and tablename_2 and each table is associated with a unique pair of 082 * labels. Another common table with the name 'commontable' is created and it has the data combined 083 * from all these 3 tables such that there are 3 versions of every row but the visibility label in 084 * every row corresponds to the table from which the row originated. Then deletes are issued to the 085 * common table by selecting the visibility label associated with each of the smaller tables. After 086 * the delete is issued with one set of visibility labels we try to scan the common table with each 087 * of the visibility pairs defined for the 3 tables. So after the first delete is issued, a scan 088 * with the first set of visibility labels would return zero result whereas the scan issued with the 089 * other two sets of visibility labels should return all the rows corresponding to that set of 090 * visibility labels. The above process of delete and scan is repeated until after the last set of 091 * visibility labels are used for the deletes the common table should not return any row. To use 092 * this ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedListWithVisibility Loop 1 1 093 * 20000 /tmp 1 10000 or ./hbase org.apache.hadoop.hbase.IntegrationTestsDriver -r 094 * .*IntegrationTestBigLinkedListWithVisibility.* 095 */ 096@Tag(IntegrationTests.TAG) 097public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestBigLinkedList { 098 099 private static final String CONFIDENTIAL = "confidential"; 100 private static final String TOPSECRET = "topsecret"; 101 private static final String SECRET = "secret"; 102 private static final String PUBLIC = "public"; 103 private static final String PRIVATE = "private"; 104 private static final String EVERYONE = "everyone"; 105 private static final String RESTRICTED = "restricted"; 106 private static final String GROUP = "group"; 107 private static final String PREVILIGED = "previliged"; 108 private static final String OPEN = "open"; 109 public static String labels = CONFIDENTIAL + "," + TOPSECRET + "," + SECRET + "," + RESTRICTED 110 + "," + PRIVATE + "," + PREVILIGED + "," + GROUP + "," + OPEN + "," + PUBLIC + "," + EVERYONE; 111 private static final String COMMA = ","; 112 private static final String UNDER_SCORE = "_"; 113 public static int DEFAULT_TABLES_COUNT = 3; 114 public static String tableName = "tableName"; 115 public static final String COMMON_TABLE_NAME = "commontable"; 116 public static final String LABELS_KEY = "LABELS"; 117 public static final String INDEX_KEY = "INDEX"; 118 private static User USER; 119 private static final String OR = "|"; 120 private static String USER_OPT = "user"; 121 private static String userName = "user1"; 122 123 static class VisibilityGenerator extends Generator { 124 private static final Logger LOG = LoggerFactory.getLogger(VisibilityGenerator.class); 125 126 @Override 127 protected void createSchema() throws IOException { 128 LOG.info("Creating tables"); 129 // Create three tables 130 boolean acl = AccessControlClient 131 .isAccessControllerRunning(ConnectionFactory.createConnection(getConf())); 132 if (!acl) { 133 LOG.info("No ACL available."); 134 } 135 try (Connection conn = ConnectionFactory.createConnection(getConf()); 136 Admin admin = conn.getAdmin()) { 137 for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) { 138 TableName tableName = IntegrationTestBigLinkedListWithVisibility.getTableName(i); 139 createTable(admin, tableName, false, acl); 140 } 141 TableName tableName = TableName.valueOf(COMMON_TABLE_NAME); 142 createTable(admin, tableName, true, acl); 143 } 144 } 145 146 private void createTable(Admin admin, TableName tableName, boolean setVersion, boolean acl) 147 throws IOException { 148 if (!admin.tableExists(tableName)) { 149 ColumnFamilyDescriptorBuilder cfBuilder = 150 ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_NAME); 151 if (setVersion) { 152 cfBuilder.setMaxVersions(DEFAULT_TABLES_COUNT); 153 } 154 TableDescriptor tableDescriptor = 155 TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(cfBuilder.build()).build(); 156 admin.createTable(tableDescriptor); 157 if (acl) { 158 LOG.info("Granting permissions for user " + USER.getShortName()); 159 Permission.Action[] actions = { Permission.Action.READ }; 160 try { 161 AccessControlClient.grant(ConnectionFactory.createConnection(getConf()), tableName, 162 USER.getShortName(), null, null, actions); 163 } catch (Throwable e) { 164 LOG.error(HBaseMarkers.FATAL, 165 "Error in granting permission for the user " + USER.getShortName(), e); 166 throw new IOException(e); 167 } 168 } 169 } 170 } 171 172 @Override 173 protected void setMapperForGenerator(Job job) { 174 job.setMapperClass(VisibilityGeneratorMapper.class); 175 } 176 177 static class VisibilityGeneratorMapper extends GeneratorMapper { 178 BufferedMutator[] tables = new BufferedMutator[DEFAULT_TABLES_COUNT]; 179 180 @Override 181 protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context) 182 throws IOException, InterruptedException { 183 super.setup(context); 184 } 185 186 @Override 187 protected void instantiateHTable() throws IOException { 188 for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) { 189 BufferedMutatorParams params = new BufferedMutatorParams(getTableName(i)); 190 params.writeBufferSize(4 * 1024 * 1024); 191 BufferedMutator table = connection.getBufferedMutator(params); 192 this.tables[i] = table; 193 } 194 } 195 196 @Override 197 protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context) 198 throws IOException, InterruptedException { 199 for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) { 200 if (tables[i] != null) { 201 tables[i].close(); 202 } 203 } 204 } 205 206 @Override 207 protected void persist(org.apache.hadoop.mapreduce.Mapper.Context output, long count, 208 byte[][] prev, byte[][] current, byte[] id) throws IOException { 209 String visibilityExps = ""; 210 String[] split = Splitter.on(COMMA).splitToStream(labels).toArray(String[]::new); 211 for (int i = 0; i < current.length; i++) { 212 for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) { 213 Put put = new Put(current[i]); 214 byte[] value = prev == null ? NO_KEY : prev[i]; 215 put.addColumn(FAMILY_NAME, COLUMN_PREV, value); 216 217 if (count >= 0) { 218 put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i)); 219 } 220 if (id != null) { 221 put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id); 222 } 223 visibilityExps = split[j * 2] + OR + split[(j * 2) + 1]; 224 put.setCellVisibility(new CellVisibility(visibilityExps)); 225 tables[j].mutate(put); 226 try { 227 Thread.sleep(1); 228 } catch (InterruptedException e) { 229 throw new IOException(); 230 } 231 } 232 if (i % 1000 == 0) { 233 // Tickle progress every so often else maprunner will think us hung 234 output.progress(); 235 } 236 } 237 } 238 } 239 } 240 241 static class Copier extends Configured implements Tool { 242 private static final Logger LOG = LoggerFactory.getLogger(Copier.class); 243 private TableName tableName; 244 private int labelIndex; 245 private boolean delete; 246 247 public Copier(TableName tableName, int index, boolean delete) { 248 this.tableName = tableName; 249 this.labelIndex = index; 250 this.delete = delete; 251 } 252 253 public int runCopier(String outputDir) throws Exception { 254 Job job = new Job(getConf()); 255 job.setJobName("Data copier"); 256 job.getConfiguration().setInt("INDEX", labelIndex); 257 job.getConfiguration().set("LABELS", labels); 258 job.setJarByClass(getClass()); 259 Scan scan = new Scan(); 260 scan.setCacheBlocks(false); 261 scan.setRaw(true); 262 263 String[] split = Splitter.on(COMMA).splitToStream(labels).toArray(String[]::new); 264 scan.setAuthorizations( 265 new Authorizations(split[this.labelIndex * 2], split[(this.labelIndex * 2) + 1])); 266 if (delete) { 267 LOG.info("Running deletes"); 268 } else { 269 LOG.info("Running copiers"); 270 } 271 if (delete) { 272 TableMapReduceUtil.initTableMapperJob(tableName.getNameAsString(), scan, 273 VisibilityDeleteImport.class, null, null, job); 274 } else { 275 TableMapReduceUtil.initTableMapperJob(tableName.getNameAsString(), scan, 276 VisibilityImport.class, null, null, job); 277 } 278 job.getConfiguration().setBoolean("mapreduce.map.speculative", false); 279 job.getConfiguration().setBoolean("mapreduce.reduce.speculative", false); 280 TableMapReduceUtil.initTableReducerJob(COMMON_TABLE_NAME, null, job); 281 TableMapReduceUtil.addDependencyJars(job); 282 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), 283 AbstractHBaseTool.class); 284 TableMapReduceUtil.initCredentials(job); 285 job.setNumReduceTasks(0); 286 boolean success = job.waitForCompletion(true); 287 return success ? 0 : 1; 288 } 289 290 @Override 291 public int run(String[] arg0) throws Exception { 292 // TODO Auto-generated method stub 293 return 0; 294 } 295 } 296 297 static class VisibilityImport extends Import.Importer { 298 private int index; 299 private String labels; 300 private String[] split; 301 302 @Override 303 public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) { 304 index = context.getConfiguration().getInt(INDEX_KEY, -1); 305 labels = context.getConfiguration().get(LABELS_KEY); 306 split = labels.split(COMMA); 307 super.setup(context); 308 } 309 310 @Override 311 protected void addPutToKv(Put put, Cell kv) throws IOException { 312 String visibilityExps = split[index * 2] + OR + split[(index * 2) + 1]; 313 put.setCellVisibility(new CellVisibility(visibilityExps)); 314 super.addPutToKv(put, kv); 315 } 316 } 317 318 static class VisibilityDeleteImport extends Import.Importer { 319 private int index; 320 private String labels; 321 private String[] split; 322 323 @Override 324 public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) { 325 index = context.getConfiguration().getInt(INDEX_KEY, -1); 326 labels = context.getConfiguration().get(LABELS_KEY); 327 split = labels.split(COMMA); 328 super.setup(context); 329 } 330 331 // Creating delete here 332 @Override 333 protected void processKV(ImmutableBytesWritable key, Result result, 334 org.apache.hadoop.mapreduce.Mapper.Context context, Put put, 335 org.apache.hadoop.hbase.client.Delete delete) throws IOException, InterruptedException { 336 String visibilityExps = split[index * 2] + OR + split[(index * 2) + 1]; 337 for (Cell kv : result.rawCells()) { 338 // skip if we filter it out 339 if (kv == null) continue; 340 // Create deletes here 341 if (delete == null) { 342 delete = new Delete(key.get()); 343 } 344 delete.setCellVisibility(new CellVisibility(visibilityExps)); 345 delete.addFamily(CellUtil.cloneFamily(kv)); 346 } 347 if (delete != null) { 348 context.write(key, delete); 349 } 350 } 351 } 352 353 @Override 354 protected void addOptions() { 355 super.addOptions(); 356 addOptWithArg("u", USER_OPT, "User name"); 357 } 358 359 @Override 360 protected void processOptions(CommandLine cmd) { 361 super.processOptions(cmd); 362 if (cmd.hasOption(USER_OPT)) { 363 userName = cmd.getOptionValue(USER_OPT); 364 } 365 366 } 367 368 @Override 369 public void setUpCluster() throws Exception { 370 util = getTestingUtil(null); 371 Configuration conf = util.getConfiguration(); 372 VisibilityTestUtil.enableVisiblityLabels(conf); 373 conf.set("hbase.superuser", User.getCurrent().getName()); 374 conf.setBoolean("dfs.permissions", false); 375 USER = User.createUserForTesting(conf, userName, new String[] {}); 376 super.setUpCluster(); 377 addLabels(); 378 } 379 380 static TableName getTableName(int i) { 381 return TableName.valueOf(tableName + UNDER_SCORE + i); 382 } 383 384 private void addLabels() throws Exception { 385 try { 386 VisibilityClient.addLabels(util.getConnection(), labels.split(COMMA)); 387 VisibilityClient.setAuths(util.getConnection(), labels.split(COMMA), USER.getName()); 388 } catch (Throwable t) { 389 throw new IOException(t); 390 } 391 } 392 393 static class VisibilityVerify extends Verify { 394 private static final Logger LOG = LoggerFactory.getLogger(VisibilityVerify.class); 395 private TableName tableName; 396 private int labelIndex; 397 398 public VisibilityVerify(String tableName, int index) { 399 this.tableName = TableName.valueOf(tableName); 400 this.labelIndex = index; 401 } 402 403 @Override 404 public int run(final Path outputDir, final int numReducers) throws Exception { 405 LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers); 406 PrivilegedExceptionAction<Integer> scanAction = new PrivilegedExceptionAction<Integer>() { 407 @Override 408 public Integer run() throws Exception { 409 return doVerify(outputDir, numReducers); 410 } 411 }; 412 return USER.runAs(scanAction); 413 } 414 415 private int doVerify(Path outputDir, int numReducers) 416 throws IOException, InterruptedException, ClassNotFoundException { 417 job = new Job(getConf()); 418 419 job.setJobName("Link Verifier"); 420 job.setNumReduceTasks(numReducers); 421 job.setJarByClass(getClass()); 422 423 setJobScannerConf(job); 424 425 Scan scan = new Scan(); 426 scan.addColumn(FAMILY_NAME, COLUMN_PREV); 427 scan.setCaching(10000); 428 scan.setCacheBlocks(false); 429 String[] split = Splitter.on(COMMA).splitToStream(labels).toArray(String[]::new); 430 431 scan.setAuthorizations( 432 new Authorizations(split[this.labelIndex * 2], split[(this.labelIndex * 2) + 1])); 433 434 TableMapReduceUtil.initTableMapperJob(tableName.getName(), scan, VerifyMapper.class, 435 BytesWritable.class, BytesWritable.class, job); 436 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), 437 AbstractHBaseTool.class); 438 439 job.getConfiguration().setBoolean("mapreduce.map.speculative", false); 440 441 job.setReducerClass(VerifyReducer.class); 442 job.setOutputFormatClass(TextOutputFormat.class); 443 TextOutputFormat.setOutputPath(job, outputDir); 444 boolean success = job.waitForCompletion(true); 445 446 return success ? 0 : 1; 447 } 448 449 @Override 450 protected void handleFailure(Counters counters) throws IOException { 451 try (Connection conn = ConnectionFactory.createConnection(job.getConfiguration())) { 452 TableName tableName = TableName.valueOf(COMMON_TABLE_NAME); 453 CounterGroup g = counters.getGroup("undef"); 454 Iterator<Counter> it = g.iterator(); 455 while (it.hasNext()) { 456 String keyString = it.next().getName(); 457 byte[] key = Bytes.toBytes(keyString); 458 HRegionLocation loc = conn.getRegionLocator(tableName).getRegionLocation(key, true); 459 LOG.error("undefined row " + keyString + ", " + loc); 460 } 461 g = counters.getGroup("unref"); 462 it = g.iterator(); 463 while (it.hasNext()) { 464 String keyString = it.next().getName(); 465 byte[] key = Bytes.toBytes(keyString); 466 HRegionLocation loc = conn.getRegionLocator(tableName).getRegionLocation(key, true); 467 LOG.error("unreferred row " + keyString + ", " + loc); 468 } 469 } 470 } 471 } 472 473 static class VisibilityLoop extends Loop { 474 private static final int SLEEP_IN_MS = 5000; 475 private static final Logger LOG = LoggerFactory.getLogger(VisibilityLoop.class); 476 477 @Override 478 protected void runGenerator(int numMappers, long numNodes, String outputDir, Integer width, 479 Integer wrapMultiplier, Integer numWalkers) throws Exception { 480 Path outputPath = new Path(outputDir); 481 UUID uuid = UUID.randomUUID(); // create a random UUID. 482 Path generatorOutput = new Path(outputPath, uuid.toString()); 483 484 Generator generator = new VisibilityGenerator(); 485 generator.setConf(getConf()); 486 int retCode = 487 generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier, numWalkers); 488 if (retCode > 0) { 489 throw new RuntimeException("Generator failed with return code: " + retCode); 490 } 491 } 492 493 protected void runDelete(int numMappers, long numNodes, String outputDir, Integer width, 494 Integer wrapMultiplier, int tableIndex) throws Exception { 495 LOG.info("Running copier on table " 496 + IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex)); 497 Copier copier = new Copier( 498 IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex), tableIndex, true); 499 copier.setConf(getConf()); 500 copier.runCopier(outputDir); 501 Thread.sleep(SLEEP_IN_MS); 502 } 503 504 protected void runVerify(String outputDir, int numReducers, long expectedNumNodes, 505 boolean allTables) throws Exception { 506 Path outputPath = new Path(outputDir); 507 508 if (allTables) { 509 for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) { 510 LOG.info("Verifying table " + i); 511 sleep(SLEEP_IN_MS); 512 UUID uuid = UUID.randomUUID(); // create a random UUID. 513 Path iterationOutput = new Path(outputPath, uuid.toString()); 514 Verify verify = new VisibilityVerify(getTableName(i).getNameAsString(), i); 515 verify(numReducers, expectedNumNodes, iterationOutput, verify); 516 } 517 } 518 for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) { 519 runVerifyCommonTable(outputDir, numReducers, expectedNumNodes, i); 520 } 521 } 522 523 private void runVerify(String outputDir, int numReducers, long expectedNodes, int tableIndex) 524 throws Exception { 525 long temp = expectedNodes; 526 for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) { 527 if (i <= tableIndex) { 528 expectedNodes = 0; 529 } else { 530 expectedNodes = temp; 531 } 532 LOG.info("Verifying data in the table with index " + i + " and expected nodes is " 533 + expectedNodes); 534 runVerifyCommonTable(outputDir, numReducers, expectedNodes, i); 535 } 536 } 537 538 private void sleep(long ms) throws InterruptedException { 539 Thread.sleep(ms); 540 } 541 542 protected void runVerifyCommonTable(String outputDir, int numReducers, long expectedNumNodes, 543 int index) throws Exception { 544 LOG.info("Verifying common table with index " + index); 545 sleep(SLEEP_IN_MS); 546 Path outputPath = new Path(outputDir); 547 UUID uuid = UUID.randomUUID(); // create a random UUID. 548 Path iterationOutput = new Path(outputPath, uuid.toString()); 549 Verify verify = 550 new VisibilityVerify(TableName.valueOf(COMMON_TABLE_NAME).getNameAsString(), index); 551 verify(numReducers, expectedNumNodes, iterationOutput, verify); 552 } 553 554 protected void runCopier(String outputDir) throws Exception { 555 for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) { 556 LOG.info("Running copier " + IntegrationTestBigLinkedListWithVisibility.getTableName(i)); 557 sleep(SLEEP_IN_MS); 558 Copier copier = 559 new Copier(IntegrationTestBigLinkedListWithVisibility.getTableName(i), i, false); 560 copier.setConf(getConf()); 561 copier.runCopier(outputDir); 562 } 563 } 564 565 private void verify(int numReducers, long expectedNumNodes, Path iterationOutput, Verify verify) 566 throws Exception { 567 verify.setConf(getConf()); 568 int retCode = verify.run(iterationOutput, numReducers); 569 if (retCode > 0) { 570 throw new RuntimeException("Verify.run failed with return code: " + retCode); 571 } 572 573 if (!verify.verify(expectedNumNodes)) { 574 throw new RuntimeException("Verify.verify failed"); 575 } 576 577 LOG.info("Verify finished with succees. Total nodes=" + expectedNumNodes); 578 } 579 580 @Override 581 public int run(String[] args) throws Exception { 582 if (args.length < 5) { 583 System.err.println( 584 "Usage: Loop <num iterations> " + "<num mappers> <num nodes per mapper> <output dir> " 585 + "<num reducers> [<width> <wrap multiplier>]"); 586 return 1; 587 } 588 LOG.info("Running Loop with args:" + Arrays.deepToString(args)); 589 590 int numIterations = Integer.parseInt(args[0]); 591 int numMappers = Integer.parseInt(args[1]); 592 long numNodes = Long.parseLong(args[2]); 593 String outputDir = args[3]; 594 int numReducers = Integer.parseInt(args[4]); 595 Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]); 596 Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]); 597 long expectedNumNodes = 0; 598 599 if (numIterations < 0) { 600 numIterations = Integer.MAX_VALUE; // run indefinitely (kind of) 601 } 602 603 for (int i = 0; i < numIterations; i++) { 604 LOG.info("Starting iteration = " + i); 605 LOG.info("Generating data"); 606 // By default run no concurrent walkers for test with visibility 607 runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, 0); 608 expectedNumNodes += numMappers * numNodes; 609 // Copying wont work because expressions are not returned back to the 610 // client 611 LOG.info("Running copier"); 612 sleep(SLEEP_IN_MS); 613 runCopier(outputDir); 614 LOG.info("Verifying copied data"); 615 sleep(SLEEP_IN_MS); 616 runVerify(outputDir, numReducers, expectedNumNodes, true); 617 sleep(SLEEP_IN_MS); 618 for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) { 619 LOG.info("Deleting data on table with index: " + j); 620 runDelete(numMappers, numNodes, outputDir, width, wrapMultiplier, j); 621 sleep(SLEEP_IN_MS); 622 LOG.info("Verifying common table after deleting"); 623 runVerify(outputDir, numReducers, expectedNumNodes, j); 624 sleep(SLEEP_IN_MS); 625 } 626 } 627 return 0; 628 } 629 } 630 631 @Override 632 @Test 633 public void testContinuousIngest() throws IOException, Exception { 634 // Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> 635 // <num reducers> 636 int ret = ToolRunner.run(getTestingUtil(getConf()).getConfiguration(), new VisibilityLoop(), 637 new String[] { "1", "1", "20000", 638 util.getDataTestDirOnTestFS("IntegrationTestBigLinkedListWithVisibility").toString(), "1", 639 "10000" }); 640 assertEquals(0, ret); 641 } 642 643 public static void main(String[] args) throws Exception { 644 Configuration conf = HBaseConfiguration.create(); 645 IntegrationTestingUtility.setUseDistributedCluster(conf); 646 int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedListWithVisibility(), args); 647 System.exit(ret); 648 } 649 650 @Override 651 protected MonkeyFactory getDefaultMonkeyFactory() { 652 return MonkeyFactory.getFactory(MonkeyFactory.CALM); 653 } 654 655 @Override 656 public int runTestFromCommandLine() throws Exception { 657 return ToolRunner.run(getConf(), new VisibilityLoop(), otherArgs); 658 } 659}