001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.test;
019
020import java.io.IOException;
021import java.security.PrivilegedExceptionAction;
022import java.util.Arrays;
023import java.util.Iterator;
024import java.util.UUID;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.conf.Configured;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellUtil;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.HRegionLocation;
032import org.apache.hadoop.hbase.IntegrationTestingUtility;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory;
035import org.apache.hadoop.hbase.client.Admin;
036import org.apache.hadoop.hbase.client.BufferedMutator;
037import org.apache.hadoop.hbase.client.BufferedMutatorParams;
038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
039import org.apache.hadoop.hbase.client.Connection;
040import org.apache.hadoop.hbase.client.ConnectionFactory;
041import org.apache.hadoop.hbase.client.Delete;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.Result;
044import org.apache.hadoop.hbase.client.Scan;
045import org.apache.hadoop.hbase.client.TableDescriptor;
046import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
047import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
048import org.apache.hadoop.hbase.log.HBaseMarkers;
049import org.apache.hadoop.hbase.mapreduce.Import;
050import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
051import org.apache.hadoop.hbase.security.User;
052import org.apache.hadoop.hbase.security.access.AccessControlClient;
053import org.apache.hadoop.hbase.security.access.Permission;
054import org.apache.hadoop.hbase.security.visibility.Authorizations;
055import org.apache.hadoop.hbase.security.visibility.CellVisibility;
056import org.apache.hadoop.hbase.security.visibility.VisibilityClient;
057import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
058import org.apache.hadoop.hbase.testclassification.IntegrationTests;
059import org.apache.hadoop.hbase.util.AbstractHBaseTool;
060import org.apache.hadoop.hbase.util.Bytes;
061import org.apache.hadoop.io.BytesWritable;
062import org.apache.hadoop.mapreduce.Counter;
063import org.apache.hadoop.mapreduce.CounterGroup;
064import org.apache.hadoop.mapreduce.Counters;
065import org.apache.hadoop.mapreduce.Job;
066import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
067import org.apache.hadoop.util.Tool;
068import org.apache.hadoop.util.ToolRunner;
069import org.junit.Test;
070import org.junit.experimental.categories.Category;
071import org.slf4j.Logger;
072import org.slf4j.LoggerFactory;
073
074import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
075
076/**
077 * IT test used to verify the deletes with visibility labels.
078 * The test creates three tables tablename_0, tablename_1 and tablename_2 and each table
079 * is associated with a unique pair of labels.
080 * Another common table with the name 'commontable' is created and it has the data combined
081 * from all these 3 tables such that there are 3 versions of every row but the visibility label
082 * in every row corresponds to the table from which the row originated.
083 * Then deletes are issued to the common table by selecting the visibility label
084 * associated with each of the smaller tables.
085 * After the delete is issued with one set of visibility labels we try to scan the common table
086 * with each of the visibility pairs defined for the 3 tables.
087 * So after the first delete is issued, a scan with the first set of visibility labels would
088 * return zero result whereas the scan issued with the other two sets of visibility labels
089 * should return all the rows corresponding to that set of visibility labels.  The above
090 * process of delete and scan is repeated until after the last set of visibility labels are
091 * used for the deletes the common table should not return any row.
092 *
093 * To use this
094 * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedListWithVisibility Loop 1 1 20000 /tmp 1 10000
095 * or
096 * ./hbase org.apache.hadoop.hbase.IntegrationTestsDriver -r .*IntegrationTestBigLinkedListWithVisibility.*
097 */
098@Category(IntegrationTests.class)
099public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestBigLinkedList {
100
101  private static final String CONFIDENTIAL = "confidential";
102  private static final String TOPSECRET = "topsecret";
103  private static final String SECRET = "secret";
104  private static final String PUBLIC = "public";
105  private static final String PRIVATE = "private";
106  private static final String EVERYONE = "everyone";
107  private static final String RESTRICTED = "restricted";
108  private static final String GROUP = "group";
109  private static final String PREVILIGED = "previliged";
110  private static final String OPEN = "open";
111  public static String labels = CONFIDENTIAL + "," + TOPSECRET + "," + SECRET + "," + RESTRICTED
112      + "," + PRIVATE + "," + PREVILIGED + "," + GROUP + "," + OPEN + "," + PUBLIC + "," + EVERYONE;
113  private static final String COMMA = ",";
114  private static final String UNDER_SCORE = "_";
115  public static int DEFAULT_TABLES_COUNT = 3;
116  public static String tableName = "tableName";
117  public static final String COMMON_TABLE_NAME = "commontable";
118  public static final String LABELS_KEY = "LABELS";
119  public static final String INDEX_KEY = "INDEX";
120  private static User USER;
121  private static final String OR = "|";
122  private static String USER_OPT = "user";
123  private static String userName = "user1";
124
125  static class VisibilityGenerator extends Generator {
126    private static final Logger LOG = LoggerFactory.getLogger(VisibilityGenerator.class);
127
128    @Override
129    protected void createSchema() throws IOException {
130      LOG.info("Creating tables");
131      // Create three tables
132      boolean acl = AccessControlClient.isAccessControllerRunning(ConnectionFactory
133          .createConnection(getConf()));
134      if(!acl) {
135        LOG.info("No ACL available.");
136      }
137      try (Connection conn = ConnectionFactory.createConnection(getConf());
138          Admin admin = conn.getAdmin()) {
139        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
140          TableName tableName = IntegrationTestBigLinkedListWithVisibility.getTableName(i);
141          createTable(admin, tableName, false, acl);
142        }
143        TableName tableName = TableName.valueOf(COMMON_TABLE_NAME);
144        createTable(admin, tableName, true, acl);
145      }
146    }
147
148    private void createTable(Admin admin, TableName tableName, boolean setVersion,
149        boolean acl) throws IOException {
150      if (!admin.tableExists(tableName)) {
151        ColumnFamilyDescriptorBuilder cfBuilder =
152          ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_NAME);
153        if (setVersion) {
154          cfBuilder.setMaxVersions(DEFAULT_TABLES_COUNT);
155        }
156        TableDescriptor tableDescriptor =
157          TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(cfBuilder.build()).build();
158        admin.createTable(tableDescriptor);
159        if (acl) {
160          LOG.info("Granting permissions for user " + USER.getShortName());
161          Permission.Action[] actions = { Permission.Action.READ };
162          try {
163            AccessControlClient.grant(ConnectionFactory.createConnection(getConf()), tableName,
164                USER.getShortName(), null, null, actions);
165          } catch (Throwable e) {
166            LOG.error(HBaseMarkers.FATAL, "Error in granting permission for the user " +
167                USER.getShortName(), e);
168            throw new IOException(e);
169          }
170        }
171      }
172    }
173
174    @Override
175    protected void setMapperForGenerator(Job job) {
176      job.setMapperClass(VisibilityGeneratorMapper.class);
177    }
178
179    static class VisibilityGeneratorMapper extends GeneratorMapper {
180      BufferedMutator[] tables = new BufferedMutator[DEFAULT_TABLES_COUNT];
181
182      @Override
183      protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException,
184          InterruptedException {
185        super.setup(context);
186      }
187
188      @Override
189      protected void instantiateHTable() throws IOException {
190        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
191          BufferedMutatorParams params = new BufferedMutatorParams(getTableName(i));
192          params.writeBufferSize(4 * 1024 * 1024);
193          BufferedMutator table = connection.getBufferedMutator(params);
194          this.tables[i] = table;
195        }
196      }
197
198      @Override
199      protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context)
200          throws IOException, InterruptedException {
201        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
202          if (tables[i] != null) {
203            tables[i].close();
204          }
205        }
206      }
207
208      @Override
209      protected void persist(org.apache.hadoop.mapreduce.Mapper.Context output, long count,
210          byte[][] prev, byte[][] current, byte[] id) throws IOException {
211        String visibilityExps = "";
212        String[] split = labels.split(COMMA);
213        for (int i = 0; i < current.length; i++) {
214          for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
215            Put put = new Put(current[i]);
216            byte[] value = prev == null ? NO_KEY : prev[i];
217            put.addColumn(FAMILY_NAME, COLUMN_PREV, value);
218
219            if (count >= 0) {
220              put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
221            }
222            if (id != null) {
223              put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id);
224            }
225            visibilityExps = split[j * 2] + OR + split[(j * 2) + 1];
226            put.setCellVisibility(new CellVisibility(visibilityExps));
227            tables[j].mutate(put);
228            try {
229              Thread.sleep(1);
230            } catch (InterruptedException e) {
231              throw new IOException();
232            }
233          }
234          if (i % 1000 == 0) {
235            // Tickle progress every so often else maprunner will think us hung
236            output.progress();
237          }
238        }
239      }
240    }
241  }
242
243  static class Copier extends Configured implements Tool {
244    private static final Logger LOG = LoggerFactory.getLogger(Copier.class);
245    private TableName tableName;
246    private int labelIndex;
247    private boolean delete;
248
249    public Copier(TableName tableName, int index, boolean delete) {
250      this.tableName = tableName;
251      this.labelIndex = index;
252      this.delete = delete;
253    }
254
255    public int runCopier(String outputDir) throws Exception {
256      Job job = null;
257      Scan scan = null;
258      job = new Job(getConf());
259      job.setJobName("Data copier");
260      job.getConfiguration().setInt("INDEX", labelIndex);
261      job.getConfiguration().set("LABELS", labels);
262      job.setJarByClass(getClass());
263      scan = new Scan();
264      scan.setCacheBlocks(false);
265      scan.setRaw(true);
266
267      String[] split = labels.split(COMMA);
268      scan.setAuthorizations(new Authorizations(split[this.labelIndex * 2],
269          split[(this.labelIndex * 2) + 1]));
270      if (delete) {
271        LOG.info("Running deletes");
272      } else {
273        LOG.info("Running copiers");
274      }
275      if (delete) {
276        TableMapReduceUtil.initTableMapperJob(tableName.getNameAsString(), scan,
277            VisibilityDeleteImport.class, null, null, job);
278      } else {
279        TableMapReduceUtil.initTableMapperJob(tableName.getNameAsString(), scan,
280            VisibilityImport.class, null, null, job);
281      }
282      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
283      job.getConfiguration().setBoolean("mapreduce.reduce.speculative", false);
284      TableMapReduceUtil.initTableReducerJob(COMMON_TABLE_NAME, null, job, null, null, null, null);
285      TableMapReduceUtil.addDependencyJars(job);
286      TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
287      TableMapReduceUtil.initCredentials(job);
288      job.setNumReduceTasks(0);
289      boolean success = job.waitForCompletion(true);
290      return success ? 0 : 1;
291    }
292
293    @Override
294    public int run(String[] arg0) throws Exception {
295      // TODO Auto-generated method stub
296      return 0;
297    }
298  }
299
300  static class VisibilityImport extends Import.Importer {
301    private int index;
302    private String labels;
303    private String[] split;
304
305    @Override
306    public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) {
307      index = context.getConfiguration().getInt(INDEX_KEY, -1);
308      labels = context.getConfiguration().get(LABELS_KEY);
309      split = labels.split(COMMA);
310      super.setup(context);
311    }
312
313    @Override
314    protected void addPutToKv(Put put, Cell kv) throws IOException {
315      String visibilityExps = split[index * 2] + OR + split[(index * 2) + 1];
316      put.setCellVisibility(new CellVisibility(visibilityExps));
317      super.addPutToKv(put, kv);
318    }
319  }
320
321  static class VisibilityDeleteImport extends Import.Importer {
322    private int index;
323    private String labels;
324    private String[] split;
325
326    @Override
327    public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) {
328      index = context.getConfiguration().getInt(INDEX_KEY, -1);
329      labels = context.getConfiguration().get(LABELS_KEY);
330      split = labels.split(COMMA);
331      super.setup(context);
332    }
333
334    // Creating delete here
335    @Override
336    protected void processKV(ImmutableBytesWritable key, Result result,
337        org.apache.hadoop.mapreduce.Mapper.Context context, Put put,
338        org.apache.hadoop.hbase.client.Delete delete) throws
339        IOException, InterruptedException {
340      String visibilityExps = split[index * 2] + OR + split[(index * 2) + 1];
341      for (Cell kv : result.rawCells()) {
342        // skip if we filter it out
343        if (kv == null)
344          continue;
345        // Create deletes here
346        if (delete == null) {
347          delete = new Delete(key.get());
348        }
349        delete.setCellVisibility(new CellVisibility(visibilityExps));
350        delete.addFamily(CellUtil.cloneFamily(kv));
351      }
352      if (delete != null) {
353        context.write(key, delete);
354      }
355    }
356  }
357
358  @Override
359  protected void addOptions() {
360    super.addOptions();
361    addOptWithArg("u", USER_OPT, "User name");
362  }
363
364  @Override
365  protected void processOptions(CommandLine cmd) {
366    super.processOptions(cmd);
367    if (cmd.hasOption(USER_OPT)) {
368      userName = cmd.getOptionValue(USER_OPT);
369    }
370
371  }
372  @Override
373  public void setUpCluster() throws Exception {
374    util = getTestingUtil(null);
375    Configuration conf = util.getConfiguration();
376    VisibilityTestUtil.enableVisiblityLabels(conf);
377    conf.set("hbase.superuser", User.getCurrent().getName());
378    conf.setBoolean("dfs.permissions", false);
379    USER = User.createUserForTesting(conf, userName, new String[] {});
380    super.setUpCluster();
381    addLabels();
382  }
383
384  static TableName getTableName(int i) {
385    return TableName.valueOf(tableName + UNDER_SCORE + i);
386  }
387
388  private void addLabels() throws Exception {
389    try {
390      VisibilityClient.addLabels(util.getConnection(), labels.split(COMMA));
391      VisibilityClient.setAuths(util.getConnection(), labels.split(COMMA), USER.getName());
392    } catch (Throwable t) {
393      throw new IOException(t);
394    }
395  }
396
397  static class VisibilityVerify extends Verify {
398    private static final Logger LOG = LoggerFactory.getLogger(VisibilityVerify.class);
399    private TableName tableName;
400    private int labelIndex;
401
402    public VisibilityVerify(String tableName, int index) {
403      this.tableName = TableName.valueOf(tableName);
404      this.labelIndex = index;
405    }
406
407    @Override
408    public int run(final Path outputDir, final int numReducers) throws Exception {
409      LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers);
410      PrivilegedExceptionAction<Integer> scanAction = new PrivilegedExceptionAction<Integer>() {
411        @Override
412        public Integer run() throws Exception {
413          return doVerify(outputDir, numReducers);
414        }
415      };
416      return USER.runAs(scanAction);
417    }
418
419    private int doVerify(Path outputDir, int numReducers) throws IOException, InterruptedException,
420        ClassNotFoundException {
421      job = new Job(getConf());
422
423      job.setJobName("Link Verifier");
424      job.setNumReduceTasks(numReducers);
425      job.setJarByClass(getClass());
426
427      setJobScannerConf(job);
428
429      Scan scan = new Scan();
430      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
431      scan.setCaching(10000);
432      scan.setCacheBlocks(false);
433      String[] split = labels.split(COMMA);
434
435      scan.setAuthorizations(new Authorizations(split[this.labelIndex * 2],
436          split[(this.labelIndex * 2) + 1]));
437
438      TableMapReduceUtil.initTableMapperJob(tableName.getName(), scan, VerifyMapper.class,
439          BytesWritable.class, BytesWritable.class, job);
440      TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
441
442      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
443
444      job.setReducerClass(VerifyReducer.class);
445      job.setOutputFormatClass(TextOutputFormat.class);
446      TextOutputFormat.setOutputPath(job, outputDir);
447      boolean success = job.waitForCompletion(true);
448
449      return success ? 0 : 1;
450    }
451
452    @Override
453    protected void handleFailure(Counters counters) throws IOException {
454      try (Connection conn = ConnectionFactory.createConnection(job.getConfiguration())) {
455        TableName tableName = TableName.valueOf(COMMON_TABLE_NAME);
456        CounterGroup g = counters.getGroup("undef");
457        Iterator<Counter> it = g.iterator();
458        while (it.hasNext()) {
459          String keyString = it.next().getName();
460          byte[] key = Bytes.toBytes(keyString);
461          HRegionLocation loc = conn.getRegionLocator(tableName).getRegionLocation(key, true);
462          LOG.error("undefined row " + keyString + ", " + loc);
463        }
464        g = counters.getGroup("unref");
465        it = g.iterator();
466        while (it.hasNext()) {
467          String keyString = it.next().getName();
468          byte[] key = Bytes.toBytes(keyString);
469          HRegionLocation loc = conn.getRegionLocator(tableName).getRegionLocation(key, true);
470          LOG.error("unreferred row " + keyString + ", " + loc);
471        }
472      }
473    }
474  }
475
476  static class VisibilityLoop extends Loop {
477    private static final int SLEEP_IN_MS = 5000;
478    private static final Logger LOG = LoggerFactory.getLogger(VisibilityLoop.class);
479    IntegrationTestBigLinkedListWithVisibility it;
480
481    @Override
482    protected void runGenerator(int numMappers, long numNodes, String outputDir, Integer width,
483        Integer wrapMultiplier, Integer numWalkers) throws Exception {
484      Path outputPath = new Path(outputDir);
485      UUID uuid = UUID.randomUUID(); // create a random UUID.
486      Path generatorOutput = new Path(outputPath, uuid.toString());
487
488      Generator generator = new VisibilityGenerator();
489      generator.setConf(getConf());
490      int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier,
491          numWalkers);
492      if (retCode > 0) {
493        throw new RuntimeException("Generator failed with return code: " + retCode);
494      }
495    }
496
497    protected void runDelete(int numMappers, long numNodes, String outputDir, Integer width,
498        Integer wrapMultiplier, int tableIndex) throws Exception {
499      LOG.info("Running copier on table "+IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex));
500      Copier copier = new Copier(
501          IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex), tableIndex, true);
502      copier.setConf(getConf());
503      copier.runCopier(outputDir);
504      Thread.sleep(SLEEP_IN_MS);
505    }
506
507    protected void runVerify(String outputDir, int numReducers, long expectedNumNodes,
508        boolean allTables) throws Exception {
509      Path outputPath = new Path(outputDir);
510
511      if (allTables) {
512        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
513          LOG.info("Verifying table " + i);
514          sleep(SLEEP_IN_MS);
515          UUID uuid = UUID.randomUUID(); // create a random UUID.
516          Path iterationOutput = new Path(outputPath, uuid.toString());
517          Verify verify = new VisibilityVerify(getTableName(i).getNameAsString(), i);
518          verify(numReducers, expectedNumNodes, iterationOutput, verify);
519        }
520      }
521      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
522        runVerifyCommonTable(outputDir, numReducers, expectedNumNodes, i);
523      }
524    }
525
526    private void runVerify(String outputDir, int numReducers, long expectedNodes, int tableIndex)
527        throws Exception {
528      long temp = expectedNodes;
529      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
530        if (i <= tableIndex) {
531          expectedNodes = 0;
532        } else {
533          expectedNodes = temp;
534        }
535        LOG.info("Verifying data in the table with index "+i+ " and expected nodes is "+expectedNodes);
536        runVerifyCommonTable(outputDir, numReducers, expectedNodes, i);
537      }
538    }
539
540    private void sleep(long ms) throws InterruptedException {
541      Thread.sleep(ms);
542    }
543
544    protected void runVerifyCommonTable(String outputDir, int numReducers, long expectedNumNodes,
545        int index) throws Exception {
546      LOG.info("Verifying common table with index " + index);
547      sleep(SLEEP_IN_MS);
548      Path outputPath = new Path(outputDir);
549      UUID uuid = UUID.randomUUID(); // create a random UUID.
550      Path iterationOutput = new Path(outputPath, uuid.toString());
551      Verify verify = new VisibilityVerify(TableName.valueOf(COMMON_TABLE_NAME).getNameAsString(),
552          index);
553      verify(numReducers, expectedNumNodes, iterationOutput, verify);
554    }
555
556    protected void runCopier(String outputDir) throws Exception {
557      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
558        LOG.info("Running copier " + IntegrationTestBigLinkedListWithVisibility.getTableName(i));
559        sleep(SLEEP_IN_MS);
560        Copier copier = new Copier(IntegrationTestBigLinkedListWithVisibility.getTableName(i), i,
561            false);
562        copier.setConf(getConf());
563        copier.runCopier(outputDir);
564      }
565    }
566
567    private void verify(int numReducers, long expectedNumNodes,
568        Path iterationOutput, Verify verify) throws Exception {
569      verify.setConf(getConf());
570      int retCode = verify.run(iterationOutput, numReducers);
571      if (retCode > 0) {
572        throw new RuntimeException("Verify.run failed with return code: " + retCode);
573      }
574
575      if (!verify.verify(expectedNumNodes)) {
576        throw new RuntimeException("Verify.verify failed");
577      }
578
579      LOG.info("Verify finished with succees. Total nodes=" + expectedNumNodes);
580    }
581
582    @Override
583    public int run(String[] args) throws Exception {
584      if (args.length < 5) {
585        System.err
586            .println("Usage: Loop <num iterations> " +
587                "<num mappers> <num nodes per mapper> <output dir> " +
588                "<num reducers> [<width> <wrap multiplier>]");
589        return 1;
590      }
591      LOG.info("Running Loop with args:" + Arrays.deepToString(args));
592
593      int numIterations = Integer.parseInt(args[0]);
594      int numMappers = Integer.parseInt(args[1]);
595      long numNodes = Long.parseLong(args[2]);
596      String outputDir = args[3];
597      int numReducers = Integer.parseInt(args[4]);
598      Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
599      Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
600      long expectedNumNodes = 0;
601
602      if (numIterations < 0) {
603        numIterations = Integer.MAX_VALUE; // run indefinitely (kind of)
604      }
605
606      for (int i = 0; i < numIterations; i++) {
607        LOG.info("Starting iteration = " + i);
608        LOG.info("Generating data");
609        // By default run no concurrent walkers for test with visibility
610        runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, 0);
611        expectedNumNodes += numMappers * numNodes;
612        // Copying wont work because expressions are not returned back to the
613        // client
614        LOG.info("Running copier");
615        sleep(SLEEP_IN_MS);
616        runCopier(outputDir);
617        LOG.info("Verifying copied data");
618        sleep(SLEEP_IN_MS);
619        runVerify(outputDir, numReducers, expectedNumNodes, true);
620        sleep(SLEEP_IN_MS);
621        for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
622          LOG.info("Deleting data on table with index: "+j);
623          runDelete(numMappers, numNodes, outputDir, width, wrapMultiplier, j);
624          sleep(SLEEP_IN_MS);
625          LOG.info("Verifying common table after deleting");
626          runVerify(outputDir, numReducers, expectedNumNodes, j);
627          sleep(SLEEP_IN_MS);
628        }
629      }
630      return 0;
631    }
632  }
633
634  @Override
635  @Test
636  public void testContinuousIngest() throws IOException, Exception {
637    // Loop <num iterations> <num mappers> <num nodes per mapper> <output dir>
638    // <num reducers>
639    int ret = ToolRunner.run(
640        getTestingUtil(getConf()).getConfiguration(),
641        new VisibilityLoop(),
642        new String[] { "1", "1", "20000",
643            util.getDataTestDirOnTestFS("IntegrationTestBigLinkedListWithVisibility").toString(),
644            "1", "10000" });
645    org.junit.Assert.assertEquals(0, ret);
646  }
647
648  public static void main(String[] args) throws Exception {
649    Configuration conf = HBaseConfiguration.create();
650    IntegrationTestingUtility.setUseDistributedCluster(conf);
651    int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedListWithVisibility(), args);
652    System.exit(ret);
653  }
654
655  @Override
656  protected MonkeyFactory getDefaultMonkeyFactory() {
657    return MonkeyFactory.getFactory(MonkeyFactory.CALM);
658  }
659
660  @Override
661  public int runTestFromCommandLine() throws Exception {
662    Tool tool = null;
663    Loop loop = new VisibilityLoop();
664    loop.it = this;
665    tool = loop;
666    return ToolRunner.run(getConf(), tool, otherArgs);
667  }
668}