001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.test;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021
022import java.io.IOException;
023import java.security.PrivilegedExceptionAction;
024import java.util.Arrays;
025import java.util.Iterator;
026import java.util.UUID;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.conf.Configured;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.HBaseConfiguration;
033import org.apache.hadoop.hbase.HRegionLocation;
034import org.apache.hadoop.hbase.IntegrationTestingUtility;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory;
037import org.apache.hadoop.hbase.client.Admin;
038import org.apache.hadoop.hbase.client.BufferedMutator;
039import org.apache.hadoop.hbase.client.BufferedMutatorParams;
040import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
041import org.apache.hadoop.hbase.client.Connection;
042import org.apache.hadoop.hbase.client.ConnectionFactory;
043import org.apache.hadoop.hbase.client.Delete;
044import org.apache.hadoop.hbase.client.Put;
045import org.apache.hadoop.hbase.client.Result;
046import org.apache.hadoop.hbase.client.Scan;
047import org.apache.hadoop.hbase.client.TableDescriptor;
048import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
049import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
050import org.apache.hadoop.hbase.log.HBaseMarkers;
051import org.apache.hadoop.hbase.mapreduce.Import;
052import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
053import org.apache.hadoop.hbase.security.User;
054import org.apache.hadoop.hbase.security.access.AccessControlClient;
055import org.apache.hadoop.hbase.security.access.Permission;
056import org.apache.hadoop.hbase.security.visibility.Authorizations;
057import org.apache.hadoop.hbase.security.visibility.CellVisibility;
058import org.apache.hadoop.hbase.security.visibility.VisibilityClient;
059import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
060import org.apache.hadoop.hbase.testclassification.IntegrationTests;
061import org.apache.hadoop.hbase.util.AbstractHBaseTool;
062import org.apache.hadoop.hbase.util.Bytes;
063import org.apache.hadoop.io.BytesWritable;
064import org.apache.hadoop.mapreduce.Counter;
065import org.apache.hadoop.mapreduce.CounterGroup;
066import org.apache.hadoop.mapreduce.Counters;
067import org.apache.hadoop.mapreduce.Job;
068import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
069import org.apache.hadoop.util.Tool;
070import org.apache.hadoop.util.ToolRunner;
071import org.junit.jupiter.api.Tag;
072import org.junit.jupiter.api.Test;
073import org.slf4j.Logger;
074import org.slf4j.LoggerFactory;
075
076import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
077import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
078
079/**
080 * IT test used to verify the deletes with visibility labels. The test creates three tables
081 * tablename_0, tablename_1 and tablename_2 and each table is associated with a unique pair of
082 * labels. Another common table with the name 'commontable' is created and it has the data combined
083 * from all these 3 tables such that there are 3 versions of every row but the visibility label in
084 * every row corresponds to the table from which the row originated. Then deletes are issued to the
085 * common table by selecting the visibility label associated with each of the smaller tables. After
086 * the delete is issued with one set of visibility labels we try to scan the common table with each
087 * of the visibility pairs defined for the 3 tables. So after the first delete is issued, a scan
088 * with the first set of visibility labels would return zero result whereas the scan issued with the
089 * other two sets of visibility labels should return all the rows corresponding to that set of
090 * visibility labels. The above process of delete and scan is repeated until after the last set of
091 * visibility labels are used for the deletes the common table should not return any row. To use
092 * this ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedListWithVisibility Loop 1 1
093 * 20000 /tmp 1 10000 or ./hbase org.apache.hadoop.hbase.IntegrationTestsDriver -r
094 * .*IntegrationTestBigLinkedListWithVisibility.*
095 */
096@Tag(IntegrationTests.TAG)
097public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestBigLinkedList {
098
099  private static final String CONFIDENTIAL = "confidential";
100  private static final String TOPSECRET = "topsecret";
101  private static final String SECRET = "secret";
102  private static final String PUBLIC = "public";
103  private static final String PRIVATE = "private";
104  private static final String EVERYONE = "everyone";
105  private static final String RESTRICTED = "restricted";
106  private static final String GROUP = "group";
107  private static final String PREVILIGED = "previliged";
108  private static final String OPEN = "open";
109  public static String labels = CONFIDENTIAL + "," + TOPSECRET + "," + SECRET + "," + RESTRICTED
110    + "," + PRIVATE + "," + PREVILIGED + "," + GROUP + "," + OPEN + "," + PUBLIC + "," + EVERYONE;
111  private static final String COMMA = ",";
112  private static final String UNDER_SCORE = "_";
113  public static int DEFAULT_TABLES_COUNT = 3;
114  public static String tableName = "tableName";
115  public static final String COMMON_TABLE_NAME = "commontable";
116  public static final String LABELS_KEY = "LABELS";
117  public static final String INDEX_KEY = "INDEX";
118  private static User USER;
119  private static final String OR = "|";
120  private static String USER_OPT = "user";
121  private static String userName = "user1";
122
123  static class VisibilityGenerator extends Generator {
124    private static final Logger LOG = LoggerFactory.getLogger(VisibilityGenerator.class);
125
126    @Override
127    protected void createSchema() throws IOException {
128      LOG.info("Creating tables");
129      // Create three tables
130      boolean acl = AccessControlClient
131        .isAccessControllerRunning(ConnectionFactory.createConnection(getConf()));
132      if (!acl) {
133        LOG.info("No ACL available.");
134      }
135      try (Connection conn = ConnectionFactory.createConnection(getConf());
136        Admin admin = conn.getAdmin()) {
137        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
138          TableName tableName = IntegrationTestBigLinkedListWithVisibility.getTableName(i);
139          createTable(admin, tableName, false, acl);
140        }
141        TableName tableName = TableName.valueOf(COMMON_TABLE_NAME);
142        createTable(admin, tableName, true, acl);
143      }
144    }
145
146    private void createTable(Admin admin, TableName tableName, boolean setVersion, boolean acl)
147      throws IOException {
148      if (!admin.tableExists(tableName)) {
149        ColumnFamilyDescriptorBuilder cfBuilder =
150          ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_NAME);
151        if (setVersion) {
152          cfBuilder.setMaxVersions(DEFAULT_TABLES_COUNT);
153        }
154        TableDescriptor tableDescriptor =
155          TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(cfBuilder.build()).build();
156        admin.createTable(tableDescriptor);
157        if (acl) {
158          LOG.info("Granting permissions for user " + USER.getShortName());
159          Permission.Action[] actions = { Permission.Action.READ };
160          try {
161            AccessControlClient.grant(ConnectionFactory.createConnection(getConf()), tableName,
162              USER.getShortName(), null, null, actions);
163          } catch (Throwable e) {
164            LOG.error(HBaseMarkers.FATAL,
165              "Error in granting permission for the user " + USER.getShortName(), e);
166            throw new IOException(e);
167          }
168        }
169      }
170    }
171
172    @Override
173    protected void setMapperForGenerator(Job job) {
174      job.setMapperClass(VisibilityGeneratorMapper.class);
175    }
176
177    static class VisibilityGeneratorMapper extends GeneratorMapper {
178      BufferedMutator[] tables = new BufferedMutator[DEFAULT_TABLES_COUNT];
179
180      @Override
181      protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
182        throws IOException, InterruptedException {
183        super.setup(context);
184      }
185
186      @Override
187      protected void instantiateHTable() throws IOException {
188        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
189          BufferedMutatorParams params = new BufferedMutatorParams(getTableName(i));
190          params.writeBufferSize(4 * 1024 * 1024);
191          BufferedMutator table = connection.getBufferedMutator(params);
192          this.tables[i] = table;
193        }
194      }
195
196      @Override
197      protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context)
198        throws IOException, InterruptedException {
199        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
200          if (tables[i] != null) {
201            tables[i].close();
202          }
203        }
204      }
205
206      @Override
207      protected void persist(org.apache.hadoop.mapreduce.Mapper.Context output, long count,
208        byte[][] prev, byte[][] current, byte[] id) throws IOException {
209        String visibilityExps = "";
210        String[] split = Splitter.on(COMMA).splitToStream(labels).toArray(String[]::new);
211        for (int i = 0; i < current.length; i++) {
212          for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
213            Put put = new Put(current[i]);
214            byte[] value = prev == null ? NO_KEY : prev[i];
215            put.addColumn(FAMILY_NAME, COLUMN_PREV, value);
216
217            if (count >= 0) {
218              put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
219            }
220            if (id != null) {
221              put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id);
222            }
223            visibilityExps = split[j * 2] + OR + split[(j * 2) + 1];
224            put.setCellVisibility(new CellVisibility(visibilityExps));
225            tables[j].mutate(put);
226            try {
227              Thread.sleep(1);
228            } catch (InterruptedException e) {
229              throw new IOException();
230            }
231          }
232          if (i % 1000 == 0) {
233            // Tickle progress every so often else maprunner will think us hung
234            output.progress();
235          }
236        }
237      }
238    }
239  }
240
241  static class Copier extends Configured implements Tool {
242    private static final Logger LOG = LoggerFactory.getLogger(Copier.class);
243    private TableName tableName;
244    private int labelIndex;
245    private boolean delete;
246
247    public Copier(TableName tableName, int index, boolean delete) {
248      this.tableName = tableName;
249      this.labelIndex = index;
250      this.delete = delete;
251    }
252
253    public int runCopier(String outputDir) throws Exception {
254      Job job = new Job(getConf());
255      job.setJobName("Data copier");
256      job.getConfiguration().setInt("INDEX", labelIndex);
257      job.getConfiguration().set("LABELS", labels);
258      job.setJarByClass(getClass());
259      Scan scan = new Scan();
260      scan.setCacheBlocks(false);
261      scan.setRaw(true);
262
263      String[] split = Splitter.on(COMMA).splitToStream(labels).toArray(String[]::new);
264      scan.setAuthorizations(
265        new Authorizations(split[this.labelIndex * 2], split[(this.labelIndex * 2) + 1]));
266      if (delete) {
267        LOG.info("Running deletes");
268      } else {
269        LOG.info("Running copiers");
270      }
271      if (delete) {
272        TableMapReduceUtil.initTableMapperJob(tableName.getNameAsString(), scan,
273          VisibilityDeleteImport.class, null, null, job);
274      } else {
275        TableMapReduceUtil.initTableMapperJob(tableName.getNameAsString(), scan,
276          VisibilityImport.class, null, null, job);
277      }
278      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
279      job.getConfiguration().setBoolean("mapreduce.reduce.speculative", false);
280      TableMapReduceUtil.initTableReducerJob(COMMON_TABLE_NAME, null, job);
281      TableMapReduceUtil.addDependencyJars(job);
282      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
283        AbstractHBaseTool.class);
284      TableMapReduceUtil.initCredentials(job);
285      job.setNumReduceTasks(0);
286      boolean success = job.waitForCompletion(true);
287      return success ? 0 : 1;
288    }
289
290    @Override
291    public int run(String[] arg0) throws Exception {
292      // TODO Auto-generated method stub
293      return 0;
294    }
295  }
296
297  static class VisibilityImport extends Import.Importer {
298    private int index;
299    private String labels;
300    private String[] split;
301
302    @Override
303    public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) {
304      index = context.getConfiguration().getInt(INDEX_KEY, -1);
305      labels = context.getConfiguration().get(LABELS_KEY);
306      split = labels.split(COMMA);
307      super.setup(context);
308    }
309
310    @Override
311    protected void addPutToKv(Put put, Cell kv) throws IOException {
312      String visibilityExps = split[index * 2] + OR + split[(index * 2) + 1];
313      put.setCellVisibility(new CellVisibility(visibilityExps));
314      super.addPutToKv(put, kv);
315    }
316  }
317
318  static class VisibilityDeleteImport extends Import.Importer {
319    private int index;
320    private String labels;
321    private String[] split;
322
323    @Override
324    public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) {
325      index = context.getConfiguration().getInt(INDEX_KEY, -1);
326      labels = context.getConfiguration().get(LABELS_KEY);
327      split = labels.split(COMMA);
328      super.setup(context);
329    }
330
331    // Creating delete here
332    @Override
333    protected void processKV(ImmutableBytesWritable key, Result result,
334      org.apache.hadoop.mapreduce.Mapper.Context context, Put put,
335      org.apache.hadoop.hbase.client.Delete delete) throws IOException, InterruptedException {
336      String visibilityExps = split[index * 2] + OR + split[(index * 2) + 1];
337      for (Cell kv : result.rawCells()) {
338        // skip if we filter it out
339        if (kv == null) continue;
340        // Create deletes here
341        if (delete == null) {
342          delete = new Delete(key.get());
343        }
344        delete.setCellVisibility(new CellVisibility(visibilityExps));
345        delete.addFamily(CellUtil.cloneFamily(kv));
346      }
347      if (delete != null) {
348        context.write(key, delete);
349      }
350    }
351  }
352
353  @Override
354  protected void addOptions() {
355    super.addOptions();
356    addOptWithArg("u", USER_OPT, "User name");
357  }
358
359  @Override
360  protected void processOptions(CommandLine cmd) {
361    super.processOptions(cmd);
362    if (cmd.hasOption(USER_OPT)) {
363      userName = cmd.getOptionValue(USER_OPT);
364    }
365
366  }
367
368  @Override
369  public void setUpCluster() throws Exception {
370    util = getTestingUtil(null);
371    Configuration conf = util.getConfiguration();
372    VisibilityTestUtil.enableVisiblityLabels(conf);
373    conf.set("hbase.superuser", User.getCurrent().getName());
374    conf.setBoolean("dfs.permissions", false);
375    USER = User.createUserForTesting(conf, userName, new String[] {});
376    super.setUpCluster();
377    addLabels();
378  }
379
380  static TableName getTableName(int i) {
381    return TableName.valueOf(tableName + UNDER_SCORE + i);
382  }
383
384  private void addLabels() throws Exception {
385    try {
386      VisibilityClient.addLabels(util.getConnection(), labels.split(COMMA));
387      VisibilityClient.setAuths(util.getConnection(), labels.split(COMMA), USER.getName());
388    } catch (Throwable t) {
389      throw new IOException(t);
390    }
391  }
392
393  static class VisibilityVerify extends Verify {
394    private static final Logger LOG = LoggerFactory.getLogger(VisibilityVerify.class);
395    private TableName tableName;
396    private int labelIndex;
397
398    public VisibilityVerify(String tableName, int index) {
399      this.tableName = TableName.valueOf(tableName);
400      this.labelIndex = index;
401    }
402
403    @Override
404    public int run(final Path outputDir, final int numReducers) throws Exception {
405      LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers);
406      PrivilegedExceptionAction<Integer> scanAction = new PrivilegedExceptionAction<Integer>() {
407        @Override
408        public Integer run() throws Exception {
409          return doVerify(outputDir, numReducers);
410        }
411      };
412      return USER.runAs(scanAction);
413    }
414
415    private int doVerify(Path outputDir, int numReducers)
416      throws IOException, InterruptedException, ClassNotFoundException {
417      job = new Job(getConf());
418
419      job.setJobName("Link Verifier");
420      job.setNumReduceTasks(numReducers);
421      job.setJarByClass(getClass());
422
423      setJobScannerConf(job);
424
425      Scan scan = new Scan();
426      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
427      scan.setCaching(10000);
428      scan.setCacheBlocks(false);
429      String[] split = Splitter.on(COMMA).splitToStream(labels).toArray(String[]::new);
430
431      scan.setAuthorizations(
432        new Authorizations(split[this.labelIndex * 2], split[(this.labelIndex * 2) + 1]));
433
434      TableMapReduceUtil.initTableMapperJob(tableName.getName(), scan, VerifyMapper.class,
435        BytesWritable.class, BytesWritable.class, job);
436      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
437        AbstractHBaseTool.class);
438
439      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
440
441      job.setReducerClass(VerifyReducer.class);
442      job.setOutputFormatClass(TextOutputFormat.class);
443      TextOutputFormat.setOutputPath(job, outputDir);
444      boolean success = job.waitForCompletion(true);
445
446      return success ? 0 : 1;
447    }
448
449    @Override
450    protected void handleFailure(Counters counters) throws IOException {
451      try (Connection conn = ConnectionFactory.createConnection(job.getConfiguration())) {
452        TableName tableName = TableName.valueOf(COMMON_TABLE_NAME);
453        CounterGroup g = counters.getGroup("undef");
454        Iterator<Counter> it = g.iterator();
455        while (it.hasNext()) {
456          String keyString = it.next().getName();
457          byte[] key = Bytes.toBytes(keyString);
458          HRegionLocation loc = conn.getRegionLocator(tableName).getRegionLocation(key, true);
459          LOG.error("undefined row " + keyString + ", " + loc);
460        }
461        g = counters.getGroup("unref");
462        it = g.iterator();
463        while (it.hasNext()) {
464          String keyString = it.next().getName();
465          byte[] key = Bytes.toBytes(keyString);
466          HRegionLocation loc = conn.getRegionLocator(tableName).getRegionLocation(key, true);
467          LOG.error("unreferred row " + keyString + ", " + loc);
468        }
469      }
470    }
471  }
472
473  static class VisibilityLoop extends Loop {
474    private static final int SLEEP_IN_MS = 5000;
475    private static final Logger LOG = LoggerFactory.getLogger(VisibilityLoop.class);
476
477    @Override
478    protected void runGenerator(int numMappers, long numNodes, String outputDir, Integer width,
479      Integer wrapMultiplier, Integer numWalkers) throws Exception {
480      Path outputPath = new Path(outputDir);
481      UUID uuid = UUID.randomUUID(); // create a random UUID.
482      Path generatorOutput = new Path(outputPath, uuid.toString());
483
484      Generator generator = new VisibilityGenerator();
485      generator.setConf(getConf());
486      int retCode =
487        generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier, numWalkers);
488      if (retCode > 0) {
489        throw new RuntimeException("Generator failed with return code: " + retCode);
490      }
491    }
492
493    protected void runDelete(int numMappers, long numNodes, String outputDir, Integer width,
494      Integer wrapMultiplier, int tableIndex) throws Exception {
495      LOG.info("Running copier on table "
496        + IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex));
497      Copier copier = new Copier(
498        IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex), tableIndex, true);
499      copier.setConf(getConf());
500      copier.runCopier(outputDir);
501      Thread.sleep(SLEEP_IN_MS);
502    }
503
504    protected void runVerify(String outputDir, int numReducers, long expectedNumNodes,
505      boolean allTables) throws Exception {
506      Path outputPath = new Path(outputDir);
507
508      if (allTables) {
509        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
510          LOG.info("Verifying table " + i);
511          sleep(SLEEP_IN_MS);
512          UUID uuid = UUID.randomUUID(); // create a random UUID.
513          Path iterationOutput = new Path(outputPath, uuid.toString());
514          Verify verify = new VisibilityVerify(getTableName(i).getNameAsString(), i);
515          verify(numReducers, expectedNumNodes, iterationOutput, verify);
516        }
517      }
518      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
519        runVerifyCommonTable(outputDir, numReducers, expectedNumNodes, i);
520      }
521    }
522
523    private void runVerify(String outputDir, int numReducers, long expectedNodes, int tableIndex)
524      throws Exception {
525      long temp = expectedNodes;
526      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
527        if (i <= tableIndex) {
528          expectedNodes = 0;
529        } else {
530          expectedNodes = temp;
531        }
532        LOG.info("Verifying data in the table with index " + i + " and expected nodes is "
533          + expectedNodes);
534        runVerifyCommonTable(outputDir, numReducers, expectedNodes, i);
535      }
536    }
537
538    private void sleep(long ms) throws InterruptedException {
539      Thread.sleep(ms);
540    }
541
542    protected void runVerifyCommonTable(String outputDir, int numReducers, long expectedNumNodes,
543      int index) throws Exception {
544      LOG.info("Verifying common table with index " + index);
545      sleep(SLEEP_IN_MS);
546      Path outputPath = new Path(outputDir);
547      UUID uuid = UUID.randomUUID(); // create a random UUID.
548      Path iterationOutput = new Path(outputPath, uuid.toString());
549      Verify verify =
550        new VisibilityVerify(TableName.valueOf(COMMON_TABLE_NAME).getNameAsString(), index);
551      verify(numReducers, expectedNumNodes, iterationOutput, verify);
552    }
553
554    protected void runCopier(String outputDir) throws Exception {
555      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
556        LOG.info("Running copier " + IntegrationTestBigLinkedListWithVisibility.getTableName(i));
557        sleep(SLEEP_IN_MS);
558        Copier copier =
559          new Copier(IntegrationTestBigLinkedListWithVisibility.getTableName(i), i, false);
560        copier.setConf(getConf());
561        copier.runCopier(outputDir);
562      }
563    }
564
565    private void verify(int numReducers, long expectedNumNodes, Path iterationOutput, Verify verify)
566      throws Exception {
567      verify.setConf(getConf());
568      int retCode = verify.run(iterationOutput, numReducers);
569      if (retCode > 0) {
570        throw new RuntimeException("Verify.run failed with return code: " + retCode);
571      }
572
573      if (!verify.verify(expectedNumNodes)) {
574        throw new RuntimeException("Verify.verify failed");
575      }
576
577      LOG.info("Verify finished with succees. Total nodes=" + expectedNumNodes);
578    }
579
580    @Override
581    public int run(String[] args) throws Exception {
582      if (args.length < 5) {
583        System.err.println(
584          "Usage: Loop <num iterations> " + "<num mappers> <num nodes per mapper> <output dir> "
585            + "<num reducers> [<width> <wrap multiplier>]");
586        return 1;
587      }
588      LOG.info("Running Loop with args:" + Arrays.deepToString(args));
589
590      int numIterations = Integer.parseInt(args[0]);
591      int numMappers = Integer.parseInt(args[1]);
592      long numNodes = Long.parseLong(args[2]);
593      String outputDir = args[3];
594      int numReducers = Integer.parseInt(args[4]);
595      Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
596      Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
597      long expectedNumNodes = 0;
598
599      if (numIterations < 0) {
600        numIterations = Integer.MAX_VALUE; // run indefinitely (kind of)
601      }
602
603      for (int i = 0; i < numIterations; i++) {
604        LOG.info("Starting iteration = " + i);
605        LOG.info("Generating data");
606        // By default run no concurrent walkers for test with visibility
607        runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, 0);
608        expectedNumNodes += numMappers * numNodes;
609        // Copying wont work because expressions are not returned back to the
610        // client
611        LOG.info("Running copier");
612        sleep(SLEEP_IN_MS);
613        runCopier(outputDir);
614        LOG.info("Verifying copied data");
615        sleep(SLEEP_IN_MS);
616        runVerify(outputDir, numReducers, expectedNumNodes, true);
617        sleep(SLEEP_IN_MS);
618        for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
619          LOG.info("Deleting data on table with index: " + j);
620          runDelete(numMappers, numNodes, outputDir, width, wrapMultiplier, j);
621          sleep(SLEEP_IN_MS);
622          LOG.info("Verifying common table after deleting");
623          runVerify(outputDir, numReducers, expectedNumNodes, j);
624          sleep(SLEEP_IN_MS);
625        }
626      }
627      return 0;
628    }
629  }
630
631  @Override
632  @Test
633  public void testContinuousIngest() throws IOException, Exception {
634    // Loop <num iterations> <num mappers> <num nodes per mapper> <output dir>
635    // <num reducers>
636    int ret = ToolRunner.run(getTestingUtil(getConf()).getConfiguration(), new VisibilityLoop(),
637      new String[] { "1", "1", "20000",
638        util.getDataTestDirOnTestFS("IntegrationTestBigLinkedListWithVisibility").toString(), "1",
639        "10000" });
640    assertEquals(0, ret);
641  }
642
643  public static void main(String[] args) throws Exception {
644    Configuration conf = HBaseConfiguration.create();
645    IntegrationTestingUtility.setUseDistributedCluster(conf);
646    int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedListWithVisibility(), args);
647    System.exit(ret);
648  }
649
650  @Override
651  protected MonkeyFactory getDefaultMonkeyFactory() {
652    return MonkeyFactory.getFactory(MonkeyFactory.CALM);
653  }
654
655  @Override
656  public int runTestFromCommandLine() throws Exception {
657    return ToolRunner.run(getConf(), new VisibilityLoop(), otherArgs);
658  }
659}