001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.test;
019
020import java.io.IOException;
021import java.security.PrivilegedExceptionAction;
022import java.util.Arrays;
023import java.util.Iterator;
024import java.util.UUID;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.conf.Configured;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellUtil;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.HRegionLocation;
032import org.apache.hadoop.hbase.IntegrationTestingUtility;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory;
035import org.apache.hadoop.hbase.client.Admin;
036import org.apache.hadoop.hbase.client.BufferedMutator;
037import org.apache.hadoop.hbase.client.BufferedMutatorParams;
038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
039import org.apache.hadoop.hbase.client.Connection;
040import org.apache.hadoop.hbase.client.ConnectionFactory;
041import org.apache.hadoop.hbase.client.Delete;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.Result;
044import org.apache.hadoop.hbase.client.Scan;
045import org.apache.hadoop.hbase.client.TableDescriptor;
046import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
047import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
048import org.apache.hadoop.hbase.log.HBaseMarkers;
049import org.apache.hadoop.hbase.mapreduce.Import;
050import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
051import org.apache.hadoop.hbase.security.User;
052import org.apache.hadoop.hbase.security.access.AccessControlClient;
053import org.apache.hadoop.hbase.security.access.Permission;
054import org.apache.hadoop.hbase.security.visibility.Authorizations;
055import org.apache.hadoop.hbase.security.visibility.CellVisibility;
056import org.apache.hadoop.hbase.security.visibility.VisibilityClient;
057import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
058import org.apache.hadoop.hbase.testclassification.IntegrationTests;
059import org.apache.hadoop.hbase.util.AbstractHBaseTool;
060import org.apache.hadoop.hbase.util.Bytes;
061import org.apache.hadoop.io.BytesWritable;
062import org.apache.hadoop.mapreduce.Counter;
063import org.apache.hadoop.mapreduce.CounterGroup;
064import org.apache.hadoop.mapreduce.Counters;
065import org.apache.hadoop.mapreduce.Job;
066import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
067import org.apache.hadoop.util.Tool;
068import org.apache.hadoop.util.ToolRunner;
069import org.junit.Test;
070import org.junit.experimental.categories.Category;
071import org.slf4j.Logger;
072import org.slf4j.LoggerFactory;
073
074import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
075
076/**
077 * IT test used to verify the deletes with visibility labels. The test creates three tables
078 * tablename_0, tablename_1 and tablename_2 and each table is associated with a unique pair of
079 * labels. Another common table with the name 'commontable' is created and it has the data combined
080 * from all these 3 tables such that there are 3 versions of every row but the visibility label in
081 * every row corresponds to the table from which the row originated. Then deletes are issued to the
082 * common table by selecting the visibility label associated with each of the smaller tables. After
083 * the delete is issued with one set of visibility labels we try to scan the common table with each
084 * of the visibility pairs defined for the 3 tables. So after the first delete is issued, a scan
085 * with the first set of visibility labels would return zero result whereas the scan issued with the
086 * other two sets of visibility labels should return all the rows corresponding to that set of
087 * visibility labels. The above process of delete and scan is repeated until after the last set of
088 * visibility labels are used for the deletes the common table should not return any row. To use
089 * this ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedListWithVisibility Loop 1 1
090 * 20000 /tmp 1 10000 or ./hbase org.apache.hadoop.hbase.IntegrationTestsDriver -r
091 * .*IntegrationTestBigLinkedListWithVisibility.*
092 */
093@Category(IntegrationTests.class)
094public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestBigLinkedList {
095
096  private static final String CONFIDENTIAL = "confidential";
097  private static final String TOPSECRET = "topsecret";
098  private static final String SECRET = "secret";
099  private static final String PUBLIC = "public";
100  private static final String PRIVATE = "private";
101  private static final String EVERYONE = "everyone";
102  private static final String RESTRICTED = "restricted";
103  private static final String GROUP = "group";
104  private static final String PREVILIGED = "previliged"; // this spelling is the runtime label text
105  private static final String OPEN = "open";
106  public static String labels = CONFIDENTIAL + "," + TOPSECRET + "," + SECRET + "," + RESTRICTED
107    + "," + PRIVATE + "," + PREVILIGED + "," + GROUP + "," + OPEN + "," + PUBLIC + "," + EVERYONE; // comma-separated; entries 2*j and 2*j+1 form the label pair used for table index j
108  private static final String COMMA = ","; // separator used to split "labels"
109  private static final String UNDER_SCORE = "_"; // separator between base table name and index
110  public static int DEFAULT_TABLES_COUNT = 3; // number of per-label tables; also max versions of the common table
111  public static String tableName = "tableName"; // base name; getTableName(i) appends "_" + i
112  public static final String COMMON_TABLE_NAME = "commontable"; // table the copy/delete jobs write to
113  public static final String LABELS_KEY = "LABELS"; // job-configuration key the import mappers read the label list from
114  public static final String INDEX_KEY = "INDEX"; // job-configuration key the import mappers read the label-pair index from
115  private static User USER; // test user, assigned in setUpCluster()
116  private static final String OR = "|"; // joins the two labels of a pair into a visibility expression
117  private static String USER_OPT = "user"; // long name of the command-line option registered in addOptions()
118  private static String userName = "user1"; // default user name; replaced in processOptions() when the option is given
119
120  static class VisibilityGenerator extends Generator {
121    private static final Logger LOG = LoggerFactory.getLogger(VisibilityGenerator.class);
122
123    @Override
124    protected void createSchema() throws IOException {
125      LOG.info("Creating tables");
126      // Create three tables
127      boolean acl = AccessControlClient
128        .isAccessControllerRunning(ConnectionFactory.createConnection(getConf()));
129      if (!acl) {
130        LOG.info("No ACL available.");
131      }
132      try (Connection conn = ConnectionFactory.createConnection(getConf());
133        Admin admin = conn.getAdmin()) {
134        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
135          TableName tableName = IntegrationTestBigLinkedListWithVisibility.getTableName(i);
136          createTable(admin, tableName, false, acl);
137        }
138        TableName tableName = TableName.valueOf(COMMON_TABLE_NAME);
139        createTable(admin, tableName, true, acl);
140      }
141    }
142
143    private void createTable(Admin admin, TableName tableName, boolean setVersion, boolean acl)
144      throws IOException {
145      if (!admin.tableExists(tableName)) {
146        ColumnFamilyDescriptorBuilder cfBuilder =
147          ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_NAME);
148        if (setVersion) {
149          cfBuilder.setMaxVersions(DEFAULT_TABLES_COUNT);
150        }
151        TableDescriptor tableDescriptor =
152          TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(cfBuilder.build()).build();
153        admin.createTable(tableDescriptor);
154        if (acl) {
155          LOG.info("Granting permissions for user " + USER.getShortName());
156          Permission.Action[] actions = { Permission.Action.READ };
157          try {
158            AccessControlClient.grant(ConnectionFactory.createConnection(getConf()), tableName,
159              USER.getShortName(), null, null, actions);
160          } catch (Throwable e) {
161            LOG.error(HBaseMarkers.FATAL,
162              "Error in granting permission for the user " + USER.getShortName(), e);
163            throw new IOException(e);
164          }
165        }
166      }
167    }
168
169    @Override
170    protected void setMapperForGenerator(Job job) {
171      job.setMapperClass(VisibilityGeneratorMapper.class);
172    }
173
174    static class VisibilityGeneratorMapper extends GeneratorMapper {
175      BufferedMutator[] tables = new BufferedMutator[DEFAULT_TABLES_COUNT];
176
177      @Override
178      protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
179        throws IOException, InterruptedException {
180        super.setup(context);
181      }
182
183      @Override
184      protected void instantiateHTable() throws IOException {
185        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
186          BufferedMutatorParams params = new BufferedMutatorParams(getTableName(i));
187          params.writeBufferSize(4 * 1024 * 1024);
188          BufferedMutator table = connection.getBufferedMutator(params);
189          this.tables[i] = table;
190        }
191      }
192
193      @Override
194      protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context)
195        throws IOException, InterruptedException {
196        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
197          if (tables[i] != null) {
198            tables[i].close();
199          }
200        }
201      }
202
203      @Override
204      protected void persist(org.apache.hadoop.mapreduce.Mapper.Context output, long count,
205        byte[][] prev, byte[][] current, byte[] id) throws IOException {
206        String visibilityExps = "";
207        String[] split = labels.split(COMMA);
208        for (int i = 0; i < current.length; i++) {
209          for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
210            Put put = new Put(current[i]);
211            byte[] value = prev == null ? NO_KEY : prev[i];
212            put.addColumn(FAMILY_NAME, COLUMN_PREV, value);
213
214            if (count >= 0) {
215              put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
216            }
217            if (id != null) {
218              put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id);
219            }
220            visibilityExps = split[j * 2] + OR + split[(j * 2) + 1];
221            put.setCellVisibility(new CellVisibility(visibilityExps));
222            tables[j].mutate(put);
223            try {
224              Thread.sleep(1);
225            } catch (InterruptedException e) {
226              throw new IOException();
227            }
228          }
229          if (i % 1000 == 0) {
230            // Tickle progress every so often else maprunner will think us hung
231            output.progress();
232          }
233        }
234      }
235    }
236  }
237
238  static class Copier extends Configured implements Tool {
239    private static final Logger LOG = LoggerFactory.getLogger(Copier.class);
240    private TableName tableName;
241    private int labelIndex;
242    private boolean delete;
243
244    public Copier(TableName tableName, int index, boolean delete) {
245      this.tableName = tableName;
246      this.labelIndex = index;
247      this.delete = delete;
248    }
249
250    public int runCopier(String outputDir) throws Exception {
251      Job job = null;
252      Scan scan = null;
253      job = new Job(getConf());
254      job.setJobName("Data copier");
255      job.getConfiguration().setInt("INDEX", labelIndex);
256      job.getConfiguration().set("LABELS", labels);
257      job.setJarByClass(getClass());
258      scan = new Scan();
259      scan.setCacheBlocks(false);
260      scan.setRaw(true);
261
262      String[] split = labels.split(COMMA);
263      scan.setAuthorizations(
264        new Authorizations(split[this.labelIndex * 2], split[(this.labelIndex * 2) + 1]));
265      if (delete) {
266        LOG.info("Running deletes");
267      } else {
268        LOG.info("Running copiers");
269      }
270      if (delete) {
271        TableMapReduceUtil.initTableMapperJob(tableName.getNameAsString(), scan,
272          VisibilityDeleteImport.class, null, null, job);
273      } else {
274        TableMapReduceUtil.initTableMapperJob(tableName.getNameAsString(), scan,
275          VisibilityImport.class, null, null, job);
276      }
277      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
278      job.getConfiguration().setBoolean("mapreduce.reduce.speculative", false);
279      TableMapReduceUtil.initTableReducerJob(COMMON_TABLE_NAME, null, job, null, null, null, null);
280      TableMapReduceUtil.addDependencyJars(job);
281      TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
282      TableMapReduceUtil.initCredentials(job);
283      job.setNumReduceTasks(0);
284      boolean success = job.waitForCompletion(true);
285      return success ? 0 : 1;
286    }
287
288    @Override
289    public int run(String[] arg0) throws Exception {
290      // TODO Auto-generated method stub
291      return 0;
292    }
293  }
294
295  static class VisibilityImport extends Import.Importer {
296    private int index;
297    private String labels;
298    private String[] split;
299
300    @Override
301    public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) {
302      index = context.getConfiguration().getInt(INDEX_KEY, -1);
303      labels = context.getConfiguration().get(LABELS_KEY);
304      split = labels.split(COMMA);
305      super.setup(context);
306    }
307
308    @Override
309    protected void addPutToKv(Put put, Cell kv) throws IOException {
310      String visibilityExps = split[index * 2] + OR + split[(index * 2) + 1];
311      put.setCellVisibility(new CellVisibility(visibilityExps));
312      super.addPutToKv(put, kv);
313    }
314  }
315
316  static class VisibilityDeleteImport extends Import.Importer {
317    private int index;
318    private String labels;
319    private String[] split;
320
321    @Override
322    public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) {
323      index = context.getConfiguration().getInt(INDEX_KEY, -1);
324      labels = context.getConfiguration().get(LABELS_KEY);
325      split = labels.split(COMMA);
326      super.setup(context);
327    }
328
329    // Creating delete here
330    @Override
331    protected void processKV(ImmutableBytesWritable key, Result result,
332      org.apache.hadoop.mapreduce.Mapper.Context context, Put put,
333      org.apache.hadoop.hbase.client.Delete delete) throws IOException, InterruptedException {
334      String visibilityExps = split[index * 2] + OR + split[(index * 2) + 1];
335      for (Cell kv : result.rawCells()) {
336        // skip if we filter it out
337        if (kv == null) continue;
338        // Create deletes here
339        if (delete == null) {
340          delete = new Delete(key.get());
341        }
342        delete.setCellVisibility(new CellVisibility(visibilityExps));
343        delete.addFamily(CellUtil.cloneFamily(kv));
344      }
345      if (delete != null) {
346        context.write(key, delete);
347      }
348    }
349  }
350
351  @Override
352  protected void addOptions() {
353    super.addOptions();
354    addOptWithArg("u", USER_OPT, "User name");
355  }
356
357  @Override
358  protected void processOptions(CommandLine cmd) {
359    super.processOptions(cmd);
360    if (cmd.hasOption(USER_OPT)) {
361      userName = cmd.getOptionValue(USER_OPT);
362    }
363
364  }
365
366  @Override
367  public void setUpCluster() throws Exception {
368    util = getTestingUtil(null);
369    Configuration conf = util.getConfiguration();
370    VisibilityTestUtil.enableVisiblityLabels(conf);
371    conf.set("hbase.superuser", User.getCurrent().getName());
372    conf.setBoolean("dfs.permissions", false);
373    USER = User.createUserForTesting(conf, userName, new String[] {});
374    super.setUpCluster();
375    addLabels();
376  }
377
378  static TableName getTableName(int i) {
379    return TableName.valueOf(tableName + UNDER_SCORE + i);
380  }
381
382  private void addLabels() throws Exception {
383    try {
384      VisibilityClient.addLabels(util.getConnection(), labels.split(COMMA));
385      VisibilityClient.setAuths(util.getConnection(), labels.split(COMMA), USER.getName());
386    } catch (Throwable t) {
387      throw new IOException(t);
388    }
389  }
390
391  static class VisibilityVerify extends Verify {
392    private static final Logger LOG = LoggerFactory.getLogger(VisibilityVerify.class);
393    private TableName tableName;
394    private int labelIndex;
395
396    public VisibilityVerify(String tableName, int index) {
397      this.tableName = TableName.valueOf(tableName);
398      this.labelIndex = index;
399    }
400
401    @Override
402    public int run(final Path outputDir, final int numReducers) throws Exception {
403      LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers);
404      PrivilegedExceptionAction<Integer> scanAction = new PrivilegedExceptionAction<Integer>() {
405        @Override
406        public Integer run() throws Exception {
407          return doVerify(outputDir, numReducers);
408        }
409      };
410      return USER.runAs(scanAction);
411    }
412
413    private int doVerify(Path outputDir, int numReducers)
414      throws IOException, InterruptedException, ClassNotFoundException {
415      job = new Job(getConf());
416
417      job.setJobName("Link Verifier");
418      job.setNumReduceTasks(numReducers);
419      job.setJarByClass(getClass());
420
421      setJobScannerConf(job);
422
423      Scan scan = new Scan();
424      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
425      scan.setCaching(10000);
426      scan.setCacheBlocks(false);
427      String[] split = labels.split(COMMA);
428
429      scan.setAuthorizations(
430        new Authorizations(split[this.labelIndex * 2], split[(this.labelIndex * 2) + 1]));
431
432      TableMapReduceUtil.initTableMapperJob(tableName.getName(), scan, VerifyMapper.class,
433        BytesWritable.class, BytesWritable.class, job);
434      TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
435
436      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
437
438      job.setReducerClass(VerifyReducer.class);
439      job.setOutputFormatClass(TextOutputFormat.class);
440      TextOutputFormat.setOutputPath(job, outputDir);
441      boolean success = job.waitForCompletion(true);
442
443      return success ? 0 : 1;
444    }
445
446    @Override
447    protected void handleFailure(Counters counters) throws IOException {
448      try (Connection conn = ConnectionFactory.createConnection(job.getConfiguration())) {
449        TableName tableName = TableName.valueOf(COMMON_TABLE_NAME);
450        CounterGroup g = counters.getGroup("undef");
451        Iterator<Counter> it = g.iterator();
452        while (it.hasNext()) {
453          String keyString = it.next().getName();
454          byte[] key = Bytes.toBytes(keyString);
455          HRegionLocation loc = conn.getRegionLocator(tableName).getRegionLocation(key, true);
456          LOG.error("undefined row " + keyString + ", " + loc);
457        }
458        g = counters.getGroup("unref");
459        it = g.iterator();
460        while (it.hasNext()) {
461          String keyString = it.next().getName();
462          byte[] key = Bytes.toBytes(keyString);
463          HRegionLocation loc = conn.getRegionLocator(tableName).getRegionLocation(key, true);
464          LOG.error("unreferred row " + keyString + ", " + loc);
465        }
466      }
467    }
468  }
469
470  static class VisibilityLoop extends Loop {
471    private static final int SLEEP_IN_MS = 5000;
472    private static final Logger LOG = LoggerFactory.getLogger(VisibilityLoop.class);
473    IntegrationTestBigLinkedListWithVisibility it;
474
475    @Override
476    protected void runGenerator(int numMappers, long numNodes, String outputDir, Integer width,
477      Integer wrapMultiplier, Integer numWalkers) throws Exception {
478      Path outputPath = new Path(outputDir);
479      UUID uuid = UUID.randomUUID(); // create a random UUID.
480      Path generatorOutput = new Path(outputPath, uuid.toString());
481
482      Generator generator = new VisibilityGenerator();
483      generator.setConf(getConf());
484      int retCode =
485        generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier, numWalkers);
486      if (retCode > 0) {
487        throw new RuntimeException("Generator failed with return code: " + retCode);
488      }
489    }
490
491    protected void runDelete(int numMappers, long numNodes, String outputDir, Integer width,
492      Integer wrapMultiplier, int tableIndex) throws Exception {
493      LOG.info("Running copier on table "
494        + IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex));
495      Copier copier = new Copier(
496        IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex), tableIndex, true);
497      copier.setConf(getConf());
498      copier.runCopier(outputDir);
499      Thread.sleep(SLEEP_IN_MS);
500    }
501
502    protected void runVerify(String outputDir, int numReducers, long expectedNumNodes,
503      boolean allTables) throws Exception {
504      Path outputPath = new Path(outputDir);
505
506      if (allTables) {
507        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
508          LOG.info("Verifying table " + i);
509          sleep(SLEEP_IN_MS);
510          UUID uuid = UUID.randomUUID(); // create a random UUID.
511          Path iterationOutput = new Path(outputPath, uuid.toString());
512          Verify verify = new VisibilityVerify(getTableName(i).getNameAsString(), i);
513          verify(numReducers, expectedNumNodes, iterationOutput, verify);
514        }
515      }
516      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
517        runVerifyCommonTable(outputDir, numReducers, expectedNumNodes, i);
518      }
519    }
520
521    private void runVerify(String outputDir, int numReducers, long expectedNodes, int tableIndex)
522      throws Exception {
523      long temp = expectedNodes;
524      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
525        if (i <= tableIndex) {
526          expectedNodes = 0;
527        } else {
528          expectedNodes = temp;
529        }
530        LOG.info("Verifying data in the table with index " + i + " and expected nodes is "
531          + expectedNodes);
532        runVerifyCommonTable(outputDir, numReducers, expectedNodes, i);
533      }
534    }
535
536    private void sleep(long ms) throws InterruptedException {
537      Thread.sleep(ms);
538    }
539
540    protected void runVerifyCommonTable(String outputDir, int numReducers, long expectedNumNodes,
541      int index) throws Exception {
542      LOG.info("Verifying common table with index " + index);
543      sleep(SLEEP_IN_MS);
544      Path outputPath = new Path(outputDir);
545      UUID uuid = UUID.randomUUID(); // create a random UUID.
546      Path iterationOutput = new Path(outputPath, uuid.toString());
547      Verify verify =
548        new VisibilityVerify(TableName.valueOf(COMMON_TABLE_NAME).getNameAsString(), index);
549      verify(numReducers, expectedNumNodes, iterationOutput, verify);
550    }
551
552    protected void runCopier(String outputDir) throws Exception {
553      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
554        LOG.info("Running copier " + IntegrationTestBigLinkedListWithVisibility.getTableName(i));
555        sleep(SLEEP_IN_MS);
556        Copier copier =
557          new Copier(IntegrationTestBigLinkedListWithVisibility.getTableName(i), i, false);
558        copier.setConf(getConf());
559        copier.runCopier(outputDir);
560      }
561    }
562
563    private void verify(int numReducers, long expectedNumNodes, Path iterationOutput, Verify verify)
564      throws Exception {
565      verify.setConf(getConf());
566      int retCode = verify.run(iterationOutput, numReducers);
567      if (retCode > 0) {
568        throw new RuntimeException("Verify.run failed with return code: " + retCode);
569      }
570
571      if (!verify.verify(expectedNumNodes)) {
572        throw new RuntimeException("Verify.verify failed");
573      }
574
575      LOG.info("Verify finished with succees. Total nodes=" + expectedNumNodes);
576    }
577
578    @Override
579    public int run(String[] args) throws Exception {
580      if (args.length < 5) {
581        System.err.println(
582          "Usage: Loop <num iterations> " + "<num mappers> <num nodes per mapper> <output dir> "
583            + "<num reducers> [<width> <wrap multiplier>]");
584        return 1;
585      }
586      LOG.info("Running Loop with args:" + Arrays.deepToString(args));
587
588      int numIterations = Integer.parseInt(args[0]);
589      int numMappers = Integer.parseInt(args[1]);
590      long numNodes = Long.parseLong(args[2]);
591      String outputDir = args[3];
592      int numReducers = Integer.parseInt(args[4]);
593      Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
594      Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
595      long expectedNumNodes = 0;
596
597      if (numIterations < 0) {
598        numIterations = Integer.MAX_VALUE; // run indefinitely (kind of)
599      }
600
601      for (int i = 0; i < numIterations; i++) {
602        LOG.info("Starting iteration = " + i);
603        LOG.info("Generating data");
604        // By default run no concurrent walkers for test with visibility
605        runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, 0);
606        expectedNumNodes += numMappers * numNodes;
607        // Copying wont work because expressions are not returned back to the
608        // client
609        LOG.info("Running copier");
610        sleep(SLEEP_IN_MS);
611        runCopier(outputDir);
612        LOG.info("Verifying copied data");
613        sleep(SLEEP_IN_MS);
614        runVerify(outputDir, numReducers, expectedNumNodes, true);
615        sleep(SLEEP_IN_MS);
616        for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
617          LOG.info("Deleting data on table with index: " + j);
618          runDelete(numMappers, numNodes, outputDir, width, wrapMultiplier, j);
619          sleep(SLEEP_IN_MS);
620          LOG.info("Verifying common table after deleting");
621          runVerify(outputDir, numReducers, expectedNumNodes, j);
622          sleep(SLEEP_IN_MS);
623        }
624      }
625      return 0;
626    }
627  }
628
629  @Override
630  @Test
631  public void testContinuousIngest() throws IOException, Exception {
632    // Loop <num iterations> <num mappers> <num nodes per mapper> <output dir>
633    // <num reducers>
634    int ret = ToolRunner.run(getTestingUtil(getConf()).getConfiguration(), new VisibilityLoop(),
635      new String[] { "1", "1", "20000",
636        util.getDataTestDirOnTestFS("IntegrationTestBigLinkedListWithVisibility").toString(), "1",
637        "10000" });
638    org.junit.Assert.assertEquals(0, ret);
639  }
640
641  public static void main(String[] args) throws Exception {
642    Configuration conf = HBaseConfiguration.create();
643    IntegrationTestingUtility.setUseDistributedCluster(conf);
644    int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedListWithVisibility(), args);
645    System.exit(ret);
646  }
647
648  @Override
649  protected MonkeyFactory getDefaultMonkeyFactory() {
650    return MonkeyFactory.getFactory(MonkeyFactory.CALM);
651  }
652
653  @Override
654  public int runTestFromCommandLine() throws Exception {
655    Tool tool = null;
656    Loop loop = new VisibilityLoop();
657    loop.it = this;
658    tool = loop;
659    return ToolRunner.run(getConf(), tool, otherArgs);
660  }
661}