001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.test;
019
020import java.io.IOException;
021import java.security.PrivilegedExceptionAction;
022import java.util.Arrays;
023import java.util.Iterator;
024import java.util.UUID;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.conf.Configured;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellUtil;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.HColumnDescriptor;
032import org.apache.hadoop.hbase.HRegionLocation;
033import org.apache.hadoop.hbase.HTableDescriptor;
034import org.apache.hadoop.hbase.IntegrationTestingUtility;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory;
037import org.apache.hadoop.hbase.client.Admin;
038import org.apache.hadoop.hbase.client.BufferedMutator;
039import org.apache.hadoop.hbase.client.BufferedMutatorParams;
040import org.apache.hadoop.hbase.client.Connection;
041import org.apache.hadoop.hbase.client.ConnectionFactory;
042import org.apache.hadoop.hbase.client.Delete;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.Result;
045import org.apache.hadoop.hbase.client.Scan;
046import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
047import org.apache.hadoop.hbase.log.HBaseMarkers;
048import org.apache.hadoop.hbase.mapreduce.Import;
049import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
050import org.apache.hadoop.hbase.security.User;
051import org.apache.hadoop.hbase.security.access.AccessControlClient;
052import org.apache.hadoop.hbase.security.access.Permission;
053import org.apache.hadoop.hbase.security.visibility.Authorizations;
054import org.apache.hadoop.hbase.security.visibility.CellVisibility;
055import org.apache.hadoop.hbase.security.visibility.VisibilityClient;
056import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
057import org.apache.hadoop.hbase.testclassification.IntegrationTests;
058import org.apache.hadoop.hbase.util.AbstractHBaseTool;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.apache.hadoop.io.BytesWritable;
061import org.apache.hadoop.mapreduce.Counter;
062import org.apache.hadoop.mapreduce.CounterGroup;
063import org.apache.hadoop.mapreduce.Counters;
064import org.apache.hadoop.mapreduce.Job;
065import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
066import org.apache.hadoop.util.Tool;
067import org.apache.hadoop.util.ToolRunner;
068import org.junit.Test;
069import org.junit.experimental.categories.Category;
070import org.slf4j.Logger;
071import org.slf4j.LoggerFactory;
072
073import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
074import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
075
076/**
077 * IT test used to verify the deletes with visibility labels. The test creates three tables
078 * tablename_0, tablename_1 and tablename_2 and each table is associated with a unique pair of
079 * labels. Another common table with the name 'commontable' is created and it has the data combined
080 * from all these 3 tables such that there are 3 versions of every row but the visibility label in
081 * every row corresponds to the table from which the row originated. Then deletes are issued to the
082 * common table by selecting the visibility label associated with each of the smaller tables. After
083 * the delete is issued with one set of visibility labels we try to scan the common table with each
084 * of the visibility pairs defined for the 3 tables. So after the first delete is issued, a scan
085 * with the first set of visibility labels would return zero result whereas the scan issued with the
086 * other two sets of visibility labels should return all the rows corresponding to that set of
087 * visibility labels. The above process of delete and scan is repeated until after the last set of
088 * visibility labels are used for the deletes the common table should not return any row. To use
089 * this ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedListWithVisibility Loop 1 1
090 * 20000 /tmp 1 10000 or ./hbase org.apache.hadoop.hbase.IntegrationTestsDriver -r
091 * .*IntegrationTestBigLinkedListWithVisibility.*
092 */
093@Category(IntegrationTests.class)
094public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestBigLinkedList {
095
096  private static final String CONFIDENTIAL = "confidential";
097  private static final String TOPSECRET = "topsecret";
098  private static final String SECRET = "secret";
099  private static final String PUBLIC = "public";
100  private static final String PRIVATE = "private";
101  private static final String EVERYONE = "everyone";
102  private static final String RESTRICTED = "restricted";
103  private static final String GROUP = "group";
104  private static final String PREVILIGED = "previliged";
105  private static final String OPEN = "open";
106  public static String labels = CONFIDENTIAL + "," + TOPSECRET + "," + SECRET + "," + RESTRICTED
107    + "," + PRIVATE + "," + PREVILIGED + "," + GROUP + "," + OPEN + "," + PUBLIC + "," + EVERYONE;
108  private static final String COMMA = ",";
109  private static final String UNDER_SCORE = "_";
110  public static int DEFAULT_TABLES_COUNT = 3;
111  public static String tableName = "tableName";
112  public static final String COMMON_TABLE_NAME = "commontable";
113  public static final String LABELS_KEY = "LABELS";
114  public static final String INDEX_KEY = "INDEX";
115  private static User USER;
116  private static final String OR = "|";
117  private static String USER_OPT = "user";
118  private static String userName = "user1";
119
120  static class VisibilityGenerator extends Generator {
121    private static final Logger LOG = LoggerFactory.getLogger(VisibilityGenerator.class);
122
123    @Override
124    protected void createSchema() throws IOException {
125      LOG.info("Creating tables");
126      // Create three tables
127      boolean acl = AccessControlClient
128        .isAccessControllerRunning(ConnectionFactory.createConnection(getConf()));
129      if (!acl) {
130        LOG.info("No ACL available.");
131      }
132      try (Connection conn = ConnectionFactory.createConnection(getConf());
133        Admin admin = conn.getAdmin()) {
134        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
135          TableName tableName = IntegrationTestBigLinkedListWithVisibility.getTableName(i);
136          createTable(admin, tableName, false, acl);
137        }
138        TableName tableName = TableName.valueOf(COMMON_TABLE_NAME);
139        createTable(admin, tableName, true, acl);
140      }
141    }
142
143    private void createTable(Admin admin, TableName tableName, boolean setVersion, boolean acl)
144      throws IOException {
145      if (!admin.tableExists(tableName)) {
146        HTableDescriptor htd = new HTableDescriptor(tableName);
147        HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME);
148        if (setVersion) {
149          family.setMaxVersions(DEFAULT_TABLES_COUNT);
150        }
151        htd.addFamily(family);
152        admin.createTable(htd);
153        if (acl) {
154          LOG.info("Granting permissions for user " + USER.getShortName());
155          Permission.Action[] actions = { Permission.Action.READ };
156          try {
157            AccessControlClient.grant(ConnectionFactory.createConnection(getConf()), tableName,
158              USER.getShortName(), null, null, actions);
159          } catch (Throwable e) {
160            LOG.error(HBaseMarkers.FATAL,
161              "Error in granting permission for the user " + USER.getShortName(), e);
162            throw new IOException(e);
163          }
164        }
165      }
166    }
167
168    @Override
169    protected void setMapperForGenerator(Job job) {
170      job.setMapperClass(VisibilityGeneratorMapper.class);
171    }
172
173    static class VisibilityGeneratorMapper extends GeneratorMapper {
174      BufferedMutator[] tables = new BufferedMutator[DEFAULT_TABLES_COUNT];
175
176      @Override
177      protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
178        throws IOException, InterruptedException {
179        super.setup(context);
180      }
181
182      @Override
183      protected void instantiateHTable() throws IOException {
184        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
185          BufferedMutatorParams params = new BufferedMutatorParams(getTableName(i));
186          params.writeBufferSize(4 * 1024 * 1024);
187          BufferedMutator table = connection.getBufferedMutator(params);
188          this.tables[i] = table;
189        }
190      }
191
192      @Override
193      protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context)
194        throws IOException, InterruptedException {
195        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
196          if (tables[i] != null) {
197            tables[i].close();
198          }
199        }
200      }
201
202      @Override
203      protected void persist(org.apache.hadoop.mapreduce.Mapper.Context output, long count,
204        byte[][] prev, byte[][] current, byte[] id) throws IOException {
205        String visibilityExps = "";
206        String[] split = Splitter.on(COMMA).splitToStream(labels).toArray(String[]::new);
207        for (int i = 0; i < current.length; i++) {
208          for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
209            Put put = new Put(current[i]);
210            byte[] value = prev == null ? NO_KEY : prev[i];
211            put.addColumn(FAMILY_NAME, COLUMN_PREV, value);
212
213            if (count >= 0) {
214              put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
215            }
216            if (id != null) {
217              put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id);
218            }
219            visibilityExps = split[j * 2] + OR + split[(j * 2) + 1];
220            put.setCellVisibility(new CellVisibility(visibilityExps));
221            tables[j].mutate(put);
222            try {
223              Thread.sleep(1);
224            } catch (InterruptedException e) {
225              throw new IOException();
226            }
227          }
228          if (i % 1000 == 0) {
229            // Tickle progress every so often else maprunner will think us hung
230            output.progress();
231          }
232        }
233      }
234    }
235  }
236
237  static class Copier extends Configured implements Tool {
238    private static final Logger LOG = LoggerFactory.getLogger(Copier.class);
239    private TableName tableName;
240    private int labelIndex;
241    private boolean delete;
242
243    public Copier(TableName tableName, int index, boolean delete) {
244      this.tableName = tableName;
245      this.labelIndex = index;
246      this.delete = delete;
247    }
248
249    public int runCopier(String outputDir) throws Exception {
250      Job job = new Job(getConf());
251      job.setJobName("Data copier");
252      job.getConfiguration().setInt("INDEX", labelIndex);
253      job.getConfiguration().set("LABELS", labels);
254      job.setJarByClass(getClass());
255      Scan scan = new Scan();
256      scan.setCacheBlocks(false);
257      scan.setRaw(true);
258
259      String[] split = Splitter.on(COMMA).splitToStream(labels).toArray(String[]::new);
260      scan.setAuthorizations(
261        new Authorizations(split[this.labelIndex * 2], split[(this.labelIndex * 2) + 1]));
262      if (delete) {
263        LOG.info("Running deletes");
264      } else {
265        LOG.info("Running copiers");
266      }
267      if (delete) {
268        TableMapReduceUtil.initTableMapperJob(tableName.getNameAsString(), scan,
269          VisibilityDeleteImport.class, null, null, job);
270      } else {
271        TableMapReduceUtil.initTableMapperJob(tableName.getNameAsString(), scan,
272          VisibilityImport.class, null, null, job);
273      }
274      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
275      job.getConfiguration().setBoolean("mapreduce.reduce.speculative", false);
276      TableMapReduceUtil.initTableReducerJob(COMMON_TABLE_NAME, null, job, null, null, null, null);
277      TableMapReduceUtil.addDependencyJars(job);
278      TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
279      TableMapReduceUtil.initCredentials(job);
280      job.setNumReduceTasks(0);
281      boolean success = job.waitForCompletion(true);
282      return success ? 0 : 1;
283    }
284
285    @Override
286    public int run(String[] arg0) throws Exception {
287      // TODO Auto-generated method stub
288      return 0;
289    }
290  }
291
292  static class VisibilityImport extends Import.Importer {
293    private int index;
294    private String labels;
295    private String[] split;
296
297    @Override
298    public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) {
299      index = context.getConfiguration().getInt(INDEX_KEY, -1);
300      labels = context.getConfiguration().get(LABELS_KEY);
301      split = labels.split(COMMA);
302      super.setup(context);
303    }
304
305    @Override
306    protected void addPutToKv(Put put, Cell kv) throws IOException {
307      String visibilityExps = split[index * 2] + OR + split[(index * 2) + 1];
308      put.setCellVisibility(new CellVisibility(visibilityExps));
309      super.addPutToKv(put, kv);
310    }
311  }
312
313  static class VisibilityDeleteImport extends Import.Importer {
314    private int index;
315    private String labels;
316    private String[] split;
317
318    @Override
319    public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) {
320      index = context.getConfiguration().getInt(INDEX_KEY, -1);
321      labels = context.getConfiguration().get(LABELS_KEY);
322      split = labels.split(COMMA);
323      super.setup(context);
324    }
325
326    // Creating delete here
327    @Override
328    protected void processKV(ImmutableBytesWritable key, Result result,
329      org.apache.hadoop.mapreduce.Mapper.Context context, Put put,
330      org.apache.hadoop.hbase.client.Delete delete) throws IOException, InterruptedException {
331      String visibilityExps = split[index * 2] + OR + split[(index * 2) + 1];
332      for (Cell kv : result.rawCells()) {
333        // skip if we filter it out
334        if (kv == null) continue;
335        // Create deletes here
336        if (delete == null) {
337          delete = new Delete(key.get());
338        }
339        delete.setCellVisibility(new CellVisibility(visibilityExps));
340        delete.addFamily(CellUtil.cloneFamily(kv));
341      }
342      if (delete != null) {
343        context.write(key, delete);
344      }
345    }
346  }
347
348  @Override
349  protected void addOptions() {
350    super.addOptions();
351    addOptWithArg("u", USER_OPT, "User name");
352  }
353
354  @Override
355  protected void processOptions(CommandLine cmd) {
356    super.processOptions(cmd);
357    if (cmd.hasOption(USER_OPT)) {
358      userName = cmd.getOptionValue(USER_OPT);
359    }
360
361  }
362
363  @Override
364  public void setUpCluster() throws Exception {
365    util = getTestingUtil(null);
366    Configuration conf = util.getConfiguration();
367    VisibilityTestUtil.enableVisiblityLabels(conf);
368    conf.set("hbase.superuser", User.getCurrent().getName());
369    conf.setBoolean("dfs.permissions", false);
370    USER = User.createUserForTesting(conf, userName, new String[] {});
371    super.setUpCluster();
372    addLabels();
373  }
374
375  static TableName getTableName(int i) {
376    return TableName.valueOf(tableName + UNDER_SCORE + i);
377  }
378
379  private void addLabels() throws Exception {
380    try {
381      VisibilityClient.addLabels(util.getConnection(), labels.split(COMMA));
382      VisibilityClient.setAuths(util.getConnection(), labels.split(COMMA), USER.getName());
383    } catch (Throwable t) {
384      throw new IOException(t);
385    }
386  }
387
388  static class VisibilityVerify extends Verify {
389    private static final Logger LOG = LoggerFactory.getLogger(VisibilityVerify.class);
390    private TableName tableName;
391    private int labelIndex;
392
393    public VisibilityVerify(String tableName, int index) {
394      this.tableName = TableName.valueOf(tableName);
395      this.labelIndex = index;
396    }
397
398    @Override
399    public int run(final Path outputDir, final int numReducers) throws Exception {
400      LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers);
401      PrivilegedExceptionAction<Integer> scanAction = new PrivilegedExceptionAction<Integer>() {
402        @Override
403        public Integer run() throws Exception {
404          return doVerify(outputDir, numReducers);
405        }
406      };
407      return USER.runAs(scanAction);
408    }
409
410    private int doVerify(Path outputDir, int numReducers)
411      throws IOException, InterruptedException, ClassNotFoundException {
412      job = new Job(getConf());
413
414      job.setJobName("Link Verifier");
415      job.setNumReduceTasks(numReducers);
416      job.setJarByClass(getClass());
417
418      setJobScannerConf(job);
419
420      Scan scan = new Scan();
421      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
422      scan.setCaching(10000);
423      scan.setCacheBlocks(false);
424      String[] split = Splitter.on(COMMA).splitToStream(labels).toArray(String[]::new);
425
426      scan.setAuthorizations(
427        new Authorizations(split[this.labelIndex * 2], split[(this.labelIndex * 2) + 1]));
428
429      TableMapReduceUtil.initTableMapperJob(tableName.getName(), scan, VerifyMapper.class,
430        BytesWritable.class, BytesWritable.class, job);
431      TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
432
433      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
434
435      job.setReducerClass(VerifyReducer.class);
436      job.setOutputFormatClass(TextOutputFormat.class);
437      TextOutputFormat.setOutputPath(job, outputDir);
438      boolean success = job.waitForCompletion(true);
439
440      return success ? 0 : 1;
441    }
442
443    @Override
444    protected void handleFailure(Counters counters) throws IOException {
445      try (Connection conn = ConnectionFactory.createConnection(job.getConfiguration())) {
446        TableName tableName = TableName.valueOf(COMMON_TABLE_NAME);
447        CounterGroup g = counters.getGroup("undef");
448        Iterator<Counter> it = g.iterator();
449        while (it.hasNext()) {
450          String keyString = it.next().getName();
451          byte[] key = Bytes.toBytes(keyString);
452          HRegionLocation loc = conn.getRegionLocator(tableName).getRegionLocation(key, true);
453          LOG.error("undefined row " + keyString + ", " + loc);
454        }
455        g = counters.getGroup("unref");
456        it = g.iterator();
457        while (it.hasNext()) {
458          String keyString = it.next().getName();
459          byte[] key = Bytes.toBytes(keyString);
460          HRegionLocation loc = conn.getRegionLocator(tableName).getRegionLocation(key, true);
461          LOG.error("unreferred row " + keyString + ", " + loc);
462        }
463      }
464    }
465  }
466
467  static class VisibilityLoop extends Loop {
468    private static final int SLEEP_IN_MS = 5000;
469    private static final Logger LOG = LoggerFactory.getLogger(VisibilityLoop.class);
470
471    @Override
472    protected void runGenerator(int numMappers, long numNodes, String outputDir, Integer width,
473      Integer wrapMultiplier, Integer numWalkers) throws Exception {
474      Path outputPath = new Path(outputDir);
475      UUID uuid = UUID.randomUUID(); // create a random UUID.
476      Path generatorOutput = new Path(outputPath, uuid.toString());
477
478      Generator generator = new VisibilityGenerator();
479      generator.setConf(getConf());
480      int retCode =
481        generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier, numWalkers);
482      if (retCode > 0) {
483        throw new RuntimeException("Generator failed with return code: " + retCode);
484      }
485    }
486
487    protected void runDelete(int numMappers, long numNodes, String outputDir, Integer width,
488      Integer wrapMultiplier, int tableIndex) throws Exception {
489      LOG.info("Running copier on table "
490        + IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex));
491      Copier copier = new Copier(
492        IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex), tableIndex, true);
493      copier.setConf(getConf());
494      copier.runCopier(outputDir);
495      Thread.sleep(SLEEP_IN_MS);
496    }
497
498    protected void runVerify(String outputDir, int numReducers, long expectedNumNodes,
499      boolean allTables) throws Exception {
500      Path outputPath = new Path(outputDir);
501
502      if (allTables) {
503        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
504          LOG.info("Verifying table " + i);
505          sleep(SLEEP_IN_MS);
506          UUID uuid = UUID.randomUUID(); // create a random UUID.
507          Path iterationOutput = new Path(outputPath, uuid.toString());
508          Verify verify = new VisibilityVerify(getTableName(i).getNameAsString(), i);
509          verify(numReducers, expectedNumNodes, iterationOutput, verify);
510        }
511      }
512      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
513        runVerifyCommonTable(outputDir, numReducers, expectedNumNodes, i);
514      }
515    }
516
517    private void runVerify(String outputDir, int numReducers, long expectedNodes, int tableIndex)
518      throws Exception {
519      long temp = expectedNodes;
520      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
521        if (i <= tableIndex) {
522          expectedNodes = 0;
523        } else {
524          expectedNodes = temp;
525        }
526        LOG.info("Verifying data in the table with index " + i + " and expected nodes is "
527          + expectedNodes);
528        runVerifyCommonTable(outputDir, numReducers, expectedNodes, i);
529      }
530    }
531
532    private void sleep(long ms) throws InterruptedException {
533      Thread.sleep(ms);
534    }
535
536    protected void runVerifyCommonTable(String outputDir, int numReducers, long expectedNumNodes,
537      int index) throws Exception {
538      LOG.info("Verifying common table with index " + index);
539      sleep(SLEEP_IN_MS);
540      Path outputPath = new Path(outputDir);
541      UUID uuid = UUID.randomUUID(); // create a random UUID.
542      Path iterationOutput = new Path(outputPath, uuid.toString());
543      Verify verify =
544        new VisibilityVerify(TableName.valueOf(COMMON_TABLE_NAME).getNameAsString(), index);
545      verify(numReducers, expectedNumNodes, iterationOutput, verify);
546    }
547
548    protected void runCopier(String outputDir) throws Exception {
549      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
550        LOG.info("Running copier " + IntegrationTestBigLinkedListWithVisibility.getTableName(i));
551        sleep(SLEEP_IN_MS);
552        Copier copier =
553          new Copier(IntegrationTestBigLinkedListWithVisibility.getTableName(i), i, false);
554        copier.setConf(getConf());
555        copier.runCopier(outputDir);
556      }
557    }
558
559    private void verify(int numReducers, long expectedNumNodes, Path iterationOutput, Verify verify)
560      throws Exception {
561      verify.setConf(getConf());
562      int retCode = verify.run(iterationOutput, numReducers);
563      if (retCode > 0) {
564        throw new RuntimeException("Verify.run failed with return code: " + retCode);
565      }
566
567      if (!verify.verify(expectedNumNodes)) {
568        throw new RuntimeException("Verify.verify failed");
569      }
570
571      LOG.info("Verify finished with succees. Total nodes=" + expectedNumNodes);
572    }
573
574    @Override
575    public int run(String[] args) throws Exception {
576      if (args.length < 5) {
577        System.err.println(
578          "Usage: Loop <num iterations> " + "<num mappers> <num nodes per mapper> <output dir> "
579            + "<num reducers> [<width> <wrap multiplier>]");
580        return 1;
581      }
582      LOG.info("Running Loop with args:" + Arrays.deepToString(args));
583
584      int numIterations = Integer.parseInt(args[0]);
585      int numMappers = Integer.parseInt(args[1]);
586      long numNodes = Long.parseLong(args[2]);
587      String outputDir = args[3];
588      int numReducers = Integer.parseInt(args[4]);
589      Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
590      Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
591      long expectedNumNodes = 0;
592
593      if (numIterations < 0) {
594        numIterations = Integer.MAX_VALUE; // run indefinitely (kind of)
595      }
596
597      for (int i = 0; i < numIterations; i++) {
598        LOG.info("Starting iteration = " + i);
599        LOG.info("Generating data");
600        // By default run no concurrent walkers for test with visibility
601        runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, 0);
602        expectedNumNodes += numMappers * numNodes;
603        // Copying wont work because expressions are not returned back to the
604        // client
605        LOG.info("Running copier");
606        sleep(SLEEP_IN_MS);
607        runCopier(outputDir);
608        LOG.info("Verifying copied data");
609        sleep(SLEEP_IN_MS);
610        runVerify(outputDir, numReducers, expectedNumNodes, true);
611        sleep(SLEEP_IN_MS);
612        for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
613          LOG.info("Deleting data on table with index: " + j);
614          runDelete(numMappers, numNodes, outputDir, width, wrapMultiplier, j);
615          sleep(SLEEP_IN_MS);
616          LOG.info("Verifying common table after deleting");
617          runVerify(outputDir, numReducers, expectedNumNodes, j);
618          sleep(SLEEP_IN_MS);
619        }
620      }
621      return 0;
622    }
623  }
624
625  @Override
626  @Test
627  public void testContinuousIngest() throws IOException, Exception {
628    // Loop <num iterations> <num mappers> <num nodes per mapper> <output dir>
629    // <num reducers>
630    int ret = ToolRunner.run(getTestingUtil(getConf()).getConfiguration(), new VisibilityLoop(),
631      new String[] { "1", "1", "20000",
632        util.getDataTestDirOnTestFS("IntegrationTestBigLinkedListWithVisibility").toString(), "1",
633        "10000" });
634    org.junit.Assert.assertEquals(0, ret);
635  }
636
637  public static void main(String[] args) throws Exception {
638    Configuration conf = HBaseConfiguration.create();
639    IntegrationTestingUtility.setUseDistributedCluster(conf);
640    int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedListWithVisibility(), args);
641    System.exit(ret);
642  }
643
644  @Override
645  protected MonkeyFactory getDefaultMonkeyFactory() {
646    return MonkeyFactory.getFactory(MonkeyFactory.CALM);
647  }
648
649  @Override
650  public int runTestFromCommandLine() throws Exception {
651    return ToolRunner.run(getConf(), new VisibilityLoop(), otherArgs);
652  }
653}