001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.test;
019
020import java.io.IOException;
021import java.security.PrivilegedExceptionAction;
022import java.util.Arrays;
023import java.util.Iterator;
024import java.util.UUID;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.conf.Configured;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellUtil;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.HRegionLocation;
032import org.apache.hadoop.hbase.IntegrationTestingUtility;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory;
035import org.apache.hadoop.hbase.client.Admin;
036import org.apache.hadoop.hbase.client.BufferedMutator;
037import org.apache.hadoop.hbase.client.BufferedMutatorParams;
038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
039import org.apache.hadoop.hbase.client.Connection;
040import org.apache.hadoop.hbase.client.ConnectionFactory;
041import org.apache.hadoop.hbase.client.Delete;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.Result;
044import org.apache.hadoop.hbase.client.Scan;
045import org.apache.hadoop.hbase.client.TableDescriptor;
046import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
047import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
048import org.apache.hadoop.hbase.log.HBaseMarkers;
049import org.apache.hadoop.hbase.mapreduce.Import;
050import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
051import org.apache.hadoop.hbase.security.User;
052import org.apache.hadoop.hbase.security.access.AccessControlClient;
053import org.apache.hadoop.hbase.security.access.Permission;
054import org.apache.hadoop.hbase.security.visibility.Authorizations;
055import org.apache.hadoop.hbase.security.visibility.CellVisibility;
056import org.apache.hadoop.hbase.security.visibility.VisibilityClient;
057import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
058import org.apache.hadoop.hbase.testclassification.IntegrationTests;
059import org.apache.hadoop.hbase.util.AbstractHBaseTool;
060import org.apache.hadoop.hbase.util.Bytes;
061import org.apache.hadoop.io.BytesWritable;
062import org.apache.hadoop.mapreduce.Counter;
063import org.apache.hadoop.mapreduce.CounterGroup;
064import org.apache.hadoop.mapreduce.Counters;
065import org.apache.hadoop.mapreduce.Job;
066import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
067import org.apache.hadoop.util.Tool;
068import org.apache.hadoop.util.ToolRunner;
069import org.junit.Test;
070import org.junit.experimental.categories.Category;
071import org.slf4j.Logger;
072import org.slf4j.LoggerFactory;
073
074import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
075import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
076
077/**
078 * IT test used to verify the deletes with visibility labels. The test creates three tables
079 * tablename_0, tablename_1 and tablename_2 and each table is associated with a unique pair of
080 * labels. Another common table with the name 'commontable' is created and it has the data combined
081 * from all these 3 tables such that there are 3 versions of every row but the visibility label in
082 * every row corresponds to the table from which the row originated. Then deletes are issued to the
083 * common table by selecting the visibility label associated with each of the smaller tables. After
084 * the delete is issued with one set of visibility labels we try to scan the common table with each
085 * of the visibility pairs defined for the 3 tables. So after the first delete is issued, a scan
086 * with the first set of visibility labels would return zero result whereas the scan issued with the
087 * other two sets of visibility labels should return all the rows corresponding to that set of
088 * visibility labels. The above process of delete and scan is repeated until after the last set of
 * visibility labels are used for the deletes the common table should not return any row. To run
 * this test: ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedListWithVisibility Loop
 * 1 1 20000 /tmp 1 10000 or ./hbase org.apache.hadoop.hbase.IntegrationTestsDriver -r
 * .*IntegrationTestBigLinkedListWithVisibility.*
093 */
094@Category(IntegrationTests.class)
095public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestBigLinkedList {
096
097  private static final String CONFIDENTIAL = "confidential";
098  private static final String TOPSECRET = "topsecret";
099  private static final String SECRET = "secret";
100  private static final String PUBLIC = "public";
101  private static final String PRIVATE = "private";
102  private static final String EVERYONE = "everyone";
103  private static final String RESTRICTED = "restricted";
104  private static final String GROUP = "group";
105  private static final String PREVILIGED = "previliged";
106  private static final String OPEN = "open";
107  public static String labels = CONFIDENTIAL + "," + TOPSECRET + "," + SECRET + "," + RESTRICTED
108    + "," + PRIVATE + "," + PREVILIGED + "," + GROUP + "," + OPEN + "," + PUBLIC + "," + EVERYONE;
109  private static final String COMMA = ",";
110  private static final String UNDER_SCORE = "_";
111  public static int DEFAULT_TABLES_COUNT = 3;
112  public static String tableName = "tableName";
113  public static final String COMMON_TABLE_NAME = "commontable";
114  public static final String LABELS_KEY = "LABELS";
115  public static final String INDEX_KEY = "INDEX";
116  private static User USER;
117  private static final String OR = "|";
118  private static String USER_OPT = "user";
119  private static String userName = "user1";
120
121  static class VisibilityGenerator extends Generator {
122    private static final Logger LOG = LoggerFactory.getLogger(VisibilityGenerator.class);
123
124    @Override
125    protected void createSchema() throws IOException {
126      LOG.info("Creating tables");
127      // Create three tables
128      boolean acl = AccessControlClient
129        .isAccessControllerRunning(ConnectionFactory.createConnection(getConf()));
130      if (!acl) {
131        LOG.info("No ACL available.");
132      }
133      try (Connection conn = ConnectionFactory.createConnection(getConf());
134        Admin admin = conn.getAdmin()) {
135        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
136          TableName tableName = IntegrationTestBigLinkedListWithVisibility.getTableName(i);
137          createTable(admin, tableName, false, acl);
138        }
139        TableName tableName = TableName.valueOf(COMMON_TABLE_NAME);
140        createTable(admin, tableName, true, acl);
141      }
142    }
143
144    private void createTable(Admin admin, TableName tableName, boolean setVersion, boolean acl)
145      throws IOException {
146      if (!admin.tableExists(tableName)) {
147        ColumnFamilyDescriptorBuilder cfBuilder =
148          ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_NAME);
149        if (setVersion) {
150          cfBuilder.setMaxVersions(DEFAULT_TABLES_COUNT);
151        }
152        TableDescriptor tableDescriptor =
153          TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(cfBuilder.build()).build();
154        admin.createTable(tableDescriptor);
155        if (acl) {
156          LOG.info("Granting permissions for user " + USER.getShortName());
157          Permission.Action[] actions = { Permission.Action.READ };
158          try {
159            AccessControlClient.grant(ConnectionFactory.createConnection(getConf()), tableName,
160              USER.getShortName(), null, null, actions);
161          } catch (Throwable e) {
162            LOG.error(HBaseMarkers.FATAL,
163              "Error in granting permission for the user " + USER.getShortName(), e);
164            throw new IOException(e);
165          }
166        }
167      }
168    }
169
170    @Override
171    protected void setMapperForGenerator(Job job) {
172      job.setMapperClass(VisibilityGeneratorMapper.class);
173    }
174
175    static class VisibilityGeneratorMapper extends GeneratorMapper {
176      BufferedMutator[] tables = new BufferedMutator[DEFAULT_TABLES_COUNT];
177
178      @Override
179      protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
180        throws IOException, InterruptedException {
181        super.setup(context);
182      }
183
184      @Override
185      protected void instantiateHTable() throws IOException {
186        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
187          BufferedMutatorParams params = new BufferedMutatorParams(getTableName(i));
188          params.writeBufferSize(4 * 1024 * 1024);
189          BufferedMutator table = connection.getBufferedMutator(params);
190          this.tables[i] = table;
191        }
192      }
193
194      @Override
195      protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context)
196        throws IOException, InterruptedException {
197        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
198          if (tables[i] != null) {
199            tables[i].close();
200          }
201        }
202      }
203
204      @Override
205      protected void persist(org.apache.hadoop.mapreduce.Mapper.Context output, long count,
206        byte[][] prev, byte[][] current, byte[] id) throws IOException {
207        String visibilityExps = "";
208        String[] split = Splitter.on(COMMA).splitToStream(labels).toArray(String[]::new);
209        for (int i = 0; i < current.length; i++) {
210          for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
211            Put put = new Put(current[i]);
212            byte[] value = prev == null ? NO_KEY : prev[i];
213            put.addColumn(FAMILY_NAME, COLUMN_PREV, value);
214
215            if (count >= 0) {
216              put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
217            }
218            if (id != null) {
219              put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id);
220            }
221            visibilityExps = split[j * 2] + OR + split[(j * 2) + 1];
222            put.setCellVisibility(new CellVisibility(visibilityExps));
223            tables[j].mutate(put);
224            try {
225              Thread.sleep(1);
226            } catch (InterruptedException e) {
227              throw new IOException();
228            }
229          }
230          if (i % 1000 == 0) {
231            // Tickle progress every so often else maprunner will think us hung
232            output.progress();
233          }
234        }
235      }
236    }
237  }
238
239  static class Copier extends Configured implements Tool {
240    private static final Logger LOG = LoggerFactory.getLogger(Copier.class);
241    private TableName tableName;
242    private int labelIndex;
243    private boolean delete;
244
245    public Copier(TableName tableName, int index, boolean delete) {
246      this.tableName = tableName;
247      this.labelIndex = index;
248      this.delete = delete;
249    }
250
251    public int runCopier(String outputDir) throws Exception {
252      Job job = new Job(getConf());
253      job.setJobName("Data copier");
254      job.getConfiguration().setInt("INDEX", labelIndex);
255      job.getConfiguration().set("LABELS", labels);
256      job.setJarByClass(getClass());
257      Scan scan = new Scan();
258      scan.setCacheBlocks(false);
259      scan.setRaw(true);
260
261      String[] split = Splitter.on(COMMA).splitToStream(labels).toArray(String[]::new);
262      scan.setAuthorizations(
263        new Authorizations(split[this.labelIndex * 2], split[(this.labelIndex * 2) + 1]));
264      if (delete) {
265        LOG.info("Running deletes");
266      } else {
267        LOG.info("Running copiers");
268      }
269      if (delete) {
270        TableMapReduceUtil.initTableMapperJob(tableName.getNameAsString(), scan,
271          VisibilityDeleteImport.class, null, null, job);
272      } else {
273        TableMapReduceUtil.initTableMapperJob(tableName.getNameAsString(), scan,
274          VisibilityImport.class, null, null, job);
275      }
276      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
277      job.getConfiguration().setBoolean("mapreduce.reduce.speculative", false);
278      TableMapReduceUtil.initTableReducerJob(COMMON_TABLE_NAME, null, job, null, null, null, null);
279      TableMapReduceUtil.addDependencyJars(job);
280      TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
281      TableMapReduceUtil.initCredentials(job);
282      job.setNumReduceTasks(0);
283      boolean success = job.waitForCompletion(true);
284      return success ? 0 : 1;
285    }
286
287    @Override
288    public int run(String[] arg0) throws Exception {
289      // TODO Auto-generated method stub
290      return 0;
291    }
292  }
293
294  static class VisibilityImport extends Import.Importer {
295    private int index;
296    private String labels;
297    private String[] split;
298
299    @Override
300    public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) {
301      index = context.getConfiguration().getInt(INDEX_KEY, -1);
302      labels = context.getConfiguration().get(LABELS_KEY);
303      split = labels.split(COMMA);
304      super.setup(context);
305    }
306
307    @Override
308    protected void addPutToKv(Put put, Cell kv) throws IOException {
309      String visibilityExps = split[index * 2] + OR + split[(index * 2) + 1];
310      put.setCellVisibility(new CellVisibility(visibilityExps));
311      super.addPutToKv(put, kv);
312    }
313  }
314
315  static class VisibilityDeleteImport extends Import.Importer {
316    private int index;
317    private String labels;
318    private String[] split;
319
320    @Override
321    public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) {
322      index = context.getConfiguration().getInt(INDEX_KEY, -1);
323      labels = context.getConfiguration().get(LABELS_KEY);
324      split = labels.split(COMMA);
325      super.setup(context);
326    }
327
328    // Creating delete here
329    @Override
330    protected void processKV(ImmutableBytesWritable key, Result result,
331      org.apache.hadoop.mapreduce.Mapper.Context context, Put put,
332      org.apache.hadoop.hbase.client.Delete delete) throws IOException, InterruptedException {
333      String visibilityExps = split[index * 2] + OR + split[(index * 2) + 1];
334      for (Cell kv : result.rawCells()) {
335        // skip if we filter it out
336        if (kv == null) continue;
337        // Create deletes here
338        if (delete == null) {
339          delete = new Delete(key.get());
340        }
341        delete.setCellVisibility(new CellVisibility(visibilityExps));
342        delete.addFamily(CellUtil.cloneFamily(kv));
343      }
344      if (delete != null) {
345        context.write(key, delete);
346      }
347    }
348  }
349
350  @Override
351  protected void addOptions() {
352    super.addOptions();
353    addOptWithArg("u", USER_OPT, "User name");
354  }
355
356  @Override
357  protected void processOptions(CommandLine cmd) {
358    super.processOptions(cmd);
359    if (cmd.hasOption(USER_OPT)) {
360      userName = cmd.getOptionValue(USER_OPT);
361    }
362
363  }
364
365  @Override
366  public void setUpCluster() throws Exception {
367    util = getTestingUtil(null);
368    Configuration conf = util.getConfiguration();
369    VisibilityTestUtil.enableVisiblityLabels(conf);
370    conf.set("hbase.superuser", User.getCurrent().getName());
371    conf.setBoolean("dfs.permissions", false);
372    USER = User.createUserForTesting(conf, userName, new String[] {});
373    super.setUpCluster();
374    addLabels();
375  }
376
377  static TableName getTableName(int i) {
378    return TableName.valueOf(tableName + UNDER_SCORE + i);
379  }
380
381  private void addLabels() throws Exception {
382    try {
383      VisibilityClient.addLabels(util.getConnection(), labels.split(COMMA));
384      VisibilityClient.setAuths(util.getConnection(), labels.split(COMMA), USER.getName());
385    } catch (Throwable t) {
386      throw new IOException(t);
387    }
388  }
389
390  static class VisibilityVerify extends Verify {
391    private static final Logger LOG = LoggerFactory.getLogger(VisibilityVerify.class);
392    private TableName tableName;
393    private int labelIndex;
394
395    public VisibilityVerify(String tableName, int index) {
396      this.tableName = TableName.valueOf(tableName);
397      this.labelIndex = index;
398    }
399
400    @Override
401    public int run(final Path outputDir, final int numReducers) throws Exception {
402      LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers);
403      PrivilegedExceptionAction<Integer> scanAction = new PrivilegedExceptionAction<Integer>() {
404        @Override
405        public Integer run() throws Exception {
406          return doVerify(outputDir, numReducers);
407        }
408      };
409      return USER.runAs(scanAction);
410    }
411
412    private int doVerify(Path outputDir, int numReducers)
413      throws IOException, InterruptedException, ClassNotFoundException {
414      job = new Job(getConf());
415
416      job.setJobName("Link Verifier");
417      job.setNumReduceTasks(numReducers);
418      job.setJarByClass(getClass());
419
420      setJobScannerConf(job);
421
422      Scan scan = new Scan();
423      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
424      scan.setCaching(10000);
425      scan.setCacheBlocks(false);
426      String[] split = Splitter.on(COMMA).splitToStream(labels).toArray(String[]::new);
427
428      scan.setAuthorizations(
429        new Authorizations(split[this.labelIndex * 2], split[(this.labelIndex * 2) + 1]));
430
431      TableMapReduceUtil.initTableMapperJob(tableName.getName(), scan, VerifyMapper.class,
432        BytesWritable.class, BytesWritable.class, job);
433      TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
434
435      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
436
437      job.setReducerClass(VerifyReducer.class);
438      job.setOutputFormatClass(TextOutputFormat.class);
439      TextOutputFormat.setOutputPath(job, outputDir);
440      boolean success = job.waitForCompletion(true);
441
442      return success ? 0 : 1;
443    }
444
445    @Override
446    protected void handleFailure(Counters counters) throws IOException {
447      try (Connection conn = ConnectionFactory.createConnection(job.getConfiguration())) {
448        TableName tableName = TableName.valueOf(COMMON_TABLE_NAME);
449        CounterGroup g = counters.getGroup("undef");
450        Iterator<Counter> it = g.iterator();
451        while (it.hasNext()) {
452          String keyString = it.next().getName();
453          byte[] key = Bytes.toBytes(keyString);
454          HRegionLocation loc = conn.getRegionLocator(tableName).getRegionLocation(key, true);
455          LOG.error("undefined row " + keyString + ", " + loc);
456        }
457        g = counters.getGroup("unref");
458        it = g.iterator();
459        while (it.hasNext()) {
460          String keyString = it.next().getName();
461          byte[] key = Bytes.toBytes(keyString);
462          HRegionLocation loc = conn.getRegionLocator(tableName).getRegionLocation(key, true);
463          LOG.error("unreferred row " + keyString + ", " + loc);
464        }
465      }
466    }
467  }
468
469  static class VisibilityLoop extends Loop {
470    private static final int SLEEP_IN_MS = 5000;
471    private static final Logger LOG = LoggerFactory.getLogger(VisibilityLoop.class);
472
473    @Override
474    protected void runGenerator(int numMappers, long numNodes, String outputDir, Integer width,
475      Integer wrapMultiplier, Integer numWalkers) throws Exception {
476      Path outputPath = new Path(outputDir);
477      UUID uuid = UUID.randomUUID(); // create a random UUID.
478      Path generatorOutput = new Path(outputPath, uuid.toString());
479
480      Generator generator = new VisibilityGenerator();
481      generator.setConf(getConf());
482      int retCode =
483        generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier, numWalkers);
484      if (retCode > 0) {
485        throw new RuntimeException("Generator failed with return code: " + retCode);
486      }
487    }
488
489    protected void runDelete(int numMappers, long numNodes, String outputDir, Integer width,
490      Integer wrapMultiplier, int tableIndex) throws Exception {
491      LOG.info("Running copier on table "
492        + IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex));
493      Copier copier = new Copier(
494        IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex), tableIndex, true);
495      copier.setConf(getConf());
496      copier.runCopier(outputDir);
497      Thread.sleep(SLEEP_IN_MS);
498    }
499
500    protected void runVerify(String outputDir, int numReducers, long expectedNumNodes,
501      boolean allTables) throws Exception {
502      Path outputPath = new Path(outputDir);
503
504      if (allTables) {
505        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
506          LOG.info("Verifying table " + i);
507          sleep(SLEEP_IN_MS);
508          UUID uuid = UUID.randomUUID(); // create a random UUID.
509          Path iterationOutput = new Path(outputPath, uuid.toString());
510          Verify verify = new VisibilityVerify(getTableName(i).getNameAsString(), i);
511          verify(numReducers, expectedNumNodes, iterationOutput, verify);
512        }
513      }
514      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
515        runVerifyCommonTable(outputDir, numReducers, expectedNumNodes, i);
516      }
517    }
518
519    private void runVerify(String outputDir, int numReducers, long expectedNodes, int tableIndex)
520      throws Exception {
521      long temp = expectedNodes;
522      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
523        if (i <= tableIndex) {
524          expectedNodes = 0;
525        } else {
526          expectedNodes = temp;
527        }
528        LOG.info("Verifying data in the table with index " + i + " and expected nodes is "
529          + expectedNodes);
530        runVerifyCommonTable(outputDir, numReducers, expectedNodes, i);
531      }
532    }
533
534    private void sleep(long ms) throws InterruptedException {
535      Thread.sleep(ms);
536    }
537
538    protected void runVerifyCommonTable(String outputDir, int numReducers, long expectedNumNodes,
539      int index) throws Exception {
540      LOG.info("Verifying common table with index " + index);
541      sleep(SLEEP_IN_MS);
542      Path outputPath = new Path(outputDir);
543      UUID uuid = UUID.randomUUID(); // create a random UUID.
544      Path iterationOutput = new Path(outputPath, uuid.toString());
545      Verify verify =
546        new VisibilityVerify(TableName.valueOf(COMMON_TABLE_NAME).getNameAsString(), index);
547      verify(numReducers, expectedNumNodes, iterationOutput, verify);
548    }
549
550    protected void runCopier(String outputDir) throws Exception {
551      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
552        LOG.info("Running copier " + IntegrationTestBigLinkedListWithVisibility.getTableName(i));
553        sleep(SLEEP_IN_MS);
554        Copier copier =
555          new Copier(IntegrationTestBigLinkedListWithVisibility.getTableName(i), i, false);
556        copier.setConf(getConf());
557        copier.runCopier(outputDir);
558      }
559    }
560
561    private void verify(int numReducers, long expectedNumNodes, Path iterationOutput, Verify verify)
562      throws Exception {
563      verify.setConf(getConf());
564      int retCode = verify.run(iterationOutput, numReducers);
565      if (retCode > 0) {
566        throw new RuntimeException("Verify.run failed with return code: " + retCode);
567      }
568
569      if (!verify.verify(expectedNumNodes)) {
570        throw new RuntimeException("Verify.verify failed");
571      }
572
573      LOG.info("Verify finished with succees. Total nodes=" + expectedNumNodes);
574    }
575
576    @Override
577    public int run(String[] args) throws Exception {
578      if (args.length < 5) {
579        System.err.println(
580          "Usage: Loop <num iterations> " + "<num mappers> <num nodes per mapper> <output dir> "
581            + "<num reducers> [<width> <wrap multiplier>]");
582        return 1;
583      }
584      LOG.info("Running Loop with args:" + Arrays.deepToString(args));
585
586      int numIterations = Integer.parseInt(args[0]);
587      int numMappers = Integer.parseInt(args[1]);
588      long numNodes = Long.parseLong(args[2]);
589      String outputDir = args[3];
590      int numReducers = Integer.parseInt(args[4]);
591      Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
592      Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
593      long expectedNumNodes = 0;
594
595      if (numIterations < 0) {
596        numIterations = Integer.MAX_VALUE; // run indefinitely (kind of)
597      }
598
599      for (int i = 0; i < numIterations; i++) {
600        LOG.info("Starting iteration = " + i);
601        LOG.info("Generating data");
602        // By default run no concurrent walkers for test with visibility
603        runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, 0);
604        expectedNumNodes += numMappers * numNodes;
605        // Copying wont work because expressions are not returned back to the
606        // client
607        LOG.info("Running copier");
608        sleep(SLEEP_IN_MS);
609        runCopier(outputDir);
610        LOG.info("Verifying copied data");
611        sleep(SLEEP_IN_MS);
612        runVerify(outputDir, numReducers, expectedNumNodes, true);
613        sleep(SLEEP_IN_MS);
614        for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
615          LOG.info("Deleting data on table with index: " + j);
616          runDelete(numMappers, numNodes, outputDir, width, wrapMultiplier, j);
617          sleep(SLEEP_IN_MS);
618          LOG.info("Verifying common table after deleting");
619          runVerify(outputDir, numReducers, expectedNumNodes, j);
620          sleep(SLEEP_IN_MS);
621        }
622      }
623      return 0;
624    }
625  }
626
627  @Override
628  @Test
629  public void testContinuousIngest() throws IOException, Exception {
630    // Loop <num iterations> <num mappers> <num nodes per mapper> <output dir>
631    // <num reducers>
632    int ret = ToolRunner.run(getTestingUtil(getConf()).getConfiguration(), new VisibilityLoop(),
633      new String[] { "1", "1", "20000",
634        util.getDataTestDirOnTestFS("IntegrationTestBigLinkedListWithVisibility").toString(), "1",
635        "10000" });
636    org.junit.Assert.assertEquals(0, ret);
637  }
638
639  public static void main(String[] args) throws Exception {
640    Configuration conf = HBaseConfiguration.create();
641    IntegrationTestingUtility.setUseDistributedCluster(conf);
642    int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedListWithVisibility(), args);
643    System.exit(ret);
644  }
645
646  @Override
647  protected MonkeyFactory getDefaultMonkeyFactory() {
648    return MonkeyFactory.getFactory(MonkeyFactory.CALM);
649  }
650
651  @Override
652  public int runTestFromCommandLine() throws Exception {
653    return ToolRunner.run(getConf(), new VisibilityLoop(), otherArgs);
654  }
655}