001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.test;
019
020import java.io.IOException;
021import java.security.PrivilegedExceptionAction;
022import java.util.Arrays;
023import java.util.Iterator;
024import java.util.UUID;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.conf.Configured;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellUtil;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.HColumnDescriptor;
032import org.apache.hadoop.hbase.HRegionLocation;
033import org.apache.hadoop.hbase.HTableDescriptor;
034import org.apache.hadoop.hbase.IntegrationTestingUtility;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory;
037import org.apache.hadoop.hbase.client.Admin;
038import org.apache.hadoop.hbase.client.BufferedMutator;
039import org.apache.hadoop.hbase.client.BufferedMutatorParams;
040import org.apache.hadoop.hbase.client.Connection;
041import org.apache.hadoop.hbase.client.ConnectionFactory;
042import org.apache.hadoop.hbase.client.Delete;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.Result;
045import org.apache.hadoop.hbase.client.Scan;
046import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
047import org.apache.hadoop.hbase.log.HBaseMarkers;
048import org.apache.hadoop.hbase.mapreduce.Import;
049import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
050import org.apache.hadoop.hbase.security.User;
051import org.apache.hadoop.hbase.security.access.AccessControlClient;
052import org.apache.hadoop.hbase.security.access.Permission;
053import org.apache.hadoop.hbase.security.visibility.Authorizations;
054import org.apache.hadoop.hbase.security.visibility.CellVisibility;
055import org.apache.hadoop.hbase.security.visibility.VisibilityClient;
056import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
057import org.apache.hadoop.hbase.testclassification.IntegrationTests;
058import org.apache.hadoop.hbase.util.AbstractHBaseTool;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.apache.hadoop.io.BytesWritable;
061import org.apache.hadoop.mapreduce.Counter;
062import org.apache.hadoop.mapreduce.CounterGroup;
063import org.apache.hadoop.mapreduce.Counters;
064import org.apache.hadoop.mapreduce.Job;
065import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
066import org.apache.hadoop.util.Tool;
067import org.apache.hadoop.util.ToolRunner;
068import org.junit.Test;
069import org.junit.experimental.categories.Category;
070import org.slf4j.Logger;
071import org.slf4j.LoggerFactory;
072
073import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
074
075/**
076 * IT test used to verify the deletes with visibility labels.
077 * The test creates three tables tablename_0, tablename_1 and tablename_2 and each table
078 * is associated with a unique pair of labels.
079 * Another common table with the name 'commontable' is created and it has the data combined
080 * from all these 3 tables such that there are 3 versions of every row but the visibility label
081 * in every row corresponds to the table from which the row originated.
082 * Then deletes are issued to the common table by selecting the visibility label
083 * associated with each of the smaller tables.
084 * After the delete is issued with one set of visibility labels we try to scan the common table
085 * with each of the visibility pairs defined for the 3 tables.
 * After the first delete is issued, a scan with the first set of visibility labels should
 * return zero results, whereas a scan issued with either of the other two sets of visibility
 * labels should still return all the rows corresponding to that set.  This delete-and-scan
 * process is repeated for each set of visibility labels; once the last set has been used for
 * the deletes, the common table should not return any rows.
091 *
092 * To use this
093 * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedListWithVisibility Loop 1 1 20000 /tmp 1 10000
094 * or
095 * ./hbase org.apache.hadoop.hbase.IntegrationTestsDriver -r .*IntegrationTestBigLinkedListWithVisibility.*
096 */
097@Category(IntegrationTests.class)
098public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestBigLinkedList {
099
100  private static final String CONFIDENTIAL = "confidential";
101  private static final String TOPSECRET = "topsecret";
102  private static final String SECRET = "secret";
103  private static final String PUBLIC = "public";
104  private static final String PRIVATE = "private";
105  private static final String EVERYONE = "everyone";
106  private static final String RESTRICTED = "restricted";
107  private static final String GROUP = "group";
108  private static final String PREVILIGED = "previliged";
109  private static final String OPEN = "open";
110  public static String labels = CONFIDENTIAL + "," + TOPSECRET + "," + SECRET + "," + RESTRICTED
111      + "," + PRIVATE + "," + PREVILIGED + "," + GROUP + "," + OPEN + "," + PUBLIC + "," + EVERYONE;
112  private static final String COMMA = ",";
113  private static final String UNDER_SCORE = "_";
114  public static int DEFAULT_TABLES_COUNT = 3;
115  public static String tableName = "tableName";
116  public static final String COMMON_TABLE_NAME = "commontable";
117  public static final String LABELS_KEY = "LABELS";
118  public static final String INDEX_KEY = "INDEX";
119  private static User USER;
120  private static final String OR = "|";
121  private static String USER_OPT = "user";
122  private static String userName = "user1";
123
124  static class VisibilityGenerator extends Generator {
125    private static final Logger LOG = LoggerFactory.getLogger(VisibilityGenerator.class);
126
127    @Override
128    protected void createSchema() throws IOException {
129      LOG.info("Creating tables");
130      // Create three tables
131      boolean acl = AccessControlClient.isAccessControllerRunning(ConnectionFactory
132          .createConnection(getConf()));
133      if(!acl) {
134        LOG.info("No ACL available.");
135      }
136      try (Connection conn = ConnectionFactory.createConnection(getConf());
137          Admin admin = conn.getAdmin()) {
138        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
139          TableName tableName = IntegrationTestBigLinkedListWithVisibility.getTableName(i);
140          createTable(admin, tableName, false, acl);
141        }
142        TableName tableName = TableName.valueOf(COMMON_TABLE_NAME);
143        createTable(admin, tableName, true, acl);
144      }
145    }
146
147    private void createTable(Admin admin, TableName tableName, boolean setVersion,
148        boolean acl) throws IOException {
149      if (!admin.tableExists(tableName)) {
150        HTableDescriptor htd = new HTableDescriptor(tableName);
151        HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME);
152        if (setVersion) {
153          family.setMaxVersions(DEFAULT_TABLES_COUNT);
154        }
155        htd.addFamily(family);
156        admin.createTable(htd);
157        if (acl) {
158          LOG.info("Granting permissions for user " + USER.getShortName());
159          Permission.Action[] actions = { Permission.Action.READ };
160          try {
161            AccessControlClient.grant(ConnectionFactory.createConnection(getConf()), tableName,
162                USER.getShortName(), null, null, actions);
163          } catch (Throwable e) {
164            LOG.error(HBaseMarkers.FATAL, "Error in granting permission for the user " +
165                USER.getShortName(), e);
166            throw new IOException(e);
167          }
168        }
169      }
170    }
171
172    @Override
173    protected void setMapperForGenerator(Job job) {
174      job.setMapperClass(VisibilityGeneratorMapper.class);
175    }
176
177    static class VisibilityGeneratorMapper extends GeneratorMapper {
178      BufferedMutator[] tables = new BufferedMutator[DEFAULT_TABLES_COUNT];
179
180      @Override
181      protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException,
182          InterruptedException {
183        super.setup(context);
184      }
185
186      @Override
187      protected void instantiateHTable() throws IOException {
188        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
189          BufferedMutatorParams params = new BufferedMutatorParams(getTableName(i));
190          params.writeBufferSize(4 * 1024 * 1024);
191          BufferedMutator table = connection.getBufferedMutator(params);
192          this.tables[i] = table;
193        }
194      }
195
196      @Override
197      protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context)
198          throws IOException, InterruptedException {
199        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
200          if (tables[i] != null) {
201            tables[i].close();
202          }
203        }
204      }
205
206      @Override
207      protected void persist(org.apache.hadoop.mapreduce.Mapper.Context output, long count,
208          byte[][] prev, byte[][] current, byte[] id) throws IOException {
209        String visibilityExps = "";
210        String[] split = labels.split(COMMA);
211        for (int i = 0; i < current.length; i++) {
212          for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
213            Put put = new Put(current[i]);
214            byte[] value = prev == null ? NO_KEY : prev[i];
215            put.addColumn(FAMILY_NAME, COLUMN_PREV, value);
216
217            if (count >= 0) {
218              put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
219            }
220            if (id != null) {
221              put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id);
222            }
223            visibilityExps = split[j * 2] + OR + split[(j * 2) + 1];
224            put.setCellVisibility(new CellVisibility(visibilityExps));
225            tables[j].mutate(put);
226            try {
227              Thread.sleep(1);
228            } catch (InterruptedException e) {
229              throw new IOException();
230            }
231          }
232          if (i % 1000 == 0) {
233            // Tickle progress every so often else maprunner will think us hung
234            output.progress();
235          }
236        }
237      }
238    }
239  }
240
241  static class Copier extends Configured implements Tool {
242    private static final Logger LOG = LoggerFactory.getLogger(Copier.class);
243    private TableName tableName;
244    private int labelIndex;
245    private boolean delete;
246
247    public Copier(TableName tableName, int index, boolean delete) {
248      this.tableName = tableName;
249      this.labelIndex = index;
250      this.delete = delete;
251    }
252
253    public int runCopier(String outputDir) throws Exception {
254      Job job = null;
255      Scan scan = null;
256      job = new Job(getConf());
257      job.setJobName("Data copier");
258      job.getConfiguration().setInt("INDEX", labelIndex);
259      job.getConfiguration().set("LABELS", labels);
260      job.setJarByClass(getClass());
261      scan = new Scan();
262      scan.setCacheBlocks(false);
263      scan.setRaw(true);
264
265      String[] split = labels.split(COMMA);
266      scan.setAuthorizations(new Authorizations(split[this.labelIndex * 2],
267          split[(this.labelIndex * 2) + 1]));
268      if (delete) {
269        LOG.info("Running deletes");
270      } else {
271        LOG.info("Running copiers");
272      }
273      if (delete) {
274        TableMapReduceUtil.initTableMapperJob(tableName.getNameAsString(), scan,
275            VisibilityDeleteImport.class, null, null, job);
276      } else {
277        TableMapReduceUtil.initTableMapperJob(tableName.getNameAsString(), scan,
278            VisibilityImport.class, null, null, job);
279      }
280      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
281      job.getConfiguration().setBoolean("mapreduce.reduce.speculative", false);
282      TableMapReduceUtil.initTableReducerJob(COMMON_TABLE_NAME, null, job, null, null, null, null);
283      TableMapReduceUtil.addDependencyJars(job);
284      TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
285      TableMapReduceUtil.initCredentials(job);
286      job.setNumReduceTasks(0);
287      boolean success = job.waitForCompletion(true);
288      return success ? 0 : 1;
289    }
290
291    @Override
292    public int run(String[] arg0) throws Exception {
293      // TODO Auto-generated method stub
294      return 0;
295    }
296  }
297
298  static class VisibilityImport extends Import.Importer {
299    private int index;
300    private String labels;
301    private String[] split;
302
303    @Override
304    public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) {
305      index = context.getConfiguration().getInt(INDEX_KEY, -1);
306      labels = context.getConfiguration().get(LABELS_KEY);
307      split = labels.split(COMMA);
308      super.setup(context);
309    }
310
311    @Override
312    protected void addPutToKv(Put put, Cell kv) throws IOException {
313      String visibilityExps = split[index * 2] + OR + split[(index * 2) + 1];
314      put.setCellVisibility(new CellVisibility(visibilityExps));
315      super.addPutToKv(put, kv);
316    }
317  }
318
319  static class VisibilityDeleteImport extends Import.Importer {
320    private int index;
321    private String labels;
322    private String[] split;
323
324    @Override
325    public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) {
326      index = context.getConfiguration().getInt(INDEX_KEY, -1);
327      labels = context.getConfiguration().get(LABELS_KEY);
328      split = labels.split(COMMA);
329      super.setup(context);
330    }
331
332    // Creating delete here
333    @Override
334    protected void processKV(ImmutableBytesWritable key, Result result,
335        org.apache.hadoop.mapreduce.Mapper.Context context, Put put,
336        org.apache.hadoop.hbase.client.Delete delete) throws
337        IOException, InterruptedException {
338      String visibilityExps = split[index * 2] + OR + split[(index * 2) + 1];
339      for (Cell kv : result.rawCells()) {
340        // skip if we filter it out
341        if (kv == null)
342          continue;
343        // Create deletes here
344        if (delete == null) {
345          delete = new Delete(key.get());
346        }
347        delete.setCellVisibility(new CellVisibility(visibilityExps));
348        delete.addFamily(CellUtil.cloneFamily(kv));
349      }
350      if (delete != null) {
351        context.write(key, delete);
352      }
353    }
354  }
355
356  @Override
357  protected void addOptions() {
358    super.addOptions();
359    addOptWithArg("u", USER_OPT, "User name");
360  }
361
362  @Override
363  protected void processOptions(CommandLine cmd) {
364    super.processOptions(cmd);
365    if (cmd.hasOption(USER_OPT)) {
366      userName = cmd.getOptionValue(USER_OPT);
367    }
368
369  }
370  @Override
371  public void setUpCluster() throws Exception {
372    util = getTestingUtil(null);
373    Configuration conf = util.getConfiguration();
374    VisibilityTestUtil.enableVisiblityLabels(conf);
375    conf.set("hbase.superuser", User.getCurrent().getName());
376    conf.setBoolean("dfs.permissions", false);
377    USER = User.createUserForTesting(conf, userName, new String[] {});
378    super.setUpCluster();
379    addLabels();
380  }
381
382  static TableName getTableName(int i) {
383    return TableName.valueOf(tableName + UNDER_SCORE + i);
384  }
385
386  private void addLabels() throws Exception {
387    try {
388      VisibilityClient.addLabels(util.getConnection(), labels.split(COMMA));
389      VisibilityClient.setAuths(util.getConnection(), labels.split(COMMA), USER.getName());
390    } catch (Throwable t) {
391      throw new IOException(t);
392    }
393  }
394
395  static class VisibilityVerify extends Verify {
396    private static final Logger LOG = LoggerFactory.getLogger(VisibilityVerify.class);
397    private TableName tableName;
398    private int labelIndex;
399
400    public VisibilityVerify(String tableName, int index) {
401      this.tableName = TableName.valueOf(tableName);
402      this.labelIndex = index;
403    }
404
405    @Override
406    public int run(final Path outputDir, final int numReducers) throws Exception {
407      LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers);
408      PrivilegedExceptionAction<Integer> scanAction = new PrivilegedExceptionAction<Integer>() {
409        @Override
410        public Integer run() throws Exception {
411          return doVerify(outputDir, numReducers);
412        }
413      };
414      return USER.runAs(scanAction);
415    }
416
417    private int doVerify(Path outputDir, int numReducers) throws IOException, InterruptedException,
418        ClassNotFoundException {
419      job = new Job(getConf());
420
421      job.setJobName("Link Verifier");
422      job.setNumReduceTasks(numReducers);
423      job.setJarByClass(getClass());
424
425      setJobScannerConf(job);
426
427      Scan scan = new Scan();
428      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
429      scan.setCaching(10000);
430      scan.setCacheBlocks(false);
431      String[] split = labels.split(COMMA);
432
433      scan.setAuthorizations(new Authorizations(split[this.labelIndex * 2],
434          split[(this.labelIndex * 2) + 1]));
435
436      TableMapReduceUtil.initTableMapperJob(tableName.getName(), scan, VerifyMapper.class,
437          BytesWritable.class, BytesWritable.class, job);
438      TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
439
440      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
441
442      job.setReducerClass(VerifyReducer.class);
443      job.setOutputFormatClass(TextOutputFormat.class);
444      TextOutputFormat.setOutputPath(job, outputDir);
445      boolean success = job.waitForCompletion(true);
446
447      return success ? 0 : 1;
448    }
449
450    @Override
451    protected void handleFailure(Counters counters) throws IOException {
452      try (Connection conn = ConnectionFactory.createConnection(job.getConfiguration())) {
453        TableName tableName = TableName.valueOf(COMMON_TABLE_NAME);
454        CounterGroup g = counters.getGroup("undef");
455        Iterator<Counter> it = g.iterator();
456        while (it.hasNext()) {
457          String keyString = it.next().getName();
458          byte[] key = Bytes.toBytes(keyString);
459          HRegionLocation loc = conn.getRegionLocator(tableName).getRegionLocation(key, true);
460          LOG.error("undefined row " + keyString + ", " + loc);
461        }
462        g = counters.getGroup("unref");
463        it = g.iterator();
464        while (it.hasNext()) {
465          String keyString = it.next().getName();
466          byte[] key = Bytes.toBytes(keyString);
467          HRegionLocation loc = conn.getRegionLocator(tableName).getRegionLocation(key, true);
468          LOG.error("unreferred row " + keyString + ", " + loc);
469        }
470      }
471    }
472  }
473
474  static class VisibilityLoop extends Loop {
475    private static final int SLEEP_IN_MS = 5000;
476    private static final Logger LOG = LoggerFactory.getLogger(VisibilityLoop.class);
477    IntegrationTestBigLinkedListWithVisibility it;
478
479    @Override
480    protected void runGenerator(int numMappers, long numNodes, String outputDir, Integer width,
481        Integer wrapMultiplier, Integer numWalkers) throws Exception {
482      Path outputPath = new Path(outputDir);
483      UUID uuid = UUID.randomUUID(); // create a random UUID.
484      Path generatorOutput = new Path(outputPath, uuid.toString());
485
486      Generator generator = new VisibilityGenerator();
487      generator.setConf(getConf());
488      int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier,
489          numWalkers);
490      if (retCode > 0) {
491        throw new RuntimeException("Generator failed with return code: " + retCode);
492      }
493    }
494
495    protected void runDelete(int numMappers, long numNodes, String outputDir, Integer width,
496        Integer wrapMultiplier, int tableIndex) throws Exception {
497      LOG.info("Running copier on table "+IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex));
498      Copier copier = new Copier(
499          IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex), tableIndex, true);
500      copier.setConf(getConf());
501      copier.runCopier(outputDir);
502      Thread.sleep(SLEEP_IN_MS);
503    }
504
505    protected void runVerify(String outputDir, int numReducers, long expectedNumNodes,
506        boolean allTables) throws Exception {
507      Path outputPath = new Path(outputDir);
508
509      if (allTables) {
510        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
511          LOG.info("Verifying table " + i);
512          sleep(SLEEP_IN_MS);
513          UUID uuid = UUID.randomUUID(); // create a random UUID.
514          Path iterationOutput = new Path(outputPath, uuid.toString());
515          Verify verify = new VisibilityVerify(getTableName(i).getNameAsString(), i);
516          verify(numReducers, expectedNumNodes, iterationOutput, verify);
517        }
518      }
519      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
520        runVerifyCommonTable(outputDir, numReducers, expectedNumNodes, i);
521      }
522    }
523
524    private void runVerify(String outputDir, int numReducers, long expectedNodes, int tableIndex)
525        throws Exception {
526      long temp = expectedNodes;
527      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
528        if (i <= tableIndex) {
529          expectedNodes = 0;
530        } else {
531          expectedNodes = temp;
532        }
533        LOG.info("Verifying data in the table with index "+i+ " and expected nodes is "+expectedNodes);
534        runVerifyCommonTable(outputDir, numReducers, expectedNodes, i);
535      }
536    }
537
538    private void sleep(long ms) throws InterruptedException {
539      Thread.sleep(ms);
540    }
541
542    protected void runVerifyCommonTable(String outputDir, int numReducers, long expectedNumNodes,
543        int index) throws Exception {
544      LOG.info("Verifying common table with index " + index);
545      sleep(SLEEP_IN_MS);
546      Path outputPath = new Path(outputDir);
547      UUID uuid = UUID.randomUUID(); // create a random UUID.
548      Path iterationOutput = new Path(outputPath, uuid.toString());
549      Verify verify = new VisibilityVerify(TableName.valueOf(COMMON_TABLE_NAME).getNameAsString(),
550          index);
551      verify(numReducers, expectedNumNodes, iterationOutput, verify);
552    }
553
554    protected void runCopier(String outputDir) throws Exception {
555      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
556        LOG.info("Running copier " + IntegrationTestBigLinkedListWithVisibility.getTableName(i));
557        sleep(SLEEP_IN_MS);
558        Copier copier = new Copier(IntegrationTestBigLinkedListWithVisibility.getTableName(i), i,
559            false);
560        copier.setConf(getConf());
561        copier.runCopier(outputDir);
562      }
563    }
564
565    private void verify(int numReducers, long expectedNumNodes,
566        Path iterationOutput, Verify verify) throws Exception {
567      verify.setConf(getConf());
568      int retCode = verify.run(iterationOutput, numReducers);
569      if (retCode > 0) {
570        throw new RuntimeException("Verify.run failed with return code: " + retCode);
571      }
572
573      if (!verify.verify(expectedNumNodes)) {
574        throw new RuntimeException("Verify.verify failed");
575      }
576
577      LOG.info("Verify finished with succees. Total nodes=" + expectedNumNodes);
578    }
579
580    @Override
581    public int run(String[] args) throws Exception {
582      if (args.length < 5) {
583        System.err
584            .println("Usage: Loop <num iterations> " +
585                "<num mappers> <num nodes per mapper> <output dir> " +
586                "<num reducers> [<width> <wrap multiplier>]");
587        return 1;
588      }
589      LOG.info("Running Loop with args:" + Arrays.deepToString(args));
590
591      int numIterations = Integer.parseInt(args[0]);
592      int numMappers = Integer.parseInt(args[1]);
593      long numNodes = Long.parseLong(args[2]);
594      String outputDir = args[3];
595      int numReducers = Integer.parseInt(args[4]);
596      Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
597      Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
598      long expectedNumNodes = 0;
599
600      if (numIterations < 0) {
601        numIterations = Integer.MAX_VALUE; // run indefinitely (kind of)
602      }
603
604      for (int i = 0; i < numIterations; i++) {
605        LOG.info("Starting iteration = " + i);
606        LOG.info("Generating data");
607        // By default run no concurrent walkers for test with visibility
608        runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, 0);
609        expectedNumNodes += numMappers * numNodes;
610        // Copying wont work because expressions are not returned back to the
611        // client
612        LOG.info("Running copier");
613        sleep(SLEEP_IN_MS);
614        runCopier(outputDir);
615        LOG.info("Verifying copied data");
616        sleep(SLEEP_IN_MS);
617        runVerify(outputDir, numReducers, expectedNumNodes, true);
618        sleep(SLEEP_IN_MS);
619        for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
620          LOG.info("Deleting data on table with index: "+j);
621          runDelete(numMappers, numNodes, outputDir, width, wrapMultiplier, j);
622          sleep(SLEEP_IN_MS);
623          LOG.info("Verifying common table after deleting");
624          runVerify(outputDir, numReducers, expectedNumNodes, j);
625          sleep(SLEEP_IN_MS);
626        }
627      }
628      return 0;
629    }
630  }
631
632  @Override
633  @Test
634  public void testContinuousIngest() throws IOException, Exception {
635    // Loop <num iterations> <num mappers> <num nodes per mapper> <output dir>
636    // <num reducers>
637    int ret = ToolRunner.run(
638        getTestingUtil(getConf()).getConfiguration(),
639        new VisibilityLoop(),
640        new String[] { "1", "1", "20000",
641            util.getDataTestDirOnTestFS("IntegrationTestBigLinkedListWithVisibility").toString(),
642            "1", "10000" });
643    org.junit.Assert.assertEquals(0, ret);
644  }
645
646  public static void main(String[] args) throws Exception {
647    Configuration conf = HBaseConfiguration.create();
648    IntegrationTestingUtility.setUseDistributedCluster(conf);
649    int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedListWithVisibility(), args);
650    System.exit(ret);
651  }
652
653  @Override
654  protected MonkeyFactory getDefaultMonkeyFactory() {
655    return MonkeyFactory.getFactory(MonkeyFactory.CALM);
656  }
657
658  @Override
659  public int runTestFromCommandLine() throws Exception {
660    Tool tool = null;
661    Loop loop = new VisibilityLoop();
662    loop.it = this;
663    tool = loop;
664    return ToolRunner.run(getConf(), tool, otherArgs);
665  }
666}